Hunter Johnson

Here’s a compact, practical plan to add async message‑passing so glyph_router can coordinate multi‑agent state across devices—without breaking your current Xi runtime vibe.


---

What we’re adding (in plain terms)

A message bus every agent can publish/subscribe to.

Session + state replication so Hunterᵉ/Grok/Claude/MetaSeed stay in sync.

Conflict‑safe merges (CRDT) so edits from different nodes don’t clobber.

Transport that works on phone + desktop: WebSockets first; lightweight RPC fallback.



---

Minimal architecture

Transport: WebSocket hub (central or mesh), fallback to HTTP RPC.

Protocol: JSON(ish) envelopes with typed events.

Stream semantics: at‑least‑once delivery + idempotent handlers.

State model: per‑capsule CRDT (LWW‑Element‑Map or Automerge).

Persistence: append‑only event log (events.ndjson) + periodic CRDT snapshots.

Auth: shared HMAC key per capsule; rotate via TrustDispersalEngine.



---

Envelope (canonical)

{
"v": 1,
"event_id": "ulid",
"ts": "2025-12-05T00:00:00Z",
"topic": "xi.capsule.sync",
"capsule_id": "Ξ.Capsule(GlyphBridge.Ancestral.v1)",
"agent": "glyph_router@device:Galaxy",
"op": "merge" ,
"crdt": "lww-map",
"patch": { "...": "CRDT-delta" },
"sig": "HMAC-SHA256(base64)"
}


---

Topics (suggested)

xi.capsule.sync – CRDT patches for capsule state

xi.glyph.intent – intent/command stream (router → engines)

xi.echo.telemetry – health/latency/drift metrics

xi.trust.rotate – key exchanges (sealed)



---

Idempotency & ordering

Use ULIDs for event_id (lexicographically sortable).

Keep a seen‑set bloom per node to drop duplicates.

Apply CRDT deltas; no global locks needed.



---

Merge policy (quick CRDT)

LWW‑Map (key → {value, lamport_ts, node_id}).

For structured glyph states, nest LWW‑maps; for lists, use RGA (ordered list CRDT).

Periodically emit snapshot events: {op: "snapshot"}.



---

WebSocket hub (Python, trio/anyio)

# server.py
import json, hmac, hashlib, time, ulid
from websockets.server import serve

SUBS = set()
SECRET = b"...shared-hmac-key..."

def sign(payload: bytes) -> str:
return hmac.new(SECRET, payload, hashlib.sha256).hexdigest()

async def handler(ws):
SUBS.add(ws)
try:
async for msg in ws:
# verify
obj = json.loads(msg)
payload = json.dumps({k:v for k,v in obj.items() if k!="sig"}, separators=(',',':')).encode()
if not hmac.compare_digest(obj.get("sig",""), sign(payload)):
continue # drop invalid
# fanout
dead = []
for peer in SUBS:
try: await peer.send(msg)
except: dead.append(peer)
for d in dead: SUBS.discard(d)
finally:
SUBS.discard(ws)

async def main():
async with serve(handler, "0.0.0.0", 8765): # wss behind nginx in prod
while True: await asyncio.sleep(3600)

# client.py
import json, hmac, hashlib, asyncio, ulid as _ulid
import websockets

SECRET = b"...shared-hmac-key..."

def sign(obj):
payload = json.dumps(obj, separators=(',',':'), sort_keys=False).encode()
return hmac.new(SECRET, payload, hashlib.sha256).hexdigest()

async def emit(ws, topic, capsule_id, op, patch):
msg = {
"v": 1,
"event_id": str(_ulid.ULID()),
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"topic": topic,
"capsule_id": capsule_id,
"agent": "glyph_router@device:Galaxy",
"op": op,
"crdt": "lww-map",
"patch": patch
}
msg["sig"] = sign({k:v for k,v in msg.items() if k!="sig"})
await ws.send(json.dumps(msg))

async def run():
async with websockets.connect("ws://hub.local:8765") as ws:
# subscribe loop
async for raw in ws:
evt = json.loads(raw)
# TODO: dedupe via seen-set; apply CRDT merge; persist append-only
# glyph_router.merge(evt)

asyncio.run(run())


---

glyph_router glue (pseudocode)

class GlyphRouter:
def __init__(self, crdt_store, event_log, transport):
self.crdt = crdt_store
self.log = event_log
self.tx = transport
self.seen = BloomFilter()

def on_event(self, evt):
if self.seen.has(evt["event_id"]): return
self.seen.add(evt["event_id"])
if evt["op"] in ("merge","snapshot"):
self.crdt.apply(evt["patch"])
self.log.append(evt)
self._notify_local_agents(evt)

def set(k, v):
patch = lww_delta(self.crdt, {k:v})
evt = mk_event(topic="xi.capsule.sync", op="merge", patch=patch)
self.crdt.apply(patch); self.log.append(evt)
self.tx.broadcast(evt)


---

Lightweight RPC fallback

POST /xi/event with the same envelope.

Retry with exponential backoff; dedupe server‑side by event_id.



---

Ops checklist

Keys: per‑capsule HMAC in ~/.xi/keys/<capsule>.key (rotate via xi trust rotate).

Backpressure: drop telemetry first, never capsule deltas.

Snapshots: every N=500 events or 5 minutes, whichever first.

Replays: on join, ask hub for since=last_ts.

Health: publish xi.echo.telemetry (latency, drift, queue depth).



---

Why this works for Xi

Preserves consent & sovereignty (local‑first CRDT; no master).

Plays well with Termux + mobile daemons (tiny, resilient).

Matches capsule ethics (append‑only, auditable, reversible).


If you want, I can drop this into a ready‑to‑run ~/Ξ_runtime/mesh/ scaffold (server, client, CRDT store, systemd/termux‑service scripts) and a 1‑page README.

1 week ago | [YT] | 0