Skip to main content

Channel Router Service — Data Model

Version: 1.0 · Status: Draft · Owner: Messaging Core · Last Updated: 2026-04-21 · Companion: DOMAIN_MODEL, SECURITY_MODEL, SYNC_CONTRACT

PostgreSQL schema: chan. Owned exclusively by channel-router-service. No other service reads/writes these tables directly — all integration is via gRPC, REST, or NATS events.


1. DDL

-- =====================================================================
-- RECIPIENT PROFILES (LWW-merged)
-- =====================================================================

-- Value set for the capability columns has_whatsapp_business and voice_otp_supported.
CREATE TYPE chan.tri_state AS ENUM (
    'KNOWN_YES',
    'KNOWN_NO',
    'UNKNOWN'
);

-- Value set for recipient_profiles.discovery_state.
CREATE TYPE chan.discovery_state AS ENUM (
    'UNSEEN',
    'LEARNING',
    'STABLE'
);

-- Delivery channel labels used by the channel-typed columns of the chan schema.
CREATE TYPE chan.channel AS ENUM (
    'SMS',
    'WHATSAPP',
    'TELEGRAM',
    'VIBER',
    'VOICE',
    'EMAIL'
);

-- Recipient profile, keyed by profile_id and unique per (tenant_id, msisdn_hash).
CREATE TABLE chan.recipient_profiles (
    profile_id              UUID,
    tenant_id               UUID                 NOT NULL,
    msisdn_hash             TEXT                 NOT NULL,                    -- sha256(msisdn || tenantSalt)
    channel_preferences     JSONB                NOT NULL DEFAULT '[]',       -- [{channel, score, confidence, lastObservedAt}]
    has_whatsapp_business   chan.tri_state       NOT NULL DEFAULT 'UNKNOWN',
    telegram_chat_id        TEXT,
    viber_id                TEXT,
    voice_otp_supported     chan.tri_state       NOT NULL DEFAULT 'UNKNOWN',
    email_verified          BOOLEAN              NOT NULL DEFAULT FALSE,
    last_successful_channel chan.channel,                                     -- NULL until a value is recorded
    last_observed_at        TIMESTAMPTZ,
    discovery_state         chan.discovery_state NOT NULL DEFAULT 'UNSEEN',
    updated_at              TIMESTAMPTZ          NOT NULL DEFAULT now(),
    created_at              TIMESTAMPTZ          NOT NULL DEFAULT now(),
    PRIMARY KEY (profile_id)
);

-- Enforces one profile per tenant and hashed MSISDN.
CREATE UNIQUE INDEX ux_profile_tenant_msisdn
    ON chan.recipient_profiles (tenant_id, msisdn_hash);

-- Newest-first ordering on the modification timestamp.
CREATE INDEX ix_profile_updated
    ON chan.recipient_profiles (updated_at DESC);

-- Turns on row-level security; the policies themselves are declared separately.
-- NOTE(review): in PostgreSQL the table owner bypasses RLS unless
-- FORCE ROW LEVEL SECURITY is also set -- confirm which role the service connects as.
ALTER TABLE chan.recipient_profiles
    ENABLE ROW LEVEL SECURITY;

-- =====================================================================
-- FALLBACK POLICIES
-- =====================================================================

-- Value set for fallback_policies.strategy.
CREATE TYPE chan.fallback_strategy AS ENUM (
    'SEQUENTIAL',
    'PARALLEL',
    'FAILOVER'
);

-- Message use cases; a policy is defined per tenant and use case.
CREATE TYPE chan.use_case AS ENUM (
    'otp',
    'txn',
    'marketing',
    'alert',
    'conversational'
);

-- Current fallback policy rows. Rows carry a deleted_at marker rather than
-- being the only record of a policy; see the partial unique index below.
CREATE TABLE chan.fallback_policies (
    policy_id                UUID,
    tenant_id                UUID                   NOT NULL,
    use_case                 chan.use_case          NOT NULL,
    strategy                 chan.fallback_strategy NOT NULL DEFAULT 'SEQUENTIAL',
    cost_cap_per_message_ngn NUMERIC(12,4)          NOT NULL,
    session_ttl_seconds      INTEGER                NOT NULL DEFAULT 86400,
    ladder                   JSONB                  NOT NULL,              -- [{ channel, deadlineSeconds, retryBudget, ... }]
    stop_keywords_override   TEXT[]                 NOT NULL DEFAULT '{}',
    version                  INTEGER                NOT NULL DEFAULT 1,
    created_by               UUID                   NOT NULL,
    updated_by               UUID                   NOT NULL,
    created_at               TIMESTAMPTZ            NOT NULL DEFAULT now(),
    updated_at               TIMESTAMPTZ            NOT NULL DEFAULT now(),
    deleted_at               TIMESTAMPTZ,
    grace_period_ends_at     TIMESTAMPTZ,
    PRIMARY KEY (policy_id)
);

-- At most one non-deleted policy per (tenant_id, use_case); rows with
-- deleted_at set are excluded from the uniqueness check.
CREATE UNIQUE INDEX ux_policies_tenant_usecase
    ON chan.fallback_policies (tenant_id, use_case)
    WHERE deleted_at IS NULL;

-- One JSONB snapshot per (policy_id, version).
-- NOTE(review): policy_id has no foreign key to chan.fallback_policies -- confirm this is intentional.
CREATE TABLE chan.fallback_policy_versions (
    policy_id  UUID        NOT NULL,
    version    INTEGER     NOT NULL,
    snapshot   JSONB       NOT NULL,
    changed_by UUID        NOT NULL,
    changed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (policy_id, version)
);

-- =====================================================================
-- FALLBACK EXECUTIONS (append-only, partitioned)
-- =====================================================================

-- Value set for fallback_executions.outcome.
CREATE TYPE chan.outcome AS ENUM (
    'IN_PROGRESS',
    'DELIVERED',
    'FAILED',
    'REFUSED_NO_CHANNEL',
    'REFUSED_COST_CAP',
    'REFUSED_CONSENT',
    'REFUSED_SENDER_UNAUTHORIZED'
);

-- Fallback execution records, range-partitioned on started_at.
-- The primary key includes started_at because it is the partition key.
CREATE TABLE chan.fallback_executions (
    execution_id    UUID          NOT NULL,
    notification_id UUID          NOT NULL,
    recipient_id    TEXT          NOT NULL,
    tenant_id       UUID          NOT NULL,
    use_case        chan.use_case NOT NULL,
    policy_id       UUID,
    policy_version  INTEGER,
    ladder_snapshot JSONB         NOT NULL,
    final_channel   chan.channel,
    outcome         chan.outcome  NOT NULL DEFAULT 'IN_PROGRESS',
    total_cost_ngn  NUMERIC(12,4) NOT NULL DEFAULT 0,
    fallback_path   JSONB         NOT NULL DEFAULT '[]',
    started_at      TIMESTAMPTZ   NOT NULL DEFAULT now(),
    terminated_at   TIMESTAMPTZ,
    trace_id        TEXT,
    PRIMARY KEY (started_at, execution_id)
)
PARTITION BY RANGE (started_at);

-- Per-tenant listing, newest first.
CREATE INDEX ix_exec_tenant_time
    ON chan.fallback_executions (tenant_id, started_at DESC);

-- Lookup of executions by notification and recipient.
CREATE INDEX ix_exec_notification
    ON chan.fallback_executions (notification_id, recipient_id);

-- Append-only at DB level: UPDATE and DELETE statements addressed to this
-- table are rewritten to do nothing. No error is raised; the statement
-- simply affects zero rows.
-- NOTE(review): PostgreSQL does not allow INSERT ... ON CONFLICT on a table
-- that has an UPDATE rule -- confirm writers never rely on it here.
-- NOTE(review): the rules are attached to the parent table; confirm whether
-- statements issued directly against a partition must also be blocked.
CREATE RULE exec_no_update AS
    ON UPDATE TO chan.fallback_executions
    DO INSTEAD NOTHING;

CREATE RULE exec_no_delete AS
    ON DELETE TO chan.fallback_executions
    DO INSTEAD NOTHING;

-- =====================================================================
-- DELIVERY ATTEMPTS (append-only, partitioned)
-- =====================================================================

-- Canonical attempt statuses; also the target type of adapter_status_map.canonical.
CREATE TYPE chan.attempt_status AS ENUM (
    'accepted',
    'sent',
    'delivered',
    'delivered_read',
    'failed_temp',
    'failed_perm',
    'rejected_by_provider',
    'rejected_by_recipient',
    'step_skipped',
    'timed_out'
);

-- Individual delivery attempts, range-partitioned on started_at.
-- execution_id carries no foreign key in this definition.
CREATE TABLE chan.delivery_attempts (
    attempt_id           UUID                NOT NULL,
    execution_id         UUID                NOT NULL,
    step_index           INTEGER             NOT NULL,
    channel              chan.channel        NOT NULL,
    adapter_config_id    UUID,
    provider_message_id  TEXT,
    status               chan.attempt_status NOT NULL,
    reason               TEXT,
    cost_ngn             NUMERIC(12,4)       NOT NULL DEFAULT 0,
    duration_ms          INTEGER,
    raw_provider_payload JSONB,
    started_at           TIMESTAMPTZ         NOT NULL DEFAULT now(),
    terminated_at        TIMESTAMPTZ,
    PRIMARY KEY (started_at, attempt_id)
)
PARTITION BY RANGE (started_at);

-- All attempts belonging to one execution.
CREATE INDEX ix_attempt_execution
    ON chan.delivery_attempts (execution_id);

-- Lookup by provider message id; rows without one are left out of the index.
CREATE INDEX ix_attempt_provider_msg
    ON chan.delivery_attempts (provider_message_id)
    WHERE provider_message_id IS NOT NULL;

-- =====================================================================
-- CONVERSATIONS
-- =====================================================================

-- Value set for conversations.status.
CREATE TYPE chan.conversation_status AS ENUM (
    'OPEN',
    'CLOSED_STOP',
    'CLOSED_IDLE',
    'CLOSED_MANUAL'
);

-- Conversation rows; a new row starts as OPEN with turn_count 0.
CREATE TABLE chan.conversations (
    conversation_id    UUID,
    tenant_id          UUID                     NOT NULL,
    sender_id          TEXT                     NOT NULL,
    msisdn_hash        TEXT                     NOT NULL,
    channel            chan.channel             NOT NULL,
    status             chan.conversation_status NOT NULL DEFAULT 'OPEN',
    turn_count         INTEGER                  NOT NULL DEFAULT 0,
    last_mt_message_id UUID,
    last_mo_message_id UUID,
    opened_at          TIMESTAMPTZ              NOT NULL DEFAULT now(),
    last_seen_at       TIMESTAMPTZ              NOT NULL DEFAULT now(),
    expires_at         TIMESTAMPTZ              NOT NULL,                 -- no default: must be supplied on insert
    closed_at          TIMESTAMPTZ,
    close_reason       TEXT,
    PRIMARY KEY (conversation_id)
);

-- Non-unique lookup by (tenant_id, sender_id, msisdn_hash).
CREATE INDEX ix_conv_key
    ON chan.conversations (tenant_id, sender_id, msisdn_hash);

-- Filter by status, ordered by expiry.
CREATE INDEX ix_conv_status_exp
    ON chan.conversations (status, expires_at);

-- Turns on row-level security for this table.
-- NOTE(review): only a SELECT policy (conv_tenant_read) is declared for this
-- table in this document -- confirm how writes are expected to pass RLS.
ALTER TABLE chan.conversations
    ENABLE ROW LEVEL SECURITY;

-- =====================================================================
-- CHANNEL ADAPTER CONFIGS
-- =====================================================================

-- Provider identifiers; used by channel_adapter_configs.provider and adapter_status_map.adapter.
CREATE TYPE chan.provider AS ENUM (
    'WHATSAPP_CLOUD',
    'TELEGRAM_BOT',
    'VIBER',
    'VOICE_OTP_GATEWAY',
    'SMTP',
    'SMPP_CONNECTOR'
);

-- Value set for channel_adapter_configs.circuit_state.
CREATE TYPE chan.circuit_state AS ENUM (
    'CLOSED',
    'OPEN',
    'HALF_OPEN'
);

-- Adapter configuration rows. Only a reference to the secret is stored here.
CREATE TABLE chan.channel_adapter_configs (
    adapter_config_id         UUID,
    tenant_id                 UUID,                                          -- NULL for platform-wide adapters
    provider                  chan.provider      NOT NULL,
    phone_number_id_or_handle TEXT,
    secret_ref                TEXT               NOT NULL,                   -- Vault path
    circuit_state             chan.circuit_state NOT NULL DEFAULT 'CLOSED',
    circuit_opened_at         TIMESTAMPTZ,
    rate_limit_per_second     INTEGER,
    rate_limit_per_day        INTEGER,
    regional_egress_ip_pool   TEXT,
    pricing_model             JSONB              NOT NULL,                   -- {base, perSegment, perMinute, ...}
    active                    BOOLEAN            NOT NULL DEFAULT TRUE,
    created_at                TIMESTAMPTZ        NOT NULL DEFAULT now(),
    updated_at                TIMESTAMPTZ        NOT NULL DEFAULT now(),
    PRIMARY KEY (adapter_config_id)
);

-- Non-unique lookup by tenant and provider.
CREATE INDEX ix_adcfg_tenant_provider
    ON chan.channel_adapter_configs (tenant_id, provider);

-- =====================================================================
-- TENANT INBOUND ROUTES
-- =====================================================================

-- Maps an inbound address to a tenant webhook.
CREATE TABLE chan.tenant_inbound_routes (
    route_id             UUID,
    tenant_id            UUID        NOT NULL,
    inbound              TEXT        NOT NULL,               -- shortcode or E.164
    webhook_url          TEXT        NOT NULL,
    secret_ref           TEXT        NOT NULL,               -- Vault path
    active               BOOLEAN     NOT NULL DEFAULT TRUE,
    grace_period_ends_at TIMESTAMPTZ,
    created_at           TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at           TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (route_id)
);

-- An inbound address may appear on only one active route, across all tenants;
-- inactive rows are not constrained.
CREATE UNIQUE INDEX ux_inbound_unique
    ON chan.tenant_inbound_routes (inbound)
    WHERE active = TRUE;

-- =====================================================================
-- ADAPTER STATUS MAP
-- =====================================================================

-- Translation table from a provider-specific code to a canonical attempt status;
-- exactly one mapping per (adapter, provider_code).
CREATE TABLE chan.adapter_status_map (
    adapter       chan.provider       NOT NULL,
    provider_code TEXT                NOT NULL,
    canonical     chan.attempt_status NOT NULL,
    reason        TEXT                NOT NULL,
    version       INTEGER             NOT NULL DEFAULT 1,
    updated_at    TIMESTAMPTZ         NOT NULL DEFAULT now(),
    PRIMARY KEY (adapter, provider_code)
);
-- Seeded mappings:
--   WhatsApp 131026 -> rejected_by_recipient (recipient unsupported)
--   WhatsApp 131051 -> rejected_by_recipient (unsupported message type)
--   Telegram 403    -> rejected_by_recipient (bot blocked)
--   Viber "failed"  -> failed_perm

-- =====================================================================
-- DELIVERY OUTBOX (single outcome event per recipient)
-- =====================================================================

-- Outbox for delivery outcome events. The UNIQUE constraint allows at most
-- one row per (notification_id, recipient_id).
CREATE TABLE chan.delivery_outbox (
    event_id        UUID,
    notification_id UUID        NOT NULL,
    recipient_id    TEXT        NOT NULL,
    subject         TEXT        NOT NULL DEFAULT 'notification.delivery.outcome.v1',
    payload         JSONB       NOT NULL,
    published_at    TIMESTAMPTZ,                         -- NULL while the event is unpublished
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (event_id),
    UNIQUE (notification_id, recipient_id)
);

-- Covers only unpublished rows, ordered by creation time.
CREATE INDEX ix_outbox_unpublished
    ON chan.delivery_outbox (created_at)
    WHERE published_at IS NULL;

-- General-purpose outbox for events other than delivery outcomes.
-- Unlike chan.delivery_outbox, subject has no default and there is no
-- uniqueness constraint beyond the primary key.
CREATE TABLE chan.outbox (
    event_id     UUID,
    subject      TEXT        NOT NULL,
    payload      JSONB       NOT NULL,
    published_at TIMESTAMPTZ,                         -- NULL while the event is unpublished
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (event_id)
);

-- Covers only unpublished rows, ordered by creation time.
CREATE INDEX ix_outbox_unpub2
    ON chan.outbox (created_at)
    WHERE published_at IS NULL;

-- =====================================================================
-- AUDIT (append-only, hash-chained, partitioned)
-- =====================================================================

-- Audit trail, range-partitioned on occurred_at. Each row stores its own hash
-- and the hash of the previous row.
CREATE TABLE chan.audit (
    audit_id      UUID        NOT NULL,
    entity_type   TEXT        NOT NULL,               -- FALLBACK_POLICY|ADAPTER_CONFIG|INBOUND_ROUTE|SESSION|PROFILE|CIRCUIT_BREAKER
    entity_id     UUID        NOT NULL,
    action        TEXT        NOT NULL,
    actor_user_id UUID        NOT NULL,
    before        JSONB,
    after         JSONB,
    ip            INET,
    user_agent    TEXT,
    trace_id      TEXT,
    prev_hash     TEXT,                               -- hex sha256 of previous row
    record_hash   TEXT        NOT NULL,               -- sha256(payload || prev_hash)
    occurred_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (occurred_at, audit_id)
)
PARTITION BY RANGE (occurred_at);

-- UPDATE and DELETE statements addressed to this table are rewritten to do
-- nothing; no error is raised and zero rows are affected.
-- NOTE(review): PostgreSQL does not allow INSERT ... ON CONFLICT on a table
-- that has an UPDATE rule -- confirm writers never rely on it here.
CREATE RULE audit_no_update AS
    ON UPDATE TO chan.audit
    DO INSTEAD NOTHING;

CREATE RULE audit_no_delete AS
    ON DELETE TO chan.audit
    DO INSTEAD NOTHING;

-- History of one entity, newest first.
CREATE INDEX ix_audit_entity
    ON chan.audit (entity_type, entity_id, occurred_at DESC);

-- =====================================================================
-- INBOX (dedup for orchestrator messages)
-- =====================================================================

-- Inbox keyed by NATS message id; the primary key rejects a second row
-- with the same nats_msg_id.
CREATE TABLE chan.delivery_inbox (
    nats_msg_id     TEXT,
    notification_id UUID        NOT NULL,
    recipient_id    TEXT        NOT NULL,
    received_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (nats_msg_id)
);

-- Ordering by arrival time.
CREATE INDEX ix_inbox_time
    ON chan.delivery_inbox (received_at);

2. Redis keys

All keys namespaced chan:*.

| Key | TTL | Purpose |
|---|---|---|
| `chan:policy:{tenantId}:{useCase}` | 300 s | Resolved fallback policy |
| `chan:profile:{msisdnHash}` | 60 s | Recipient profile |
| `chan:gate:{tenantId}:{msisdnHash}:{useCase}` | 60 s | Consent + compliance gating result |
| `chan:session:{senderId}:{msisdnHash}` (Redis HASH) | sessionTtlSeconds (sliding) | Live conversation |
| `chan:inflight:{notificationId}:{recipientId}` | 300 s | Dispatch idempotency |
| `chan:fallback:inflight:{executionId}` | until terminal | Execution lock |
| `chan:deadlines` (ZSET, score = unix epoch) | — | Deadline scanner input |
| `chan:tps:{provider}:{accountId}` (sliding window) | 1 s | Per-provider token bucket |
| `chan:circuit:{adapter}:{provider}` | 60 s | Breaker state + rolling counters |
| `chan:ott_token:{tenantId}:{provider}` | 60 s | Cached Vault credential (refreshed on rotate) |
| `chan:mo:unmatched:{destination}` | 60 s | Anti-enumeration counter for unmatched inbounds |
| `chan:deadline:lock` | 2 s | Deadline-scanner distributed lock |

3. Row-level security

-- Row-level security policies for chan.recipient_profiles and chan.conversations.
--
-- Policies on the same table are permissive and therefore OR-ed together, so
-- every policy must carry its own tenant restriction where one is required.
--
-- Session settings are read with current_setting(name, true), which yields
-- NULL instead of raising when the setting has never been defined. NULLIF
-- maps an empty string to NULL before the uuid cast, because a custom setting
-- can read back as '' outside the transaction that set it. In both cases the
-- comparison is NULL and the row is filtered out (fail closed, no error).

-- Tenant callers may read only profiles of their own tenant.
CREATE POLICY profile_tenant_read ON chan.recipient_profiles
    FOR SELECT
    USING (
        tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::uuid
    );

-- Administrative access for all commands. With no WITH CHECK clause, the
-- USING expression is also applied to rows being written.
--   * platform.* roles: every tenant's profiles.
--   * tenant.* roles: only profiles of the caller's own tenant. Without the
--     tenant predicate a tenant admin could read and modify other tenants'
--     profiles, because this policy is OR-ed with profile_tenant_read.
CREATE POLICY profile_admin_all ON chan.recipient_profiles
    FOR ALL
    USING (
        current_setting('app.caller_role', true)
            IN ('platform.channel.admin', 'platform.support')
        OR (
            current_setting('app.caller_role', true)
                IN ('tenant.admin', 'tenant.support')
            AND tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::uuid
        )
    );

-- Tenant callers may read only conversations of their own tenant.
CREATE POLICY conv_tenant_read ON chan.conversations
    FOR SELECT
    USING (
        tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::uuid
    );

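At the start of each transaction, handlers set app.current_tenant_id from the Kong-injected X-Tenant-Id header and app.caller_role from the JWT-extracted role claim. Both settings are transaction-scoped (SET LOCAL semantics). Because PostgreSQL's SET statement does not accept bind parameters, the values are bound through the equivalent function calls: SELECT set_config('app.current_tenant_id', $1, true) and SELECT set_config('app.caller_role', $2, true).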


4. ID prefixes

| Prefix | Entity |
|---|---|
| `profile_` | RecipientProfile |
| `policy_` | FallbackPolicy |
| `exec_` | FallbackExecution |
| `attempt_` | DeliveryAttempt |
| `conv_` | Conversation (ULID-based) |
| `route_` | TenantInboundRoute |
| `adcfg_` | ChannelAdapterConfig |
| `chan_audit_` | AuditLog row (external view only) |

Internally UUIDv4; prefixes only apply on external REST / event payloads.


5. PII, encryption, retention

| Field | Class | Protection |
|---|---|---|
| `recipient_profiles.msisdn_hash` | Pseudo-ID | Salted SHA-256; salt per-tenant in Vault |
| `delivery_attempts.raw_provider_payload` | PII + provider data | Body redacted at ingress; 30 d retention via partition drop |
| `tenant_inbound_routes.secret_ref` | Secret | Vault path only; plaintext never in DB |
| `channel_adapter_configs.secret_ref` | Secret | Same |
| `conversations.msisdn_hash` | Pseudo-ID | Salted SHA-256 |
| `audit.before` / `audit.after` | PII-adjacent | Redacted before write (msisdn masked) |

Retention:

  • fallback_executions / delivery_attempts: 13 m hot (partitioned monthly) + 7 y cold archive to S3 (encrypted).
  • conversations: 90 d after CLOSED_*.
  • recipient_profiles: live; purged on GDPR erasure.
  • audit: 13 m hot + 7 y cold.
  • fallback_policy_versions: retained for the lifetime of the policy + 1 y.
  • delivery_outbox/outbox: 30 d after published.
  • delivery_inbox: 7 d.

6. Partition strategy

  • Monthly partitions for fallback_executions, delivery_attempts, audit, provisioned 3 months ahead by nightly job; missing future partitions trigger HIGH alert ChannelPartitionMissing.
  • Retention enforced by dropping partitions, never via DELETE.
  • Cold-tier export job writes Parquet into s3://ghasi-channel-cold/{year}/{month}/... with per-tenant KMS envelope.

7. Indexes & query patterns

| Query | Index used |
|---|---|
| Hot-path profile lookup | `ux_profile_tenant_msisdn` |
| Hot-path policy lookup | `ux_policies_tenant_usecase` |
| Provider webhook correlation | `ix_attempt_provider_msg` |
| Session resolve on MO | `ix_conv_key` |
| MO static routing | `ux_inbound_unique` |
| Admin session list | `ix_conv_status_exp` |
| Execution by notification (idempotency recovery) | `ix_exec_notification` |

8. Migration strategy

Expand/contract with forward-only migrations. All PII-touching columns require Security review. Partition provisioning is blue/green: new partitions created before application deploys attempt writes. Schema tied to chan.schema_version table read at startup; mismatch triggers refusal to serve (fail-closed boot).