mts1b_foundation.nats — full reference
NATS subject helpers + schema version negotiation. Pure helpers — no NATS client connection logic (that's in mts1b-platform/eventbus).
Quick example
from mts1b_foundation.nats import subject, negotiate, get_registry
# Build a canonical subject
subj = subject("mts1b-oms", "orders", "created")
# "mts.v1.oms.orders.created"
# With fund-scoped partition
subj = subject("mts1b-treasury", "nav", "updated", fund_id="paper-momentum")
# "mts.v1.treasury.nav.updated.fund.paper-momentum"
# Custom version
subj = subject("mts1b-oms", "orders", "created", version=2)
# "mts.v2.oms.orders.created"
# Discover all known subjects
for s, payload_type in get_registry().items():
    print(f" {s:<55} → {payload_type}")
subject(repo, noun, verb, *, version=1, fund_id=None) -> str
Build a canonical NATS subject.
mts.v<version>.<repo>.<noun>.<verb>[.fund.<fund_id>]
The <repo> part automatically strips the mts1b- prefix.
Parameters
| Param | Type | Default | Notes |
|---|---|---|---|
| repo | str | required | e.g. "mts1b-oms" or just "oms" |
| noun | str | required | domain entity: "orders", "fills", "signals", ... |
| verb | str | required | past-tense action: "created", "filled", "updated", ... |
| version | int | 1 | schema version |
| fund_id | str / None | None | optional partition key |
Examples
subject("mts1b-oms", "orders", "created") # "mts.v1.oms.orders.created"
subject("mts1b-oms", "fills", "created") # "mts.v1.oms.fills.created"
subject("mts1b-research", "signals", "published") # "mts.v1.research.signals.published"
subject("mts1b-risk", "envelope", "updated") # "mts.v1.risk.envelope.updated"
subject("mts1b-risk", "gate", "failed") # "mts.v1.risk.gate.failed"
subject("mts1b-treasury", "nav", "updated", fund_id="paper")
# "mts.v1.treasury.nav.updated.fund.paper"
# Short repo name (drop the mts1b- prefix yourself; same result)
subject("oms", "orders", "created") # "mts.v1.oms.orders.created"
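The format rule above is small enough to sketch in a few lines. What follows is a minimal illustration, not the library's actual implementation: `build_subject` is a hypothetical name chosen to avoid clashing with the real `subject` helper, and the sketch assumes no validation of token contents (the real function may well reject dots or wildcards inside a token).

```python
def build_subject(repo, noun, verb, *, version=1, fund_id=None):
    """Illustrative re-implementation of the documented subject format."""
    # Strip the "mts1b-" prefix if present (str.removeprefix needs Python 3.9+).
    short_repo = repo.removeprefix("mts1b-")
    parts = ["mts", f"v{version}", short_repo, noun, verb]
    # The fund partition is an optional two-token suffix: "fund.<fund_id>".
    if fund_id is not None:
        parts += ["fund", fund_id]
    return ".".join(parts)


print(build_subject("mts1b-oms", "orders", "created"))
print(build_subject("treasury", "nav", "updated", fund_id="paper"))
```

Building the subject from a token list keeps the optional suffix logic in one place and makes it harder to emit a stray dot.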
Wildcard subscriptions (NATS-level, not in this API)
This function builds concrete subjects only. NATS subscribers can additionally use the wildcards * (matches exactly one token) and > (matches one or more trailing tokens):
mts.v1.oms.> # everything OMS publishes
mts.v1.*.*.created # all `created` events across all repos
mts.v1.oms.fills.* # all OMS fill events
mts.v1.treasury.nav.updated.fund.paper-momentum # one fund
mts.v1.treasury.nav.updated.fund.* # all funds
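To check which concrete subjects a wildcard pattern would receive without a running server, the NATS matching rules can be approximated locally. This is a sketch for reasoning about patterns only; `matches` is a hypothetical helper that is not part of this module, and the NATS server performs the real matching.

```python
def matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style pattern matches a concrete subject."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, tok in enumerate(p_tokens):
        if tok == ">":
            # ">" must be the last pattern token and needs at least one
            # remaining subject token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        # "*" matches exactly one token; anything else must match literally.
        if tok != "*" and tok != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


print(matches("mts.v1.oms.>", "mts.v1.oms.orders.created"))
print(matches("mts.v1.*.*.created", "mts.v1.treasury.nav.updated.fund.paper"))
```

Note that `mts.v1.*.*.created` has exactly five tokens, so it never matches a fund-scoped subject, which carries two extra tokens.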
negotiate(consumer_manifests) -> int
Pick the highest schema version every consumer can handle. Used at producer startup to choose which subject version to publish on.
Manifest shape
Each consumer declares the range of versions it understands:
manifest = {
    "consumer": "my-service",  # human name (for error messages)
    "min_v": 1,                # lowest supported version
    "max_v": 2,                # highest supported version
}
Algorithm
common_versions = {1, 2, 3, ..., MAX} ∩ [min_v_1, max_v_1] ∩ [min_v_2, max_v_2] ∩ ...
return max(common_versions)
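If common_versions is empty, raises IncompatibleConsumersError. An empty manifest list is a special case: with no consumers to constrain the choice, negotiate() returns 1 rather than MAX.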
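Because every manifest is a closed interval, the intersection reduces to comparing the largest `min_v` with the smallest `max_v`. The sketch below illustrates that idea under stated assumptions: `negotiate_sketch` and `NoCommonVersionError` are hypothetical stand-ins for the real `negotiate` and `IncompatibleConsumersError`, and the upper bound MAX is omitted because its value is not given here.

```python
class NoCommonVersionError(ValueError):
    """Stand-in for IncompatibleConsumersError in this sketch."""


def negotiate_sketch(manifests):
    # Documented special case: no consumers means default to v1.
    if not manifests:
        return 1
    lowest_allowed = max(m["min_v"] for m in manifests)
    highest_allowed = min(m["max_v"] for m in manifests)
    # The intersection of all ranges is [lowest_allowed, highest_allowed];
    # it is empty when the bounds cross.
    if lowest_allowed > highest_allowed:
        raise NoCommonVersionError(
            f"no common version across {len(manifests)} consumers"
        )
    return highest_allowed


print(negotiate_sketch([
    {"consumer": "research", "min_v": 1, "max_v": 2},
    {"consumer": "legacy", "min_v": 1, "max_v": 1},
]))
```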
Examples
from mts1b_foundation.nats import negotiate, IncompatibleConsumersError
# All consumers support v1+v2 → pick v2
v = negotiate([
    {"consumer": "research", "min_v": 1, "max_v": 2},
    {"consumer": "risk", "min_v": 1, "max_v": 2},
])
# v == 2
# One consumer is v1-only → fall back to v1
v = negotiate([
    {"consumer": "research", "min_v": 1, "max_v": 2},
    {"consumer": "legacy", "min_v": 1, "max_v": 1},
])
# v == 1
# Two consumers, disjoint ranges → error
try:
    negotiate([
        {"consumer": "a", "min_v": 1, "max_v": 1},
        {"consumer": "b", "min_v": 2, "max_v": 2},
    ])
except IncompatibleConsumersError as e:
    print(e)
    # no common version across 2 consumers; manifests: [{'consumer': 'a', ...}, ...]
# Empty manifest list → default to v1
negotiate([]) # 1
Subject registry
get_registry() returns the canonical subject → payload-type map.
from mts1b_foundation.nats import get_registry
REG = get_registry()
print(REG["mts.v1.oms.orders.created"])
# "mts1b_foundation.orders.Order"
Known subjects (Wave 1)
| Subject | Payload type |
|---|---|
| mts.v1.oms.orders.created | mts1b_foundation.orders.Order |
| mts.v1.oms.orders.accepted | mts1b_foundation.orders.Order |
| mts.v1.oms.orders.submitted | mts1b_foundation.orders.Order |
| mts.v1.oms.orders.filled | mts1b_foundation.orders.Order |
| mts.v1.oms.orders.partial | mts1b_foundation.orders.Order |
| mts.v1.oms.orders.canceled | mts1b_foundation.orders.Order |
| mts.v1.oms.orders.rejected | mts1b_foundation.orders.Order |
| mts.v1.oms.orders.closed | mts1b_foundation.orders.Order |
| mts.v1.oms.fills.created | mts1b_foundation.orders.Fill |
| mts.v1.research.signals.published | mts1b_foundation.signals.Signal |
| mts.v1.research.ladder.stage_complete | dict |
| mts.v1.research.drift.measured | dict |
| mts.v1.risk.envelope.updated | mts1b_foundation.risk.RiskEnvelope |
| mts.v1.risk.gate.failed | mts1b_foundation.risk.OrderRejection |
| mts.v1.risk.gate.passed | dict |
| mts.v1.risk.halt.requested | mts1b_foundation.risk.HaltRequest |
| mts.v1.treasury.nav.updated | mts1b_foundation.funds.NavSnapshot |
| mts.v1.treasury.transfers.requested | mts1b_foundation.funds.TransferRequest |
| mts.v1.operations.halt.requested | mts1b_foundation.risk.HaltRequest |
| mts.v1.operations.audit.appended | dict |
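The payload types in the registry are strings: either a dotted import path or a bare builtin name such as `dict`. A consumer that wants the actual class could resolve them roughly as follows. `resolve_payload_type` is a hypothetical helper, not part of this module, and the demo uses a standard-library path so that it runs without mts1b_foundation installed.

```python
import builtins
import importlib


def resolve_payload_type(path: str) -> type:
    """Turn 'package.module.Name' or a builtin name into the class object."""
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        # No dot: treat the name as a builtin, e.g. "dict".
        return getattr(builtins, attr)
    return getattr(importlib.import_module(module_name), attr)


print(resolve_payload_type("dict"))
print(resolve_payload_type("collections.OrderedDict"))
```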
Dynamic subjects (not in static registry)
Per-symbol or per-fund subjects don't appear in the registry but follow the convention:
| Pattern | Payload |
|---|---|
| mts.v1.marketdata.quotes.<symbol> | Quote |
| mts.v1.marketdata.bars.<symbol>.<interval> | Bar |
| mts.v1.brokers.<broker>.fills.raw | broker-native dict |
| mts.v1.treasury.nav.updated.fund.<fund_id> | NavSnapshot |
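Since fund-scoped subjects are absent from the static registry, a lookup on the full subject returns nothing. One way to handle this, sketched below, is to strip the `.fund.<fund_id>` suffix and retry with the base subject. `payload_type_for` is a hypothetical helper, and the two-entry `registry` dict is a hand-copied excerpt of the table above rather than the output of `get_registry()`.

```python
def payload_type_for(subj, registry):
    """Look up a payload type, falling back to the base subject for fund-scoped ones."""
    if subj in registry:
        return registry[subj]
    base, sep, fund_id = subj.rpartition(".fund.")
    # Accept the fallback only for a single-token fund id.
    if sep and fund_id and "." not in fund_id:
        return registry.get(base)
    return None


registry = {
    "mts.v1.treasury.nav.updated": "mts1b_foundation.funds.NavSnapshot",
    "mts.v1.oms.orders.created": "mts1b_foundation.orders.Order",
}
print(payload_type_for("mts.v1.treasury.nav.updated.fund.paper-momentum", registry))
```

The per-symbol and per-broker patterns would need their own rules; this sketch covers the fund suffix only.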
class IncompatibleConsumersError(ValueError)
Raised by negotiate() when no common schema version exists.
from mts1b_foundation.nats import IncompatibleConsumersError, negotiate
try:
    v = negotiate([
        {"consumer": "a", "min_v": 1, "max_v": 1},
        {"consumer": "b", "min_v": 3, "max_v": 5},
    ])
except IncompatibleConsumersError as e:
    print("startup-fail:", e)
    # Action: upgrade consumer 'a' to support v3+, OR pin producer to v1 and skip 'b'
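When negotiation fails, it helps to know which consumers are responsible before deciding whom to upgrade. The sketch below picks out the consumer with the lowest `max_v` and the one with the highest `min_v`, which are always a conflicting pair when the ranges have no overlap. `blocking_consumers` is a hypothetical helper; nothing here implies the real exception exposes this information.

```python
def blocking_consumers(manifests):
    """Return (oldest_capped, newest_floored) consumer names, or None if compatible."""
    lowest_max = min(manifests, key=lambda m: m["max_v"])
    highest_min = max(manifests, key=lambda m: m["min_v"])
    if highest_min["min_v"] <= lowest_max["max_v"]:
        return None  # ranges overlap, nothing is blocking
    return lowest_max["consumer"], highest_min["consumer"]


print(blocking_consumers([
    {"consumer": "a", "min_v": 1, "max_v": 1},
    {"consumer": "b", "min_v": 3, "max_v": 5},
]))
```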
End-to-end producer/consumer pattern
This module produces the subject strings; the actual NATS connection + publish/subscribe lives in mts1b-platform/eventbus.
import asyncio
from datetime import datetime, timezone
from mts1b_foundation.nats import subject, negotiate
from mts1b_foundation.orders import Order
from mts1b_platform.eventbus import connect

# fetch_consumer_manifests() is defined under "Subject discovery via NATS" below.

async def main():
    # 1. Negotiate version with downstream consumers
    consumer_manifests = await fetch_consumer_manifests("mts.v?.oms.orders.>")
    version = negotiate(consumer_manifests)
    # → e.g. version=1 if any consumer is v1-only
    # 2. Build subject
    subj = subject("mts1b-oms", "orders", "created", version=version)
    # "mts.v1.oms.orders.created" when version == 1
    # 3. Connect + publish via platform
    nc = await connect()
    js = nc.jetstream()
    order = Order(...)
    await js.publish(
        subject=subj,
        payload=order.model_dump_json().encode(),
        headers={
            # JetStream deduplication header; the name is case-sensitive.
            "Nats-Msg-Id": order.idempotency_key,
            "mts-schema": f"mts1b_foundation.orders.Order:v{version}",
            "mts-source-repo": "mts1b-oms",
            "mts-actor": order.actor,
        },
    )

asyncio.run(main())
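On the consumer side, the `mts-schema` header set by the producer carries both the payload type and the schema version in the form `<type path>:v<version>`. A receiver could split it apart as in the sketch below. `parse_schema_header` is a hypothetical helper, and the strict format check is an assumption rather than documented behaviour.

```python
def parse_schema_header(value: str) -> tuple[str, int]:
    """Split 'pkg.module.Type:v2' into ('pkg.module.Type', 2)."""
    type_path, sep, version = value.rpartition(":v")
    if not sep or not type_path or not version.isdigit():
        raise ValueError(f"malformed mts-schema header: {value!r}")
    return type_path, int(version)


print(parse_schema_header("mts1b_foundation.orders.Order:v1"))
```

Checking the parsed version against the consumer's own `min_v` and `max_v` gives a cheap guard against messages it cannot decode.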
Subject discovery via NATS
NATS itself doesn't index "active subjects". For discovery, we use a side-channel: consumers register their interest in mts.meta.consumers at startup.
import json
from mts1b_platform.eventbus import connect

# Producer queries the meta channel for consumers of orders.>
async def fetch_consumer_manifests(subject_pattern: str) -> list[dict]:
    """Ask each consumer to declare its version range."""
    nc = await connect()
    response = await nc.request(
        "mts.meta.consumers.query",
        payload=json.dumps({"pattern": subject_pattern}).encode(),
        timeout=5.0,
    )
    return json.loads(response.data)
The implementation lives in mts1b-platform/eventbus; the protocol is documented here.
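The wire format of the query is plain JSON in both directions: a `{"pattern": ...}` object out, a list of manifests back. The sketch below isolates that encoding so it can be tested without a NATS connection. `encode_query` and `decode_manifests` are hypothetical names, and the validation rules (required keys, `min_v <= max_v`) are assumptions layered on the manifest shape described earlier.

```python
import json

REQUIRED_KEYS = {"consumer", "min_v", "max_v"}


def encode_query(pattern: str) -> bytes:
    """Build the request payload for mts.meta.consumers.query."""
    return json.dumps({"pattern": pattern}).encode()


def decode_manifests(data: bytes) -> list[dict]:
    """Parse and sanity-check the reply before handing it to negotiate()."""
    manifests = json.loads(data)
    if not isinstance(manifests, list):
        raise ValueError("expected a JSON list of manifests")
    for m in manifests:
        if not isinstance(m, dict) or REQUIRED_KEYS - m.keys():
            raise ValueError(f"manifest missing required keys: {m!r}")
        if m["min_v"] > m["max_v"]:
            raise ValueError(f"min_v exceeds max_v: {m!r}")
    return manifests


reply = b'[{"consumer": "risk", "min_v": 1, "max_v": 2}]'
print(decode_manifests(reply))
```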
Versioning strategy in practice
| Scenario | What to do |
|---|---|
| Adding an optional field to Order | Increment minor version of mts1b-foundation; subjects stay at v1 |
| Adding a required field | Major version bump → producers emit on v2; v1 subjects deprecated for 6 months |
| Removing a field | Major bump → field becomes optional in v1 for 6 months, gone in v2 |
| Renaming a subject | New repo / noun / verb → new subject path; v1 retained for transition |
During the 6-month transition window, every consumer learns to handle both versions and producers publish on both the v1 and v2 subjects. Once all consumers have migrated to v2, the v1 producer is retired.
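During the transition a producer has to emit each event on two subjects. The sketch below shows one way to derive both from a single call. `transition_subjects` is a hypothetical helper that reimplements the subject format locally so that it runs on its own; in real code the `subject` function from this module would be called once per version instead.

```python
def transition_subjects(repo, noun, verb, *, old=1, new=2):
    """Return the old-version and new-version subjects for dual publishing."""
    short_repo = repo.removeprefix("mts1b-")  # Python 3.9+
    return [f"mts.v{v}.{short_repo}.{noun}.{verb}" for v in (old, new)]


for subj in transition_subjects("mts1b-oms", "orders", "created"):
    print(subj)
```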
See also
- Concept: Event bus (full NATS architecture)
- `platform.eventbus`: connect, JetStream, schema validation
- `orders`: `Order` is the payload for many `oms.orders.*` subjects