Skip to end of banner
Go to start of banner

folio-flow-engine

Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Overview

folio-flow-engine is a Java library, that utilizes native classes for Asynchronous execution like CompletableFuture, ExecutorService etc. to provide a friendly interface for composing tasks to be executed with different rules.

GitHub repository: https://github.com/folio-org/folio-flow-engine

Key Functionality

Flow engine provides the following functionality:

  • Asynchronous execution

  • Stage execution in sequence

  • Stage execution in parallel

  • Complex flow execution, including execution of nested flows in parallel

  • Dynamic stage generation based on context data

  • Stage cancellation of one of the upstream stages failed

  • Stage execution observability

    • methods to observe critical moments of stage execution

    • explicit logging, including flow engine reports in logs

  • Java-friendly context management using the Decorator pattern

Java code example

import java.util.concurrent.Executors;
import org.folio.flow.api.Flow;
import org.folio.flow.api.FlowEngine;
import org.folio.flow.api.Stage;
import org.folio.flow.api.StageContext;
import org.folio.flow.model.FlowExecutionStrategy;

public class FlowEngineExample {

  public static void main(String[] args) {
    var sampleFlow = Flow.builder()
      .id("sample flow")
      .stage(new Stage1())
      .stage(new Stage2())
      .executionStrategy(FlowExecutionStrategy.IGNORE_ON_ERROR)
      .build();

    var flowEngine = FlowEngine.builder()
      .executor(Executors.newFixedThreadPool(4))
      .build();

    flowEngine.execute(sampleFlow);
  }

  public static class Stage1 implements Stage<StageContext> {

    @Override
    public void execute(StageContext context) {
      System.out.println("Stage1 executed");
    }

    @Override
    public void cancel(StageContext context) {
      System.out.println("Stage1 cancelled");
    }

    @Override
    public void recover(StageContext context) {
      System.out.println("Stage1 recovered");
    }
  }

  public static class Stage2 implements Stage<StageContext> {

    @Override
    public void execute(StageContext context) {
      throw new RuntimeException("Stage2 failed");
    }

    @Override
    public void cancel(StageContext context) {
      System.out.println("Stage2 cancelled");
    }

    @Override
    public void recover(StageContext context) {
      System.out.println("Stage2 recovered");
    }
  }
}

Sequence Task Execution

Description

The Flow Engine allows tasks to be executed in a specific order, ensuring each task is completed before the next one begins.

Usage

  • Define a sequence of tasks in the desired order.

  • The Flow Engine will execute each task one after the other.

  • If a task fails, subsequent tasks in the sequence will not be executed.

Flow engine follows the following scenarios:

Simple flow execution

A flow engine can execute stages in sequence, waiting for the previous stage to be finished before executing the next one.

Data can be transferred between stages using StageContext model.

Stage context store data with key as String and value as Object. There is a way to make context more Java-friendly using AbstractStageContextWrapper

Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .build();

Simple flow execution (failed stage)

When one of the stages in flow fails - the flow manager stops the execution and calls onFlowErrorStage (if defined)

Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .stage(task4)
  .onFlowError(onFlowErrorStage)
  .build();

This behavior is default, and it can be re-configured:

Flow.builder()
  // stages
  .executionStrategy(CANCEL_ON_ERROR | IGNORE_ON_ERROR)
  .build()

Simple flow execution (failed stage with cancellation)

Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(task2)
  .stage(task3)
  .stage(task4)
  .onFlowCancellation(onFlowCancellationStage)
  .onFlowError(onFlowErrorStage)
  .executionStrategy(CANCEL_ON_ERROR | IGNORE_ON_ERROR)
  .build();

Parallel Task Execution

Description

The Flow Engine supports the execution of multiple tasks simultaneously, leveraging concurrency to improve efficiency and reduce total execution time.

Usage

  • Define tasks that can be executed in parallel.

  • The Flow Engine will manage the concurrent execution of these tasks.

    • parallelism is configured in the FlowEngine component

  • If one task fails, all running parallel tasks will be canceled (if the strategy is CANCEL_ON_ERROR).

Parallel Stage execution

Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(ParallelStage.of(task2_1, task2_2, task2_3))
  .stage(task3)
  .build();

Note that parallelism is customizable by defining an executor when building FlowEngine component

public FlowEngine flowEngine() {
  return FlowEngine.builder()
    .name("flow-engine")
    .executor(/* custom executor can be defined here */))
    .executionTimeout(/* stage timeout execution can be defined here */)
    .build();
}

Parallel Stage execution(stage failed)

Flow.builder()
  .id("Simple Flow")
  .stage(task1)
  .stage(ParallelStage.of(task2_1, task2_2, task2_3))
  .stage(task3)
  .onFlowError(onFlowErrorStage)
  .build();

Models

Stage

Flow

Parallel Flow

  • No labels