Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2143,6 +2143,47 @@ describe('e2e', () => {
}
);

test(
'hookWithSleepFinalStepWorkflow - step only on final payload',
{ timeout: 120_000 },
async () => {
// Regression test for the v0chat incident. Mirrors the production
// shape: a hook + fire-and-forget sleep, where the step runs only
// once the final (done) payload arrives. Replay ends up with two
// `hook_received` events followed by a single `step_created`, which
// is the race window for the deferred unconsumed-event check.
const token = Math.random().toString(36).slice(2);

const run = await start(await e2e('hookWithSleepFinalStepWorkflow'), [
token,
]);

// Wait for the hook to register.
await new Promise((resolve) => setTimeout(resolve, 5_000));

let hook = await getHookByToken(token);
expect(hook.runId).toBe(run.runId);
await resumeHook(hook, { type: 'msg', id: 1 });

// Let the workflow replay and suspend before the next payload so the
// final event log contains two `hook_received` entries before any
// `step_created` — the exact replay shape from production.
await new Promise((resolve) => setTimeout(resolve, 3_000));

hook = await getHookByToken(token);
await resumeHook(hook, { type: 'final', id: 2, done: true });

const returnValue = await run.returnValue;
expect(returnValue).toEqual({
seen: [1, 2],
finalResult: { processed: true, type: 'final', id: 2 },
});

const { json: runData } = await cliInspectJson(`runs ${run.runId}`);
expect(runData.status).toBe('completed');
}
);

test(
'sleepInLoopWorkflow - sleep inside loop with steps actually delays each iteration',
{ timeout: 60_000 },
Expand Down
10 changes: 9 additions & 1 deletion packages/core/src/events-consumer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import type { Event } from '@workflow/world';
import { eventsLogger } from './logger.js';

/**
* Delay before firing the deferred unconsumed-event check after the promise
* queue has drained. Must be long enough for cross-VM microtask chains to
* propagate (resolve in host → workflow code in VM → subscribe call back
* in host). Any subscribe() arriving during this window cancels the check.
*/
export const DEFERRED_CHECK_DELAY_MS = 100;

export enum EventConsumerResult {
/**
* Callback consumed the event, but should not be removed from the callbacks list
Expand Down Expand Up @@ -138,7 +146,7 @@ export class EventsConsumer {
this.pendingUnconsumedCheck = null;
this.onUnconsumedEvent(currentEvent);
}
}, 100);
}, DEFERRED_CHECK_DELAY_MS);
});
}
};
Expand Down
165 changes: 164 additions & 1 deletion packages/core/src/hook-sleep-interaction.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { WorkflowRuntimeError } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import type { Event } from '@workflow/world';
import * as nanoid from 'nanoid';
Expand Down Expand Up @@ -34,12 +35,23 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
const ulid = monotonicFactory(() => context.globalThis.Math.random());
const workflowStartedAt = context.globalThis.Date.now();
const promiseQueueHolder = { current: Promise.resolve() };
// Forward onUnconsumedEvent through ctx.onWorkflowError so tests that wire
// onWorkflowError to a discontinuation promise (see runWithDiscontinuation)
// actually observe false-positive unconsumed-event detections instead of
// silently dropping them.
const ctxRef: { current?: WorkflowOrchestratorContext } = {};
const ctx: WorkflowOrchestratorContext = {
runId: 'wrun_test',
encryptionKey: undefined,
globalThis: context.globalThis,
eventsConsumer: new EventsConsumer(events, {
onUnconsumedEvent: () => {},
onUnconsumedEvent: (event) => {
ctxRef.current?.onWorkflowError(
new WorkflowRuntimeError(
`Unconsumed event in event log: eventType=${event.eventType}, correlationId=${event.correlationId}, eventId=${event.eventId}. This indicates a corrupted or invalid event log.`
)
);
},
getPromiseQueue: () => promiseQueueHolder.current,
}),
invocationsQueue: new Map(),
Expand All @@ -56,6 +68,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
},
pendingDeliveries: 0,
};
ctxRef.current = ctx;
return ctx;
}

Expand Down Expand Up @@ -599,6 +612,156 @@ function defineTests(mode: 'sync' | 'async') {
});
});

describe(`hook + sleep with step per payload ${label}`, () => {
it('should not trigger unconsumed event error when for-await loop calls a step per hook payload', async () => {
// Reproduces CI failure: hookWithSleepWorkflow event log had alternating
// hook_received + step lifecycle events. During replay, the EventsConsumer
// advances past the second step_created before the for-await loop has
// called processPayload (and registered the step consumer). The deferred
// unconsumed check must wait for the new async work (hook payload
// deserialization) before declaring the event orphaned.
await setupHydrateMock();
const ops: Promise<any>[] = [];
const [payload1, payload2, stepResult1, stepResult2] = await Promise.all([
dehydrateStepReturnValue(
{ type: 'subscribe', id: 1 },
'wrun_test',
undefined,
ops
),
dehydrateStepReturnValue(
{ type: 'done', done: true },
'wrun_test',
undefined,
ops
),
dehydrateStepReturnValue(
{ processed: true, type: 'subscribe', id: 1 },
'wrun_test',
undefined,
ops
),
dehydrateStepReturnValue(
{ processed: true, type: 'done' },
'wrun_test',
undefined,
ops
),
]);

const ctx = setupWorkflowContext([
{
eventId: 'evnt_0',
runId: 'wrun_test',
eventType: 'hook_created',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: { token: 'test-token', isWebhook: false },
createdAt: new Date(),
Comment on lines +652 to +659
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is intended to ensure an "unconsumed event" error does not occur, but setupWorkflowContext() configures EventsConsumer with onUnconsumedEvent: () => {}. That means the regression would manifest as a hang until the test times out (and it won’t surface the same error the real runtime throws). To make the regression signal deterministic and match production, consider letting the test pass an onUnconsumedEvent handler that rejects via ctx.onWorkflowError (or throws) so a failure is immediate and clearly attributed to the unconsumed-event path.

Copilot uses AI. Check for mistakes.
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'wait_created',
correlationId: `wait_${CORR_IDS[1]}`,
eventData: { resumeAt: new Date('2099-01-01') },
createdAt: new Date(),
},
// First hook payload → step lifecycle
{
eventId: 'evnt_2',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: { payload: payload1 },
createdAt: new Date(),
},
{
eventId: 'evnt_3',
runId: 'wrun_test',
eventType: 'step_created',
correlationId: `step_${CORR_IDS[2]}`,
eventData: { stepName: 'processPayload', input: payload1 },
createdAt: new Date(),
},
{
eventId: 'evnt_4',
runId: 'wrun_test',
eventType: 'step_started',
correlationId: `step_${CORR_IDS[2]}`,
eventData: {},
createdAt: new Date(),
},
{
eventId: 'evnt_5',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: `step_${CORR_IDS[2]}`,
eventData: { result: stepResult1 },
createdAt: new Date(),
},
// Second hook payload → step lifecycle
{
eventId: 'evnt_6',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: { payload: payload2 },
createdAt: new Date(),
},
{
eventId: 'evnt_7',
runId: 'wrun_test',
eventType: 'step_created',
correlationId: `step_${CORR_IDS[3]}`,
eventData: { stepName: 'processPayload', input: payload2 },
createdAt: new Date(),
},
{
eventId: 'evnt_8',
runId: 'wrun_test',
eventType: 'step_started',
correlationId: `step_${CORR_IDS[3]}`,
eventData: {},
createdAt: new Date(),
},
{
eventId: 'evnt_9',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: `step_${CORR_IDS[3]}`,
eventData: { result: stepResult2 },
createdAt: new Date(),
},
]);

const createHook = createCreateHook(ctx);
const sleep = createSleep(ctx);
const useStep = createUseStep(ctx);

const { result, error } = await runWithDiscontinuation(ctx, async () => {
const hook = createHook();
void sleep('1d');

const processPayload = useStep<[any], any>('processPayload');
const results: any[] = [];

for await (const payload of hook) {
const processed = await processPayload(payload);
results.push(processed);
if ((payload as any).done) break;
}

return results;
});

expect(error).toBeUndefined();
expect(result).toEqual([
{ processed: true, type: 'subscribe', id: 1 },
{ processed: true, type: 'done' },
]);
});
});

describe(`hook only (no concurrent pending entity) ${label}`, () => {
it('should deliver all hook payloads and reach step when no sleep or incomplete step exists', async () => {
await setupHydrateMock();
Expand Down
Loading
Loading