Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Asynchronous execution

  • Stage execution in sequence

  • Stage execution in parallel

  • Complex flow execution, including execution of nested flows in parallel

  • Dynamic stage generation based on context data

  • Stage cancellation of one of the upstream stages failed

  • Stage execution observability

    • methods to observe critical moments of stage execution

    • explicit logging, including flow engine reports in logs

  • Java-friendly context management using the Decorator pattern

Expand
titleJava code example
Code Block
languagejava
import java.util.concurrent.Executors;
import org.folio.flow.api.Flow;
import org.folio.flow.api.FlowEngine;
import org.folio.flow.api.Stage;
import org.folio.flow.api.StageContext;
import org.folio.flow.model.FlowExecutionStrategy;

public class FlowEngineExample {

  public static void main(String[] args) {
    var sampleFlow = Flow.builder()
      .id("sample flow")
      .stage(new Stage1())
      .stage(new Stage2())
      .executionStrategy(FlowExecutionStrategy.IGNORE_ON_ERROR)
      .build();

    var flowEngine = FlowEngine.builder()
      .executor(Executors.newFixedThreadPool(4))
      .build();

    flowEngine.execute(sampleFlow);
  }

  public static class Stage1 implements Stage<StageContext> {

    @Override
    public void execute(StageContext context) {
      System.out.println("Stage1 executed");
    }

    @Override
    public void cancel(StageContext context) {
      System.out.println("Stage1 cancelled");
    }

    @Override
    public void recover(StageContext context) {
      System.out.println("Stage1 recovered");
    }
  }

  public static class Stage2 implements Stage<StageContext> {

    @Override
    public void execute(StageContext context) {
      throw new RuntimeException("Stage2 failed");
    }

    @Override
    public void cancel(StageContext context) {
      System.out.println("Stage2 cancelled");
    }

    @Override
    public void recover(StageContext context) {
      System.out.println("Stage2 recovered");
    }
  }
}

Sequence Task Execution

Description

...

Info

Data can be transferred between stages using StageContext model.

Stage context store data with key as String and value as Object. There is a way to make context more Java-friendly using AbstractStageContextWrapper

Drawio sketch
mVer2
zoom1
simple0
zoominComment10
inCommentcustContentId0554205209
pageId553320467
custContentIdlbox5542052091
diagramDisplayNameSimple flow execution
lbox1
contentVer4
revision4
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameSimple flow execution
pCenter0
width607
links
tbstyle
height81

...

When one of the stages in flow fails - the flow manager stops the execution and calls onFlowErrorStage (if defined)

Drawio sketch
mVer2
zoom1
simple0
zoominComment10
inCommentcustContentId0554598433
pageId553320467
custContentIdlbox5545984331
diagramDisplayNamesimple flow execution (failed stage)lbox1
contentVer4
revision4
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNamesimple flow execution (failed stage)
pCenter0
width829.9999999999999
links
tbstyle
height213.99999999999997

...

Drawio sketch
mVer2
simple0
zoom1
inComment0
pageId553320467
custContentId554434581
diagramDisplayNameSimple flow execution (with cancellation)
lbox1
contentVer56
revision56
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameSimple flow execution (with cancellation)
pCenter0
width826.9999999999999827
links
tbstyle
height232.00000000000009
Code Block
languagejava
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .stage(task4)
  .onFlowCancellation(onFlowCancellationStage)/ta
  .onFlowError(onFlowErrorStage)
  .executionStrategy(CANCEL_ON_ERROR | IGNORE_ON_ERROR)
  .build();

Parallel Task Execution

...

Parallel Stage execution

Drawio sketch
mVer2
zoom1
simple0
zoominComment10
inCommentcustContentId0554238011
pageId553320467
custContentIdlbox5542380111
diagramDisplayNameParallel Execution
lbox1
contentVer2
revision2
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameParallel Execution
pCenter0
width618.9999999999998
links
tbstyle
height412.99999999999994

...

Info

Note that parallelism is customizable by defining an executor when building FlowEngine component

Code Block
languagejava
public FlowEngine flowEngine() {
  return FlowEngine.builder()
    .name("flow-engine")
    .executor(/* custom executor can be defined here */))
    .executionTimeout(/* stage timeout execution can be defined here */)
    .build();
}

Parallel Stage execution(stage failed)

Drawio sketch
mVer2
zoom1
simple0
inComment0
custContentId553615411
pageId553320467
lbox1
diagramDisplayNameParallel stage execution (stage failed)
contentVer2
revision2
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameParallel stage execution (stage failed)
pCenter0
width619
links
tbstyle
height557
Code Block
languagejava
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(ParallelStage.of(task2_1, task2_2, task2_3))
  .stage(task3)
  .onFlowError(onFlowErrorStage)
  .build();

Parallel Stage execution (stage failed with cancellation)

Drawio sketch
mVer2
simple0
zoom1
inComment0
pageId553320467
custContentId553615411556433431
diagramDisplayNameParallel stage Stage execution (failed stage failedwith cancellation)
lbox1
contentVer2
revision2
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameParallel stage Stage execution (failed stage failedwith cancellation)
pCenter0
width619
links
tbstyle
height557477
Code Block
languagejava
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(ParallelStage.of(task2_1, task2_2, task2_3))
  .stage(task3)
  .onFlowCancellation(onFlowCancellationStage)
  .executionStrategy(CANCEL_ON_ERROR)
  .build();

Nested flow execution

Drawio sketch
mVer2
simple0
zoom1
inComment0
pageId553320467
custContentId556826642
diagramDisplayNameNested flow execution
lbox1
contentVer2
revision2
baseUrlhttps://folio-org.atlassian.net/wiki
diagramNameNested flow execution
pCenter0
width834
links
tbstyle
height419
Code Block
languagejava
var flow1 = Flow.builder()
  .id("Flow 1")
  .stage(task2_1_1)
  .stage(task2_1_2)
  .build();

var flow2 = Flow.builder()
  .id("Flow 1")
  .stage(task2_2_1)
  .stage(task2_2_2)
  .build();
  
Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(ParallelStage.of(flow1, flow2))
  .stage(task3)
  .build();

Interfaces and models

Flow / Flow Builder

Provides a Flow object, containing a sequence of stages and flows to be executed

Sucessfull flow executions are always finished without exceptions

  • If error is occurred during flow execution and executionStrategy is IGNORE_ON_ERROR - then FlowExecutionException will be raised with a reason in cause

  • If error is occurred during flow execution and executionStrategy is CANCEL_ON_ERROR - then

    • FlowCancelledException will be raised with a reason of cancellation in cause

    • FlowCancellationException will be raised with a reason of cancellation failure in cause

Flows can be executed in async mode using flowEngine.executeAsync method. In this case result in CompletableFuture will contain an exception in the body.

Method

Description

id

Used to define flow identifier that will be used for report and in the trace logs

stage

Allows to define a Stage or StageExecutor to be executed in the flow

flowParameter

Allows to define a general flow parameter that will be available for all stages in the flow

flowParameters

Allow to define a map with flow parameters that will be available for all stages in the flow

executionStrategy

Allows to define flow execution strategy:

  • CANCEL_ON_ERROR : on stage error – all executed stage will be cancelled by calling cancel method (if defined) in reverse order

  • IGNORE_ON_ERROR : on stage error – the flow will be stopped by skipping execution of upstream stage and calling onFlowError

...

  • stage (if defined)

onFlowError

Allows to define a Stage or StageExecutor that will be always executed if flow is failed

onFlowCancellation

Allows to define a Stage or StageExecutor that will be always executed if flow is cancelled

onFlowCancellationError

Allows to define a Stage or StageExecutor that will be always executed if flow is cancelled and one if cancel method is failed

onFlowSkip

Allows to define a Stage or StageExecutor that will be executed if flow is skipped by previous stage failure

Info

This stage is only usable for nested flow, main flow cannot be skipped

Flow Engine

A flow engine is an object that is used to execute a flow defined by a developer

Method

Description

execute

Executes flow using executor and timeout from FlowEngineBuilder synchronously

executeAsync

Executes flow using executor and timeout from FlowEngineBuilder asynchronously, returning Future<Void as a result

getFlowStatus

Allows to get flow status from FlowEngine using flow identifier

Posible flow statuses:

  • IN_PROGRESS

  • SUCCESS

  • FAILED

  • CANCELLED

  • CANCELLATION_FAILED

  • UNKNOWN (if flow by id is not found)

builder

Provides a FlowEngineBuilder to create a FlowEngine object

Builder

Flow engine builder allows to customize a FlowEngine object

Method

Description

execute

Executes flow using executor and timeout from FlowEngineBuilder synchronously

executeAsync

Executes flow using executor and timeout from FlowEngineBuilder asynchronously, returning Future<Void as a result

getFlowStatus

Allows to get flow status from FlowEngine using flow identifier

Posible flow statuses:

  • IN_PROGRESS

  • SUCCESS

  • FAILED

  • CANCELLED

  • CANCELLATION_FAILED

  • UNKNOWN (if flow by id is not found)

builder

Provides a FlowEngineBuilder to create a FlowEngine object

Stage

This interface is functional, which means that it can be used as lambda for stage definition

Code Block
languagejava
var stage1 = new Stage<>() {

  @Override
  public void execute(StageContext context) {
    System.out.println("Stage executed");
  }
};

var stage2 = (Stage<StageContext>) context -> System.out.println("Stage2 executed");

Flow.builder()
  .id("Simple Flow")
  .stage(stage1)
  .stage(stage2)
  .stage(context -> System.out.println("Stage3 executed"))
  .build();

Models

Stage

Flow

...

Note

The limitation of using lambdas - it’s their nature in Java and this approach won’t allow to determine in information about generic type, so context customization is not working with this case, and raw StageContext will be passed

Flow Methods

The flow engine uses flow methods to determine, what capabilities the stage provides:

  • Stage cancellation

  • Stage recovery

Method

Description

execute

Main method, which is must be defined for stage with business logic

getId

Returns stage identifier, which will be printed in the final report or in listenable methods

recover

Provides the ability to recover a stage after fail in execute method

cancel

Provides an ability to revert changed, done in execute and recover methods, if flow is failed by upstream stage

shouldCancelIfFailed

Defines execution of cancel method using StageContext, by default - it’s false

Listenable Methods

Listenable methods provided at crucial points of stage execution to provide better observability for the stage execution status

Method

Description

onStart

This method is called before stage execution

onSuccess

This method is called after successful stage execution including successful execution of recover method, if execute failed

onError

This method is called after unsuccessful stage execution (execute and recover methods)

onCancel

This method is called after successful execution of cancel method

onCancelError

This method is called after unsuccessful execution of cancel method

Parallel Stage

Dynamic Stage

Stage Context