
mts1b-datalake

Parquet-backed data lake with Prefect ingest flows. Asset-class extras for equities, crypto, options, fx, futures, bonds, altdata, macro.

- Repo: github.com/MTS1B/mts1b-datalake
- Layer: 4
- Wave: 2 (months 4-7)
- Depends on: foundation, platform, marketdata, altdata, cryptodata, macrodata, Prefect, polars, duckdb
- Audience: mts1b-research, mts1b-GPUbacktester

What it is

A parquet-backed columnar data lake with Prefect flows that ingest from the adapter repos and write canonical tables. Reads via DuckDB (zero-copy) or polars.

```
adapters ──ingest─▶ data lake (parquet) ──▶ research / GPUbacktester
                      └──▶ Postgres metadata (catalog, lineage)
```

Layout

```
data/lake/
├── equities/
│   ├── bars/{daily,1h,5m,1m}/year=YYYY/month=MM/symbol=*/data.parquet
│   ├── corporate_actions/year=YYYY/data.parquet
│   ├── fundamentals/year=YYYY/quarter=Q/data.parquet
│   └── universe/asof=YYYY-MM-DD/data.parquet
├── crypto/
│   ├── bars/...
│   ├── perp_funding/...
│   └── on_chain/...
├── options/
│   ├── chains/year=YYYY/month=MM/data.parquet
│   └── greeks/...
├── altdata/
│   ├── sec_filings/year=YYYY/data.parquet
│   ├── insider_form4/...
│   ├── congress_trades/...
│   ├── news/year=YYYY/month=MM/data.parquet
│   └── sentiment/...
├── macro/
│   ├── fred/series=SERIES_ID/data.parquet
│   └── release_calendar/data.parquet
└── _catalog/
    └── lineage.duckdb   # what flows wrote what, when
```

Hive-style partitioning by year=YYYY/month=MM/ (plus symbol=* for bars), combined with Parquet predicate pushdown, means a typical query scans only ~1% of the lake.
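To make the pruning concrete, here is a minimal sketch of which files a bars query has to open, assuming the directory layout above is followed literally. `months_between` and `bar_partitions` are hypothetical helpers for illustration, not part of the `mts1b_datalake` API; in practice DuckDB or polars derive the same file set from the hive-style directory names.

```python
from datetime import date
from pathlib import PurePosixPath


def months_between(start: date, end: date):
    """Yield (year, month) for every calendar month touched by [start, end]."""
    y, m = start.year, start.month
    while (y, m) <= (end.year, end.month):
        yield y, m
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)


def bar_partitions(root, interval, symbols, start, end):
    """List the bar files a query must open under the layout above (hypothetical helper).

    Every directory outside these year/month/symbol combinations is skipped
    without being read; Parquet row-group statistics can then narrow the scan
    inside each file.
    """
    base = PurePosixPath(root) / "equities" / "bars" / interval
    return [
        str(base / f"year={y:04d}" / f"month={m:02d}" / f"symbol={s}" / "data.parquet")
        for y, m in months_between(start, end)
        for s in symbols
    ]
```

For two symbols from 15 November 2023 to 1 February 2024 this returns 8 paths (four months × two symbols); the rest of the lake is never opened.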

Install with extras

```bash
pip install "mts1b-datalake[equities,crypto,options,altdata]"
```

Each extra pulls in the matching adapters and the dependencies of its ingest flows. Don't want options? Skip that extra.

API

Read

```python
from mts1b_datalake import lake

# Polars (preferred for transformations)
df = lake.equities.bars.read(
    symbols=["AAPL", "MSFT"],
    interval="daily",
    start="2014-01-01", end="2024-01-01",
)
# polars.DataFrame columns: [ts, symbol, open, high, low, close, volume]

# DuckDB (preferred for ad-hoc SQL)
with lake.duckdb_session() as conn:
    df = conn.execute("""
        SELECT symbol, AVG(close) AS avg_close
        FROM equities.bars
        WHERE ts BETWEEN '2023-01-01' AND '2024-01-01'
        GROUP BY symbol
        ORDER BY avg_close DESC
        LIMIT 20
    """).pl()
```

Universe queries

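```python
from datetime import date

from mts1b_datalake import lake

# Get the universe of symbols satisfying the filters as of a given date
universe = lake.equities.universe(
    asof=date(2026, 5, 1),
    sectors=["Technology"],
    market_cap_min=10e9,       # $10B market cap
    adv_min_usd=10e6,          # $10M average daily dollar volume
    listing_age_min_days=730,
)
# polars.DataFrame: symbol, name, sector, market_cap_usd, adv_usd_21d
```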
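The filter semantics can be sketched in plain Python. This is an illustration only: `UniverseRow`, its `listed_on` field, and `filter_universe` are hypothetical names, and the real `lake.equities.universe` returns a polars DataFrame and may compute listing age differently. The key point it shows is that every threshold is evaluated against the `asof` snapshot, which is what keeps a backtest universe point-in-time.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class UniverseRow:
    # Hypothetical snapshot record; fields mirror the output columns above plus a listing date.
    symbol: str
    sector: str
    market_cap_usd: float
    adv_usd_21d: float
    listed_on: date


def filter_universe(rows, asof, sectors=None, market_cap_min=0.0,
                    adv_min_usd=0.0, listing_age_min_days=0):
    """Return symbols from an `asof` snapshot that pass every filter."""
    keep = []
    for r in rows:
        if sectors is not None and r.sector not in sectors:
            continue
        if r.market_cap_usd < market_cap_min or r.adv_usd_21d < adv_min_usd:
            continue
        # Listing age is measured from the snapshot date, not from today.
        if (asof - r.listed_on).days < listing_age_min_days:
            continue
        keep.append(r.symbol)
    return keep
```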

UniversePanel for factor backtests

```python
from mts1b_datalake import lake
from mts1b_foundation.market_data import UniversePanel

panel: UniversePanel = lake.build_panel(
    universe="us-large-cap",
    interval="daily",
    start="2014-01-01", end="2024-01-01",
    include=["close", "high", "low", "volume", "market_cap", "sector"],
)
# Pass directly to mts1b-GPUbacktester or any factor function
```
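`UniversePanel`'s internals are not described here, so the following is only a hypothetical sketch of the reshaping a panel builder implies: long rows of (ts, symbol, fields) pivoted into dense date × symbol matrices, one per field, with gaps filled by `None`. `to_wide` is an illustrative name, not a library function.

```python
def to_wide(rows, fields):
    """Pivot long rows {ts, symbol, <field>...} into (dates, symbols, {field: matrix}).

    Missing (ts, symbol) cells stay None so every matrix has the same
    shape, which is what vectorised factor code or a GPU backtest expects.
    """
    dates = sorted({r["ts"] for r in rows})
    symbols = sorted({r["symbol"] for r in rows})
    d_idx = {d: i for i, d in enumerate(dates)}
    s_idx = {s: j for j, s in enumerate(symbols)}
    panel = {}
    for f in fields:
        matrix = [[None] * len(symbols) for _ in dates]
        for r in rows:
            matrix[d_idx[r["ts"]]][s_idx[r["symbol"]]] = r.get(f)
        panel[f] = matrix
    return dates, symbols, panel
```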

Ingest flows

Prefect-based; deployed under the mts1b-datalake project:

| Flow | Schedule | Source | Target |
|---|---|---|---|
| equities_daily_bars | NYSE close + 1h | marketdata.fmp / polygon | equities/bars/daily/ |
| equities_intraday_1m | every 5 min | marketdata.polygon | equities/bars/1m/ |
| equities_corp_actions | daily 02:00 ET | marketdata + altdata | equities/corporate_actions/ |
| crypto_bars | every 1h | cryptodata.binance + coinbase | crypto/bars/ |
| crypto_funding | every 8h (after settlements) | cryptodata.binance + kraken_futures + okx | crypto/perp_funding/ |
| options_chains | daily 17:00 ET | marketdata.thetadata | options/chains/ |
| sec_filings | every 15 min | altdata.sec_edgar | altdata/sec_filings/ |
| news_rss | every 5 min | altdata.news | altdata/news/ |
| macro_fred | daily 09:00 ET | macrodata.fred | macro/fred/ |

Failed runs are retried and trigger a Telegram alert via mts1b-platform/messaging. Each run's lineage is written to _catalog/lineage.duckdb.
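In Prefect this behaviour would normally come from the `retries` and `retry_delay_seconds` options on `@flow` / `@task`. The stdlib sketch below just spells out the logic under one assumption the text leaves open: the alert fires once, after the final attempt fails. `run_with_retries` is hypothetical, and `alert` stands in for whatever mts1b-platform/messaging exposes.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay_s: float = 1.0,
    alert: Callable[[str], None] = print,    # stand-in for the Telegram hook
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying with exponential backoff; alert once if every attempt fails."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:  # adapters can fail in many ways (HTTP, parsing, I/O)
            if attempt == retries:
                alert(f"ingest failed after {retries + 1} attempts: {exc!r}")
                raise
            sleep(base_delay_s * 2 ** attempt)  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")
```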

Caching layer

A Redis hot-cache fronts the lake for low-latency reads in mts1b-research:

| Cache | TTL |
|---|---|
| Latest bar per symbol | 30 s |
| Latest universe definition | 5 min |
| Macro series (last 5y) | 1 hour |

mts1b-platform/db provides the Redis pool.
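With redis-py, a TTL is set per key via `set(name, value, ex=seconds)`. To show the expiry semantics without a Redis server, here is an in-process stand-in using the TTLs from the table; the `kind:key` naming and the `TTLCache` class are assumptions for illustration, not the real cache layer.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

# TTLs from the table above, in seconds. The kind names are hypothetical.
TTL_S = {
    "latest_bar": 30,
    "universe": 5 * 60,
    "macro_5y": 60 * 60,
}


class TTLCache:
    """In-process stand-in for Redis SET key value EX ttl."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._data: Dict[str, Tuple[float, Any]] = {}

    def set(self, kind: str, key: str, value: Any) -> None:
        self._data[f"{kind}:{key}"] = (self._clock() + TTL_S[kind], value)

    def get(self, kind: str, key: str) -> Optional[Any]:
        full = f"{kind}:{key}"
        entry = self._data.get(full)
        if entry is None:
            return None  # cache miss: caller falls back to the lake
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[full]  # expire lazily on read (Redis also expires keys on access)
            return None
        return value
```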

Quality + lineage

Each ingest run writes to _catalog/lineage.duckdb:

```sql
SELECT * FROM lineage.flow_runs WHERE flow = 'equities_daily_bars' ORDER BY ts DESC LIMIT 5;
-- ts, flow, source, rows_written, target_path, status, error, run_duration_s, sha256
```
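DuckDB is not in the Python standard library, so this sketch uses `sqlite3` as a stand-in to show the shape of the `flow_runs` table and run the query above unchanged. The column list comes from the comment above; the column types, the status values, and the `open_catalog` / `record_run` helpers are assumptions.

```python
import sqlite3

# Column list from the lineage query above; types are assumed.
COLUMNS = ("ts", "flow", "source", "rows_written", "target_path",
           "status", "error", "run_duration_s", "sha256")

SCHEMA = """
CREATE TABLE lineage.flow_runs (
    ts TEXT NOT NULL,
    flow TEXT NOT NULL,
    source TEXT,
    rows_written INTEGER,
    target_path TEXT,
    status TEXT,
    error TEXT,
    run_duration_s REAL,
    sha256 TEXT
)
"""


def open_catalog() -> sqlite3.Connection:
    """In-memory catalog attached as `lineage`, so `lineage.flow_runs` resolves."""
    conn = sqlite3.connect(":memory:")
    conn.execute("ATTACH DATABASE ':memory:' AS lineage")
    conn.execute(SCHEMA)
    return conn


def record_run(conn: sqlite3.Connection, run: dict) -> None:
    """Insert one run; columns missing from `run` are stored as NULL."""
    conn.execute(
        f"INSERT INTO lineage.flow_runs ({', '.join(COLUMNS)}) "
        f"VALUES ({', '.join('?' for _ in COLUMNS)})",
        tuple(run.get(c) for c in COLUMNS),
    )
```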

Replay-safe: if an ingest is rerun for a date, it writes to a sibling Parquet file and the catalog then swaps to it atomically, so readers never see a partially written partition.
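One common way to get the same "no partial reads" guarantee at the file level is write-to-sibling-then-rename. The sketch below is that standard technique, not necessarily how the catalog swap is implemented; `replace_partition` is a hypothetical helper, and it also returns the SHA-256 that the lineage table records.

```python
import hashlib
import os
import tempfile
from pathlib import Path


def replace_partition(target: Path, payload: bytes) -> str:
    """Write payload to a sibling temp file, then atomically swap it into place.

    Readers opening `target` see either the old file or the new one, never a
    half-written file. Returns the SHA-256 of the new contents for lineage.
    """
    target.parent.mkdir(parents=True, exist_ok=True)
    # Same directory => same filesystem, which os.replace needs to be a plain rename.
    fd, tmp = tempfile.mkstemp(dir=target.parent, prefix=f".{target.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(payload)
            f.flush()
            os.fsync(f.fileno())  # make sure bytes are on disk before the swap
        os.replace(tmp, target)   # atomic on POSIX
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)        # never leave a stray temp file behind
        raise
    return hashlib.sha256(payload).hexdigest()
```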

Compression + dedup

| Dataset | Compression | Typical size reduction |
|---|---|---|
| Daily bars | snappy | 5x vs CSV |
| Intraday bars (1m) | zstd:3 | 12x vs CSV |
| News articles | zstd:3 | 8x (text-heavy) |
| Options chains | snappy | 4x |
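The table maps naturally onto the `compression` and `compression_level` arguments of `pyarrow.parquet.write_table`. Below is a small sketch of keeping those choices in one place; the dataset keys and the `parquet_write_kwargs` helper are hypothetical.

```python
# Codec choices from the table above; dataset keys are hypothetical names.
CODECS = {
    "bars_daily": ("snappy", None),
    "bars_1m": ("zstd", 3),
    "news": ("zstd", 3),
    "options_chains": ("snappy", None),
}


def parquet_write_kwargs(dataset: str) -> dict:
    """Keyword arguments for pyarrow.parquet.write_table(table, path, **kwargs)."""
    codec, level = CODECS[dataset]
    kwargs = {"compression": codec}
    if level is not None:
        kwargs["compression_level"] = level  # zstd:3 -> level 3
    return kwargs
```

Usage would look like `pq.write_table(table, path, **parquet_write_kwargs("bars_1m"))`. The split follows the usual trade-off: snappy is cheap to decode for the hot daily bars, while zstd spends a little more CPU to shrink high-volume intraday and text-heavy data much further.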

Build + test

```bash
pip install -e ".[dev,equities,crypto,altdata]"
pytest -m unit
docker compose up -d postgres redis
pytest -m integration
```
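`pytest -m unit` and `pytest -m integration` select tests by marker, and pytest warns about markers it doesn't know. A hypothetical `conftest.py` that registers both via pytest's `pytest_configure` hook and `config.addinivalue_line` could look like this; the marker descriptions are assumptions.

```python
# conftest.py (hypothetical): register the markers used by `pytest -m unit`
# and `pytest -m integration` so pytest does not warn about unknown marks.


def pytest_configure(config):
    config.addinivalue_line("markers", "unit: fast tests with no external services")
    config.addinivalue_line(
        "markers", "integration: needs postgres + redis from `docker compose up`"
    )
```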

Roadmap

| Version | Items |
|---|---|
| 0.1 (Wave 2) | Equities daily, crypto bars + funding, SEC filings, news, FRED |
| 0.2 (Wave 2) | Equities intraday (1m), options chains, full corp actions |
| 0.3 (Wave 3) | Tick-level equities, on-chain crypto, futures + bonds |
| 0.4 (Wave 3) | Iceberg / Delta Lake on top of parquet (transactional updates) |
| 1.0 (LTS) | Stable schema |

See also