
Data Model

:::info Source
Sourced from services/notification-service/DATA_MODEL.md in the documentation repo.
:::

Storage Layout

| Store | Role |
| --- | --- |
| Postgres 16 (primary) | Transactional state: notifications, preferences, templates, webhooks, suppressions, outbox |
| Postgres read-replica | Feed/list queries, audit queries |
| Redis (Cluster) | Template AST cache, preference cache, rate limits, WebSocket presence, digest windows |
| S3 (or MinIO in dev) | Rendered email HTML (hot storage 30d), raw provider webhook payloads (audit, 90d) |
| NATS JetStream | Inbound + outbound event streams (see EVENT_SCHEMAS.md) |

Postgres Schema

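Schema isolation: every table has a tenant_id column, and Row-Level Security is enabled with USING (tenant_id = current_setting('app.tenant_id')::uuid), plus a security_definer bypass role for cross-tenant maintenance jobs. The policies in the DDL below use a text comparison instead, tenant_id::text = current_setting('app.tenant_id', true); with the second argument set to true, current_setting returns NULL when the setting is missing, so the policy matches no rows rather than raising an error.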

notifications

CREATE TABLE notifications (
id ULID PRIMARY KEY,
tenant_id UUID NOT NULL,
user_id UUID NOT NULL,
template_key TEXT NOT NULL,
template_version TEXT NOT NULL,
template_snapshot JSONB NOT NULL, -- frozen at enqueue
channel TEXT NOT NULL, -- enum-constrained
recipient_address JSONB NOT NULL, -- VO
variables JSONB NOT NULL,
locale TEXT NOT NULL,
status TEXT NOT NULL,
category TEXT NOT NULL,
priority TEXT NOT NULL DEFAULT 'normal',
scheduled_for TIMESTAMPTZ,
correlation_id TEXT NOT NULL,
source_event_type TEXT,
source_event_id TEXT,
digest_group_id ULID,
suppression_reason TEXT,
delivered_at TIMESTAMPTZ,
read_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
body_uri TEXT, -- S3 URI for rendered body
CONSTRAINT notifications_channel_chk CHECK (channel IN ('email','sms','push','inapp','webhook')),
CONSTRAINT notifications_status_chk CHECK (status IN ('queued','rendering','sending','sent','delivered','opened','clicked','failed','bounced','suppressed','superseded')),
CONSTRAINT notifications_category_chk CHECK (category IN ('academic','billing','marketing','security','social','system','compliance')),
CONSTRAINT notifications_priority_chk CHECK (priority IN ('low','normal','high','critical'))
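) PARTITION BY RANGE (created_at);
-- Note: ULID is not a built-in Postgres type and assumes an extension or domain that provides it.
-- Postgres also requires PRIMARY KEY and UNIQUE constraints on a partitioned table to include
-- the partition key (created_at), so the id-only primary key above must be widened before this DDL applies.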

-- Monthly partitions, rolling 24 months then archived to S3
CREATE TABLE notifications_y2026m04 PARTITION OF notifications
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
-- (pg_partman manages partition creation)

CREATE INDEX ix_notif_user_feed
ON notifications (tenant_id, user_id, channel, created_at DESC)
WHERE channel = 'inapp';

CREATE INDEX ix_notif_user_unread
ON notifications (tenant_id, user_id, created_at DESC)
WHERE channel = 'inapp' AND read_at IS NULL;

CREATE INDEX ix_notif_correlation
ON notifications (tenant_id, correlation_id);

CREATE INDEX ix_notif_digest_group
ON notifications (tenant_id, digest_group_id)
WHERE digest_group_id IS NOT NULL;

CREATE INDEX ix_notif_scheduled
ON notifications (scheduled_for)
WHERE status = 'queued' AND scheduled_for IS NOT NULL;

ALTER TABLE notifications ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON notifications
USING (tenant_id::text = current_setting('app.tenant_id', true));
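
The monthly partition shown above (notifications_y2026m04, covering 2026-04-01 up to but not including 2026-05-01) follows a regular naming and bounds pattern. The sketch below is illustrative only: pg_partman creates the partitions in the real service, and the function name monthly_partition_ddl is hypothetical. It shows how the name suffix and the half-open date range are derived, including the December rollover.

```python
from datetime import date


def monthly_partition_ddl(parent: str, year: int, month: int) -> str:
    """Build the DDL for one monthly range partition of `parent`."""
    start = date(year, month, 1)
    # The upper bound is exclusive: the first day of the following month.
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    name = f"{parent}_y{year:04d}m{month:02d}"
    return (
        f"CREATE TABLE {name} PARTITION OF {parent}\n"
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )


print(monthly_partition_ddl("notifications", 2026, 4))
```

Because the upper bound is exclusive, adjacent partitions share a boundary date without overlapping.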

delivery_attempts

CREATE TABLE delivery_attempts (
notification_id ULID NOT NULL,
attempt_number INT NOT NULL,
provider_name TEXT NOT NULL,
provider_message_id TEXT,
requested_at TIMESTAMPTZ NOT NULL,
responded_at TIMESTAMPTZ,
outcome TEXT NOT NULL,
error_code TEXT,
error_message TEXT,
latency_ms INT,
PRIMARY KEY (notification_id, attempt_number),
FOREIGN KEY (notification_id) REFERENCES notifications(id) ON DELETE CASCADE
);
CREATE INDEX ix_attempts_provider_msg ON delivery_attempts (provider_name, provider_message_id);

notification_templates

CREATE TABLE notification_templates (
id UUID PRIMARY KEY,
tenant_id UUID, -- NULL for platform-global
key TEXT NOT NULL,
version TEXT NOT NULL,
channel TEXT NOT NULL,
category TEXT NOT NULL,
subject_tmpl TEXT,
locales JSONB NOT NULL, -- {locale: {body, bodyFormat, ...}}
variables JSONB NOT NULL, -- [{name, type, required, ...}]
default_priority TEXT NOT NULL DEFAULT 'normal',
status TEXT NOT NULL,
created_by UUID NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ,
deprecated_at TIMESTAMPTZ,
supersedes_id UUID,
UNIQUE (tenant_id, key, version)
);

CREATE INDEX ix_templates_active
ON notification_templates (tenant_id, key, status)
WHERE status = 'active';

notification_preferences

CREATE TABLE notification_preferences (
tenant_id UUID NOT NULL,
user_id UUID NOT NULL,
language TEXT NOT NULL,
channels JSONB NOT NULL,
quiet_hours JSONB NOT NULL,
digest_schedule JSONB NOT NULL,
guardian_routing JSONB,
global_unsubscribed_at TIMESTAMPTZ,
version INT NOT NULL DEFAULT 1,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (tenant_id, user_id)
);

ALTER TABLE notification_preferences ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON notification_preferences
USING (tenant_id::text = current_setting('app.tenant_id', true));

webhook_subscriptions

CREATE TABLE webhook_subscriptions (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
name TEXT NOT NULL,
target_url TEXT NOT NULL,
event_filters JSONB NOT NULL,
signing_secret_ciphertext BYTEA NOT NULL,
signing_secret_prev_ciphertext BYTEA,
signing_secret_rotated_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'active',
created_by UUID NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_success_at TIMESTAMPTZ,
last_failure_at TIMESTAMPTZ,
consecutive_failures INT NOT NULL DEFAULT 0,
auto_disable_threshold INT NOT NULL DEFAULT 20
);

-- Secrets are encrypted with KMS envelope encryption; the plaintext secret is never stored.

webhook_deliveries

CREATE TABLE webhook_deliveries (
id ULID PRIMARY KEY,
subscription_id UUID NOT NULL REFERENCES webhook_subscriptions(id),
tenant_id UUID NOT NULL,
event_type TEXT NOT NULL,
event_id TEXT NOT NULL,
payload_uri TEXT NOT NULL, -- S3 URI (audit, 90d)
status TEXT NOT NULL, -- queued|delivered|failed|disabled
attempts INT NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ,
last_status_code INT,
last_error TEXT,
first_attempted_at TIMESTAMPTZ,
delivered_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
) PARTITION BY RANGE (created_at);

suppression_entries

CREATE TABLE suppression_entries (
id ULID PRIMARY KEY,
tenant_id UUID NOT NULL, -- tenant, or a reserved 'platform' sentinel UUID for cross-tenant entries
channel TEXT NOT NULL,
address_hash TEXT NOT NULL, -- sha256(lowercased email/phone)
address_ciphertext BYTEA, -- recoverable for reinstatement
reason TEXT NOT NULL,
originating_message_id ULID,
suppressed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ,
overridden_by UUID,
overridden_at TIMESTAMPTZ,
UNIQUE (tenant_id, channel, address_hash)
);

CREATE INDEX ix_suppression_active
ON suppression_entries (tenant_id, channel, address_hash)
WHERE overridden_at IS NULL;
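
The address_hash column is described as sha256 over the lowercased email or phone. A minimal sketch of that derivation follows. The function name, the hex encoding of the digest, and the whitespace stripping are assumptions; the schema comment only specifies lowercasing and SHA-256.

```python
import hashlib


def address_hash(address: str) -> str:
    """Return the hex SHA-256 digest of the lowercased address."""
    # Stripping surrounding whitespace is an assumption; the schema comment
    # only specifies lowercasing before hashing.
    normalized = address.strip().lower()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


# The same mailbox written with different casing maps to one suppression key.
print(address_hash("User@Example.com") == address_hash("user@example.com"))
```

Hashing the normalized form lets the UNIQUE (tenant_id, channel, address_hash) constraint treat case variants of one address as the same entry.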

outbox

CREATE TABLE outbox (
id ULID PRIMARY KEY,
aggregate_id TEXT NOT NULL,
aggregate_type TEXT NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
headers JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ
);
CREATE INDEX ix_outbox_unpublished ON outbox (created_at) WHERE published_at IS NULL;

consumed_events (inbound dedupe)

CREATE TABLE consumed_events (
consumer_name TEXT NOT NULL,
event_id TEXT NOT NULL,
consumed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (consumer_name, event_id)
);
-- 7-day retention via pg_partman daily partitions
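
The composite primary key on (consumer_name, event_id) is what makes inbound dedupe work: the first insert for a pair succeeds and any redelivery violates the key. The sketch below demonstrates the idea with Python's built-in sqlite3 as a stand-in for Postgres. The helper names open_store and claim_event are hypothetical, and in Postgres the same effect is commonly achieved with INSERT ... ON CONFLICT DO NOTHING.

```python
import sqlite3


def open_store() -> sqlite3.Connection:
    """Create an in-memory table mirroring consumed_events (SQLite types)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE consumed_events (
            consumer_name TEXT NOT NULL,
            event_id      TEXT NOT NULL,
            consumed_at   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (consumer_name, event_id)
        )
        """
    )
    return conn


def claim_event(conn: sqlite3.Connection, consumer_name: str, event_id: str) -> bool:
    """Return True the first time a consumer sees an event, False on redelivery."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO consumed_events (consumer_name, event_id) VALUES (?, ?)",
                (consumer_name, event_id),
            )
        return True
    except sqlite3.IntegrityError:
        # Primary-key violation: this (consumer, event) pair was already recorded.
        return False
```

A consumer would call claim_event before processing and skip the message when it returns False. Keying on consumer_name as well as event_id lets several consumers process the same event independently.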

digest_windows

CREATE TABLE digest_windows (
id ULID PRIMARY KEY,
tenant_id UUID NOT NULL,
user_id UUID NOT NULL,
category TEXT NOT NULL,
channel TEXT NOT NULL,
window_start TIMESTAMPTZ NOT NULL,
window_end TIMESTAMPTZ NOT NULL,
item_ids ULID[] NOT NULL DEFAULT '{}',
item_count INT NOT NULL DEFAULT 0,
state TEXT NOT NULL DEFAULT 'open', -- open|flushing|flushed
flush_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX ux_digest_window
ON digest_windows (tenant_id, user_id, category, channel, window_start);
CREATE INDEX ix_digest_flush_ready
ON digest_windows (flush_at)
WHERE state = 'open';

rate_limit_counters

Counters are ephemeral and live in Redis. A Postgres fallback table with the same shape is used when Redis is down.

EP-10 operational tables (migration 0002_ep10_dod.sql)

Implemented in the Ghasi-edTech monorepo notification-service. These support HTTP idempotency, quiet-hours deferral, and webhook retry indexing.

idempotency_keys

CREATE TABLE idempotency_keys (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
scope TEXT NOT NULL,
idempotency_key TEXT NOT NULL,
request_hash TEXT NOT NULL,
response_status INTEGER NOT NULL,
response_body JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (tenant_id, scope, idempotency_key)
);

RLS: tenant_id = current_setting('app.tenant_id')::uuid.
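
The columns of idempotency_keys suggest the usual replay pattern: look up (tenant_id, scope, idempotency_key), return the stored response if the request matches, and otherwise run the handler and store its result. The sketch below models that with an in-memory dict standing in for the table. All names are hypothetical, and two details are assumptions not stated in the schema: the request hash is SHA-256 over canonical JSON, and a key reused with a different body is rejected.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Callable, Dict, Tuple


@dataclass(frozen=True)
class StoredResponse:
    request_hash: str
    response_status: int
    response_body: dict


class IdempotencyConflict(Exception):
    """The same key was reused with a different request body (assumed policy)."""


def hash_request(body: dict) -> str:
    # Canonical JSON (sorted keys) so key order does not change the hash.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def handle(
    store: Dict[Tuple[str, str, str], StoredResponse],
    tenant_id: str,
    scope: str,
    idempotency_key: str,
    body: dict,
    handler: Callable[[dict], Tuple[int, dict]],
) -> Tuple[int, dict]:
    """Run `handler` at most once per (tenant_id, scope, idempotency_key)."""
    key = (tenant_id, scope, idempotency_key)  # mirrors the UNIQUE constraint
    req_hash = hash_request(body)
    cached = store.get(key)
    if cached is not None:
        if cached.request_hash != req_hash:
            raise IdempotencyConflict(idempotency_key)
        return cached.response_status, cached.response_body
    status, response = handler(body)
    store[key] = StoredResponse(req_hash, status, response)
    return status, response
```

In a real deployment the lookup and insert would need to be atomic, for example by relying on the unique constraint, so that two concurrent requests with the same key cannot both run the handler.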

notification_scheduled

Rows are created when an email, SMS, or push send would fall within the user’s quiet hours (unless a critical override applies). Each row stores the original send body in request_payload and the earliest replay time in run_after (UTC). A background worker replays the send with forceDelivery and sets processed_at.
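
To make the deferral rule concrete, the sketch below computes a run_after value for a send that lands inside a quiet-hours window, including windows that wrap past midnight. It is an illustration under stated assumptions: the function name is hypothetical, the window is expressed directly in UTC (the real quiet_hours JSON shape and any time zone handling are not specified here), and the critical override is modelled as priority == "critical".

```python
from datetime import datetime, time, timedelta, timezone
from typing import Optional


def quiet_hours_run_after(
    now_utc: datetime,
    quiet_start: time,
    quiet_end: time,
    priority: str = "normal",
) -> Optional[datetime]:
    """Return the UTC time to replay at, or None if the send can go out now."""
    if priority == "critical":
        return None  # assumed form of the critical override
    if quiet_start == quiet_end:
        return None  # empty window
    t = now_utc.time()
    today = now_utc.date()
    if quiet_start < quiet_end:
        # Same-day window, e.g. 13:00-15:00.
        if quiet_start <= t < quiet_end:
            return datetime.combine(today, quiet_end, tzinfo=timezone.utc)
        return None
    # Overnight window, e.g. 22:00-07:00.
    if t >= quiet_start:
        tomorrow = today + timedelta(days=1)
        return datetime.combine(tomorrow, quiet_end, tzinfo=timezone.utc)
    if t < quiet_end:
        return datetime.combine(today, quiet_end, tzinfo=timezone.utc)
    return None
```

A non-None result corresponds to inserting a notification_scheduled row with that run_after; None corresponds to sending immediately.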

webhook_deliveries retries

Delivery rows use next_retry_at to schedule exponential backoff while status = 'failed'. A partial index, webhook_deliveries_retry, on (tenant_id, status, next_retry_at) supports the retry worker. After the maximum number of attempts, status moves to dlq and notification.webhook.delivery_failed.v1 is emitted (see EVENT_SCHEMAS).
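
The retry behaviour described above can be sketched as a pure function from the attempt count to the next status and next_retry_at. The base delay, cap, and maximum attempt count below are hypothetical values chosen for illustration, not the service's configuration, and the sketch omits jitter, which production backoff schemes often add.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

BASE_DELAY_S = 30    # hypothetical base delay
MAX_DELAY_S = 3600   # hypothetical cap on a single delay
MAX_ATTEMPTS = 10    # hypothetical; after this the row moves to dlq


def after_failure(attempts: int, now: datetime) -> Tuple[str, Optional[datetime]]:
    """Decide the next state after a failed delivery.

    `attempts` counts failed attempts so far, including the one that just failed.
    Returns (status, next_retry_at).
    """
    if attempts >= MAX_ATTEMPTS:
        return "dlq", None
    # Delay doubles with each attempt: 30s, 60s, 120s, ... capped at MAX_DELAY_S.
    delay = min(BASE_DELAY_S * 2 ** (attempts - 1), MAX_DELAY_S)
    return "failed", now + timedelta(seconds=delay)


now = datetime(2026, 4, 10, 12, 0, tzinfo=timezone.utc)
print(after_failure(1, now))
```

A worker would then select rows where status = 'failed' and next_retry_at is due, which is the access pattern the partial index is meant to serve.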

Redis Key Layout

notif:tmpl:{templateId}:{version}:ast → compiled Handlebars AST (LRU, TTL 1h)
notif:pref:{tenantId}:{userId} → JSON (TTL 10m, invalidate on update)
notif:rate:{tenantId}:email:day:{YYYY-MM-DD} → counter
notif:rate:user:{userId}:email:day:{YYYY-MM-DD} → counter
notif:ws:{userId}:{deviceId} → server instance pointer (TTL 60s, heartbeat)
notif:digest:{windowId}:lock → distributed lock for flush
notif:suppress:{channel}:{addrHash} → boolean (TTL 1h, authoritative copy in Postgres)
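
Centralizing key construction helps keep producers and consumers of these keys in agreement. The helpers below are a minimal sketch that reproduces the patterns listed above; the function names are hypothetical, and only the key formats come from this document.

```python
from datetime import date


def template_ast_key(template_id: str, version: str) -> str:
    return f"notif:tmpl:{template_id}:{version}:ast"


def preference_key(tenant_id: str, user_id: str) -> str:
    return f"notif:pref:{tenant_id}:{user_id}"


def tenant_email_rate_key(tenant_id: str, day: date) -> str:
    # date.isoformat() yields YYYY-MM-DD, matching the documented suffix.
    return f"notif:rate:{tenant_id}:email:day:{day.isoformat()}"


def user_email_rate_key(user_id: str, day: date) -> str:
    return f"notif:rate:user:{user_id}:email:day:{day.isoformat()}"


def ws_presence_key(user_id: str, device_id: str) -> str:
    return f"notif:ws:{user_id}:{device_id}"


def digest_lock_key(window_id: str) -> str:
    return f"notif:digest:{window_id}:lock"


def suppression_key(channel: str, addr_hash: str) -> str:
    return f"notif:suppress:{channel}:{addr_hash}"


print(tenant_email_rate_key("t-123", date(2026, 4, 10)))
```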

S3 Bucket Layout

s3://ghasi-notif-prod/
rendered/{tenantId}/{YYYY}/{MM}/{DD}/{notificationId}.html # 30d
webhook-payloads/{tenantId}/{YYYY}/{MM}/{DD}/{deliveryId}.json # 90d
provider-raw/{provider}/{YYYY}/{MM}/{DD}/{hour}/ # 30d audit
archive/notifications/{tenantId}/{YYYY}/{MM}/parquet/ # 24m+ cold

Multi-Tenancy

  • Every table has tenant_id NOT NULL, except platform-global tables: notification_templates allows tenant_id IS NULL for platform templates.
  • RLS enforces tenant isolation; the app issues SET LOCAL app.tenant_id = '...' in every transaction.
  • Cross-tenant reads (admin ops) use the dedicated ghasi_admin role, which bypasses RLS, and are recorded with explicit audit log entries.
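
The per-transaction tenant setting can be sketched as a small helper that wraps a unit of work in the statements a driver would execute. This is an illustration, not the service's code: the function name is hypothetical and the %s placeholder style assumes a psycopg-like driver. It uses Postgres's set_config(name, value, is_local) with is_local = true, which has the same transaction scope as SET LOCAL but accepts the tenant ID as a bound parameter.

```python
import uuid
from typing import List, Tuple

Statement = Tuple[str, tuple]


def tenant_scoped(tenant_id: str, statements: List[Statement]) -> List[Statement]:
    """Wrap `statements` in a transaction that sets app.tenant_id first."""
    # Validate and normalize; uuid.UUID raises ValueError on malformed input,
    # so a bad tenant ID never reaches the database.
    tenant = str(uuid.UUID(tenant_id))
    return [
        ("BEGIN", ()),
        # is_local = true scopes the setting to this transaction only.
        ("SELECT set_config('app.tenant_id', %s, true)", (tenant,)),
        *statements,
        ("COMMIT", ()),
    ]


plan = tenant_scoped(
    "5b0e6c3a-9f1d-4c2e-8a77-0d4b2f6e1c90",
    [("SELECT id FROM notifications WHERE user_id = %s", ("u-1",))],
)
for sql, params in plan:
    print(sql, params)
```

Because the setting is transaction-local, it is discarded at COMMIT or ROLLBACK, so a pooled connection does not carry one tenant's ID into the next transaction.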

Data Volume Estimates (steady state, 10M users)

| Entity | Rows/day | Rows/year | Avg size |
| --- | --- | --- | --- |
| notifications | 50M | 18B | 2KB row + 30KB body → S3 |
| delivery_attempts | 55M | 20B | 0.3KB |
| webhook_deliveries | 2M | 730M | 0.5KB meta + body in S3 |
| preferences | mutations ~100K/d | - | 4KB |
| templates | < 1K total | - | 20KB |

The partitioning and archive strategy caps hot Postgres storage at ~3TB. The cold archive, stored as Parquet on S3, is queryable via Athena for compliance.
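
The yearly row counts in the table follow from the daily figures, and the same inputs give a rough daily ingest size. The sketch below reproduces that arithmetic. It assumes decimal units (1 GB = 1,000,000 KB) and counts raw row bytes only, ignoring indexes, TOAST, and other storage overhead, so the byte figures are order-of-magnitude estimates.

```python
ROWS_PER_DAY = {
    "notifications": 50_000_000,
    "delivery_attempts": 55_000_000,
    "webhook_deliveries": 2_000_000,
}
ROW_KB = {
    "notifications": 2.0,
    "delivery_attempts": 0.3,
    "webhook_deliveries": 0.5,
}
NOTIFICATION_BODY_KB = 30.0  # rendered body, stored in S3 rather than Postgres
KB_PER_GB = 1_000_000        # decimal units (assumption)


def rows_per_year(entity: str) -> int:
    return ROWS_PER_DAY[entity] * 365


def postgres_gb_per_day(entity: str) -> float:
    return ROWS_PER_DAY[entity] * ROW_KB[entity] / KB_PER_GB


def s3_body_gb_per_day() -> float:
    return ROWS_PER_DAY["notifications"] * NOTIFICATION_BODY_KB / KB_PER_GB


for entity in ROWS_PER_DAY:
    print(entity, rows_per_year(entity), round(postgres_gb_per_day(entity), 1))
print("rendered bodies to S3 (GB/day):", s3_body_gb_per_day())
```

The yearly results are 18.25 billion, about 20.1 billion, and 730 million rows, which match the table's rounded 18B, 20B, and 730M.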

Schema Migrations

  • Tooling: sqlx migrate (Rust) or node-pg-migrate (Node), with SQL files checked in under /migrations.
  • Forward-compatible: never break running producers.
  • Add-column: always nullable or with default.
  • Drop-column: two-release dance (stop writing → stop reading → drop).
  • Index creation: CREATE INDEX CONCURRENTLY only; never blocking.
  • Partition maintenance: pg_partman with 12-month pre-creation window.

Backup & Recovery

  • Postgres: continuous WAL archiving to S3, full backup daily, PITR window 35 days.
  • Redis: RDB snapshots hourly, AOF for last-minute durability; rebuildable from Postgres.
  • S3: versioning + Object Lock for webhook payloads (compliance).
  • RTO: 30 min | RPO: 5 min (Postgres), 1 min (Redis; acceptable because Redis is treated as a cache).