Overview
folio-flow-engine is a Java library that uses native classes for asynchronous execution, such as CompletableFuture and ExecutorService, to provide a friendly interface for composing tasks that are executed under different rules.
GitHub repository: https://github.com/folio-org/folio-flow-engine
Key Functionality
Flow engine provides the following functionality:
Asynchronous execution
Stage execution in sequence
Stage execution in parallel
Complex flow execution, including execution of nested flows in parallel
Dynamic stage generation based on context data
Stage cancellation if one of the upstream stages failed
Stage execution observability: methods to observe critical moments of stage execution, and explicit logging, including flow engine reports in logs
Java-friendly context management using the Decorator pattern
Java code example
import java.util.concurrent.Executors;
import org.folio.flow.api.Flow;
import org.folio.flow.api.FlowEngine;
import org.folio.flow.api.Stage;
import org.folio.flow.api.StageContext;
import org.folio.flow.model.FlowExecutionStrategy;

public class FlowEngineExample {

  public static void main(String[] args) {
    // A flow with two sequential stages; Stage2 always fails.
    var sampleFlow = Flow.builder()
      .id("sample flow")
      .stage(new Stage1())
      .stage(new Stage2())
      .executionStrategy(FlowExecutionStrategy.IGNORE_ON_ERROR)
      .build();

    // The executor passed here determines how many threads the engine can use.
    var flowEngine = FlowEngine.builder()
      .executor(Executors.newFixedThreadPool(4))
      .build();

    flowEngine.execute(sampleFlow);
  }

  public static class Stage1 implements Stage<StageContext> {

    @Override
    public void execute(StageContext context) {
      System.out.println("Stage1 executed");
    }

    @Override
    public void cancel(StageContext context) {
      System.out.println("Stage1 cancelled");
    }

    @Override
    public void recover(StageContext context) {
      System.out.println("Stage1 recovered");
    }
  }

  public static class Stage2 implements Stage<StageContext> {

    @Override
    public void execute(StageContext context) {
      throw new RuntimeException("Stage2 failed");
    }

    @Override
    public void cancel(StageContext context) {
      System.out.println("Stage2 cancelled");
    }

    @Override
    public void recover(StageContext context) {
      System.out.println("Stage2 recovered");
    }
  }
}
Sequence Task Execution
Description
The Flow Engine allows tasks to be executed in a specific order, ensuring each task is completed before the next one begins.
Usage
Define a sequence of tasks in the desired order.
The Flow Engine will execute each task one after the other.
If a task fails, subsequent tasks in the sequence will not be executed.
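The sequencing rules above can be illustrated with plain JDK classes. The following is a minimal sketch, not the library's implementation: it assumes a chain of CompletableFuture steps, and the class name SequentialSketch and the method runInSequence are hypothetical names introduced only for this example.

```java
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; it is not part of folio-flow-engine.
final class SequentialSketch {

  /** Runs the tasks in map order and returns the names of the tasks that completed. */
  static List<String> runInSequence(Map<String, Runnable> tasks, ExecutorService executor) {
    List<String> completed = new CopyOnWriteArrayList<>();
    CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
    for (Map.Entry<String, Runnable> entry : tasks.entrySet()) {
      // Each step starts only after the previous step has completed normally.
      chain = chain.thenRunAsync(() -> {
        entry.getValue().run();
        completed.add(entry.getKey());
      }, executor);
    }
    try {
      chain.join();
    } catch (CompletionException e) {
      // A failed step completes the rest of the chain exceptionally,
      // so the remaining steps are never run.
    }
    return completed;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Map<String, Runnable> tasks = new LinkedHashMap<>();
      tasks.put("task1", () -> { });
      tasks.put("task2", () -> { throw new IllegalStateException("task2 failed"); });
      tasks.put("task3", () -> { });
      System.out.println(runInSequence(tasks, executor)); // prints [task1]
    } finally {
      executor.shutdown();
    }
  }
}
```

In this sketch task3 is skipped because task2 fails, which mirrors the rule that subsequent tasks in the sequence are not executed after a failure.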
The flow engine supports the following scenarios:
Simple flow execution
A flow engine can execute stages in sequence, waiting for the previous stage to be finished before executing the next one.
Data can be transferred between stages using the StageContext model. The stage context stores data as key-value pairs, with a String key and an Object value. The context can be made more Java-friendly using AbstractStageContextWrapper.
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .build();
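The idea of wrapping a String-to-Object context in a typed facade can be sketched without the library. The example below is an assumption-laden illustration: RawContext and TenantContext are hypothetical stand-ins, and they do not reproduce the actual API of StageContext or AbstractStageContextWrapper.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; it is not part of folio-flow-engine.
final class ContextWrapperSketch {

  /** Hypothetical stand-in for a stage context: String keys, Object values. */
  static final class RawContext {
    private final Map<String, Object> data = new HashMap<>();

    Object get(String key) {
      return data.get(key);
    }

    void put(String key, Object value) {
      data.put(key, value);
    }
  }

  /** Hypothetical wrapper that hides key names and casts behind typed accessors. */
  static final class TenantContext {
    private static final String TENANT_ID = "tenantId";
    private static final String RETRY_COUNT = "retryCount";

    private final RawContext delegate;

    TenantContext(RawContext delegate) {
      this.delegate = delegate;
    }

    String getTenantId() {
      return (String) delegate.get(TENANT_ID);
    }

    TenantContext withTenantId(String tenantId) {
      delegate.put(TENANT_ID, tenantId);
      return this;
    }

    int getRetryCount() {
      Object value = delegate.get(RETRY_COUNT);
      return value == null ? 0 : (Integer) value;
    }

    void incrementRetryCount() {
      delegate.put(RETRY_COUNT, getRetryCount() + 1);
    }
  }

  public static void main(String[] args) {
    RawContext raw = new RawContext();
    TenantContext context = new TenantContext(raw).withTenantId("diku");
    context.incrementRetryCount();
    // Stages written against the wrapper never touch raw keys or casts.
    System.out.println(context.getTenantId() + " " + context.getRetryCount()); // prints diku 1
  }
}
```

The wrapper writes through to the underlying context, so a later stage that wraps the same raw context sees the values stored by an earlier one.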
Simple flow execution (failed stage)
When one of the stages in a flow fails, the flow engine stops the execution and calls onFlowErrorStage (if defined).
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .stage(task4)
  .onFlowError(onFlowErrorStage)
  .build();
This is the default behavior, and it can be reconfigured with an execution strategy:
Flow.builder()
  // stages
  .executionStrategy(FlowExecutionStrategy.CANCEL_ON_ERROR) // or FlowExecutionStrategy.IGNORE_ON_ERROR
  .build();
Simple flow execution (failed stage with cancellation)
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .stage(task4)
  .onFlowCancellation(onFlowCancellationStage)
  .onFlowError(onFlowErrorStage)
  .executionStrategy(FlowExecutionStrategy.CANCEL_ON_ERROR) // or FlowExecutionStrategy.IGNORE_ON_ERROR
  .build();
Parallel Task Execution
Description
The Flow Engine supports the execution of multiple tasks simultaneously, leveraging concurrency to improve efficiency and reduce total execution time.
Usage
Define tasks that can be executed in parallel.
The Flow Engine will manage the concurrent execution of these tasks.
Parallelism is configured in the FlowEngine component.
If one task fails, all running parallel tasks will be canceled (if the strategy is CANCEL_ON_ERROR).
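The fail-fast behavior described above can be approximated with standard JDK concurrency utilities. This is a sketch under stated assumptions, not the engine's actual mechanism: it assumes cancellation means interrupting the tasks that are still running, and ParallelCancelSketch and runInParallel are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; it is not part of folio-flow-engine.
final class ParallelCancelSketch {

  /** Runs all tasks concurrently; on the first failure, interrupts the rest and returns false. */
  static boolean runInParallel(List<Callable<Void>> tasks, ExecutorService executor)
      throws InterruptedException {
    CompletionService<Void> completions = new ExecutorCompletionService<>(executor);
    List<Future<Void>> futures = new ArrayList<>();
    for (Callable<Void> task : tasks) {
      futures.add(completions.submit(task));
    }
    for (int i = 0; i < futures.size(); i++) {
      try {
        completions.take().get(); // waits for the next task to finish, in completion order
      } catch (ExecutionException e) {
        futures.forEach(f -> f.cancel(true)); // has no effect on tasks that already finished
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    CountDownLatch slowStarted = new CountDownLatch(1);
    CountDownLatch slowInterrupted = new CountDownLatch(1);

    Callable<Void> slow = () -> {
      slowStarted.countDown();
      try {
        Thread.sleep(60_000);
      } catch (InterruptedException e) {
        slowInterrupted.countDown(); // reached only when the task is cancelled
      }
      return null;
    };
    Callable<Void> failing = () -> {
      slowStarted.await(); // fail only once the slow task is running
      throw new IllegalStateException("task failed");
    };

    boolean succeeded = runInParallel(List.of(slow, failing), executor);
    boolean cancelled = slowInterrupted.await(5, TimeUnit.SECONDS);
    executor.shutdown();
    System.out.println("succeeded=" + succeeded + ", slow task cancelled=" + cancelled);
  }
}
```

The sketch uses Future.cancel(true) because it interrupts a running thread; a task that ignores interruption would keep running until it finishes on its own.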
Parallel Stage execution
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(ParallelStage.of(task2_1, task2_2, task2_3))
  .stage(task3)
  .build();
Note that parallelism is customizable by defining an executor when building the FlowEngine component:
public FlowEngine flowEngine() {
  return FlowEngine.builder()
    .name("flow-engine")
    .executor(/* custom executor can be defined here */)
    .executionTimeout(/* stage execution timeout can be defined here */)
    .build();
}
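To show why the choice of executor controls parallelism, the sketch below measures how many tasks run at the same time on a fixed-size pool. It uses only JDK classes; ParallelismSketch and peakConcurrency are hypothetical names, and the example makes no claim about how the engine schedules stages internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; it is not part of folio-flow-engine.
final class ParallelismSketch {

  /** Runs taskCount short tasks on a pool of poolSize threads; returns the peak number running at once. */
  static int peakConcurrency(int poolSize, int taskCount) {
    ExecutorService executor = Executors.newFixedThreadPool(poolSize);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (int i = 0; i < taskCount; i++) {
        futures.add(CompletableFuture.runAsync(() -> {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max); // remember the highest count seen so far
          sleepQuietly(20);
          running.decrementAndGet();
        }, executor));
      }
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      return peak.get();
    } finally {
      executor.shutdown();
    }
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    // Six tasks on a two-thread pool: the peak never exceeds the pool size.
    System.out.println("peak concurrency: " + peakConcurrency(2, 6));
  }
}
```

With a fixed pool, tasks beyond the pool size wait in the executor's queue, so a larger pool allows more parallel stages to make progress at once.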
Parallel Stage execution (stage failed)
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(ParallelStage.of(task2_1, task2_2, task2_3))
  .stage(task3)
  .onFlowError(onFlowErrorStage)
  .build();