Channel Router Service — Event Schemas
Version: 1.0 | Status: Draft | Owner: Messaging Core | Last Updated: 2026-04-21 | Companion: DOMAIN_MODEL · SYNC_CONTRACT · SECURITY_MODEL
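All events are published via a transactional outbox (chan.outbox) onto NATS JetStream. Every event carries schemaVersion, eventId (UUIDv4), traceId, and at (an RFC 3339 timestamp). Event bodies are PII-safe: no raw message body and no unmasked MSISDN. The one exception is channel.mo.inbound.v1, which carries the MO body for the owning tenant (see section 5).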
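The common envelope can be sketched as a small helper. This is a minimal illustration, not the service's actual code: the function name `make_envelope` and the choice to let envelope fields override payload fields are assumptions.

```python
import uuid
from datetime import datetime, timezone


def make_envelope(trace_id: str, payload: dict) -> dict:
    """Merge an event payload with the four fields every event carries."""
    envelope = {
        "schemaVersion": "1",
        "eventId": str(uuid.uuid4()),  # UUIDv4
        "traceId": trace_id,
        # RFC 3339 timestamp in UTC, e.g. 2026-04-21T09:30:00.123Z
        "at": datetime.now(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z"),
    }
    # Assumption: envelope keys win, so a payload cannot overwrite them.
    return {**payload, **envelope}


event = make_envelope("trace-1", {"attemptId": "a1"})
```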
1. Streams, subjects, retention
| Stream | Subjects | Retention | Replicas | Dedup window |
|---|---|---|---|---|
| CHANNEL_EVENTS | channel.delivery.attempted.v1, channel.delivery.confirmed.v1, channel.delivery.failed.v1, channel.fallback.taken.v1, channel.fallback.ladder.resolved.v1 | 13 months audit retention (metering evidence; regulatory) | 3 | 2 min |
| CHANNEL_MO | channel.mo.inbound.v1, mo.unmatched.v1 | 13 months | 3 | 2 min |
| CHANNEL_OUTCOMES | notification.delivery.outcome.v1 | 13 months | 3 | 2 min |
| CHANNEL_CONVERSATIONS | channel.conversation.started.v1, channel.conversation.ended.v1 | 90 d | 3 | 2 min |
| CHANNEL_PROFILES | channel.recipient.profile.updated.v1 | 90 d | 3 | 2 min |
| CHANNEL_BILLING | channel.billing.event.v1 | 13 months + 7 y cold archive (financial evidence) | 3 | 5 min |
| CHANNEL_CONTROL | channel.tenant_policy.changed.v1, channel.inbound_route.changed.v1, chan.ott_account.rotated.v1, chan.status_map.changed.v1 | 365 d | 3 | 2 min |
| CHANNEL_AUDIT | channel.audit.v1 | 13 months + 7 y | 5 | - |
Every stream has a dead-letter counterpart with the .deadletter suffix. Consumers are durable and use explicit ACK (per PLT-REQ-008/009).
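JetStream deduplicates on the `Nats-Msg-Id` publish header within each stream's duplicate window. The sketch below simulates that behaviour in memory to show what the "Dedup window" column implies for a relay that retries a publish. The class name `DedupWindow` is hypothetical, and using `eventId` as the message ID is an assumption this document does not state.

```python
class DedupWindow:
    """In-memory stand-in for a stream's duplicate window (illustration only)."""

    def __init__(self, window_seconds: float):
        self.window = window_seconds
        self._seen = {}  # msg_id -> time the message was first accepted

    def accept(self, msg_id: str, now: float) -> bool:
        # Forget IDs whose window has elapsed.
        self._seen = {m: t for m, t in self._seen.items() if now - t < self.window}
        if msg_id in self._seen:
            return False  # duplicate inside the window: dropped
        self._seen[msg_id] = now
        return True


window = DedupWindow(window_seconds=120)  # the 2 min window from the table
print(window.accept("evt-1", now=0))    # True: first publish
print(window.accept("evt-1", now=60))   # False: relay retry within 2 min
print(window.accept("evt-1", now=121))  # True: window elapsed, no longer deduplicated
```

The last call is the case that matters for consumers: a republish after the window has passed is not suppressed by the stream, so consumers still need to be idempotent on `eventId`.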
2. Produced events
2.1 channel.delivery.attempted.v1
{
"$id": "https://schemas.ghasi.af/channel/delivery-attempted.v1.json",
"type": "object",
"required": ["schemaVersion","eventId","executionId","attemptId","channel","stepIndex","tenantId","notificationId","at","traceId"],
"properties": {
"schemaVersion": { "const": "1" },
"eventId": { "type": "string", "format": "uuid" },
"executionId": { "type": "string" },
"attemptId": { "type": "string" },
"stepIndex": { "type": "integer", "minimum": 0, "maximum": 5 },
"tenantId": { "type": "string", "format": "uuid" },
"notificationId":{ "type": "string", "format": "uuid" },
"recipientId": { "type": "string" },
"msisdnMasked": { "type": "string", "pattern": "^\\+[0-9]{1,4}[0-9]{3}\\*{3,5}$" },
"channel": { "enum": ["SMS","WHATSAPP","TELEGRAM","VIBER","VOICE","EMAIL"] },
"senderId": { "type": "string" },
"adapterConfigId": { "type": "string" },
"deadlineSeconds": { "type": "integer" },
"at": { "type": "string", "format": "date-time" },
"traceId": { "type": "string" }
}
}
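The `msisdnMasked` pattern above keeps the country code plus three further digits and replaces the rest with three to five asterisks. A minimal masking helper, assuming the caller already knows the country-code length (the function name and that parameter are hypothetical), could look like this:

```python
import re

# Same pattern as the msisdnMasked property in the schema.
MASKED_PATTERN = re.compile(r"^\+[0-9]{1,4}[0-9]{3}\*{3,5}$")


def mask_msisdn(msisdn: str, country_code_len: int, stars: int = 3) -> str:
    """Keep country code + 3 digits, replace the remainder with asterisks."""
    digits = msisdn.lstrip("+")
    if not digits.isdigit() or not 1 <= country_code_len <= 4:
        raise ValueError("expected E.164 digits and a 1-4 digit country code")
    if len(digits) < country_code_len + 3:
        raise ValueError("MSISDN too short to mask")
    masked = "+" + digits[: country_code_len + 3] + "*" * stars
    # Validate against the schema pattern before the value leaves the service.
    if not MASKED_PATTERN.match(masked):
        raise ValueError("masked value does not satisfy the schema pattern")
    return masked


print(mask_msisdn("+93701234567", country_code_len=2))  # +93701***
```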
2.2 channel.delivery.confirmed.v1
{
"required": ["schemaVersion","eventId","attemptId","executionId","channel","providerMessageId","terminalStatus","durationMs","at","traceId"],
"properties": {
"schemaVersion": { "const": "1" },
"eventId": { "type": "string", "format": "uuid" },
"attemptId": { "type": "string" },
"executionId": { "type": "string" },
"notificationId": { "type": "string" },
"tenantId": { "type": "string" },
"channel": { "enum": ["SMS","WHATSAPP","TELEGRAM","VIBER","VOICE","EMAIL"] },
"providerMessageId":{ "type": "string" },
"terminalStatus": { "enum": ["delivered","delivered_read","ANSWERED"] },
"deliveryConfidence": { "enum": ["DEFINITIVE","PROBABLE","AMBIGUOUS"] },
"durationMs": { "type": "integer", "minimum": 0 },
"costNgn": { "type": "number" },
"at": { "type": "string", "format": "date-time" },
"traceId": { "type": "string" }
}
}
2.3 channel.delivery.failed.v1
{
"required": ["schemaVersion","eventId","attemptId","executionId","channel","terminalStatus","reasonCode","at"],
"properties": {
"terminalStatus": { "enum": ["failed_temp","failed_perm","rejected_by_provider","rejected_by_recipient","step_skipped"] },
"reasonCode": { "type": "string",
"examples": ["adapter_circuit_open","no_link","template_not_approved","voice_no_answer","whatsapp_131026","telegram_bot_blocked","viber_rejected"] },
"reasonDetail": { "type": "string" }
}
}
2.4 channel.fallback.taken.v1
Emitted when, and only when, the ladder progresses from step N to step N+1; one event per progression.
{
"required": ["executionId","fromChannel","toChannel","fromStatus","reasonCode","at"],
"properties": {
"executionId": { "type": "string" },
"fromChannel": { "enum": ["SMS","WHATSAPP","TELEGRAM","VIBER","VOICE","EMAIL"] },
"toChannel": { "enum": ["SMS","WHATSAPP","TELEGRAM","VIBER","VOICE","EMAIL"] },
"fromStatus": { "type": "string" },
"reasonCode": { "type": "string" },
"fromDurationMs": { "type": "integer" },
"costAccumulatedNgn": { "type": "number" }
}
}
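As an illustration of the "one event per progression" rule, the sketch below derives the fallback events from an ordered list of attempted steps. The step dictionary shape and the function name are hypothetical, `costAccumulatedNgn` is assumed to be the running cost of all steps up to the one being abandoned, and `at` plus the other envelope fields are left out for brevity.

```python
def fallback_events(execution_id: str, steps: list) -> list:
    """Build one channel.fallback.taken.v1 body per step N -> N+1 transition.

    Each step is a dict with channel, status, reasonCode, durationMs, costNgn.
    """
    events = []
    cost_so_far = 0.0
    for prev, nxt in zip(steps, steps[1:]):
        cost_so_far += prev.get("costNgn", 0.0)
        events.append({
            "executionId": execution_id,
            "fromChannel": prev["channel"],
            "toChannel": nxt["channel"],
            "fromStatus": prev["status"],
            "reasonCode": prev["reasonCode"],
            "fromDurationMs": prev["durationMs"],
            "costAccumulatedNgn": cost_so_far,
        })
    return events


steps = [
    {"channel": "WHATSAPP", "status": "failed_perm", "reasonCode": "whatsapp_131026",
     "durationMs": 900, "costNgn": 2.0},
    {"channel": "SMS", "status": "failed_temp", "reasonCode": "no_link",
     "durationMs": 4000, "costNgn": 1.5},
    {"channel": "VOICE", "status": "ANSWERED", "reasonCode": "",
     "durationMs": 12000, "costNgn": 6.0},
]
for event in fallback_events("exec-1", steps):
    print(event["fromChannel"], "->", event["toChannel"], event["costAccumulatedNgn"])
```

A ladder that succeeds on its first step produces no fallback events at all.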
2.5 channel.mo.inbound.v1
Emitted on tenant-webhook delivery of an inbound MO. Also re-published to the platform-wide sms.mo.inbound subject for consent-ledger-service (STOP) and sms-firewall-service (feedback).
{
"required": ["messageId","tenantId","originatorMsisdnMasked","destination","mno","body","receivedAt","routeType","at"],
"properties": {
"messageId": { "type": "string" },
"tenantId": { "type": "string" },
"conversationId": { "type": ["string","null"] },
"originatorMsisdnMasked": { "type": "string", "pattern": "^\\+[0-9]{1,4}[0-9]{3}\\*{3,5}$" },
"destination": { "type": "string", "description": "Shortcode or long-code" },
"mno": { "enum": ["AWCC","ROSHAN","MTN","ETISALAT","SALAAM","UNKNOWN"] },
"body": { "type": "string", "description": "Full MO body (routed to the owning tenant; not stored in analytics sinks)" },
"routeType": { "enum": ["SESSION","STATIC","STOP_KEYWORD"] },
"inReplyTo": { "type": ["object","null"], "properties": { "messageId": { "type": "string" }, "dispatchedAt": { "type": "string" } } },
"closedSessionByStop": { "type": "boolean" },
"receivedAt": { "type": "string", "format": "date-time" }
}
}
2.6 channel.conversation.started.v1
{
"required": ["conversationId","tenantId","senderId","msisdnHash","channel","openedAt","at"],
"properties": {
"conversationId": { "type": "string" },
"tenantId": { "type": "string" },
"senderId": { "type": "string" },
"msisdnHash": { "type": "string", "pattern": "^[a-f0-9]{64}$" },
"channel": { "enum": ["SMS","WHATSAPP","TELEGRAM","VIBER","VOICE","EMAIL"] },
"expiresAt": { "type": "string", "format": "date-time" }
}
}
2.7 channel.conversation.ended.v1
{
"properties": {
"reason": { "enum": ["stop_keyword","idle_expiry","manual","redis_loss"] },
"turnCount": { "type": "integer" },
"durationSeconds": { "type": "integer" }
}
}
2.8 channel.recipient.profile.updated.v1
Analytics-grade. No PII.
{
"required": ["profileId","tenantId","msisdnHash","delta","at"],
"properties": {
"profileId": { "type": "string" },
"tenantId": { "type": "string" },
"msisdnHash": { "type": "string" },
"discoveryState": { "enum": ["UNSEEN","LEARNING","STABLE"] },
"delta": { "type": "array", "items": { "type": "object",
"properties": { "channel": {"type":"string"}, "scoreDelta": {"type":"number"}, "triState": {"type":"string"} } } }
}
}
2.9 channel.billing.event.v1
Per-attempt metering feed for billing-service.
{
"required": ["eventId","sku","tenantId","notificationId","recipientHash","attemptId","executionId","costNgn","provider","providerMessageId","attemptTerminalStatus","at"],
"properties": {
"sku": { "enum": ["sms.outbound.v1","whatsapp.outbound.v1","telegram.outbound.v1","viber.outbound.v1","voice.otp.v1","email.outbound.v1"] },
"costNgn": { "type": "number", "minimum": 0 },
"provider": { "type": "string" },
"providerMessageId": { "type": "string" },
"attemptTerminalStatus": { "type": "string" },
"meterQualifier": { "type": "string", "description": "voice.otp.v1 requires ANSWERED && playedDigits=otp.length" }
}
}
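The `meterQualifier` rule for `voice.otp.v1` can be read as a simple predicate. The sketch below assumes `playedDigits` is the number of OTP digits the call actually played; that interpretation and the function name are not confirmed by this document.

```python
def voice_otp_is_billable(terminal_status: str, played_digits: int, otp_length: int) -> bool:
    """voice.otp.v1 is metered only when the call was answered and the full OTP was played."""
    return terminal_status == "ANSWERED" and played_digits == otp_length


print(voice_otp_is_billable("ANSWERED", played_digits=6, otp_length=6))  # True
print(voice_otp_is_billable("ANSWERED", played_digits=3, otp_length=6))  # False: hung up mid-OTP
```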
2.10 notification.delivery.outcome.v1
The single canonical outcome event per (notificationId, recipientId). Emitted by the outbox relay; duplicate-suppressed via UNIQUE(notificationId, recipientId) in chan.delivery_outbox.
{
"required": ["schemaVersion","notificationId","recipientId","tenantId","final","attempts","fallbackPath","occurredAt","executionId"],
"properties": {
"final": { "enum": ["DELIVERED","FAILED","REFUSED_NO_CHANNEL","REFUSED_COST_CAP","REFUSED_CONSENT"] },
"channel": { "enum": ["SMS","WHATSAPP","TELEGRAM","VIBER","VOICE","EMAIL", null] },
"attempts": { "type": "integer", "minimum": 0, "maximum": 6 },
"fallbackPath": { "type": "array", "items": { "type": "object",
"required": ["channel","status","reason","durationMs"],
"properties": {
"channel": { "type": "string" },
"status": { "type": "string" },
"reason": { "type": "string" },
"durationMs": { "type": "integer" },
"costNgn": { "type": "number" }
} } }
}
}
2.11 channel.tenant_policy.changed.v1
Cache-invalidation signal. Consumers MUST re-fetch state via REST if they depend on the new value.
{
"properties": {
"tenantId": { "type": "string" },
"useCase": { "type": "string" },
"action": { "enum": ["CREATE","UPDATE","DELETE"] },
"newVersion": { "type": "integer" }
}
}
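Because the event is only an invalidation signal, a consumer should drop its cached entry and re-fetch rather than copy state out of the event. A minimal sketch, where `PolicyCache` and the injected `fetch` callable (standing in for the REST client) are hypothetical:

```python
class PolicyCache:
    """Caches tenant policies and invalidates them on channel.tenant_policy.changed.v1."""

    def __init__(self, fetch):
        self._fetch = fetch  # hypothetical REST call: (tenant_id, use_case) -> policy
        self._entries = {}

    def get(self, tenant_id: str, use_case: str):
        key = (tenant_id, use_case)
        if key not in self._entries:
            self._entries[key] = self._fetch(tenant_id, use_case)
        return self._entries[key]

    def on_policy_changed(self, event: dict) -> None:
        # Drop the entry; the next get() re-fetches the authoritative state.
        self._entries.pop((event["tenantId"], event["useCase"]), None)


calls = []

def fake_fetch(tenant_id, use_case):
    calls.append((tenant_id, use_case))
    return {"version": len(calls)}

cache = PolicyCache(fake_fetch)
cache.get("t1", "otp")
cache.on_policy_changed({"tenantId": "t1", "useCase": "otp", "action": "UPDATE", "newVersion": 2})
cache.get("t1", "otp")  # triggers a second fetch
```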
2.12 channel.inbound_route.changed.v1
{
"properties": {
"tenantId": { "type": "string" },
"inbound": { "type": "string" },
"action": { "enum": ["CREATE","UPDATE","DELETE","ROTATE_SECRET"] },
"gracePeriodEndsAt": { "type": ["string","null"], "format": "date-time" }
}
}
2.13 channel.audit.v1
Append-only state-change record (policy edits, adapter-config mutation, manual circuit actions, admin session-close).
{
"required": ["auditId","entityType","entityId","action","actorUserId","occurredAt"],
"properties": {
"entityType": { "enum": ["FALLBACK_POLICY","ADAPTER_CONFIG","INBOUND_ROUTE","SESSION","PROFILE","CIRCUIT_BREAKER"] },
"action": { "enum": ["CREATE","UPDATE","DELETE","ROTATE","OPEN","CLOSE","HALF_OPEN","MANUAL_SESSION_CLOSE"] },
"before": { "type": ["object","null"] },
"after": { "type": ["object","null"] },
"ip": { "type": "string" },
"userAgent": { "type": "string" },
"traceId": { "type": "string" }
}
}
3. Consumed events
| Subject | Producer | Action |
|---|---|---|
| notification.dispatch.requested.v1 | sms-orchestrator | Entry point for the ladder (durable consumer chan-router) |
| mo.allowed.v1 | sms-firewall-service | Inbound MO routing (chan-mo-router) |
| sms.mo.received.v1 | smpp-connector | Fallback MO ingress when firewall bypass is engaged |
| sms.dlr.inbound | dlr-processor | Terminates the SMS step of an active execution |
| consent.revoked.v1 | consent-ledger-service | Invalidate chan:gate:* entries for the affected (tenantId, msisdnHash); enqueue profile re-scoring |
| consent.granted.v1 | consent-ledger-service | Invalidate chan:gate:* cache (allow a previously excluded channel on the next request) |
| sender.id.suspended.v1 | sender-id-registry-service | Immediately stop dispatches using that sender; terminate open executions with REFUSED_SENDER_SUSPENDED |
| compliance.policy.changed.v1 | compliance-engine | Invalidate compliance-side gate cache |
| fraud.detected.channel_abuse.v1 | fraud-intel-service | Force-open the OTT adapter breaker for the affected tenant for 15 min |
| chan.ott_account.rotated.v1 | self (control plane) | Adapter pods reload credentials from Vault within 60 s |
| WhatsApp webhook (HTTP ingress) | Meta | Correlate status → attempt |
| Telegram webhook (HTTP ingress) | Telegram | Correlate updates → attempts / inbound MO for OTT conversations |
| Viber webhook (HTTP ingress) | Viber | Correlate events → attempts |
4. Consumer contracts
| Service | Subjects | Action |
|---|---|---|
| billing-service | channel.billing.event.v1 | Write metering row; reconcile per-attempt cost |
| analytics-service | channel.fallback.taken.v1, notification.delivery.outcome.v1, channel.recipient.profile.updated.v1 | Dashboards; fallback-rate tracking |
| consent-ledger-service | sms.mo.inbound (via re-publish from channel.mo.inbound.v1) | STOP detection |
| sms-firewall-service | sms.mo.inbound | AIT / SIM-box feedback |
| notification-service | notification.delivery.outcome.v1 | Push tenant-portal notification if webhook delivery failed |
| regulator-portal-service | channel.audit.v1 | SIEM forwarding |
5. PII & security rules
- MSISDN appears only as `msisdnHash` (sha256(msisdn ‖ tenantSalt)) or `msisdnMasked` (`+CCNNN***`).
- Message body surfaces only on `channel.mo.inbound.v1` (which is the tenant-delivery payload; not consumed by analytics).
- Rule-match or content previews in `reasonDetail` MUST be redacted; keyword/regex matches are replaced by `***`.
- Events are signed by the producer's RS256 key (platform-wide); billing and audit consumers verify signatures.
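The hashing and redaction rules above can be sketched as two helpers. Several details here are assumptions rather than part of this contract: the MSISDN is UTF-8 encoded, "‖" is plain byte concatenation with the salt appended, and redaction patterns are matched case-insensitively. Both function names are hypothetical.

```python
import hashlib
import re


def msisdn_hash(msisdn: str, tenant_salt: bytes) -> str:
    """sha256(msisdn || tenantSalt) as 64 lowercase hex characters."""
    return hashlib.sha256(msisdn.encode("utf-8") + tenant_salt).hexdigest()


def redact(reason_detail: str, patterns: list) -> str:
    """Replace every keyword/regex match in a reasonDetail preview with ***."""
    for pattern in patterns:
        reason_detail = re.sub(pattern, "***", reason_detail, flags=re.IGNORECASE)
    return reason_detail


digest = msisdn_hash("+93701234567", b"tenant-a-salt")
print(len(digest))  # 64
print(redact("matched keyword FREE-LOAN in preview", [r"free-loan"]))
# matched keyword *** in preview
```

Because the salt is per tenant, the same MSISDN hashes to different values for different tenants, so hashes cannot be joined across tenants.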
6. Outbox pattern
Every state change writes its event as a row to chan.outbox in the same PG transaction as the aggregate change, so the state and the event commit or roll back together. A relay publishes the rows to NATS with ACK + retry; UNIQUE(notificationId, recipientId) on the outcome outbox enforces the "one canonical outcome" invariant even across relay restarts.
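The two guarantees described here, atomic write of state plus event and at most one outcome row per (notificationId, recipientId), can be demonstrated with a small sketch. It uses in-memory SQLite purely as a stand-in for PostgreSQL; the table and column names are simplified assumptions, not the real chan schema.

```python
import sqlite3


def open_db() -> sqlite3.Connection:
    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE execution (id TEXT PRIMARY KEY, state TEXT NOT NULL);
        CREATE TABLE delivery_outbox (
            notification_id TEXT NOT NULL,
            recipient_id    TEXT NOT NULL,
            payload         TEXT NOT NULL,
            UNIQUE (notification_id, recipient_id)
        );
    """)
    return db


def record_outcome(db, execution_id, notification_id, recipient_id, payload) -> bool:
    """Write the aggregate change and the outbox row in one transaction."""
    try:
        with db:  # commits on success, rolls back both statements on error
            db.execute(
                "INSERT OR REPLACE INTO execution (id, state) VALUES (?, 'DONE')",
                (execution_id,),
            )
            db.execute(
                "INSERT INTO delivery_outbox VALUES (?, ?, ?)",
                (notification_id, recipient_id, payload),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # a canonical outcome already exists for this pair


db = open_db()
print(record_outcome(db, "e1", "n1", "r1", '{"final":"DELIVERED"}'))  # True
print(record_outcome(db, "e2", "n1", "r1", '{"final":"FAILED"}'))     # False
```

The second call fails on the unique constraint, and the rollback also discards the `e2` execution row written earlier in the same transaction.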
7. Schema evolution
- Additive fields with defaults are non-breaking within the same `schemaVersion`.
- Breaking change → `channel.<topic>.v2`. Subjects coexist ≥ 90 d.
- Unknown enum values MUST be treated as `UNKNOWN`, never fail.
- Schema registry: subjects registered in `platform-schema-registry`; CI gate via `buf breaking` + JSON Schema compatibility check.
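The unknown-enum rule can be honoured with a tolerant parser on the consumer side. A minimal sketch, using the `mno` enum from channel.mo.inbound.v1 as the example; the helper name `parse_enum` is hypothetical.

```python
KNOWN_MNOS = {"AWCC", "ROSHAN", "MTN", "ETISALAT", "SALAAM", "UNKNOWN"}


def parse_enum(value, known: set, fallback: str = "UNKNOWN") -> str:
    """Map any value outside the known set to the fallback instead of raising."""
    if isinstance(value, str) and value in known:
        return value
    return fallback


print(parse_enum("ROSHAN", KNOWN_MNOS))  # ROSHAN
print(parse_enum("NEWTEL", KNOWN_MNOS))  # UNKNOWN: added by a newer producer
```

This lets a producer add an enum value within the same `schemaVersion` without breaking consumers that were deployed earlier.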