Skip to content

Commit 9e4ab9a

Browse files
committed
fix(server): Fix linter warnings and formatting issues
Resolves all Go linter warnings and Prettier formatting issues: - Use Go 1.22+ integer range syntax in concurrency tests - Replace if-else chains with switch statements for better readability - Convert struct types instead of using struct literals - Remove unnecessary type conversions (EdgeHandle is already int32) - Add safe int32→uint64 conversion with overflow protection - Remove unused publishBulkEdgeDelete and publishBulkFlowVariableDelete functions - Format markdown files with Prettier All linter checks now pass with 0 issues.
1 parent c8da189 commit 9e4ab9a

8 files changed

Lines changed: 65 additions & 67 deletions

File tree

packages/server/docs/specs/BULK_SYNC_TRANSACTION_WRAPPERS.md

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ type TopicExtractor[T any, Topic any] func(item T) Topic
193193

194194
## Single-Topic vs Multi-Topic Wrappers
195195

196-
### When to Use Single-Topic (SyncTx*)
196+
### When to Use Single-Topic (SyncTx\*)
197197

198198
**Scenario:** All items in a single request belong to the same workspace
199199

@@ -228,7 +228,7 @@ syncTx.CommitAndPublish(ctx, func(http mhttp.HTTP) {
228228
})
229229
```
230230

231-
### When to Use Multi-Topic (BulkSyncTx*)
231+
### When to Use Multi-Topic (BulkSyncTx\*)
232232

233233
**Scenario:** Items in a single request can belong to different workspaces
234234

@@ -541,6 +541,7 @@ type UpdateEvent[T, P any] struct {
541541
```
542542

543543
The patch is preserved through the entire flow:
544+
544545
1. Built during update loop
545546
2. Tracked with `syncTx.Track(item, patch)`
546547
3. Passed to publish handler as `UpdateEvent`
@@ -705,14 +706,14 @@ type HttpHeaderEvent struct {
705706
"type": "update",
706707
"isDelta": false,
707708
"patch": {
708-
"value": {"value": "text/html", "set": true}, // ✅ Changed
709-
"enabled": {"set": false}, // ❌ Unchanged
710-
"description": {"set": false} // ❌ Unchanged
709+
"value": { "value": "text/html", "set": true }, // ✅ Changed
710+
"enabled": { "set": false }, // ❌ Unchanged
711+
"description": { "set": false } // ❌ Unchanged
711712
},
712713
"httpHeader": {
713714
"httpHeaderId": "abc123",
714715
"key": "Content-Type",
715-
"value": "text/html", // Updated
716+
"value": "text/html", // Updated
716717
"enabled": true,
717718
"description": "API header",
718719
"order": 1.0
@@ -721,10 +722,12 @@ type HttpHeaderEvent struct {
721722
```
722723

723724
**Frontend action:**
725+
724726
- **Smart clients:** Apply only changed fields from `patch` (efficient)
725727
- **Simple clients:** Replace with full `httpHeader` (works but less efficient)
726728

727729
**Why send both?**
730+
728731
- `Patch` - For efficient updates (only changed fields)
729732
- `HttpHeader` - For backwards compatibility (simple clients)
730733

@@ -746,7 +749,7 @@ type HttpHeaderEvent struct {
746749
"type": "delete",
747750
"isDelta": false,
748751
"httpHeader": {
749-
"httpHeaderId": "abc123" // Only ID
752+
"httpHeaderId": "abc123" // Only ID
750753
}
751754
}
752755
```
@@ -774,6 +777,7 @@ FlushInterval: 50ms
774777
```
775778

776779
Frontend receives events in incremental batches:
780+
777781
- Events 1-100: Batch 1 (sent when buffer reaches 100 OR 50ms timeout)
778782
- Events 101-200: Batch 2
779783
- etc.
@@ -892,6 +896,7 @@ syncTx.Track(item, headerPatch) // Patch contains only changed fields
892896
**Scenario:** User changes only `Value` field on 50 headers
893897

894898
**Without patches (old way):**
899+
895900
```json
896901
// 50 full objects × ~200 bytes = ~10KB
897902
[
@@ -902,11 +907,12 @@ syncTx.Track(item, headerPatch) // Patch contains only changed fields
902907
```
903908

904909
**With patches (new way):**
910+
905911
```json
906912
// 50 patches × ~30 bytes = ~1.5KB (plus full objects for compatibility)
907913
[
908-
{"patch": {"value": {"value": "NEW", "set": true}}},
909-
{"patch": {"value": {"value": "NEW", "set": true}}},
914+
{ "patch": { "value": { "value": "NEW", "set": true } } },
915+
{ "patch": { "value": { "value": "NEW", "set": true } } }
910916
// ... 48 more
911917
]
912918
```
@@ -919,13 +925,13 @@ syncTx.Track(item, headerPatch) // Patch contains only changed fields
919925

920926
### Publish Call Reduction
921927

922-
| Scenario | Items | Workspaces | Before | After | Improvement |
923-
| ----------------------------- | ----- | ---------- | ------------ | ---------- | ----------- |
924-
| Small batch, single workspace | 10 | 1 | 10 calls | 1 call | 10x |
925-
| Medium batch, 3 workspaces | 50 | 3 | 50 calls | 3 calls | 16x |
926-
| Large batch, single workspace | 100 | 1 | 100 calls | 1 call | 100x |
927-
| Bulk import, 5 workspaces | 500 | 5 | 500 calls | 5 calls | 100x |
928-
| Massive import, 10 workspaces | 1000 | 10 | 1000 calls | 10 calls | 100x |
928+
| Scenario | Items | Workspaces | Before | After | Improvement |
929+
| ----------------------------- | ----- | ---------- | ---------- | -------- | ----------- |
930+
| Small batch, single workspace | 10 | 1 | 10 calls | 1 call | 10x |
931+
| Medium batch, 3 workspaces | 50 | 3 | 50 calls | 3 calls | 16x |
932+
| Large batch, single workspace | 100 | 1 | 100 calls | 1 call | 100x |
933+
| Bulk import, 5 workspaces | 500 | 5 | 500 calls | 5 calls | 100x |
934+
| Massive import, 10 workspaces | 1000 | 10 | 1000 calls | 10 calls | 100x |
929935

930936
### Extra Query Elimination
931937

@@ -961,18 +967,21 @@ syncTx.Track(headerWithWorkspace{header, workspaceID})
961967
**Operation:** Update 50 headers across 3 workspaces
962968

963969
**Before:**
970+
964971
- 50 database updates (transaction)
965972
- 50 HTTP lookups (after commit) ❌
966973
- 50 publish calls ❌
967974
- **Total: 100 database operations**
968975

969976
**After:**
977+
970978
- 50 HTTP lookups (during validation) ✅
971979
- 50 database updates (transaction)
972980
- 3 publish calls (auto-grouped) ✅
973981
- **Total: 53 database operations (47% reduction)**
974982

975983
Plus:
984+
976985
- 16x fewer publish calls (50 → 3)
977986
- Compile-time safety (impossible to forget sync)
978987
- Patch preservation (85% less data for partial updates)
@@ -986,6 +995,7 @@ Plus:
986995
- Pre-allocated slices reused
987996

988997
**Example:**
998+
989999
- 100 items, 1 workspace: ~1KB overhead
9901000
- 1000 items, 10 workspaces: ~10KB overhead
9911001

@@ -1005,6 +1015,7 @@ Plus:
10051015
```
10061016

10071017
**NOT like log batching systems:**
1018+
10081019
- ❌ NO timeout waiting (e.g., 500ms)
10091020
- ❌ NO buffer accumulation
10101021
- ✅ Synchronous, immediate publish after commit
@@ -1601,7 +1612,7 @@ for _, item := range req.Items {
16011612
| ---------- | ------- | ---------------------------------------------------------------------------------------------------------------------------- |
16021613
| 2025-12-26 | 1.0.0 | Initial implementation with HttpHeaderInsert proof of concept |
16031614
| 2025-12-26 | 2.0.0 | Completed Phase 3 migration: All HTTP child entity CRUD operations (Batches 1-5, 14 operations) |
1604-
| 2025-12-27 | 2.1.0 | Comprehensive spec rewrite with examples, troubleshooting, and real-world usage from completed work |
1615+
| 2025-12-27 | 2.1.0 | Comprehensive spec rewrite with examples, troubleshooting, and real-world usage from completed work |
16051616
| 2025-12-27 | 3.0.0 | Completed HttpBodyRaw (Batch 7) and Flow operations (Batches 8-9): Added transaction safety + bulk sync to Edge/FlowVariable |
16061617

16071618
---
@@ -1611,56 +1622,67 @@ for _, item := range req.Items {
16111622
### ✅ Completed
16121623

16131624
**Phase 1: Foundation**
1625+
16141626
-`bulk_sync_tx.go` implementation
16151627
- ✅ Comprehensive unit tests
16161628
- ✅ Documentation
16171629

16181630
**Phase 2: Proof of Concept**
1631+
16191632
- ✅ HttpHeaderInsert (first migration)
16201633

16211634
**Phase 3: Gradual Rollout**
16221635

16231636
**Batch 1: HTTP Headers**
1637+
16241638
- ✅ HttpHeaderUpdate
16251639
- ✅ HttpHeaderDelete
16261640

16271641
**Batch 2: HTTP Search Params**
1642+
16281643
- ✅ HttpSearchParamInsert
16291644
- ✅ HttpSearchParamUpdate
16301645
- ✅ HttpSearchParamDelete
16311646

16321647
**Batch 3: HTTP Asserts**
1648+
16331649
- ✅ HttpAssertInsert
16341650
- ✅ HttpAssertUpdate
16351651
- ✅ HttpAssertDelete
16361652

16371653
**Batch 4: HTTP Body Form Data**
1654+
16381655
- ✅ HttpBodyFormDataInsert
16391656
- ✅ HttpBodyFormDataUpdate
16401657
- ✅ HttpBodyFormDataDelete
16411658

16421659
**Batch 5: HTTP Body URL Encoded**
1660+
16431661
- ✅ HttpBodyUrlEncodedInsert
16441662
- ✅ HttpBodyUrlEncodedUpdate
16451663
- ✅ HttpBodyUrlEncodedDelete
16461664

16471665
**Batch 6: Main HTTP Entry**
1666+
16481667
- ✅ HttpUpdate (refactored to bulk sync wrapper)
16491668
- ✅ HttpDelete (refactored to bulk sync wrapper)
16501669
- ✅ HttpInsert (already uses SyncTxInsert - single-topic wrapper)
16511670

16521671
**Batch 7: HTTP Body Raw**
1672+
16531673
- ✅ HttpBodyRawInsert (refactored to bulk sync wrapper)
16541674
- ✅ HttpBodyRawUpdate (refactored to bulk sync wrapper with patch tracking)
16551675
- Note: HttpBodyRaw is singleton (one per HTTP entry), migrated for pattern consistency
16561676

16571677
**Batch 8: Flow Edge Operations**
1678+
16581679
- ✅ EdgeInsert (CRITICAL: Added transaction safety + bulk sync wrapper)
16591680
- ✅ EdgeUpdate (CRITICAL: Added transaction safety + bulk sync wrapper with patch tracking)
16601681
- ✅ EdgeDelete (CRITICAL: Added transaction safety, manual publish kept)
16611682
- Note: Phase 1 fixed critical data corruption bug (partial commits on failure)
16621683

16631684
**Batch 9: Flow Variable Operations**
1685+
16641686
- ✅ FlowVariableInsert (CRITICAL: Added transaction safety + bulk sync wrapper)
16651687
- ✅ FlowVariableUpdate (CRITICAL: Added transaction safety + bulk sync wrapper with patch tracking)
16661688
- ✅ FlowVariableDelete (CRITICAL: Added transaction safety, manual publish kept)
@@ -1671,9 +1693,11 @@ for _, item := range req.Items {
16711693
### ⏳ Remaining
16721694

16731695
**Phase 4: Flow Nodes** (future)
1696+
16741697
- FlowNodeInsert/Update/Delete
16751698

16761699
**Phase 5: Other Resources** (future)
1700+
16771701
- Environment CRUD
16781702
- File CRUD
16791703
- Reference CRUD

packages/server/internal/api/rflowv2/rflowv2_edge.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func (s *FlowServiceV2RPC) EdgeUpdate(ctx context.Context, req *connect.Request[
200200

201201
if item.SourceHandle != nil {
202202
existing.SourceHandler = convertHandle(item.GetSourceHandle())
203-
edgePatch.SourceHandler = patch.NewOptional(int32(existing.SourceHandler))
203+
edgePatch.SourceHandler = patch.NewOptional(existing.SourceHandler)
204204
}
205205

206206
validatedUpdates = append(validatedUpdates, updateData{

packages/server/internal/api/rflowv2/rflowv2_edge_sync.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,4 @@ func (s *FlowServiceV2RPC) publishBulkEdgeUpdate(
3434
for _, evt := range events {
3535
s.publishEdgeEvent(edgeEventUpdate, evt.Item.edge)
3636
}
37-
}
38-
39-
// publishBulkEdgeDelete publishes multiple edge delete events in bulk.
40-
// Groups edges by flow ID and publishes all deletions for that flow in a single event batch.
41-
func (s *FlowServiceV2RPC) publishBulkEdgeDelete(
42-
topic EdgeTopic,
43-
items []edgeWithFlow,
44-
) {
45-
for _, item := range items {
46-
s.publishEdgeEvent(edgeEventDelete, item.edge)
47-
}
48-
}
37+
}

packages/server/internal/api/rflowv2/rflowv2_flow.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,7 @@ func (s *FlowServiceV2RPC) FlowInsert(ctx context.Context, req *connect.Request[
292292
}
293293

294294
// Track for bulk event publishing
295-
syncTx.Track(flowNodePair{
296-
flow: data.flow,
297-
startNode: data.startNode,
298-
workspaceID: data.workspaceID,
299-
})
295+
syncTx.Track(flowNodePair(data))
300296

301297
// Increment workspace flow count
302298
workspaceUpdates[data.workspaceID].FlowCount++
@@ -371,7 +367,8 @@ func (s *FlowServiceV2RPC) FlowUpdate(ctx context.Context, req *connect.Request[
371367
flowPatch.Duration = patch.NewOptional(uint64(0))
372368
case flowv1.FlowUpdate_DurationUnion_KIND_VALUE:
373369
flow.Duration = du.GetValue()
374-
flowPatch.Duration = patch.NewOptional(uint64(du.GetValue()))
370+
durVal := max(0, int64(du.GetValue()))
371+
flowPatch.Duration = patch.NewOptional(uint64(durVal)) //nolint:gosec // G115: Safe conversion - value is clamped to non-negative
375372
}
376373
}
377374

@@ -493,10 +490,7 @@ func (s *FlowServiceV2RPC) FlowDelete(ctx context.Context, req *connect.Request[
493490
workspace.FlowCount--
494491
}
495492

496-
deletedFlows = append(deletedFlows, flowWithWorkspace{
497-
flow: data.flow,
498-
workspaceID: data.workspaceID,
499-
})
493+
deletedFlows = append(deletedFlows, flowWithWorkspace(data))
500494
}
501495

502496
// Update all workspaces

packages/server/internal/api/rflowv2/rflowv2_node.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,7 @@ func (s *FlowServiceV2RPC) NodeInsert(
110110
return nil, connect.NewError(connect.CodeInternal, err)
111111
}
112112

113-
syncTx.Track(nodeWithFlow{
114-
node: data.node,
115-
flowID: data.flowID,
116-
})
113+
syncTx.Track(nodeWithFlow(data))
117114
}
118115

119116
// 4. Commit transaction and publish events in bulk

packages/server/internal/api/rflowv2/rflowv2_variable_sync.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,4 @@ func (s *FlowServiceV2RPC) publishBulkFlowVariableUpdate(
3434
for _, evt := range events {
3535
s.publishFlowVariableEvent(flowVarEventUpdate, evt.Item.variable)
3636
}
37-
}
38-
39-
// publishBulkFlowVariableDelete publishes multiple flow variable delete events in bulk.
40-
// Groups variables by flow ID and publishes all deletions for that flow in a single event batch.
41-
func (s *FlowServiceV2RPC) publishBulkFlowVariableDelete(
42-
topic FlowVariableTopic,
43-
items []variableWithFlow,
44-
) {
45-
for _, item := range items {
46-
s.publishFlowVariableEvent(flowVarEventDelete, item.variable)
47-
}
48-
}
37+
}

packages/server/pkg/testutil/concurrency.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func RunConcurrentInserts[T any](
9999
var wg sync.WaitGroup
100100

101101
// Launch concurrent operations
102-
for i := 0; i < config.NumGoroutines; i++ {
102+
for i := range config.NumGoroutines {
103103
wg.Add(1)
104104
go func(index int) {
105105
defer wg.Done()
@@ -160,12 +160,13 @@ func RunConcurrentInserts[T any](
160160
)
161161

162162
for result := range resultChan {
163-
if result.timeout {
163+
switch {
164+
case result.timeout:
164165
timeoutCount++
165166
t.Logf("⚠️ Operation timed out after %v (potential deadlock)", result.duration)
166-
} else if result.success {
167+
case result.success:
167168
successCount++
168-
} else {
169+
default:
169170
errorCount++
170171
t.Logf("❌ Operation failed: %v", result.err)
171172
}

0 commit comments

Comments
 (0)