mts1b-datalake — public API surface
Parquet store + Prefect ingest flows + DuckDB queries.
Read (polars)
from mts1b_datalake import lake
df = lake.equities.bars.read(
symbols=["AAPL", "MSFT"],
interval="daily",
start="2014-01-01", end="2024-01-01",
)
# Returns a polars.DataFrame with columns: ts, symbol, open, high, low, close, volume
Read (DuckDB)
with lake.duckdb_session() as conn:
df = conn.execute("""
SELECT symbol, AVG(close) AS avg_close
FROM equities.bars
WHERE ts BETWEEN '2023-01-01' AND '2024-01-01'
GROUP BY symbol ORDER BY avg_close DESC LIMIT 20
""").pl()
Universe queries
from datetime import date

universe = lake.equities.universe(
asof=date(2026, 5, 1),
sectors=["Technology"],
market_cap_min=10e9,
adv_min_usd=10e6,
listing_age_min_days=730,
)
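The filter arguments above read naturally as a conjunction of point-in-time constraints. The following is a minimal sketch of that reading, not the library's implementation: the `Listing` record, its field names, and the `filter_universe` helper are hypothetical, and the real accessor may measure ADV or listing age differently.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class Listing:
    """Hypothetical per-symbol snapshot; field names are illustrative only."""
    symbol: str
    sector: str
    market_cap: float      # USD
    adv_usd: float         # average daily dollar volume, USD
    listed_on: date


def filter_universe(listings, asof, sectors=None, market_cap_min=0.0,
                    adv_min_usd=0.0, listing_age_min_days=0):
    """Return the sorted symbols that satisfy every constraint as of `asof`."""
    selected = []
    for item in listings:
        age_days = (asof - item.listed_on).days
        if sectors is not None and item.sector not in sectors:
            continue
        if item.market_cap < market_cap_min:
            continue
        if item.adv_usd < adv_min_usd:
            continue
        if age_days < listing_age_min_days:
            continue
        selected.append(item.symbol)
    return sorted(selected)


# Made-up sample rows, purely for illustration.
sample = [
    Listing("AAA", "Technology", 50e9, 200e6, date(2010, 1, 4)),
    Listing("BBB", "Technology", 5e9, 50e6, date(2012, 6, 1)),    # cap too small
    Listing("CCC", "Energy", 80e9, 300e6, date(2005, 3, 15)),     # wrong sector
    Listing("DDD", "Technology", 20e9, 40e6, date(2025, 9, 1)),   # listed too recently
]

universe = filter_universe(
    sample,
    asof=date(2026, 5, 1),
    sectors=["Technology"],
    market_cap_min=10e9,
    adv_min_usd=10e6,
    listing_age_min_days=730,
)
print(universe)
```

Only the first sample row passes all four filters; each of the others fails exactly one constraint, as noted in the comments.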
Build a UniversePanel
panel = lake.build_panel(
universe="us-large-cap",
interval="daily",
start="2014-01-01", end="2024-01-01",
include=["close", "high", "low", "volume", "market_cap", "sector"],
)
# UniversePanel(close=(T,A), dates=(T,), symbols=(A,), ...)
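The `(T, A)` notation above suggests one dense matrix per field, with rows indexed by date and columns by symbol. As a rough sketch of that layout (assuming a hypothetical `PanelSketch` class and `pivot_close` helper, and plain lists where the real panel presumably uses arrays), long-format rows can be pivoted like this:

```python
from dataclasses import dataclass


@dataclass
class PanelSketch:
    """Illustrative stand-in for UniversePanel: one (T, A) matrix plus its axes."""
    dates: list      # length T, sorted ascending
    symbols: list    # length A, sorted ascending
    close: list      # T rows, each a list of A floats (None where missing)


def pivot_close(rows):
    """Pivot long-format (ts, symbol, close) rows into a dense (T, A) layout."""
    dates = sorted({ts for ts, _, _ in rows})
    symbols = sorted({sym for _, sym, _ in rows})
    row_of = {d: i for i, d in enumerate(dates)}
    col_of = {s: j for j, s in enumerate(symbols)}
    close = [[None] * len(symbols) for _ in dates]
    for ts, sym, px in rows:
        close[row_of[ts]][col_of[sym]] = px
    return PanelSketch(dates=dates, symbols=symbols, close=close)


# Made-up prices, purely for illustration.
long_rows = [
    ("2024-01-02", "AAPL", 185.0),
    ("2024-01-02", "MSFT", 370.0),
    ("2024-01-03", "AAPL", 184.0),
    # MSFT has no row on 2024-01-03, so that cell stays None.
]
panel = pivot_close(long_rows)
print(len(panel.dates), len(panel.symbols))
```

How the real panel represents missing cells (NaN, masks, forward fill) is not stated in this document; `None` is used here only to make the gap visible.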
Per-asset-class accessors
lake.equities.bars # equities bars
lake.equities.corporate_actions
lake.equities.fundamentals
lake.equities.universe
lake.crypto.bars
lake.crypto.perp_funding
lake.crypto.on_chain
lake.options.chains
lake.options.greeks
lake.altdata.sec_filings
lake.altdata.news
lake.altdata.sentiment
lake.altdata.congress_trades
lake.altdata.insiders
lake.macro.fred
lake.macro.release_calendar
Ingest flows (Prefect)
# Triggered by deploy / cron
mts1b-datalake flow run --name equities_daily_bars
mts1b-datalake flow run --name crypto_funding
mts1b-datalake flow run --name sec_filings
Flows + schedules:
| Flow | Schedule |
|---|---|
| equities_daily_bars | NYSE close + 1h |
| equities_intraday_1m | every 5 min |
| equities_corp_actions | daily 02:00 ET |
| crypto_bars | every 1h |
| crypto_funding | every 8h |
| options_chains | daily 17:00 ET |
| sec_filings | every 15 min |
| news_rss | every 5 min |
| macro_fred | daily 09:00 ET |
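
The interval-style entries in the schedule table ("every 5 min", "every 8h") map directly onto fixed time deltas. The sketch below shows one way to interpret those strings; `parse_interval` and `next_run` are hypothetical helpers, not part of mts1b-datalake or Prefect, and the clock-time schedules ("daily 02:00 ET", "NYSE close + 1h") are deliberately left unhandled because they need timezone and exchange-calendar logic.

```python
import re
from datetime import datetime, timedelta

_UNITS = {"min": "minutes", "h": "hours"}


def parse_interval(schedule):
    """Parse strings such as 'every 5 min' or 'every 8h' into a timedelta.

    Returns None for schedules that are not fixed intervals
    (for example 'daily 02:00 ET' or 'NYSE close + 1h').
    """
    match = re.fullmatch(r"every\s+(\d+)\s*(min|h)", schedule.strip())
    if match is None:
        return None
    amount, unit = int(match.group(1)), match.group(2)
    return timedelta(**{_UNITS[unit]: amount})


def next_run(last_run, schedule):
    """Next start time for an interval schedule, or None if not interval-based."""
    interval = parse_interval(schedule)
    return None if interval is None else last_run + interval


last = datetime(2026, 5, 1, 8, 0)
print(next_run(last, "every 8h"))
print(next_run(last, "every 15 min"))
print(next_run(last, "daily 02:00 ET"))
```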
Lineage tracking
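from datetime import date
from mts1b_datalake.catalog import lineage
# The lineage calls are awaited, so run them inside a coroutine or an async-aware REPL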
# Most recent runs
runs = await lineage.recent_runs(flow="equities_daily_bars", limit=5)
# Specific date's data lineage
trail = await lineage.trail(table="equities.bars", asof=date(2026, 5, 23))
# Shows: source → flow → output path → sha256
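Because each trail entry ends in a sha256, a file on disk can be checked against its lineage record. The following is a minimal sketch of such a check using only the standard library; `sha256_of_file` and `matches_recorded_hash` are hypothetical helpers, and it assumes the recorded value is a hex digest of the raw file bytes, which this document does not confirm.

```python
import hashlib
import os
import tempfile


def sha256_of_file(path, chunk_size=1 << 20):
    """Stream a file through SHA-256 so large Parquet files need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_recorded_hash(path, recorded_sha256):
    """True when the file on disk still matches the hash stored in a lineage record."""
    return sha256_of_file(path) == recorded_sha256.lower()


# Demonstration on a throwaway file; a real check would use the output path
# and sha256 taken from a lineage trail entry.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"example bytes")
    demo_path = tmp.name

recorded = hashlib.sha256(b"example bytes").hexdigest()
unchanged = matches_recorded_hash(demo_path, recorded)
tampered = matches_recorded_hash(demo_path, "0" * 64)
os.remove(demo_path)
print(unchanged, tampered)
```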
Cache layer
| Cache | TTL | Backend |
|---|---|---|
| Latest bar per symbol | 30s | Redis |
| Latest universe definition | 5 min | Redis |
| Macro series (last 5y) | 1 hour | Redis |
from mts1b_datalake.cache import quote_cache, bar_cache
q = await quote_cache.get(symbol="AAPL")
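The TTL column in the cache table means an entry is served until its age reaches the TTL and is treated as missing afterwards. As an illustration of that behaviour only, here is a small in-process sketch; `TTLCache` is a hypothetical class standing in for the Redis-backed caches, which would normally rely on Redis key expiry rather than this logic.

```python
import time


class TTLCache:
    """Tiny in-process TTL cache; a stand-in for the Redis-backed caches."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}   # key -> (expires_at, value)

    def set(self, key, value):
        self._entries[key] = (self._clock() + self._ttl, value)

    def get(self, key, default=None):
        entry = self._entries.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]   # lazily evict expired entries
            return default
        return value


# A fake clock makes expiry deterministic for the demonstration.
now = [0.0]
latest_bar = TTLCache(ttl_seconds=30, clock=lambda: now[0])
latest_bar.set("AAPL", {"close": 185.0})   # made-up value

now[0] = 29.0
fresh = latest_bar.get("AAPL")      # still within the 30s TTL
now[0] = 30.0
expired = latest_bar.get("AAPL")    # TTL reached, entry evicted
print(fresh, expired)
```

Injecting the clock keeps the example testable without sleeping; with the default `time.monotonic` the class behaves the same way in real time.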
CLI
mts mts1b-datalake flow list
mts mts1b-datalake flow run --name X
mts mts1b-datalake flow logs --name X --limit 50
mts mts1b-datalake query "SELECT count(*) FROM equities.bars WHERE symbol='AAPL'"
mts mts1b-datalake size # total lake size on disk
mts mts1b-datalake validate # check parquet integrity
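What `validate` checks is not specified here. One cheap structural test that any Parquet integrity check could start with relies on the file format itself: a Parquet file begins and ends with the 4-byte magic `PAR1`, and the 4 bytes before the trailing magic hold the little-endian footer length. The sketch below (with a hypothetical `looks_like_parquet` helper) catches truncated or non-Parquet files but does not decode the footer or verify page contents.

```python
import os
import struct
import tempfile

MAGIC = b"PAR1"


def looks_like_parquet(path):
    """Cheap structural check: leading/trailing magic and a plausible footer length."""
    size = os.path.getsize(path)
    if size < 12:                      # 4 (magic) + 4 (footer length) + 4 (magic)
        return False
    with open(path, "rb") as handle:
        head = handle.read(4)
        handle.seek(-8, os.SEEK_END)
        footer_len_bytes = handle.read(4)
        tail = handle.read(4)
    if head != MAGIC or tail != MAGIC:
        return False
    (footer_len,) = struct.unpack("<I", footer_len_bytes)
    return footer_len <= size - 12


def _write_temp(data):
    """Write bytes to a throwaway file and return its path."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") as tmp:
        tmp.write(data)
        return tmp.name


# Synthetic bytes with the right framing (not a real Parquet file), plus a
# truncated copy that loses the trailing magic.
fake_ok = MAGIC + b"\x00" * 10 + struct.pack("<I", 10) + MAGIC
good_path = _write_temp(fake_ok)
bad_path = _write_temp(fake_ok[:15])
good = looks_like_parquet(good_path)
bad = looks_like_parquet(bad_path)
os.remove(good_path)
os.remove(bad_path)
print(good, bad)
```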