Three-class form

The explicit Source / Schema / Pipeline split. The escape hatch when the decorator isn't enough.

The @pipeline decorator is sugar over exactly the form on this page. It synthesizes three classes; here you write them yourself.

Reach for this form when:

  • The source is paginated, multi-endpoint, or stateful.
  • You want the source to use fetch_cached for per-id caching.
  • You want to import transform independently from the synthetic pipeline (e.g. for unit tests that bypass the framework).
  • You want to subclass EskerModel directly to share fields between models.

Layout

A pipeline package, four files:

my_pipelines/us_treasury_yields/
├── __init__.py    re-exports TreasuryYieldCurvePipeline
├── source.py      TreasuryYieldCurveSource(EskerSource)
├── schema.py      TreasuryYieldCurve(EskerModel)
└── pipeline.py    TreasuryYieldCurvePipeline(EskerPipeline)  @register

Re-exporting in __init__.py is what makes the entry-point load fire @register. Without it, pipeline.py is never imported and the registration never happens.
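The import matters because a class decorator runs only when its class statement executes, and that happens only when the module is imported. Below is a toy sketch of a registry decorator. REGISTRY and this register function are hypothetical stand-ins, not esker's actual implementation:

```python
# Hypothetical stand-in for esker's registry; the real mechanism may differ.
REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    """Record the class under its DOMAIN_ID when the class statement runs."""
    REGISTRY[cls.DOMAIN_ID] = cls
    return cls


# The decorator below executes, and so registers the class, only when the
# module containing it is imported. A module nobody imports registers nothing.
@register
class DemoPipeline:
    DOMAIN_ID = "us.treasury.yields"


print(sorted(REGISTRY))  # ['us.treasury.yields']
```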

source.py

from datetime import datetime, timezone
from io import StringIO
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen
import csv

from esker import EskerSource, Fetched, config


class TreasuryYieldCurveSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"
    YEARS: ClassVar[int] = 5
    URL_TEMPLATE: ClassVar[str] = (
        "https://home.treasury.gov/.../{year}/all?type=daily_treasury_yield_curve"
    )

    def fetch_all(self) -> Iterator[Fetched]:
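        # One timestamp for the whole run: every Fetched shares it.
        fetched_at = datetime.now(timezone.utc)
        # Reuse that reading so the year window cannot straddle a New Year boundary.
        end = fetched_at.year
        # One request per year: the current year and the YEARS - 1 years before it.
        for year in range(end - self.YEARS + 1, end + 1):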
            url = self.URL_TEMPLATE.format(year=year)
            req = Request(url, headers={"User-Agent": self.USER_AGENT})
            with urlopen(req, timeout=config.http_timeout()) as resp:
                text = resp.read().decode("utf-8")
            for row in csv.DictReader(StringIO(text)):
                yield Fetched(raw=dict(row), source_url=url, fetched_at=fetched_at)

With YEARS = 5, fetch_all makes five HTTP calls, one per calendar year. Each call returns many CSV rows, and every row carries the same fetched_at. The lineage bundle ends up with five batches.
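The year window is plain arithmetic: the loop covers the current year and the YEARS - 1 years before it. The sketch below is a standalone version of the same range. years_to_fetch is a hypothetical helper name. The current year is passed in so the result is deterministic:

```python
def years_to_fetch(current_year: int, years: int = 5) -> list[int]:
    """Mirror the range() in fetch_all: the last `years` calendar years, inclusive."""
    return list(range(current_year - years + 1, current_year + 1))


window = years_to_fetch(2024)
print(window)       # [2020, 2021, 2022, 2023, 2024]
print(len(window))  # 5 URLs, hence 5 HTTP calls
```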

See Sources for the full EskerSource surface (per-id fetch, fetch_cached, fetch_shards for parallelism).

schema.py

from datetime import date
from typing import ClassVar
from esker import EskerModel


class TreasuryYieldCurve(EskerModel):
    DOMAIN_ID: ClassVar[str] = "us.treasury.yields"
    schema_version: ClassVar[str] = "2.0.0"

    quote_date: date
    rate_1m: float | None = None
    rate_3m: float | None = None
    rate_6m: float | None = None
    rate_1y: float | None = None
    rate_2y: float | None = None
    rate_5y: float | None = None
    rate_10y: float | None = None
    rate_30y: float | None = None

A plain EskerModel subclass: ClassVars for metadata, Pydantic fields for the data. See Records for what's allowed.

pipeline.py

from datetime import datetime
from typing import ClassVar

from esker import EskerPipeline, register
from .schema import TreasuryYieldCurve
from .source import TreasuryYieldCurveSource


@register
class TreasuryYieldCurvePipeline(EskerPipeline):
    DOMAIN_ID: ClassVar[str] = "us.treasury.yields"
    PIPELINE_VERSION: ClassVar[str] = "2.0.0"
    SOURCE = TreasuryYieldCurveSource
    SCHEMA = TreasuryYieldCurve
    _ENTITY_TYPE: ClassVar[str] = "curve"
    _KEY: ClassVar[str] = "quote_date"
    _SOURCE_URL_TEMPLATE: ClassVar[str | None] = None
    cadence: ClassVar[str] = "daily"

    def transform(self, raw: dict) -> TreasuryYieldCurve:
        return TreasuryYieldCurve(
            quote_date=datetime.strptime(raw["Date"], "%m/%d/%Y").date(),
            rate_1m=_parse(raw.get("1 Mo")),
            rate_3m=_parse(raw.get("3 Mo")),
            rate_6m=_parse(raw.get("6 Mo")),
            rate_1y=_parse(raw.get("1 Yr")),
            rate_2y=_parse(raw.get("2 Yr")),
            rate_5y=_parse(raw.get("5 Yr")),
            rate_10y=_parse(raw.get("10 Yr")),
            rate_30y=_parse(raw.get("30 Yr")),
        )


def _parse(value: str | None) -> float | None:
    if value is None or not value.strip():
        return None
    return float(value)

Note: transform is an instance method (self) here, not a classmethod. The decorator path wraps your classmethod into an instance method; in this form you write the method directly.

Required EskerPipeline ClassVars

Six are required:

ClassVar          What it declares
DOMAIN_ID         dataset domain; must match ^[a-z0-9]+(\.[a-z0-9]+)+$
PIPELINE_VERSION  semver of the transform code
SOURCE            the EskerSource subclass
SCHEMA            the EskerModel subclass
_ENTITY_TYPE      entity type used for esker_id synthesis; must match ^[a-z]+$
_KEY              the field on SCHEMA holding the native id
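You can check the two patterns in the table with the standard re module before registering a pipeline. Below is a sketch. It assumes the framework applies them as whole-string matches, which re.fullmatch reproduces. The validator names are hypothetical:

```python
import re

DOMAIN_ID_RE = re.compile(r"^[a-z0-9]+(\.[a-z0-9]+)+$")
ENTITY_TYPE_RE = re.compile(r"^[a-z]+$")


def valid_domain_id(value: str) -> bool:
    # Two or more dot-separated segments of lowercase letters and digits.
    return DOMAIN_ID_RE.fullmatch(value) is not None


def valid_entity_type(value: str) -> bool:
    # A single run of lowercase ASCII letters.
    return ENTITY_TYPE_RE.fullmatch(value) is not None


print(valid_domain_id("us.treasury.yields"))  # True
print(valid_domain_id("treasury"))            # False: needs at least one dot
print(valid_domain_id("us.Treasury"))         # False: uppercase
print(valid_entity_type("curve"))             # True
print(valid_entity_type("yield_curve"))       # False: underscore
```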

Optional:

ClassVar              Default  What it does
_SOURCE_URL_TEMPLATE  None     per-record URL template, filled in via .format(**fields)
cadence               unset    shown in esker list; metadata only
ROW_GROUP_SIZE        10_000   Parquet row group size; lower for tests, higher for big datasets
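_SOURCE_URL_TEMPLATE is filled in with .format(**fields), so every placeholder must name a field on SCHEMA. Below is a sketch with a made-up template; example.com is a placeholder, not a real Treasury endpoint. A plain dict stands in for one record's fields:

```python
from datetime import date

# Hypothetical template; each {name} must be a field on the schema.
TEMPLATE = "https://example.com/yield-curve/{quote_date}"

fields = {"quote_date": date(2024, 1, 2), "rate_10y": None}
url = TEMPLATE.format(**fields)  # extra fields are simply ignored
print(url)  # https://example.com/yield-curve/2024-01-02

# A placeholder that is not a field raises KeyError.
try:
    "https://example.com/{missing}".format(**fields)
except KeyError as exc:
    print("unknown placeholder:", exc)  # unknown placeholder: 'missing'
```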

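If a required ClassVar is missing, the class statement itself fails with TypeError("X must declare a Y class variable"), where X is your pipeline class and Y is the missing name. The error therefore surfaces when pipeline.py is imported, not when the pipeline runs.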
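A common way to fail at class-creation time is __init_subclass__. Below is a minimal sketch of that mechanism with a hypothetical base class, PipelineBase. esker's own check may be implemented differently:

```python
from typing import ClassVar

REQUIRED = ("DOMAIN_ID", "PIPELINE_VERSION", "SOURCE", "SCHEMA", "_ENTITY_TYPE", "_KEY")


class PipelineBase:
    """Hypothetical stand-in for EskerPipeline's required-ClassVar check."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Runs while the subclass is being created, before any instance exists.
        for name in REQUIRED:
            if not hasattr(cls, name):
                raise TypeError(f"{cls.__name__} must declare a {name} class variable")


error = None
try:
    class Incomplete(PipelineBase):
        DOMAIN_ID: ClassVar[str] = "us.treasury.yields"
except TypeError as exc:
    error = str(exc)
print(error)  # Incomplete must declare a PIPELINE_VERSION class variable


# A subclass that declares all six is created normally.
class Complete(PipelineBase):
    DOMAIN_ID = "us.treasury.yields"
    PIPELINE_VERSION = "2.0.0"
    SOURCE = object
    SCHEMA = object
    _ENTITY_TYPE = "curve"
    _KEY = "quote_date"
```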

__init__.py

from my_pipelines.us_treasury_yields.pipeline import TreasuryYieldCurvePipeline

__all__ = ["TreasuryYieldCurvePipeline"]

The chain is: loading the entry point imports the package, the package's __init__.py imports pipeline.py, and executing pipeline.py runs the @register decorator.

Then in pyproject.toml:

[project.entry-points."esker.pipelines"]
treasury_yields = "my_pipelines.us_treasury_yields"

Mixing forms

You can use the decorator for some pipelines and the three-class form for others within the same project. They register through the same entry-point group.

The fixture directory convention differs:

  • Decorator path (one .py file): <pipeline_file>_fixtures/.
  • Three-class form (a package): <pipeline_package>/fixtures/.

See Fixtures for layouts.
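The two conventions differ only in where the directory sits relative to the pipeline's code. Below is a sketch with pathlib. fixture_dir is a hypothetical helper, not an esker API. The sketch assumes <pipeline_file> means the file name without its .py suffix. The paths are illustrative:

```python
from pathlib import Path


def fixture_dir(pipeline_path: Path) -> Path:
    """Return the fixture directory for a decorator file or a three-class package."""
    if pipeline_path.suffix == ".py":
        # Decorator path: a sibling directory named <stem>_fixtures.
        return pipeline_path.with_name(f"{pipeline_path.stem}_fixtures")
    # Three-class form: a fixtures/ directory inside the package.
    return pipeline_path / "fixtures"


print(fixture_dir(Path("my_pipelines/treasury.py")).as_posix())
# my_pipelines/treasury_fixtures
print(fixture_dir(Path("my_pipelines/us_treasury_yields")).as_posix())
# my_pipelines/us_treasury_yields/fixtures
```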

See also

  • Pipelines: the @pipeline decorator, which expands to this form
  • Sources: EskerSource, Fetched, fetch_cached, sharded fetches
  • Records: EskerModel deep dive
  • Fixtures: package vs single-file fixture layouts