---
title: Pipeline
description: Data transformation pipelines
weight: 52
---

**Pipelines** are small transformations applied to incoming data before it is written to the database.

Pipelines contain **tasks** to perform atomic operations on each field of each incoming data item.

Pipelines support **conditions** to perform tasks only if certain conditions are met.

Internally, pipelines are managed as a special built-in **_pipeline** Schema.


## Use cases

Pipelines can be used at **import time** to normalize, enrich or filter incoming data before it is stored. They can also be applied to **bulk transform existing objects and assets** already stored in a Schema.

Typical use cases:

- **Disable out-of-sync objects**: compare `last_sync` against a reference date and set `_action: disable` on stale entries
- **Delete entries matching a criterion**: discard entries where a field matches a pattern or a condition is met
- **Update a field conditionally**: set a status field, overwrite a value, or compute a new field when a condition is true
- **Normalize imported data**: lowercase emails, strip whitespace, rename columns, convert types
- **Generate keys**: compute a deterministic `keyname` from multiple fields using `field_md5` or `field_uuid`
- **Enrich data**: compute timestamps, append prefixes, join fields into composite values

Example — disable assets not seen since a cutoff date:

```yaml
tasks:
- set_condition: [STALE, lt, last_sync, cutoff_date]
- field_set: [STALE, _action, disable]
```

Example — delete entries where status matches a pattern:

```yaml
tasks:
- set_condition: [OBSOLETE, field_match, status, '^(deleted|obsolete)$']
- discard: [OBSOLETE]
```

Example — update a field when a condition is met:

```yaml
tasks:
- set_condition: [NO_OWNER, empty, owner]
- field_set: [NO_OWNER, owner, 'unassigned']
```


## Pipeline Example

```yaml
# A small pipeline to adapt user.csv to bulk load users from external systems

- classname: _pipeline
  keyname: user_import_pipeline
  displayname: user_import_csv
  description: Use this pipeline to import users as a csv file from system X/Y/Z
  content: |
      csv_delimiter: ';'
      classname: _user
      keyfield: login
      encoding: 'utf-8'
      tasks: 
            # TASKNAME: ["[!]CONDITION", "opt1", "opt2", "opt3", ...]
            # use "!" before CONDITION to negate
            # use '' CONDITION as always-True
          - field_lower: ['', email, login]
          - field_upper: ['', external_id]
          - field_uuid: ['', uuid_auto]
          - field_datetime_now: ['', last_sync]
```

## Pipeline usage

```bash
$ cavaliba load files/user.csv --pipeline user_import_pipeline
```

In the Web UI Import Tool, you can specify a pipeline to apply to the provided data.


## run_permission

A pipeline can declare a `run_permission` field. When set, the caller must hold the named permission to execute `apply_to_schema` against a schema. If the caller does not hold the permission, the execution is denied and an error is returned.

This is independent of the data-level permissions checked when each instance is written back to the database.

Example — restrict a pipeline to users holding `p_pipeline_run`:

```yaml
- classname: _pipeline
  keyname: my_cleanup_pipeline
  run_permission: p_pipeline_run
  content: |
      tasks:
          - field_set: ['', _action, disable]
```

If `run_permission` is empty or absent, no permission check is performed at pipeline execution time.
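The rule described in this section can be sketched in a few lines of Python. This is an illustration of the documented behaviour only, not the actual implementation: the function name `may_run` is hypothetical, and the caller's permissions are modelled as a plain set of names because the layout of the `aaa` dict is not described here.

```python
from typing import Optional, Set


def may_run(run_permission: Optional[str], caller_permissions: Set[str]) -> bool:
    """Return True if a caller may execute the pipeline (illustrative sketch)."""
    if not run_permission:
        # Empty or absent run_permission: no check at execution time.
        return True
    # Otherwise the caller must hold the named permission.
    return run_permission in caller_permissions


# Assumed representation of the caller's permissions (hypothetical).
print(may_run("p_pipeline_run", {"p_pipeline_run", "p_data_read"}))  # True
print(may_run("p_pipeline_run", {"p_data_read"}))                    # False
print(may_run(None, set()))                                          # True
```

Data-level permissions are a separate matter: they are still checked when each instance is written.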


## apply_to_schema

`apply_to_schema` iterates over all instances of a given schema, applies the pipeline tasks to each one, and writes the result back to the database. It is the standard way to run a bulk pipeline against existing data.

Parameters:

| Parameter | Description |
|---|---|
| `schemaname` | The target schema keyname (e.g. `server`, `application`) |
| `dryrun` | If `True`, the pipeline is applied but no changes are written to the database (default: `False`) |
| `aaa` | Caller identity and permissions dict — required for permission checks and write operations |

Returns a tuple `(count_ok, count_discarded, errors)`:

| Value | Description |
|---|---|
| `count_ok` | Number of instances successfully processed (and written if not dryrun) |
| `count_discarded` | Number of instances discarded by the pipeline (not written) |
| `errors` | List of error strings for instances that failed to write |
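To make the counting rules concrete, here is a toy model of the loop described above. It is a sketch under stated assumptions, not the real `apply_to_schema`: the name `apply_sketch`, the `transform` callback (returning `None` to signal a discard) and the `write` callback are all hypothetical stand-ins for the pipeline tasks and the database layer.

```python
def apply_sketch(instances, transform, write, dryrun=False):
    """Toy model: returns (count_ok, count_discarded, errors)."""
    count_ok, count_discarded, errors = 0, 0, []
    for instance in instances:
        result = transform(dict(instance))  # work on a copy of the instance
        if result is None:
            # Discarded by the pipeline: counted, never written.
            count_discarded += 1
            continue
        if not dryrun:
            try:
                write(result)
            except Exception as exc:
                # A failed write is reported as an error string.
                errors.append(f"{result.get('keyname')}: {exc}")
                continue
        count_ok += 1
    return count_ok, count_discarded, errors


def disable_unless_obsolete(instance):
    """Hypothetical stand-in for a pipeline's tasks."""
    if instance.get("status") == "obsolete":
        return None
    instance["_action"] = "disable"
    return instance


store = []  # stand-in for the database
instances = [
    {"keyname": "srv1", "status": "active"},
    {"keyname": "srv2", "status": "obsolete"},
]
print(apply_sketch(instances, disable_unless_obsolete, store.append, dryrun=True))
# (1, 1, [])
print(store)
# []
```

Running with `dryrun=True` first is a cheap way to check how many instances would be processed or discarded before anything is written.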


## classname

For CSV files, this mandatory field provides the Schema name to load.

For YAML/JSON files, classname is provided by each data entry. A single file can combine objects for different Schemas.


## keyfield

The `keyfield` option defines the name of the CSV column which provides the keyname (primary key) value for each Instance.

Default if none provided: `keyname`
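As an illustration of what `keyfield` selects, the following Python sketch reads a CSV and pairs each row with the value of the chosen column. The function name `rows_with_keyname` is hypothetical, and the `;` delimiter is taken from the pipeline example on this page; it is not a statement about how the loader is implemented.

```python
import csv
import io


def rows_with_keyname(text, keyfield="keyname", delimiter=";"):
    """Yield (keyname, row) pairs, taking the keyname from the keyfield column."""
    reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
    for row in reader:
        yield row[keyfield], row


sample = "login;email\njdoe;jdoe@example.com\nasmith;asmith@example.com\n"
keys = [key for key, _ in rows_with_keyname(sample, keyfield="login")]
print(keys)
# ['jdoe', 'asmith']
```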


## encoding

For CSV files, you can configure the character encoding.

Default (if none) is `utf-8`.

Example:

    content: |
        encoding: 'ISO-8859-1'
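The effect of the `encoding` setting can be reproduced with plain Python. The sketch below is only an analogy for why the option matters (the helper `read_csv_bytes` is hypothetical): bytes written as ISO-8859-1 decode correctly with that encoding, but fail when read as `utf-8`.

```python
import csv
import io


def read_csv_bytes(raw, encoding="utf-8", delimiter=";"):
    """Decode raw CSV bytes with the given encoding, then parse the rows."""
    text = raw.decode(encoding)
    return list(csv.DictReader(io.StringIO(text), delimiter=delimiter))


# A small file as an ISO-8859-1 system would produce it.
raw = "login;name\nrene;René\n".encode("iso-8859-1")

rows = read_csv_bytes(raw, encoding="iso-8859-1")
print(rows[0]["name"])
# René

try:
    read_csv_bytes(raw)  # default utf-8 does not match the file
except UnicodeDecodeError:
    print("file is not valid utf-8")
```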


## Pipeline conditions

Conditions evaluate to True or False. They are scoped to a single entry and are reset before the next entry is processed.

An empty condition (`''`) is always True.

A named (non-empty) condition is False by default.

You set a condition with a `set_condition` task, which can run various checks on any field of the entry.

You check a condition by passing its name as the first parameter of a task.

Put quotes around a condition name if it contains special characters.

To negate a condition (run the task only if the condition is False), put a `!` before the condition name and surround the whole string with quotes, for example `'!CONDITION_TEST'`.


Example:

```yaml
# Each `tasks:` list below is a separate example.

# Set a condition: does myfield contain 'test'?
# Then run a field operation (field_set creates the my_status field) if the condition is True
tasks:
- set_condition: [CONDITION_TEST, field_match, myfield, 'test']
- field_set: [CONDITION_TEST, my_status, 'testok']

# Set a condition, and run a field operation if the condition is NOT met
# Note the ! in the field_set
tasks:
- set_condition: [CONDITION_TEST, field_match, myfield, 'test']
- field_set: ['!CONDITION_TEST', my_status, 'test_not_ok']

# No condition: always run
tasks:
- field_set: ['', new_field, 'Hello']
```
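The rules for reading a condition reference (empty is True, named is False by default, `!` negates) can be summarised in a short Python sketch. This models the semantics described in this section only; the function `condition_holds` and the dict used to hold per-entry conditions are assumptions made for illustration.

```python
def condition_holds(name, conditions):
    """Evaluate the first parameter of a task against the entry's conditions."""
    negate = name.startswith("!")
    key = name[1:] if negate else name
    # Empty condition is always True; a named one is False unless it was set.
    value = True if key == "" else conditions.get(key, False)
    return (not value) if negate else value


# Conditions are per entry: start each entry with a fresh, empty dict.
conditions = {}
conditions["CONDITION_TEST"] = True  # what a set_condition task would record

checks = [
    condition_holds(name, conditions)
    for name in ("", "CONDITION_TEST", "!CONDITION_TEST", "UNSET", "!UNSET")
]
print(checks)
# [True, True, False, False, True]
```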
