Skip to main content

mts1b_foundation.protocols — full reference

Six runtime-checkable Python Protocols (PEP 544). Implementations live in mts1b-brokers, mts1b-marketdata, mts1b-riskengine, mts1b-portfolio, mts1b-quantkit, and community plugins via mts1b-pluginsdk.

Quick example: confirm an adapter implements BrokerProtocol

from mts1b_foundation.protocols import BrokerProtocol


class MyBroker:
    """Minimal adapter stub: defines every member BrokerProtocol declares.

    The method bodies are placeholders; the stub exists only to show that
    having the right attribute names is enough for the structural check.
    """

    name = "my-broker"

    async def submit(self, order): ...

    async def cancel(self, order_id): ...

    async def get_open_orders(self): ...

    async def get_positions(self): ...

    async def stream_fills(self): ...


client = MyBroker()
assert isinstance(client, BrokerProtocol) # passes — structural match

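The @runtime_checkable decorator on each Protocol lets isinstance() perform a structural check — no inheritance required. Note that the runtime check only verifies that the required members exist; it does not validate signatures or return types, so pair it with a static type checker.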

BrokerProtocol

@runtime_checkable
class BrokerProtocol(Protocol):
    """Structural contract for broker adapters.

    Any object exposing these members passes ``isinstance(obj, BrokerProtocol)``;
    no inheritance is required.
    """

    @property
    def name(self) -> str: ...

    async def submit(self, order: Order) -> Order:
        """Submit an order. Return the order with broker-assigned id + state."""
        ...

    async def cancel(self, order_id: str) -> bool:
        """Cancel an open order. Return True on success."""
        ...

    async def get_open_orders(self) -> list[Order]:
        """List open (unfilled) orders for this broker account."""
        ...

    async def get_positions(self) -> list[Position]:
        """Current positions for this broker account."""
        ...

    # Declared with plain ``def``: an ``async def`` without ``yield`` would be
    # typed as a coroutine *returning* an iterator, which does not match
    # async-generator implementations consumed via
    # ``async for fill in broker.stream_fills()``.
    def stream_fills(self) -> AsyncIterator[Fill]:
        """Stream fills as they occur."""
        ...

Implementation walkthrough — paper broker

import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from typing import AsyncIterator

from mts1b_foundation.orders import Order, Fill
from mts1b_foundation.positions import Position
from mts1b_foundation.protocols import BrokerProtocol


@dataclass
class PaperClient:
    """In-memory paper broker that fills every order after a short delay.

    Walkthrough-level sketch: it does not de-duplicate on an idempotency key
    and never writes to ``_positions``, so ``get_positions`` always returns
    an empty list.
    """

    name: str = "paper"
    fee_bps_per_side: float = 5.0
    _open_orders: dict[str, Order] = field(default_factory=dict)
    _positions: dict[str, Position] = field(default_factory=dict)
    _fill_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    # Strong references to in-flight fill tasks; the event loop itself only
    # keeps weak references, so an unreferenced task may be collected early.
    _fill_tasks: set[asyncio.Task] = field(default_factory=set)

    async def submit(self, order: Order) -> Order:
        """Accept ``order``, schedule its simulated fill, and return the accepted copy.

        The returned order carries ``submitted_at``, ``accepted_at`` and a
        freshly generated ``broker_order_id``.
        """
        broker_oid = f"PAPER-{uuid.uuid4().hex[:8]}"
        now = datetime.now(timezone.utc)  # one timestamp for both fields
        accepted = order.model_copy(update={
            "submitted_at": now,
            "accepted_at": now,
            "broker_order_id": broker_oid,
        })
        self._open_orders[broker_oid] = accepted

        # Simulate a fill shortly after acceptance (see _fill for pricing).
        task = asyncio.create_task(self._fill(accepted))
        self._fill_tasks.add(task)
        task.add_done_callback(self._fill_tasks.discard)
        return accepted

    async def _fill(self, order: Order):
        """Emit one full fill for ``order`` unless it was cancelled in the meantime."""
        await asyncio.sleep(0.1)
        # cancel() removes the order from the book; if it is gone, the
        # cancellation won the race and no fill must be published.
        if self._open_orders.pop(order.broker_order_id, None) is None:
            return
        # Placeholder price of 100 when the order carries no limit price.
        price = order.limit_price or Decimal("100")
        # Convert bps to a rate in Decimal space to avoid float division noise.
        fee_rate = Decimal(str(self.fee_bps_per_side)) / Decimal(10_000)
        fill = Fill(
            fill_id=str(uuid.uuid4()),
            order_id=order.order_id,
            symbol=order.symbol,
            side=order.side,
            quantity=order.quantity,
            price=price,
            fees=order.quantity * price * fee_rate,
            venue=self.name,
            timestamp=datetime.now(timezone.utc),
        )
        await self._fill_queue.put(fill)

    async def cancel(self, order_id: str) -> bool:
        """Cancel an open order; return True only if it was still open.

        ``order_id`` is matched against the keys of the open-order book,
        i.e. the ``broker_order_id`` assigned in ``submit``.
        """
        # Orders fill ~0.1 s after submit, so cancel succeeds only inside that window.
        return self._open_orders.pop(order_id, None) is not None

    async def get_open_orders(self) -> list[Order]:
        """Return a snapshot list of the orders that have not filled yet."""
        return list(self._open_orders.values())

    async def get_positions(self) -> list[Position]:
        """Return a snapshot list of the tracked positions."""
        return list(self._positions.values())

    async def stream_fills(self) -> AsyncIterator[Fill]:
        """Yield fills forever, in the order they were queued."""
        while True:
            fill = await self._fill_queue.get()
            yield fill


# Verify it implements BrokerProtocol
client = PaperClient()
assert isinstance(client, BrokerProtocol)


# Use it as the contract dictates
async def example():
    """Submit one order through the module-level ``client`` and print its first fill."""
    order = Order(...)
    await client.submit(order)
    async for fill in client.stream_fills():
        print(f"got fill: {fill.fill_id}")
        break  # one fill is enough for the demo; the stream itself never ends

What every implementation MUST do

| Requirement | Why |
| --- | --- |
| name returns a stable string | Used for routing, logging, metrics, audit trail |
| submit is idempotent on order.idempotency_key | OMS retries are safe |
| submit returns an Order with submitted_at populated | OMS uses it as the transition point |
| cancel returns True ONLY if cancellation was confirmed | Fail loud if cancel raced a fill |
| Fills include fees in account base currency | Treasury computes net P/L correctly |
| Streaming is replay-safe (durable consumer) | Restart shouldn't lose fills |

Common implementations

| Adapter | Location | Asset classes |
| --- | --- | --- |
| paper | mts1b-brokers/paper | all |
| IbkrClient | mts1b-brokers/ibkr | equities, options, futures, fx, crypto (PAXOS) |
| CoinbaseClient | mts1b-brokers/coinbase | crypto |
| SchwabClient | mts1b-brokers/schwab | equities, options |
| KrakenClient | mts1b-brokers/kraken | crypto |
| MoomooClient | mts1b-brokers/moomoo | equities (US/HK) |
| TradierClient | mts1b-brokers/tradier | equities, options |

Community plugins (e.g., Robinhood Crypto, Tastytrade) can conform structurally — matching the protocol's shape without subclassing it — via mts1b-pluginsdk.

MarketDataProtocol

@runtime_checkable
class MarketDataProtocol(Protocol):
    """Structural contract for market-data adapters."""

    @property
    def name(self) -> str: ...

    async def get_quote(self, symbol: Symbol) -> Quote:
        """Return a quote for ``symbol``."""
        ...

    async def get_bars(
        self, symbol: Symbol, interval: str,
        start: datetime, end: datetime | None = None,
    ) -> list[Bar]:
        """Return bars for ``symbol`` at ``interval`` from ``start``; ``end`` is optional."""
        ...

    async def get_trades(self, symbol: Symbol, asof: datetime) -> list[Trade]:
        """Return trades for ``symbol`` relative to ``asof``."""
        ...

    # Plain ``def`` on purpose: ``async def`` without ``yield`` would be typed
    # as a coroutine returning an iterator, which async-generator
    # implementations (used with ``async for``) do not satisfy.
    def stream_quotes(self, symbols: list[Symbol]) -> AsyncIterator[Quote]:
        """Stream quotes for ``symbols``."""
        ...

Implementation walkthrough — Polygon adapter (sketch)

from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, AsyncIterator

from mts1b_foundation.market_data import Quote, Bar, Trade
from mts1b_foundation.symbology import Symbol, to_native
from mts1b_foundation.protocols import MarketDataProtocol


@dataclass
class PolygonClient:
    """Sketch of a Polygon market-data adapter; only ``get_quote`` is implemented."""

    name: str = "polygon"
    api_key: str = ""
    # Async HTTP client used by get_quote. It must provide an awaitable
    # ``get(path)`` whose result has ``.json()``; inject it at construction.
    _client: Any = None

    async def get_quote(self, symbol: Symbol) -> Quote:
        """Fetch the last NBBO for ``symbol`` and map it onto a ``Quote``."""
        # Normalize the symbol at the boundary using this adapter's venue name.
        native = to_native(symbol, venue=self.name)
        response = await self._client.get(f"/v2/last/nbbo/{native}")
        nbbo = response.json()["results"]
        # Polygon NBBO payload: uppercase P/S are the ASK price/size,
        # lowercase p/s are the BID price/size.
        return Quote(
            symbol=symbol,
            bid=Decimal(str(nbbo["p"])),
            ask=Decimal(str(nbbo["P"])),
            bid_size=nbbo["s"],
            ask_size=nbbo["S"],
            venue=self.name,
            # "t" is the SIP timestamp in nanoseconds since the Unix epoch.
            timestamp=datetime.fromtimestamp(nbbo["t"] / 1_000_000_000, tz=timezone.utc),
        )

    async def get_bars(self, symbol, interval, start, end=None):
        """Placeholder: map the interval, fetch, and return ``list[Bar]``."""
        ...

    async def get_trades(self, symbol, asof):
        """Placeholder: not implemented in this sketch."""
        ...

    async def stream_quotes(self, symbols) -> AsyncIterator[Quote]:
        """Placeholder for the WebSocket subscription."""
        ...


client = PolygonClient(api_key="...")
assert isinstance(client, MarketDataProtocol)

Symbol → wire format

Every adapter must normalize at the boundary:

# Inside MarketDataProtocol implementations
native = to_native(symbol, venue=self.name)

See symbology for the full list of venue translations.

RiskGate

@runtime_checkable
class RiskGate(Protocol):
    """Structural contract for a single pre-trade risk check."""

    @property
    def name(self) -> str: ...

    async def check(
        self, order: Order, envelope: RiskEnvelope, context: dict,
    ) -> bool:
        """Return True to let ``order`` through, False to reject it."""
        ...

Example — implementing a custom gate

from dataclasses import dataclass

from mts1b_foundation.orders import Order
from mts1b_foundation.risk import RiskEnvelope
from mts1b_foundation.protocols import RiskGate


@dataclass
class NoTradesNearOpenGate:
    """Reject orders created before ``window_seconds`` have elapsed since market open
    (high volatility, low fill quality).

    With the default window this is the first 5 minutes after the open.
    Orders created *before* the open produce a negative elapsed time and are
    therefore rejected as well.
    """

    name: str = "no_trades_near_open"
    # Length of the blackout window after the open, in seconds.
    window_seconds: float = 300.0

    async def check(self, order: Order, envelope: RiskEnvelope, context: dict) -> bool:
        """Return False (reject) while inside the blackout window, True otherwise.

        Reads ``context["market_open_today"]``; a missing key raises ``KeyError``.
        ``envelope`` is not consulted by this gate.
        """
        market_open = context["market_open_today"]
        elapsed = (order.created_at - market_open).total_seconds()
        return elapsed >= self.window_seconds


gate = NoTradesNearOpenGate()
assert isinstance(gate, RiskGate)

Register it with mts1b-riskengine to plug into the gate pipeline. Returning False triggers an OrderRejection event.

Built-in gates (in mts1b-riskengine)

| Name | Returns False if |
| --- | --- |
| idempotency | Order key seen in dedupe window |
| static | Broker/order-type/asset-class not allowed; notional too big |
| position_risk | Resulting position exceeds caps |
| drawdown_halt | Fund is in halt state |
| short_side | Shorting disabled or borrow fee too high |
| cro_veto | LLM CRO persona vetoed |

Sizer

@runtime_checkable
class Sizer(Protocol):
    """Callable contract: positional-only ``signal`` plus keyword parameters."""

    def __call__(self, signal: Any, /, **params: Any) -> Any: ...

A position sizer turns a ranking into target weights.

Example — equal-weight L/S sizer

import numpy as np
from mts1b_foundation.protocols import Sizer


class EqualWeightLS:
    """Equal-weight long/short sizer over a 1-D score vector."""

    def __call__(self, signal: np.ndarray, /, *,
                 n_long: int = 5, n_short: int = 5, gross: float = 1.0) -> np.ndarray:
        """Top n_long get +gross/2/n_long, bottom n_short get -gross/2/n_short.

        Args:
            signal: Scores, higher is better. Any array-like is accepted.
            n_long: Number of highest-scoring names to go long; 0 disables the leg.
            n_short: Number of lowest-scoring names to go short; 0 disables the leg.
            gross: Total gross exposure, split evenly between the two legs.

        Returns:
            Float array of weights with the same shape as ``signal``. If the
            legs overlap (n_long + n_short exceeds the number of names), the
            short weight wins for the shared names.

        Raises:
            ValueError: If ``n_long`` or ``n_short`` is negative.
        """
        if n_long < 0 or n_short < 0:
            raise ValueError("n_long and n_short must be non-negative")

        scores = np.asarray(signal)
        # Integer scores must not force integer weights (0.25 would truncate to 0).
        if np.issubdtype(scores.dtype, np.floating):
            out_dtype = scores.dtype
        else:
            out_dtype = np.float64
        weights = np.zeros(scores.shape, dtype=out_dtype)

        order = np.argsort(scores)  # ascending; sorted once and reused
        # Guard the zero case: order[-0:] would select everything.
        if n_long:
            weights[order[-n_long:]] = gross / 2 / n_long
        if n_short:
            weights[order[:n_short]] = -gross / 2 / n_short
        return weights


sizer = EqualWeightLS()
assert isinstance(sizer, Sizer)


signal = np.array([0.5, 1.2, -0.3, 2.1, -1.5, 0.8])
weights = sizer(signal, n_long=2, n_short=2, gross=1.0)
# [0, 0.25, -0.25, 0.25, -0.25, 0]

Allocator

@runtime_checkable
class Allocator(Protocol):
    """Callable contract: positional-only ``returns`` plus keyword parameters."""

    def __call__(self, returns: Any, /, **params: Any) -> Any: ...

A portfolio allocator takes a returns matrix (historical) and produces weights.

Example — equal-risk-contribution

import numpy as np
import pandas as pd
from mts1b_foundation.protocols import Allocator


class EqualRiskContribution:
    """Iterative heuristic that nudges weights toward equal risk contributions."""

    def __call__(self, returns: pd.DataFrame, /, *, max_iter: int = 1000) -> dict[str, float]:
        """Return a weight per column of ``returns``.

        Args:
            returns: Historical returns, one column per asset.
            max_iter: Number of fixed-size update steps; there is no early exit.

        Returns:
            Mapping of column label to weight. Weights are clipped to
            [0.001, 1.0] and renormalized to sum to 1 on every step. An
            empty frame (no columns) yields an empty dict.
        """
        if returns.shape[1] == 0:
            # Nothing to allocate; the update loop cannot run on empty arrays.
            return {}

        cov = returns.cov().values
        n = cov.shape[0]
        w = np.ones(n) / n  # start from equal weights
        for _ in range(max_iter):
            risk_contrib = w * (cov @ w)
            target = risk_contrib.sum() / n
            grad = risk_contrib - target
            # Step is normalized by the largest deviation, so each weight
            # moves by at most 0.001 per iteration; the floor avoids 0/0.
            w -= 0.001 * grad / np.maximum(np.abs(grad).max(), 1e-9)
            w = np.clip(w, 0.001, 1.0)
            w /= w.sum()
        return dict(zip(returns.columns, w))


alloc = EqualRiskContribution()
assert isinstance(alloc, Allocator)

Implementations of HRP, Black-Litterman, Markowitz, Ledoit-Wolf all conform to this Protocol and live in mts1b-quantkit.

FactorFn

@runtime_checkable
class FactorFn(Protocol):
    """Callable contract: positional-only ``panel`` plus keyword parameters."""

    def __call__(self, panel: UniversePanel, /, **params: Any) -> Any: ...

A factor function. Convention: name starts with f_, registered via mts1b_quantkit.factors.register.

Example — momentum factor

import numpy as np
from mts1b_foundation.market_data import UniversePanel
from mts1b_foundation.protocols import FactorFn


def f_momentum_12_1(panel: UniversePanel, /, h_long: int = 252, h_skip: int = 21) -> np.ndarray:
    """Classic 12-1 momentum: 12-month return excluding the most recent month.

    Args:
        panel: Universe panel; only ``panel.close`` is read.
        h_long: Length of the return window, in rows.
        h_skip: Number of most recent rows to skip.

    Returns:
        The result of ``zscore_cross_sectional`` applied to the raw returns.

    Note:
        ``panel.close`` needs at least ``h_long + h_skip + 1`` rows; with less
        history the negative index is out of range.
    """
    close = panel.close  # (T, A)
    # Return from the row h_long before the skip point up to the skip point.
    ret = close[-h_skip-1] / close[-h_long-h_skip-1] - 1
    # NOTE(review): zscore_cross_sectional is not imported in this snippet —
    # presumably provided by mts1b-quantkit; confirm the import path.
    return zscore_cross_sectional(ret)


# Note: a function is implicitly callable; isinstance works
assert isinstance(f_momentum_12_1, FactorFn)

Why runtime-checkable Protocols (not ABCs)

| Feature | ABC | @runtime_checkable Protocol |
| --- | --- | --- |
| Inheritance required | ✅ | ❌ (structural) |
| Plugin authors must import the ABC | ✅ | ❌ |
| Old-Python compatible | ✅ | ✅ (PEP 544 = 3.8+) |
| Plays nicely with @dataclass | ⚠️ (need explicit init) | ✅ |
| isinstance() works | ✅ | ✅ |

Choosing Protocol means:

# Plugin author writes:
@dataclass
class MyBroker:
    """Plugin-side adapter: conforms by shape, no protocol import or base class."""

    name: str = "my-broker"

    async def submit(self, order): ...
    # ... etc

# No need to:
# from mts1b_foundation.protocols.broker import BrokerProtocol
# class MyBroker(BrokerProtocol): ...

This is a major design choice. See Design — Why Python Protocols, not ABCs in the foundation design doc.

Verifying protocol conformance in tests

# tests/test_protocol_conformance.py
import pytest
from mts1b_foundation.protocols import BrokerProtocol
from my_broker import MyBroker


def test_broker_protocol():
    """Plugin author's smoke test — pre-generated by mts1b-pluginsdk."""
    client = MyBroker(api_key="dummy")
    # Runtime check covers member presence only, not signatures.
    assert isinstance(client, BrokerProtocol)


def test_all_methods_async():
    """Belt + braces — ensure methods are coroutines."""
    import inspect
    client = MyBroker(api_key="dummy")
    for method_name in ("submit", "cancel", "get_open_orders",
                        "get_positions", "stream_fills"):
        method = getattr(client, method_name)
        # Async generators (e.g. a streaming method using ``yield``) count as async too.
        is_async = (inspect.iscoroutinefunction(method)
                    or inspect.isasyncgenfunction(method))
        assert is_async, f"{method_name} must be async"

See also

  • orders — Order is the input to BrokerProtocol.submit
  • market_data — Quote, Bar, Trade come from MarketDataProtocol
  • risk — RiskEnvelope is one of RiskGate.check's parameters
  • mts1b-pluginsdk — community plugin authoring kit
  • Tutorial: Add a broker — end-to-end implementation walkthrough