Event bus (NATS)

Every MTS1B service communicates over NATS for live trading and event observability. This page explains the subject taxonomy, the wire format, and the durability and delivery guarantees (at-least-once, with deduplication where replay safety matters).

Why NATS

Want | Why NATS
Low latency (≤ 1ms p99) | NATS Core is in-memory pub/sub, no Kafka-style commit log
At-least-once for orders/fills | NATS JetStream gives durable streams + ack semantics
Wildcards for ops dashboards | mts.v1.oms.> subscribes to everything OMS publishes
Self-hosted, no cloud dep | Single Go binary; runs on Proxmox LXC
TLS + JWT auth out of the box | mTLS for service-to-service, JWT for human consumers

We considered Kafka (too heavy for our scale), Redis Streams (not durable enough), and RabbitMQ (operationally finicky). NATS won on simplicity.

Subject taxonomy

mts.<version>.<source-repo>.<noun>.<verb>[.fund.<fund_id>]

Conventions:

  • <version> — schema version (v1, v2, ...); allows in-place upgrade
  • <source-repo> — the repo that PUBLISHES this subject (single producer per subject)
  • <noun> — domain entity (orders, fills, quotes, signals, ...)
  • <verb> — past-tense action (created, updated, filled, canceled, ...)
  • fund.<fund_id> — optional partition key for fund-scoped subscriptions
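The conventions above can be sketched as a small builder. This is an illustration only: `build_subject` is a hypothetical helper, not part of mts1b-platform, and the rule of rejecting tokens that contain dots, wildcards, or whitespace is an assumption based on how NATS tokenizes subjects.

```python
from typing import Optional


def build_subject(version: str, source: str, noun: str, verb: str,
                  fund_id: Optional[str] = None) -> str:
    """Assemble mts.<version>.<source-repo>.<noun>.<verb>[.fund.<fund_id>]."""
    tokens = ["mts", version, source, noun, verb]
    if fund_id is not None:
        tokens += ["fund", fund_id]
    for token in tokens:
        # '.' separates tokens and '*' / '>' are wildcards, so none of them
        # may appear inside a single token of a published subject.
        if not token or any(c in token for c in ".*> \t"):
            raise ValueError(f"invalid subject token: {token!r}")
    return ".".join(tokens)


print(build_subject("v1", "oms", "orders", "created"))
# mts.v1.oms.orders.created
print(build_subject("v1", "oms", "orders", "created", fund_id="alpha"))
# mts.v1.oms.orders.created.fund.alpha
```

Here `alpha` is a made-up fund id used only for the example.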

Stable subjects (Wave 1)

Subject | Producer | Payload
mts.v1.oms.orders.created | mts1b-oms | Order
mts.v1.oms.orders.accepted | mts1b-oms | Order
mts.v1.oms.orders.rejected | mts1b-oms | Order (with rejected_reason)
mts.v1.oms.orders.canceled | mts1b-oms | Order
mts.v1.oms.fills.created | mts1b-oms | Fill
mts.v1.brokers.<broker>.fills.raw | mts1b-brokers | broker-native fill record
mts.v1.marketdata.quotes.<symbol> | mts1b-marketdata | Quote
mts.v1.marketdata.bars.<symbol>.<interval> | mts1b-marketdata | Bar
mts.v1.research.signals.published | mts1b-research | Signal
mts.v1.risk.envelope.updated | mts1b-riskengine | RiskEnvelope
mts.v1.risk.gate.failed | mts1b-riskengine | OrderRejection
mts.v1.treasury.nav.<fund_id> | mts1b-treasury | NavSnapshot
mts.v1.operations.halt.requested | mts1b-operations | HaltRequest
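When subscribing to groups of these subjects, it helps to know how NATS wildcards behave: `*` matches exactly one token and `>` matches one or more trailing tokens. The following is a minimal sketch of those matching rules for reasoning about patterns offline; `subject_matches` is a hypothetical helper, and the NATS server does the real matching.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style pattern matches a concrete subject."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least
            # one remaining subject token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # pattern is longer than the subject
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    # No '>' seen: token counts must be equal.
    return len(p_tokens) == len(s_tokens)


print(subject_matches("mts.v1.oms.>", "mts.v1.oms.fills.created"))        # True
print(subject_matches("mts.v1.oms.orders.*", "mts.v1.oms.fills.created"))  # False
print(subject_matches("mts.v1.marketdata.bars.*.*", "mts.v1.marketdata.bars.BTCUSD.1m"))  # True
```

The symbol `BTCUSD` and interval `1m` are placeholder values for the example.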

Durability tiers

Not every event needs the same guarantees:

Tier | Subjects | NATS feature | Use
Best-effort | quotes, bars, raw market data | NATS Core | High-frequency, ephemeral. Drop on reconnect is fine.
At-least-once | orders, fills, signals, risk envelopes | JetStream stream + consumer ack | Trading events. Must survive restart.
At-least-once + dedupe | order intents | JetStream + idempotency key on consumer | Replay-safe. Consumer dedupes on idempotency_key.
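For the "at-least-once + dedupe" tier, the consumer has to tolerate redelivery. A minimal sketch of consumer-side deduplication follows, assuming an in-memory set of seen keys; `IdempotentHandler` is a hypothetical name, and a real consumer would likely need a durable store so that the seen keys survive a restart.

```python
from collections import OrderedDict
from typing import Callable


class IdempotentHandler:
    """Run a handler at most once per idempotency key (in-memory sketch)."""

    def __init__(self, handler: Callable[[dict], None], max_keys: int = 10_000):
        self._handler = handler
        self._seen: "OrderedDict[str, None]" = OrderedDict()
        self._max_keys = max_keys

    def handle(self, idempotency_key: str, payload: dict) -> bool:
        """Return True if the payload was processed, False for a duplicate."""
        if idempotency_key in self._seen:
            return False
        self._handler(payload)
        # Record the key only after success so a failed attempt can be retried.
        self._seen[idempotency_key] = None
        if len(self._seen) > self._max_keys:
            self._seen.popitem(last=False)  # evict the oldest key
        return True


processed = []
h = IdempotentHandler(processed.append)
print(h.handle("intent-1", {"qty": 1}))  # True
print(h.handle("intent-1", {"qty": 1}))  # False (redelivery ignored)
print(len(processed))                    # 1
```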

Streams are configured in mts1b-platform/eventbus/streams.py:

# OMS_ORDERS stream
{
    "name": "OMS_ORDERS",
    "subjects": ["mts.v1.oms.orders.*"],
    "retention": "limits",
    "max_age": "7d",
    "max_msgs_per_subject": 1_000_000,
    "storage": "file",
    "num_replicas": 1,  # 3 in production
    "discard": "old",
}

Wire format

All payloads are JSON-encoded pydantic models from mts1b-foundation. Headers carry routing/auth:

NATS-Msg-Id: <foundation Order.idempotency_key>
content-type: application/json
mts-trace-id: <opentelemetry trace id>
mts-source-repo: mts1b-oms
mts-schema: mts1b_foundation.orders.Order:v1
mts-actor: crypto_dual_paper

The mts-schema header pins the producer's understanding. Consumers reject (with metric nats.consume.schema_mismatch) anything they can't validate.
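As a rough illustration of that check, a consumer might compare the `mts-schema` header with the schema it was built against before parsing the body. This is a sketch under assumptions: `accept_message` is a hypothetical helper, and the `Counter` stands in for whatever metrics client the platform actually uses.

```python
from collections import Counter
from typing import Optional

# Stand-in for a real metrics client (assumption for this sketch).
metrics: Counter = Counter()

EXPECTED_SCHEMA = "mts1b_foundation.orders.Order:v1"


def accept_message(headers: Optional[dict],
                   expected: str = EXPECTED_SCHEMA) -> bool:
    """Return True only if the producer's declared schema matches ours."""
    schema = (headers or {}).get("mts-schema")
    if schema != expected:
        # Count the mismatch so it shows up on dashboards, then reject.
        metrics["nats.consume.schema_mismatch"] += 1
        return False
    return True


print(accept_message({"mts-schema": "mts1b_foundation.orders.Order:v1"}))  # True
print(accept_message({"mts-schema": "mts1b_foundation.orders.Order:v2"}))  # False
print(metrics["nats.consume.schema_mismatch"])                             # 1
```

A missing header is treated as a mismatch here; whether that is the desired policy is a design choice for each consumer.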

Subscribing — minimal example

any-consumer.py
import asyncio
from mts1b_platform.eventbus import connect
from mts1b_foundation.orders import Order


def handle(order: Order) -> None:
    """Placeholder: replace with your own processing logic."""
    ...


async def main():
    nc = await connect()  # uses platform/config; TLS + auth handled
    js = nc.jetstream()

    sub = await js.subscribe(
        "mts.v1.oms.orders.created",
        durable="my-consumer",  # JetStream tracks ack state
        manual_ack=True,
    )
    async for msg in sub.messages:
        order = Order.model_validate_json(msg.data)
        try:
            handle(order)
            await msg.ack()
        except Exception:
            # NATS will redeliver up to max_deliver times, then move to DLQ
            await msg.nak(delay=30.0)


asyncio.run(main())

Producing — minimal example

some-publisher.py
import asyncio
from mts1b_platform.eventbus import connect
from mts1b_foundation.orders import Order


async def main():
    nc = await connect()
    js = nc.jetstream()

    order = Order(...)  # validated pydantic model
    ack = await js.publish(
        subject="mts.v1.oms.orders.created",
        payload=order.model_dump_json().encode(),
        headers={
            "NATS-Msg-Id": order.idempotency_key,  # dedupe key for JetStream
            "mts-schema": "mts1b_foundation.orders.Order:v1",
            "mts-source-repo": "mts1b-oms",
            "mts-actor": order.actor,
        },
    )
    # ack contains stream + sequence; log for audit


asyncio.run(main())

JetStream's built-in NATS-Msg-Id dedupe (configurable window, default 2 min) means duplicate publishes within the window are dropped — so retrying the producer is safe.
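To make the window semantics concrete, here is a toy in-memory model of message-id deduplication. It is not how JetStream is implemented, and `DedupeWindow` is a hypothetical class; it only illustrates that a retry inside the window is dropped while a retry after the window is stored again.

```python
class DedupeWindow:
    """Toy model of dedupe-by-message-id within a time window."""

    def __init__(self, window_s: float = 120.0):
        self.window_s = window_s
        self._seen = {}  # msg_id -> time the message was first stored

    def publish(self, msg_id: str, now: float) -> bool:
        """Return True if the message is stored, False if dropped as a duplicate."""
        # Forget ids whose first publish fell outside the window.
        self._seen = {m: t for m, t in self._seen.items()
                      if now - t < self.window_s}
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = now
        return True


w = DedupeWindow(window_s=120.0)
print(w.publish("order-1", now=0.0))    # True  (first publish is stored)
print(w.publish("order-1", now=60.0))   # False (retry inside the window)
print(w.publish("order-1", now=200.0))  # True  (window expired, stored again)
```

The last line is the case to watch for: producer retries that can arrive later than the window still need the consumer-side idempotency check.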

Observability

Every service uses mts1b-platform/observability:

  • Prometheus counters nats_publish_total, nats_consume_total, nats_consume_errors_total, labeled by subject + repo
  • OpenTelemetry traces propagate via mts-trace-id header
  • Slow-consumer alerts in mts1b-operations fire if a JetStream consumer's lag exceeds 30 s

Local dev: running NATS

# Single-node, no auth (dev only — DO NOT use in production)
docker run -p 4222:4222 -p 8222:8222 nats:2-alpine -js -m 8222

# With mts1b-deploy
mts1b-deploy install --profile minimal --include nats

For production, see mts1b-deploy/nats/ for the full TLS + clustered + JetStream config.

Subject reservation

If you're adding a new subject:

  1. Choose mts.v1.<your-repo>.<noun>.<verb> per convention.
  2. Add the schema to mts1b-foundation.
  3. Open a PR against mts1b-foundation updating subjects.md.
  4. CI enforces uniqueness: no two repos can claim the same subject prefix.

This prevents the "two services publishing slightly different orders.created shapes" problem we saw in the monorepo.
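The uniqueness rule in step 4 could be checked along these lines. This is a simplified sketch, not the actual CI job: `find_conflicts` and the shape of the `claims` mapping are assumptions, and it only detects identical prefixes, not one prefix nested inside another.

```python
from collections import defaultdict
from typing import Dict, List


def find_conflicts(claims: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Map each subject prefix claimed by more than one repo to its claimants."""
    owners = defaultdict(set)
    for repo, prefixes in claims.items():
        for prefix in prefixes:
            owners[prefix].add(repo)
    return {p: sorted(repos) for p, repos in owners.items() if len(repos) > 1}


claims = {
    "mts1b-oms": ["mts.v1.oms.orders", "mts.v1.oms.fills"],
    "mts1b-brokers": ["mts.v1.brokers", "mts.v1.oms.orders"],  # collides with OMS
}
print(find_conflicts(claims))
# {'mts.v1.oms.orders': ['mts1b-brokers', 'mts1b-oms']}
```

A CI step would fail the build whenever the returned mapping is non-empty.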