Three-class form
The explicit Source / Schema / Pipeline split. The escape hatch when the decorator isn't enough.
The @pipeline decorator is sugar over exactly the form on this page. It synthesizes three classes; here you write them yourself.
Reach for this form when:
- The source is paginated, multi-endpoint, or stateful.
- You want the source to use fetch_cached for per-id caching.
- You want to import transform independently from the synthetic pipeline (e.g. for unit tests that bypass the framework).
- You want to subclass EskerModel directly to share fields between models.
Layout
A pipeline package, four files:
my_pipelines/us_treasury_yields/
├── __init__.py    re-exports TreasuryYieldCurvePipeline
├── source.py      TreasuryYieldCurveSource(EskerSource)
├── schema.py      TreasuryYieldCurve(EskerModel)
└── pipeline.py    TreasuryYieldCurvePipeline(EskerPipeline) @register
Re-exporting in __init__.py is what makes the entry-point load fire @register. Without it, pipeline.py is never imported and the registration never happens.
source.py
import csv
from datetime import datetime, timezone
from io import StringIO
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen

from esker import EskerSource, Fetched, config


class TreasuryYieldCurveSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"
    YEARS: ClassVar[int] = 5
    URL_TEMPLATE: ClassVar[str] = (
        "https://home.treasury.gov/.../{year}/all?type=daily_treasury_yield_curve"
    )

    def fetch_all(self) -> Iterator[Fetched]:
        # One timestamp for the whole run; every record shares it.
        fetched_at = datetime.now(timezone.utc)
        # Take the year from fetched_at so both come from the same instant.
        end = fetched_at.year
        # One request per calendar year, oldest first, ending with the current year.
        for year in range(end - self.YEARS + 1, end + 1):
            url = self.URL_TEMPLATE.format(year=year)
            req = Request(url, headers={"User-Agent": self.USER_AGENT})
            with urlopen(req, timeout=config.http_timeout()) as resp:
                text = resp.read().decode("utf-8")
            # Each CSV row becomes one Fetched record tagged with its source URL.
            for row in csv.DictReader(StringIO(text)):
                yield Fetched(raw=dict(row), source_url=url, fetched_at=fetched_at)
With YEARS = 5, that is five HTTP calls, one per calendar year. All of them share one fetched_at, and each returns many CSV rows. The lineage bundle ends up with five batches.
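The year arithmetic and the batch count can be checked without network access. This sketch assumes a batch corresponds to one source_url; FakeFetched is a hypothetical stand-in for esker's Fetched, and fake_fetch_all replaces urlopen with canned CSV text.

```python
import csv
from dataclasses import dataclass
from datetime import datetime, timezone
from io import StringIO
from typing import Iterator


@dataclass
class FakeFetched:
    """Hypothetical stand-in for esker's Fetched, with the three fields used above."""

    raw: dict
    source_url: str
    fetched_at: datetime


def years_to_fetch(end: int, years: int) -> list[int]:
    # Same range as fetch_all: the last `years` calendar years, ending with `end`.
    return list(range(end - years + 1, end + 1))


def fake_fetch_all(end: int, years: int, csv_text: str) -> Iterator[FakeFetched]:
    fetched_at = datetime.now(timezone.utc)  # one timestamp shared by every record
    for year in years_to_fetch(end, years):
        url = f"https://example.invalid/{year}.csv"  # hypothetical URL, one per year
        # Canned CSV text stands in for the HTTP response body.
        for row in csv.DictReader(StringIO(csv_text)):
            yield FakeFetched(raw=dict(row), source_url=url, fetched_at=fetched_at)


# Two hypothetical rows per "response"; the values are placeholders, not real yields.
CANNED = "Date,1 Mo\n01/02/2024,1.00\n01/03/2024,2.00\n"
records = list(fake_fetch_all(end=2024, years=5, csv_text=CANNED))

print(years_to_fetch(2024, 5))  # [2020, 2021, 2022, 2023, 2024]
print(len(records))  # 10: five URLs x two rows each
print(len({r.source_url for r in records}))  # 5 distinct source URLs
print(len({r.fetched_at for r in records}))  # 1 shared fetched_at
```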
See Sources for the full EskerSource surface (per-id fetch, fetch_cached, fetch_shards for parallelism).
schema.py
from datetime import date
from typing import ClassVar

from esker import EskerModel


class TreasuryYieldCurve(EskerModel):
    DOMAIN_ID: ClassVar[str] = "us.treasury.yields"
    schema_version: ClassVar[str] = "2.0.0"

    quote_date: date
    rate_1m: float | None = None
    rate_3m: float | None = None
    rate_6m: float | None = None
    rate_1y: float | None = None
    rate_2y: float | None = None
    rate_5y: float | None = None
    rate_10y: float | None = None
    rate_30y: float | None = None
Vanilla EskerModel subclass. ClassVars + Pydantic fields. See Records for what's allowed.
pipeline.py
from datetime import datetime
from typing import ClassVar

from esker import EskerPipeline, register

from .schema import TreasuryYieldCurve
from .source import TreasuryYieldCurveSource


@register
class TreasuryYieldCurvePipeline(EskerPipeline):
    DOMAIN_ID: ClassVar[str] = "us.treasury.yields"
    PIPELINE_VERSION: ClassVar[str] = "2.0.0"
    SOURCE = TreasuryYieldCurveSource
    SCHEMA = TreasuryYieldCurve
    _ENTITY_TYPE: ClassVar[str] = "curve"
    _KEY: ClassVar[str] = "quote_date"
    _SOURCE_URL_TEMPLATE: ClassVar[str | None] = None
    cadence: ClassVar[str] = "daily"

    def transform(self, raw: dict) -> TreasuryYieldCurve:
        return TreasuryYieldCurve(
            quote_date=datetime.strptime(raw["Date"], "%m/%d/%Y").date(),
            rate_1m=_parse(raw.get("1 Mo")),
            rate_3m=_parse(raw.get("3 Mo")),
            rate_6m=_parse(raw.get("6 Mo")),
            rate_1y=_parse(raw.get("1 Yr")),
            rate_2y=_parse(raw.get("2 Yr")),
            rate_5y=_parse(raw.get("5 Yr")),
            rate_10y=_parse(raw.get("10 Yr")),
            rate_30y=_parse(raw.get("30 Yr")),
        )


def _parse(value: str | None) -> float | None:
    if value is None or not value.strip():
        return None
    return float(value)
Note: transform is an instance method (self) here, not a classmethod. The decorator path wraps your classmethod into an instance method; in this form you write the method directly.
Required EskerPipeline ClassVars
Six are required:

| ClassVar | what |
|---|---|
| DOMAIN_ID | dataset domain (must match ^[a-z0-9]+(\.[a-z0-9]+)+$) |
| PIPELINE_VERSION | semver of the transform code |
| SOURCE | EskerSource subclass |
| SCHEMA | EskerModel subclass |
| _ENTITY_TYPE | used for esker_id synthesis (must match ^[a-z]+$) |
| _KEY | the field on SCHEMA holding the native id |

Optional:

| ClassVar | default | what |
|---|---|---|
| _SOURCE_URL_TEMPLATE | None | per-record URL template, filled in with .format(**fields) |
| cadence | unset | shown in esker list; metadata only |
| ROW_GROUP_SIZE | 10_000 | parquet row group size; lower for tests, higher for big datasets |
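For _SOURCE_URL_TEMPLATE, the table says the template is filled in with .format(**fields). Assuming fields is the record's field dict, a per-record URL comes out as below; the template string and field values are hypothetical.

```python
from datetime import date

# Hypothetical template: each placeholder names a field on the schema.
TEMPLATE = "https://example.invalid/curves/{quote_date}"

# Hypothetical record fields, keyed by field name.
fields = {"quote_date": date(2024, 1, 2), "rate_1m": 1.25, "rate_30y": None}

# str.format ignores keyword arguments the template doesn't use,
# and a date with an empty format spec renders as its ISO string.
url = TEMPLATE.format(**fields)
print(url)  # https://example.invalid/curves/2024-01-02
```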
A missing required ClassVar raises TypeError("X must declare a Y class variable") at class creation time, i.e. when the class statement executes, not when the pipeline runs.
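Checks that fire at class creation time are commonly implemented with __init_subclass__. The sketch below applies that technique to the six required ClassVars and the two patterns from the table. PipelineBase is hypothetical; esker's actual checks may differ, and the error type and wording for the pattern failures are assumptions.

```python
import re

# The six required names from the table, checked in this order.
REQUIRED = ("DOMAIN_ID", "PIPELINE_VERSION", "SOURCE", "SCHEMA", "_ENTITY_TYPE", "_KEY")
DOMAIN_RE = re.compile(r"[a-z0-9]+(\.[a-z0-9]+)+")
ENTITY_RE = re.compile(r"[a-z]+")


class PipelineBase:
    """Hypothetical base class illustrating the technique; not esker's EskerPipeline."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Runs when a subclass's class statement executes, before any instance exists.
        for name in REQUIRED:
            if not hasattr(cls, name):
                raise TypeError(f"{cls.__name__} must declare a {name} class variable")
        # fullmatch anchors both ends, like the ^...$ in the table's patterns.
        if not DOMAIN_RE.fullmatch(cls.DOMAIN_ID):
            raise TypeError(f"{cls.__name__}.DOMAIN_ID {cls.DOMAIN_ID!r} is not a dotted domain")
        if not ENTITY_RE.fullmatch(cls._ENTITY_TYPE):
            raise TypeError(f"{cls.__name__}._ENTITY_TYPE must be lowercase letters only")


try:
    class Broken(PipelineBase):
        DOMAIN_ID = "us.treasury.yields"
except TypeError as exc:
    error_message = str(exc)

print(error_message)  # Broken must declare a PIPELINE_VERSION class variable


class Ok(PipelineBase):
    DOMAIN_ID = "us.treasury.yields"
    PIPELINE_VERSION = "2.0.0"
    SOURCE = object  # placeholder for an EskerSource subclass
    SCHEMA = object  # placeholder for an EskerModel subclass
    _ENTITY_TYPE = "curve"
    _KEY = "quote_date"
```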
__init__.py
from my_pipelines.us_treasury_yields.pipeline import TreasuryYieldCurvePipeline
__all__ = ["TreasuryYieldCurvePipeline"]
When the entry point loads, Python imports the package; this re-export then imports pipeline.py, and importing pipeline.py runs @register.
Then in pyproject.toml:
[project.entry-points."esker.pipelines"]
treasury_yields = "my_pipelines.us_treasury_yields"
Mixing forms
You can use the decorator for some pipelines and the three-class form for others within the same project. They register through the same entry-point group.
The fixture directory convention differs:
- Decorator path (one .py file): <pipeline_file>_fixtures/.
- Three-class form (a package): <pipeline_package>/fixtures/.
See Fixtures for layouts.
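The two conventions map a pipeline's location to its fixtures directory. This hypothetical helper encodes them with pathlib, assuming <pipeline_file> means the file name without .py and that the helper receives either the .py file (decorator path) or the package directory (three-class form).

```python
from pathlib import Path


def fixtures_dir(pipeline_path: Path) -> Path:
    """Hypothetical helper: fixtures location for a decorator file or a pipeline package."""
    if pipeline_path.suffix == ".py":
        # Decorator path: sibling directory named <pipeline_file>_fixtures/.
        return pipeline_path.with_name(f"{pipeline_path.stem}_fixtures")
    # Three-class form: fixtures/ inside the package directory.
    return pipeline_path / "fixtures"


print(fixtures_dir(Path("my_pipelines/treasury.py")).as_posix())
# my_pipelines/treasury_fixtures
print(fixtures_dir(Path("my_pipelines/us_treasury_yields")).as_posix())
# my_pipelines/us_treasury_yields/fixtures
```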