Sources
Fetching raw data. The two built-in source shapes, when to write your own, and how parallel fetches work.
A source produces raw input. The pipeline transforms each raw row into one record. Two built-in source classes cover the most common shapes; for anything else, you subclass EskerSource directly.
If you're using the @pipeline decorator with url=, you're already using BulkJsonSource under the hood — read this page when that stops being enough.
One JSON endpoint
Most public datasets are a single JSON file. The @pipeline decorator handles this case directly:
```python
@pipeline(
    "us.sec.companies@1.0.0",
    url="https://www.sec.gov/files/company_tickers.json",
    entity_type="corp",
    key="cik",
)
class SecCompany:
    ...
```
This synthesizes a BulkJsonSource subclass with URL set. It accepts two payload shapes:
- Top-level dict ({"0": {...}, "1": {...}}): iterates .values(). The SEC ticker file shape.
- Top-level array ([{...}, {...}]): iterates directly. The SpaceX rockets shape.
All rows from a single endpoint share one fetched_at timestamp and one source_url, so they're recorded as a single lineage batch.
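The two payload shapes and the shared timestamp can be sketched in a few lines. This is a minimal illustration, not BulkJsonSource's actual code: iter_bulk_json is a hypothetical helper, and Fetched is a local stand-in with the three fields documented under The Fetched envelope, so the block runs without esker installed.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterator


@dataclass(frozen=True)
class Fetched:
    # Local stand-in mirroring the documented envelope fields.
    raw: dict
    source_url: str
    fetched_at: datetime


def iter_bulk_json(payload: Any, source_url: str) -> Iterator[Fetched]:
    """Hypothetical: yield one envelope per record from either payload shape."""
    if isinstance(payload, dict):
        records = payload.values()  # {"0": {...}, "1": {...}}: keys are discarded
    elif isinstance(payload, list):
        records = payload  # [{...}, {...}]: iterate directly
    else:
        raise TypeError(f"unsupported top-level JSON type: {type(payload).__name__}")
    # Taken once, so every record from this endpoint shares it.
    fetched_at = datetime.now(timezone.utc)
    for record in records:
        yield Fetched(raw=record, source_url=source_url, fetched_at=fetched_at)
```

Because every envelope carries the same (source_url, fetched_at) pair, the whole endpoint lands in one lineage batch.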
If you need to use BulkJsonSource outside the decorator (e.g. in the three-class form):
```python
from typing import ClassVar

from esker import BulkJsonSource


class SecTickersSource(BulkJsonSource):
    URL: ClassVar[str] = "https://www.sec.gov/files/company_tickers.json"
    SOURCE_ID: ClassVar[str] = "us.sec.tickers"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"
```
Always set SOURCE_ID explicitly. Otherwise the default, "bulk-json", ends up in your manifest, where it tells a reader nothing about the data.
One CSV endpoint
```python
from typing import ClassVar

from esker import BulkCsvSource


class TreasuryYieldsSource(BulkCsvSource):
    URL: ClassVar[str] = "https://example.gov/yields.csv"
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    ENCODING: ClassVar[str] = "utf-8"
    DELIMITER: ClassVar[str] = ","
```
BulkCsvSource mirrors BulkJsonSource but parses with csv.DictReader. The header row supplies the keys, and each subsequent row becomes a dict[str, str]. All cell values stay raw strings: an empty cell arrives as "", which remains distinct from a missing key, and any type coercion is up to your transform.
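Here is what raw strings look like in practice, using plain csv.DictReader on a hypothetical payload. The to_rate coercion is a hypothetical transform-side policy, and BulkCsvSource's own handling of short rows is not modeled here. With DictReader's defaults, a row with too few cells gets None (the restval default) for the absent columns.

```python
import csv
from io import StringIO
from typing import Optional

# Hypothetical payload: row 2 has an empty "1 Mo" cell; row 3 is one cell short.
TEXT = "date,1 Mo,10 Yr\n2024-01-02,5.55,3.95\n2024-01-03,,3.91\n2024-01-04,5.54\n"

rows = list(csv.DictReader(StringIO(TEXT)))


def to_rate(cell: Optional[str]) -> Optional[float]:
    """Hypothetical coercion: blank or absent cell -> None, otherwise float."""
    if cell is None or cell == "":
        return None
    return float(cell)
```

Keeping the coercion in the transform means the choice between None, 0.0, or a validation error for a blank cell stays visible in your pipeline code.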
The decorator's url= shortcut is JSON-only. To use BulkCsvSource through the decorator, pass source=YourCsvSourceSubclass explicitly:
```python
@pipeline("...", source=TreasuryYieldsSource, entity_type="...", key="...")
class TreasuryYieldCurve:
    ...
```
Multi-endpoint or paginated sources
When the source spans multiple URLs, write your own EskerSource subclass. The contract is small — one required ClassVar, one required method:
```python
import csv
from datetime import datetime, timezone
from io import StringIO
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen

from esker import EskerSource, Fetched, config


class TreasuryYieldCurveSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"

    def fetch_all(self) -> Iterator[Fetched]:
        # One timestamp for the whole run.
        fetched_at = datetime.now(timezone.utc)
        for year in range(2020, 2026):
            url = f"https://home.treasury.gov/.../{year}/all"
            req = Request(url, headers={"User-Agent": self.USER_AGENT})
            with urlopen(req, timeout=config.http_timeout()) as resp:
                text = resp.read().decode("utf-8")
            # Each CSV row becomes one envelope tagged with its year's URL.
            for row in csv.DictReader(StringIO(text)):
                yield Fetched(raw=dict(row), source_url=url, fetched_at=fetched_at)
```
Yield one Fetched envelope per raw record, even when many records share a source_url. The pipeline groups them into lineage batches automatically.
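Paginated sources follow the same pattern: loop until the upstream stops handing you a next page. This sketch assumes a hypothetical API that returns {"results": [...], "next": url-or-null}. fetch_paginated is a hypothetical helper, Fetched is a local stand-in for esker's envelope, and get_json is injected (in a real source it would wrap the Request/urlopen calls from the example above) so the loop can run offline.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Iterator, Optional


@dataclass(frozen=True)
class Fetched:
    # Local stand-in for esker's envelope (raw, source_url, fetched_at).
    raw: dict
    source_url: str
    fetched_at: datetime


def fetch_paginated(first_url: str, get_json: Callable[[str], dict]) -> Iterator[Fetched]:
    """Follow a hypothetical "next" cursor until it is null."""
    fetched_at = datetime.now(timezone.utc)  # one run timestamp for every page
    url: Optional[str] = first_url
    seen: set = set()
    while url:
        if url in seen:  # guard against an upstream whose cursor loops
            raise RuntimeError(f"pagination cycle at {url}")
        seen.add(url)
        page = get_json(url)
        for record in page["results"]:
            # source_url is the page URL, so each page forms its own lineage batch.
            yield Fetched(raw=record, source_url=url, fetched_at=fetched_at)
        url = page.get("next")
```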
Per-record fetches with caching
When the source is a per-id API (one HTTP call per record), use fetch_cached to avoid re-hitting the upstream:
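```python
import json
from datetime import datetime, timezone
from typing import ClassVar, Iterator
from urllib.request import Request, urlopen

from esker import EskerSource, Fetched, config

class CikDetailSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.sec.cik_detail"
    USER_AGENT: ClassVar[str] = "esker/0.1 (you@example.com)"

    def fetch(self, id: str) -> Fetched:
        url = f"https://data.sec.gov/submissions/CIK{id}.json"  # id: 10-digit, zero-padded CIK
        req = Request(url, headers={"User-Agent": self.USER_AGENT})
        with urlopen(req, timeout=config.http_timeout()) as resp:
            raw = json.loads(resp.read())
        return Fetched(raw=raw, source_url=url, fetched_at=datetime.now(timezone.utc))

    def fetch_all(self) -> Iterator[Fetched]:
        # _known_ids() is your own helper that lists the IDs to fetch.
        for id in self._known_ids():
            yield self.fetch_cached(id)
```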
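fetch_cached(id) reads and writes ~/.esker/cache/<SOURCE_ID>/<safe_id>.json (or the same path under ESKER_CACHE_DIR, if set). On a hit it returns the stored envelope without contacting the upstream; on a miss it calls fetch(id) and saves the result. The cache stores the full envelope, not just raw.
The cache key replaces / and : with _. Other characters that are unsafe in filenames are not escaped, which is fine for typical IDs (numeric, hex, slugs).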
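The read-through behavior and key escaping can be sketched with the standard library. This is an illustration under stated assumptions, not esker's implementation: safe_id, cache_path, and fetch_cached here are hypothetical free functions, ESKER_CACHE_DIR is assumed to replace the ~/.esker/cache root, and the envelope is a plain JSON-serializable dict (fetched_at as an ISO string) rather than a Fetched object.

```python
import json
import os
from pathlib import Path
from typing import Callable


def safe_id(record_id: str) -> str:
    # Documented rule: "/" and ":" become "_"; nothing else is escaped.
    return record_id.replace("/", "_").replace(":", "_")


def cache_path(source_id: str, record_id: str) -> Path:
    # Assumption: ESKER_CACHE_DIR, when set, replaces the ~/.esker/cache root.
    root = Path(os.environ.get("ESKER_CACHE_DIR", Path.home() / ".esker" / "cache"))
    return root / source_id / f"{safe_id(record_id)}.json"


def fetch_cached(source_id: str, record_id: str, fetch: Callable[[str], dict]) -> dict:
    """Hypothetical read-through cache: return the stored envelope or fetch and store it."""
    path = cache_path(source_id, record_id)
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    envelope = fetch(record_id)  # must be JSON-serializable in this sketch
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(envelope), encoding="utf-8")
    tmp.replace(path)  # rename into place so a crash mid-write leaves no partial entry
    return envelope
```

Writing to a temporary file and renaming it keeps a killed run from leaving a truncated JSON file that every later run would fail to parse.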
There's no bulk-cache primitive. BulkJsonSource and BulkCsvSource re-fetch on every run. If your bulk source is large, consider whether the cost is acceptable; if not, write a per-id source and use fetch_cached.
Parallel fetches
For sources with N independent endpoints, opt into parallelism by overriding fetch_shards:
```python
from typing import Callable, ClassVar, Iterator

from esker import EskerSource, Fetched


class TreasuryYieldsSource(EskerSource):
    SOURCE_ID: ClassVar[str] = "us.treasury.yields"
    MAX_PARALLEL_SHARDS: ClassVar[int] = 5

    def fetch_shards(self) -> list[Callable[[], Iterator[Fetched]]]:
        # _fetch_year(year) is your own helper; y=y binds each year at creation time.
        return [lambda y=y: self._fetch_year(y) for y in range(2020, 2026)]

    def fetch_all(self) -> Iterator[Fetched]:
        # Sequential path over the same shards.
        for shard in self.fetch_shards():
            yield from shard()
```
Each shard runs on its own thread; the pipeline runner caps concurrency at MAX_PARALLEL_SHARDS. Only the network fetches parallelize — the transform and parquet write stay single-threaded.
MAX_PARALLEL_SHARDS = 1 is the default. Sources opt in by overriding it. The cap lives on the source (not the runner) because rate-limit budgets belong to the source contract.
Default fetch_shards() returns None, in which case fetch_all() is the source of truth and the run stays sequential.
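To make the threading model concrete, here is a minimal sketch of a shard runner. It is not esker's runner: run_shards is hypothetical, it uses concurrent.futures.ThreadPoolExecutor with max_workers as the concurrency cap, drains each shard on a worker thread, and hands records back to the calling thread in shard order, which is where a single-threaded transform would consume them.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def run_shards(
    shards: Optional[List[Callable[[], Iterable[T]]]],
    fetch_all: Callable[[], Iterable[T]],
    max_parallel: int = 1,
) -> Iterator[T]:
    """Hypothetical runner: fetch shards on up to max_parallel threads."""
    if shards is None:
        # No shards: fetch_all() is the source of truth and the run stays sequential.
        yield from fetch_all()
        return
    with ThreadPoolExecutor(max_workers=max(1, max_parallel)) as pool:
        # Each worker drains one shard into a list, so only fetching is concurrent.
        futures = [pool.submit(lambda s: list(s()), shard) for shard in shards]
        # Records return to the calling thread in shard order; whatever consumes
        # this generator (e.g. a transform) runs single-threaded.
        for future in futures:
            yield from future.result()
```

Draining each shard into a list keeps the sketch short but buffers a whole shard in memory; a streaming variant would pass records through a bounded queue instead.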
The Fetched envelope
Every source yields these:
```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Fetched:
    raw: dict  # whatever the source produces; opaque to the framework
    source_url: str  # the URL this row came from
    fetched_at: datetime  # when the source was hit
```
The pipeline reads (raw, source_url, fetched_at) from each envelope and:
- Hands raw to your transform.
- Builds the esker_source_url column from source_url (or your _SOURCE_URL_TEMPLATE).
- Groups envelopes with the same (source_url, fetched_at) into one lineage batch.
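The grouping rule in the last bullet amounts to a dictionary keyed on the pair. A minimal sketch, assuming a batch is just a list of raw records (esker's real lineage records are not modeled here); group_batches is hypothetical and Fetched is a local stand-in.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class Fetched:
    # Local stand-in with the documented fields.
    raw: dict
    source_url: str
    fetched_at: datetime


def group_batches(envelopes: Iterable[Fetched]) -> Dict[Tuple[str, datetime], List[dict]]:
    """Bucket raw records by (source_url, fetched_at), preserving first-seen order."""
    batches: Dict[Tuple[str, datetime], List[dict]] = {}
    for env in envelopes:
        batches.setdefault((env.source_url, env.fetched_at), []).append(env.raw)
    return batches
```

This is why a source should share one fetched_at per fetch: a fresh timestamp per record would split a single download into one batch per row.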
The contract is intentionally narrow. Anything richer — schema, freshness, deltas — is the pipeline's responsibility to derive.
HTTP layer
Esker uses the standard library's urllib rather than requests or httpx. The bar for new dependencies is high, and urllib ships with Python.
What this means for you:
- No retries, no backoff, no rate limiting. Author your own if you need them.
- Set timeouts via config.http_timeout() (defaults to 60s; override with ESKER_HTTP_TIMEOUT).
- A wedged origin trips the timeout instead of hanging forever.
- The default User-Agent is esker/0.1. Override it per source via the USER_AGENT ClassVar.
For sources that need richer HTTP behavior (auth headers, session cookies, retry policies), build it yourself in your fetch_all. Esker doesn't get in the way.
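Because retries and backoff are left to you, here is one minimal way to add them using urllib alone. get_json_with_retries is a hypothetical helper, not part of esker; the retryable status set, attempt count, and doubling delay are assumptions to tune against the upstream's rate-limit policy. opener and sleep are injectable so the logic can be exercised offline. Inside a source you would pass timeout=config.http_timeout() and user_agent=self.USER_AGENT.

```python
import json
import time
from typing import Any, Callable
from urllib.error import HTTPError
from urllib.request import Request, urlopen

# Assumption: statuses worth retrying; adjust per upstream.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def get_json_with_retries(
    url: str,
    user_agent: str,
    timeout: float = 60.0,
    attempts: int = 4,
    base_delay: float = 1.0,
    opener: Callable[..., Any] = urlopen,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """GET a JSON document, retrying transient failures with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    req = Request(url, headers={"User-Agent": user_agent})
    for attempt in range(attempts):
        last = attempt == attempts - 1
        try:
            with opener(req, timeout=timeout) as resp:
                return json.loads(resp.read())
        except HTTPError as exc:
            # HTTPError subclasses URLError (an OSError), so catch it first.
            if exc.code not in RETRYABLE_STATUS or last:
                raise
        except OSError:
            # Connection failures, DNS errors, and socket timeouts land here.
            if last:
                raise
        sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ... with the defaults
    raise RuntimeError("unreachable")
```

The sketch ignores Retry-After; if the upstream sends that header on a 429 or 503, honoring it is usually better than the computed delay.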
Choosing a shape
| your source | use |
|---|---|
| One JSON endpoint, dict-of-records or list-of-records | BulkJsonSource (or @pipeline(url=...)) |
| One CSV endpoint with a header row | BulkCsvSource |
| Per-id fetches | EskerSource directly + fetch_cached |
| Multiple endpoints, paginated, header-auth, or stateful | EskerSource directly |
When in doubt, start with one of the bulk shapes and graduate when you hit a wall.
See also
- Pipelines: the @pipeline decorator
- Three-class form: the full source/schema/pipeline split
- Lineage: how Fetched becomes batches
- Caching: ESKER_CACHE_DIR and friends