Pipelines
The @pipeline decorator. The 80% case for authoring a dataset.
A pipeline binds a source (where the raw data comes from) to a schema (the record shape) via a transform function. The decorator form does all of this in one file with a single decorator, and it covers everything you need for the common case.
If your source is paginated, multi-endpoint, or stateful, see Three-class form.
Minimal example
```python
from typing import Annotated

from pydantic import Field

from esker import pipeline


@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
)
class SecCompany:
    cik: Annotated[str, Field(pattern=r"^\d{10}$")]
    ticker: Annotated[str, Field(min_length=1, max_length=10)]
    title: str

    @classmethod
    def transform(cls, raw: dict) -> "SecCompany":
        return cls(
            # Pad to the 10-digit form required by the cik pattern above.
            cik=str(raw["cik_str"]).zfill(10),
            ticker=raw["ticker"],
            title=raw["title"],
        )
```
That's a complete pipeline. The decorator parses <domain>@<semver>, wraps the class as an EskerModel, synthesizes a BulkJsonSource from url=, builds an EskerPipeline, and registers it.
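The first of those steps, splitting the ref, can be sketched in plain Python. This is a minimal illustration, not esker's actual parser: the name parse_ref and the strict MAJOR.MINOR.PATCH pattern (no pre-release or build suffixes) are assumptions; only the error message is taken from the decoration-time errors listed on this page.

```python
import re

# Assumption: a plain MAJOR.MINOR.PATCH version; esker may accept more of SemVer.
_SEMVER = re.compile(r"\d+\.\d+\.\d+")


def parse_ref(ref: str) -> tuple[str, str]:
    """Split a '<domain>@<semver>' ref into (domain_id, schema_version). Hypothetical helper."""
    domain, sep, version = ref.rpartition("@")
    # rpartition returns ('', '', ref) when the ref contains no '@' at all.
    if not sep or not domain or not _SEMVER.fullmatch(version):
        raise TypeError(f"@pipeline ref must be '<domain>@<semver>', got {ref!r}")
    return domain, version


print(parse_ref("us.sec.companies@1.0.0"))  # ('us.sec.companies', '1.0.0')
```

Splitting on the last "@" keeps the domain part free-form while the version part is checked strictly.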
Decorator signature
```python
def pipeline(
    ref: str,                             # "<domain>@<semver>"
    *,
    entity_type: str,                     # required
    key: str,                             # required
    url: str | None = None,               # exactly one of (url, source)
    source: type[EskerSource] | None = None,
    source_url: str | None = None,        # per-record URL template
    source_id: str | None = None,         # default = domain_id
    pipeline_version: str | None = None,  # default = schema_version
    cadence: str | None = None,
    user_agent: str | None = None,        # only used with url=
): ...
```
Required
| arg | what | example |
|---|---|---|
| ref | <domain>@<semver> | "us.sec.companies@2.0.0" |
| entity_type | used for esker_id synthesis; lowercase letters only | "corp" |
| key | the field on your model holding the native id | "cik" |
| one of url= or source= | the data source | "https://..." or MyCsvSource |
Optional
| arg | default | behavior |
|---|---|---|
| pipeline_version | schema_version | value of the manifest's pipeline_version field |
| source_id | domain_id | overrides the BulkJsonSource.SOURCE_ID default |
| cadence | unset | shown in esker list; metadata only |
| user_agent | "esker/0.1" | User-Agent header sent to the origin (with url= only) |
| source_url | unset | per-record URL template, see below |
The decorated class
```python
@pipeline(...)
class MyRecord:
    field_a: str
    field_b: int

    @classmethod
    def transform(cls, raw: dict) -> "MyRecord":
        return cls(...)
```
Rules:
- The class must not subclass BaseModel/EskerModel; the decorator builds the model for you. Subclassing raises TypeError("@pipeline decorates plain classes; use the explicit three-class form when subclassing EskerModel directly.").
- It must declare at least one field annotation.
- It must define transform(cls, raw) -> cls as a classmethod.
- Fields named schema_version or DOMAIN_ID are silently ignored (the decorator sets both from ref), so you can't accidentally make them per-record by annotating them.
What gets synthesized
For @pipeline("us.sec.companies@2.0.0", url="https://...", entity_type="corp", key="cik") applied to class SecCompany, the decorator builds four classes behind the scenes:
- _BaseSecCompany(EskerModel): intermediate base with DOMAIN_ID = "us.sec.companies" and schema_version = "2.0.0" as ClassVars.
- SecCompany(_BaseSecCompany): the final EskerModel with your fields, plus transform carried over.
- UsSecCompaniesSource(BulkJsonSource): synthetic source with URL, SOURCE_ID, and an optional USER_AGENT.
- UsSecCompaniesPipeline(EskerPipeline): synthetic pipeline with all required ClassVars and a transform that calls your classmethod.
The decorator returns the model class (not the pipeline class). So SecCompany in your module is the EskerModel subclass, useful for type annotations elsewhere.
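The model layering can be mimicked with the built-in type() to make the inheritance chain concrete. This is a sketch only: StubModel is a hypothetical stand-in for EskerModel, and the rule that turns us.sec.companies into the UsSecCompanies prefix is inferred from the class names above, not taken from esker's source.

```python
from typing import ClassVar


class StubModel:
    """Hypothetical stand-in for EskerModel (no validation)."""
    DOMAIN_ID: ClassVar[str]
    schema_version: ClassVar[str]


def class_prefix(domain_id: str) -> str:
    # Inferred rule: capitalize each dot-separated part and join them.
    return "".join(part.capitalize() for part in domain_id.split("."))


def build_model(user_cls: type, domain_id: str, schema_version: str) -> type:
    # Step 1: intermediate base carrying the ref as class-level constants.
    base = type(
        f"_Base{user_cls.__name__}",
        (StubModel,),
        {"DOMAIN_ID": domain_id, "schema_version": schema_version},
    )
    # Step 2: final model with the user's annotations and transform carried over.
    namespace = {
        "__annotations__": dict(user_cls.__annotations__),
        "transform": user_cls.__dict__["transform"],
    }
    return type(user_cls.__name__, (base,), namespace)


class SecCompany:
    cik: str
    ticker: str
    title: str

    @classmethod
    def transform(cls, raw: dict) -> "SecCompany":
        return cls()


Model = build_model(SecCompany, "us.sec.companies", "2.0.0")
print([c.__name__ for c in Model.__mro__])  # ['SecCompany', '_BaseSecCompany', 'StubModel', 'object']
print(class_prefix(Model.DOMAIN_ID) + "Source")  # UsSecCompaniesSource
```

Because the classmethod object itself is copied, Model.transform binds to the new model class rather than to the original plain class.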
Per-record URLs
source_url= takes a Python format-string template. For each record, the template is filled in with .format(**fields) using the record's domain fields, and the result becomes the esker_source_url column.
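```python
@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
    # {cik} is filled in from each record's cik field.
    source_url="https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}",
)
class SecCompany:
    ...  # fields and transform as in the minimal example
```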
If source_url= isn't set, esker_source_url falls back to the URL the source actually fetched (Fetched.source_url).
A template referencing a missing field raises a wrapped KeyError that names the template, the bad key, and the available fields:
```
KeyError: source_url template 'https://example.com/{nonexistent}'
references field 'nonexistent' which is not on Widget
(available: ['name', 'wid'])
```
Validation at decoration time
Four errors fire at module-import time:
```
TypeError: @pipeline ref must be '<domain>@<semver>', got 'badref'
TypeError: @pipeline entity_type must match /^[a-z]+$/, got 'Corp1'
TypeError: @pipeline requires exactly one of `url=` or `source=`
TypeError: @pipeline key='nonexistent' is not a field on E
```
Plus, if transform is missing:
```
TypeError: @pipeline class X must define `transform(cls, raw) -> cls` as a classmethod
```
Each of these aborts the entry-point load for that module, and the CLI command that triggered the load exits with status 1.
Registration
The pipeline is registered (via register()) at decoration time. The CLI discovers pipelines via the esker.pipelines entry-point group:
```toml
[project.entry-points."esker.pipelines"]
sec_companies = "my_pipelines.sec_companies"
```
The entry point's value is a Python module path. When discover_pipelines() walks the metadata, it imports each module, which fires the decorator, which registers the pipeline.
After editing entry points, force a reinstall so the metadata refreshes:
```shell
uv pip install -e . --reinstall-package my-pipelines
```
The CLI commands that need pipelines (run, test, list, check, push, schema) call discover_pipelines(). Hub-only commands (view, manifest, pull, head, add, remove, sync, upgrade, whoami, login, search, transfer, visibility, config) don't.
When the decorator isn't enough
The url= shortcut hardcodes BulkJsonSource. If you need:
- pagination
- multiple endpoints
- a CSV source (or keep the decorator and pass a BulkCsvSource subclass as source=)
- header auth or stateful traversal
- to subclass EskerModel directly (e.g. to share fields between models)
…use the Three-class form. The decorator is sugar over exactly the same engine.
See also
- Records: EskerModel deep dive
- Sources: EskerSource, Fetched, the built-in source shapes
- Three-class form: the explicit Source/Schema/Pipeline split
- Fixtures: testing your transform
- Publishing: pushing the result to the hub