Detailed Release Notes for Data Import Splitting Feature

Configuration summary:

Configuration for this feature is supplied via environment variables.  To configure the new splitting feature, you'll need to do the following:

  1. Configure an S3-compatible storage (such as AWS S3 or MinIO) and supply its connection details in the environment variables
    1. Ensure CORS is configured as applicable
  2. Enable the feature using the SPLIT_FILES_ENABLED and RECORDS_PER_SPLIT_FILE environment variables
  3. (Optional) Set up garbage collection for abandoned uploads in AWS S3/MinIO
  4. (Optional) Customize the chunk prioritization algorithm via environment variables; recommended values are listed below

Don't want this feature?

By default, this feature is disabled.  If this feature is disabled, no other configuration is necessary, and no S3 storage needs to be created or set up.

Additionally, please note that without this feature, downloading files from the UI will be unavailable.

If you want to enable this feature for a while but later change your mind, don't worry: you can toggle this feature on and off as desired.  Note that, when disabled, no files can be downloaded from the UI and the "job parts" column will not be shown; however, all entries will still be present and viewable within the job log.

Environment variables at a glance

Please note, all of these are set at the cluster level for mod-data-import.  Additionally, once configured, they should be rather "plug and play", with little need for additional tweaking.  It is possible to run multiple instances of mod-data-import with different values; however, this can lead to strange behavior and is not recommended.

| Purpose | Parameter | Required | Type (unit) | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| Main feature | SPLIT_FILES_ENABLED | yes, if enabling the feature | true, false | false | This feature is currently opt-in |
| Main feature | RECORDS_PER_SPLIT_FILE | no | int (records) | 1000 | Lower values result in smaller, easier-to-debug pieces, whereas larger values result in less job log clutter |
| Main feature | ASYNC_PROCESSOR_POLL_INTERVAL_MS | no | int (ms) | 5000 | The number of milliseconds between checks of the queue for waiting jobs. This is also the "worst case" amount of time between when a job is queued and when it is actually started; the average time is half this value. A lower number means slightly better response times, whereas a higher number decreases database load. Note: once a job is begun, there are no delays between further checks against the database until all jobs are completed. |
| Main feature | ASYNC_PROCESSOR_MAX_WORKERS_COUNT | no | int ≥ 1 | 1 | The maximum number of jobs/slices to simultaneously process in a single instance. Useful for production/multi-tenant environments, where you might want to provide more capacity without additional instances. Please note that multiple workers running simultaneously may cause some odd behavior when only one user is running a job, as multiple parts may appear to complete together. |
| S3 Storage | AWS_URL | yes, if splitting enabled | URL as string | http://127.0.0.1:9000/ | Location of the S3-compatible storage |
| S3 Storage | AWS_REGION | yes, if splitting enabled | string | none | S3 region |
| S3 Storage | AWS_BUCKET | yes, if splitting enabled | string | none | Bucket name |
| S3 Storage | AWS_ACCESS_KEY_ID | yes, if splitting enabled | string | none | Access key |
| S3 Storage | AWS_SECRET_ACCESS_KEY | yes, if splitting enabled | string | none | Access secret |
| S3 Storage | AWS_SDK | yes, if using AWS and splitting is enabled | true, false | false | Whether the S3 storage is AWS (true) or MinIO (false) |
| S3 Storage | S3_FORCEPATHSTYLE | no | true, false | false | Whether buckets should be referenced by path instead of virtual host |
| Chunk prioritization | SCORE_JOB_SMALLEST | no | int | 40 | See the section below on customizing the scoring algorithm |
| Chunk prioritization | SCORE_JOB_LARGEST | no | int | -40 | |
| Chunk prioritization | SCORE_JOB_REFERENCE | no | int (records) | 100000 | |
| Chunk prioritization | SCORE_AGE_NEWEST | no | int | 0 | |
| Chunk prioritization | SCORE_AGE_OLDEST | no | int | 50 | |
| Chunk prioritization | SCORE_AGE_EXTREME_THRESHOLD_MINUTES | no | int (minutes) | 480 | |
| Chunk prioritization | SCORE_AGE_EXTREME_VALUE | no | int | 10000 | |
| Chunk prioritization | SCORE_TENANT_USAGE_MIN | no | int | 100 | |
| Chunk prioritization | SCORE_TENANT_USAGE_MAX | no | int | -200 | |
| Chunk prioritization | SCORE_PART_NUMBER_FIRST | no | int | 1 | |
| Chunk prioritization | SCORE_PART_NUMBER_LAST | no | int | 0 | |
| Chunk prioritization | SCORE_PART_NUMBER_LAST_REFERENCE | no | int | 100 | |
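As a concrete example, a deployment enabling splitting against a local MinIO instance might set the following (the region, bucket, and credentials are illustrative placeholders; path-style addressing is typically required for MinIO):

  SPLIT_FILES_ENABLED=true
  RECORDS_PER_SPLIT_FILE=1000
  AWS_URL=http://127.0.0.1:9000/
  AWS_REGION=us-east-1
  AWS_BUCKET=<bucketName>
  AWS_ACCESS_KEY_ID=<access key>
  AWS_SECRET_ACCESS_KEY=<access secret>
  AWS_SDK=false
  S3_FORCEPATHSTYLE=true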

AWS S3 CORS Configuration

By default, AWS does not permit cross-origin requests to S3 buckets.  Therefore, you will need to configure CORS with something like the following:

[
  {
    "AllowedHeaders": [],
    "AllowedMethods": ["GET", "PUT"],
    "AllowedOrigins": ["https://your-tenant-url"],
    "ExposeHeaders": ["etag"]
  }
]

If it better fits your use case, you can use wildcards in the origin, such as https://*.int.aws.folio.org/.  Note that Okapi/backend modules' origins do not need to be included here; only domains that will be running the UI.
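Assuming the policy above is saved as cors.json, it can be applied with the AWS CLI (the bucket name is a placeholder):

  aws s3api put-bucket-cors --bucket <bucketName> --cors-configuration file://cors.json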

Garbage collection of cloud-stored files:

Because objects persist in cloud storage when data import file splitting is used, it is recommended that hosting providers define lifecycle policies for these files.  Files should be deleted after a given time frame to eliminate avoidable storage costs.

AWS/S3 procedure:

For normal files, this time frame can be tailored to what tenants and the host desire, and can be configured easily in the AWS S3 UI.

S3 Storage Lens can help locate leftover files if a manual cleanup approach is preferred.

https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html

https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html

A lifecycle policy may already be defined; however, a new rule will need to be defined for failed (incomplete multipart) uploads.

An example lifecycle rule to delete incomplete multipart uploads older than 7 days:

<LifecycleConfiguration>
  <Rule>
    <ID>sample-rule</ID>
    <Prefix></Prefix>
    <Status>Enabled</Status>
    <AbortIncompleteMultipartUpload>
      <DaysAfterInitiation>7</DaysAfterInitiation>
    </AbortIncompleteMultipartUpload>
  </Rule>
</LifecycleConfiguration>

From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html>

This can be applied with a command like:

aws s3api put-bucket-lifecycle-configuration --bucket <bucketName> --lifecycle-configuration '<json content>'

Note that the AWS CLI expects the lifecycle configuration as JSON rather than the XML shown above.
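For reference, a JSON equivalent of the XML rule above, suitable for the CLI, would look like the following (an illustrative translation):

  {
    "Rules": [
      {
        "ID": "sample-rule",
        "Filter": { "Prefix": "" },
        "Status": "Enabled",
        "AbortIncompleteMultipartUpload": { "DaysAfterInitiation": 7 }
      }
    ]
  }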

MinIO procedure:

Information about MinIO object lifecycle management can be found here:

https://min.io/docs/minio/linux/administration/object-management/object-lifecycle-management.html#minio-lifecycle-management-tiering

The relevant commands are:

mc ilm tier add
From <https://min.io/docs/minio/linux/reference/minio-mc/mc-ilm-tier-add.html#command-mc.ilm.tier.add>

mc ilm rule add
From <https://min.io/docs/minio/linux/reference/minio-mc/mc-ilm-rule-add.html#mc.ilm.rule.add.-transition-days>
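For example, to expire objects in a bucket after 7 days (the alias myminio and the bucket name are placeholders; see the mc ilm rule add documentation for the full option set):

  mc ilm rule add myminio/<bucketName> --expire-days 7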

Queue Prioritization Algorithm

When enabled, mod-data-import will split large jobs into smaller "chunks," adding each chunk to a queue and dynamically ordering the queue to ensure a fair mix of jobs runs at the same time, considering metrics such as job size, tenant usage (for multi-tenant environments), and how long a job has been waiting.  The algorithm for selecting which chunk will be run next is highly configurable, allowing experimentation and "dialing in" of parameters for specific tenants and deployments.  Details on how this algorithm works, as well as how to customize it, may be found below.

Approach

When a worker becomes available, it calculates and assigns every waiting chunk a single numerical "score."  This score combines many factors according to the parameters, and is designed to represent a holistic view of the chunk, including the job size, waiting time, and more.  Higher scores are better.
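Assuming the factor scores are combined by simple addition (a natural reading of the "holistic" ranker described under "Adding additional metrics" below, though the exact combination should be verified against the code), the score of a chunk c can be written as:

  score(c) = score_job_size(c) + score_age(c) + score_tenant_usage(c) + score_part_number(c)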

Factors considered

| Metric | Calculation type (see "Implementation notes") | Parameters |
| --- | --- | --- |
| Job size | Unbounded logarithmic | SCORE_JOB_SMALLEST and SCORE_JOB_LARGEST; reference is SCORE_JOB_REFERENCE |
| Age | Bounded logarithmic | SCORE_AGE_NEWEST and SCORE_AGE_OLDEST; SCORE_AGE_EXTREME_VALUE above a threshold (SCORE_AGE_EXTREME_THRESHOLD_MINUTES minutes) |
| Tenant usage | Linear | SCORE_TENANT_USAGE_MIN and SCORE_TENANT_USAGE_MAX |
| Part number | Unbounded logarithmic | SCORE_PART_NUMBER_FIRST and SCORE_PART_NUMBER_LAST; reference is SCORE_PART_NUMBER_LAST_REFERENCE |

Job size

This metric considers the total size of the job, in records.  This allows control over prioritizing smaller jobs over larger ones; for instance, if a large job has been running for many hours (or would otherwise have priority), it may be desired for a job with only a handful of records to be able to "skip the line" and get processed next.

This is computed on a logarithmic scale, meaning that every doubling of the value moves the score one step from the "smallest" score toward the "largest" score.  For example, if the score ranges from 5 (smallest) to 0 (largest), and the reference value is 32, a job with size 1 would get score 5, size 2 gets score 4, size 4 gets score 3, size 8 gets score 2, size 16 gets score 1, and size 32 gets score 0.  For more details on the calculation, see "Implementation notes" below.

Age

This metric considers how long this chunk has been waiting, based on when the user selected "Run" in the interface.  This control is useful since it allows jobs that have been waiting longer to be prioritized.

Additionally, this metric considers an "extreme value," acting as a failsafe to prevent other factors from de-prioritizing very old jobs.  For example, if the normal newest-to-oldest scores range from 0 to 100, and job size scores from 0 to 1000, job size could easily outweigh age and keep bumping an old but large job to the back.  An example usage of this failsafe could be to ensure no job waits more than 24 hours: with a threshold of 24 hours, the value could be set to something like 10000, more than enough to jump any job waiting longer than a day to the top of the queue.

Tenant usage

This considers how many workers are currently being used by a tenant; jobs from a tenant which is currently saturating the queue would be deprioritized, pushing toward an even distribution amongst all tenants.  Note that this will have no effect in a single-tenant environment (or when only one tenant is currently importing data), since all jobs' scores would be affected equally.

This is done on a "linear" scale, making it percentage-based; a tenant using 25% of the workers will have a score 25% of the way between the min and max values.

Part number

Lastly, this metric is useful to ensure chunks run in order (otherwise chunks from the same job could run in a non-deterministic order).  It is recommended to keep this range very small (e.g. just 0 to -1), since its only purpose is to order chunks within a single job; chunks of the same job would otherwise have nearly identical scores.

This is logarithmic for implementation-specific reasons, but since this is intended to be used on a very small range, this does not really matter.

Suggested values

| Parameter | Sample value | Justification/Notes |
| --- | --- | --- |
| SCORE_JOB_SMALLEST | 40 | |
| SCORE_JOB_LARGEST | -40 | Larger jobs should be deprioritized |
| SCORE_JOB_REFERENCE | 100000 | |
| SCORE_AGE_NEWEST | 0 | New jobs get no boost |
| SCORE_AGE_OLDEST | 50 | Aging jobs have their scores increased rapidly, so this does not have to be too high; we want smaller jobs to be able to "cut" effectively |
| SCORE_AGE_EXTREME_THRESHOLD_MINUTES | 480 | 8 hours |
| SCORE_AGE_EXTREME_VALUE | 10000 | Jump to the top of the queue |
| SCORE_TENANT_USAGE_MIN | 100 | If the tenant has no jobs running, it should be prioritized |
| SCORE_TENANT_USAGE_MAX | -200 | If the tenant is using all available workers, it should be significantly deprioritized. If no other tenants are competing, this will not matter (since all of the tenant's jobs would be offset equally) |
| SCORE_PART_NUMBER_FIRST | 1 | Very small; we only want to order parts amongst others within a job (which would likely have the same score otherwise) |
| SCORE_PART_NUMBER_LAST | 0 | |
| SCORE_PART_NUMBER_LAST_REFERENCE | 100 | Does not really matter due to the small range |

First-In-First-Out (FIFO) configuration

For a FIFO configuration (considering only age of each job), use the following configuration:

| Parameter | Sample value | Justification/Notes |
| --- | --- | --- |
| SCORE_AGE_NEWEST | 0 | |
| SCORE_AGE_OLDEST | 50 | Can be anything, really, so long as older jobs get higher scores |
| SCORE_AGE_EXTREME_THRESHOLD_MINUTES | 480 | 8 hours |
| SCORE_AGE_EXTREME_VALUE | 10000 | Jump to the top of the queue |
| SCORE_PART_NUMBER_FIRST | 1 | Without considering part number, each slice of a job may be considered "at the same time", causing some slices to be processed before others |
| SCORE_PART_NUMBER_LAST | 0 | |
| SCORE_PART_NUMBER_LAST_REFERENCE | 100 | |
| SCORE_JOB_SMALLEST | 0 | unused |
| SCORE_JOB_LARGEST | 0 | unused |
| SCORE_JOB_REFERENCE | 0 | unused |
| SCORE_TENANT_USAGE_MIN | 0 | unused |
| SCORE_TENANT_USAGE_MAX | 0 | unused |
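Expressed as environment variables, this FIFO configuration would look like:

  SCORE_AGE_NEWEST=0
  SCORE_AGE_OLDEST=50
  SCORE_AGE_EXTREME_THRESHOLD_MINUTES=480
  SCORE_AGE_EXTREME_VALUE=10000
  SCORE_PART_NUMBER_FIRST=1
  SCORE_PART_NUMBER_LAST=0
  SCORE_PART_NUMBER_LAST_REFERENCE=100
  SCORE_JOB_SMALLEST=0
  SCORE_JOB_LARGEST=0
  SCORE_JOB_REFERENCE=0
  SCORE_TENANT_USAGE_MIN=0
  SCORE_TENANT_USAGE_MAX=0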

Implementation notes

Want to disregard a metric?

To disregard/ignore a metric, simply set its parameters to zero.

Customization tips

To make it easier to visualize this algorithm, see this playground: https://codesandbox.io/s/di-scoring-playground-x4kqrw?file=/src/ChunkAdd.tsx.  It makes it easy to simulate tenant usage and jobs of different ages, sizes, and tenants.

Logarithmic factors can often be difficult to calibrate, since the underlying values are potentially unbounded, making it difficult to choose a good reference value.  When attempting to determine this, don't look for the "largest possible" value; just use something that is a "typical" or "expected" large value.  Any values that exceed the reference value will still have their score calculated; it will just exceed the nominal range.  To aid in this calibration, we developed https://codesandbox.io/s/di-unbounded-logarithmic-playground-yf4yyz, which shows score ranges for a given lower and upper value.

Logging

Whenever a worker looks for a job, we log all calculated scores, to make it easier to calibrate in production.  Look for Current worker tenant usage (lists how many workers are in use by each tenant) and Calculated scores (lists the score for each job, formatted as tenant/queue chunk ID/score).

Calculations

Warning to the reader: this section gets extremely technical.

Unbounded logarithmic

These represent potentially unbounded values.  As such, they do not have a hard limit; instead, we define a reference value that represents a typical upper bound.  Since we are using a logarithm, the effect of values past this expected bound is minimal.

To interactively see how this calculation works, see https://codesandbox.io/s/di-unbounded-logarithmic-playground-yf4yyz

For example: the size of a chunk. If we expect chunks to typically have a size from 1 to 32, we could define scores 0 to 5. With this, we would get the following scores:

| Value range | Score range |
| --- | --- |
| [1,2] | (0,1] |
| [3,4] | (1,2] |
| [5,8] | (2,3] |
| [9,16] | (3,4] |
| [17,32] | (4,5] (this is the upper bound of the expected range, but since the real value could be infinite, it can keep going…) |
| [33,64] | (5,6] |
| … | … towards ∞ |
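A closed form consistent with this table and with the job size example earlier (a sketch; the exact expression should be checked against the code) is:

  score(value) = score_low + (score_high - score_low) × log₂(value) / log₂(reference)

where reference is the expected upper bound (32 here), score_low is the score at value 1, and score_high is the score at the reference value.  With score_low = 0, score_high = 5, and reference = 32, this reduces to score(value) = log₂(value), matching the table above.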

Why is this a good approach?

This may seem overcomplicated; however, the above math can be written as one or two lines of code.  Some other questions I asked myself while deciding on this were:

  • Why not just use a linear approach? We want to group things into classes of different sizes; for example, jobs could be “small” (e.g. 0-100), “medium” (100-1000), and “large” (1000-10000+). Two “medium” jobs could be 900 records apart, 9x the entire range of “small”, diluting the differences within that class. By using a logarithmic scale, we can retain granularity within each group.

  • Why not just have classes/ranges like you mentioned above? We could do this; however, the next immediate questions are "how many ranges should there be" and "what range should each one cover."  We could attempt to answer these, but we cannot anticipate the needs of the future.  For the latter, we could make configuration variables, but just sorting into five groups would introduce fifteen variables (lower, upper, and score for each), which seems overcomplicated.

Bounded logarithmic

This acts like the logarithmic calculation; however, upon reaching the reference value/expected upper bound (EXTREME_THRESHOLD), the EXTREME_VALUE will be used instead.  This is useful for something like age, where we want to gradually increase the score over time, but after a certain amount of time, we want that job to finish ASAP (effectively be bumped to the top).  The extreme value represents the maximum we ever want this parameter to reach.
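As a sketch, using the age metric's parameters (the sub-threshold behavior is assumed to mirror the unbounded logarithmic formula with the threshold as its reference; verify against the code):

  score(age) = SCORE_AGE_EXTREME_VALUE, if age ≥ SCORE_AGE_EXTREME_THRESHOLD_MINUTES
  score(age) = SCORE_AGE_NEWEST + (SCORE_AGE_OLDEST - SCORE_AGE_NEWEST) × log₂(age) / log₂(threshold), otherwise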

Linear

This works for percentages; if the value corresponds to 50%, then the score will be halfway between the upper and lower scores.
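Concretely (with usage expressed as a fraction from 0 to 1):

  score(usage) = SCORE_TENANT_USAGE_MIN + (SCORE_TENANT_USAGE_MAX - SCORE_TENANT_USAGE_MIN) × usage

For example, with the suggested values (min 100, max -200), a tenant using 25% of the workers scores 100 + 0.25 × (-300) = 25.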

Adding additional metrics

The code backing this is highly extensible; to add your own factors, create a new class extending QueueItemRanker with a single method score(DataImportQueueItem queueItem, Map<String, Long> tenantUsage).  Then, to include it in the calculations, simply add your custom ranker to the score method of QueueItemHolisticRanker.
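A minimal sketch of such a ranker follows.  QueueItemRanker and DataImportQueueItem come from mod-data-import; the double return type, the package/imports, and the example logic are assumptions for illustration only, so check the module's source for the precise contract.

import java.util.Map;

public class IdleClusterRanker extends QueueItemRanker {

  // Hypothetical factor: give a flat boost to every waiting chunk when no
  // workers are busy anywhere, nudging the queue to start work sooner.
  private static final double IDLE_BOOST = 10.0;

  @Override
  public double score(DataImportQueueItem queueItem, Map<String, Long> tenantUsage) {
    // tenantUsage maps each tenant to the number of workers it currently occupies.
    long workersInUse = tenantUsage.values().stream().mapToLong(Long::longValue).sum();

    // Higher scores are better; contribute the boost only when the cluster is idle.
    return workersInUse == 0 ? IDLE_BOOST : 0.0;
  }
}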