Skip to main content

mts1b-platform — public API surface

Compact reference for every exported symbol. See repos/platform for the architectural context.

mts1b_platform.logging

from mts1b_platform.logging import get_logger, configure_logging

log = get_logger(__name__) # structlog logger, JSON in prod, text in dev

log.info("event", extra={"key": "value"})
log.warning("warning", extra={"err": str(e)})
log.error("fatal", extra={"traceback": tb})

# Optional explicit config
configure_logging(level="INFO", format="json")

mts1b_platform.config

from pydantic_settings import BaseSettings
from mts1b_platform.config import load_config, vault_client

class MySettings(BaseSettings):
db_dsn: str
log_level: str = "INFO"

settings = load_config(MySettings, vault_path="secret/mts1b/myservice")

# Or direct Vault access
vc = vault_client()
secret = vc.read("secret/mts1b/brokers/ibkr")["data"]["data"]

mts1b_platform.http

from mts1b_platform.http import http_client

async with http_client("polygon", base_url="https://api.polygon.io",
timeout=10, retries=3, rate_name="polygon") as c:
r = await c.get("/v2/last/trade/AAPL")
return r.json()

mts1b_platform.retry

from mts1b_platform.retry import with_retry, retry_context
import httpx

@with_retry(retries=3, backoff="exp", base=1.0, max_delay=30,
retry_on=(httpx.HTTPStatusError, asyncio.TimeoutError))
async def fetch():
...

# Inline context manager
async with retry_context(retries=3) as attempt:
async for retry in attempt:
async with retry:
r = await client.get(url)

mts1b_platform.ratelimit

from mts1b_platform.ratelimit import RateLimiter

# Shared across processes via Redis when backend="redis"
limiter = RateLimiter(name="polygon", rate=5, per_seconds=60,
strategy="token_bucket", backend="redis")

async with limiter:
quote = await fetch_quote(...)

# Manual consume
await limiter.consume(n=3)
available = await limiter.available()

mts1b_platform.eventbus

from mts1b_platform.eventbus import connect, publish_typed, subscribe_typed
from mts1b_foundation.orders import Order

# Connect
nc = await connect()
js = nc.jetstream()

# Publish typed
order = Order(...)
ack = await publish_typed("mts.v1.oms.orders.created", order, js=js)

# Subscribe typed (auto-validates payload as Order)
async for order in subscribe_typed("mts.v1.oms.orders.created", Order,
durable="my-consumer", js=js):
...

mts1b_platform.messaging

from mts1b_platform.messaging import send, send_telegram, send_slack, send_discord, send_pagerduty

# Multi-channel
await send(
channels=["telegram", "slack"],
level="warning",
subject="Risk gate fired",
body=f"Order {order_id} rejected",
data={"order_id": order_id},
)

# Per-channel
await send_telegram(text="alert", parse_mode="markdown")
await send_slack(channel="#alerts", text="alert")
await send_discord(channel_id=123, text="alert")
await send_pagerduty(routing_key="...", severity="critical", summary="alert")

mts1b_platform.calendars

from mts1b_platform.calendars import market_calendar
from datetime import date, datetime

nyse = market_calendar("NYSE")
nyse.is_trading_day(date(2026, 5, 25)) # False — Memorial Day
nyse.next_open(datetime.utcnow()) # datetime in UTC
nyse.sessions(date.today(), lookback_days=21)
nyse.early_close_dates(year=2026)

# Other venues
cme = market_calendar("CME")
coinbase = market_calendar("COINBASE") # 24/7
fx = market_calendar("FX") # 24×5 Sun 17:00 ET - Fri 17:00 ET

mts1b_platform.symbology

from mts1b_platform.symbology import normalize, to_native
from mts1b_foundation.symbology import Symbol

# Normalize external → canonical
normalize("BTCUSD", venue="coinbase") # Symbol("BTC-USD")
normalize("BTC/USD") # Symbol("BTC-USD")

# Canonical → venue wire format
to_native(Symbol("BTC-USD"), venue="binance") # "BTCUSD"
to_native(Symbol("BTC-USD"), venue="ibkr_paxos") # "BTC"

mts1b_platform.db

from mts1b_platform.db import get_pool, duckdb_session, kv_store

# Postgres pool
pool = get_pool("primary")
async with pool.connection() as conn:
rows = await conn.fetch("SELECT * FROM orders WHERE fund_id = $1", "paper-momentum")

# DuckDB
with duckdb_session() as conn:
df = conn.execute("SELECT * FROM bars WHERE symbol = ?", ["AAPL"]).pl()

# NATS K/V
kv = await kv_store("positions")
await kv.put("paper-momentum:AAPL", position.model_dump_json())
val = await kv.get("paper-momentum:AAPL")

mts1b_platform.audit

from mts1b_platform.audit import audit_chain

entry = await audit_chain.append(
actor="mts1b-oms",
action="order_rejected",
subject_id=order.order_id,
data={"reason": "drawdown_halt", "envelope_id": "env-1"},
)
print(entry.sequence, entry.hash)

# Verify chain
ok = await audit_chain.verify(from_sequence=0)

mts1b_platform.observability

from mts1b_platform.observability import get_tracer, get_meter

tracer = get_tracer(__name__)
meter = get_meter(__name__)

# Trace
with tracer.start_as_current_span("submit_order") as span:
span.set_attribute("order.id", order.order_id)
...

# Metrics
orders_total = meter.create_counter("orders_total")
orders_total.add(1, attributes={"broker": "ibkr"})

p99 = meter.create_histogram("submit_latency_seconds")
p99.record(0.045, attributes={"broker": "ibkr"})

mts1b_platform.auth

from mts1b_platform.auth import JwtSigner, mtls_client_context

# JWT service-to-service
signer = JwtSigner(key_path="/etc/mts1b/jwt-priv.pem")
token = signer.issue(subject="mts1b-research", aud="mts1b-oms", ttl=300)
claims = signer.verify(token, expected_aud="mts1b-oms")

# mTLS
ctx = mtls_client_context(cert_path="...", key_path="...", ca_path="...")
async with httpx.AsyncClient(verify=ctx) as c:
...

mts1b_platform.security.redact

from mts1b_platform.security.redact import redact

masked = redact("Authorization: Bearer abc.def.ghi")
# "Authorization: Bearer [REDACTED-JWT]"

# Used as a logging filter — auto-applied to all logs from get_logger

See also