Event-driven data via NATS

Problem: You have a custom analytics service that needs to react to every fill. Polling the position store every second is wasteful.

Solution: Subscribe to the NATS event bus.

Subject taxonomy refresher

mts.v1.<source-repo>.<noun>.<verb>[.fund.<fund_id>]
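As a rough illustration of the pattern above, a subject could be assembled from its parts as follows. `build_subject` is a hypothetical helper, not something mts1b-platform is known to provide, and the token validation is an assumption based on general NATS subject rules (no dots, whitespace, or wildcard characters inside a token).

```python
from typing import Optional

PREFIX = "mts.v1"
# Characters that would change the structure or meaning of a subject token.
_FORBIDDEN = set(".*> \t\r\n")


def _check_token(name: str, value: str) -> str:
    """Reject empty tokens and tokens containing separators or wildcards."""
    if not value or any(ch in _FORBIDDEN for ch in value):
        raise ValueError(f"invalid {name} token: {value!r}")
    return value


def build_subject(source_repo: str, noun: str, verb: str,
                  fund_id: Optional[str] = None) -> str:
    """Build mts.v1.<source-repo>.<noun>.<verb>[.fund.<fund_id>] (hypothetical helper)."""
    tokens = [
        PREFIX,
        _check_token("source_repo", source_repo),
        _check_token("noun", noun),
        _check_token("verb", verb),
    ]
    if fund_id is not None:
        # Optional fund scope is appended as two extra tokens.
        tokens += ["fund", _check_token("fund_id", fund_id)]
    return ".".join(tokens)
```

Validating tokens up front keeps a stray dot or wildcard in an identifier from silently producing a subject with a different shape.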

Some commonly-subscribed subjects:

| Subject | Payload | When |
| mts.v1.oms.fills.created | Fill | every fill |
| mts.v1.oms.orders.rejected | Order (with reason) | risk gate failure |
| mts.v1.research.signals.published | Signal | strategy emits |
| mts.v1.treasury.nav.<fund_id> | NavSnapshot | NAV update |
| mts.v1.risk.gate.failed | OrderRejection | gate failure (different from orders.rejected: risk.gate.failed is per-gate; orders.rejected is final) |

See Eventbus concept for the full taxonomy.

Subscriber example

import asyncio
from mts1b_platform.eventbus import connect
from mts1b_platform.logging import get_logger
from mts1b_foundation.orders import Fill

log = get_logger(__name__)


async def main():
    nc = await connect()
    js = nc.jetstream()

    # Durable subscription: JetStream tracks our ack state
    sub = await js.subscribe(
        "mts.v1.oms.fills.created",
        durable="my-analytics",
        manual_ack=True,
        max_deliver=5,  # retry up to 5x then DLQ
    )

    async for msg in sub.messages:
        try:
            fill = Fill.model_validate_json(msg.data)
            await process_fill(fill)
            await msg.ack()
        except Exception as e:
            log.error("fill processing failed", extra={"err": str(e)})
            await msg.nak(delay=30.0)  # retry in 30s


async def process_fill(fill: Fill):
    # Your custom analytics here
    ...


asyncio.run(main())
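The ack-or-nak decision in the loop above can be exercised without a running NATS server. The following is a minimal sketch under stated assumptions: `FakeMsg` and `handle_message` are hypothetical names introduced here for illustration, and the fake only mimics the `data`, `ack()`, and `nak(delay=...)` members that the example relies on.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class FakeMsg:
    """Hypothetical stand-in for a JetStream message; records ack/nak calls."""

    def __init__(self, data: bytes):
        self.data = data
        self.acked = False
        self.nak_delay: Optional[float] = None

    async def ack(self) -> None:
        self.acked = True

    async def nak(self, delay: Optional[float] = None) -> None:
        self.nak_delay = delay


async def handle_message(msg, process: Callable[[bytes], Awaitable[None]],
                         retry_delay: float = 30.0) -> bool:
    """Ack when processing succeeds, nak with a delay when it raises.

    Returns True if the message was acked.
    """
    try:
        await process(msg.data)
        await msg.ack()
        return True
    except Exception:
        # Ask for redelivery after retry_delay seconds.
        await msg.nak(delay=retry_delay)
        return False


async def ok(data: bytes) -> None:
    """Processor that always succeeds."""


async def boom(data: bytes) -> None:
    """Processor that always fails."""
    raise ValueError("bad payload")
```

Pulling the per-message logic into a function like this lets a unit test drive both branches, for example `asyncio.run(handle_message(FakeMsg(b"{}"), boom))`, and then inspect which of `ack` or `nak` was called.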

Filtering with wildcards

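NATS subjects support two wildcards: * matches exactly one token, and > matches one or more trailing tokens and must be the last token:

# Everything OMS publishes
await js.subscribe("mts.v1.oms.>")

# All `verb=created` events across all repos (five-token subjects only;
# fund-scoped subjects ending in .fund.<fund_id> do not match)
await js.subscribe("mts.v1.*.*.created")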

# Fund-scoped
await js.subscribe("mts.v1.oms.fills.created.fund.my-fund-id")

Producers tag fund-scoped events with .fund.<fund_id> in the subject path, enabling cheap fund-scoped subscriptions.
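To make the wildcard rules concrete, here is a small pure-Python model of subject matching. It is a sketch for reasoning about which subjects a filter would select, not the matcher the NATS server uses; `subject_matches` is a hypothetical name.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` is selected by the NATS-style `pattern`.

    "*" matches exactly one token; ">" matches one or more trailing
    tokens and is only valid as the last token of the pattern.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # Must be the final pattern token and must consume at least one token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # subject is shorter than the pattern
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    # Without a trailing ">", token counts must be equal.
    return len(p_tokens) == len(s_tokens)
```

Under this model, `mts.v1.*.*.created` selects `mts.v1.oms.fills.created` but not `mts.v1.oms.fills.created.fund.my-fund-id`, because * never spans more than one token. A filter meant to include fund-scoped events needs > or an explicit fund-scoped subject.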

Replay from a past sequence

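JetStream stores messages durably, so a consumer can start from an earlier stream sequence and replay everything published since:

sub = await js.subscribe(
    "mts.v1.oms.fills.created",
    # Use a new durable name: an existing consumer keeps its original start position
    durable="my-analytics-backfill",
    deliver_policy="by_start_sequence",
    opt_start_seq=12345,  # start from this sequence
)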

Useful for backfilling analytics after a deploy.

Publishing

from mts1b_platform.eventbus import connect
from mts1b_platform.logging import get_logger
from mts1b_foundation.signals import Signal

log = get_logger(__name__)


async def publish_signal(signal: Signal):
    nc = await connect()
    js = nc.jetstream()

    ack = await js.publish(
        subject="mts.v1.research.signals.published",
        payload=signal.model_dump_json().encode(),
        headers={
            "Nats-Msg-Id": signal.signal_id,  # dedupe key
            "mts-schema": "mts1b_foundation.signals.Signal:v1",
            "mts-source-repo": "my-custom-service",
            "mts-actor": "custom-strategy-v1",
        },
    )
    log.info("signal published", extra={"stream": ack.stream, "seq": ack.seq})

The Nats-Msg-Id header enables JetStream's built-in deduplication within a window (default 2 min). A second publish with the same signal_id inside that window is acknowledged as a duplicate and not stored again, so retrying after a publish error is safe.
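The deduplication behaviour described above can be modelled in a few lines. This is a simplified sketch of the semantics only (one ID table, a fixed window measured from the first accepted publish), not the server's implementation; `DedupeWindow` and its `offer` method are hypothetical names, and the injectable clock exists purely so the behaviour can be tested without waiting.

```python
import time
from typing import Callable, Dict


class DedupeWindow:
    """Toy model of a message-ID dedupe window (default 120 s, i.e. 2 min)."""

    def __init__(self, window_seconds: float = 120.0,
                 clock: Callable[[], float] = time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self._seen: Dict[str, float] = {}  # msg_id -> time first accepted

    def offer(self, msg_id: str) -> bool:
        """Return True if the message would be stored, False if it is a duplicate."""
        now = self.clock()
        # Forget IDs whose window has elapsed.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.window}
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = now
        return True
```

The practical consequence is that a retry is only protected while the original ID is still inside the window. A retry that arrives later than that is stored as a new message, so retry backoff should stay well under the window length.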

Slow consumer protection

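If your consumer can't keep up with the message rate, the subscription can be dropped as a slow consumer. Set an ack deadline and cap the number of unacknowledged messages so that in-flight work stays bounded:

sub = await js.subscribe(
    "mts.v1.marketdata.quotes.SPY",
    durable="my-quote-consumer",
    ack_wait=10.0,  # messages not acked within 10s are redelivered
    max_ack_pending=1000,  # at most 1k unacknowledged messages in flight
)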

mts1b-operations monitors nats_consumer_lag_seconds per durable subscription. Lag > 30s triggers an alert.

Observability

Standard metrics emitted by every consumer using mts1b-platform/eventbus:

  • nats_consume_total{subject, status}
  • nats_consume_latency_seconds{subject} (publish → consume time)
  • nats_consume_errors_total{subject, error_type}
  • nats_consumer_lag_seconds{subject, durable_name}

Add these metrics to your Grafana dashboards under "Event bus health".

Common patterns

| Use case | Subject | Durable? |
| Real-time UI updates | mts.v1.oms.> | No (don't want backlog on reconnect) |
| Trade journaling | mts.v1.oms.fills.> | Yes (must not lose fills) |
| External webhook | mts.v1.oms.orders.filled | Yes (downstream might be slow) |
| Audit / compliance | mts.v1.> | Yes (regulatory) |
| Drift monitor | mts.v1.research.signals.> | Yes |

See also