mts1b_foundation.protocols — full reference
Six runtime-checkable Python Protocols (PEP 544). Implementations live in mts1b-brokers, mts1b-marketdata, mts1b-riskengine, mts1b-portfolio, mts1b-quantkit, and community plugins via mts1b-pluginsdk.
Quick example: confirm an adapter implements BrokerProtocol
from mts1b_foundation.protocols import BrokerProtocol
class MyBroker:
    """Bare-bones adapter exposing the member names BrokerProtocol looks for.

    Every body is a placeholder; the example only demonstrates that the
    structural isinstance() check passes without inheriting from anything.
    """

    name = "my-broker"

    async def submit(self, order):
        ...

    async def cancel(self, order_id):
        ...

    async def get_open_orders(self):
        ...

    async def get_positions(self):
        ...

    async def stream_fills(self):
        ...
client = MyBroker()
assert isinstance(client, BrokerProtocol) # passes — structural match
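The @runtime_checkable decorator on each Protocol means isinstance() performs a structural check — no inheritance required. Note that the runtime check only verifies that the required attributes and methods exist; it does not validate signatures, return types, or whether a method is async, so pair it with a static type checker.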
BrokerProtocol
@runtime_checkable
class BrokerProtocol(Protocol):
    """Structural contract that every broker adapter must satisfy."""

    @property
    def name(self) -> str:
        """Identifier of this broker adapter."""
        ...

    async def submit(self, order: Order) -> Order:
        """Submit an order. Return the order with broker-assigned id + state."""
        ...

    async def cancel(self, order_id: str) -> bool:
        """Cancel an open order. Return True on success."""
        ...

    async def get_open_orders(self) -> list[Order]:
        """List open (unfilled) orders for this broker account."""
        ...

    async def get_positions(self) -> list[Position]:
        """Current positions for this broker account."""
        ...

    # Declared with plain ``def`` on purpose: implementations are async
    # generators (``async def`` + ``yield``), whose call returns an
    # AsyncIterator directly. ``async def`` here would describe a coroutine
    # that *returns* an iterator, and static checkers would then reject
    # every async-generator implementation.
    def stream_fills(self) -> AsyncIterator[Fill]:
        """Stream fills as they occur."""
        ...
Implementation walkthrough — paper broker
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from typing import AsyncIterator
from mts1b_foundation.orders import Order, Fill
from mts1b_foundation.positions import Position
from mts1b_foundation.protocols import BrokerProtocol
@dataclass
class PaperClient:
    """In-memory paper broker: accepts every order and fills it after a short delay.

    Fills are priced at the order's limit price, or at a fixed placeholder
    price of 100 when no limit price is set. Positions are not updated by
    fills in this walkthrough.
    """

    name: str = "paper"
    fee_bps_per_side: float = 5.0
    _open_orders: dict[str, Order] = field(default_factory=dict)
    _positions: dict[str, Position] = field(default_factory=dict)
    _fill_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    # Simulated latency, in seconds, between acceptance and the fill.
    fill_delay_s: float = 0.1
    # Strong references to in-flight fill tasks. The event loop keeps only
    # weak references to tasks, so an unreferenced task may be
    # garbage-collected before it completes.
    _fill_tasks: set = field(default_factory=set)

    async def submit(self, order: Order) -> Order:
        """Accept ``order`` and schedule its simulated fill.

        Returns a copy of the order stamped with ``submitted_at``,
        ``accepted_at`` and a generated ``broker_order_id``.
        """
        now = datetime.now(timezone.utc)
        broker_oid = f"PAPER-{uuid.uuid4().hex[:8]}"
        accepted = order.model_copy(update={
            "submitted_at": now,
            "accepted_at": now,
            "broker_order_id": broker_oid,
        })
        self._open_orders[broker_oid] = accepted
        # Simulate a fill after ``fill_delay_s`` at limit_price (or the
        # placeholder price when the order has none).
        task = asyncio.create_task(self._fill(accepted))
        self._fill_tasks.add(task)
        task.add_done_callback(self._fill_tasks.discard)
        return accepted

    async def _fill(self, order: Order):
        """Emit one full fill for ``order`` unless it was cancelled meanwhile."""
        await asyncio.sleep(self.fill_delay_s)
        # cancel() removes the order from the open-order book; a cancelled
        # order must not produce a fill afterwards.
        if order.broker_order_id not in self._open_orders:
            return
        price = order.limit_price or Decimal("100")
        # Convert bps to a rate entirely in Decimal to avoid float artefacts.
        fee_rate = Decimal(str(self.fee_bps_per_side)) / Decimal(10_000)
        fill = Fill(
            fill_id=str(uuid.uuid4()),
            order_id=order.order_id,
            symbol=order.symbol,
            side=order.side,
            quantity=order.quantity,
            price=price,
            fees=order.quantity * price * fee_rate,
            venue=self.name,
            timestamp=datetime.now(timezone.utc),
        )
        del self._open_orders[order.broker_order_id]
        await self._fill_queue.put(fill)

    async def cancel(self, order_id: str) -> bool:
        """Cancel the open order keyed by its broker order id.

        Returns True only if the order was still open (and will therefore
        never fill); False if it is unknown or has already filled.
        """
        # Orders fill after ``fill_delay_s``, so cancel only wins the race
        # when it arrives inside that window.
        return self._open_orders.pop(order_id, None) is not None

    async def get_open_orders(self) -> list[Order]:
        """Return the orders that have been accepted but not yet filled or cancelled."""
        return list(self._open_orders.values())

    async def get_positions(self) -> list[Position]:
        """Return the positions currently held in the in-memory book."""
        return list(self._positions.values())

    async def stream_fills(self) -> AsyncIterator[Fill]:
        """Yield fills from the internal queue as they arrive; never terminates on its own."""
        while True:
            fill = await self._fill_queue.get()
            yield fill
# Verify it implements BrokerProtocol
client = PaperClient()
assert isinstance(client, BrokerProtocol)
# Use it as the contract dictates
async def example():
    """Submit a single order, then consume exactly one fill from the stream."""
    new_order = Order(...)
    # submit() hands back the broker-stamped copy of the order.
    accepted = await client.submit(new_order)
    async for fill in client.stream_fills():
        print(f"got fill: {fill.fill_id}")
        # One fill is enough for the demo; leave the otherwise endless stream.
        break
What every implementation MUST do
| Requirement | Why |
|---|---|
| name returns a stable string | Used for routing, logging, metrics, audit trail |
| submit is idempotent on order.idempotency_key | OMS retries are safe |
| submit returns an Order with submitted_at populated | OMS uses it as the transition point |
| cancel returns True ONLY if cancellation was confirmed | Fail loud if cancel raced a fill |
| Fills include fees in account base currency | Treasury computes net P/L correctly |
| Streaming is replay-safe (durable consumer) | Restart shouldn't lose fills |
Common implementations
| Adapter | Location | Asset classes |
|---|---|---|
| paper | mts1b-brokers/paper | all |
| IbkrClient | mts1b-brokers/ibkr | equities, options, futures, fx, crypto (PAXOS) |
| CoinbaseClient | mts1b-brokers/coinbase | crypto |
| SchwabClient | mts1b-brokers/schwab | equities, options |
| KrakenClient | mts1b-brokers/kraken | crypto |
| MoomooClient | mts1b-brokers/moomoo | equities (US/HK) |
| TradierClient | mts1b-brokers/tradier | equities, options |
Community plugins (e.g., Robinhood Crypto, Tastytrade) can conform structurally — matching the Protocol's shape without subclassing it — via mts1b-pluginsdk.
MarketDataProtocol
@runtime_checkable
class MarketDataProtocol(Protocol):
    """Structural contract that every market-data adapter must satisfy."""

    @property
    def name(self) -> str:
        """Identifier of this data source."""
        ...

    async def get_quote(self, symbol: Symbol) -> Quote:
        """Return a quote for ``symbol``."""
        ...

    async def get_bars(
        self, symbol: Symbol, interval: str,
        start: datetime, end: datetime | None = None,
    ) -> list[Bar]:
        """Return bars for ``symbol`` at ``interval``, beginning at ``start``.

        NOTE(review): the meaning of ``end=None`` is not specified here —
        presumably "up to the latest available bar"; confirm against the
        adapters.
        """
        ...

    async def get_trades(self, symbol: Symbol, asof: datetime) -> list[Trade]:
        """Return trades for ``symbol`` relative to ``asof``.

        NOTE(review): the exact window implied by ``asof`` is not visible
        here — confirm against the adapters.
        """
        ...

    # Declared with plain ``def`` on purpose: implementations are async
    # generators (``async def`` + ``yield``), whose call returns an
    # AsyncIterator directly. ``async def`` here would describe a coroutine
    # that *returns* an iterator, and static checkers would then reject
    # every async-generator implementation.
    def stream_quotes(self, symbols: list[Symbol]) -> AsyncIterator[Quote]:
        """Stream quotes for ``symbols``."""
        ...
Implementation walkthrough — Polygon adapter (sketch)
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import AsyncIterator
from mts1b_foundation.market_data import Quote, Bar, Trade
from mts1b_foundation.symbology import Symbol, to_native
from mts1b_foundation.protocols import MarketDataProtocol
@dataclass
class PolygonClient:
    """Sketch of a Polygon market-data adapter; only get_quote is fleshed out."""

    name: str = "polygon"
    api_key: str = ""
    # Async HTTP client used for requests. The original sketch read
    # ``self._client`` without ever declaring it.
    # NOTE(review): presumably an httpx.AsyncClient-like object configured
    # with the Polygon base URL and ``api_key`` — supply it before calling
    # get_quote.
    _client: object = None

    async def get_quote(self, symbol: Symbol) -> Quote:
        """Fetch the latest NBBO for ``symbol`` and map it onto a Quote."""
        # Normalize at the boundary using this adapter's own venue name.
        native = to_native(symbol, venue=self.name)
        r = await self._client.get(f"/v2/last/nbbo/{native}")
        d = r.json()["results"]
        # Polygon's NBBO payload uses UPPERCASE keys for the ask side
        # ("P" price, "S" size) and lowercase keys for the bid side
        # ("p" price, "s" size); "t" is a Unix timestamp in nanoseconds.
        return Quote(
            symbol=symbol,
            bid=Decimal(str(d["p"])),
            ask=Decimal(str(d["P"])),
            bid_size=d["s"],
            ask_size=d["S"],
            venue=self.name,
            timestamp=datetime.fromtimestamp(d["t"] / 1_000_000_000, tz=timezone.utc),
        )

    async def get_bars(self, symbol, interval, start, end=None):
        """Placeholder: map ``interval``, fetch, and return list[Bar]."""
        # ... map interval, fetch, return list[Bar]
        ...

    async def get_trades(self, symbol, asof):
        """Placeholder: not implemented in this sketch."""
        ...

    async def stream_quotes(self, symbols) -> AsyncIterator[Quote]:
        """Placeholder for the WebSocket subscription; a real body would ``yield`` quotes."""
        # WebSocket subscription
        ...
client = PolygonClient(api_key="...")
assert isinstance(client, MarketDataProtocol)
Symbol → wire format
Every adapter must normalize at the boundary:
# Inside MarketDataProtocol implementations
native = to_native(symbol, venue=self.name)
See symbology for the full list of venue translations.
RiskGate
@runtime_checkable
class RiskGate(Protocol):
    """Structural contract for a single risk gate."""

    @property
    def name(self) -> str:
        """Identifier of this gate."""

    async def check(
        self, order: Order, envelope: RiskEnvelope, context: dict,
    ) -> bool:
        """Evaluate ``order`` against ``envelope`` and ``context``; returns a bool verdict."""
Example — implementing a custom gate
from dataclasses import dataclass
from mts1b_foundation.orders import Order
from mts1b_foundation.risk import RiskEnvelope
from mts1b_foundation.protocols import RiskGate
@dataclass
class NoTradesNearOpenGate:
    """Reject orders in the first minutes after market open
    (high volatility, low fill quality).

    The blackout window defaults to 5 minutes and can be tuned per instance
    through ``window_seconds``.
    """

    name: str = "no_trades_near_open"
    # Length of the blackout window after the open, in seconds.
    window_seconds: float = 300.0

    async def check(self, order: Order, envelope: RiskEnvelope, context: dict) -> bool:
        """Return False (reject) when the order falls inside the blackout window.

        ``context["market_open_today"]`` must hold today's market-open
        datetime; a missing key raises ``KeyError``. ``envelope`` is not
        consulted by this gate.
        """
        market_open = context["market_open_today"]
        elapsed = (order.created_at - market_open).total_seconds()
        # NOTE(review): an order created *before* the open yields a negative
        # elapsed time and is therefore rejected too — confirm that pre-open
        # orders are meant to be blocked by this gate.
        return elapsed >= self.window_seconds
gate = NoTradesNearOpenGate()
assert isinstance(gate, RiskGate)
Register it with mts1b-riskengine to plug into the gate pipeline. Returning False triggers an OrderRejection event.
Built-in gates (in mts1b-riskengine)
| Name | Returns False if |
|---|---|
| idempotency | Order key seen in dedupe window |
| static | Broker/order-type/asset-class not allowed; notional too big |
| position_risk | Resulting position exceeds caps |
| drawdown_halt | Fund is in halt state |
| short_side | Shorting disabled or borrow fee too high |
| cro_veto | LLM CRO persona vetoed |
Sizer
@runtime_checkable
class Sizer(Protocol):
    """Callable contract for position sizers."""

    def __call__(self, signal: Any, /, **params: Any) -> Any:
        """Take ``signal`` positionally plus keyword tuning parameters."""
A position sizer turns a ranking into target weights.
Example — equal-weight L/S sizer
import numpy as np
from mts1b_foundation.protocols import Sizer
class EqualWeightLS:
    """Equal-weight long/short sizer over a 1-D signal vector."""

    def __call__(self, signal: np.ndarray, /, *,
                 n_long: int = 5, n_short: int = 5, gross: float = 1.0) -> np.ndarray:
        """Top n_long get +gross/2/n_long, bottom n_short get -gross/2/n_short.

        A leg with a count of 0 is skipped entirely. When the two legs
        overlap (n_long + n_short exceeds the signal length) the short
        weight wins, since it is assigned last.
        """
        scores = np.asarray(signal)
        # Weights are fractional: keep a floating input dtype, but never
        # inherit an integer dtype (that would truncate every weight to 0).
        dtype = scores.dtype if np.issubdtype(scores.dtype, np.floating) else float
        weights = np.zeros(scores.shape, dtype=dtype)
        ranked = np.argsort(scores)
        # Guard the zero case explicitly: ``ranked[-0:]`` would select
        # *every* element and ``gross / 2 / 0`` would raise.
        if n_long > 0:
            weights[ranked[-n_long:]] = gross / 2 / n_long
        if n_short > 0:
            weights[ranked[:n_short]] = -gross / 2 / n_short
        return weights
sizer = EqualWeightLS()
assert isinstance(sizer, Sizer)
signal = np.array([0.5, 1.2, -0.3, 2.1, -1.5, 0.8])
weights = sizer(signal, n_long=2, n_short=2, gross=1.0)
# [0, 0.25, -0.25, 0.25, -0.25, 0]  (longs: 1.2 and 2.1; shorts: -0.3 and -1.5)
Allocator
@runtime_checkable
class Allocator(Protocol):
    """Callable contract for portfolio allocators."""

    def __call__(self, returns: Any, /, **params: Any) -> Any:
        """Take ``returns`` positionally plus keyword tuning parameters."""
A portfolio allocator takes a returns matrix (historical) and produces weights.
Example — equal-risk-contribution
import numpy as np
import pandas as pd
from mts1b_foundation.protocols import Allocator
class EqualRiskContribution:
    """Iterative allocator that nudges weights toward equal risk contributions."""

    def __call__(self, returns: pd.DataFrame, /, *, max_iter: int = 1000) -> dict[str, float]:
        """Return a column -> weight mapping computed from the sample covariance.

        Starts from equal weights and performs ``max_iter`` fixed-size
        steps; after every step the weights are clipped to [0.001, 1.0] and
        renormalized to sum to 1.
        """
        covariance = returns.cov().values
        n_assets = covariance.shape[0]
        weights = np.ones(n_assets) / n_assets
        for _ in range(max_iter):
            contributions = weights * (covariance @ weights)
            # Deviation of each asset's risk contribution from the average.
            deviation = contributions - contributions.sum() / n_assets
            # Normalize by the largest deviation (floored to avoid dividing
            # by zero) so the largest move is 0.001 per iteration.
            largest = np.maximum(np.abs(deviation).max(), 1e-9)
            weights = weights - 0.001 * deviation / largest
            weights = np.clip(weights, 0.001, 1.0)
            weights = weights / weights.sum()
        return {column: weight for column, weight in zip(returns.columns, weights)}
alloc = EqualRiskContribution()
assert isinstance(alloc, Allocator)
Implementations of HRP, Black-Litterman, Markowitz, Ledoit-Wolf all conform to this Protocol and live in mts1b-quantkit.
FactorFn
@runtime_checkable
class FactorFn(Protocol):
    """Callable contract for factor functions."""

    def __call__(self, panel: UniversePanel, /, **params: Any) -> Any:
        """Take ``panel`` positionally plus keyword tuning parameters."""
A factor function. Convention: name starts with f_, registered via mts1b_quantkit.factors.register.
Example — momentum factor
import numpy as np
from mts1b_foundation.market_data import UniversePanel
from mts1b_foundation.protocols import FactorFn
def f_momentum_12_1(panel: UniversePanel, /, h_long: int = 252, h_skip: int = 21) -> np.ndarray:
    """Classic 12-1 momentum: 12-month return excluding the most recent month.

    The return is measured over ``h_long`` rows, ending ``h_skip`` rows
    before the final row of ``panel.close``, then z-scored cross-sectionally.
    """
    prices = panel.close  # (T, A)
    end_row = -h_skip - 1
    start_row = end_row - h_long
    raw_momentum = prices[end_row] / prices[start_row] - 1
    # NOTE(review): zscore_cross_sectional is not imported in this snippet —
    # confirm where it lives and add the import.
    return zscore_cross_sectional(raw_momentum)
# Note: a function is implicitly callable; isinstance works
assert isinstance(f_momentum_12_1, FactorFn)
Why runtime-checkable Protocols (not ABCs)
| Feature | ABC | @runtime_checkable Protocol |
|---|---|---|
| Inheritance required | ✅ | ❌ (structural) |
| Plugin authors must import the ABC | ✅ | ❌ |
| Old-Python compatible | ✅ | ✅ (PEP 544 = 3.8+) |
| Plays nicely with @dataclass | ⚠️ (need explicit init) | ✅ |
| isinstance() works | ✅ | ✅ |
Choosing Protocol means:
# Plugin author writes:
@dataclass
class MyBroker:
    """Plugin-side adapter written without importing or inheriting from BrokerProtocol."""

    name: str = "my-broker"

    async def submit(self, order):
        ...

    # ... etc
# No need to:
# from mts1b_foundation.protocols.broker import BrokerProtocol
# class MyBroker(BrokerProtocol): ...
This is a major design choice. See Design — Why Python Protocols, not ABCs in the foundation design doc.
Verifying protocol conformance in tests
# tests/test_protocol_conformance.py
import pytest
from mts1b_foundation.protocols import BrokerProtocol
from my_broker import MyBroker
def test_broker_protocol():
    """Plugin author's smoke test — pre-generated by mts1b-pluginsdk."""
    # Structural check only: passes when every BrokerProtocol member exists.
    adapter = MyBroker(api_key="dummy")
    assert isinstance(adapter, BrokerProtocol)
def test_all_methods_async():
    """Belt + braces — ensure methods are coroutines."""
    import inspect

    client = MyBroker(api_key="dummy")
    required = ("submit", "cancel", "get_open_orders", "get_positions", "stream_fills")
    for method_name in required:
        method = getattr(client, method_name)
        # Async generators (e.g. a yielding stream_fills) count as async too.
        is_async = inspect.iscoroutinefunction(method) or inspect.isasyncgenfunction(method)
        assert is_async, f"{method_name} must be async"
See also
- orders — Order is the input to BrokerProtocol.submit
- market_data — Quote, Bar, Trade come from MarketDataProtocol
- risk — RiskEnvelope is one of RiskGate.check's parameters
- mts1b-pluginsdk — community plugin authoring kit
- Tutorial: Add a broker — end-to-end implementation walkthrough