diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index 19a488e775..5e2f1dbbd3 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -265,6 +265,21 @@ void mutableSideEffect( Func1, Optional> func, Functions.Proc1> callback); + default Integer getVersion( + String changeId, + int minSupported, + int maxSupported, + Functions.Proc2 callback) { + return getVersion( + changeId, + minSupported, + maxSupported, + (version, exception) -> { + callback.apply(version, exception); + return true; + }); + } + /** * GetVersion is used to safely perform backwards incompatible changes to workflow definitions. It * is not allowed to update workflow code while there are workflows running as it is going to @@ -278,14 +293,14 @@ void mutableSideEffect( * @param changeId identifier of a particular change * @param minSupported min version supported for the change * @param maxSupported max version supported for the change - * @param callback used to return version + * @param callback used to return version. Returning true requests an additional event loop turn. * @return True if the identifier is not present in history */ Integer getVersion( String changeId, int minSupported, int maxSupported, - Functions.Proc2 callback); + Functions.Func2 callback); /** Replay safe random. */ Random newRandom(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index dd1844a316..1dc1412547 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -335,7 +335,7 @@ public Integer getVersion( String changeId, int minSupported, int maxSupported, - Functions.Proc2 callback) { + Functions.Func2 callback) { return workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 2f2c716f24..fbc30bb97d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -1229,7 +1229,7 @@ public Integer getVersion( String changeId, int minSupported, int maxSupported, - Functions.Proc2 callback) { + Functions.Func2 callback) { VersionStateMachine stateMachine = versions.computeIfAbsent( changeId, @@ -1261,11 +1261,14 @@ public Integer getVersion( return sa; }, (v, e) -> { - callback.apply(v, e); - // without this getVersion call will trigger the end of WFT, - // instead we want to prepare subsequent commands and unblock the execution one more - // time. - eventLoop(); + if (Boolean.TRUE.equals(callback.apply(v, e))) { + // without this getVersion call will trigger the end of WFT, + // instead we want to prepare subsequent commands and unblock the execution one more + // time. + eventLoop(); + } else if (e != null) { + throw e; + } }); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 7a956dc4b7..5415cb1f93 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1143,21 +1143,27 @@ private R mutableSideEffectImpl( @Override public int getVersion(String changeId, int minSupported, int maxSupported) { CompletablePromise result = Workflow.newPromise(); + AtomicBoolean callbackScheduled = new AtomicBoolean(); Integer versionToUse = replayContext.getVersion( changeId, minSupported, maxSupported, - (v, e) -> - runner.executeInWorkflowThread( - "version-callback", - () -> { - if (v != null) { - result.complete(v); - } else { - result.completeExceptionally(e); - } - })); + (v, e) -> { + if (!callbackScheduled.compareAndSet(false, true)) { + return false; + } + runner.executeInWorkflowThread( + "version-callback", + () -> { + if (v != null) { + result.complete(v); + } else { + result.completeExceptionally(e); + } + }); + return true; + }); /* * If we are replaying a workflow and encounter a getVersion call it is possible that this call did not exist * on the original execution. If the call did not exist on the original execution then we cannot block on results diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/VersionStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/VersionStateMachineTest.java index 62fcf501c4..4bace38ffc 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/VersionStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/VersionStateMachineTest.java @@ -41,6 +41,14 @@ private WorkflowStateMachines newStateMachines(TestEntityManagerListenerBase lis return new WorkflowStateMachines(listener, stateMachineList::add); } + private static Functions.Func2 continueAsEventLoopTurn( + Functions.Proc2 callback) { + return (t1, t2) -> { + callback.apply(t1, t2); + return true; + }; + } + @AfterClass public static void generateCoverage() { List>> @@ -65,7 +73,9 @@ class TestListener extends TestEntityManagerListenerBase { public void buildWorkflow(AsyncWorkflowBuilder builder) { builder .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add((v) -> stateMachines.completeWorkflow(converter.toPayloads(v.getT1()))); } } @@ -125,16 +135,20 @@ class TestListener extends TestEntityManagerListenerBase { public void buildWorkflow(AsyncWorkflowBuilder builder) { builder .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add2( (v, c) -> { assertNull(v.getT2()); - stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 10, c); + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 10, continueAsEventLoopTurn(c)); }) .add2( (v, c) -> { assertNull(v.getT2()); - stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 100, c); + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 100, continueAsEventLoopTurn(c)); }) .add( (v) -> { @@ -211,11 +225,14 @@ class TestListener extends TestEntityManagerListenerBase { public void buildWorkflow(AsyncWorkflowBuilder builder) { builder .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add2( (v, c) -> { assertNull(v.getT2()); - stateMachines.getVersion("id1", maxSupported + 10, maxSupported + 10, c); + stateMachines.getVersion( + "id1", maxSupported + 10, maxSupported + 10, continueAsEventLoopTurn(c)); }) .add( (v) -> { @@ -295,7 +312,8 @@ class ReplayTestListener extends TestEntityManagerListenerBase { @Override public void buildWorkflow(AsyncWorkflowBuilder builder) { builder - .add2((v, c) -> stateMachines.getVersion("id1", 1, 1, c)) + .add2( + (v, c) -> stateMachines.getVersion("id1", 1, 1, continueAsEventLoopTurn(c))) .add( (v) -> { versionCallException.set(v.getT2()); @@ -346,16 +364,20 @@ class TestListener extends TestEntityManagerListenerBase { protected void buildWorkflow(AsyncWorkflowBuilder builder) { builder .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add2( (v, c) -> { trace.append(v.getT1()).append(", "); - stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 10, c); + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 10, continueAsEventLoopTurn(c)); }) .add2( (v, c) -> { trace.append(v.getT1()).append(", "); - stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 100, c); + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 100, continueAsEventLoopTurn(c)); }) .add1( (v, c) -> { @@ -421,7 +443,9 @@ class TestListener extends TestEntityManagerListenerBase { protected void buildWorkflow(AsyncWorkflowBuilder builder) { builder .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add( (v) -> { assertNull(v.getT2()); @@ -464,11 +488,14 @@ class TestListener extends TestEntityManagerListenerBase { public void buildWorkflow(AsyncWorkflowBuilder builder) { builder .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add2( (v, c) -> { trace.append(v.getT1()).append(", "); - stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 10, c); + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 10, continueAsEventLoopTurn(c)); }) .add1( (v, c) -> { @@ -489,11 +516,14 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { null, c)) .add2( - (v, c) -> stateMachines.getVersion("id1", maxSupported - 3, maxSupported + 10, c)) + (v, c) -> + stateMachines.getVersion( + "id1", maxSupported - 3, maxSupported + 10, continueAsEventLoopTurn(c))) .add2( (v, c) -> { trace.append(v.getT1()).append(", "); - stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 100, c); + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 100, continueAsEventLoopTurn(c)); }) .add( (v) -> { @@ -608,9 +638,17 @@ class TestListener extends TestEntityManagerListenerBase { @Override protected void buildWorkflow(AsyncWorkflowBuilder builder) { builder - /*.add((v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c))*/ + /*.add( + (v, c) -> + stateMachines.getVersion( + "id1", + DEFAULT_VERSION, + maxSupported, + continueAsEventLoopTurn(c)))*/ .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 10, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 10, continueAsEventLoopTurn(c))) .add1( (v, c) -> { trace.append(v.getT1()).append(", "); @@ -630,8 +668,16 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { null, c)) .add2( - (v, c) -> stateMachines.getVersion("id1", maxSupported - 3, maxSupported + 10, c)) - /*.add((v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 100, c));*/ + (v, c) -> + stateMachines.getVersion( + "id1", maxSupported - 3, maxSupported + 10, continueAsEventLoopTurn(c))) + /*.add( + (v, c) -> + stateMachines.getVersion( + "id1", + DEFAULT_VERSION, + maxSupported + 100, + continueAsEventLoopTurn(c)));*/ .add( (v) -> { trace.append(v.getT1()); @@ -721,6 +767,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { assertNull(e); versionId2 = r; c.apply(r); + return true; })) .add1( (v, c) -> @@ -814,11 +861,21 @@ class TestListener extends TestEntityManagerListenerBase { protected void buildWorkflow(AsyncWorkflowBuilder builder) { builder /* - .add((v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + .add( + (v, c) -> + stateMachines.getVersion( + "id1", + DEFAULT_VERSION, + maxSupported, + continueAsEventLoopTurn(c))) .add( (v, c) -> { trace.append(v + ", "); - stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 10, c); + stateMachines.getVersion( + "id1", + DEFAULT_VERSION, + maxSupported + 10, + continueAsEventLoopTurn(c)); }) */ .add1( @@ -838,9 +895,16 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { null, c)) /*.add( - (v, c) -> stateMachines.getVersion("id1", maxSupported - 3, maxSupported + 10, c))*/ + (v, c) -> + stateMachines.getVersion( + "id1", + maxSupported - 3, + maxSupported + 10, + continueAsEventLoopTurn(c)))*/ .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 100, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 100, continueAsEventLoopTurn(c))) .add( (v) -> { trace.append(v.getT1()); @@ -1062,7 +1126,9 @@ class TestListener extends TestEntityManagerListenerBase { protected void buildWorkflow(AsyncWorkflowBuilder builder) { builder .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add1( (v, c) -> { assertNull(v.getT2()); @@ -1074,7 +1140,9 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { c); }) .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported + 100, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported + 100, continueAsEventLoopTurn(c))) .add( (v) -> { assertNull(v.getT2()); @@ -1176,7 +1244,9 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { ignore -> {}))) .add((v) -> cancelTimerProc.get().apply()) .add2( - (v, c) -> stateMachines.getVersion("id1", maxSupported - 3, maxSupported + 10, c)) + (v, c) -> + stateMachines.getVersion( + "id1", maxSupported - 3, maxSupported + 10, continueAsEventLoopTurn(c))) .add1( (v, c) -> { assertNull(v.getT2()); @@ -1250,7 +1320,9 @@ class TestListener extends TestEntityManagerListenerBase { protected void buildWorkflow(AsyncWorkflowBuilder builder) { builder .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add1( (v, c) -> { assertNull(v.getT2()); @@ -1262,7 +1334,9 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { c); }) .add2( - (v, c) -> stateMachines.getVersion("id1", DEFAULT_VERSION, maxSupported, c)) + (v, c) -> + stateMachines.getVersion( + "id1", DEFAULT_VERSION, maxSupported, continueAsEventLoopTurn(c))) .add( (v) -> { assertNull(v.getT2()); diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/WorkflowStateMachinesTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/WorkflowStateMachinesTest.java index a90f7a9cb7..a88f2574bf 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/WorkflowStateMachinesTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/WorkflowStateMachinesTest.java @@ -1,11 +1,18 @@ package io.temporal.internal.statemachines; +import static io.temporal.workflow.Workflow.DEFAULT_VERSION; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import io.temporal.api.command.v1.Command; +import io.temporal.api.enums.v1.CommandType; import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.history.v1.WorkflowTaskCompletedEventAttributes; import io.temporal.api.sdk.v1.WorkflowTaskCompletedMetadata; +import io.temporal.internal.common.UpdateMessage; import io.temporal.serviceclient.Version; +import java.util.List; import java.util.Optional; import org.junit.Test; @@ -70,4 +77,68 @@ public void writesOnlyNameIfChanged() { public void writesOnlyVersionIfChanged() { sdkNameAndVersionTest("safklasjf", Version.SDK_NAME, null, Version.LIBRARY_VERSION); } + + @Test + public void getVersionFalseCallbackDoesNotTriggerExtraEventLoop() { + final int maxSupported = 7; + + class FalsePathListener implements StatesMachinesCallback { + int eventLoopCalls; + int versionCallbackCalls; + Integer callbackVersion; + Integer returnedVersion; + + @Override + public void start(HistoryEvent startWorkflowEvent) {} + + @Override + public void signal(HistoryEvent signalEvent) {} + + @Override + public void update(UpdateMessage message) {} + + @Override + public void cancel(HistoryEvent cancelEvent) {} + + @Override + public void eventLoop() { + eventLoopCalls++; + if (eventLoopCalls > 1) { + return; + } + + returnedVersion = + stateMachines.getVersion( + "id1", + DEFAULT_VERSION, + maxSupported, + (version, exception) -> { + versionCallbackCalls++; + callbackVersion = version; + assertNull(exception); + return false; + }); + stateMachines.completeWorkflow(Optional.empty()); + } + } + + FalsePathListener listener = new FalsePathListener(); + stateMachines = new WorkflowStateMachines(listener, m -> {}); + + TestHistoryBuilder h = + new TestHistoryBuilder() + .add(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) + .addWorkflowTask(); + + List commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1); + + assertEquals(1, listener.eventLoopCalls); + assertEquals(1, listener.versionCallbackCalls); + assertEquals(Integer.valueOf(maxSupported), listener.callbackVersion); + assertEquals(Integer.valueOf(maxSupported), listener.returnedVersion); + assertEquals(2, commands.size()); + assertEquals(CommandType.COMMAND_TYPE_RECORD_MARKER, commands.get(0).getCommandType()); + assertEquals( + CommandType.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, commands.get(1).getCommandType()); + } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/failure/WorkflowFailureGetVersionTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/failure/WorkflowFailureGetVersionTest.java new file mode 100644 index 0000000000..d240c5d86f --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/failure/WorkflowFailureGetVersionTest.java @@ -0,0 +1,81 @@ +package io.temporal.workflow.failure; + +import static io.temporal.testUtils.Eventually.assertEventually; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.failure.v1.Failure; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowException; +import io.temporal.client.WorkflowStub; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import java.time.Duration; +import java.util.List; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class WorkflowFailureGetVersionTest { + + @Rule public TestName testName = new TestName(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflowGetVersionAndException.class) + .build(); + + @Test + public void getVersionAndException() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute, testName.getMethodName()); + WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow); + + try { + HistoryEvent workflowTaskFailed = + assertEventually( + Duration.ofSeconds(5), + () -> { + List failedEvents = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED); + Assert.assertFalse("No workflow task failure recorded", failedEvents.isEmpty()); + return failedEvents.get(0); + }); + + Failure failure = + getDeepestFailure(workflowTaskFailed.getWorkflowTaskFailedEventAttributes().getFailure()); + Assert.assertEquals("Any error", failure.getMessage()); + Assert.assertTrue(failure.hasApplicationFailureInfo()); + Assert.assertEquals( + RuntimeException.class.getName(), failure.getApplicationFailureInfo().getType()); + } finally { + try { + workflowStub.terminate("terminate test workflow"); + } catch (WorkflowException ignored) { + } + } + } + + private static Failure getDeepestFailure(Failure failure) { + while (failure.hasCause()) { + failure = failure.getCause(); + } + return failure; + } + + public static class TestWorkflowGetVersionAndException implements TestWorkflow1 { + + @Override + public String execute(String unused) { + String changeId = "change-id"; + Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, 1); + Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, 1); + throw new RuntimeException("Any error"); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultipleCallsTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultipleCallsTest.java index 4bd8052e69..7a1f143103 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultipleCallsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultipleCallsTest.java @@ -1,7 +1,14 @@ package io.temporal.workflow.versionTests; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowStub; import io.temporal.testing.WorkflowReplayer; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -11,6 +18,7 @@ import io.temporal.workflow.shared.TestActivities.VariousTestActivities; import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; import java.time.Duration; +import java.util.List; import org.junit.Rule; import org.junit.Test; @@ -40,6 +48,55 @@ public void testGetVersionMultipleCalls() { assertEquals("activity1", result); } + @Test + public void testGetVersionMultipleCallsDoesNotYieldBeforeSleep() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowExecution execution = + WorkflowClient.start(workflow::execute, testWorkflowRule.getTaskQueue()); + WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow); + + String result = workflowStub.getResult(String.class); + assertEquals("activity1", result); + + List historyEvents = + testWorkflowRule + .getExecutionHistory(execution.getWorkflowId()) + .getHistory() + .getEventsList(); + int completedWorkflowTasksBeforeTimer = 0; + boolean timerStarted = false; + boolean workflowTaskFailedBeforeTimer = false; + int workflowSignalsBeforeTimer = 0; + for (HistoryEvent event : historyEvents) { + if (event.getEventType() == EventType.EVENT_TYPE_TIMER_STARTED) { + timerStarted = true; + break; + } + if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) { + completedWorkflowTasksBeforeTimer++; + } + if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED) { + workflowTaskFailedBeforeTimer = true; + } + if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED) { + workflowSignalsBeforeTimer++; + } + } + + assertTrue("Expected Workflow.sleep to start a timer", timerStarted); + assertFalse( + "Duplicate getVersion calls should not fail the workflow task before the timer starts", + workflowTaskFailedBeforeTimer); + assertEquals( + "Duplicate getVersion calls should not need a signal before Workflow.sleep starts a timer", + 0, + workflowSignalsBeforeTimer); + assertEquals( + "Duplicate getVersion calls before Workflow.sleep should stay in the same workflow task", + 1, + completedWorkflowTasksBeforeTimer); + } + @Test public void testGetVersionMultipleCallsReplay() throws Exception { WorkflowReplayer.replayWorkflowExecutionFromResource( diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 9f7aa44b6d..6a80ca4e5f 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -278,7 +278,7 @@ public Integer getVersion( String changeId, int minSupported, int maxSupported, - Functions.Proc2 callback) { + Functions.Func2 callback) { throw new UnsupportedOperationException("not implemented"); }