# Sources

> Fetching raw data. The two built-in source shapes, when to write your own, and how parallel fetches work.

A source produces raw input. The pipeline transforms each raw row into one record. Two built-in source classes cover the most common shapes; for anything else, you subclass `EskerSource` directly.

If you're using the [`@pipeline` decorator](https://esker.so/docs/sdk/pipelines.md) with `url=`, you're already using `BulkJsonSource` under the hood — read this page when that stops being enough.

## One JSON endpoint

Many public datasets are a single JSON file. The `@pipeline` decorator handles this case directly:

```python
@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
)
class SecCompany:
    ...
```

This synthesizes a `BulkJsonSource` subclass with `URL` set. It accepts two payload shapes:

- **Top-level dict** (`{"0": {...}, "1": {...}}`) — iterates `.values()`. The SEC ticker file shape.
- **Top-level array** (`[{...}, {...}]`) — iterates directly. The SpaceX rockets shape.

All rows from a single endpoint share one `fetched_at` timestamp and one `source_url`, so they're recorded as a single [lineage batch](https://esker.so/docs/protocol/lineage.md).
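The two payload shapes can be illustrated with a minimal sketch. `iter_rows` is a hypothetical helper written for this page, not part of Esker's API; it only mirrors the dict-versus-array behaviour described above.

```python
from typing import Any, Iterator


def iter_rows(payload: Any) -> Iterator[dict]:
    """Yield one raw row per record from a decoded JSON payload (illustrative only)."""
    if isinstance(payload, dict):
        # Top-level dict: the keys are just positions, the values are the rows.
        yield from payload.values()
    elif isinstance(payload, list):
        # Top-level array: each element is already a row.
        yield from payload
    else:
        raise TypeError(f"unsupported payload type: {type(payload).__name__}")


dict_shape = {"0": {"cik": 1}, "1": {"cik": 2}}
list_shape = [{"cik": 1}, {"cik": 2}]

# Both shapes produce the same sequence of rows.
assert list(iter_rows(dict_shape)) == list(iter_rows(list_shape))
```

Either way, your `transform` sees one plain dict per record and does not need to know which shape the upstream file used.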

If you need to use `BulkJsonSource` outside the decorator (e.g. in the [three-class form](https://esker.so/docs/sdk/three-class-form.md)):

```python
from typing import ClassVar
from esker import BulkJsonSource


class SecTickersSource(BulkJsonSource):
    URL: ClassVar[str] = "https://www.sec.gov/files/company_tickers.json"
    SOURCE_ID: ClassVar[str] = "us.sec.tickers"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"
```

Always set `SOURCE_ID` explicitly. The default, `"bulk-json"`, would end up in your manifest and says nothing about where the data came from.

## One CSV endpoint

```python
from typing import ClassVar
from esker import BulkCsvSource


class TreasuryYieldsSource(BulkCsvSource):
    URL: ClassVar[str] = "https://example.gov/yields.csv"
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    ENCODING: ClassVar[str] = "utf-8"
    DELIMITER: ClassVar[str] = ","
```

`BulkCsvSource` mirrors `BulkJsonSource` but uses `csv.DictReader`. The header row becomes keys; each subsequent row is a `dict[str, str]`. **All cell values are raw strings** — empty cells (`""`) round-trip distinctly from missing keys, and your `transform` does the coercion.
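To make the all-strings behaviour concrete, here is a small stdlib-only sketch of what `csv.DictReader` hands back and how a transform might coerce it. The sample data and the `to_float` helper are made up for illustration and are not part of Esker.

```python
import csv
from io import StringIO
from typing import Optional

# Two data rows; the second has an empty rate cell.
text = "date,rate_10y\n2024-01-02,3.95\n2024-01-03,\n"
rows = list(csv.DictReader(StringIO(text)))

# Every value is a string, and the empty cell arrives as "" rather than None.
assert rows[0] == {"date": "2024-01-02", "rate_10y": "3.95"}
assert rows[1]["rate_10y"] == ""


def to_float(cell: Optional[str]) -> Optional[float]:
    """Coerce a raw CSV cell to float, treating empty or absent cells as None."""
    if cell is None or cell.strip() == "":
        return None
    return float(cell)


rates = [to_float(row["rate_10y"]) for row in rows]
assert rates == [3.95, None]
```

Whether an empty cell should become `None`, `0.0`, or an error is a per-dataset decision, which is why the coercion is left to your `transform`.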

The decorator's `url=` shortcut is JSON-only. To use `BulkCsvSource` through the decorator, pass `source=YourCsvSourceSubclass` explicitly:

```python
@pipeline("...", source=TreasuryYieldsSource, entity_type="...", key="...")
class TreasuryYieldCurve:
    ...
```

## Multi-endpoint or paginated sources

When the source spans multiple URLs, write your own `EskerSource` subclass. The contract is small — one required ClassVar, one required method:

```python
from datetime import datetime, timezone
from io import StringIO
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen
import csv

from esker import EskerSource, Fetched, config


class TreasuryYieldCurveSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"

    def fetch_all(self) -> Iterator[Fetched]:
        fetched_at = datetime.now(timezone.utc)
        for year in range(2020, 2026):
            url = f"https://home.treasury.gov/.../{year}/all"
            req = Request(url, headers={"User-Agent": self.USER_AGENT})
            with urlopen(req, timeout=config.http_timeout()) as resp:
                text = resp.read().decode("utf-8")
            for row in csv.DictReader(StringIO(text)):
                yield Fetched(raw=dict(row), source_url=url, fetched_at=fetched_at)
```

Yield one `Fetched` envelope per raw record, even when many records share a `source_url`. The pipeline groups them into [lineage batches](https://esker.so/docs/protocol/lineage.md) automatically.

## Per-record fetches with caching

When the source is a per-id API (one HTTP call per record), use `fetch_cached` to avoid re-hitting the upstream:

```python
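import json
from datetime import datetime, timezone
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen

from esker import EskerSource, Fetched, config


class CikDetailSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.sec.cik_detail"

    def fetch(self, id: str) -> Fetched:
        # One HTTP call per record. `id` must be the 10-digit, zero-padded CIK.
        url = f"https://data.sec.gov/submissions/CIK{id}.json"
        req = Request(url, headers={"User-Agent": "esker/0.1"})
        with urlopen(req, timeout=config.http_timeout()) as resp:
            raw = json.loads(resp.read())
        return Fetched(raw=raw, source_url=url, fetched_at=datetime.now(timezone.utc))

    def fetch_all(self) -> Iterator[Fetched]:
        # `_known_ids` is a helper you write; it returns the ids to fetch.
        for id in self._known_ids():
            yield self.fetch_cached(id)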

`fetch_cached(id)` round-trips through `~/.esker/cache/<SOURCE_ID>/<safe_id>.json` (or wherever `ESKER_CACHE_DIR` points). The cache stores the full envelope.

The cache key escapes `/` and `:` to `_`. Other unsafe filesystem chars aren't handled — fine for typical IDs (numeric, hex, slugs).
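The escaping rule is simple enough to sketch. The functions below are hypothetical stand-ins, not Esker's implementation, and they assume `ESKER_CACHE_DIR` replaces the `~/.esker/cache` root rather than its parent.

```python
import os
from pathlib import Path


def safe_id(record_id: str) -> str:
    """Escape the two characters the cache key handles: '/' and ':'."""
    return record_id.replace("/", "_").replace(":", "_")


def cache_path(source_id: str, record_id: str) -> Path:
    """Build <cache root>/<SOURCE_ID>/<safe_id>.json (assumed layout)."""
    root = os.environ.get("ESKER_CACHE_DIR")
    base = Path(root) if root else Path.home() / ".esker" / "cache"
    return base / source_id / f"{safe_id(record_id)}.json"


# Distinct ids that differ only in '/' versus ':' collapse to the same key.
assert safe_id("a/b") == safe_id("a:b") == "a_b"
```

One consequence of mapping two characters to the same replacement: ids that differ only in `/` versus `:` would share a cache file. If your upstream ids can look like that, normalize them before calling `fetch_cached`.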

There's no bulk-cache primitive. `BulkJsonSource` and `BulkCsvSource` re-fetch on every run. If your bulk source is large, consider whether the cost is acceptable; if not, write a per-id source and use `fetch_cached`.

## Parallel fetches

For sources with N independent endpoints, opt into parallelism by overriding `fetch_shards`:

```python
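from typing import Callable, ClassVar, Iterator

from esker import EskerSource, Fetched


class TreasuryYieldsSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    MAX_PARALLEL_SHARDS: ClassVar[int] = 5

    def fetch_shards(self) -> list[Callable[[], Iterator[Fetched]]]:
        # One zero-argument callable per year. `y=y` binds the current year
        # now; a bare closure would see only the final loop value.
        # `_fetch_year` is a helper you write: it fetches one year's URL
        # and yields its Fetched envelopes.
        return [lambda y=y: self._fetch_year(y) for y in range(2020, 2026)]

    def fetch_all(self) -> Iterator[Fetched]:
        # Drains the same shards one after another, for sequential use.
        for shard in self.fetch_shards():
            yield from shard()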

Each shard runs on its own thread; the pipeline runner caps concurrency at `MAX_PARALLEL_SHARDS`. Only the network fetches parallelize — the transform and parquet write stay single-threaded.

`MAX_PARALLEL_SHARDS = 1` is the default. Sources opt in by overriding it. The cap lives on the source (not the runner) because rate-limit budgets belong to the source contract.

The default `fetch_shards()` returns `None`; in that case `fetch_all()` is the source of truth and the run stays sequential.
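As a rough mental model of capped shard execution, the sketch below runs shard callables on a thread pool. `run_shards` is a hypothetical function for illustration; Esker's actual runner may schedule and order results differently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, List


def run_shards(
    shards: List[Callable[[], Iterable[dict]]], max_parallel: int
) -> Iterator[dict]:
    """Drain each shard on a worker thread, then yield rows in shard order."""
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        # list(s()) forces the whole fetch to happen on the worker thread;
        # s=s binds each shard to its own lambda.
        futures = [pool.submit(lambda s=s: list(s())) for s in shards]
        for future in futures:
            yield from future.result()


def make_shard(year: int) -> Callable[[], Iterator[dict]]:
    def shard() -> Iterator[dict]:
        yield {"year": year}  # stand-in for a real network fetch

    return shard


rows = list(run_shards([make_shard(y) for y in range(2020, 2026)], max_parallel=5))
assert [r["year"] for r in rows] == [2020, 2021, 2022, 2023, 2024, 2025]
```

This version buffers each shard's rows in memory before yielding them, which keeps the example short. It also shows why the cap matters: with `max_parallel=5`, at most five requests are in flight against the upstream at once.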

## The `Fetched` envelope

Every source yields these:

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Fetched:
    raw: dict             # whatever the source produces; opaque to the framework
    source_url: str       # the URL this row came from
    fetched_at: datetime  # when the source was hit
```

The pipeline reads `(raw, source_url, fetched_at)` from each envelope and:

- Hands `raw` to your `transform`.
- Builds the [`esker_source_url`](https://esker.so/docs/sdk/records.md#injected-fields) column from `source_url` (or your `_SOURCE_URL_TEMPLATE`).
- Groups envelopes with the same `(source_url, fetched_at)` into one lineage batch.

The contract is intentionally narrow. Anything richer — schema, freshness, deltas — is the pipeline's responsibility to derive.
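The grouping rule has a practical consequence for how you stamp `fetched_at`. The sketch below uses a local `Envelope` dataclass as a stand-in for `Fetched`, and `group_batches` is a hypothetical illustration of the rule, not the pipeline's code.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class Envelope:  # local stand-in with the same three fields as Fetched
    raw: dict
    source_url: str
    fetched_at: datetime


def group_batches(
    envelopes: Iterable[Envelope],
) -> Dict[Tuple[str, datetime], List[dict]]:
    """Group raw rows by the (source_url, fetched_at) pair."""
    batches: Dict[Tuple[str, datetime], List[dict]] = defaultdict(list)
    for env in envelopes:
        batches[(env.source_url, env.fetched_at)].append(env.raw)
    return dict(batches)


url = "https://example.gov/a.csv"
t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)

# One timestamp captured per request: both rows land in one batch.
shared = [Envelope({"id": 1}, url, t0), Envelope({"id": 2}, url, t0)]
assert len(group_batches(shared)) == 1

# A fresh timestamp per row: the same URL splits into two batches.
drifting = [
    Envelope({"id": 1}, url, t0),
    Envelope({"id": 2}, url, t0 + timedelta(microseconds=1)),
]
assert len(group_batches(drifting)) == 2
```

Under this rule, capture `fetched_at` once per HTTP response and reuse it for every row from that response, rather than calling `datetime.now()` inside the row loop.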

## HTTP layer

Esker uses **stdlib `urllib`**: no `requests`, no `httpx`. The bar for new dependencies is high, and `urllib` ships with Python, so it adds none.

What this means for you:

- No retries, no backoff, no rate limiting. Author your own if you need them.
- Set timeouts via `config.http_timeout()` (defaults to 60s, override with `ESKER_HTTP_TIMEOUT`).
- A wedged origin trips the timeout instead of hanging forever.
- Default `User-Agent` is `esker/0.1`. Override per-source via the `USER_AGENT` ClassVar.

For sources that need richer HTTP behavior (auth headers, session cookies, retry policies), build it yourself in your `fetch_all`. Esker doesn't get in the way.
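If you do need retries, a small wrapper around `urlopen` is usually enough. The sketch below is one possible shape, not something Esker provides: `fetch_with_retry` and its parameters are hypothetical, and the `opener` and `sleep` arguments exist only so the helper can be exercised without a network.

```python
import time
from typing import Callable
from urllib.request import Request, urlopen


def fetch_with_retry(
    req: Request,
    *,
    timeout: float,
    attempts: int = 3,
    base_delay: float = 1.0,
    opener: Callable = urlopen,
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """Read the response body, retrying failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            with opener(req, timeout=timeout) as resp:
                return resp.read()
        except OSError:
            # URLError, timeouts, and connection resets are all OSError subclasses.
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    raise ValueError("attempts must be at least 1")
```

Inside a `fetch_all`, this would replace the bare `urlopen` call, with `timeout=config.http_timeout()` passed through. Note that this simple version also retries HTTP errors such as 404, because `HTTPError` is an `OSError` too; a production version would likely retry only 429 and 5xx responses.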

## Choosing a shape

| Your source                                             | Use                                        |
| ------------------------------------------------------- | ------------------------------------------ |
| One JSON endpoint, dict-of-records or list-of-records   | `BulkJsonSource` (or `@pipeline(url=...)`) |
| One CSV endpoint with a header row                      | `BulkCsvSource`                            |
| Per-id fetches                                          | `EskerSource` directly + `fetch_cached`    |
| Multiple endpoints, paginated, header-auth, or stateful | `EskerSource` directly                     |

When in doubt, start with one of the bulk shapes and graduate when you hit a wall.

## See also

- [Pipelines](https://esker.so/docs/sdk/pipelines.md) — the `@pipeline` decorator
- [Three-class form](https://esker.so/docs/sdk/three-class-form.md) — full source/schema/pipeline split
- [Lineage](https://esker.so/docs/protocol/lineage.md) — how `Fetched` becomes batches
- [Caching](https://esker.so/docs/sdk/caching.md) — `ESKER_CACHE_DIR` and friends
