Skip to main content

mts1b_foundation.nats — full reference

NATS subject helpers + schema version negotiation. Pure helpers — no NATS client connection logic (that's in mts1b-platform/eventbus).

Quick example

from mts1b_foundation.nats import subject, negotiate, get_registry

# Build a canonical subject
subj = subject("mts1b-oms", "orders", "created")
# "mts.v1.oms.orders.created"

# With fund-scoped partition
subj = subject("mts1b-treasury", "nav", "updated", fund_id="paper-momentum")
# "mts.v1.treasury.nav.updated.fund.paper-momentum"

# Custom version
subj = subject("mts1b-oms", "orders", "created", version=2)
# "mts.v2.oms.orders.created"

# Discover all known subjects
for s, payload_type in get_registry().items():
print(f" {s:<55}{payload_type}")

subject(repo, noun, verb, *, version=1, fund_id=None) -> str

Build a canonical NATS subject.

mts.v<version>.<repo>.<noun>.<verb>[.fund.<fund_id>]

The <repo> part automatically strips the mts1b- prefix.

Parameters

ParamTypeDefaultNotes
repostrrequirede.g. "mts1b-oms" or just "oms"
nounstrrequireddomain entity: "orders", "fills", "signals", ...
verbstrrequiredpast-tense action: "created", "filled", "updated", ...
versionint1schema version
fund_idstr / NoneNoneoptional partition key

Examples

subject("mts1b-oms", "orders", "created") # "mts.v1.oms.orders.created"
subject("mts1b-oms", "fills", "created") # "mts.v1.oms.fills.created"
subject("mts1b-research", "signals", "published") # "mts.v1.research.signals.published"
subject("mts1b-risk", "envelope","updated") # "mts.v1.risk.envelope.updated"
subject("mts1b-risk", "gate", "failed") # "mts.v1.risk.gate.failed"
subject("mts1b-treasury", "nav", "updated", fund_id="paper")
# "mts.v1.treasury.nav.updated.fund.paper"

# Short repo name (drop the mts1b- prefix yourself; same result)
subject("oms", "orders", "created") # "mts.v1.oms.orders.created"

Wildcard subscriptions (NATS-level, not in this API)

While this function builds concrete subjects, NATS subscribers can use * and >:

mts.v1.oms.> # everything OMS publishes
mts.v1.*.*.created # all `created` events across all repos
mts.v1.oms.fills.* # all OMS fill events
mts.v1.treasury.nav.updated.fund.paper-momentum # one fund
mts.v1.treasury.nav.updated.fund.* # all funds

negotiate(consumer_manifests) -> int

Pick the highest schema version every consumer can handle. Used at producer startup to choose which subject version to publish on.

Manifest shape

Each consumer declares the range of versions it understands:

manifest = {
"consumer": "my-service", # human name (for error messages)
"min_v": 1, # lowest supported version
"max_v": 2, # highest supported version
}

Algorithm

common_versions = {1, 2, 3, ..., MAX}[min_v_1, max_v_1][min_v_2, max_v_2]...
return max(common_versions)

If common_versions is empty, raises IncompatibleConsumersError.

Examples

from mts1b_foundation.nats import negotiate, IncompatibleConsumersError

# All consumers support v1+v2 → pick v2
v = negotiate([
{"consumer": "research", "min_v": 1, "max_v": 2},
{"consumer": "risk", "min_v": 1, "max_v": 2},
])
# v == 2


# One consumer is v1-only → fall back to v1
v = negotiate([
{"consumer": "research", "min_v": 1, "max_v": 2},
{"consumer": "legacy", "min_v": 1, "max_v": 1},
])
# v == 1


# Two consumers, disjoint ranges → error
try:
negotiate([
{"consumer": "a", "min_v": 1, "max_v": 1},
{"consumer": "b", "min_v": 2, "max_v": 2},
])
except IncompatibleConsumersError as e:
print(e)
# no common version across 2 consumers; manifests: [{'consumer': 'a', ...}, ...]


# Empty manifest list → default to v1
negotiate([]) # 1

Subject registry

get_registry() returns the canonical subject → payload-type map.

from mts1b_foundation.nats import get_registry

REG = get_registry()
print(REG["mts.v1.oms.orders.created"])
# "mts1b_foundation.orders.Order"

Known subjects (Wave 1)

SubjectPayload type
mts.v1.oms.orders.createdmts1b_foundation.orders.Order
mts.v1.oms.orders.acceptedmts1b_foundation.orders.Order
mts.v1.oms.orders.submittedmts1b_foundation.orders.Order
mts.v1.oms.orders.filledmts1b_foundation.orders.Order
mts.v1.oms.orders.partialmts1b_foundation.orders.Order
mts.v1.oms.orders.canceledmts1b_foundation.orders.Order
mts.v1.oms.orders.rejectedmts1b_foundation.orders.Order
mts.v1.oms.orders.closedmts1b_foundation.orders.Order
mts.v1.oms.fills.createdmts1b_foundation.orders.Fill
mts.v1.research.signals.publishedmts1b_foundation.signals.Signal
mts.v1.research.ladder.stage_completedict
mts.v1.research.drift.measureddict
mts.v1.risk.envelope.updatedmts1b_foundation.risk.RiskEnvelope
mts.v1.risk.gate.failedmts1b_foundation.risk.OrderRejection
mts.v1.risk.gate.passeddict
mts.v1.risk.halt.requestedmts1b_foundation.risk.HaltRequest
mts.v1.treasury.nav.updatedmts1b_foundation.funds.NavSnapshot
mts.v1.treasury.transfers.requestedmts1b_foundation.funds.TransferRequest
mts.v1.operations.halt.requestedmts1b_foundation.risk.HaltRequest
mts.v1.operations.audit.appendeddict

Dynamic subjects (not in static registry)

Per-symbol or per-fund subjects don't appear in the registry but follow the convention:

PatternPayload
mts.v1.marketdata.quotes.<symbol>Quote
mts.v1.marketdata.bars.<symbol>.<interval>Bar
mts.v1.brokers.<broker>.fills.rawbroker-native dict
mts.v1.treasury.nav.updated.fund.<fund_id>NavSnapshot

class IncompatibleConsumersError(ValueError)

Raised by negotiate() when no common schema version exists.

from mts1b_foundation.nats import IncompatibleConsumersError, negotiate

try:
v = negotiate([
{"consumer": "a", "min_v": 1, "max_v": 1},
{"consumer": "b", "min_v": 3, "max_v": 5},
])
except IncompatibleConsumersError as e:
print("startup-fail:", e)
# Action: upgrade consumer 'a' to support v3+, OR pin producer to v1 and skip 'b'

End-to-end producer/consumer pattern

This module produces the subject strings; the actual NATS connection + publish/subscribe lives in mts1b-platform/eventbus.

import asyncio
from datetime import datetime, timezone
from mts1b_foundation.nats import subject, negotiate
from mts1b_foundation.orders import Order
from mts1b_platform.eventbus import connect


async def main():
# 1. Negotiate version with downstream consumers
consumer_manifests = await fetch_consumer_manifests("mts.v?.oms.orders.>")
version = negotiate(consumer_manifests)
# → e.g. version=1 if any consumer is v1-only

# 2. Build subject
subj = subject("mts1b-oms", "orders", "created", version=version)
# "mts.v1.oms.orders.created"

# 3. Connect + publish via platform
nc = await connect()
js = nc.jetstream()

order = Order(...)
await js.publish(
subject=subj,
payload=order.model_dump_json().encode(),
headers={
"NATS-Msg-Id": order.idempotency_key,
"mts-schema": f"mts1b_foundation.orders.Order:v{version}",
"mts-source-repo": "mts1b-oms",
"mts-actor": order.actor,
},
)


asyncio.run(main())

Subject discovery via NATS

NATS itself doesn't index "active subjects". For discovery, we use a side-channel: consumers register their interest in mts.meta.consumers at startup.

# Producer queries the meta channel for consumers of orders.>
async def fetch_consumer_manifests(subject_pattern: str) -> list[dict]:
"""Ask each consumer to declare its version range."""
nc = await connect()
response = await nc.request(
"mts.meta.consumers.query",
payload=json.dumps({"pattern": subject_pattern}).encode(),
timeout=5.0,
)
return json.loads(response.data)

The implementation lives in mts1b-platform/eventbus; the protocol is documented here.

Versioning strategy in practice

ScenarioWhat to do
Adding a field to OrderIncrement minor version of mts1b-foundation; subjects stay at v1
Adding a required fieldMajor version bump → producers emit on v2; v1 subjects deprecated for 6 months
Removing a fieldMajor bump → field becomes optional in v1 for 6 months, gone in v2
Renaming a subjectNew repo / noun / verb → new subject path; v1 retained for transition

The 6-month transition window: every consumer learns "both" versions; both subjects publish; consumer eventually migrates; v1 producer is retired.

See also