Skip to main content

mts1b-platform

Shared platform layer: auth, eventbus, observability, messaging, logging, config, DB pools, retry, ratelimit, HTTP, secrets redaction. Transport-only.

Repo: github.com/MTS1B/mts1b-platform Layer: 1 Depends on: foundation + (httpx, pydantic-settings, structlog, opentelemetry, nats-py, redis-py, psycopg, ...) Audience: every service repo

What it is

The single canonical implementation of 11 cross-cutting concerns that institutional codebases centralize but the source monorepo had scattered. Each was deduped from many copies:

ModuleWasNow
messaging6 places sending Telegramone dispatcher
calendars5+ places computing market sessionsone singleton
symbology3+ places normalizing BTC-USD vs BTCUSDone normalizer
loggingscattered handler wiringone factory
configscattered BaseSettingsone config tree
dbscattered pool wrappersone pool registry
retryscattered tenacity wrappersone decorator family
ratelimitscattered token bucketsone limiter
httpscattered httpx factoriesone client factory
security/redactscattered log filtersone redactor
observabilityscattered metrics + tracing wiringone OTel setup

Module layout

mts1b_platform/
├── auth/ # JWT, mTLS service tokens
├── audit/ # Merkle-hashed audit chain
├── eventbus/ # NATS connect + JetStream wrappers + schema validation
├── observability/ # OpenTelemetry + Prometheus + structured logs
├── messaging/ # Telegram, Slack, Discord, PagerDuty dispatch
├── calendars/ # NYSE, NASDAQ, CME, Coinbase 24/7, ...
├── symbology/ # symbol normalization across venues
├── logging/ # get_logger() factory + JSON formatter
├── config/ # Vault-aware Pydantic Settings
├── db/ # Postgres pool, DuckDB wrapper, NATS K/V
├── retry/ # @with_retry, async_retry context manager
├── ratelimit/ # RateLimiter (token bucket, fixed window, sliding window)
├── http/ # http_client(name, retries, timeout) factory
└── security/
└── redact.py # log filter + manual redact()

Top APIs

get_logger

from mts1b_platform.logging import get_logger
log = get_logger(__name__)

log.info("order received", extra={"order_id": "abc", "symbol": "SPY"})
# {"ts": "2026-05-23T19:00:00Z", "level": "INFO", "msg": "order received",
# "logger": "mts1b_oms.service", "order_id": "abc", "symbol": "SPY", ...}

JSON-structured by default, human-readable in dev (MTS1B_LOG_FORMAT=text). Secrets are auto-redacted by the redact filter.

load_config

from mts1b_platform.config import load_config
from mts1b_foundation.funds import FundConfig

class OmsSettings(BaseSettings):
nats_url: str = "nats://localhost:4222"
db_dsn: str
log_level: str = "INFO"
funds: list[FundConfig]

settings = load_config(OmsSettings, vault_path="secret/mts1b/oms")

Resolution order: env var → Vault → .env → defaults. Vault paths support templating: {{ env "FUND_ID" }} in YAML.

http_client

from mts1b_platform.http import http_client

async with http_client("ibkr", base_url="https://api.ibkr.com",
timeout=20, retries=3) as c:
r = await c.get("/v1/positions")

Pre-configured with: OTel tracing headers, retry with backoff, rate-limit aware, secret-aware log filtering (the request body is redacted before logging).

with_retry

from mts1b_platform.retry import with_retry

@with_retry(retries=3, backoff="exp", base=1.0, max_delay=30.0,
retry_on=(httpx.HTTPStatusError, asyncio.TimeoutError))
async def fetch_quote(symbol: str) -> Quote:
...

RateLimiter

from mts1b_platform.ratelimit import RateLimiter

limiter = RateLimiter(name="polygon", rate=5, per_seconds=60)

async with limiter:
quote = await fetch_quote_from_polygon(...)

Per-name registry; same name across services shares the budget.

messaging.send

from mts1b_platform.messaging import send

await send(channels=["telegram", "slack"],
level="warning",
subject="Risk gate fired",
body=f"Order {order.order_id} rejected: max_position_pct exceeded",
data={"order_id": order.order_id, "envelope_id": envelope.envelope_id})

Routes to configured channels per level. Per-service Vault paths control which channels are wired. Free-tier rate limits applied per channel.

calendars

from mts1b_platform.calendars import market_calendar

cal = market_calendar("NYSE")
cal.is_trading_day(date(2026, 5, 25)) # False (Memorial Day)
cal.next_open(datetime.utcnow()) # next NYSE open
cal.sessions(asof, lookback_days=21) # list of (open, close) datetimes

# 24/7 markets
crypto = market_calendar("COINBASE")
crypto.is_trading_day(date(2026, 12, 25)) # True (crypto never closes)

symbology.normalize

from mts1b_platform.symbology import normalize

normalize("BTCUSD", venue="coinbase") # "BTC-USD"
normalize("btc/usd", venue="binance") # "BTC-USD"
normalize("BTC.USD", venue="kraken") # "BTC-USD"

One canonical format (BASE-QUOTE) inside MTS1B; adapter-specific renderers handle the wire format.

eventbus.connect

from mts1b_platform.eventbus import connect

nc = await connect() # uses load_config under the hood
js = nc.jetstream()
ack = await js.publish("mts.v1.oms.orders.created", order.model_dump_json().encode())

Auto-configures TLS, mTLS auth, OTel tracing, schema validation.

audit.append

from mts1b_platform.audit import audit_chain

await audit_chain.append(
actor="mts1b-oms",
action="order_rejected",
subject_id=order.order_id,
data={"reason": "drawdown_halt", "envelope_id": envelope.envelope_id},
)

Append-only Merkle-hashed log. Each entry is signed and includes the prior hash, so tampering is detectable. Used by mts1b-operations for compliance reporting.

Observability

Every primitive emits OTel spans + Prometheus metrics, labeled by service, operation, and status.

Standard panels in the mts1b-platform Grafana dashboard:

  • HTTP client throughput + p99 latency per name
  • Retry counts per decorated function
  • Rate-limit hits per limiter
  • NATS publish/consume rates per subject
  • Audit chain growth + integrity check status

Build + test

git clone https://github.com/MTS1B/mts1b-platform
cd mts1b-platform
pip install -e ".[dev]"

pytest -v # incl. integration tests (NATS + Postgres in docker)
docker compose up -d postgres nats redis
pytest -m integration

Roadmap

VersionItems
0.1 (Wave 1)11 dedupe modules + auth + eventbus + observability + audit
0.2 (Wave 1)gRPC server scaffolding, FastAPI middleware bundles
0.3 (Wave 2)Distributed tracing across NATS boundaries
1.0 (LTS)Frozen API