Pipeline

Pipelines are small transformations applied to incoming data before writing the data to the Database.

Pipelines contain tasks to perform atomic operations on each field of each incoming data item.

Pipelines support conditions to perform tasks only if certain conditions are met.

Internally, pipelines are managed as a special built-in _pipeline Schema.

Use cases

Pipelines can be used at import time to normalize, enrich or filter incoming data before it is stored. They can also be applied to bulk transform existing objects and assets already stored in a Schema.

Typical use cases:

  • Disable out-of-sync objects — compare last_sync against a reference date and set _action: disable on stale entries
  • Delete entries matching a criteria — discard entries where a field matches a pattern or a condition is met
  • Update a field conditionally — set a status field, overwrite a value, or compute a new field when a condition is true
  • Normalize imported data — lowercase emails, strip whitespace, rename columns, convert types
  • Generate keys — compute a deterministic keyname from multiple fields using field_md5 or field_uuid
  • Enrich data — compute timestamps, append prefixes, join fields into composite values
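
To illustrate the key-generation use case, a deterministic keyname can be derived by hashing the joined values of several fields. The Python sketch below models what a task like field_md5 might compute; the helper name `make_keyname` and the `:` join separator are assumptions for illustration, not the actual Cavaliba implementation.

```python
import hashlib

def make_keyname(entry, fields, sep=":"):
    """Build a deterministic key by hashing the joined field values.

    Hypothetical helper modeling the field_md5 idea; the real task's
    separator and field ordering may differ.
    """
    raw = sep.join(str(entry.get(f, "")) for f in fields)
    return hashlib.md5(raw.encode("utf-8")).hexdigest()

entry = {"login": "jdoe", "external_id": "X42"}
key = make_keyname(entry, ["login", "external_id"])
```

The same input fields always yield the same 32-character hex key, which is what makes the key stable across repeated imports.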

Example — disable assets not seen since a cutoff date:

tasks:
- set_condition: [STALE, lt, last_sync, cutoff_date]
- field_set: [STALE, _action, disable]

Example — delete entries where status matches a pattern:

tasks:
- set_condition: [OBSOLETE, field_match, status, '^(deleted|obsolete)$']
- discard: [OBSOLETE]

Example — update a field when a condition is met:

tasks:
- set_condition: [NO_OWNER, empty, owner]
- field_set: [NO_OWNER, owner, 'unassigned']

Pipeline Example

# A small pipeline to adapt user.csv to bulk load users from external systems

- classname: _pipeline
  keyname: user_import_pipeline
  displayname: user_import_csv
  description: Use this pipeline to import users as a csv file from system X/Y/Z
  content: |
      csv_delimiter: ';'
      classname: _user
      keyfield: login
      encoding: 'utf-8'
      tasks: 
            # TASKNAME: ["[!]CONDITION", "opt1", "opt2", "opt3", ...]
            # use "!" before CONDITION to negate
            # use '' as an always-True CONDITION
          - field_lower: ['', email, login]
          - field_upper: ['', external_id]
          - field_uuid: ['', uuid_auto]
          - field_datetime_now: ['', last_sync]      

Pipeline usage

$  cavaliba load files/user.csv --pipeline user_import_pipeline

In the Web UI Import Tool, you can specify a pipeline to apply to the provided data.

run_permission

A pipeline can declare a run_permission field. When set, the caller must hold the named permission to execute apply_to_schema against a schema. If the caller does not hold the permission, the execution is denied and an error is returned.

This is independent of the data-level permissions checked when each instance is written back to the database.

Example — restrict a pipeline to users holding p_pipeline_run:

- classname: _pipeline
  keyname: my_cleanup_pipeline
  run_permission: p_pipeline_run
  content: |
      tasks:
          - field_set: ['', _action, disable]      

If run_permission is empty or absent, no permission check is performed at pipeline execution time.
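
The gate described above can be pictured as a simple check performed before the bulk run starts. This Python sketch is a simplified model, assuming aaa carries a list of permission names; the function name `check_run_permission` is hypothetical, not the Cavaliba API.

```python
def check_run_permission(pipeline, aaa):
    """Return True if the caller may execute the pipeline.

    Simplified model of the documented behavior: an empty or absent
    run_permission means no check; otherwise the named permission must
    appear in the caller's permission list (structure of aaa assumed).
    """
    required = pipeline.get("run_permission") or ""
    if not required:
        return True  # no permission check configured
    return required in aaa.get("permissions", [])

pipeline = {"keyname": "my_cleanup_pipeline", "run_permission": "p_pipeline_run"}
allowed = check_run_permission(pipeline, {"permissions": ["p_pipeline_run"]})
denied = check_run_permission(pipeline, {"permissions": []})
```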

apply_to_schema

apply_to_schema iterates over all instances of a given schema, applies the pipeline tasks to each one, and writes the result back to the database. It is the standard way to run a bulk pipeline against existing data.

Parameters:

  • schemaname: the target Schema keyname (e.g. server, application)
  • dryrun: if True, the pipeline is applied but no changes are written to the database (default: False)
  • aaa: caller identity and permissions dict, required for permission checks and write operations

Returns a tuple (count_ok, count_discarded, errors):

  • count_ok: number of instances successfully processed (and written if not dryrun)
  • count_discarded: number of instances discarded by the pipeline (not written)
  • errors: list of error strings for instances that failed to write
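
The bulk-run loop can be sketched as follows. The `load_instances` and `write_instance` callables stand in for the database layer and are assumptions for illustration, not the real Cavaliba internals.

```python
def apply_to_schema(schemaname, pipeline, aaa, dryrun=False,
                    load_instances=None, write_instance=None):
    """Sketch of the bulk run: apply the pipeline to every instance of a
    schema and return (count_ok, count_discarded, errors).

    Hypothetical model: load_instances/write_instance are stand-ins for
    the database layer; pipeline returns None to discard an entry.
    """
    count_ok, count_discarded, errors = 0, 0, []
    for instance in load_instances(schemaname):
        result = pipeline(instance)      # run the pipeline tasks
        if result is None:               # entry discarded by a task
            count_discarded += 1
            continue
        if not dryrun:
            try:
                write_instance(schemaname, result, aaa)
            except Exception as exc:
                errors.append(f"{instance.get('keyname')}: {exc}")
                continue
        count_ok += 1
    return count_ok, count_discarded, errors

# Toy run: one entry kept and written, one discarded by the pipeline.
instances = [{"keyname": "a", "status": "ok"},
             {"keyname": "b", "status": "obsolete"}]
written = []
result = apply_to_schema(
    "server",
    lambda e: None if e["status"] == "obsolete" else e,
    aaa={},
    load_instances=lambda name: instances,
    write_instance=lambda name, inst, aaa: written.append(inst["keyname"]),
)
```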

classname

For CSV files, this mandatory field provides the Schema name to load.

For YAML/JSON files, classname is provided by each data entry. A single file can combine objects for different Schemas.

keyfield

The keyfield option defines the name of the CSV column which provides the keyname (primary key) value for each Instance.

Default if none provided: keyname
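
To illustrate the effect of keyfield, here is how a loader might promote a CSV column to the keyname, defaulting to a column named keyname. The function `load_csv` is a hypothetical sketch, not the Cavaliba loader.

```python
import csv
import io

def load_csv(text, keyfield="keyname", delimiter=";"):
    """Parse CSV text and copy the keyfield column into 'keyname'.

    Hypothetical sketch of the keyfield option's effect; the default
    column name 'keyname' matches the documented default.
    """
    rows = []
    for row in csv.DictReader(io.StringIO(text), delimiter=delimiter):
        if keyfield in row:
            row["keyname"] = row[keyfield]  # promote to primary key
        rows.append(row)
    return rows

data = "login;email\njdoe;jdoe@example.com\n"
rows = load_csv(data, keyfield="login")
```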

encoding

For CSV files, you can configure the character encoding.

Default (if none) is utf-8.

Example:

content: |
    encoding: 'ISO-8859-1'

Pipeline conditions

Conditions are either True or False. They are scoped to the current entry and are reset when the next entry is processed.

An empty condition is True.

A non-empty condition is False by default.

You set a condition with a set_condition task, performing various checks on any fields of an entry.

You check a condition by providing its name as the first parameter of a task operation.

You use quotes around a condition name if it contains special characters.

To negate a condition (perform the operation only if the condition is False), prefix its name with ! and surround the whole name with quotes.

Example:


# check a condition: does myfield contain 'test'?
# perform a field operation (field_set, create the my_status field) if condition is True
tasks: 
- set_condition: [CONDITION_TEST, field_match, myfield, 'test']
- field_set: [CONDITION_TEST, my_status, 'testok']

# set a condition, and perform a field operation if condition is NOT met
# notice the ! in the field_set
tasks: 
- set_condition: [CONDITION_TEST, field_match, myfield, 'test']
- field_set: ['!CONDITION_TEST', my_status, 'test_not_ok']

# no condition, always perform
tasks:
- field_set: ['', new_field, 'Hello']
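
The rules above can be sketched as a tiny interpreter: conditions start False (except the empty condition), set_condition flips them per entry, and a leading ! negates the check. This is a simplified model handling only field_match and field_set, not the actual engine.

```python
import re

def run_tasks(entry, tasks):
    """Apply a minimal task list to one entry, honoring conditions.

    Simplified model of the documented rules: '' is always True,
    unset conditions are False, '!NAME' negates NAME. Conditions are
    local to this call, i.e. reset for each entry.
    """
    conditions = {}

    def check(name):
        if name == "":
            return True  # empty condition: always True
        if name.startswith("!"):
            return not conditions.get(name[1:], False)
        return conditions.get(name, False)

    for task in tasks:
        (op, args), = task.items()
        if op == "set_condition":
            name, test, field, pattern = args
            if test == "field_match":
                conditions[name] = bool(re.search(pattern, str(entry.get(field, ""))))
        elif op == "field_set" and check(args[0]):
            entry[args[1]] = args[2]
    return entry

tasks = [
    {"set_condition": ["CONDITION_TEST", "field_match", "myfield", "test"]},
    {"field_set": ["CONDITION_TEST", "my_status", "testok"]},
    {"field_set": ["!CONDITION_TEST", "my_status", "test_not_ok"]},
]
matched = run_tasks({"myfield": "a test value"}, tasks)
unmatched = run_tasks({"myfield": "nothing here"}, tasks)
```

With the matching entry the positive branch fires; with the non-matching entry the negated '!CONDITION_TEST' branch fires instead.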