mts1b-datalake
Parquet-backed data lake with Prefect ingest flows. Optional install extras per asset class: equities, crypto, options, fx, futures, bonds, altdata, macro.
Repo: github.com/MTS1B/mts1b-datalake
Layer: 4
Wave: 2 (months 4-7)
Depends on: foundation, platform, marketdata, altdata, cryptodata, macrodata, Prefect, polars, duckdb
Audience: mts1b-research, mts1b-GPUbacktester
What it is
A Parquet-backed columnar data lake. Prefect flows ingest from the adapter repos and write canonical tables; consumers read them via DuckDB (zero-copy) or polars.
adapters ──ingest─▶ data lake (parquet) ──▶ research / GPUbacktester
│
└──▶ Postgres metadata (catalog, lineage)
Layout
data/lake/
├── equities/
│ ├── bars/{daily,1h,5m,1m}/year=YYYY/month=MM/symbol=*/data.parquet
│ ├── corporate_actions/year=YYYY/data.parquet
│ ├── fundamentals/year=YYYY/quarter=Q/data.parquet
│ └── universe/asof=YYYY-MM-DD/data.parquet
├── crypto/
│ ├── bars/...
│ ├── perp_funding/...
│ └── on_chain/...
├── options/
│ ├── chains/year=YYYY/month=MM/data.parquet
│ └── greeks/...
├── altdata/
│ ├── sec_filings/year=YYYY/data.parquet
│ ├── insider_form4/...
│ ├── congress_trades/...
│ ├── news/year=YYYY/month=MM/data.parquet
│ └── sentiment/...
├── macro/
│ ├── fred/series=SERIES_ID/data.parquet
│ └── release_calendar/data.parquet
└── _catalog/
  └── lineage.duckdb  # which flows wrote what, and when
Hive-style partitioning (year=YYYY/month=MM/) combined with Parquet predicate pushdown lets a query skip every partition and row group outside its filters, so a typical query scans about 1% of the lake.
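A minimal sketch of the directory-level half of that pruning: given a half-open date range, list only the year=/month= partitions that can contain matching rows. The function name `month_partitions` and the half-open [start, end) convention are assumptions for illustration, not the lake's API; DuckDB (`read_parquet(..., hive_partitioning = true)`) and polars perform this pruning themselves when reading Hive-partitioned paths.

```python
from datetime import date
from typing import List


def month_partitions(root: str, start: date, end: date) -> List[str]:
    """Hive-style year=/month= directories overlapping the half-open range [start, end)."""
    paths = []
    year, month = start.year, start.month
    # Walk month by month up to the month containing `end`; that last month
    # is excluded when `end` falls exactly on its 1st (nothing of it is in range).
    while (year, month) < (end.year, end.month) or (
        (year, month) == (end.year, end.month) and end.day > 1
    ):
        paths.append(f"{root}/year={year:04d}/month={month:02d}")
        month += 1
        if month == 13:
            year, month = year + 1, 1
    return paths
```

For example, `month_partitions("data/lake/equities/bars/daily", date(2023, 11, 15), date(2024, 2, 1))` yields only the 2023-11, 2023-12 and 2024-01 directories; row-group statistics inside those files then narrow the scan further.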
Install with extras
pip install "mts1b-datalake[equities,crypto,options,altdata]"
Each extra pulls in the matching adapter repos and ingest-flow dependencies. If you don't need options, leave out the options extra.
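Because extras are optional, code that depends on one can check for it up front and fail with an install hint. This is a sketch under assumptions: the extra-to-module mapping and the helper names `missing_extras` / `require_extra` are hypothetical, not part of mts1b-datalake; only `importlib.util.find_spec` is standard library.

```python
import importlib.util
from typing import Dict, List, Optional

# Hypothetical mapping from install extra to a top-level module it should provide.
# The actual module names behind each extra are assumptions.
EXTRA_MODULES: Dict[str, str] = {
    "equities": "mts1b_marketdata",
    "crypto": "mts1b_cryptodata",
    "altdata": "mts1b_altdata",
    "macro": "mts1b_macrodata",
}


def missing_extras(extras: List[str], mapping: Optional[Dict[str, str]] = None) -> List[str]:
    """Return the extras whose backing module cannot be found on sys.path."""
    mapping = EXTRA_MODULES if mapping is None else mapping
    return [e for e in extras if importlib.util.find_spec(mapping[e]) is None]


def require_extra(extra: str, mapping: Optional[Dict[str, str]] = None) -> None:
    """Fail early with an install hint instead of a bare ModuleNotFoundError later."""
    if missing_extras([extra], mapping):
        raise ImportError(
            f'extra {extra!r} is not installed; run: pip install "mts1b-datalake[{extra}]"'
        )
```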
API
Read
from mts1b_datalake import lake
# Polars (preferred for transformations)
df = lake.equities.bars.read(
symbols=["AAPL", "MSFT"],
interval="daily",
start="2014-01-01", end="2024-01-01",
)
# polars.DataFrame columns: [ts, symbol, open, high, low, close, volume]
# DuckDB (preferred for ad-hoc SQL)
with lake.duckdb_session() as conn:
df = conn.execute("""
SELECT symbol, AVG(close) AS avg_close
FROM equities.bars
WHERE ts >= '2023-01-01' AND ts < '2024-01-01'
GROUP BY symbol
ORDER BY avg_close DESC LIMIT 20
""").pl()
Universe queries
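from datetime import date

# Universe of symbols passing every filter as of `asof`
universe = lake.equities.universe(
asof=date(2026, 5, 1),
sectors=["Technology"],
market_cap_min=10e9,       # $10B
adv_min_usd=10e6,          # $10M average daily dollar volume
listing_age_min_days=730,  # listed for at least ~2 years
)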
# polars.DataFrame: symbol, name, sector, market_cap_usd, adv_usd_21d
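The filters above compose as a plain conjunction evaluated at `asof`. The sketch below shows that logic on in-memory records; the `Listing` type and `filter_universe` function are hypothetical, and it assumes the lake applies the filters against point-in-time snapshots (the universe/asof=YYYY-MM-DD/ partitions) so that no symbol is admitted before it was listed.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class Listing:
    """Hypothetical per-symbol snapshot as of a given date."""
    symbol: str
    sector: str
    market_cap_usd: float
    adv_usd_21d: float  # 21-day average daily dollar volume
    listed_on: date


def filter_universe(listings, asof, sectors=None, market_cap_min=0.0,
                    adv_min_usd=0.0, listing_age_min_days=0):
    """Return sorted symbols passing every filter, evaluated point-in-time at `asof`."""
    keep = []
    for row in listings:
        if row.listed_on > asof:
            continue  # not yet listed at `asof`: avoids look-ahead bias
        if sectors is not None and row.sector not in sectors:
            continue
        if row.market_cap_usd < market_cap_min or row.adv_usd_21d < adv_min_usd:
            continue
        if (asof - row.listed_on).days < listing_age_min_days:
            continue
        keep.append(row.symbol)
    return sorted(keep)
```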
UniversePanel for factor backtests
from mts1b_foundation.market_data import UniversePanel
panel: UniversePanel = lake.build_panel(
universe="us-large-cap",
interval="daily",
start="2014-01-01", end="2024-01-01",
include=["close", "high", "low", "volume", "market_cap", "sector"],
)
# Pass directly to mts1b-GPUbacktester or any factor function
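UniversePanel's internal layout is not specified here. Factor code usually wants each field as a dense time × symbol matrix, so this sketch assumes that layout and pivots long bar rows into one. `to_matrix` is a hypothetical helper; a symbol with no bar on a given timestamp stays `None` rather than being filled, leaving gap handling to the factor.

```python
from typing import Any, Dict, List, Optional, Tuple


def to_matrix(rows: List[Dict[str, Any]], field: str
              ) -> Tuple[List[Any], List[str], List[List[Optional[Any]]]]:
    """Pivot long (ts, symbol, field) rows into (timestamps, symbols, matrix[ts][symbol])."""
    timestamps = sorted({r["ts"] for r in rows})
    symbols = sorted({r["symbol"] for r in rows})
    t_idx = {t: i for i, t in enumerate(timestamps)}
    s_idx = {s: j for j, s in enumerate(symbols)}
    # Start fully missing, then fill the cells that have a bar.
    matrix: List[List[Optional[Any]]] = [[None] * len(symbols) for _ in timestamps]
    for r in rows:
        matrix[t_idx[r["ts"]]][s_idx[r["symbol"]]] = r[field]
    return timestamps, symbols, matrix
```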
Ingest flows
Prefect-based; deployed under the mts1b-datalake project:
| Flow | Schedule | Source | Target |
|---|---|---|---|
| equities_daily_bars | NYSE close + 1h | marketdata.fmp / polygon | equities/bars/daily/ |
| equities_intraday_1m | every 5 min | marketdata.polygon | equities/bars/1m/ |
| equities_corp_actions | daily 02:00 ET | marketdata + altdata | equities/corporate_actions/ |
| crypto_bars | every 1h | cryptodata.binance + coinbase | crypto/bars/ |
| crypto_funding | every 8h (after settlements) | cryptodata.binance + kraken_futures + okx | crypto/perp_funding/ |
| options_chains | daily 17:00 ET | marketdata.thetadata | options/chains/ |
| sec_filings | every 15 min | altdata.sec_edgar | altdata/sec_filings/ |
| news_rss | every 5 min | altdata.news | altdata/news/ |
| macro_fred | daily 09:00 ET | macrodata.fred | macro/fred/ |
Failures trigger a retry and a Telegram alert via mts1b-platform/messaging. Every run's lineage is written to _catalog/lineage.duckdb.
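Prefect flows and tasks accept `retries` and `retry_delay_seconds` directly, so in practice the decorator handles retrying. The plain-Python sketch below only illustrates the retry-then-alert behavior: exponential backoff between attempts and a single alert once attempts are exhausted. The backoff policy and `run_with_retries` are assumptions, and `alert` stands in for the Telegram call in mts1b-platform/messaging, whose API is not described here.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(task: Callable[[], T], retries: int = 3, base_delay_s: float = 1.0,
                     alert: Callable[[str], None] = print,
                     sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `task()`, retrying with exponential backoff; alert once if every attempt fails."""
    for attempt in range(retries + 1):
        try:
            return task()
        except Exception as exc:
            if attempt == retries:
                # Out of attempts: notify (hypothetical Telegram hook) and re-raise.
                alert(f"flow failed after {retries + 1} attempts: {exc!r}")
                raise
            sleep(base_delay_s * 2 ** attempt)  # 1s, 2s, 4s, ... by default
    raise AssertionError("unreachable")
```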
Caching layer
A Redis hot cache sits in front of the lake to serve low-latency reads for mts1b-research:
| Cache | TTL |
|---|---|
| Latest bar per symbol | 30s |
| Latest universe definition | 5 min |
| Macro series (last 5y) | 1 hour |
mts1b-platform/db provides the Redis pool.
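A cache-aside read with the TTLs from the table above looks like the sketch below. It assumes cache-aside semantics and uses an in-process dict as a stand-in for Redis; with redis-py the equivalent is `r.get(key)` on read and `r.set(key, value, ex=ttl)` on a miss. The cache kinds and the `TTLCache` class are hypothetical.

```python
import time
from typing import Any, Callable, Dict, Tuple

# TTLs from the cache table, in seconds (kind names are illustrative).
TTL_S: Dict[str, int] = {"latest_bar": 30, "universe": 5 * 60, "macro_5y": 60 * 60}


class TTLCache:
    """In-process stand-in for Redis SET ... EX: entries expire `ttl` seconds after being set."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: Dict[Tuple[str, str], Tuple[Any, float]] = {}

    def get_or_load(self, kind: str, key: str, loader: Callable[[], Any]) -> Any:
        """Return the cached value if still fresh, else call `loader()` and cache its result."""
        now = self._clock()
        hit = self._data.get((kind, key))
        if hit is not None and hit[1] > now:
            return hit[0]
        value = loader()  # cache miss or expired: read through to the lake
        self._data[(kind, key)] = (value, now + TTL_S[kind])
        return value
```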
Quality + lineage
Each ingest run writes to _catalog/lineage.duckdb:
SELECT * FROM lineage.flow_runs WHERE flow = 'equities_daily_bars' ORDER BY ts DESC LIMIT 5;
-- ts, flow, source, rows_written, target_path, status, error, run_duration_s, sha256
Reruns are replay-safe: rerunning an ingest for a date writes a sibling Parquet file, and the catalog then swaps it in atomically, so readers never see a partially written partition.
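The README does not say how the swap is implemented. One common file-level approach, sketched here as an assumption, is to write to a temporary sibling in the same directory, fsync it, then `os.replace` it over the target. On POSIX the rename is atomic within one filesystem, so a reader opens either the old file or the new one, never a half-written one. The returned SHA-256 matches the `sha256` column in the lineage table; `atomic_write` is a hypothetical helper, and `payload` would be the serialized Parquet bytes.

```python
import hashlib
import os
import tempfile


def atomic_write(target_path: str, payload: bytes) -> str:
    """Write `payload` beside `target_path`, fsync it, then atomically swap it into place.

    Returns the payload's sha256 hex digest (e.g. for the lineage row).
    """
    directory = os.path.dirname(target_path) or "."
    os.makedirs(directory, exist_ok=True)
    # The sibling temp file must live in the same directory (same filesystem) for an atomic rename.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".parquet")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(payload)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, target_path)  # atomic on POSIX when on the same filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    return hashlib.sha256(payload).hexdigest()
```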
Compression + dedup
| Dataset | Compression | Typical size reduction |
|---|---|---|
| Daily bars | snappy | 5x vs CSV |
| Intraday bars (1m) | zstd (level 3) | 12x vs CSV |
| News articles | zstd (level 3) | 8x (text-heavy) |
| Options chains | snappy | 4x |
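The codec choices in this table can be kept as a small policy and expanded into writer arguments. Both `pyarrow.parquet.write_table` and polars `DataFrame.write_parquet` accept `compression` and `compression_level`. The dataset keys and the `parquet_write_kwargs` helper are hypothetical, and the zstd entries are read as zstd at compression level 3.

```python
from typing import Dict, Optional, Tuple, Union

# Codec policy from the compression table; keys are illustrative dataset names.
CODECS: Dict[str, Tuple[str, Optional[int]]] = {
    "bars_daily": ("snappy", None),
    "bars_1m": ("zstd", 3),
    "news": ("zstd", 3),
    "options_chains": ("snappy", None),
}


def parquet_write_kwargs(dataset: str) -> Dict[str, Union[str, int]]:
    """Keyword arguments for pyarrow.parquet.write_table or polars DataFrame.write_parquet."""
    codec, level = CODECS[dataset]
    kwargs: Dict[str, Union[str, int]] = {"compression": codec}
    if level is not None:
        kwargs["compression_level"] = level  # only meaningful for leveled codecs like zstd
    return kwargs
```

For example, with pyarrow: `pq.write_table(table, path, **parquet_write_kwargs("bars_1m"))`.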
Build + test
pip install -e ".[dev,equities,crypto,altdata]"
pytest -m unit
docker compose up -d postgres redis
pytest -m integration
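`pytest -m unit` and `pytest -m integration` select tests by custom markers. pytest emits `PytestUnknownMarkWarning` for marks that are not registered, so a `conftest.py` like the sketch below registers them via the `pytest_configure` hook and `config.addinivalue_line`. The marker descriptions are assumptions; the repo may register them in its pytest config file instead.

```python
# conftest.py: register the markers used by `pytest -m unit` / `pytest -m integration`
# so pytest does not warn about unknown marks. Descriptions are illustrative.


def pytest_configure(config):
    config.addinivalue_line("markers", "unit: fast tests with no external services")
    config.addinivalue_line(
        "markers", "integration: needs postgres and redis from docker compose"
    )
```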
Roadmap
| Version | Items |
|---|---|
| 0.1 (Wave 2) | Equities daily, crypto bars + funding, SEC filings, news, FRED |
| 0.2 (Wave 2) | Equities intraday (1m), options chains, full corp actions |
| 0.3 (Wave 3) | Tick-level equities, on-chain crypto, futures + bonds |
| 0.4 (Wave 3) | Iceberg / Delta Lake on top of parquet (transactional updates) |
| 1.0 (LTS) | Stable schema |
See also
Ingest sources: mts1b-marketdata, mts1b-altdata, mts1b-cryptodata, mts1b-macrodata
Consumers: mts1b-research, mts1b-GPUbacktester