diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java index 801f578cf..01737f035 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java @@ -25,7 +25,7 @@ import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.events.InMemoryEvents; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -35,13 +35,13 @@ public class ForEachFuncTest { - private static record Order(String id) {} + private record Order(String id) {} - private static record EnhancedOrder(String id, int salary) {} + private record EnhancedOrder(String id, int salary) {} - private static record OrdersPayload(List orders) {} + private record OrdersPayload(List orders) {} - private static record OrderName(String id, String name) {} + private record OrderName(String id, String name) {} @Test void testForEachIteration() { @@ -75,13 +75,14 @@ void testForEachEmit() { .build(); List publishedEvents = new CopyOnWriteArrayList<>(); - InMemoryEvents eventBroker = new InMemoryEvents(); + LaggedInMemoryEvents eventBroker = new LaggedInMemoryEvents(); eventBroker.register(eventType, ce -> publishedEvents.add(ce)); try (WorkflowApplication app = WorkflowApplication.builder() .withEventConsumer(eventBroker) .withEventPublisher(eventBroker) + .withListener(new TraceExecutionListener()) .build()) { app.workflowDefinition(workflow) .instance( diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java new file mode 100644 index 000000000..8b5ed4893 --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.events.InMemoryEvents; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LaggedInMemoryEvents extends InMemoryEvents { + + private static final Logger logger = LoggerFactory.getLogger(LaggedInMemoryEvents.class); + + @Override + public CompletableFuture publish(CloudEvent ce) { + return CompletableFuture.runAsync( + () -> { + Consumer allConsumer = allConsumerRef.get(); + if (allConsumer != null) { + allConsumer.accept(ce); + } + Consumer consumer = topicMap.get(ce.getType()); + if (consumer != null) { + consumer.accept(ce); + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + logger.info("Accepted event {} for topic {}", ce.getId(), ce.getType()); + }, + serviceFactory.get()); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java index 05473090c..0b28ed1d0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java @@ -30,6 +30,10 @@ */ public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher { + protected final ExecutorServiceFactory serviceFactory; + protected final Map> topicMap = new ConcurrentHashMap<>(); + protected final AtomicReference> allConsumerRef = new AtomicReference<>(); + public InMemoryEvents() { this(new DefaultExecutorServiceFactory()); } @@ -38,12 +42,6 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; } - private ExecutorServiceFactory serviceFactory; - - private Map> topicMap = new ConcurrentHashMap<>(); - - private AtomicReference> allConsumerRef = new AtomicReference<>(); - @Override public void register(String topicName, Consumer consumer) { topicMap.put(topicName, consumer); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index 26ca371b2..2983e6c86 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -80,14 +80,20 @@ protected CompletableFuture internalExecute( CompletableFuture future = CompletableFuture.completedFuture(taskContext.input()); while (iter.hasNext()) { - taskContext.variables().put(task.getFor().getEach(), iter.next()); - taskContext.variables().put(task.getFor().getAt(), i++); + final Object currentItem = iter.next(); + final int currentIndex = i++; + taskContext.variables().put(task.getFor().getEach(), currentItem); + taskContext.variables().put(task.getFor().getAt(), currentIndex); + if (whileExpr.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true)) { future = future.thenCompose( - input -> - TaskExecutorHelper.processTaskList( - taskExecutor, workflow, Optional.of(taskContext), input)); + input -> { + taskContext.variables().put(task.getFor().getEach(), currentItem); + taskContext.variables().put(task.getFor().getAt(), currentIndex); + return TaskExecutorHelper.processTaskList( + taskExecutor, workflow, Optional.of(taskContext), input); + }); } else { break; } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java similarity index 83% rename from impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java index 9e75cd38f..762a60586 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java @@ -13,19 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.test; +package io.serverlessworkflow.impl.lifecycle; -import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; -import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index 9369edc85..d8273b3c4 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -20,6 +20,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index a05149de4..5d0143f0c 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -22,6 +22,7 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers;