# Pipelines

> The @pipeline decorator. The 80% case for authoring a dataset.

A pipeline binds a source (where the raw data comes from) to a schema (the record shape) via a `transform` function. The decorator form does this in one file with one decorator, which is everything you need for the common case.

If your source is paginated, multi-endpoint, or stateful, see [Three-class form](https://esker.so/docs/sdk/three-class-form.md).

## Minimal example

```python
from typing import Annotated
from pydantic import Field
from esker import pipeline


@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
)
class SecCompany:
    cik: Annotated[str, Field(pattern=r"^\d{10}$")]
    ticker: Annotated[str, Field(min_length=1, max_length=10)]
    title: str

    @classmethod
    def transform(cls, raw: dict) -> "SecCompany":
        return cls(
            cik=str(raw["cik_str"]).zfill(10),
            ticker=raw["ticker"],
            title=raw["title"],
        )
```

That's a complete pipeline. The decorator parses `<domain>@<semver>`, wraps the class as an `EskerModel`, synthesizes a `BulkJsonSource` from `url=`, builds an `EskerPipeline`, and registers it.
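To make the first step concrete, here is a minimal sketch of how a `<domain>@<semver>` ref could be split. The `parse_ref` helper and the simplified three-part version pattern are assumptions for illustration, not the decorator's actual code; only the error message is taken from this page.

```python
import re

# Simplified semver: MAJOR.MINOR.PATCH with no pre-release or build suffix (assumption).
_SEMVER_RE = re.compile(r"\d+\.\d+\.\d+")


def parse_ref(ref: str) -> tuple[str, str]:
    """Split '<domain>@<semver>' into (domain, version). Hypothetical helper."""
    domain, sep, version = ref.partition("@")
    if not sep or not domain or not _SEMVER_RE.fullmatch(version):
        raise TypeError(f"@pipeline ref must be '<domain>@<semver>', got {ref!r}")
    return domain, version
```

With this sketch, `parse_ref("us.sec.companies@1.0.0")` yields the domain id and the schema version as two separate strings.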

## Decorator signature

```python
def pipeline(
    ref: str,                      # "<domain>@<semver>"
    *,
    entity_type: str,              # required
    key: str,                      # required
    url: str | None = None,        # exactly one of (url, source)
    source: type[EskerSource] | None = None,
    source_url: str | None = None, # per-record URL template
    source_id: str | None = None,  # default = domain_id
    pipeline_version: str | None = None,  # default = schema_version
    cadence: str | None = None,
    user_agent: str | None = None, # only used with url=
)
```

### Required

| arg                            | what                                             | example                          |
| ------------------------------ | ------------------------------------------------ | -------------------------------- |
| `ref`                          | `<domain>@<semver>`                              | `"us.sec.companies@2.0.0"`       |
| `entity_type`                  | for `esker_id` synthesis. Lowercase letters only | `"corp"`                         |
| `key`                          | the field on your model holding the native id    | `"cik"`                          |
| **one of** `url=` or `source=` | the data source                                  | `"https://..."` or `MyCsvSource` |

### Optional

| arg                | default          | behavior                                     |
| ------------------ | ---------------- | -------------------------------------------- |
| `pipeline_version` | `schema_version` | manifest's `pipeline_version` field          |
| `source_id`        | `domain_id`      | overrides `BulkJsonSource.SOURCE_ID` default |
| `cadence`          | unset            | shown in `esker list`; metadata only         |
| `user_agent`       | `"esker/0.1"`    | header sent to origin (with `url=` only)     |
| `source_url`       | unset            | per-record URL template, see below           |

## The decorated class

```python
@pipeline(...)
class MyRecord:
    field_a: str
    field_b: int

    @classmethod
    def transform(cls, raw: dict) -> "MyRecord":
        return cls(...)
```

Rules:

- The class **must not** subclass `BaseModel`/`EskerModel`. The decorator builds the model for you. Subclassing raises `TypeError("@pipeline decorates plain classes; use the explicit three-class form when subclassing EskerModel directly.")`.
- It must declare at least one field annotation.
- It must define `transform(cls, raw) -> cls` as a classmethod.
- Fields named `schema_version` or `DOMAIN_ID` are silently ignored, so annotating them on the class cannot turn them into per-record fields.
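Because `transform` is a classmethod that maps one raw dict to one instance, its mapping logic can be exercised on its own. The sketch below uses a stdlib dataclass as a stand-in for the decorated model (the real class is built by the decorator). `SecCompanyStandIn` and the sample record are illustrative assumptions.

```python
from dataclasses import dataclass


@dataclass
class SecCompanyStandIn:
    """Stand-in for the decorated class; not the model the decorator builds."""

    cik: str
    ticker: str
    title: str

    @classmethod
    def transform(cls, raw: dict) -> "SecCompanyStandIn":
        # Same mapping as the minimal example: zero-pad the numeric id to 10 digits.
        return cls(
            cik=str(raw["cik_str"]).zfill(10),
            ticker=raw["ticker"],
            title=raw["title"],
        )


# Illustrative raw record shaped like the fields the minimal example reads.
sample = {"cik_str": 320193, "ticker": "AAPL", "title": "Apple Inc."}
record = SecCompanyStandIn.transform(sample)
print(record.cik)  # 0000320193
```

The padded value satisfies the `^\d{10}$` pattern declared on `cik` in the minimal example.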

## What gets synthesized

For `@pipeline("us.sec.companies@2.0.0", url="https://...", entity_type="corp", key="cik")`, the decorator builds four classes behind the scenes:

1. `_BaseSecCompany(EskerModel)` — intermediate base with `DOMAIN_ID = "us.sec.companies"`, `schema_version = "2.0.0"` as ClassVars.
2. `SecCompany(_BaseSecCompany)` — final `EskerModel` with your fields, plus `transform` carried over.
3. `UsSecCompaniesSource(BulkJsonSource)` — synthetic source with `URL`, `SOURCE_ID`, optional `USER_AGENT`.
4. `UsSecCompaniesPipeline(EskerPipeline)` — synthetic pipeline with all required ClassVars and a `transform` that calls your classmethod.

The decorator returns the **model class** (not the pipeline class). So `SecCompany` in your module is the `EskerModel` subclass, useful for type annotations elsewhere.
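The sketch below shows, in plain Python, how a decorator could derive an intermediate base and a final class with `type()`. `StandInModel`, `build_model`, and `pascal_prefix` are hypothetical names, and the naming rule is inferred from the class names listed above rather than taken from the library.

```python
class StandInModel:
    """Stand-in for EskerModel; the real base class is not reproduced here."""


def pascal_prefix(domain_id: str) -> str:
    # "us.sec.companies" -> "UsSecCompanies" (naming rule assumed from the list above).
    return "".join(part.capitalize() for part in domain_id.split("."))


def build_model(plain_cls: type, domain_id: str, schema_version: str) -> type:
    # Intermediate base carrying the class-level constants.
    base = type(
        f"_Base{plain_cls.__name__}",
        (StandInModel,),
        {"DOMAIN_ID": domain_id, "schema_version": schema_version},
    )
    # Final class keeps the original name and carries over the transform classmethod.
    namespace = {"transform": plain_cls.__dict__["transform"]}
    return type(plain_cls.__name__, (base,), namespace)
```

Copying the `classmethod` object out of the plain class's `__dict__` means it rebinds to the new class, so `cls` inside `transform` refers to the synthesized model rather than the original plain class.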

## Per-record URLs

`source_url=` takes a Python format-string template. It is rendered with `.format(**fields)` against your record's domain fields to produce the `esker_source_url` column.

```python
@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
    source_url="https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}",
)
```

If `source_url=` isn't set, `esker_source_url` falls back to the URL the source actually fetched (`Fetched.source_url`).

A template referencing a missing field raises a wrapped `KeyError` that names the template, the bad key, and the available fields:

```
KeyError: source_url template 'https://example.com/{nonexistent}'
references field 'nonexistent' which is not on Widget
(available: ['name', 'wid'])
```

## Validation at decoration time

Four errors fire at module-import time:

```
TypeError: @pipeline ref must be '<domain>@<semver>', got 'badref'
TypeError: @pipeline entity_type must match /^[a-z]+$/, got 'Corp1'
TypeError: @pipeline requires exactly one of `url=` or `source=`
TypeError: @pipeline key='nonexistent' is not a field on E
```

Plus, if `transform` is missing:

```
TypeError: @pipeline class X must define `transform(cls, raw) -> cls` as a classmethod
```

These errors abort the entry-point load for that module, and the calling CLI command exits with status 1.

## Registration

The pipeline is `register()`ed at decoration time. The CLI discovers pipelines via the `esker.pipelines` entry-point group:

```toml
[project.entry-points."esker.pipelines"]
sec_companies = "my_pipelines.sec_companies"
```

The entry point's value is a Python module path. When `discover_pipelines()` walks the metadata, it imports each module, which fires the decorator, which registers the pipeline.

After editing entry points, force a reinstall so the metadata refreshes:

```sh
uv pip install -e . --reinstall-package my-pipelines
```

The CLI commands that need pipelines (`run`, `test`, `list`, `check`, `push`, `schema`) call `discover_pipelines()`. Hub-only commands (`view`, `manifest`, `pull`, `head`, `add`, `remove`, `sync`, `upgrade`, `whoami`, `login`, `search`, `transfer`, `visibility`, `config`) don't.

## When the decorator isn't enough

The `url=` shortcut hardcodes `BulkJsonSource`. If you need:

- pagination
- multiple endpoints
- a CSV source (pass a `BulkCsvSource` subclass via `source=` instead)
- header auth or stateful traversal
- to subclass `EskerModel` directly (e.g. share fields between models)

…use the [Three-class form](https://esker.so/docs/sdk/three-class-form.md). The decorator is sugar over exactly the same engine.

## See also

- [Records](https://esker.so/docs/sdk/records.md) — `EskerModel` deep dive
- [Sources](https://esker.so/docs/sdk/sources.md) — `EskerSource`, `Fetched`, the built-in source shapes
- [Three-class form](https://esker.so/docs/sdk/three-class-form.md) — the explicit Source/Schema/Pipeline split
- [Fixtures](https://esker.so/docs/sdk/fixtures.md) — testing your transform
- [Publishing](https://esker.so/docs/sdk/publishing.md) — pushing the result to the hub
