Skip to content

Commit 354ebb2

Browse files
committed
fix(runner): ensure deterministic loop execution order
- Enforce deterministic sort order for node executions in DB queries by adding secondary ID sort. - Add regression test for loop execution order to prevent future regressions.
1 parent 464da28 commit 354ebb2

3 files changed

Lines changed: 157 additions & 6 deletions

File tree

packages/db/pkg/sqlc/gen/flow.sql.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/db/pkg/sqlc/queries/flow.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -589,20 +589,20 @@ WHERE id = ?;
589589
-- name: ListNodeExecutions :many
590590
SELECT * FROM node_execution
591591
WHERE node_id = ?
592-
ORDER BY completed_at DESC
592+
ORDER BY completed_at DESC, id DESC
593593
LIMIT ? OFFSET ?;
594594

595595
-- name: ListNodeExecutionsByState :many
596596
SELECT * FROM node_execution
597597
WHERE node_id = ? AND state = ?
598-
ORDER BY completed_at DESC
598+
ORDER BY completed_at DESC, id DESC
599599
LIMIT ? OFFSET ?;
600600

601601
-- name: ListNodeExecutionsByFlowRun :many
602602
SELECT ne.* FROM node_execution ne
603603
JOIN flow_node fn ON ne.node_id = fn.id
604604
WHERE fn.flow_id = ?
605-
ORDER BY ne.completed_at DESC;
605+
ORDER BY ne.completed_at DESC, ne.id DESC;
606606

607607
-- name: CreateNodeExecution :one
608608
INSERT INTO node_execution (
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package runner_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"the-dev-tools/server/pkg/flow/node"
11+
"the-dev-tools/server/pkg/flow/node/nfor"
12+
"the-dev-tools/server/pkg/flow/node/nstart"
13+
"the-dev-tools/server/pkg/flow/runner"
14+
"the-dev-tools/server/pkg/flow/runner/flowlocalrunner"
15+
"the-dev-tools/server/pkg/idwrap"
16+
"the-dev-tools/server/pkg/model/mflow"
17+
18+
"github.com/stretchr/testify/assert"
19+
"github.com/stretchr/testify/require"
20+
)
21+
22+
// trackingNode is a simple node that records when it runs
23+
type trackingNode struct {
24+
id idwrap.IDWrap
25+
name string
26+
mu *sync.Mutex
27+
log *[]string
28+
delay time.Duration
29+
}
30+
31+
func newTrackingNode(name string, mu *sync.Mutex, log *[]string, delay time.Duration) *trackingNode {
32+
return &trackingNode{
33+
id: idwrap.NewNow(),
34+
name: name,
35+
mu: mu,
36+
log: log,
37+
delay: delay,
38+
}
39+
}
40+
41+
func (n *trackingNode) GetID() idwrap.IDWrap { return n.id }
42+
func (n *trackingNode) GetName() string { return n.name }
43+
44+
func (n *trackingNode) RunSync(ctx context.Context, req *node.FlowNodeRequest) node.FlowNodeResult {
45+
fmt.Printf("Node %s running\n", n.name)
46+
if n.delay > 0 {
47+
time.Sleep(n.delay)
48+
}
49+
n.mu.Lock()
50+
defer n.mu.Unlock()
51+
*n.log = append(*n.log, n.name)
52+
53+
nextID := mflow.GetNextNodeID(req.EdgeSourceMap, n.id, mflow.HandleThen)
54+
return node.FlowNodeResult{NextNodeID: nextID}
55+
}
56+
57+
func (n *trackingNode) RunAsync(ctx context.Context, req *node.FlowNodeRequest, resultChan chan node.FlowNodeResult) {
58+
resultChan <- n.RunSync(ctx, req)
59+
}
60+
61+
func TestLoopExecutionOrder(t *testing.T) {
62+
// Setup shared log
63+
var executionLog []string
64+
var mu sync.Mutex
65+
66+
// Create nodes
67+
startNode := nstart.New(idwrap.NewNow(), "Start")
68+
69+
// Loop node: 3 iterations
70+
loopNode := nfor.New(idwrap.NewNow(), "Loop", 3, 10*time.Second, mflow.ErrorHandling_ERROR_HANDLING_UNSPECIFIED)
71+
72+
// Child nodes inside the loop
73+
// We add a small delay to node A to simulate work and potential race conditions
74+
nodeA := newTrackingNode("Node A", &mu, &executionLog, 10*time.Millisecond)
75+
nodeB := newTrackingNode("Node B", &mu, &executionLog, 0)
76+
77+
// Build edges
78+
// Start -> Loop
79+
// Loop (Loop handle) -> Node A
80+
// Node A -> Node B
81+
edges := []mflow.Edge{
82+
mflow.NewEdge(idwrap.NewNow(), startNode.GetID(), loopNode.GetID(), mflow.HandleUnspecified),
83+
mflow.NewEdge(idwrap.NewNow(), loopNode.GetID(), nodeA.GetID(), mflow.HandleLoop),
84+
mflow.NewEdge(idwrap.NewNow(), nodeA.GetID(), nodeB.GetID(), mflow.HandleThen),
85+
}
86+
edgeMap := mflow.NewEdgesMap(edges)
87+
88+
// Setup node registry
89+
nodeRegistry := map[idwrap.IDWrap]node.FlowNode{
90+
startNode.GetID(): startNode,
91+
loopNode.GetID(): loopNode,
92+
nodeA.GetID(): nodeA,
93+
nodeB.GetID(): nodeB,
94+
}
95+
96+
// Capture log events
97+
var logEvents []runner.FlowNodeStatus
98+
var logMu sync.Mutex
99+
100+
// Setup variable system
101+
varSystem := make(map[string]any)
102+
103+
// Execution context
104+
ctx := context.Background()
105+
req := &node.FlowNodeRequest{
106+
VarMap: varSystem,
107+
ReadWriteLock: &sync.RWMutex{},
108+
NodeMap: nodeRegistry,
109+
EdgeSourceMap: edgeMap,
110+
Timeout: 30 * time.Second,
111+
LogPushFunc: func(status runner.FlowNodeStatus) {
112+
logMu.Lock()
113+
defer logMu.Unlock()
114+
// Only capture completion events (SUCCESS/FAILURE) to verify completion order
115+
if status.State == mflow.NODE_STATE_SUCCESS || status.State == mflow.NODE_STATE_FAILURE {
116+
logEvents = append(logEvents, status)
117+
}
118+
},
119+
}
120+
121+
// Calculate predecessors
122+
predecessors := flowlocalrunner.BuildPredecessorMap(edgeMap)
123+
124+
// Run the flow starting from Start node
125+
err := flowlocalrunner.RunNodeASync(ctx, startNode.GetID(), req, req.LogPushFunc, predecessors)
126+
require.NoError(t, err)
127+
128+
// Verify actual execution order
129+
expectedExec := []string{"Node A", "Node B", "Node A", "Node B", "Node A", "Node B"}
130+
131+
mu.Lock()
132+
defer mu.Unlock()
133+
assert.Equal(t, expectedExec, executionLog, "Physical execution order mismatch")
134+
135+
// Verify Log Event Order
136+
logMu.Lock()
137+
defer logMu.Unlock()
138+
139+
var eventNames []string
140+
for _, e := range logEvents {
141+
// Filter out Loop events themselves, just check child nodes
142+
if e.Name == "Node A" || e.Name == "Node B" {
143+
eventNames = append(eventNames, e.Name)
144+
}
145+
}
146+
147+
assert.Equal(t, expectedExec, eventNames, "Log event emission order mismatch")
148+
if !assert.ObjectsAreEqual(expectedExec, eventNames) {
149+
fmt.Printf("Expected Events: %v\nActual Events: %v\n", expectedExec, eventNames)
150+
}
151+
}

0 commit comments

Comments
 (0)