Pipelines

The @pipeline decorator covers the 80% case for authoring a dataset.

A pipeline binds a source (where the raw data comes from) to a schema (the record shape) via a transform function. The decorator form does this in one file with one decorator, which is everything you need for the common case.

If your source is paginated, multi-endpoint, or stateful, see Three-class form.

Minimal example

from typing import Annotated
from pydantic import Field
from esker import pipeline


@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
)
class SecCompany:
    cik: Annotated[str, Field(pattern=r"^\d{10}$")]
    ticker: Annotated[str, Field(min_length=1, max_length=10)]
    title: str

    @classmethod
    def transform(cls, raw: dict) -> "SecCompany":
        return cls(
            cik=str(raw["cik_str"]).zfill(10),
            ticker=raw["ticker"],
            title=raw["title"],
        )

That's a complete pipeline. The decorator parses the ref as <domain>@<semver>, wraps the class as an EskerModel, synthesizes a BulkJsonSource from url=, builds an EskerPipeline, and registers it.
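To make the first of those steps concrete, here is a minimal sketch of how a ref could be split into its domain and version. The helper name parse_ref and the regular expression are assumptions made for illustration, not esker's actual implementation; only the error message format is taken from this page.

```python
import re

# Hypothetical grammar: dotted lowercase domain, then "@", then MAJOR.MINOR.PATCH.
# esker's real parser may accept a wider or narrower set of refs.
_REF_RE = re.compile(
    r"^(?P<domain>[a-z0-9_]+(?:\.[a-z0-9_]+)*)@(?P<version>\d+\.\d+\.\d+)$"
)


def parse_ref(ref: str) -> tuple[str, str]:
    """Split '<domain>@<semver>' into (domain, version)."""
    match = _REF_RE.match(ref)
    if match is None:
        raise TypeError(f"@pipeline ref must be '<domain>@<semver>', got {ref!r}")
    return match["domain"], match["version"]
```

Under these assumptions, parse_ref("us.sec.companies@1.0.0") yields the domain "us.sec.companies" and the version "1.0.0", which correspond to the DOMAIN_ID and schema_version values described later on this page.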

Decorator signature

def pipeline(
    ref: str,                      # "<domain>@<semver>"
    *,
    entity_type: str,              # required
    key: str,                      # required
    url: str | None = None,        # exactly one of (url, source)
    source: type[EskerSource] | None = None,
    source_url: str | None = None, # per-record URL template
    source_id: str | None = None,  # default = domain_id
    pipeline_version: str | None = None,  # default = schema_version
    cadence: str | None = None,
    user_agent: str | None = None, # only used with url=
)

Required

arg                     what                                             example
ref                     <domain>@<semver>                                "us.sec.companies@2.0.0"
entity_type             for esker_id synthesis; lowercase letters only   "corp"
key                     the field on your model holding the native id    "cik"
one of url= or source=  the data source                                  "https://..." or MyCsvSource

Optional

arg               default         behavior
pipeline_version  schema_version  manifest's pipeline_version field
source_id         domain_id       overrides BulkJsonSource.SOURCE_ID default
cadence           unset           shown in esker list; metadata only
user_agent        "esker/0.1"     header sent to origin (with url= only)
source_url        unset           per-record URL template, see Per-record URLs

The decorated class

@pipeline(...)
class MyRecord:
    field_a: str
    field_b: int

    @classmethod
    def transform(cls, raw: dict) -> "MyRecord":
        return cls(...)

Rules:

  • The class must not subclass BaseModel/EskerModel. The decorator builds the model for you. Subclassing raises TypeError("@pipeline decorates plain classes; use the explicit three-class form when subclassing EskerModel directly.").
  • It must declare at least one field annotation.
  • It must define transform(cls, raw) -> cls as a classmethod.
  • Fields named schema_version or DOMAIN_ID are silently ignored, so you can't accidentally make them per-record by annotating them.

What gets synthesized

For @pipeline("us.sec.companies@2.0.0", url="https://...", entity_type="corp", key="cik"), the decorator builds four classes behind the scenes:

  1. _BaseSecCompany(EskerModel) — intermediate base with DOMAIN_ID = "us.sec.companies", schema_version = "2.0.0" as ClassVars.
  2. SecCompany(_BaseSecCompany) — final EskerModel with your fields, plus transform carried over.
  3. UsSecCompaniesSource(BulkJsonSource) — synthetic source with URL, SOURCE_ID, optional USER_AGENT.
  4. UsSecCompaniesPipeline(EskerPipeline) — synthetic pipeline with all required ClassVars and a transform that calls your classmethod.

The decorator returns the model class (not the pipeline class), so SecCompany in your module is the EskerModel subclass, which is useful for type annotations elsewhere.
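The naming pattern in the list above can be reproduced with a few lines of string handling. The function below is a hypothetical sketch inferred only from the "us.sec.companies" example; how esker treats underscores, digits, or other characters in a domain is not documented here.

```python
def synthesized_names(domain: str, model_name: str) -> dict[str, str]:
    """Guess the synthesized class names for a domain and a decorated class name."""
    # "us.sec.companies" -> "UsSecCompanies" (assumed rule: capitalize each dotted part).
    stem = "".join(part.capitalize() for part in domain.split("."))
    return {
        "base": f"_Base{model_name}",
        "model": model_name,
        "source": f"{stem}Source",
        "pipeline": f"{stem}Pipeline",
    }
```

For the domain "us.sec.companies" and the class SecCompany, this gives _BaseSecCompany, SecCompany, UsSecCompaniesSource, and UsSecCompaniesPipeline, matching the names listed above.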

Per-record URLs

source_url= takes a Python format-string template. The template is filled in with str.format(**fields), using your record's domain fields, to produce the esker_source_url column.

@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
    source_url="https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}",
)

If source_url= isn't set, esker_source_url falls back to the URL the source actually fetched (Fetched.source_url).

A template referencing a missing field raises a wrapped KeyError that names the template, the bad key, and the available fields:

KeyError: source_url template 'https://example.com/{nonexistent}'
references field 'nonexistent' which is not on Widget
(available: ['name', 'wid'])
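The substitution and the wrapped error can be approximated as follows. render_source_url is a hypothetical helper written for this page, not esker's internal function, and it emits the message on one line rather than wrapping it as shown above.

```python
def render_source_url(template: str, fields: dict[str, object], model_name: str) -> str:
    """Fill a source_url template from a record's fields, wrapping lookup failures."""
    try:
        return template.format(**fields)
    except KeyError as exc:
        missing = exc.args[0]  # the placeholder name str.format could not find
        raise KeyError(
            f"source_url template {template!r} references field {missing!r} "
            f"which is not on {model_name} (available: {sorted(fields)})"
        ) from exc
```

Extra fields are harmless, since str.format ignores keyword arguments the template does not reference. Only a placeholder with no matching field triggers the error.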

Validation at decoration time

Four errors fire at module-import time:

TypeError: @pipeline ref must be '<domain>@<semver>', got 'badref'
TypeError: @pipeline entity_type must match /^[a-z]+$/, got 'Corp1'
TypeError: @pipeline requires exactly one of `url=` or `source=`
TypeError: @pipeline key='nonexistent' is not a field on E

Plus, if transform is missing:

TypeError: @pipeline class X must define `transform(cls, raw) -> cls` as a classmethod

These break the entry-point load for that module, and the calling CLI command exits with status 1.
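Two of the argument checks are simple enough to sketch directly. The function name check_decorator_args is hypothetical and the logic is an assumption about how such checks could be written; the error messages are the ones quoted above.

```python
import re


def check_decorator_args(entity_type: str, url: object = None, source: object = None) -> None:
    """Raise TypeError for a bad entity_type or a bad url=/source= combination."""
    # Lowercase ASCII letters only, mirroring /^[a-z]+$/.
    if re.fullmatch(r"[a-z]+", entity_type) is None:
        raise TypeError(f"@pipeline entity_type must match /^[a-z]+$/, got {entity_type!r}")
    # Exactly one of the two must be supplied: both set and both missing are errors.
    if (url is None) == (source is None):
        raise TypeError("@pipeline requires exactly one of `url=` or `source=`")
```

Because checks like these run while the decorator is being applied, a mistake surfaces as soon as the module is imported rather than partway through a run.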

Registration

The pipeline is registered with register() at decoration time. The CLI discovers pipelines via the esker.pipelines entry-point group:

[project.entry-points."esker.pipelines"]
sec_companies = "my_pipelines.sec_companies"

The entry point's value is a Python module path. When discover_pipelines() walks the metadata, it imports each module, which fires the decorator, which registers the pipeline.

After editing entry points, force a reinstall so the metadata refreshes:

uv pip install -e . --reinstall-package my-pipelines

The CLI commands that need pipelines (run, test, list, check, push, schema) call discover_pipelines(). Hub-only commands (view, manifest, pull, head, add, remove, sync, upgrade, whoami, login, search, transfer, visibility, config) don't.

When the decorator isn't enough

The url= shortcut hardcodes BulkJsonSource. If you need:

  • pagination
  • multiple endpoints
  • a CSV source (pass a BulkCsvSource subclass via source= instead)
  • header auth or stateful traversal
  • to subclass EskerModel directly (e.g. share fields between models)

…use the Three-class form. The decorator is sugar over exactly the same engine.

See also