mts1b-platform — public API surface
Compact reference for every exported symbol. See repos/platform for the architectural context.
mts1b_platform.logging
from mts1b_platform.logging import get_logger, configure_logging
log = get_logger(__name__) # structlog logger, JSON in prod, text in dev
log.info("event", extra={"key": "value"})
log.warning("warning", extra={"err": str(e)})
log.error("fatal", extra={"traceback": tb})
# Optional explicit config
configure_logging(level="INFO", format="json")
mts1b_platform.config
from pydantic_settings import BaseSettings
from mts1b_platform.config import load_config, vault_client
class MySettings(BaseSettings):
db_dsn: str
log_level: str = "INFO"
settings = load_config(MySettings, vault_path="secret/mts1b/myservice")
# Or direct Vault access
vc = vault_client()
secret = vc.read("secret/mts1b/brokers/ibkr")["data"]["data"]
mts1b_platform.http
from mts1b_platform.http import http_client
async with http_client("polygon", base_url="https://api.polygon.io",
timeout=10, retries=3, rate_name="polygon") as c:
r = await c.get("/v2/last/trade/AAPL")
return r.json()
mts1b_platform.retry
from mts1b_platform.retry import with_retry, retry_context
import httpx
@with_retry(retries=3, backoff="exp", base=1.0, max_delay=30,
retry_on=(httpx.HTTPStatusError, asyncio.TimeoutError))
async def fetch():
...
# Inline context manager
async with retry_context(retries=3) as attempt:
async for retry in attempt:
async with retry:
r = await client.get(url)
mts1b_platform.ratelimit
from mts1b_platform.ratelimit import RateLimiter
# Shared across processes via Redis when backend="redis"
limiter = RateLimiter(name="polygon", rate=5, per_seconds=60,
strategy="token_bucket", backend="redis")
async with limiter:
quote = await fetch_quote(...)
# Manual consume
await limiter.consume(n=3)
available = await limiter.available()
mts1b_platform.eventbus
from mts1b_platform.eventbus import connect, publish_typed, subscribe_typed
from mts1b_foundation.orders import Order
# Connect
nc = await connect()
js = nc.jetstream()
# Publish typed
order = Order(...)
ack = await publish_typed("mts.v1.oms.orders.created", order, js=js)
# Subscribe typed (auto-validates payload as Order)
async for order in subscribe_typed("mts.v1.oms.orders.created", Order,
durable="my-consumer", js=js):
...
mts1b_platform.messaging
from mts1b_platform.messaging import send, send_telegram, send_slack, send_discord, send_pagerduty
# Multi-channel
await send(
channels=["telegram", "slack"],
level="warning",
subject="Risk gate fired",
body=f"Order {order_id} rejected",
data={"order_id": order_id},
)
# Per-channel
await send_telegram(text="alert", parse_mode="markdown")
await send_slack(channel="#alerts", text="alert")
await send_discord(channel_id=123, text="alert")
await send_pagerduty(routing_key="...", severity="critical", summary="alert")
mts1b_platform.calendars
from mts1b_platform.calendars import market_calendar
from datetime import date, datetime
nyse = market_calendar("NYSE")
nyse.is_trading_day(date(2026, 5, 25)) # False — Memorial Day
nyse.next_open(datetime.utcnow()) # datetime in UTC
nyse.sessions(date.today(), lookback_days=21)
nyse.early_close_dates(year=2026)
# Other venues
cme = market_calendar("CME")
coinbase = market_calendar("COINBASE") # 24/7
fx = market_calendar("FX") # 24×5 Sun 17:00 ET - Fri 17:00 ET
mts1b_platform.symbology
from mts1b_platform.symbology import normalize, to_native
from mts1b_foundation.symbology import Symbol
# Normalize external → canonical
normalize("BTCUSD", venue="coinbase") # Symbol("BTC-USD")
normalize("BTC/USD") # Symbol("BTC-USD")
# Canonical → venue wire format
to_native(Symbol("BTC-USD"), venue="binance") # "BTCUSD"
to_native(Symbol("BTC-USD"), venue="ibkr_paxos") # "BTC"
mts1b_platform.db
from mts1b_platform.db import get_pool, duckdb_session, kv_store
# Postgres pool
pool = get_pool("primary")
async with pool.connection() as conn:
rows = await conn.fetch("SELECT * FROM orders WHERE fund_id = $1", "paper-momentum")
# DuckDB
with duckdb_session() as conn:
df = conn.execute("SELECT * FROM bars WHERE symbol = ?", ["AAPL"]).pl()
# NATS K/V
kv = await kv_store("positions")
await kv.put("paper-momentum:AAPL", position.model_dump_json())
val = await kv.get("paper-momentum:AAPL")
mts1b_platform.audit
from mts1b_platform.audit import audit_chain
entry = await audit_chain.append(
actor="mts1b-oms",
action="order_rejected",
subject_id=order.order_id,
data={"reason": "drawdown_halt", "envelope_id": "env-1"},
)
print(entry.sequence, entry.hash)
# Verify chain
ok = await audit_chain.verify(from_sequence=0)
mts1b_platform.observability
from mts1b_platform.observability import get_tracer, get_meter
tracer = get_tracer(__name__)
meter = get_meter(__name__)
# Trace
with tracer.start_as_current_span("submit_order") as span:
span.set_attribute("order.id", order.order_id)
...
# Metrics
orders_total = meter.create_counter("orders_total")
orders_total.add(1, attributes={"broker": "ibkr"})
p99 = meter.create_histogram("submit_latency_seconds")
p99.record(0.045, attributes={"broker": "ibkr"})
mts1b_platform.auth
from mts1b_platform.auth import JwtSigner, mtls_client_context
# JWT service-to-service
signer = JwtSigner(key_path="/etc/mts1b/jwt-priv.pem")
token = signer.issue(subject="mts1b-research", aud="mts1b-oms", ttl=300)
claims = signer.verify(token, expected_aud="mts1b-oms")
# mTLS
ctx = mtls_client_context(cert_path="...", key_path="...", ca_path="...")
async with httpx.AsyncClient(verify=ctx) as c:
...
mts1b_platform.security.redact
from mts1b_platform.security.redact import redact
masked = redact("Authorization: Bearer abc.def.ghi")
# "Authorization: Bearer [REDACTED-JWT]"
# Used as a logging filter — auto-applied to all logs from get_logger
See also
repos/platform— full architectural context- Tutorial — Custom strategy — uses many of these