# Lineage

> Per-row provenance. The lineage bundle written next to every parquet so consumers can answer "where did this row come from?"

Esker records provenance for every record it produces. The mechanism is two artifact tiers:

- **Per-row**: each parquet record carries an `esker_lineage_id` column.
- **Per-run**: a sidecar `<DOMAIN_ID>.lineage.json` file lists each fetch batch with its `lineage_id`, `source_url`, and `fetched_at`.

A consumer who wants to know "where did this record come from?" joins the parquet's `esker_lineage_id` against the bundle's `batches[*].lineage_id` and reads off the URL and timestamp.

## Reading lineage

The lineage file is **not** automatically downloaded by `esker.get()` or `esker pull`. Fetch it explicitly:

```sh
curl https://esker.so/archie/us.treasury.yields@2.0.0/lineage.json > lineage.json
```

Or programmatically:

```python
from pathlib import Path

from esker.client import hub
from esker.schemas.ref import DatasetRef

ref = DatasetRef.parse("archie/us.treasury.yields@2.0.0")
# Download the lineage sidecar for this dataset version to a local file.
hub.download_artifact_to(ref, "lineage.json", Path("./lineage.json"))
```

Once you have it, joining is just polars:

```python
import json
from pathlib import Path

import esker
import polars as pl

frame = esker.get("archie/us.treasury.yields@2.0.0")
bundle = json.loads(Path("./lineage.json").read_text())
# Rename the bundle's key so it matches the parquet column name.
batches = pl.DataFrame(bundle["batches"]).rename({"lineage_id": "esker_lineage_id"})

# Default join is inner: rows without a matching batch would be dropped.
with_provenance = frame.join(batches, on="esker_lineage_id")
# now every row has source_url and fetched_at columns
```

## What the bundle looks like

```json
{
  "manifest_version": "1.0",
  "run_id": "a7076934-be56-477b-a030-250e4492ec93",
  "batches": [
    {
      "lineage_id": "e7b1fdda-cbf8-42b5-bd88-419c487482fd",
      "source_url": "https://home.treasury.gov/.../2022/all?...",
      "fetched_at": "2026-05-03T23:00:43.810315Z"
    },
    {
      "lineage_id": "7a90a5b1-c4cd-4f80-9c2d-c0e1d2c8a23a",
      "source_url": "https://home.treasury.gov/.../2023/all?...",
      "fetched_at": "2026-05-03T23:00:43.810315Z"
    }
  ]
}
```

The bundle contains one `LineageBatch` per unique `(source_url, fetched_at)` pair. The `run_id` matches the `run_id` on the [manifest](https://esker.so/docs/protocol/manifests.md).
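For a one-off lookup without a dataframe join, the bundle can be indexed by `lineage_id` using only the standard library. The sketch below assumes nothing beyond the field names shown in the example bundle; `index_batches` is a hypothetical helper, not part of the esker SDK, and the IDs and URLs are placeholders.

```python
import json


def index_batches(bundle: dict) -> dict[str, dict]:
    """Map each lineage_id to its batch entry (hypothetical helper)."""
    index: dict[str, dict] = {}
    for batch in bundle["batches"]:
        lineage_id = batch["lineage_id"]
        if lineage_id in index:
            # A repeated ID would make the row-to-batch lookup ambiguous.
            raise ValueError(f"duplicate lineage_id: {lineage_id}")
        index[lineage_id] = batch
    return index


# Made-up bundle in the documented shape; IDs and URLs are placeholders.
example_bundle = json.loads("""
{
  "manifest_version": "1.0",
  "run_id": "run-1",
  "batches": [
    {"lineage_id": "batch-2022", "source_url": "https://example.com/2022",
     "fetched_at": "2026-05-03T23:00:43Z"},
    {"lineage_id": "batch-2023", "source_url": "https://example.com/2023",
     "fetched_at": "2026-05-03T23:00:43Z"}
  ]
}
""")

index = index_batches(example_bundle)
# Look up the batch for a single row's esker_lineage_id value.
batch = index["batch-2023"]
print(batch["source_url"], batch["fetched_at"])
```

This is convenient when inspecting a handful of rows; for whole-frame provenance the polars join shown earlier is the better fit.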

## How many batches

The number of batches reflects the source shape:

| pipeline shape                                 | batches                      |
| ---------------------------------------------- | ---------------------------- |
| Single bulk endpoint (e.g. SEC tickers)        | 1 batch covering all records |
| Multi-endpoint (e.g. treasury yields, 5 years) | 5 batches, one per endpoint  |
| Per-id source                                  | N batches, one per record    |

For multi-endpoint sources, all batches typically share the same `fetched_at` if the source captures the timestamp once before the fetch loop (the recommended pattern). In that case the batches differ only by `source_url`, so the URL alone identifies a batch within a run.
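The "capture the timestamp once" pattern can be sketched in plain Python. This is an illustration only: `plan_batches` is a hypothetical helper, the use of UUID4 for `lineage_id` is an assumption based on the look of the example bundle, and the real source API (see the Sources page) is not shown here.

```python
from datetime import datetime, timezone
from uuid import uuid4


def plan_batches(urls: list[str]) -> list[dict]:
    """Build one batch entry per unique URL, all sharing one fetched_at."""
    # Capture the timestamp once, before the loop, so every batch shares it.
    fetched_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    batches = []
    # dict.fromkeys drops duplicate URLs while keeping their order, so each
    # (source_url, fetched_at) pair appears exactly once.
    for url in dict.fromkeys(urls):
        # ... fetch `url` here; the request itself is omitted in this sketch ...
        batches.append(
            {
                "lineage_id": str(uuid4()),  # assumption: IDs are random UUIDs
                "source_url": url,
                "fetched_at": fetched_at,
            }
        )
    return batches


batches = plan_batches(
    [
        "https://example.com/yields/2022",
        "https://example.com/yields/2023",
        "https://example.com/yields/2022",  # duplicate, collapsed into one batch
    ]
)
```

Calling `datetime.now` inside the loop instead would give every batch a slightly different `fetched_at`, which still works but makes a single run harder to recognize at a glance.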

## Integrity

The manifest carries `lineage_hash`, a SHA-256 digest of the `lineage.json` bytes as written. A consumer can therefore verify that the lineage file hasn't been tampered with using only the manifest:

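```python
from hashlib import sha256
from pathlib import Path

# `manifest` is the dataset's manifest, loaded beforehand (loading not shown).
actual = "sha256:" + sha256(Path("./lineage.json").read_bytes()).hexdigest()
assert actual == manifest.lineage_hash, "lineage drift"
```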

This isn't done automatically — `esker pull` and `esker.get` only verify the parquet — but the hash is there if you want it.

## When to use lineage

The common cases:

- **Auditing.** "This record claims X happened on date Y. Where did that data come from?"
- **Debugging upstream changes.** "These rows look wrong — were they fetched before or after the source was updated?"
- **Differential analysis.** Compare batches across two runs to see what was re-fetched.
- **Selective re-processing.** "Re-process only the records from this batch."

For most consumer code, you don't need to look at lineage. It's a debugging and audit affordance, not part of the hot read path.
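As an illustration of the differential-analysis case, the sketch below compares the `batches` of two lineage bundles by `source_url`. `diff_batches` is a hypothetical helper, not an esker API, and it assumes each `source_url` appears at most once per run, which holds when a run shares one `fetched_at` across its batches.

```python
def diff_batches(old: dict, new: dict) -> dict[str, list[str]]:
    """Classify source URLs as added, removed, or re-fetched between two runs."""
    old_by_url = {b["source_url"]: b["fetched_at"] for b in old["batches"]}
    new_by_url = {b["source_url"]: b["fetched_at"] for b in new["batches"]}
    shared = old_by_url.keys() & new_by_url.keys()
    return {
        "added": sorted(new_by_url.keys() - old_by_url.keys()),
        "removed": sorted(old_by_url.keys() - new_by_url.keys()),
        # Same URL with a different timestamp means it was fetched again.
        "refetched": sorted(u for u in shared if old_by_url[u] != new_by_url[u]),
    }


# Made-up bundles; only the fields the helper reads are included.
run_a = {"batches": [
    {"lineage_id": "a1", "source_url": "https://example.com/2022", "fetched_at": "2026-05-03T23:00:43Z"},
    {"lineage_id": "a2", "source_url": "https://example.com/2023", "fetched_at": "2026-05-03T23:00:43Z"},
]}
run_b = {"batches": [
    {"lineage_id": "b1", "source_url": "https://example.com/2023", "fetched_at": "2026-06-01T08:00:00Z"},
    {"lineage_id": "b2", "source_url": "https://example.com/2024", "fetched_at": "2026-06-01T08:00:00Z"},
]}

changes = diff_batches(run_a, run_b)
```

Because the bundle records when a URL was fetched and not what it returned, a URL listed under `refetched` may or may not have changed content.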

## What's not in the lineage

- **Source content hash.** The bundle records the URL and fetch time, not what was at the URL. If the upstream is mutable, two fetches at different `fetched_at` may have hit different content — the lineage tells you when, not what.
- **Transform code version.** That's `pipeline_version` on the manifest, not in the lineage.
- **Row-level provenance within a batch.** All records sharing a `(source_url, fetched_at)` get the same `lineage_id`. To trace a single row back to a specific upstream record, your transform needs to keep a domain-level reference (e.g. a `source_record_id` field).

## See also

- [Manifests](https://esker.so/docs/protocol/manifests.md) — `lineage_hash` and `run_id`
- [Sources](https://esker.so/docs/sdk/sources.md) — what `Fetched` is and how sources emit it
- [Records](https://esker.so/docs/sdk/records.md) — the `esker_lineage_id` column
