folio-flow-engine

Overview

folio-flow-engine is a Java library that uses native asynchronous-execution classes such as CompletableFuture and ExecutorService to provide a friendly interface for composing tasks that are executed under different rules.

GitHub repository: https://github.com/folio-org/folio-flow-engine

Key Functionality

Flow engine provides the following functionality:

  • Asynchronous execution

  • Stage execution in sequence

  • Stage execution in parallel

  • Complex flow execution, including execution of nested flows in parallel

  • Dynamic stage generation based on context data

  • Stage cancellation if one of the upstream stages failed

  • Stage execution observability

    • methods to observe critical moments of stage execution

    • explicit logging, including flow engine reports in logs

  • Java-friendly context management using the Decorator pattern

import java.util.concurrent.Executors;
import org.folio.flow.api.Flow;
import org.folio.flow.api.FlowEngine;
import org.folio.flow.api.Stage;
import org.folio.flow.api.StageContext;
import org.folio.flow.model.FlowExecutionStrategy;

public class FlowEngineExample {

  public static void main(String[] args) {
    var sampleFlow = Flow.builder()
      .id("sample flow")
      .stage(new Stage1())
      .stage(new Stage2())
      .executionStrategy(FlowExecutionStrategy.IGNORE_ON_ERROR)
      .build();

    var flowEngine = FlowEngine.builder()
      .executor(Executors.newFixedThreadPool(4))
      .build();

    flowEngine.execute(sampleFlow);
  }

  public static class Stage1 implements Stage<StageContext> {

    @Override
    public void execute(StageContext context) {
      System.out.println("Stage1 executed");
    }

    @Override
    public void cancel(StageContext context) {
      System.out.println("Stage1 cancelled");
    }

    @Override
    public void recover(StageContext context) {
      System.out.println("Stage1 recovered");
    }
  }

  public static class Stage2 implements Stage<StageContext> {

    @Override
    public void execute(StageContext context) {
      throw new RuntimeException("Stage2 failed");
    }

    @Override
    public void cancel(StageContext context) {
      System.out.println("Stage2 cancelled");
    }

    @Override
    public void recover(StageContext context) {
      System.out.println("Stage2 recovered");
    }
  }
}

Sequence Task Execution

Description

The Flow Engine allows tasks to be executed in a specific order, ensuring each task is completed before the next one begins.

Usage

  • Define a sequence of tasks in the desired order.

  • The Flow Engine will execute each task one after the other.

  • If a task fails, subsequent tasks in the sequence will not be executed.
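The rules above can be sketched with plain JDK classes. This is a minimal illustration of the idea, not the library's implementation: the class SequentialSketch and the method runInOrder are hypothetical names, and a CompletableFuture chain stands in for a flow.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only sketch of sequential execution; this is not folio-flow-engine code. */
class SequentialSketch {

  /** Runs the tasks one after another and returns how many completed successfully. */
  static int runInOrder(Runnable... tasks) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicInteger completed = new AtomicInteger();
    try {
      CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
      for (Runnable task : tasks) {
        // Each task starts only after the previous one has finished normally.
        chain = chain.thenRunAsync(() -> {
          task.run();
          completed.incrementAndGet();
        }, executor);
      }
      try {
        chain.join();
      } catch (CompletionException e) {
        // A failed task completes the rest of the chain exceptionally,
        // so the tasks after it are never run.
        System.out.println("Sequence stopped: " + e.getCause().getMessage());
      }
      return completed.get();
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    int done = runInOrder(
      () -> System.out.println("task1"),
      () -> { throw new IllegalStateException("task2 failed"); },
      () -> System.out.println("task3"));
    System.out.println("completed tasks: " + done);
  }
}
```

In this sketch the third task never runs because the second one throws, which mirrors the last rule in the list.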

The flow engine supports the following scenarios:

Simple flow execution

A flow engine can execute stages in sequence, waiting for the previous stage to be finished before executing the next one.

Data can be transferred between stages using the StageContext model.

The stage context stores data as key-value pairs with String keys and Object values. The context can be made more Java-friendly with AbstractStageContextWrapper.

Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .build();
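The idea of wrapping a String-keyed context in typed accessors can be sketched as follows. This is only an illustration of the Decorator pattern: RawContext and OrderContext are hypothetical classes, and the sketch does not use or reproduce the API of StageContext or AbstractStageContextWrapper.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of a typed decorator over a String/Object context; all names are hypothetical. */
class ContextWrapperSketch {

  /** Stand-in for a context that stores values as Object under String keys. */
  static class RawContext {
    private final Map<String, Object> data = new HashMap<>();

    Object get(String key) {
      return data.get(key);
    }

    void put(String key, Object value) {
      data.put(key, value);
    }
  }

  /** Decorator that hides key names and casts behind typed getters and setters. */
  static class OrderContext {
    private static final String ORDER_ID = "orderId";
    private static final String ITEM_COUNT = "itemCount";

    private final RawContext delegate;

    OrderContext(RawContext delegate) {
      this.delegate = delegate;
    }

    String getOrderId() {
      return (String) delegate.get(ORDER_ID);
    }

    void setOrderId(String orderId) {
      delegate.put(ORDER_ID, orderId);
    }

    int getItemCount() {
      Object value = delegate.get(ITEM_COUNT);
      return value == null ? 0 : (Integer) value;
    }

    void setItemCount(int itemCount) {
      delegate.put(ITEM_COUNT, itemCount);
    }
  }

  public static void main(String[] args) {
    RawContext raw = new RawContext();
    OrderContext order = new OrderContext(raw);
    order.setOrderId("A-1");
    order.setItemCount(3);
    // The wrapper writes through to the raw context, so other stages still see plain keys.
    System.out.println(raw.get("orderId") + " / " + order.getItemCount());
  }
}
```

Stages written against such a wrapper avoid repeating key names and casts, while the underlying data stays in the shared context.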

Simple flow execution (failed stage)

When one of the stages in a flow fails, the flow engine stops the execution and calls onFlowErrorStage (if defined).

Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .stage(task4)
  .onFlowError(onFlowErrorStage)
  .build();

This is the default behavior; it can be reconfigured through the flow's executionStrategy.

Simple flow execution (failed stage with cancellation)

Parallel Task Execution

Description

The Flow Engine supports the execution of multiple tasks simultaneously, leveraging concurrency to improve efficiency and reduce total execution time.

Usage

  • Define tasks that can be executed in parallel.

  • The Flow Engine will manage the concurrent execution of these tasks.

    • parallelism is configured in the FlowEngine component

  • If one task fails, all parallel tasks except the failed ones are cancelled (if the strategy is CANCEL_ON_ERROR).
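The behaviour listed above can be approximated with CompletableFuture and an ExecutorService. This is a sketch under assumptions, not the library's code: ParallelSketch, runAll and the cancelAction callback are hypothetical names, and "cancelling" here simply means invoking a callback for every task that did not fail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** JDK-only sketch of parallel execution with cancellation; not folio-flow-engine code. */
class ParallelSketch {

  /** Runs all tasks on the executor and returns the names of the failed tasks (sorted). */
  static List<String> runAll(Map<String, Runnable> tasks, ExecutorService executor,
      Consumer<String> cancelAction) {
    Map<String, CompletableFuture<Void>> futures = new TreeMap<>();
    tasks.forEach((name, body) -> futures.put(name, CompletableFuture.runAsync(body, executor)));

    // Wait until every task has settled; failures are inspected per task below.
    CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
      .handle((result, error) -> null)
      .join();

    List<String> failed = new ArrayList<>();
    List<String> succeeded = new ArrayList<>();
    futures.forEach((name, future) -> {
      if (future.isCompletedExceptionally()) {
        failed.add(name);
      } else {
        succeeded.add(name);
      }
    });

    // If anything failed, cancel every task except the failed ones.
    if (!failed.isEmpty()) {
      succeeded.forEach(cancelAction);
    }
    return failed;
  }

  public static void main(String[] args) {
    // The pool size bounds how many tasks run at the same time.
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      Map<String, Runnable> tasks = new HashMap<>();
      tasks.put("a", () -> System.out.println("a done"));
      tasks.put("b", () -> { throw new IllegalStateException("b failed"); });
      tasks.put("c", () -> System.out.println("c done"));
      List<String> failed = runAll(tasks, executor, name -> System.out.println("cancel " + name));
      System.out.println("failed: " + failed);
    } finally {
      executor.shutdown();
    }
  }
}
```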

Parallel Stage execution

Note that parallelism can be customized by defining an executor when building the FlowEngine component.

Parallel Stage execution (stage failed)

Parallel Stage execution (stage failed with cancellation)

Nested flow execution

Interfaces and models

Flow / Flow Builder

Provides a Flow object containing a sequence of stages and flows to be executed.

Method

Description

id

Defines the flow identifier used in reports and trace logs

stage

Defines a Stage or StageExecutor to be executed in the flow

flowParameter

Defines a general flow parameter that is available to all stages in the flow

flowParameters

Defines a map of flow parameters that are available to all stages in the flow

executionStrategy

Defines the flow execution strategy:

  • CANCEL_ON_ERROR: on stage error, all executed stages are cancelled in reverse order by calling their cancel method (if defined)

  • IGNORE_ON_ERROR: on stage error, the flow stops, the stages that have not run yet are skipped, and the onFlowError stage (if defined) is called

onFlowError

Defines a Stage or StageExecutor that is always executed if the flow fails

onFlowCancellation

Defines a Stage or StageExecutor that is always executed if the flow is cancelled

onFlowCancellationError

Defines a Stage or StageExecutor that is always executed if the flow is cancelled and one of the cancel methods fails

onFlowSkip

Defines a Stage or StageExecutor that is executed if the flow is skipped because a previous stage failed
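The difference between the two execution strategies can be illustrated with a small JDK-only sketch. It is not the library's implementation: the Step class, the run method and the event strings are hypothetical, and the local Strategy enum only mirrors the two names described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch of reverse-order cancellation on error; all names are hypothetical. */
class CancelOnErrorSketch {

  /** Local mirror of the two strategy names described in the text. */
  enum Strategy { CANCEL_ON_ERROR, IGNORE_ON_ERROR }

  /** Hypothetical stage shape: a name plus an action that may throw. */
  static class Step {
    final String name;
    final Runnable action;

    Step(String name, Runnable action) {
      this.name = name;
      this.action = action;
    }
  }

  /** Runs the steps in order and returns a log of what happened. */
  static List<String> run(List<Step> steps, Strategy strategy) {
    List<String> events = new ArrayList<>();
    Deque<Step> executed = new ArrayDeque<>();
    for (Step step : steps) {
      try {
        step.action.run();
        events.add("executed " + step.name);
        executed.push(step); // most recently executed step ends up on top
      } catch (RuntimeException e) {
        events.add("failed " + step.name);
        if (strategy == Strategy.CANCEL_ON_ERROR) {
          // Undo the already executed steps, last one first.
          while (!executed.isEmpty()) {
            events.add("cancelled " + executed.pop().name);
          }
        }
        break; // with either strategy the remaining steps are skipped
      }
    }
    return events;
  }

  public static void main(String[] args) {
    List<Step> steps = List.of(
      new Step("a", () -> { }),
      new Step("b", () -> { }),
      new Step("c", () -> { throw new IllegalStateException("c failed"); }),
      new Step("d", () -> { }));
    System.out.println(run(steps, Strategy.CANCEL_ON_ERROR));
    System.out.println(run(steps, Strategy.IGNORE_ON_ERROR));
  }
}
```

With CANCEL_ON_ERROR the sketch reports steps b and a as cancelled after c fails; with IGNORE_ON_ERROR it only stops. The error, cancellation and skip hooks of the real builder are left out to keep the sketch short.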

Flow Engine

A flow engine is an object used to execute a flow defined by a developer.

Method

Description

execute

Executes a flow synchronously, using the executor and timeout from FlowEngineBuilder

executeAsync

Executes a flow asynchronously, using the executor and timeout from FlowEngineBuilder, and returns a Future<Void> as the result

getFlowStatus

Returns the flow status from FlowEngine by flow identifier

Possible flow statuses:

  • IN_PROGRESS

  • SUCCESS

  • FAILED

  • CANCELLED

  • CANCELLATION_FAILED

  • UNKNOWN (if flow by id is not found)

builder

Provides a FlowEngineBuilder to create a FlowEngine object

Builder

The flow engine builder allows customizing a FlowEngine object

Method

Description

name

Defines the flow engine name

lastExecutionsStatusCacheSize

Defines how many of the latest flow executions are kept in memory as status history

executor

Defines a custom executor for the FlowEngine; by default, the common ForkJoinPool is used

printFlowResult

Defines whether the flow result must be printed in the logs

stageReportProvider

Defines a custom stage report provider to use instead of the default one
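A bounded in-memory status history, as configured by lastExecutionsStatusCacheSize and queried by getFlowStatus, could look roughly like the sketch below. How the library actually stores and evicts statuses is not stated here, so this is an assumption: StatusCacheSketch and record are hypothetical names, the enum only mirrors the status names listed earlier, and eviction of the oldest entry is one possible policy.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a size-bounded status history; not the library's implementation. */
class StatusCacheSketch {

  /** Local mirror of the status names listed in the text. */
  enum Status { IN_PROGRESS, SUCCESS, FAILED, CANCELLED, CANCELLATION_FAILED, UNKNOWN }

  private final Map<String, Status> statuses;

  StatusCacheSketch(int maxSize) {
    // An insertion-ordered map that drops its oldest entry once maxSize is exceeded.
    this.statuses = new LinkedHashMap<String, Status>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) {
        return size() > maxSize;
      }
    };
  }

  synchronized void record(String flowId, Status status) {
    statuses.put(flowId, status);
  }

  /** Returns UNKNOWN when the flow id was never recorded or has already been evicted. */
  synchronized Status getFlowStatus(String flowId) {
    return statuses.getOrDefault(flowId, Status.UNKNOWN);
  }

  public static void main(String[] args) {
    StatusCacheSketch cache = new StatusCacheSketch(2);
    cache.record("flow-a", Status.SUCCESS);
    cache.record("flow-b", Status.FAILED);
    cache.record("flow-c", Status.IN_PROGRESS);
    System.out.println(cache.getFlowStatus("flow-a")); // evicted, so UNKNOWN
    System.out.println(cache.getFlowStatus("flow-c")); // IN_PROGRESS
  }
}
```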

Stage Report Provider

A stage report provider is a useful way to customize indents and stage execution messages

Default stage report example:

Custom stage report example:

Stage

This interface is functional, which means that it can be used as a lambda for stage definition

Flow Methods

The flow engine uses flow methods to determine what capabilities the stage provides:

  • Stage cancellation

  • Stage recovery

Method

Description

execute

Main method containing the business logic; it must be defined for every stage

getId

Returns the stage identifier, which is printed in the final report or in listenable methods

recover

Provides the ability to recover a stage after a failure in the execute method

cancel

Provides the ability to revert changes made in the execute and recover methods if the flow fails because of an upstream stage

shouldCancelIfFailed

Defines, using StageContext, whether the cancel method is executed if the stage itself failed; false by default
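How a functional stage interface with optional recover and cancel methods fits together can be sketched as follows. SimpleStage and runStage are hypothetical names, the context is a plain Map, and the default method bodies are assumptions made for this sketch; the real Stage interface may define its defaults differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of execute/recover semantics on a functional interface; names are hypothetical. */
class StageLifecycleSketch {

  /** Only execute is abstract, so a lambda can define a stage. */
  @FunctionalInterface
  interface SimpleStage {
    void execute(Map<String, Object> context);

    /** Assumed default for this sketch: a stage without recovery logic cannot recover. */
    default void recover(Map<String, Object> context) {
      throw new UnsupportedOperationException("recover is not implemented");
    }

    /** Assumed default for this sketch: nothing to revert. */
    default void cancel(Map<String, Object> context) {
    }
  }

  /** Returns true if execute succeeded, or if execute failed and recover succeeded. */
  static boolean runStage(SimpleStage stage, Map<String, Object> context) {
    try {
      stage.execute(context);
      return true;
    } catch (RuntimeException executeError) {
      try {
        stage.recover(context);
        return true;
      } catch (RuntimeException recoverError) {
        return false;
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Object> context = new HashMap<>();
    // A stage defined as a lambda: only execute is provided.
    boolean ok = runStage(ctx -> ctx.put("greeting", "hello"), context);
    System.out.println(ok + " " + context.get("greeting"));
  }
}
```

A stage that needs recovery or cancellation would be written as a class overriding the corresponding methods instead of a lambda.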

Listenable Methods

Listenable methods are provided at crucial points of stage execution to give better observability of the stage execution status

Method

Description

onStart

This method is called before stage execution

onSuccess

This method is called after successful stage execution, including successful execution of the recover method if execute failed

onError

This method is called after unsuccessful stage execution (execute and recover methods)

onCancel

This method is called after successful execution of cancel method

onCancelError

This method is called after unsuccessful execution of cancel method
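The order in which these callbacks fire can be sketched with a small JDK-only example. It follows the descriptions in the table and is not taken from the library: ListenerSketch, executeWithHooks and cancelWithHooks are hypothetical names, and the hooks are represented as strings appended to a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of when the listenable methods would fire; names are hypothetical. */
class ListenerSketch {

  /** Runs execute (and recover on failure) and records which hooks would be called. */
  static List<String> executeWithHooks(Runnable execute, Runnable recover) {
    List<String> events = new ArrayList<>();
    events.add("onStart");
    try {
      try {
        execute.run();
      } catch (RuntimeException executeError) {
        // A successful recover still counts as a successful stage execution.
        recover.run();
      }
      events.add("onSuccess");
    } catch (RuntimeException recoverError) {
      events.add("onError");
    }
    return events;
  }

  /** Runs cancel and records which cancellation hook would be called. */
  static List<String> cancelWithHooks(Runnable cancel) {
    List<String> events = new ArrayList<>();
    try {
      cancel.run();
      events.add("onCancel");
    } catch (RuntimeException cancelError) {
      events.add("onCancelError");
    }
    return events;
  }

  public static void main(String[] args) {
    Runnable failing = () -> { throw new IllegalStateException("failed"); };
    Runnable working = () -> { };
    System.out.println(executeWithHooks(working, failing));
    System.out.println(executeWithHooks(failing, working));
    System.out.println(executeWithHooks(failing, failing));
    System.out.println(cancelWithHooks(failing));
  }
}
```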

Parallel Stage

A parallel stage is used to specify a group of stages that need to be executed simultaneously.

Parallel Stage Builder

id

Defines the identifier used in reports and trace logs

stage

Defines a Stage or StageExecutor to be executed in parallel

stages

Defines a list of Stage objects to be executed in parallel

shouldCancelIfFailed

Defines whether the parallel stage must call the cancel method for inner stages if it failed

Dynamic Stage

A dynamic stage generates a stage, flow, or parallel stage based on the data within StageContext.
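The idea of choosing a stage at run time from context data can be sketched with plain functional interfaces. This is an illustration only: DynamicStageSketch and dynamic are hypothetical names, a stage is modelled as a Consumer over a Map, and the sketch does not use the library's DynamicStage API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Sketch of a stage that is chosen from context data at run time; names are hypothetical. */
class DynamicStageSketch {

  /** Wraps a provider so that the concrete stage is picked only when the context is known. */
  static Consumer<Map<String, Object>> dynamic(
      Function<Map<String, Object>, Consumer<Map<String, Object>>> provider) {
    return context -> provider.apply(context).accept(context);
  }

  public static void main(String[] args) {
    Consumer<Map<String, Object>> fullSync = ctx -> ctx.put("result", "full sync");
    Consumer<Map<String, Object>> deltaSync = ctx -> ctx.put("result", "delta sync");

    // The decision is deferred until execution, when earlier stages have filled the context.
    Consumer<Map<String, Object>> stage =
      dynamic(ctx -> Boolean.TRUE.equals(ctx.get("firstRun")) ? fullSync : deltaSync);

    Map<String, Object> context = new HashMap<>();
    context.put("firstRun", true);
    stage.accept(context);
    System.out.println(context.get("result")); // prints "full sync"
  }
}
```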

Stage Context

The stage context is generated before the stage is executed and contains the following data:

  • Current flow identifier

  • Previous stage context data

  • Flow parameters (nested flow parameters will always override main flow parameters until exiting the nested flow)
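The override rule for flow parameters can be sketched as a simple map merge. This is an assumption about the observable behaviour only, not the library's code: ContextParametersSketch and effectiveParameters are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of nested flow parameters overriding main flow parameters; names are hypothetical. */
class ContextParametersSketch {

  /** Returns the parameters visible inside a nested flow; the inputs are not modified. */
  static Map<String, Object> effectiveParameters(
      Map<String, Object> mainFlow, Map<String, Object> nestedFlow) {
    Map<String, Object> merged = new HashMap<>(mainFlow);
    // Nested values win for keys defined in both maps.
    merged.putAll(nestedFlow);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> mainFlow = new HashMap<>();
    mainFlow.put("tenant", "main");
    mainFlow.put("retries", 3);
    Map<String, Object> nestedFlow = new HashMap<>();
    nestedFlow.put("tenant", "nested");

    System.out.println(effectiveParameters(mainFlow, nestedFlow).get("tenant")); // prints "nested"
    // The main flow map is untouched, so its value applies again after the nested flow exits.
    System.out.println(mainFlow.get("tenant")); // prints "main"
  }
}
```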

Stage executor

A StageExecutor is an interface that allows developers to implement a custom stage execution method. The library provides the following implementations:

  • DefaultStageExecutor - used to execute all stages implementing Stage interface

  • FlowExecutor - used to execute all Flow objects

  • ParallelStageExecutor - used to execute stages in parallel

  • DynamicStageExecutor - used to execute DynamicStage objects

Logging

The flow engine provides a general logger named folio-flow-engine, and its log level can be customized.

By default, all execution logs appear at the DEBUG level, and flow engine reports at the INFO level.