# Three-class form

> The explicit Source / Schema / Pipeline split. The escape hatch when the decorator isn't enough.

The [`@pipeline` decorator](https://esker.so/docs/sdk/pipelines.md) is sugar over exactly the form on this page. It synthesizes three classes; here you write them yourself.

Reach for this form when:

- The source is paginated, multi-endpoint, or stateful.
- You want the source to use `fetch_cached` for per-id caching.
- You want to import `transform` independently of the synthetic pipeline (e.g. for unit tests that bypass the framework).
- You want to subclass `EskerModel` directly to share fields between models.

## Layout

A pipeline package, four files:

```
my_pipelines/us_treasury_yields/
├── __init__.py    re-exports TreasuryYieldCurvePipeline
├── source.py      TreasuryYieldCurveSource(EskerSource)
├── schema.py      TreasuryYieldCurve(EskerModel)
└── pipeline.py    TreasuryYieldCurvePipeline(EskerPipeline)  @register
```

Re-exporting in `__init__.py` is what makes the entry-point load fire `@register`. Without it, `pipeline.py` is never imported and the registration never happens.
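Why the import matters: a class decorator runs when its module is executed, so a module that is never imported never registers anything. A minimal sketch of that mechanism, assuming a dictionary-backed registry. `REGISTRY`, this `register`, and `DemoPipeline` are hypothetical stand-ins, not esker's actual implementation.

```python
# Hypothetical stand-in for esker's registry; the real `register` may differ.
REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    """Record the class under its DOMAIN_ID as soon as the decorator runs."""
    REGISTRY[cls.DOMAIN_ID] = cls
    return cls


@register  # runs when this module is imported, not when the class is first used
class DemoPipeline:
    DOMAIN_ID = "us.treasury.yields"


print(sorted(REGISTRY))
```

If nothing imports the module that holds the decorated class, the `register` call never happens and the registry stays empty.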

## `source.py`

```python
from datetime import datetime, timezone
from io import StringIO
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen
import csv

from esker import EskerSource, Fetched, config


class TreasuryYieldCurveSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"
    YEARS: ClassVar[int] = 5
    URL_TEMPLATE: ClassVar[str] = (
        "https://home.treasury.gov/.../{year}/all?type=daily_treasury_yield_curve"
    )

    def fetch_all(self) -> Iterator[Fetched]:
        fetched_at = datetime.now(timezone.utc)
        end = fetched_at.year  # reuse the single timestamp so year and fetched_at agree
        for year in range(end - self.YEARS + 1, end + 1):
            url = self.URL_TEMPLATE.format(year=year)
            req = Request(url, headers={"User-Agent": self.USER_AGENT})
            with urlopen(req, timeout=config.http_timeout()) as resp:
                text = resp.read().decode("utf-8")
            for row in csv.DictReader(StringIO(text)):
                yield Fetched(raw=dict(row), source_url=url, fetched_at=fetched_at)
```

Five HTTP calls, all sharing one `fetched_at`, each producing many CSV rows. The lineage bundle ends up with five batches.
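The count of five comes from the `range` bounds in `fetch_all`: the window is inclusive of the current year. A small standalone sketch of that arithmetic; `year_window` is an illustrative helper, not part of esker.

```python
from datetime import datetime, timezone

YEARS = 5  # same value as TreasuryYieldCurveSource.YEARS


def year_window(end: int, years: int = YEARS) -> list[int]:
    """Return the `years` calendar years ending at `end`, inclusive."""
    return list(range(end - years + 1, end + 1))


# One URL (and one HTTP call) per year in the window.
print(year_window(datetime.now(timezone.utc).year))
```

With `end=2024` the window is 2020 through 2024, so raising `YEARS` adds older years rather than future ones.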

See [Sources](https://esker.so/docs/sdk/sources.md) for the full `EskerSource` surface (per-id `fetch`, `fetch_cached`, `fetch_shards` for parallelism).

## `schema.py`

```python
from datetime import date
from typing import ClassVar
from esker import EskerModel


class TreasuryYieldCurve(EskerModel):
    DOMAIN_ID: ClassVar[str] = "us.treasury.yields"
    schema_version: ClassVar[str] = "2.0.0"

    quote_date: date
    rate_1m: float | None = None
    rate_3m: float | None = None
    rate_6m: float | None = None
    rate_1y: float | None = None
    rate_2y: float | None = None
    rate_5y: float | None = None
    rate_10y: float | None = None
    rate_30y: float | None = None
```

A plain `EskerModel` subclass: ClassVars plus Pydantic fields. See [Records](https://esker.so/docs/sdk/records.md) for what's allowed.

## `pipeline.py`

```python
from datetime import datetime
from typing import ClassVar

from esker import EskerPipeline, register
from .schema import TreasuryYieldCurve
from .source import TreasuryYieldCurveSource


@register
class TreasuryYieldCurvePipeline(EskerPipeline):
    DOMAIN_ID: ClassVar[str] = "us.treasury.yields"
    PIPELINE_VERSION: ClassVar[str] = "2.0.0"
    SOURCE = TreasuryYieldCurveSource
    SCHEMA = TreasuryYieldCurve
    _ENTITY_TYPE: ClassVar[str] = "curve"
    _KEY: ClassVar[str] = "quote_date"
    _SOURCE_URL_TEMPLATE: ClassVar[str | None] = None
    cadence: ClassVar[str] = "daily"

    def transform(self, raw: dict) -> TreasuryYieldCurve:
        return TreasuryYieldCurve(
            quote_date=datetime.strptime(raw["Date"], "%m/%d/%Y").date(),
            rate_1m=_parse(raw.get("1 Mo")),
            rate_3m=_parse(raw.get("3 Mo")),
            rate_6m=_parse(raw.get("6 Mo")),
            rate_1y=_parse(raw.get("1 Yr")),
            rate_2y=_parse(raw.get("2 Yr")),
            rate_5y=_parse(raw.get("5 Yr")),
            rate_10y=_parse(raw.get("10 Yr")),
            rate_30y=_parse(raw.get("30 Yr")),
        )


def _parse(value: str | None) -> float | None:
    if value is None or not value.strip():
        return None
    return float(value)
```

Note: `transform` is an **instance method** (`self`) here, not a classmethod. The decorator path wraps your classmethod into an instance method; in this form you write the method directly.

## Required `EskerPipeline` ClassVars

Six required:

| ClassVar           | what                                                    |
| ------------------ | ------------------------------------------------------- |
| `DOMAIN_ID`        | dataset domain (must match `^[a-z0-9]+(\.[a-z0-9]+)+$`) |
| `PIPELINE_VERSION` | semver of the transform code                            |
| `SOURCE`           | `EskerSource` subclass                                  |
| `SCHEMA`           | `EskerModel` subclass                                   |
| `_ENTITY_TYPE`     | for `esker_id` synthesis (`^[a-z]+$`)                   |
| `_KEY`             | the field on `SCHEMA` holding the native id             |
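The two regex constraints in the table can be checked locally before the framework sees the class. A minimal sketch using only the standard library; the constant names are illustrative, and esker may apply the patterns differently than `fullmatch` does here.

```python
import re

# Patterns copied from the required-ClassVars table.
DOMAIN_ID_RE = re.compile(r"^[a-z0-9]+(\.[a-z0-9]+)+$")
ENTITY_TYPE_RE = re.compile(r"^[a-z]+$")

assert DOMAIN_ID_RE.fullmatch("us.treasury.yields")
assert not DOMAIN_ID_RE.fullmatch("treasury")            # at least one dot is required
assert not DOMAIN_ID_RE.fullmatch("US.Treasury")         # lowercase only
assert not DOMAIN_ID_RE.fullmatch("us.treasury_yields")  # no underscores

assert ENTITY_TYPE_RE.fullmatch("curve")
assert not ENTITY_TYPE_RE.fullmatch("yield_curve")       # letters only
```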

Optional:

| ClassVar               | default  | what                                                              |
| ---------------------- | -------- | ----------------------------------------------------------------- |
| `_SOURCE_URL_TEMPLATE` | `None`   | per-record URL template, filled via `.format(**fields)`          |
| `cadence`              | unset    | shown in `esker list`; metadata only                              |
| `ROW_GROUP_SIZE`       | `10_000` | parquet row group size — lower for tests, higher for big datasets |

Omitting a required ClassVar raises `TypeError("X must declare a Y class variable")` at class creation time, before any instance is constructed.
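"At class creation time" means the error surfaces on import, as soon as the `class` statement runs. One common way to get that behavior in Python is `__init_subclass__`. The sketch below assumes that approach. `DemoPipelineBase` and its `REQUIRED` tuple are hypothetical, and esker's real check may be implemented differently.

```python
from typing import ClassVar


# Hypothetical base class; esker's actual check may be implemented differently.
class DemoPipelineBase:
    REQUIRED: ClassVar[tuple[str, ...]] = ("DOMAIN_ID", "PIPELINE_VERSION")

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        for name in cls.REQUIRED:
            if not hasattr(cls, name):
                raise TypeError(
                    f"{cls.__name__} must declare a {name} class variable"
                )


try:
    class Incomplete(DemoPipelineBase):
        DOMAIN_ID = "us.treasury.yields"  # PIPELINE_VERSION is missing
except TypeError as exc:
    message = str(exc)

print(message)
```

The `try` wraps the `class` statement itself: no instance is ever created, yet the error still fires.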

## `__init__.py`

```python
from my_pipelines.us_treasury_yields.pipeline import TreasuryYieldCurvePipeline

__all__ = ["TreasuryYieldCurvePipeline"]
```

The entry point targets the package, not `pipeline.py`. Loading it runs this `__init__.py`, whose import of `pipeline.py` executes the class definition and with it `@register`.

Then in `pyproject.toml`:

```toml
[project.entry-points."esker.pipelines"]
treasury_yields = "my_pipelines.us_treasury_yields"
```

## Mixing forms

You can use the decorator for some pipelines and the three-class form for others within the same project. They register through the same entry-point group.

The fixture directory convention differs:

- Decorator path (one `.py` file): `<pipeline_file>_fixtures/`.
- Three-class form (a package): `<pipeline_package>/fixtures/`.

See [Fixtures](https://esker.so/docs/sdk/fixtures.md) for layouts.

## See also

- [Pipelines](https://esker.so/docs/sdk/pipelines.md) — the decorator the three-class form expands to
- [Sources](https://esker.so/docs/sdk/sources.md) — `EskerSource`, `Fetched`, `fetch_cached`, sharded fetches
- [Records](https://esker.so/docs/sdk/records.md) — `EskerModel` deep dive
- [Fixtures](https://esker.so/docs/sdk/fixtures.md) — package vs single-file fixture layouts
