From f3a88d8f28673fd8b158e5a99bffb1ad44fe5cd0 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Fri, 1 May 2026 11:37:23 -0400 Subject: [PATCH 1/5] Isolate for loop variables in completable future Signed-off-by: Ricardo Zanini --- .../impl/executors/ForExecutor.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index 26ca371b2..80cd0debf 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -80,14 +80,25 @@ protected CompletableFuture internalExecute( CompletableFuture future = CompletableFuture.completedFuture(taskContext.input()); while (iter.hasNext()) { - taskContext.variables().put(task.getFor().getEach(), iter.next()); - taskContext.variables().put(task.getFor().getAt(), i++); + // Capture iteration variables as final locals to avoid race condition with async tasks + final Object currentItem = iter.next(); + final int currentIndex = i++; + + // Set variables for condition evaluation + taskContext.variables().put(task.getFor().getEach(), currentItem); + taskContext.variables().put(task.getFor().getAt(), currentIndex); + if (whileExpr.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true)) { future = future.thenCompose( - input -> - TaskExecutorHelper.processTaskList( - taskExecutor, workflow, Optional.of(taskContext), input)); + input -> { + // Set variables from captured finals before executing subtasks + // This ensures each async task gets its own iteration values + taskContext.variables().put(task.getFor().getEach(), currentItem); + taskContext.variables().put(task.getFor().getAt(), currentIndex); + return TaskExecutorHelper.processTaskList( + taskExecutor, workflow, Optional.of(taskContext), input); + }); } else { break; } From b064ba90ce844d9d43b48e792fe0bbdca93284fb Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Fri, 1 May 2026 20:42:18 -0400 Subject: [PATCH 2/5] Fix #1354 - Isolate variables in ForExecutor to avoid racing condition to overwrite loop variables Signed-off-by: Ricardo Zanini --- .../fluent/test/ForEachFuncTest.java | 13 ++--- .../fluent/test/LaggedInMemoryEvents.java | 50 +++++++++++++++++++ .../impl/events/InMemoryEvents.java | 10 ++-- .../impl/executors/ForExecutor.java | 5 -- .../lifecycle}/TraceExecutionListener.java | 13 +---- .../impl/test/DBGenerator.java | 4 +- .../impl/test/MvStorePersistenceTest.java | 3 +- 7 files changed, 67 insertions(+), 31 deletions(-) create mode 100644 experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java rename impl/{test/src/test/java/io/serverlessworkflow/impl/test => core/src/main/java/io/serverlessworkflow/impl/lifecycle}/TraceExecutionListener.java (83%) diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java index 801f578cf..01737f035 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java @@ -25,7 +25,7 @@ import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.events.InMemoryEvents; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -35,13 +35,13 @@ public class ForEachFuncTest { - private static record Order(String id) {} + private record Order(String id) {} - private static record EnhancedOrder(String id, int salary) {} + private record EnhancedOrder(String id, int salary) {} - private static record OrdersPayload(List orders) {} + private record OrdersPayload(List orders) {} - private static record OrderName(String id, String name) {} + private record OrderName(String id, String name) {} @Test void testForEachIteration() { @@ -75,13 +75,14 @@ void testForEachEmit() { .build(); List publishedEvents = new CopyOnWriteArrayList<>(); - InMemoryEvents eventBroker = new InMemoryEvents(); + LaggedInMemoryEvents eventBroker = new LaggedInMemoryEvents(); eventBroker.register(eventType, ce -> publishedEvents.add(ce)); try (WorkflowApplication app = WorkflowApplication.builder() .withEventConsumer(eventBroker) .withEventPublisher(eventBroker) + .withListener(new TraceExecutionListener()) .build()) { app.workflowDefinition(workflow) .instance( diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java new file mode 100644 index 000000000..bf9b33b0d --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.events.InMemoryEvents; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LaggedInMemoryEvents extends InMemoryEvents { + + private static final Logger logger = LoggerFactory.getLogger(InMemoryEvents.class); + + @Override + public CompletableFuture publish(CloudEvent ce) { + return CompletableFuture.runAsync( + () -> { + Consumer allConsumer = allConsumerRef.get(); + if (allConsumer != null) { + allConsumer.accept(ce); + } + Consumer consumer = topicMap.get(ce.getType()); + if (consumer != null) { + consumer.accept(ce); + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + logger.info("Accepted event {} for topic {}", ce.getId(), ce.getType()); + }, + serviceFactory.get()); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java index 05473090c..0b28ed1d0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java @@ -30,6 +30,10 @@ */ public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher { + protected final ExecutorServiceFactory serviceFactory; + protected final Map> topicMap = new ConcurrentHashMap<>(); + protected final AtomicReference> allConsumerRef = new AtomicReference<>(); + public InMemoryEvents() { this(new DefaultExecutorServiceFactory()); } @@ -38,12 +42,6 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; } - private ExecutorServiceFactory serviceFactory; - - private Map> topicMap = new ConcurrentHashMap<>(); - - private AtomicReference> allConsumerRef = new AtomicReference<>(); - @Override public void register(String topicName, Consumer consumer) { topicMap.put(topicName, consumer); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index 80cd0debf..2983e6c86 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -80,11 +80,8 @@ protected CompletableFuture internalExecute( CompletableFuture future = CompletableFuture.completedFuture(taskContext.input()); while (iter.hasNext()) { - // Capture iteration variables as final locals to avoid race condition with async tasks final Object currentItem = iter.next(); final int currentIndex = i++; - - // Set variables for condition evaluation taskContext.variables().put(task.getFor().getEach(), currentItem); taskContext.variables().put(task.getFor().getAt(), currentIndex); @@ -92,8 +89,6 @@ protected CompletableFuture internalExecute( future = future.thenCompose( input -> { - // Set variables from captured finals before executing subtasks - // This ensures each async task gets its own iteration values taskContext.variables().put(task.getFor().getEach(), currentItem); taskContext.variables().put(task.getFor().getAt(), currentIndex); return TaskExecutorHelper.processTaskList( diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java similarity index 83% rename from impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java index 9e75cd38f..762a60586 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java @@ -13,19 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.test; +package io.serverlessworkflow.impl.lifecycle; -import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; -import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index 9369edc85..0e2dfd4a9 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -47,7 +47,9 @@ private static void runInstance(String dbName, boolean suspend) throws IOExcepti DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( - WorkflowApplication.builder().withListener(new TraceExecutionListener()), + WorkflowApplication.builder() + .withListener( + new io.serverlessworkflow.impl.lifecycle.TraceExecutionListener()), factories.writer()) .build()) { WorkflowDefinition definition = diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index a05149de4..db458fa52 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -114,7 +114,8 @@ private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOExcept PersistenceApplicationBuilder.builder( WorkflowApplication.builder() .withListener(taskCounter) - .withListener(new TraceExecutionListener()), + .withListener( + new io.serverlessworkflow.impl.lifecycle.TraceExecutionListener()), handlers.writer()) .build(); ) { WorkflowDefinition definition = From c39a7edb937602bc9c5aa29376b1147b142b40b8 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Fri, 1 May 2026 20:46:53 -0400 Subject: [PATCH 3/5] Minor formatting Signed-off-by: Ricardo Zanini --- .../java/io/serverlessworkflow/impl/test/DBGenerator.java | 5 ++--- .../serverlessworkflow/impl/test/MvStorePersistenceTest.java | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index 0e2dfd4a9..d8273b3c4 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -20,6 +20,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; @@ -47,9 +48,7 @@ private static void runInstance(String dbName, boolean suspend) throws IOExcepti DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( - WorkflowApplication.builder() - .withListener( - new io.serverlessworkflow.impl.lifecycle.TraceExecutionListener()), + WorkflowApplication.builder().withListener(new TraceExecutionListener()), factories.writer()) .build()) { WorkflowDefinition definition = diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index db458fa52..5d0143f0c 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -22,6 +22,7 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; @@ -114,8 +115,7 @@ private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOExcept PersistenceApplicationBuilder.builder( WorkflowApplication.builder() .withListener(taskCounter) - .withListener( - new io.serverlessworkflow.impl.lifecycle.TraceExecutionListener()), + .withListener(new TraceExecutionListener()), handlers.writer()) .build(); ) { WorkflowDefinition definition = From 19f8c9bc21512f91db216505f8732088b8c08347 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Tue, 5 May 2026 12:23:20 +0200 Subject: [PATCH 4/5] [Fix #1354] ForExecutor was not properly implemented for multithread Signed-off-by: fjtirado --- .../fluent/test/LaggedInMemoryEvents.java | 33 +++++--------- .../impl/executors/ForExecutor.java | 43 ++++++++++--------- 2 files changed, 32 insertions(+), 44 deletions(-) diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java index bf9b33b0d..d2e4151ed 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java @@ -18,33 +18,20 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.events.InMemoryEvents; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class LaggedInMemoryEvents extends InMemoryEvents { - private static final Logger logger = LoggerFactory.getLogger(InMemoryEvents.class); - @Override public CompletableFuture publish(CloudEvent ce) { - return CompletableFuture.runAsync( - () -> { - Consumer allConsumer = allConsumerRef.get(); - if (allConsumer != null) { - allConsumer.accept(ce); - } - Consumer consumer = topicMap.get(ce.getType()); - if (consumer != null) { - consumer.accept(ce); - } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - logger.info("Accepted event {} for topic {}", ce.getId(), ce.getType()); - }, - serviceFactory.get()); + + return super.publish(ce) + .thenRun( + () -> { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index 2983e6c86..b4c024bee 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -75,29 +75,30 @@ protected ForExecutor(ForExecutorBuilder builder) { @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { - Iterator iter = collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(); - int i = 0; - CompletableFuture future = - CompletableFuture.completedFuture(taskContext.input()); - while (iter.hasNext()) { - final Object currentItem = iter.next(); - final int currentIndex = i++; - taskContext.variables().put(task.getFor().getEach(), currentItem); - taskContext.variables().put(task.getFor().getAt(), currentIndex); + return buildLoopFuture( + workflow, + taskContext, + taskContext.input(), + collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(), + -1); + } - if (whileExpr.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true)) { - future = - future.thenCompose( - input -> { - taskContext.variables().put(task.getFor().getEach(), currentItem); - taskContext.variables().put(task.getFor().getAt(), currentIndex); - return TaskExecutorHelper.processTaskList( - taskExecutor, workflow, Optional.of(taskContext), input); - }); - } else { - break; + private CompletableFuture buildLoopFuture( + WorkflowContext workflow, + TaskContext taskContext, + WorkflowModel input, + Iterator iter, + int index) { + final int newIndex = index + 1; + if (iter.hasNext()) { + taskContext.variables().put(task.getFor().getEach(), iter.next()); + taskContext.variables().put(task.getFor().getAt(), newIndex); + if (whileExpr.map(w -> w.test(workflow, taskContext, input)).orElse(true)) { + return TaskExecutorHelper.processTaskList( + taskExecutor, workflow, Optional.of(taskContext), input) + .thenCompose(output -> buildLoopFuture(workflow, taskContext, output, iter, newIndex)); } } - return future; + return CompletableFuture.completedFuture(input); } } From 67a16894a1ffc06d0d66810789687e5bb3360e26 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Tue, 5 May 2026 11:40:34 -0400 Subject: [PATCH 5/5] Revert InMemoryEvents private fields Signed-off-by: Ricardo Zanini --- .../io/serverlessworkflow/impl/events/InMemoryEvents.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java index 0b28ed1d0..f94e5511d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java @@ -30,9 +30,9 @@ */ public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher { - protected final ExecutorServiceFactory serviceFactory; - protected final Map> topicMap = new ConcurrentHashMap<>(); - protected final AtomicReference> allConsumerRef = new AtomicReference<>(); + private final ExecutorServiceFactory serviceFactory; + private final Map> topicMap = new ConcurrentHashMap<>(); + private final AtomicReference> allConsumerRef = new AtomicReference<>(); public InMemoryEvents() { this(new DefaultExecutorServiceFactory());