
mts1b-datalake — public API surface

Parquet store + Prefect ingest flows + DuckDB queries.

Read (polars)

from mts1b_datalake import lake

df = lake.equities.bars.read(
    symbols=["AAPL", "MSFT"],
    interval="daily",
    start="2014-01-01", end="2024-01-01",
)
# polars.DataFrame: ts, symbol, open, high, low, close, volume

Read (DuckDB)

with lake.duckdb_session() as conn:
    df = conn.execute("""
        SELECT symbol, AVG(close) AS avg_close
        FROM equities.bars
        WHERE ts BETWEEN '2023-01-01' AND '2024-01-01'
        GROUP BY symbol ORDER BY avg_close DESC LIMIT 20
    """).pl()

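To try the query shape without access to the lake, the sketch below runs the same SQL against the standard-library sqlite3 module. This is a stand-in, not the lake's DuckDB session: the attached in-memory database named equities, the three-column table, and the sample rows are all assumptions made for illustration, and SQLite and DuckDB differ in types and dialect details.

```python
import sqlite3

# Stand-in only: sqlite3 replaces the lake's DuckDB session so the query
# shape can be tried offline. The rows below are made-up sample values.
conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS equities")  # provides the equities.bars name
conn.execute("CREATE TABLE equities.bars (ts TEXT, symbol TEXT, close REAL)")
conn.executemany(
    "INSERT INTO equities.bars VALUES (?, ?, ?)",
    [
        ("2023-03-01", "AAPL", 100.0),
        ("2023-03-02", "AAPL", 110.0),
        ("2023-03-01", "MSFT", 200.0),
        ("2022-12-30", "MSFT", 999.0),  # outside the date window, must be ignored
    ],
)

rows = conn.execute("""
    SELECT symbol, AVG(close) AS avg_close
    FROM equities.bars
    WHERE ts BETWEEN '2023-01-01' AND '2024-01-01'
    GROUP BY symbol ORDER BY avg_close DESC LIMIT 20
""").fetchall()

for symbol, avg_close in rows:
    print(symbol, avg_close)
```

BETWEEN is inclusive at both ends, so rows stamped exactly 2024-01-01 fall inside the window.
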
Universe queries

from datetime import date

universe = lake.equities.universe(
    asof=date(2026, 5, 1),
    sectors=["Technology"],
    market_cap_min=10e9,
    adv_min_usd=10e6,
    listing_age_min_days=730,
)
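
One plausible reading of these parameters is that a symbol must pass every threshold as of the given date. The sketch below shows that reading in plain Python. The Listing record, the filter_universe helper, and the sample values are hypothetical and are not part of the mts1b-datalake API; the real filter may treat boundaries or missing data differently.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class Listing:
    # Hypothetical record shape; the real universe schema is not shown here.
    symbol: str
    sector: str
    market_cap: float  # USD
    adv_usd: float     # average daily dollar volume, USD
    listed_on: date


def filter_universe(listings, asof, sectors, market_cap_min,
                    adv_min_usd, listing_age_min_days):
    """Return the sorted symbols that pass every threshold as of `asof`."""
    keep = []
    for item in listings:
        age_days = (asof - item.listed_on).days
        if (
            item.sector in sectors
            and item.market_cap >= market_cap_min
            and item.adv_usd >= adv_min_usd
            and age_days >= listing_age_min_days
        ):
            keep.append(item.symbol)
    return sorted(keep)


sample = [
    Listing("AAA", "Technology", 50e9, 200e6, date(2010, 1, 4)),
    Listing("BBB", "Technology", 5e9, 200e6, date(2010, 1, 4)),   # market cap too small
    Listing("CCC", "Energy", 50e9, 200e6, date(2010, 1, 4)),       # wrong sector
    Listing("DDD", "Technology", 50e9, 200e6, date(2025, 6, 2)),   # listed under 730 days ago
]

selected = filter_universe(
    sample,
    asof=date(2026, 5, 1),
    sectors=["Technology"],
    market_cap_min=10e9,
    adv_min_usd=10e6,
    listing_age_min_days=730,
)
print(selected)
```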

Build a UniversePanel

panel = lake.build_panel(
    universe="us-large-cap",
    interval="daily",
    start="2014-01-01", end="2024-01-01",
    include=["close", "high", "low", "volume", "market_cap", "sector"],
)
# UniversePanel(close=(T,A), dates=(T,), symbols=(A,), ...)
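
The (T, A) notation suggests one row per date and one column per asset. As a rough illustration of that layout, the sketch below pivots long (ts, symbol, close) rows into such a grid using only the standard library. The pivot_close helper and the sample prices are hypothetical; how build_panel actually aligns dates and fills gaps is not documented here.

```python
from datetime import date


def pivot_close(rows):
    """Pivot long (ts, symbol, close) rows into a dates x symbols grid."""
    dates = sorted({ts for ts, _, _ in rows})
    symbols = sorted({sym for _, sym, _ in rows})
    row_of = {d: i for i, d in enumerate(dates)}
    col_of = {s: j for j, s in enumerate(symbols)}
    # None marks a (date, symbol) cell that has no bar.
    close = [[None] * len(symbols) for _ in dates]
    for ts, sym, px in rows:
        close[row_of[ts]][col_of[sym]] = px
    return dates, symbols, close


# Made-up sample values, for illustration only.
long_rows = [
    (date(2024, 1, 2), "AAPL", 185.0),
    (date(2024, 1, 2), "MSFT", 370.0),
    (date(2024, 1, 3), "AAPL", 184.0),
]

dates, symbols, close = pivot_close(long_rows)
print(len(dates), len(symbols))  # T and A
print(close)
```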

Per-asset-class accessors

lake.equities.bars # equities bars
lake.equities.corporate_actions
lake.equities.fundamentals
lake.equities.universe

lake.crypto.bars
lake.crypto.perp_funding
lake.crypto.on_chain

lake.options.chains
lake.options.greeks

lake.altdata.sec_filings
lake.altdata.news
lake.altdata.sentiment
lake.altdata.congress_trades
lake.altdata.insiders

lake.macro.fred
lake.macro.release_calendar

Ingest flows (Prefect)

# Triggered by deploy / cron
mts1b-datalake flow run --name equities_daily_bars
mts1b-datalake flow run --name crypto_funding
mts1b-datalake flow run --name sec_filings

Flows + schedules:

Flow                    Schedule
equities_daily_bars     NYSE close + 1h
equities_intraday_1m    every 5 min
equities_corp_actions   daily 02:00 ET
crypto_bars             every 1h
crypto_funding          every 8h
options_chains          daily 17:00 ET
sec_filings             every 15 min
news_rss                every 5 min
macro_fred              daily 09:00 ET

Lineage tracking

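from datetime import date

from mts1b_datalake.catalog import lineage

# Both calls are coroutines: await them inside an async function or an async-aware REPL.

# Most recent runs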
runs = await lineage.recent_runs(flow="equities_daily_bars", limit=5)

# Specific date's data lineage
trail = await lineage.trail(table="equities.bars", asof=date(2026, 5, 23))
# Shows: source → flow → output path → sha256
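
Because the trail ends in a sha256, one natural use is to check that a file on disk still matches the recorded digest. The sketch below does this with hashlib. The sha256_of and matches_lineage helpers are hypothetical, and it is an assumption that the recorded value is a hex digest of the raw file bytes; the structure of the object returned by lineage.trail is not shown here.

```python
import hashlib
import tempfile
from pathlib import Path


def sha256_of(path, chunk_size=1 << 20):
    """Stream a file through SHA-256 and return the hex digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_lineage(path, recorded_sha256):
    """True when the file on disk still hashes to the recorded value."""
    return sha256_of(path) == recorded_sha256.lower()


# Demo with a throwaway file standing in for a parquet output.
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "part-0.parquet"
    out.write_bytes(b"abc")
    recorded = hashlib.sha256(b"abc").hexdigest()
    intact = matches_lineage(out, recorded)
    out.write_bytes(b"abd")  # simulate a file modified after ingest
    tampered = matches_lineage(out, recorded)

print(intact, tampered)
```

Reading in chunks keeps memory use flat for large files.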

Cache layer

Cache                        TTL      Backend
Latest bar per symbol        30s      Redis
Latest universe definition   5 min    Redis
Macro series (last 5y)       1 hour   Redis

from mts1b_datalake.cache import quote_cache, bar_cache

q = await quote_cache.get(symbol="AAPL")
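
To make the TTL behaviour concrete, the sketch below implements per-entry expiry with an in-process dictionary standing in for Redis. The TTLCache class and the key names are hypothetical; only the TTL values come from the table above. The real cache layer is async and Redis-backed, so treat this as an illustration of expiry semantics rather than of the library's interface.

```python
import time

# TTLs from the cache table, in seconds. The key names are hypothetical.
TTL_SECONDS = {
    "latest_bar": 30,
    "universe_definition": 5 * 60,
    "macro_series": 60 * 60,
}


class TTLCache:
    """In-process stand-in for a Redis-style cache with per-entry expiry."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._store = {}  # key -> (expires_at, value)

    def set(self, key, value, ttl):
        self._store[key] = (self._clock() + ttl, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # expired: drop the entry and report a miss
            return None
        return value


# A fake clock makes expiry deterministic for the demo.
now = [0.0]
cache = TTLCache(clock=lambda: now[0])
cache.set(("latest_bar", "AAPL"), {"close": 190.0}, TTL_SECONDS["latest_bar"])

now[0] = 29.0
hit = cache.get(("latest_bar", "AAPL"))   # still within the 30s TTL

now[0] = 30.0
miss = cache.get(("latest_bar", "AAPL"))  # TTL elapsed

print(hit, miss)
```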

CLI

mts mts1b-datalake flow list
mts mts1b-datalake flow run --name X
mts mts1b-datalake flow logs --name X --limit 50

mts mts1b-datalake query "SELECT count(*) FROM equities.bars WHERE symbol='AAPL'"
mts mts1b-datalake size # total lake size on disk
mts mts1b-datalake validate # check parquet integrity
