Sources

Fetching raw data. The two built-in source shapes, when to write your own, and how parallel fetches work.

A source produces raw input. The pipeline transforms each raw row into one record. Two built-in source classes cover the most common shapes; for anything else, you subclass EskerSource directly.

If you're using the @pipeline decorator with url=, you're already using BulkJsonSource under the hood — read this page when that stops being enough.

One JSON endpoint

Most public datasets are a single JSON file. The @pipeline decorator handles this case directly:

from esker import pipeline


@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
)
class SecCompany:
    ...

This synthesizes a BulkJsonSource subclass with URL set. It accepts two payload shapes:

  • Top-level dict ({"0": {...}, "1": {...}}) — iterates .values(). The SEC ticker file shape.
  • Top-level array ([{...}, {...}]) — iterates directly. The SpaceX rockets shape.
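The top-level JSON type decides which shape applies. Here is a minimal sketch of that dispatch. It assumes the body has already been decoded with json.loads. iter_payload_rows is a hypothetical name, not part of esker's API, and the sample rows are made up.

```python
from typing import Any, Iterator


def iter_payload_rows(payload: Any) -> Iterator[dict]:
    """Yield one raw row per record from a decoded JSON payload (hypothetical helper)."""
    if isinstance(payload, dict):
        # {"0": {...}, "1": {...}}: keys are positional labels, values are the records.
        yield from payload.values()
    elif isinstance(payload, list):
        # [{...}, {...}]: already a list of records.
        yield from payload
    else:
        raise TypeError(f"expected a JSON object or array, got {type(payload).__name__}")


# Made-up rows in each shape.
dict_shape = {"0": {"ticker": "AAA"}, "1": {"ticker": "BBB"}}
list_shape = [{"name": "Falcon 1"}, {"name": "Falcon 9"}]
print([row["ticker"] for row in iter_payload_rows(dict_shape)])  # ['AAA', 'BBB']
print([row["name"] for row in iter_payload_rows(list_shape)])    # ['Falcon 1', 'Falcon 9']
```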

All rows from a single endpoint share one fetched_at timestamp and one source_url, so they're recorded as a single lineage batch.

If you need to use BulkJsonSource outside the decorator (e.g. in the three-class form):

from typing import ClassVar
from esker import BulkJsonSource


class SecTickersSource(BulkJsonSource):
    URL: ClassVar[str] = "https://www.sec.gov/files/company_tickers.json"
    SOURCE_ID: ClassVar[str] = "us.sec.tickers"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"

Always set SOURCE_ID explicitly. Otherwise the default, "bulk-json", ends up in your manifest, where it says nothing about where the data came from.

One CSV endpoint

from typing import ClassVar
from esker import BulkCsvSource


class TreasuryYieldsSource(BulkCsvSource):
    URL: ClassVar[str] = "https://example.gov/yields.csv"
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    ENCODING: ClassVar[str] = "utf-8"
    DELIMITER: ClassVar[str] = ","

BulkCsvSource mirrors BulkJsonSource but parses the response with csv.DictReader. The header row supplies the keys, and each subsequent row becomes a dict[str, str]. All cell values are raw strings. An empty cell arrives as "" and stays distinguishable from a missing key. Your transform does any type coercion.
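This example shows how stdlib csv.DictReader treats blank and short rows, independent of esker. An empty cell comes through as "". A row with too few cells gets None, the reader's default restval, for the absent columns. The CSV text, the yield_10y column, and parse_yield are all hypothetical illustrations of a transform doing its own coercion.

```python
import csv
from io import StringIO
from typing import Optional

# Illustrative CSV: row 2 has an empty cell, row 3 is missing its last cell entirely.
text = "date,yield_10y\n2024-01-02,3.95\n2024-01-03,\n2024-01-04\n"
rows = list(csv.DictReader(StringIO(text)))

# Values are raw strings, except where a short row was padded with restval (None).
print([row["yield_10y"] for row in rows])  # ['3.95', '', None]


def parse_yield(raw: dict) -> Optional[float]:
    # Hypothetical transform step: coerce the string, treating "" and None as no value.
    value = raw.get("yield_10y")
    return float(value) if value else None


print([parse_yield(row) for row in rows])  # [3.95, None, None]
```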

The decorator's url= shortcut is JSON-only. To use BulkCsvSource through the decorator, pass source=YourCsvSourceSubclass explicitly:

@pipeline("...", source=TreasuryYieldsSource, entity_type="...", key="...")
class TreasuryYieldCurve:
    ...

Multi-endpoint or paginated sources

When the source spans multiple URLs, write your own EskerSource subclass. The contract is small — one required ClassVar, one required method:

from datetime import datetime, timezone
from io import StringIO
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen
import csv

from esker import EskerSource, Fetched, config


class TreasuryYieldCurveSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"

    def fetch_all(self) -> Iterator[Fetched]:
        fetched_at = datetime.now(timezone.utc)
        for year in range(2020, 2026):
            url = f"https://home.treasury.gov/.../{year}/all"
            req = Request(url, headers={"User-Agent": self.USER_AGENT})
            with urlopen(req, timeout=config.http_timeout()) as resp:
                text = resp.read().decode("utf-8")
            for row in csv.DictReader(StringIO(text)):
                yield Fetched(raw=dict(row), source_url=url, fetched_at=fetched_at)

Yield one Fetched envelope per raw record, even when many records share a source_url. The pipeline groups them into lineage batches automatically.

Per-record fetches with caching

When the source is a per-id API (one HTTP call per record), use fetch_cached to avoid re-hitting the upstream:

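import json
from datetime import datetime, timezone
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen

from esker import EskerSource, Fetched, config


class CikDetailSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.sec.cik_detail"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"

    def fetch(self, id: str) -> Fetched:
        # One HTTP call per record; SEC expects the zero-padded 10-digit CIK here.
        url = f"https://data.sec.gov/submissions/CIK{id}.json"
        req = Request(url, headers={"User-Agent": self.USER_AGENT})
        with urlopen(req, timeout=config.http_timeout()) as resp:
            raw = json.loads(resp.read())
        return Fetched(raw=raw, source_url=url, fetched_at=datetime.now(timezone.utc))

    def fetch_all(self) -> Iterator[Fetched]:
        # _known_ids() is your own helper, e.g. one that reads CIKs from the ticker file.
        for id in self._known_ids():
            yield self.fetch_cached(id)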

fetch_cached(id) round-trips through ~/.esker/cache/<SOURCE_ID>/<safe_id>.json (or wherever ESKER_CACHE_DIR points). The cache stores the full envelope.

The cache key replaces / and : with _. Other filesystem-unsafe characters pass through unchanged. That is fine for typical IDs (numeric, hex, slugs).
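The read-through behavior described above is sketched below. It resolves the path, returns the cached envelope on a hit, and on a miss fetches and then writes the whole envelope. This is not esker's implementation. cache_path, fetch_cached_sketch, and the JSON layout (fetched_at stored as an ISO-8601 string) are hypothetical, and a local Fetched stand-in replaces the real class.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable


@dataclass(frozen=True)
class Fetched:
    # Local stand-in with the same fields as esker's envelope, so the sketch runs alone.
    raw: dict
    source_url: str
    fetched_at: datetime


def cache_path(source_id: str, record_id: str) -> Path:
    # ESKER_CACHE_DIR if set, else ~/.esker/cache.
    root = Path(os.environ.get("ESKER_CACHE_DIR", Path.home() / ".esker" / "cache"))
    # Only "/" and ":" are replaced; other unsafe characters pass through as-is.
    safe_id = record_id.replace("/", "_").replace(":", "_")
    return root / source_id / f"{safe_id}.json"


def fetch_cached_sketch(source_id: str, record_id: str,
                        fetch: Callable[[str], Fetched]) -> Fetched:
    path = cache_path(source_id, record_id)
    if path.exists():
        # Cache hit: rebuild the full envelope, including the lineage fields.
        data = json.loads(path.read_text())
        return Fetched(raw=data["raw"], source_url=data["source_url"],
                       fetched_at=datetime.fromisoformat(data["fetched_at"]))
    # Cache miss: hit the upstream once and persist the whole envelope.
    fetched = fetch(record_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps({
        "raw": fetched.raw,
        "source_url": fetched.source_url,
        "fetched_at": fetched.fetched_at.isoformat(),
    }))
    return fetched


# Usage against a throwaway cache directory.
calls = []


def fake_fetch(record_id: str) -> Fetched:
    calls.append(record_id)
    return Fetched(raw={"id": record_id}, source_url=f"https://example.com/{record_id}",
                   fetched_at=datetime.now(timezone.utc))


with tempfile.TemporaryDirectory() as tmp:
    os.environ["ESKER_CACHE_DIR"] = tmp
    print(cache_path("us.sec.cik_detail", "a/b:c").name)  # a_b_c.json
    first = fetch_cached_sketch("us.sec.cik_detail", "0000000001", fake_fetch)
    second = fetch_cached_sketch("us.sec.cik_detail", "0000000001", fake_fetch)
    print(first == second, calls)  # True ['0000000001']
```

Because the stored envelope keeps source_url and fetched_at, a cache hit reproduces the original lineage rather than stamping a new fetch time.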

There's no bulk-cache primitive. BulkJsonSource and BulkCsvSource re-fetch on every run. If your bulk source is large, consider whether the cost is acceptable; if not, write a per-id source and use fetch_cached.

Parallel fetches

For sources with N independent endpoints, opt into parallelism by overriding fetch_shards:

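from typing import Callable, ClassVar, Iterator

from esker import EskerSource, Fetched


class TreasuryYieldsSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    MAX_PARALLEL_SHARDS: ClassVar[int] = 5

    def fetch_shards(self) -> list[Callable[[], Iterator[Fetched]]]:
        # One zero-argument callable per year; y=y binds the current year
        # so every lambda doesn't end up seeing only the last loop value.
        return [lambda y=y: self._fetch_year(y) for y in range(2020, 2026)]

    def fetch_all(self) -> Iterator[Fetched]:
        # Sequential fallback over the same shards.
        for shard in self.fetch_shards():
            yield from shard()

    def _fetch_year(self, year: int) -> Iterator[Fetched]:
        # Your per-year fetch, e.g. the loop body from the multi-endpoint example.
        raise NotImplementedError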

Each shard runs on its own thread; the pipeline runner caps concurrency at MAX_PARALLEL_SHARDS. Only the network fetches parallelize — the transform and parquet write stay single-threaded.

MAX_PARALLEL_SHARDS = 1 is the default. Sources opt in by overriding it. The cap lives on the source (not the runner) because rate-limit budgets belong to the source contract.

The default fetch_shards() returns None. In that case fetch_all() is the source of truth and the run stays sequential.
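The concurrency rule can be pictured as a thread pool sized by MAX_PARALLEL_SHARDS. This sketch shows one way a runner could do it, not esker's runner. run_source, Shard, and _drain are hypothetical names, and plain dicts stand in for Fetched. The None branch mirrors the rule that a None from fetch_shards() means a sequential fetch_all().

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, Optional

# Plain dicts stand in for Fetched envelopes to keep the sketch self-contained.
Shard = Callable[[], Iterator[dict]]


def _drain(shard: Shard) -> list[dict]:
    # Calling and exhausting the shard both happen on the worker thread.
    return list(shard())


def run_source(fetch_all: Callable[[], Iterator[dict]],
               shards: Optional[list[Shard]],
               max_parallel: int = 1) -> list[dict]:
    if shards is None:
        # No shards: fetch_all() is the source of truth and the run is sequential.
        return list(fetch_all())
    # At most max_parallel shards are in flight at once.
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = [pool.submit(_drain, shard) for shard in shards]
        collected: list[dict] = []
        for future in futures:
            # Collect in shard order; the single-threaded transform runs after this.
            collected.extend(future.result())
    return collected


# Usage: six year-shards with a cap of five, tracking peak concurrency.
stats = {"active": 0, "peak": 0}
lock = threading.Lock()


def make_shard(year: int) -> Shard:
    def shard() -> Iterator[dict]:
        with lock:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.05)  # stand-in for a network round trip
        with lock:
            stats["active"] -= 1
        yield {"year": year}
    return shard


rows = run_source(lambda: iter(()), [make_shard(y) for y in range(2020, 2026)], max_parallel=5)
print([row["year"] for row in rows])  # [2020, 2021, 2022, 2023, 2024, 2025]
print(stats["peak"] <= 5)  # True
```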

The Fetched envelope

Every source yields these:

from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Fetched:
    raw: dict             # whatever the source produces; opaque to the framework
    source_url: str       # the URL this row came from
    fetched_at: datetime  # when the source was hit

The pipeline reads (raw, source_url, fetched_at) from each envelope and:

  • Hands raw to your transform.
  • Builds the esker_source_url column from source_url (or your _SOURCE_URL_TEMPLATE).
  • Groups envelopes with the same (source_url, fetched_at) into one lineage batch.

The contract is intentionally narrow. Anything richer — schema, freshness, deltas — is the pipeline's responsibility to derive.
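Grouping by (source_url, fetched_at) amounts to a dictionary keyed on that pair. This sketch uses a local Fetched stand-in and a hypothetical group_lineage_batches function. Esker's own batching may record more per batch than a list of raw rows. In the multi-endpoint example every year shares one fetched_at, so the URL alone separates the batches there.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Fetched:
    # Local stand-in with the same three fields as esker's envelope.
    raw: dict
    source_url: str
    fetched_at: datetime


def group_lineage_batches(envelopes):
    # One batch per (source_url, fetched_at); dicts keep batches in first-seen order.
    batches: dict[tuple[str, datetime], list[dict]] = defaultdict(list)
    for env in envelopes:
        batches[(env.source_url, env.fetched_at)].append(env.raw)
    return dict(batches)


t = datetime(2024, 1, 2, tzinfo=timezone.utc)
envs = [
    Fetched({"year": 2023, "row": 1}, "https://example.gov/2023", t),
    Fetched({"year": 2023, "row": 2}, "https://example.gov/2023", t),
    Fetched({"year": 2024, "row": 1}, "https://example.gov/2024", t),
]
for (url, _), raws in group_lineage_batches(envs).items():
    print(url, len(raws))
# https://example.gov/2023 2
# https://example.gov/2024 1
```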

HTTP layer

Esker uses stdlib urllib — no requests, no httpx. The bar for new dependencies is high, and urllib is free.

What this means for you:

  • No retries, no backoff, no rate limiting. Author your own if you need them.
  • Set timeouts via config.http_timeout() (defaults to 60s, override with ESKER_HTTP_TIMEOUT).
  • A wedged origin trips the timeout instead of hanging forever.
  • Default User-Agent is esker/0.1. Override per-source via the USER_AGENT ClassVar.

For sources that need richer HTTP behavior (auth headers, session cookies, retry policies), build it yourself in your fetch_all. Esker doesn't get in the way.
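Retries are one thing you would build yourself. Below is a minimal sketch using only urllib. It assumes you want exponential backoff on connection errors, timeouts, 429, and 5xx responses, and it ignores Retry-After headers. fetch_bytes_with_retries is a hypothetical helper. Its opener and sleep parameters exist only so the logic can be exercised without a network. Inside a source you might call it as fetch_bytes_with_retries(url, self.USER_AGENT, config.http_timeout()).

```python
import time
import urllib.error
from typing import Callable
from urllib.request import Request, urlopen


def fetch_bytes_with_retries(
    url: str,
    user_agent: str,
    timeout: float,
    attempts: int = 3,
    base_delay: float = 1.0,
    opener: Callable = urlopen,
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """GET url, retrying transient failures with exponential backoff (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    req = Request(url, headers={"User-Agent": user_agent})
    for attempt in range(attempts):
        try:
            with opener(req, timeout=timeout) as resp:
                return resp.read()
        except urllib.error.HTTPError as exc:
            # 429 and 5xx may clear up on their own; other 4xx won't, so fail fast.
            retryable = exc.code == 429 or exc.code >= 500
            if not retryable or attempt == attempts - 1:
                raise
        except OSError:
            # URLError, socket timeouts, and connection resets are all OSError subclasses.
            if attempt == attempts - 1:
                raise
        # Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts.
        sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable: the final attempt returns or raises")
```

HTTPError is caught before OSError because it is itself a subclass of URLError, and therefore of OSError.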

Choosing a shape

Your source                                                Use
-------------------------------------------------------    --------------------------------------
One JSON endpoint, dict-of-records or list-of-records      BulkJsonSource (or @pipeline(url=...))
One CSV endpoint with a header row                         BulkCsvSource
Per-id fetches                                             EskerSource directly + fetch_cached
Multiple endpoints, paginated, header-auth, or stateful    EskerSource directly

When in doubt, start with one of the bulk shapes and graduate when you hit a wall.

See also