Data Model
:::info Source
Sourced from services/notification-service/DATA_MODEL.md in the documentation repo.
:::
Storage Layout
| Store | Role |
|---|---|
| Postgres 16 (primary) | Transactional state: notifications, preferences, templates, webhooks, suppressions, outbox |
| Postgres read-replica | Feed/list queries, audit queries |
| Redis (Cluster) | Template AST cache, preference cache, rate limits, WebSocket presence, digest windows |
| S3 (or MinIO in dev) | Rendered email HTML (hot storage 30d), raw provider webhook payloads (audit, 90d) |
| NATS JetStream | Inbound + outbound event streams (see EVENT_SCHEMAS.md) |
Postgres Schema
Schema isolation: every tenant-scoped table has a tenant_id column. Row-Level Security is enabled with a tenant-isolation policy keyed on current_setting('app.tenant_id'), and cross-tenant maintenance jobs run under a dedicated role with BYPASSRLS.
notifications
```sql
CREATE TABLE notifications (
  id ULID NOT NULL,
  tenant_id UUID NOT NULL,
  user_id UUID NOT NULL,
  template_key TEXT NOT NULL,
  template_version TEXT NOT NULL,
  template_snapshot JSONB NOT NULL, -- frozen at enqueue
  channel TEXT NOT NULL, -- enum-constrained
  recipient_address JSONB NOT NULL, -- value object
  variables JSONB NOT NULL,
  locale TEXT NOT NULL,
  status TEXT NOT NULL,
  category TEXT NOT NULL,
  priority TEXT NOT NULL DEFAULT 'normal',
  scheduled_for TIMESTAMPTZ,
  correlation_id TEXT NOT NULL,
  source_event_type TEXT,
  source_event_id TEXT,
  digest_group_id ULID,
  suppression_reason TEXT,
  delivered_at TIMESTAMPTZ,
  read_at TIMESTAMPTZ,
  expires_at TIMESTAMPTZ NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  body_uri TEXT, -- S3 URI for rendered body
  -- On a partitioned table the primary key must include the partition key.
  PRIMARY KEY (id, created_at),
  CONSTRAINT notifications_channel_chk CHECK (channel IN ('email','sms','push','inapp','webhook')),
  CONSTRAINT notifications_status_chk CHECK (status IN ('queued','rendering','sending','sent','delivered','opened','clicked','failed','bounced','suppressed','superseded')),
  CONSTRAINT notifications_category_chk CHECK (category IN ('academic','billing','marketing','security','social','system','compliance')),
  CONSTRAINT notifications_priority_chk CHECK (priority IN ('low','normal','high','critical'))
) PARTITION BY RANGE (created_at);

-- Monthly partitions, rolling 24 months then archived to S3
CREATE TABLE notifications_y2026m04 PARTITION OF notifications
  FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
-- (pg_partman manages partition creation)

CREATE INDEX ix_notif_user_feed
  ON notifications (tenant_id, user_id, channel, created_at DESC)
  WHERE channel = 'inapp';
CREATE INDEX ix_notif_user_unread
  ON notifications (tenant_id, user_id, created_at DESC)
  WHERE channel = 'inapp' AND read_at IS NULL;
CREATE INDEX ix_notif_correlation
  ON notifications (tenant_id, correlation_id);
CREATE INDEX ix_notif_digest_group
  ON notifications (tenant_id, digest_group_id)
  WHERE digest_group_id IS NOT NULL;
CREATE INDEX ix_notif_scheduled
  ON notifications (scheduled_for)
  WHERE status = 'queued' AND scheduled_for IS NOT NULL;

ALTER TABLE notifications ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON notifications
  USING (tenant_id::text = current_setting('app.tenant_id', true));
```
delivery_attempts
```sql
CREATE TABLE delivery_attempts (
  notification_id ULID NOT NULL,
  notification_created_at TIMESTAMPTZ NOT NULL, -- parent's created_at; completes the FK to the partitioned PK
  attempt_number INT NOT NULL,
  provider_name TEXT NOT NULL,
  provider_message_id TEXT,
  requested_at TIMESTAMPTZ NOT NULL,
  responded_at TIMESTAMPTZ,
  outcome TEXT NOT NULL,
  error_code TEXT,
  error_message TEXT,
  latency_ms INT,
  PRIMARY KEY (notification_id, attempt_number),
  FOREIGN KEY (notification_id, notification_created_at)
    REFERENCES notifications (id, created_at) ON DELETE CASCADE
);
CREATE INDEX ix_attempts_provider_msg ON delivery_attempts (provider_name, provider_message_id);
```
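Application code that writes channel, status, category, and priority has to agree with the CHECK constraints above. A minimal TypeScript sketch (the service language is an assumption; the document only names node-pg-migrate as Node tooling) that mirrors the allowed values as literal unions with runtime guards, so bad values are rejected before they reach Postgres. All names here are hypothetical.

```typescript
// Hypothetical mirror of the CHECK constraints on `notifications`.
// The database remains the source of truth; keep these lists in sync with notifications_*_chk.
const NOTIFICATION_CHANNELS = ["email", "sms", "push", "inapp", "webhook"] as const;
const NOTIFICATION_STATUSES = [
  "queued", "rendering", "sending", "sent", "delivered", "opened",
  "clicked", "failed", "bounced", "suppressed", "superseded",
] as const;
const NOTIFICATION_CATEGORIES = [
  "academic", "billing", "marketing", "security", "social", "system", "compliance",
] as const;
const NOTIFICATION_PRIORITIES = ["low", "normal", "high", "critical"] as const;

type NotificationChannel = (typeof NOTIFICATION_CHANNELS)[number];
type NotificationStatus = (typeof NOTIFICATION_STATUSES)[number];
type NotificationCategory = (typeof NOTIFICATION_CATEGORIES)[number];
type NotificationPriority = (typeof NOTIFICATION_PRIORITIES)[number];

// Builds a type guard from an allowed-values list so unknown strings are
// caught in the application instead of surfacing as a CHECK violation.
function makeGuard<T extends string>(allowed: readonly T[]) {
  const set = new Set<string>(allowed);
  return (value: string): value is T => set.has(value);
}

const isNotificationChannel = makeGuard(NOTIFICATION_CHANNELS);
const isNotificationStatus = makeGuard(NOTIFICATION_STATUSES);
const isNotificationCategory = makeGuard(NOTIFICATION_CATEGORIES);
const isNotificationPriority = makeGuard(NOTIFICATION_PRIORITIES);
```

Keeping the lists as `as const` arrays means the runtime guard and the compile-time union come from the same source.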
notification_templates
```sql
CREATE TABLE notification_templates (
  id UUID PRIMARY KEY,
  tenant_id UUID, -- NULL for platform-global
  key TEXT NOT NULL,
  version TEXT NOT NULL,
  channel TEXT NOT NULL,
  category TEXT NOT NULL,
  subject_tmpl TEXT,
  locales JSONB NOT NULL, -- {locale: {body, bodyFormat, ...}}
  variables JSONB NOT NULL, -- [{name, type, required, ...}]
  default_priority TEXT NOT NULL DEFAULT 'normal',
  status TEXT NOT NULL,
  created_by UUID NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  published_at TIMESTAMPTZ,
  deprecated_at TIMESTAMPTZ,
  supersedes_id UUID,
  -- NULLS NOT DISTINCT (Postgres 15+) keeps platform-global rows (tenant_id IS NULL) unique per key/version
  UNIQUE NULLS NOT DISTINCT (tenant_id, key, version)
);
CREATE INDEX ix_templates_active
  ON notification_templates (tenant_id, key, status)
  WHERE status = 'active';
```
notification_preferences
```sql
CREATE TABLE notification_preferences (
  tenant_id UUID NOT NULL,
  user_id UUID NOT NULL,
  language TEXT NOT NULL,
  channels JSONB NOT NULL,
  quiet_hours JSONB NOT NULL,
  digest_schedule JSONB NOT NULL,
  guardian_routing JSONB,
  global_unsubscribed_at TIMESTAMPTZ,
  version INT NOT NULL DEFAULT 1,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  PRIMARY KEY (tenant_id, user_id)
);
ALTER TABLE notification_preferences ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON notification_preferences
  USING (tenant_id::text = current_setting('app.tenant_id', true));
```
webhook_subscriptions
```sql
CREATE TABLE webhook_subscriptions (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL,
  name TEXT NOT NULL,
  target_url TEXT NOT NULL,
  event_filters JSONB NOT NULL,
  signing_secret_ciphertext BYTEA NOT NULL,
  signing_secret_prev_ciphertext BYTEA,
  signing_secret_rotated_at TIMESTAMPTZ,
  status TEXT NOT NULL DEFAULT 'active',
  created_by UUID NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  last_success_at TIMESTAMPTZ,
  last_failure_at TIMESTAMPTZ,
  consecutive_failures INT NOT NULL DEFAULT 0,
  auto_disable_threshold INT NOT NULL DEFAULT 20
);
-- Secrets encrypted with KMS envelope encryption. Plain secret never stored.
```
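The consecutive_failures and auto_disable_threshold columns describe a per-subscription circuit breaker. A minimal TypeScript sketch of that bookkeeping, assuming a success resets the streak and the subscription switches to a 'disabled' status once the streak reaches the threshold; the 'disabled' value, the SubscriptionHealth shape, and recordDeliveryOutcome are hypothetical (the schema only shows the 'active' default).

```typescript
// Hypothetical in-memory view of the webhook_subscriptions health columns.
interface SubscriptionHealth {
  status: "active" | "disabled"; // 'disabled' is an assumed status value
  consecutiveFailures: number;
  autoDisableThreshold: number; // schema default is 20
  lastSuccessAt: Date | null;
  lastFailureAt: Date | null;
}

// Returns the updated state after one delivery outcome.
// Success resets the failure streak; a failure extends it and disables
// the subscription once the streak reaches the threshold.
function recordDeliveryOutcome(
  sub: SubscriptionHealth,
  succeeded: boolean,
  at: Date,
): SubscriptionHealth {
  if (succeeded) {
    return { ...sub, consecutiveFailures: 0, lastSuccessAt: at };
  }
  const failures = sub.consecutiveFailures + 1;
  return {
    ...sub,
    consecutiveFailures: failures,
    lastFailureAt: at,
    status: failures >= sub.autoDisableThreshold ? "disabled" : sub.status,
  };
}
```

In Postgres the same rule is best applied in one UPDATE that increments consecutive_failures in place, so concurrent delivery workers cannot lose increments.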
webhook_deliveries
```sql
CREATE TABLE webhook_deliveries (
  id ULID NOT NULL,
  subscription_id UUID NOT NULL REFERENCES webhook_subscriptions(id),
  tenant_id UUID NOT NULL,
  event_type TEXT NOT NULL,
  event_id TEXT NOT NULL,
  payload_uri TEXT NOT NULL, -- S3 URI (audit, 90d)
  status TEXT NOT NULL, -- queued|delivered|failed|disabled
  attempts INT NOT NULL DEFAULT 0,
  next_attempt_at TIMESTAMPTZ,
  last_status_code INT,
  last_error TEXT,
  first_attempted_at TIMESTAMPTZ,
  delivered_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  -- On a partitioned table the primary key must include the partition key.
  PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);
```
suppression_entries
```sql
CREATE TABLE suppression_entries (
  id ULID PRIMARY KEY,
  tenant_id UUID NOT NULL, -- or a reserved sentinel UUID for platform-wide (cross-tenant) entries
  channel TEXT NOT NULL,
  address_hash TEXT NOT NULL, -- sha256(lowercased email/phone)
  address_ciphertext BYTEA, -- recoverable for reinstatement
  reason TEXT NOT NULL,
  originating_message_id ULID,
  suppressed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  expires_at TIMESTAMPTZ,
  overridden_by UUID,
  overridden_at TIMESTAMPTZ,
  UNIQUE (tenant_id, channel, address_hash)
);
CREATE INDEX ix_suppression_active
  ON suppression_entries (tenant_id, channel, address_hash)
  WHERE overridden_at IS NULL;
```
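address_hash is documented as SHA-256 of the lowercased email or phone number. A minimal TypeScript sketch using Node's built-in crypto module; trimming whitespace and hex encoding are assumptions (the document specifies only lowercasing and SHA-256), and phone numbers are assumed to arrive already normalized, for example to E.164. The addressHash name is hypothetical.

```typescript
import { createHash } from "node:crypto";

// Computes the suppression lookup key for an email address or phone number.
// Assumptions: phone numbers are pre-normalized by the caller; this function
// only trims surrounding whitespace and lowercases before hashing.
function addressHash(address: string): string {
  const normalized = address.trim().toLowerCase();
  return createHash("sha256").update(normalized, "utf8").digest("hex");
}
```

Because the hash is deterministic, the same value can drive both the (tenant_id, channel, address_hash) lookup and the notif:suppress:{channel}:{addrHash} Redis key.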
outbox
```sql
CREATE TABLE outbox (
  id ULID PRIMARY KEY,
  aggregate_id TEXT NOT NULL,
  aggregate_type TEXT NOT NULL,
  event_type TEXT NOT NULL,
  payload JSONB NOT NULL,
  headers JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  published_at TIMESTAMPTZ
);
CREATE INDEX ix_outbox_unpublished ON outbox (created_at) WHERE published_at IS NULL;
```
consumed_events (inbound dedupe)
```sql
CREATE TABLE consumed_events (
  consumer_name TEXT NOT NULL,
  event_id TEXT NOT NULL,
  consumed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  PRIMARY KEY (consumer_name, event_id)
);
-- 7-day retention via pg_partman daily partitions
```
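The composite primary key turns inbound dedupe into a single statement: insert the (consumer_name, event_id) pair and treat a conflict as "already processed". A minimal TypeScript sketch, assuming a client shaped like node-postgres (whose query(text, values) resolves to a result with rowCount); the PgLike interface and markConsumed name are hypothetical. Running it in the same transaction as the event's side effects lets the marker and the effects commit or roll back together.

```typescript
// Minimal subset of a node-postgres client used here (hypothetical interface).
interface PgLike {
  query(text: string, values?: unknown[]): Promise<{ rowCount: number | null }>;
}

// ON CONFLICT DO NOTHING inserts zero rows when the pair already exists.
const MARK_CONSUMED_SQL = `
  INSERT INTO consumed_events (consumer_name, event_id)
  VALUES ($1, $2)
  ON CONFLICT (consumer_name, event_id) DO NOTHING`;

// Returns true the first time a consumer sees an event, false on redelivery.
async function markConsumed(db: PgLike, consumer: string, eventId: string): Promise<boolean> {
  const result = await db.query(MARK_CONSUMED_SQL, [consumer, eventId]);
  return result.rowCount === 1;
}
```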
digest_windows
```sql
CREATE TABLE digest_windows (
  id ULID PRIMARY KEY,
  tenant_id UUID NOT NULL,
  user_id UUID NOT NULL,
  category TEXT NOT NULL,
  channel TEXT NOT NULL,
  window_start TIMESTAMPTZ NOT NULL,
  window_end TIMESTAMPTZ NOT NULL,
  item_ids ULID[] NOT NULL DEFAULT '{}',
  item_count INT NOT NULL DEFAULT 0,
  state TEXT NOT NULL DEFAULT 'open', -- open|flushing|flushed
  flush_at TIMESTAMPTZ NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX ux_digest_window
  ON digest_windows (tenant_id, user_id, category, channel, window_start);
CREATE INDEX ix_digest_flush_ready
  ON digest_windows (flush_at)
  WHERE state = 'open';
```
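The state column and the ix_digest_flush_ready partial index imply a simple lifecycle: open windows collect items, and a flusher picks up open windows whose flush_at has passed. A minimal TypeScript sketch of those rules, assuming states only move forward (open, then flushing, then flushed); the types and function names are hypothetical. In Postgres, claiming a window would typically be an UPDATE guarded by state = 'open' AND flush_at <= now(), so only one flusher wins, alongside the notif:digest:{windowId}:lock Redis lock.

```typescript
type DigestState = "open" | "flushing" | "flushed";

interface DigestWindow {
  id: string;
  state: DigestState;
  flushAt: Date;
  itemIds: string[];
}

// Allowed forward transitions, matching the DDL comment (open|flushing|flushed).
const NEXT_STATE: Record<DigestState, DigestState | null> = {
  open: "flushing",
  flushing: "flushed",
  flushed: null,
};

// Mirrors the partial-index predicate: only open windows at or past flush_at qualify.
function isFlushReady(w: DigestWindow, now: Date): boolean {
  return w.state === "open" && w.flushAt.getTime() <= now.getTime();
}

// Advances a window one step, refusing to skip or rewind states.
function advance(w: DigestWindow, to: DigestState): DigestWindow {
  if (NEXT_STATE[w.state] !== to) {
    throw new Error(`illegal digest transition ${w.state} -> ${to}`);
  }
  return { ...w, state: to };
}
```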
rate_limit_counters
Ephemeral counters live in Redis; when Redis is down, a Postgres fallback table of identical shape is used.
EP-10 operational tables (migration 0002_ep10_dod.sql)
Implemented in the notification-service of the Ghasi-edTech monorepo. These tables support HTTP idempotency, quiet-hours deferral, and webhook retry indexing.
idempotency_keys
```sql
CREATE TABLE idempotency_keys (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id UUID NOT NULL,
  scope TEXT NOT NULL,
  idempotency_key TEXT NOT NULL,
  request_hash TEXT NOT NULL,
  response_status INTEGER NOT NULL,
  response_body JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  UNIQUE (tenant_id, scope, idempotency_key)
);
```
RLS: tenant_id = current_setting('app.tenant_id')::uuid.
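The stored request_hash lets the service tell a genuine retry apart from a key reused with a different body. A minimal in-memory TypeScript sketch of the usual semantics, assuming a repeat with the same key and hash replays the stored response while the same key with a different hash is rejected; the 422 status, the JSON-based hash input, and all names are assumptions, not taken from migration 0002. The Map key mirrors the UNIQUE (tenant_id, scope, idempotency_key) constraint.

```typescript
import { createHash } from "node:crypto";

interface StoredResponse {
  requestHash: string;
  status: number;
  body: unknown;
}

type IdempotencyResult =
  | { kind: "new" } // caller executes the request, then calls save()
  | { kind: "replay"; status: number; body: unknown }
  | { kind: "mismatch"; status: 422 }; // assumed status for key reuse with a different body

// Assumes request bodies serialize deterministically (same key order on retry).
function requestHash(body: unknown): string {
  return createHash("sha256").update(JSON.stringify(body)).digest("hex");
}

class IdempotencyStore {
  // Keyed like UNIQUE (tenant_id, scope, idempotency_key).
  private rows = new Map<string, StoredResponse>();

  private static key(tenantId: string, scope: string, idemKey: string): string {
    return JSON.stringify([tenantId, scope, idemKey]);
  }

  check(tenantId: string, scope: string, idemKey: string, body: unknown): IdempotencyResult {
    const row = this.rows.get(IdempotencyStore.key(tenantId, scope, idemKey));
    if (!row) return { kind: "new" };
    if (row.requestHash !== requestHash(body)) return { kind: "mismatch", status: 422 };
    return { kind: "replay", status: row.status, body: row.body };
  }

  save(tenantId: string, scope: string, idemKey: string, body: unknown, status: number, responseBody: unknown): void {
    this.rows.set(IdempotencyStore.key(tenantId, scope, idemKey), {
      requestHash: requestHash(body),
      status,
      body: responseBody,
    });
  }
}
```

This sketch does not cover two concurrent requests with the same key; a database-backed version usually inserts a placeholder row first and lets the unique constraint arbitrate.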
notification_scheduled
A row is created when an email, SMS, or push send would fall within the user’s quiet hours (unless a critical override applies). The row stores the original send request in request_payload and the earliest allowed send time in run_after (UTC). A background worker replays due rows with forceDelivery and sets processed_at.
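Deciding whether to defer a send, and computing run_after, reduces to checking a local time-of-day window that may wrap past midnight. A minimal TypeScript sketch, assuming the critical-override check has already been made, quiet hours are stored as start/end minutes of the local day, and the user's UTC offset is fixed for the computation (a production version would resolve an IANA time zone to handle DST). The quiet_hours JSON shape is not specified in this document, so QuietHours and the function names are hypothetical.

```typescript
// Hypothetical shape for notification_preferences.quiet_hours.
interface QuietHours {
  startMinute: number; // minutes after local midnight, e.g. 22 * 60
  endMinute: number; // exclusive; smaller than startMinute when the window wraps midnight
  utcOffsetMinutes: number; // assumption: fixed offset, no DST handling
}

const DAY_MINUTES = 24 * 60;

// Local minute-of-day for a UTC instant under a fixed offset (handles negative offsets).
function localMinuteOfDay(at: Date, utcOffsetMinutes: number): number {
  const utcMinutes = at.getUTCHours() * 60 + at.getUTCMinutes();
  return (((utcMinutes + utcOffsetMinutes) % DAY_MINUTES) + DAY_MINUTES) % DAY_MINUTES;
}

function inQuietHours(q: QuietHours, at: Date): boolean {
  const m = localMinuteOfDay(at, q.utcOffsetMinutes);
  return q.startMinute <= q.endMinute
    ? m >= q.startMinute && m < q.endMinute
    : m >= q.startMinute || m < q.endMinute; // window wraps past midnight
}

// Returns run_after (UTC) for a deferred send, or null if the send may go now.
function computeRunAfter(q: QuietHours, at: Date): Date | null {
  if (!inQuietHours(q, at)) return null;
  const m = localMinuteOfDay(at, q.utcOffsetMinutes);
  const minutesUntilEnd = (q.endMinute - m + DAY_MINUTES) % DAY_MINUTES;
  // Truncate to the whole minute, then add the minutes remaining in the window.
  const base = Math.floor(at.getTime() / 60_000) * 60_000;
  return new Date(base + minutesUntilEnd * 60_000);
}
```

For quiet hours of 22:00 to 07:00 at UTC+3, a send at 20:00 UTC (23:00 local) is deferred to 04:00 UTC the next day, which is 07:00 local.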
webhook_deliveries retries
Delivery rows use next_retry_at to schedule exponential-backoff retries while status = 'failed'. The partial index webhook_deliveries_retry on (tenant_id, status, next_retry_at) supports the retry worker. After the maximum number of attempts, status moves to dlq and notification.webhook.delivery_failed.v1 is emitted (see EVENT_SCHEMAS.md).
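The retry worker needs a rule for setting next_retry_at. A minimal TypeScript sketch of capped exponential backoff with full jitter; the base delay, cap, and maximum attempt count are assumptions (the document says only "exponential backoff" and "max attempts"), and the random source is injectable so the schedule can be tested deterministically. Names are hypothetical.

```typescript
interface RetryPolicy {
  baseDelayMs: number; // assumed, e.g. 30 s
  maxDelayMs: number; // assumed cap, e.g. 1 h
  maxAttempts: number; // assumed; at this point the row moves to 'dlq'
}

type RetryDecision =
  | { status: "failed"; nextRetryAt: Date }
  | { status: "dlq" };

// attempts = attempts made so far, including the one that just failed.
function nextRetry(
  policy: RetryPolicy,
  attempts: number,
  now: Date,
  random: () => number = Math.random,
): RetryDecision {
  if (attempts >= policy.maxAttempts) return { status: "dlq" };
  // Exponential ceiling base * 2^(attempts - 1), capped at maxDelayMs.
  const ceiling = Math.min(policy.maxDelayMs, policy.baseDelayMs * 2 ** (attempts - 1));
  // Full jitter: pick uniformly in [0, ceiling) so many failing deliveries do not retry in lockstep.
  const delay = Math.floor(random() * ceiling);
  return { status: "failed", nextRetryAt: new Date(now.getTime() + delay) };
}
```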
Redis Key Layout
```text
notif:tmpl:{templateId}:{version}:ast            → compiled Handlebars AST (LRU, TTL 1h)
notif:pref:{tenantId}:{userId}                   → JSON (TTL 10m, invalidate on update)
notif:rate:{tenantId}:email:day:{YYYY-MM-DD}     → counter
notif:rate:user:{userId}:email:day:{YYYY-MM-DD}  → counter
notif:ws:{userId}:{deviceId}                     → server instance pointer (TTL 60s, heartbeat)
notif:digest:{windowId}:lock                     → distributed lock for flush
notif:suppress:{channel}:{addrHash}              → boolean (TTL 1h, authoritative copy in Postgres)
```
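Building these keys in one place keeps writers and readers from drifting apart. A minimal TypeScript sketch of key builders for the layout above, assuming day buckets use the UTC calendar date (the layout shows only YYYY-MM-DD); the utcDay and redisKeys names are hypothetical.

```typescript
// UTC calendar date as YYYY-MM-DD (assumption: rate-limit days are UTC days).
function utcDay(at: Date): string {
  return at.toISOString().slice(0, 10);
}

// One builder per key pattern in the layout.
const redisKeys = {
  templateAst: (templateId: string, version: string) => `notif:tmpl:${templateId}:${version}:ast`,
  preferences: (tenantId: string, userId: string) => `notif:pref:${tenantId}:${userId}`,
  tenantEmailDaily: (tenantId: string, at: Date) => `notif:rate:${tenantId}:email:day:${utcDay(at)}`,
  userEmailDaily: (userId: string, at: Date) => `notif:rate:user:${userId}:email:day:${utcDay(at)}`,
  wsPresence: (userId: string, deviceId: string) => `notif:ws:${userId}:${deviceId}`,
  digestLock: (windowId: string) => `notif:digest:${windowId}:lock`,
  suppression: (channel: string, addrHash: string) => `notif:suppress:${channel}:${addrHash}`,
};
```

On Redis Cluster these keys carry no hash tags, so each key may land in a different slot; an operation that must touch several of them atomically (for example in one Lua script) would need a shared hash tag.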
S3 Bucket Layout
```text
s3://ghasi-notif-prod/
  rendered/{tenantId}/{YYYY}/{MM}/{DD}/{notificationId}.html     # 30d
  webhook-payloads/{tenantId}/{YYYY}/{MM}/{DD}/{deliveryId}.json  # 90d
  provider-raw/{provider}/{YYYY}/{MM}/{DD}/{hour}/                # 30d audit
  archive/notifications/{tenantId}/{YYYY}/{MM}/parquet/           # 24m+ cold
```
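The per-object prefixes above are what body_uri and payload_uri point at. A minimal TypeScript sketch of builders for those two URIs, assuming the date path comes from the object's UTC creation time with zero-padded month and day; the bucket name is taken from the layout, while the function names are hypothetical.

```typescript
// Zero-padded UTC date path: YYYY/MM/DD.
function utcDatePath(at: Date): string {
  const yyyy = at.getUTCFullYear().toString();
  const mm = String(at.getUTCMonth() + 1).padStart(2, "0");
  const dd = String(at.getUTCDate()).padStart(2, "0");
  return `${yyyy}/${mm}/${dd}`;
}

const BUCKET = "ghasi-notif-prod";

// rendered/{tenantId}/{YYYY}/{MM}/{DD}/{notificationId}.html (30d)
function renderedBodyUri(tenantId: string, notificationId: string, createdAt: Date): string {
  return `s3://${BUCKET}/rendered/${tenantId}/${utcDatePath(createdAt)}/${notificationId}.html`;
}

// webhook-payloads/{tenantId}/{YYYY}/{MM}/{DD}/{deliveryId}.json (90d)
function webhookPayloadUri(tenantId: string, deliveryId: string, createdAt: Date): string {
  return `s3://${BUCKET}/webhook-payloads/${tenantId}/${utcDatePath(createdAt)}/${deliveryId}.json`;
}
```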
Multi-Tenancy
- Every tenant-scoped table has `tenant_id NOT NULL`. The exception is the platform-global `notification_templates` table, where `tenant_id IS NULL` is allowed for platform templates.
- RLS enforces tenant isolation; the app runs `SET LOCAL app.tenant_id = '...'` in every transaction.
- Cross-tenant reads (admin ops) use the dedicated `ghasi_admin` role, which bypasses RLS, with explicit audit log entries.
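SET LOCAL does not accept bind parameters, so application code commonly uses set_config('app.tenant_id', $1, true) instead: the third argument makes the setting transaction-local, exactly like SET LOCAL. A minimal TypeScript sketch, assuming a client shaped like node-postgres (pg.PoolClient fits the interface); the Queryable interface and withTenant name are hypothetical.

```typescript
// Minimal subset of a node-postgres client (hypothetical interface).
interface Queryable {
  query(text: string, values?: unknown[]): Promise<unknown>;
}

// Runs fn inside a transaction with app.tenant_id set for the RLS policies.
// The transaction-local setting ends at COMMIT/ROLLBACK, so it cannot leak
// into the next transaction that reuses this pooled connection.
async function withTenant<T>(
  client: Queryable,
  tenantId: string,
  fn: (client: Queryable) => Promise<T>,
): Promise<T> {
  await client.query("BEGIN");
  try {
    await client.query("SELECT set_config('app.tenant_id', $1, true)", [tenantId]);
    const result = await fn(client);
    await client.query("COMMIT");
    return result;
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  }
}
```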
Data Volume Estimates (steady state, 10M users)
| Entity | Rows/day | Rows/year | Avg Size |
|---|---|---|---|
| notifications | 50M | 18B | 2KB row + 30KB body → S3 |
| delivery_attempts | 55M | 20B | 0.3KB |
| webhook_deliveries | 2M | 730M | 0.5KB meta + body in S3 |
| preferences | mutations ~100K/d | - | 4KB |
| templates | < 1K total | - | 20KB |
Partitioning plus the archive strategy caps hot Postgres at ~3TB. The cold archive (Parquet on S3) is queryable via Athena for compliance.
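A back-of-envelope check of hot row growth using the figures in the table above. Only the listed per-row sizes are counted, so indexes and the S3-resident bodies are excluded; units are decimal (1 GB = 10^6 KB).

```typescript
// Figures copied from the volume table; kbPerRow is the Postgres-resident row size.
const volumes = [
  { entity: "notifications", rowsPerDay: 50e6, kbPerRow: 2 },
  { entity: "delivery_attempts", rowsPerDay: 55e6, kbPerRow: 0.3 },
  { entity: "webhook_deliveries", rowsPerDay: 2e6, kbPerRow: 0.5 },
];

// Sum rows/day * KB/row, converted from KB to GB.
const gbPerDay = volumes.reduce((sum, v) => sum + (v.rowsPerDay * v.kbPerRow) / 1e6, 0);
const tbPer30Days = (gbPerDay * 30) / 1000;

console.log(`${gbPerDay.toFixed(1)} GB/day, ${tbPer30Days.toFixed(1)} TB per 30 days`);
// prints: 117.5 GB/day, 3.5 TB per 30 days
```

The hot footprint therefore depends mainly on how many monthly partitions stay attached before archival: at these rates, each retained month adds roughly 3.5 TB of row data before indexes.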
Schema Migrations
- Tooling: `sqlx migrate` (Rust) or `node-pg-migrate` (Node), with SQL files checked in under `/migrations`.
- Forward-compatible: never break running producers.
- Add-column: always nullable or with a default.
- Drop-column: two-release dance (stop writing → stop reading → drop).
- Index creation: `CREATE INDEX CONCURRENTLY` only; never blocking.
- Partition maintenance: pg_partman with a 12-month pre-creation window.
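Postgres rejects CREATE INDEX CONCURRENTLY on a partitioned parent such as notifications. The non-blocking pattern from the Postgres partitioning docs is to create the parent index ON ONLY the parent (it starts out invalid), build each partition's index concurrently, and attach each one; the parent index becomes valid once every partition has an attached index. A minimal TypeScript sketch that generates those statements, assuming the partition names are already known (for example from pg_partman) and identifiers are trusted; the function name and the `__` naming scheme are hypothetical.

```typescript
// Generates the non-blocking index build for a partitioned table.
// Each CONCURRENTLY statement must run outside an explicit transaction block.
function partitionedIndexStatements(
  parent: string,
  indexName: string,
  columns: string,
  partitions: string[],
): string[] {
  // ON ONLY creates the parent index without recursing into partitions.
  const stmts = [`CREATE INDEX ${indexName} ON ONLY ${parent} (${columns});`];
  for (const part of partitions) {
    const partIndex = `${indexName}__${part}`; // hypothetical naming scheme
    stmts.push(`CREATE INDEX CONCURRENTLY ${partIndex} ON ${part} (${columns});`);
    stmts.push(`ALTER INDEX ${indexName} ATTACH PARTITION ${partIndex};`);
  }
  return stmts;
}
```

IF NOT EXISTS is deliberately omitted: a failed concurrent build leaves an INVALID index behind under the same name, and IF NOT EXISTS would silently skip it on a rerun.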
Backup & Recovery
- Postgres: continuous WAL archiving to S3, full backup daily, PITR window 35 days.
- Redis: RDB snapshots hourly, AOF for last-minute durability; rebuildable from Postgres.
- S3: versioning + Object Lock for webhook payloads (compliance).
- RTO: 30 min | RPO: 5 min (Postgres), 1 min (Redis semantics acceptable as a cache).