Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Simple flow execution (failed stage with cancellation)

Drawio sketch
lbox
mVer2
zoom1
simple0
zoominComment10
inCommentcustContentId0554434581
pageId553320467
custContentIdlbox5544345811
diagramDisplayNameSimple flow execution (with cancellation)
1contentVer6
revision6
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameSimple flow execution (with cancellation)
pCenter0
width827
links
tbstyle
height232

...

  • Define tasks that can be executed in parallel.

  • The Flow Engine will manage the concurrent execution of these tasks.

    • parallelism is configured in the FlowEngine component

  • If one task fails, all running parallel tasks will be canceled except failed ones (if the strategy is CANCEL_ON_ERROR).

...

Parallel Stage execution (stage failed with cancellation)

Drawio sketch
mVer2
zoom1
simple0
zoominComment10
inCommentcustContentId0556433431
pageId553320467
custContentIdlbox5564334311
diagramDisplayNameParallel Stage execution (failed stage with cancellation)lbox1
contentVer2
revision2
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameParallel Stage execution (failed stage with cancellation)
pCenter0
width619
links
tbstyle
height477

...

Nested flow execution

Drawio sketch
lbox
mVer2
zoom1
simple0
zoominComment10
inCommentcustContentId0556826642
pageId553320467
custContentIdlbox5568266421
diagramDisplayNameNested flow execution
1contentVer2
revision2
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameNested flow execution
pCenter0
width834
links
tbstyle
height419

...

Flow engine builder allows to customize a FlowEngine object

Method

Description

executename

Executes flow using executor and timeout from FlowEngineBuilder synchronously

executeAsync

Executes flow using executor and timeout from FlowEngineBuilder asynchronously, returning Future<Void as a result

getFlowStatus

Allows to get flow status from FlowEngine using flow identifier

Posible flow statuses:

  • IN_PROGRESS

  • SUCCESS

  • FAILED

  • CANCELLED

  • CANCELLATION_FAILED

  • UNKNOWN (if flow by id is not found)

builder

Provides a FlowEngineBuilder to create a FlowEngine object

Stage

This interface is functional, which means that it can be used as lambda for stage definition

...

languagejava

...

Allows to define flow engine name

lastExecutionsStatusCacheSize

Allows to define how much history preserve in memory for latest flow executions

executor

Allows to define custom executor for the FlowEngine, by default is common ForkJoinPool is used

printFlowResult

Defines if flow result must be printed in logs

stageReportProvider

Allows to define custom stage report provider instead of custom one

Stage Report Provider

Stage report is a useful way to customize intends and stage execution messages

Default stage report example:

Code Block
languagejava
var flowEngine = FlowEngine.builder()
    .printFlowResult(true)
    .executor(newSingleThreadExecutor())
    .executionTimeout(FIVE_SECONDS)
    .name("single-thread-flow-engine")
    .lastExecutionsStatusCacheSize(10)
    .build();
    
var flow = Flow.builder()
  .id("Simple Flow")
  .stage(stage1task1)
  .stage(stage2task2)
  .stage(contexttask3)
 -> System.out.println("Stage3 executed")onFlowCancellationError(onFlowCancellationErrorStage)
  .build();
Note

The limitation of using lambdas - it’s their nature in Java and this approach won’t allow to determine in information about generic type, so context customization is not working with this case, and raw StageContext will be passed

Flow Methods

The flow engine uses flow methods to determine, what capabilities the stage provides:

  • Stage cancellation

  • Stage recovery

...

Method

...

Description

...

execute

...

Main method, which is must be defined for stage with business logic

...

getId

...

Returns stage identifier, which will be printed in the final report or in listenable methods

...

recover

...

Provides the ability to recover a stage after fail in execute method

...

cancel

...

Provides an ability to revert changed, done in execute and recover methods, if flow is failed by upstream stage

...

shouldCancelIfFailed

...

Defines execution of cancel method using StageContext, by default - it’s false

Listenable Methods

Listenable methods provided at crucial points of stage execution to provide better observability for the stage execution status

...

Method

...

Description

...

onStart

...

This method is called before stage execution

...

onSuccess

...

This method is called after successful stage execution including successful execution of recover method, if execute failed

...

onError

...

This method is called after unsuccessful stage execution (execute and recover methods)

...

onCancel

...

This method is called after successful execution of cancel method

...

onCancelError

...

This method is called after unsuccessful execution of cancel method

Parallel Stage

Dynamic Stage

...


  
flow.execute(flow);
Code Block
17:06:51.051 INFO  FlowEngineImpl       [l-2-thread-1] Flow 'Simple Flow' execution report:
  -> Stage: task1 |> CANCELLED
  -> Stage: task2 |> CANCELLATION_FAILED with [RuntimeException]: error
  -> Stage: onFlowCancellationErrorStage |> SUCCESS

Custom stage report example:

Code Block
languagejava
var flowEngine = FlowEngine.builder()
    .printFlowResult(true)
    .executor(newSingleThreadExecutor())
    .executionTimeout(FIVE_SECONDS)
    .name("single-thread-flow-engine")
    .stageReportProvider(StageReportProvider.builder()
      .template("[${flowId}] -> [${stageId}] is ${statusName}")
      .errorTemplate("[${flowId}] -> [${stageId}] is ${statusName} with [${errorType}]: ${shortErrorMessage}")
      .build())
    .lastExecutionsStatusCacheSize(10)
    .build();

var flow = Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .onFlowCancellationError(onFlowCancellationErrorStage)
  .build();

flowEngine.execute(flow);
Code Block
17:04:15.015 INFO  FlowEngineImpl       [l-2-thread-1] Flow 'Simple Flow' execution report:
  [flow-tc9s82] -> [task1] is CANCELLED
  [flow-tc9s82] -> [task2] is CANCELLATION_FAILED with [RuntimeException]: error
  [flow-tc9s82] -> [onFlowCancellationErrorStage] is SUCCESS

Stage

This interface is functional, which means that it can be used as lambda for stage definition

Code Block
languagejava
var stage1 = new Stage<>() {

  @Override
  public void execute(StageContext context) {
    System.out.println("Stage executed");
  }
};

var stage2 = (Stage<StageContext>) context -> System.out.println("Stage2 executed");

Flow.builder()
  .id("Simple Flow")
  .stage(stage1)
  .stage(stage2)
  .stage(context -> System.out.println("Stage3 executed"))
  .build();
Note

The limitation of using lambdas - it’s their nature in Java and this approach won’t allow to determine in information about generic type, so context customization is not working with this case, and raw StageContext will be passed

Flow Methods

The flow engine uses flow methods to determine, what capabilities the stage provides:

  • Stage cancellation

  • Stage recovery

Method

Description

execute

Main method, which is must be defined for stage with business logic

getId

Returns stage identifier, which will be printed in the final report or in listenable methods

recover

Provides the ability to recover a stage after fail in execute method

cancel

Provides an ability to revert changed, done in execute and recover methods, if flow is failed by upstream stage

shouldCancelIfFailed

Defines execution of cancel method using StageContext, by default - it’s false

Listenable Methods

Listenable methods provided at crucial points of stage execution to provide better observability for the stage execution status

Method

Description

onStart

This method is called before stage execution

onSuccess

This method is called after successful stage execution including successful execution of recover method, if execute failed

onError

This method is called after unsuccessful stage execution (execute and recover methods)

onCancel

This method is called after successful execution of cancel method

onCancelError

This method is called after unsuccessful execution of cancel method

Parallel Stage

A parallel stage is used to specify a group of stages that need to be executed simultaneously.

Code Block
var flowEngine = FlowEngine.builder().build();
var parallelStage = ParallelStage.of(task1, task2, task3);

var flow = Flow.builder()
  .id("Simple Flow")
  .stage(parallelStage)
  .build();

flowEngine.execute(flow);

Parallel Stage Builder

id

Used to define flow identifier that will be used for report and in the trace logs

stage

Allows to define a Stage or StageExecutor to be executed in the parallel

stages

Allows to define a list of Stage objects to be executed in parallel

shouldCancelIfFailed

Defines if parallel stage must call cancel method for inner stages if failed

Dynamic Stage

A dynamic stage generates a stage, flow, or parallel stage based on the data within StageContext.

Code Block
languagejava
var flowEngine = FlowEngine.builder().build();
var stage1 = (Stage<StageContext>) ctx -> ctx.put("values", List.of("v1", "v2"));
var parallelStage = DynamicStage.of("dynamicStageId", ctx -> {
  var flowBuilder = Flow.builder().id("Generated flow by dynamicStageId");
  var listValues = ctx.<List<String>>get("values");
  for (var value : listValues) {
    flowBuilder.stage(c -> System.out.println(value));
  }
  return flowBuilder.build();
});

var flow = Flow.builder()
  .id("Simple Flow")
  .stage(stage1)
  .stage(parallelStage)
  .build();

flowEngine.execute(flow);

Stage Context

Stage context is generated before execution of the stage and it will contain the following data

  • Current flow identifier

  • Previous stage context data

  • Flow parameters (nested flow parameters will always override main flow parameters until exiting the nested flow)

Stage executor

A StageExecutor is an interface that allows developers to implement a custom stage execution method. It has already been implemented as follows:

  • DefaultStageExecutor - used to execute all stages implementing Stage interface

  • FlowExecutor - used to execute all Flow objects

  • ParallelStageExecutor - used to execute stages in parallel

  • DynamicStageExecutor - used to execute DynamicStage objects

id

Used to define flow identifier that will be used for report and in the trace logs

stage

Allows to define a Stage or StageExecutor to be executed in the parallel

stages

Allows to define a list of Stage objects to be executed in parallel

shouldCancelIfFailed

Defines if parallel stage must call cancel method for inner stages if failed

Logging

flow engine provides a general logger with the name folio-flow-engine, and the log level can be customized

Code Block
logger.flow-engine.name=folio-flow-engine
logger.flow-engine.level=debug

By default - all execution logs appear at the DEBUF level, and flow engine reports at the INFO level

Code Block
languagejava
var flowEngine = FlowEngine.builder()
    .printFlowResult(true)
    .executor(newSingleThreadExecutor())
    .executionTimeout(FIVE_SECONDS)
    .name("single-thread-flow-engine")
    .lastExecutionsStatusCacheSize(10)
    .build();
    
var flow = Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .onFlowCancellationError(onFlowCancellationErrorStage)
  .build();
  
flow.execute(flow);
Code Block
17:06:51.051 DEBUG FlowExecutor         [l-2-thread-1] [Simple Flow] Initializing flow...
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'task1' executed with status: SUCCESS
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'task2' executed with status: FAILED
java.lang.RuntimeException: error
	at org.folio.flow.api.models.CancellableTestStage.execute(CancellableTestStage.java:11) ~[test-classes/:?]
	at org.folio.flow.impl.DefaultStageExecutor.tryExecuteStage(DefaultStageExecutor.java:129) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) ~[?:?]
	at org.folio.flow.impl.DefaultStageExecutor.execute(DefaultStageExecutor.java:95) ~[classes/:?]
	at org.folio.flow.impl.FlowExecutor.tryExecuteStageOrSkip(FlowExecutor.java:85) ~[classes/:?]
	at org.folio.flow.impl.FlowExecutor.lambda$execute$1(FlowExecutor.java:74) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) [?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
17:06:51.051 DEBUG FlowExecutor         [l-2-thread-1] [Simple Flow] Flow is finished with status: FAILED
17:06:51.051 DEBUG FlowExecutor         [l-2-thread-1] [Simple Flow] Initializing flow cancellation...
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'task2' cancellation failed
java.lang.RuntimeException: error
	at org.folio.flow.api.models.CancellableTestStage.cancel(CancellableTestStage.java:15) ~[test-classes/:?]
	at org.folio.flow.impl.DefaultStageExecutor.tryCancelStage(DefaultStageExecutor.java:163) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) ~[?:?]
	at org.folio.flow.impl.DefaultStageExecutor.cancel(DefaultStageExecutor.java:117) ~[classes/:?]
	at org.folio.flow.utils.StageUtils.cancelStageAsync(StageUtils.java:36) ~[classes/:?]
	at org.folio.flow.impl.FlowExecutor.lambda$cancel$8(FlowExecutor.java:116) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) [?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'task1' is cancelled
17:06:51.051 DEBUG DefaultStageExecutor [l-2-thread-1] [Simple Flow] Stage 'onFlowCancellationError' executed with status: SUCCESS
17:06:51.051 DEBUG FlowExecutor         [l-2-thread-1] [Simple Flow] Flow is cancelled with status: CANCELLATION_FAILED