Channel Router Service — Data Model
Version: 1.0 Status: Draft Owner: Messaging Core Last Updated: 2026-04-21 Companion: DOMAIN_MODEL · SECURITY_MODEL · SYNC_CONTRACT
PostgreSQL schema: chan. Owned exclusively by channel-router-service. No other service reads/writes these tables directly — all integration is via gRPC, REST, or NATS events.
1. DDL
-- =====================================================================
-- RECIPIENT PROFILES (LWW-merged)
-- =====================================================================
-- Three-valued capability flag: known yes, known no, or not yet determined.
CREATE TYPE chan.tri_state AS ENUM (
    'KNOWN_YES',
    'KNOWN_NO',
    'UNKNOWN'
);

-- Discovery lifecycle marker stored on each profile.
CREATE TYPE chan.discovery_state AS ENUM (
    'UNSEEN',
    'LEARNING',
    'STABLE'
);

-- Delivery channels referenced throughout the chan schema.
CREATE TYPE chan.channel AS ENUM (
    'SMS',
    'WHATSAPP',
    'TELEGRAM',
    'VIBER',
    'VOICE',
    'EMAIL'
);

-- One profile per (tenant_id, msisdn_hash); uniqueness is enforced by
-- ux_profile_tenant_msisdn below. The raw MSISDN is not stored, only its hash.
CREATE TABLE chan.recipient_profiles (
    profile_id UUID NOT NULL,
    tenant_id UUID NOT NULL,
    -- sha256(msisdn || tenantSalt)
    msisdn_hash TEXT NOT NULL,
    -- [{channel, score, confidence, lastObservedAt}]
    channel_preferences JSONB NOT NULL DEFAULT '[]',
    has_whatsapp_business chan.tri_state NOT NULL DEFAULT 'UNKNOWN',
    telegram_chat_id TEXT,
    viber_id TEXT,
    voice_otp_supported chan.tri_state NOT NULL DEFAULT 'UNKNOWN',
    email_verified BOOLEAN NOT NULL DEFAULT FALSE,
    last_successful_channel chan.channel,
    last_observed_at TIMESTAMPTZ,
    discovery_state chan.discovery_state NOT NULL DEFAULT 'UNSEEN',
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (profile_id)
);

-- Lookup key for a recipient within a tenant; also prevents duplicate profiles.
CREATE UNIQUE INDEX ux_profile_tenant_msisdn
    ON chan.recipient_profiles (tenant_id, msisdn_hash);

-- Most-recently-updated-first ordering.
CREATE INDEX ix_profile_updated
    ON chan.recipient_profiles (updated_at DESC);

-- Row-level security is switched on here; the policies are declared separately.
-- NOTE(review): FORCE ROW LEVEL SECURITY is not set, so the table owner is
-- exempt from the policies -- confirm handlers do not connect as the owner.
ALTER TABLE chan.recipient_profiles ENABLE ROW LEVEL SECURITY;
-- =====================================================================
-- FALLBACK POLICIES
-- =====================================================================
-- How the steps of a fallback ladder are executed.
CREATE TYPE chan.fallback_strategy AS ENUM (
    'SEQUENTIAL',
    'PARALLEL',
    'FAILOVER'
);

-- Message use cases a policy can be bound to.
CREATE TYPE chan.use_case AS ENUM (
    'otp',
    'txn',
    'marketing',
    'alert',
    'conversational'
);

-- Per-tenant, per-use-case fallback policy. Rows are soft-deleted via
-- deleted_at; the partial unique index below allows only one live
-- (deleted_at IS NULL) policy per (tenant_id, use_case).
CREATE TABLE chan.fallback_policies (
    policy_id UUID NOT NULL,
    tenant_id UUID NOT NULL,
    use_case chan.use_case NOT NULL,
    strategy chan.fallback_strategy NOT NULL DEFAULT 'SEQUENTIAL',
    -- NOTE(review): "_ngn" presumably denotes Nigerian naira -- confirm unit.
    cost_cap_per_message_ngn NUMERIC(12,4) NOT NULL,
    -- Default of 86400 seconds equals 24 hours.
    session_ttl_seconds INTEGER NOT NULL DEFAULT 86400,
    -- [{ channel, deadlineSeconds, retryBudget, ... }]
    ladder JSONB NOT NULL,
    stop_keywords_override TEXT[] NOT NULL DEFAULT '{}',
    version INTEGER NOT NULL DEFAULT 1,
    created_by UUID NOT NULL,
    updated_by UUID NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at TIMESTAMPTZ,
    grace_period_ends_at TIMESTAMPTZ,
    PRIMARY KEY (policy_id)
);

-- Soft-deleted rows are excluded, so a replacement policy can be created
-- for the same (tenant_id, use_case) after deletion.
CREATE UNIQUE INDEX ux_policies_tenant_usecase
    ON chan.fallback_policies (tenant_id, use_case)
    WHERE deleted_at IS NULL;
-- Version history for fallback policies: one JSONB snapshot per
-- (policy_id, version) pair.
-- NOTE(review): no foreign key to chan.fallback_policies is declared, so
-- policy_id is not checked by the database -- confirm this is deliberate.
CREATE TABLE chan.fallback_policy_versions (
    policy_id UUID NOT NULL,
    version INTEGER NOT NULL,
    snapshot JSONB NOT NULL,
    changed_by UUID NOT NULL,
    changed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (policy_id, version)
);
-- =====================================================================
-- FALLBACK EXECUTIONS (append-only, partitioned)
-- =====================================================================
-- Outcome of a fallback execution; new rows default to 'IN_PROGRESS'.
CREATE TYPE chan.outcome AS ENUM (
    'IN_PROGRESS',
    'DELIVERED',
    'FAILED',
    'REFUSED_NO_CHANNEL',
    'REFUSED_COST_CAP',
    'REFUSED_CONSENT',
    'REFUSED_SENDER_UNAUTHORIZED'
);

-- One row per fallback execution, range-partitioned on started_at.
-- The primary key leads with started_at because it is the partition column;
-- execution_id alone is therefore not declared unique by this schema.
CREATE TABLE chan.fallback_executions (
    execution_id UUID NOT NULL,
    notification_id UUID NOT NULL,
    recipient_id TEXT NOT NULL,
    tenant_id UUID NOT NULL,
    use_case chan.use_case NOT NULL,
    -- policy_id / policy_version are nullable and carry no foreign key.
    policy_id UUID,
    policy_version INTEGER,
    ladder_snapshot JSONB NOT NULL,
    final_channel chan.channel,
    outcome chan.outcome NOT NULL DEFAULT 'IN_PROGRESS',
    total_cost_ngn NUMERIC(12,4) NOT NULL DEFAULT 0,
    fallback_path JSONB NOT NULL DEFAULT '[]',
    started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    terminated_at TIMESTAMPTZ,
    trace_id TEXT,
    PRIMARY KEY (started_at, execution_id)
)
PARTITION BY RANGE (started_at);

-- Per-tenant listing, newest first.
CREATE INDEX ix_exec_tenant_time
    ON chan.fallback_executions (tenant_id, started_at DESC);

-- Find executions for a given notification and recipient.
CREATE INDEX ix_exec_notification
    ON chan.fallback_executions (notification_id, recipient_id);
-- Append-only at DB level.
-- Both rules rewrite UPDATE / DELETE statements addressed to
-- chan.fallback_executions into nothing: the statement completes without an
-- error and without touching any row.
-- NOTE(review): the table defaults outcome to 'IN_PROGRESS' and leaves
-- terminated_at / final_channel nullable; with UPDATE suppressed those values
-- can only be supplied at INSERT time -- confirm the intended write pattern.
-- NOTE(review): presumably statements issued directly against an individual
-- partition are not covered by rules on the parent -- verify.
CREATE RULE exec_no_update AS ON UPDATE TO chan.fallback_executions DO INSTEAD NOTHING;
CREATE RULE exec_no_delete AS ON DELETE TO chan.fallback_executions DO INSTEAD NOTHING;
-- =====================================================================
-- DELIVERY ATTEMPTS (append-only, partitioned)
-- =====================================================================
-- Canonical status of a single delivery attempt.
CREATE TYPE chan.attempt_status AS ENUM (
    'accepted',
    'sent',
    'delivered',
    'delivered_read',
    'failed_temp',
    'failed_perm',
    'rejected_by_provider',
    'rejected_by_recipient',
    'step_skipped',
    'timed_out'
);

-- One row per delivery attempt, range-partitioned on started_at.
-- execution_id links an attempt to its execution by value only; no foreign
-- key is declared.
-- NOTE(review): the section header calls this table append-only, but no rule
-- or trigger blocking UPDATE / DELETE is declared for it here -- confirm
-- where that guarantee is enforced.
CREATE TABLE chan.delivery_attempts (
    attempt_id UUID NOT NULL,
    execution_id UUID NOT NULL,
    step_index INTEGER NOT NULL,
    channel chan.channel NOT NULL,
    adapter_config_id UUID,
    provider_message_id TEXT,
    status chan.attempt_status NOT NULL,
    reason TEXT,
    cost_ngn NUMERIC(12,4) NOT NULL DEFAULT 0,
    duration_ms INTEGER,
    raw_provider_payload JSONB,
    started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    terminated_at TIMESTAMPTZ,
    PRIMARY KEY (started_at, attempt_id)
)
PARTITION BY RANGE (started_at);

-- All attempts belonging to one execution.
CREATE INDEX ix_attempt_execution
    ON chan.delivery_attempts (execution_id);

-- Lookup by provider-assigned message id; rows without one are not indexed.
CREATE INDEX ix_attempt_provider_msg
    ON chan.delivery_attempts (provider_message_id)
    WHERE provider_message_id IS NOT NULL;
-- =====================================================================
-- CONVERSATIONS
-- =====================================================================
-- Conversation lifecycle: open, or closed for one of three reasons.
CREATE TYPE chan.conversation_status AS ENUM (
    'OPEN',
    'CLOSED_STOP',
    'CLOSED_IDLE',
    'CLOSED_MANUAL'
);

-- Two-way conversation between a sender and a hashed recipient MSISDN.
-- expires_at has no default and must be supplied by the writer.
CREATE TABLE chan.conversations (
    conversation_id UUID NOT NULL,
    tenant_id UUID NOT NULL,
    sender_id TEXT NOT NULL,
    msisdn_hash TEXT NOT NULL,
    channel chan.channel NOT NULL,
    status chan.conversation_status NOT NULL DEFAULT 'OPEN',
    turn_count INTEGER NOT NULL DEFAULT 0,
    last_mt_message_id UUID,
    last_mo_message_id UUID,
    opened_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    expires_at TIMESTAMPTZ NOT NULL,
    closed_at TIMESTAMPTZ,
    close_reason TEXT,
    PRIMARY KEY (conversation_id)
);

-- Non-unique: several conversations may share (tenant_id, sender_id, msisdn_hash).
CREATE INDEX ix_conv_key
    ON chan.conversations (tenant_id, sender_id, msisdn_hash);

-- Supports scans by status and expiry time.
CREATE INDEX ix_conv_status_exp
    ON chan.conversations (status, expires_at);

-- Row-level security is switched on here; policies are declared separately.
ALTER TABLE chan.conversations ENABLE ROW LEVEL SECURITY;
-- =====================================================================
-- CHANNEL ADAPTER CONFIGS
-- =====================================================================
-- Upstream provider integrations an adapter config can target.
CREATE TYPE chan.provider AS ENUM (
    'WHATSAPP_CLOUD',
    'TELEGRAM_BOT',
    'VIBER',
    'VOICE_OTP_GATEWAY',
    'SMTP',
    'SMPP_CONNECTOR'
);

-- Circuit-breaker state persisted on each adapter config.
CREATE TYPE chan.circuit_state AS ENUM (
    'CLOSED',
    'OPEN',
    'HALF_OPEN'
);

-- Provider adapter configuration. Credentials are referenced through
-- secret_ref rather than stored in this table.
CREATE TABLE chan.channel_adapter_configs (
    adapter_config_id UUID NOT NULL,
    -- NULL for platform-wide adapters
    tenant_id UUID,
    provider chan.provider NOT NULL,
    phone_number_id_or_handle TEXT,
    -- Vault path
    secret_ref TEXT NOT NULL,
    circuit_state chan.circuit_state NOT NULL DEFAULT 'CLOSED',
    circuit_opened_at TIMESTAMPTZ,
    rate_limit_per_second INTEGER,
    rate_limit_per_day INTEGER,
    regional_egress_ip_pool TEXT,
    -- {base, perSegment, perMinute, ...}
    pricing_model JSONB NOT NULL,
    active BOOLEAN NOT NULL DEFAULT TRUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (adapter_config_id)
);

-- Non-unique: a tenant may hold several configs for the same provider.
CREATE INDEX ix_adcfg_tenant_provider
    ON chan.channel_adapter_configs (tenant_id, provider);
-- =====================================================================
-- TENANT INBOUND ROUTES
-- =====================================================================
-- Maps an inbound address to the tenant webhook that receives its traffic.
CREATE TABLE chan.tenant_inbound_routes (
    route_id UUID NOT NULL,
    tenant_id UUID NOT NULL,
    -- shortcode or E.164
    inbound TEXT NOT NULL,
    webhook_url TEXT NOT NULL,
    -- Vault path
    secret_ref TEXT NOT NULL,
    active BOOLEAN NOT NULL DEFAULT TRUE,
    grace_period_ends_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (route_id)
);

-- The index omits tenant_id, so an inbound address can have at most one
-- active route across all tenants; inactive rows are unconstrained.
CREATE UNIQUE INDEX ux_inbound_unique
    ON chan.tenant_inbound_routes (inbound)
    WHERE active = TRUE;
-- =====================================================================
-- ADAPTER STATUS MAP
-- =====================================================================
-- Translation table from a provider-specific status code to the canonical
-- chan.attempt_status value; one mapping per (adapter, provider_code).
CREATE TABLE chan.adapter_status_map (
    adapter chan.provider NOT NULL,
    provider_code TEXT NOT NULL,
    canonical chan.attempt_status NOT NULL,
    reason TEXT NOT NULL,
    version INTEGER NOT NULL DEFAULT 1,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (adapter, provider_code)
);

-- Seeded mappings:
--   WhatsApp 131026  -> rejected_by_recipient (recipient unsupported)
--   WhatsApp 131051  -> rejected_by_recipient (unsupported message type)
--   Telegram 403     -> rejected_by_recipient (bot blocked)
--   Viber "failed"   -> failed_perm
-- =====================================================================
-- DELIVERY OUTBOX (single outcome event per recipient)
-- =====================================================================
-- Outbox for delivery-outcome events. The UNIQUE constraint permits a single
-- row per (notification_id, recipient_id). published_at stays NULL until the
-- event has been published.
CREATE TABLE chan.delivery_outbox (
    event_id UUID NOT NULL,
    notification_id UUID NOT NULL,
    recipient_id TEXT NOT NULL,
    subject TEXT NOT NULL DEFAULT 'notification.delivery.outcome.v1',
    payload JSONB NOT NULL,
    published_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (event_id),
    UNIQUE (notification_id, recipient_id)
);

-- Covers only unpublished rows, ordered by creation time.
CREATE INDEX ix_outbox_unpublished
    ON chan.delivery_outbox (created_at)
    WHERE published_at IS NULL;
-- General-purpose outbox for non-outcome events.
-- Unlike chan.delivery_outbox, subject has no default and no uniqueness
-- beyond event_id is declared.
CREATE TABLE chan.outbox (
    event_id UUID NOT NULL,
    subject TEXT NOT NULL,
    payload JSONB NOT NULL,
    published_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (event_id)
);

-- Covers only unpublished rows, ordered by creation time.
CREATE INDEX ix_outbox_unpub2
    ON chan.outbox (created_at)
    WHERE published_at IS NULL;
-- =====================================================================
-- AUDIT (append-only, hash-chained, partitioned)
-- =====================================================================
-- Audit trail, range-partitioned on occurred_at. Each row stores its own
-- hash plus the hash of the previous row; the hashes are supplied by the
-- writer, nothing in this DDL computes or verifies them.
CREATE TABLE chan.audit (
    audit_id UUID NOT NULL,
    -- FALLBACK_POLICY|ADAPTER_CONFIG|INBOUND_ROUTE|SESSION|PROFILE|CIRCUIT_BREAKER
    entity_type TEXT NOT NULL,
    entity_id UUID NOT NULL,
    action TEXT NOT NULL,
    actor_user_id UUID NOT NULL,
    before JSONB,
    after JSONB,
    ip INET,
    user_agent TEXT,
    trace_id TEXT,
    -- hex sha256 of previous row
    prev_hash TEXT,
    -- sha256(payload || prev_hash)
    record_hash TEXT NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (occurred_at, audit_id)
)
PARTITION BY RANGE (occurred_at);

-- UPDATE and DELETE against chan.audit are rewritten to nothing: the
-- statement completes without an error and without changing any row.
-- NOTE(review): presumably statements issued directly against an individual
-- partition are not covered by rules on the parent -- verify.
CREATE RULE audit_no_update AS
    ON UPDATE TO chan.audit
    DO INSTEAD NOTHING;

CREATE RULE audit_no_delete AS
    ON DELETE TO chan.audit
    DO INSTEAD NOTHING;

-- History of one entity, newest first.
CREATE INDEX ix_audit_entity
    ON chan.audit (entity_type, entity_id, occurred_at DESC);
-- =====================================================================
-- INBOX (dedup for orchestrator messages)
-- =====================================================================
-- Inbound-message dedup table: the primary key on nats_msg_id rejects a
-- second insert of the same message id.
CREATE TABLE chan.delivery_inbox (
    nats_msg_id TEXT NOT NULL,
    notification_id UUID NOT NULL,
    recipient_id TEXT NOT NULL,
    received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (nats_msg_id)
);

-- Time-ordered access by receipt time.
CREATE INDEX ix_inbox_time
    ON chan.delivery_inbox (received_at);
2. Redis keys
All keys namespaced chan:*.
| Key | TTL | Purpose |
|---|---|---|
| chan:policy:{tenantId}:{useCase} | 300 s | Resolved fallback policy |
| chan:profile:{msisdnHash} | 60 s | Recipient profile |
| chan:gate:{tenantId}:{msisdnHash}:{useCase} | 60 s | Consent + compliance gating result |
| chan:session:{senderId}:{msisdnHash} (Redis HASH) | sessionTtlSeconds (sliding) | Live conversation |
| chan:inflight:{notificationId}:{recipientId} | 300 s | Dispatch idempotency |
| chan:fallback:inflight:{executionId} | until terminal | Execution lock |
| chan:deadlines (ZSET, score = unix epoch) | — | Deadline scanner input |
| chan:tps:{provider}:{accountId} (sliding window) | 1 s | Per-provider token bucket |
| chan:circuit:{adapter}:{provider} | 60 s | Breaker state + rolling counters |
| chan:ott_token:{tenantId}:{provider} | 60 s | Cached Vault credential (refreshed on rotate) |
| chan:mo:unmatched:{destination} | 60 s | Anti-enumeration counter for unmatched inbounds |
| chan:deadline:lock | 2 s | Deadline-scanner distributed lock |
3. Row-level security
-- Row-level security policies for chan.recipient_profiles and
-- chan.conversations. No policy is declared RESTRICTIVE, so policies on the
-- same table are permissive and combine with OR: a row is visible or writable
-- when ANY applicable policy accepts it.
--
-- current_setting(name) without a second argument raises an error when the
-- setting is absent, so a request that never set it fails instead of
-- matching rows.

-- Tenant-scoped read: callers see only rows of the tenant named in
-- app.current_tenant_id.
CREATE POLICY profile_tenant_read ON chan.recipient_profiles
    FOR SELECT
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- Privileged access for all commands (no FOR clause; the USING expression is
-- also applied as the WITH CHECK expression).
--   * platform.* roles: every tenant's rows.
--   * tenant.* roles:   only rows of the caller's own tenant.
-- Checking the role alone would let tenant.admin / tenant.support read and
-- modify other tenants' profiles, because this policy is OR-ed with the
-- tenant-scoped one above. CASE is used so the tenant setting is read only
-- for tenant-scoped roles.
CREATE POLICY profile_admin_all ON chan.recipient_profiles
    USING (
        CASE
            WHEN current_setting('app.caller_role')
                 IN ('platform.channel.admin', 'platform.support')
                THEN TRUE
            WHEN current_setting('app.caller_role')
                 IN ('tenant.admin', 'tenant.support')
                THEN tenant_id = current_setting('app.current_tenant_id')::uuid
            ELSE FALSE
        END
    );

-- Tenant-scoped read on conversations. No write policy is declared, so roles
-- subject to row-level security cannot modify rows in this table.
CREATE POLICY conv_tenant_read ON chan.conversations
    FOR SELECT
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
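Within each request transaction, handlers set app.current_tenant_id from the Kong-injected X-Tenant-Id header and app.caller_role from the JWT-extracted role claim. Both values are transaction-scoped. Because PostgreSQL's SET LOCAL does not accept bind parameters, handlers use the parameterizable equivalent: SELECT set_config('app.current_tenant_id', $1, true) and SELECT set_config('app.caller_role', $2, true).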
4. ID prefixes
| Prefix | Entity |
|---|---|
| profile_ | RecipientProfile |
| policy_ | FallbackPolicy |
| exec_ | FallbackExecution |
| attempt_ | DeliveryAttempt |
| conv_ | Conversation (ULID-based) |
| route_ | TenantInboundRoute |
| adcfg_ | ChannelAdapterConfig |
| chan_audit_ | AuditLog row (external view only) |
Internally UUIDv4; prefixes only apply on external REST / event payloads.
5. PII, encryption, retention
| Field | Class | Protection |
|---|---|---|
| recipient_profiles.msisdn_hash | Pseudo-ID | Salted SHA-256; salt per-tenant in Vault |
| delivery_attempts.raw_provider_payload | PII + provider data | Body redacted at ingress; 30 d retention via partition drop |
| tenant_inbound_routes.secret_ref | Secret | Vault path only; plaintext never in DB |
| channel_adapter_configs.secret_ref | Secret | Same |
| conversations.msisdn_hash | Pseudo-ID | Salted SHA-256 |
| audit.before/after | PII-adjacent | Redacted before write (msisdn masked) |
Retention:
- `fallback_executions` / `delivery_attempts`: 13 m hot (partitioned monthly) + 7 y cold archive to S3 (encrypted).
- `conversations`: 90 d after `CLOSED_*`.
- `recipient_profiles`: live; purged on GDPR erasure.
- `audit`: 13 m hot + 7 y cold.
- `fallback_policy_versions`: retained for the lifetime of the policy + 1 y.
- `delivery_outbox` / `outbox`: 30 d after published.
- `delivery_inbox`: 7 d.
6. Partition strategy
- Monthly partitions for `fallback_executions`, `delivery_attempts`, `audit`, provisioned 3 months ahead by nightly job; missing future partitions trigger HIGH alert `ChannelPartitionMissing`.
- Retention enforced by dropping partitions, never via `DELETE`.
- Cold-tier export job writes Parquet into `s3://ghasi-channel-cold/{year}/{month}/...` with per-tenant KMS envelope.
7. Indexes & query patterns
| Query | Index used |
|---|---|
| Hot-path profile lookup | ux_profile_tenant_msisdn |
| Hot-path policy lookup | ux_policies_tenant_usecase |
| Provider webhook correlation | ix_attempt_provider_msg |
| Session resolve on MO | ix_conv_key |
| MO static routing | ux_inbound_unique |
| Admin session list | ix_conv_status_exp |
| Execution by notification (idempotency recovery) | ix_exec_notification |
8. Migration strategy
Expand/contract with forward-only migrations. All PII-touching columns require Security review. Partition provisioning is blue/green: new partitions created before application deploys attempt writes. Schema tied to chan.schema_version table read at startup; mismatch triggers refusal to serve (fail-closed boot).