Event-driven data via NATS
Problem: You have a custom analytics service that needs to react to every fill. Polling the position store every second is wasteful.
Solution: Subscribe to the NATS event bus.
Subject taxonomy refresher
mts.v1.<source-repo>.<noun>.<verb>[.fund.<fund_id>]
Some commonly-subscribed subjects:
| Subject | Payload | When |
|---|---|---|
mts.v1.oms.fills.created | Fill | every fill |
mts.v1.oms.orders.rejected | Order (with reason) | risk gate failure |
mts.v1.research.signals.published | Signal | strategy emits |
mts.v1.treasury.nav.<fund_id> | NavSnapshot | NAV update |
mts.v1.risk.gate.failed | OrderRejection | gate failure (different from orders.rejected — risk.gate.failed is per-gate; orders.rejected is final) |
See Eventbus concept for the full taxonomy.
Subscriber example
import asyncio
from mts1b_platform.eventbus import connect
from mts1b_platform.logging import get_logger
from mts1b_foundation.orders import Fill
log = get_logger(__name__)
async def main():
nc = await connect()
js = nc.jetstream()
# Durable subscription — JetStream tracks our ack state
sub = await js.subscribe(
"mts.v1.oms.fills.created",
durable="my-analytics",
manual_ack=True,
max_deliver=5, # retry up to 5x then DLQ
)
async for msg in sub.messages:
try:
fill = Fill.model_validate_json(msg.data)
await process_fill(fill)
await msg.ack()
except Exception as e:
log.error("fill processing failed", extra={"err": str(e)})
await msg.nak(delay=30.0) # retry in 30s
async def process_fill(fill: Fill):
# Your custom analytics here
...
asyncio.run(main())
Filtering with wildcards
NATS subjects support * (one token) and > (rest):
# Everything OMS publishes
await js.subscribe("mts.v1.oms.>")
# All `verb=created` events across all repos
await js.subscribe("mts.v1.*.*.created")
# Fund-scoped
await js.subscribe("mts.v1.oms.fills.created.fund.my-fund-id")
Producers tag fund-scoped events with .fund.<fund_id> in the subject path, enabling cheap fund-scoped subscriptions.
Replay from a past sequence
JetStream stores messages durably. Replay:
sub = await js.subscribe(
"mts.v1.oms.fills.created",
durable="my-analytics",
deliver_policy="by_start_sequence",
opt_start_seq=12345, # start from this sequence
)
Useful for backfilling analytics after a deploy.
Publishing
from mts1b_platform.eventbus import connect
from mts1b_foundation.signals import Signal
async def publish_signal(signal: Signal):
nc = await connect()
js = nc.jetstream()
ack = await js.publish(
subject=f"mts.v1.research.signals.published",
payload=signal.model_dump_json().encode(),
headers={
"NATS-Msg-Id": signal.signal_id, # dedupe key
"mts-schema": "mts1b_foundation.signals.Signal:v1",
"mts-source-repo": "my-custom-service",
"mts-actor": "custom-strategy-v1",
},
)
log.info("signal published", extra={"stream": ack.stream, "seq": ack.seq})
The NATS-Msg-Id header enables JetStream's built-in dedupe within a window (default 2 min). Same signal_id within 2 min → ignored. Safe to retry on errors.
Slow consumer protection
If you can't keep up with the message rate, JetStream drops your subscription:
sub = await js.subscribe(
"mts.v1.marketdata.quotes.SPY",
durable="my-quote-consumer",
ack_wait=10.0, # ack within 10s
max_ack_pending=1000, # at most 1k in flight
)
mts1b-operations monitors nats_consumer_lag_seconds per durable subscription. Lag > 30s triggers an alert.
Observability
Standard metrics emitted by every consumer using mts1b-platform/eventbus:
nats_consume_total{subject, status}nats_consume_latency_seconds{subject}(publish → consume time)nats_consume_errors_total{subject, error_type}nats_consumer_lag_seconds{subject, durable_name}
Add to your Grafana dashboards under "Event bus health".
Common patterns
| Use case | Subject | Durable? |
|---|---|---|
| Real-time UI updates | mts.v1.oms.> | No (don't want backlog on reconnect) |
| Trade journaling | mts.v1.oms.fills.> | Yes (must not lose fills) |
| External webhook | mts.v1.oms.orders.filled | Yes (downstream might be slow) |
| Audit / compliance | mts.v1.> | Yes (regulatory) |
| Drift monitor | mts.v1.research.signals.> | Yes |
See also
- Concept: Event bus — full architecture
mts1b-platform/eventbus— connection helpersmts1b-foundationschemas — payload types