...
Simple flow execution (failed stage with cancellation)
Drawio sketch | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
...
Define tasks that can be executed in parallel.
The Flow Engine will manage the concurrent execution of these tasks.
parallelism is configured in the
FlowEngine
component
If one task fails, all running parallel tasks will be canceled except failed ones (if the strategy is
CANCEL_ON_ERROR
).
...
Parallel Stage execution (stage failed with cancellation)
Drawio sketch | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
...
Nested flow execution
Drawio sketch | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
...
Flow engine builder allows to customize a FlowEngine
object
Method | Description |
---|---|
| Executes flow using executor and timeout from |
| Executes flow using executor and timeout from |
| Allows to get flow status from Posible flow statuses:
|
| Provides a |
Stage
This interface is functional, which means that it can be used as lambda for stage definition
...
language | java |
---|
...
Allows to define flow engine name | |
| Allows to define how much history preserve in memory for latest flow executions |
| Allows to define custom executor for the |
| Defines if flow result must be printed in logs |
| Allows to define custom stage report provider instead of custom one |
Stage Report Provider
Stage report is a useful way to customize intends and stage execution messages
Default stage report example:
Code Block | ||
---|---|---|
| ||
var flowEngine = FlowEngine.builder() .printFlowResult(true) .executor(newSingleThreadExecutor()) .executionTimeout(FIVE_SECONDS) .name("single-thread-flow-engine") .lastExecutionsStatusCacheSize(10) .build(); var flow = Flow.builder() .id("Simple Flow") .stage(stage1task1) .stage(stage2task2) .stage(contexttask3) -> System.out.println("Stage3 executed")onFlowCancellationError(onFlowCancellationErrorStage) .build(); |
Note |
---|
The limitation of using lambdas - it’s their nature in Java and this approach won’t allow to determine in information about generic type, so context customization is not working with this case, and raw StageContext will be passed |
Flow Methods
The flow engine uses flow methods to determine, what capabilities the stage provides:
Stage cancellation
Stage recovery
...
Method
...
Description
...
execute
...
Main method, which is must be defined for stage with business logic
...
getId
...
Returns stage identifier, which will be printed in the final report or in listenable methods
...
recover
...
Provides the ability to recover a stage after fail in execute
method
...
cancel
...
Provides an ability to revert changed, done in execute
and recover
methods, if flow is failed by upstream stage
...
shouldCancelIfFailed
...
Defines execution of cancel
method using StageContext
, by default - it’s false
Listenable Methods
Listenable methods provided at crucial points of stage execution to provide better observability for the stage execution status
...
Method
...
Description
...
onStart
...
This method is called before stage execution
...
onSuccess
...
This method is called after successful stage execution including successful execution of recover
method, if execute
failed
...
onError
...
This method is called after unsuccessful stage execution (execute
and recover
methods)
...
onCancel
...
This method is called after successful execution of cancel
method
...
onCancelError
...
This method is called after unsuccessful execution of cancel
method
Parallel Stage
Dynamic Stage
...
flow.execute(flow); |
Code Block |
---|
17:06:51.051 INFO FlowEngineImpl [l-2-thread-1] Flow 'Simple Flow' execution report:
-> Stage: task1 |> CANCELLED
-> Stage: task2 |> CANCELLATION_FAILED with [RuntimeException]: error
-> Stage: onFlowCancellationErrorStage |> SUCCESS |
Custom stage report example:
Code Block | ||
---|---|---|
| ||
var flowEngine = FlowEngine.builder()
.printFlowResult(true)
.executor(newSingleThreadExecutor())
.executionTimeout(FIVE_SECONDS)
.name("single-thread-flow-engine")
.stageReportProvider(StageReportProvider.builder()
.template("[${flowId}] -> [${stageId}] is ${statusName}")
.errorTemplate("[${flowId}] -> [${stageId}] is ${statusName} with [${errorType}]: ${shortErrorMessage}")
.build())
.lastExecutionsStatusCacheSize(10)
.build();
var flow = Flow.builder()
.id("Simple Flow")
.stage(task1)
.stage(task2)
.stage(task3)
.onFlowCancellationError(onFlowCancellationErrorStage)
.build();
flowEngine.execute(flow); |
Code Block |
---|
17:04:15.015 INFO FlowEngineImpl [l-2-thread-1] Flow 'Simple Flow' execution report:
[flow-tc9s82] -> [task1] is CANCELLED
[flow-tc9s82] -> [task2] is CANCELLATION_FAILED with [RuntimeException]: error
[flow-tc9s82] -> [onFlowCancellationErrorStage] is SUCCESS |
Stage
This interface is functional, which means that it can be used as lambda for stage definition
Code Block | ||
---|---|---|
| ||
var stage1 = new Stage<>() {
@Override
public void execute(StageContext context) {
System.out.println("Stage executed");
}
};
var stage2 = (Stage<StageContext>) context -> System.out.println("Stage2 executed");
Flow.builder()
.id("Simple Flow")
.stage(stage1)
.stage(stage2)
.stage(context -> System.out.println("Stage3 executed"))
.build(); |
Note |
---|
The limitation of using lambdas - it’s their nature in Java and this approach won’t allow to determine in information about generic type, so context customization is not working with this case, and raw StageContext will be passed |
Flow Methods
The flow engine uses flow methods to determine, what capabilities the stage provides:
Stage cancellation
Stage recovery
Method | Description |
---|---|
| Main method, which is must be defined for stage with business logic |
| Returns stage identifier, which will be printed in the final report or in listenable methods |
| Provides the ability to recover a stage after fail in |
| Provides an ability to revert changed, done in |
| Defines execution of |
Listenable Methods
Listenable methods provided at crucial points of stage execution to provide better observability for the stage execution status
Method | Description |
---|---|
| This method is called before stage execution |
| This method is called after successful stage execution including successful execution of |
| This method is called after unsuccessful stage execution ( |
| This method is called after successful execution of |
| This method is called after unsuccessful execution of |
Parallel Stage
A parallel stage is used to specify a group of stages that need to be executed simultaneously.
Code Block |
---|
var flowEngine = FlowEngine.builder().build();
var parallelStage = ParallelStage.of(task1, task2, task3);
var flow = Flow.builder()
.id("Simple Flow")
.stage(parallelStage)
.build();
flowEngine.execute(flow); |
Parallel Stage Builder
| Used to define flow identifier that will be used for report and in the trace logs |
| Allows to define a |
| Allows to define a list of |
| Defines if parallel stage must call |
Dynamic Stage
A dynamic stage generates a stage, flow, or parallel stage based on the data within StageContext.
Code Block | ||
---|---|---|
| ||
var flowEngine = FlowEngine.builder().build();
var stage1 = (Stage<StageContext>) ctx -> ctx.put("values", List.of("v1", "v2"));
var parallelStage = DynamicStage.of("dynamicStageId", ctx -> {
var flowBuilder = Flow.builder().id("Generated flow by dynamicStageId");
var listValues = ctx.<List<String>>get("values");
for (var value : listValues) {
flowBuilder.stage(c -> System.out.println(value));
}
return flowBuilder.build();
});
var flow = Flow.builder()
.id("Simple Flow")
.stage(stage1)
.stage(parallelStage)
.build();
flowEngine.execute(flow); |
Stage Context
Stage context is generated before execution of the stage and it will contain the following data
Current flow identifier
Previous stage context data
Flow parameters (nested flow parameters will always override main flow parameters until exiting the nested flow)
Stage executor
A StageExecutor is an interface that allows developers to implement a custom stage execution method. It has already been implemented as follows:
DefaultStageExecutor
- used to execute all stages implementingStage
interfaceFlowExecutor
- used to execute allFlow
objectsParallelStageExecutor
- used to execute stages in parallelDynamicStageExecutor
- used to executeDynamicStage
objects
| Used to define flow identifier that will be used for report and in the trace logs |
| Allows to define a |
| Allows to define a list of |
| Defines if parallel stage must call |
Logging
flow engine provides a general logger with the name folio-flow-engine
, and the log level can be customized
Code Block |
---|
logger.flow-engine.name=folio-flow-engine
logger.flow-engine.level=debug |
By default - all execution logs appear at the DEBUF
level, and flow engine reports at the INFO
level
Code Block | ||
---|---|---|
| ||
var flowEngine = FlowEngine.builder()
.printFlowResult(true)
.executor(newSingleThreadExecutor())
.executionTimeout(FIVE_SECONDS)
.name("single-thread-flow-engine")
.lastExecutionsStatusCacheSize(10)
.build();
var flow = Flow.builder()
.id("Simple Flow")
.stage(task1)
.stage(task2)
.stage(task3)
.onFlowCancellationError(onFlowCancellationErrorStage)
.build();
flow.execute(flow); |
Code Block |
---|
17:06:51.051 DEBUG FlowExecutor [l-2-thread-1] [Simple Flow] Initializing flow...
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'task1' executed with status: SUCCESS
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'task2' executed with status: FAILED
java.lang.RuntimeException: error
at org.folio.flow.api.models.CancellableTestStage.execute(CancellableTestStage.java:11) ~[test-classes/:?]
at org.folio.flow.impl.DefaultStageExecutor.tryExecuteStage(DefaultStageExecutor.java:129) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) ~[?:?]
at org.folio.flow.impl.DefaultStageExecutor.execute(DefaultStageExecutor.java:95) ~[classes/:?]
at org.folio.flow.impl.FlowExecutor.tryExecuteStageOrSkip(FlowExecutor.java:85) ~[classes/:?]
at org.folio.flow.impl.FlowExecutor.lambda$execute$1(FlowExecutor.java:74) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) [?:?]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
17:06:51.051 DEBUG FlowExecutor [l-2-thread-1] [Simple Flow] Flow is finished with status: FAILED
17:06:51.051 DEBUG FlowExecutor [l-2-thread-1] [Simple Flow] Initializing flow cancellation...
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'task2' cancellation failed
java.lang.RuntimeException: error
at org.folio.flow.api.models.CancellableTestStage.cancel(CancellableTestStage.java:15) ~[test-classes/:?]
at org.folio.flow.impl.DefaultStageExecutor.tryCancelStage(DefaultStageExecutor.java:163) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) ~[?:?]
at org.folio.flow.impl.DefaultStageExecutor.cancel(DefaultStageExecutor.java:117) ~[classes/:?]
at org.folio.flow.utils.StageUtils.cancelStageAsync(StageUtils.java:36) ~[classes/:?]
at org.folio.flow.impl.FlowExecutor.lambda$cancel$8(FlowExecutor.java:116) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) [?:?]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'task1' is cancelled
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'onFlowCancellationError' executed with status: SUCCESS
17:06:51.051 DEBUG FlowExecutor [l-2-thread-1] [Simple Flow] Flow is cancelled with status: CANCELLATION_FAILED |