
Application Logic

:::info Source
Sourced from services/notification-service/APPLICATION_LOGIC.md in the documentation repo.
:::

Command / Query Separation

Commands (write-side)

  • EnqueueNotificationCommand
  • MarkNotificationReadCommand
  • UpdatePreferencesCommand
  • PublishTemplateCommand
  • DeprecateTemplateCommand
  • CreateWebhookSubscriptionCommand
  • RotateWebhookSecretCommand
  • RecordProviderReceiptCommand
  • RecordBounceCommand
  • ScrubUserDataCommand (GDPR)

Queries (read-side)

  • ListUserInAppNotificationsQuery
  • GetPreferencesQuery
  • ListTemplatesQuery
  • GetTemplateByKeyQuery
  • ListWebhookSubscriptionsQuery
  • GetNotificationAuditQuery (admin)
  • GetDeliveryStatsQuery (analytics handoff)

High-Level Flow: Event-Triggered Notification

Consumer Groups & Concurrency

| Consumer | Stream | Durable Name | Max In-Flight | Ack Wait |
|---|---|---|---|---|
| NotificationRouter | domain.* | notif-router | 256 | 30s |
| OutboundEmailWorker | notif.outbound.email | notif-email | 64 | 60s |
| OutboundSmsWorker | notif.outbound.sms | notif-sms | 32 | 60s |
| OutboundPushWorker | notif.outbound.push | notif-push | 128 | 30s |
| InAppFanout | notif.outbound.inapp | notif-inapp | 256 | 10s |
| WebhookDispatcher | notif.outbound.webhook | notif-webhook | 32 | 60s |
| BounceIngest | providers.bounce.* | notif-bounce | 32 | 15s |
| DigestScheduler | (cron-driven) | notif-digest | 16 | 120s |

Horizontal scaling: each worker pool scales independently. The router is CPU-bound (template compilation) and scales comfortably to 8 replicas; the email worker is I/O-bound and is capped by provider rate limits (SES: 50 req/s by default, negotiable).

Command Handlers (detailed)

EnqueueNotificationCommand

input: {
  tenantId, userId, templateKey, variables, channel?, locale?,
  priority?, scheduledFor?, correlationId, sourceEvent?
}

steps:
  1. validateTenantActive(tenantId)                     // tenant-service projection
  2. loadUserProfile(userId)                            // identity projection
  3. loadPreferences(userId)
  4. resolveChannel(channel, preferences, templateKey)  // if not explicit
  5. resolveRecipientAddress(channel, userProfile)
  6. resolveLocale(locale, preferences, userProfile)
  7. loadTemplate(tenantId, templateKey, 'active')      // tenant-scoped then global fallback
  8. preferenceGate.decide(notification, preferences)
     ├─ send     → createNotification(status: queued) → emit internal "render.requested"
     ├─ suppress → createNotification(status: suppressed, reason) → DONE
     ├─ defer    → createNotification(status: queued, scheduledFor: quietHoursEnd)
     └─ digest   → rollIntoDigestGroup(category, userId)
  9. persist (outbox-pattern: notification row + outbound_queue row in same tx)
  10. return notificationId

Idempotency: (correlationId, userId, templateKey, channel) forms the natural idempotency key. Repeat submissions return the existing notificationId.
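The idempotency rule above can be sketched as follows. This is a minimal in-memory illustration, not the service's actual storage: `EnqueueInput`, `idempotencyKey`, and `InMemoryNotificationStore` are hypothetical names, and the `Map` stands in for what would presumably be a unique index on the notifications table.

```typescript
// Hypothetical input shape; field names follow the command input listed above.
interface EnqueueInput {
  tenantId: string;
  userId: string;
  templateKey: string;
  channel: string;
  correlationId: string;
}

// Build the natural idempotency key from (correlationId, userId, templateKey, channel).
// Serializing an array avoids collisions when a field happens to contain a separator.
function idempotencyKey(input: EnqueueInput): string {
  return JSON.stringify([input.correlationId, input.userId, input.templateKey, input.channel]);
}

// In-memory stand-in for a unique constraint in the database (assumption).
class InMemoryNotificationStore {
  private readonly byKey = new Map<string, string>();
  private nextId = 1;

  // Returns the existing notificationId on a repeat submission, otherwise creates one.
  enqueue(input: EnqueueInput): { notificationId: string; created: boolean } {
    const key = idempotencyKey(input);
    const existing = this.byKey.get(key);
    if (existing !== undefined) {
      return { notificationId: existing, created: false };
    }
    const notificationId = `ntf_${this.nextId}`;
    this.nextId += 1;
    this.byKey.set(key, notificationId);
    return { notificationId, created: true };
  }
}
```

Submitting the same command twice yields the same `notificationId`; changing any one of the four key fields (for example the channel) produces a new notification.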

UpdatePreferencesCommand

  • Optimistic concurrency via version field.
  • Emits notification.preference.updated.v1.
  • Invalidates the user's in-memory preference cache (via LRU TTL + NATS cache-invalidation topic).
  • Denies the update if it tries to turn off security email (regulatory override).
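A minimal sketch of the version check and the security-email guard described above. The `Preferences` fields other than `version`, the result type, and the function name are assumptions made for illustration; in the real handler the version comparison would presumably happen inside the database update.

```typescript
// Hypothetical preference shape; only `version` is named in the text above.
interface Preferences {
  userId: string;
  version: number;
  securityEmailEnabled: boolean;
  marketingEmailEnabled: boolean;
}

type PreferencePatch = Partial<Pick<Preferences, "securityEmailEnabled" | "marketingEmailEnabled">>;

type UpdateResult =
  | { ok: true; preferences: Preferences }
  | { ok: false; reason: "version_conflict" | "security_email_required" };

function applyPreferenceUpdate(
  current: Preferences,
  expectedVersion: number,
  patch: PreferencePatch,
): UpdateResult {
  // Optimistic concurrency: reject if someone else updated the row first.
  if (current.version !== expectedVersion) {
    return { ok: false, reason: "version_conflict" };
  }
  // Regulatory override: security email cannot be switched off.
  if (patch.securityEmailEnabled === false) {
    return { ok: false, reason: "security_email_required" };
  }
  // Accept the patch and bump the version so stale writers are detected later.
  return { ok: true, preferences: { ...current, ...patch, version: current.version + 1 } };
}
```

On `version_conflict` the caller would typically reload the preferences and retry; event emission and cache invalidation are omitted here.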

PublishTemplateCommand

Workflow:

  1. Validate all locale bodies compile (Handlebars + MJML).
  2. Validate all declared variables are referenced (and vice versa) in at least one locale.
  3. Compile + cache AST.
  4. Transition draft → active.
  5. If a prior active version exists, transition it to deprecated with pointer to new version.
  6. Emit notification.template.published.v1.

Breaking-change handling: when a new major version is published, already-scheduled notifications that reference the prior version keep using that version's snapshot, which is stored inline on the notification at creation.
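Step 2 of the publish workflow (declared versus referenced variables) could look roughly like the sketch below. It deliberately avoids the Handlebars API and uses a regular expression that only recognizes simple `{{name}}` and `{{{name}}}` expressions; block helpers, dotted paths, and partials are ignored, which is an assumption of this sketch rather than how the service necessarily does it. All function and type names are hypothetical.

```typescript
// Matches simple {{name}} or {{{name}}} expressions only (assumption: no helpers or paths).
const SIMPLE_EXPRESSION = /\{\{\{?\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}?\}\}/;
// Handlebars keywords that look like variables but are not.
const RESERVED = new Set(["else", "this"]);

function referencedVariables(body: string): Set<string> {
  const found = new Set<string>();
  const pattern = new RegExp(SIMPLE_EXPRESSION.source, "g"); // fresh lastIndex per call
  let match = pattern.exec(body);
  while (match !== null) {
    const name = match[1];
    if (name !== undefined && !RESERVED.has(name)) found.add(name);
    match = pattern.exec(body);
  }
  return found;
}

interface VariableCheck {
  unusedDeclared: string[]; // declared but referenced in no locale
  undeclaredReferenced: string[]; // referenced in some locale but never declared
}

function checkTemplateVariables(declared: string[], localeBodies: Record<string, string>): VariableCheck {
  // Union of references across all locales ("in at least one locale").
  const referenced = new Set<string>();
  Object.values(localeBodies).forEach((body) => {
    referencedVariables(body).forEach((name) => referenced.add(name));
  });
  const declaredSet = new Set(declared);
  return {
    unusedDeclared: declared.filter((name) => !referenced.has(name)).sort(),
    undeclaredReferenced: Array.from(referenced).filter((name) => !declaredSet.has(name)).sort(),
  };
}
```

A publish would be rejected when either list is non-empty.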

RotateWebhookSecretCommand

  • Generates new 32-byte secret.
  • Keeps prior secret as signingSecretPrevious for 72h grace period.
  • Notifies customer via email (template notification.webhook.secret_rotated.email).
  • Schedules ScrubPreviousSecretCommand after 72h.
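The grace-period behavior can be illustrated with a sketch of rotation plus a verifier that accepts signatures from either secret until the previous one expires. The signature scheme (HMAC-SHA256, hex-encoded) and the `previousExpiresAt` field are assumptions; the source only names `signingSecretPrevious`, the 32-byte secret, and the 72h window.

```typescript
import { createHmac, randomBytes, timingSafeEqual } from "node:crypto";

const GRACE_PERIOD_MS = 72 * 60 * 60 * 1000;

interface WebhookSecrets {
  signingSecret: string;
  signingSecretPrevious?: string;
  previousExpiresAt?: Date; // hypothetical field marking the end of the grace period
}

// Generate a new 32-byte secret and keep the old one for the grace period.
function rotateSecret(current: WebhookSecrets, now: Date): WebhookSecrets {
  return {
    signingSecret: randomBytes(32).toString("hex"),
    signingSecretPrevious: current.signingSecret,
    previousExpiresAt: new Date(now.getTime() + GRACE_PERIOD_MS),
  };
}

// Assumed scheme: hex-encoded HMAC-SHA256 over the raw payload.
function signPayload(secret: string, payload: string): string {
  return createHmac("sha256", secret).update(payload).digest("hex");
}

// Constant-time comparison; timingSafeEqual requires equal-length buffers.
function safeEqualHex(a: string, b: string): boolean {
  const left = Buffer.from(a, "hex");
  const right = Buffer.from(b, "hex");
  return left.length === right.length && timingSafeEqual(left, right);
}

function verifySignature(secrets: WebhookSecrets, payload: string, signature: string, now: Date): boolean {
  if (safeEqualHex(signPayload(secrets.signingSecret, payload), signature)) return true;
  const previous = secrets.signingSecretPrevious;
  const expires = secrets.previousExpiresAt;
  // Accept the previous secret only while the grace period is still open.
  if (previous !== undefined && expires !== undefined && now.getTime() < expires.getTime()) {
    return safeEqualHex(signPayload(previous, payload), signature);
  }
  return false;
}
```

Once the scheduled scrub runs, `signingSecretPrevious` would be removed and only the current secret verifies.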

RecordBounceCommand

  • Triggered by provider webhook (SES SNS, Twilio, FCM error callback).
  • Updates notification status.
  • For hard bounces: inserts SuppressionEntry so future sends to same address are pre-blocked.
  • Emits notification.bounced.v1.
  • If bounce rate for tenant exceeds 5% in rolling 24h: emit platform alert and throttle.
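The 5% rolling-24h check might be computed along these lines. The event shape and the choice of denominator (all delivery attempts in the window) are assumptions; a production version would more likely query pre-aggregated counters than scan events.

```typescript
// Hypothetical per-tenant delivery record; `at` is epoch milliseconds.
interface DeliveryEvent {
  at: number;
  bounced: boolean;
}

const BOUNCE_WINDOW_MS = 24 * 60 * 60 * 1000;
const BOUNCE_RATE_THRESHOLD = 0.05;

// Fraction of attempts in the trailing 24h that bounced; 0 when there were none.
function bounceRate(events: DeliveryEvent[], nowMs: number): number {
  const recent = events.filter((e) => e.at > nowMs - BOUNCE_WINDOW_MS && e.at <= nowMs);
  if (recent.length === 0) return 0;
  const bounced = recent.filter((e) => e.bounced).length;
  return bounced / recent.length;
}

// "Exceeds 5%" is read as strictly greater than the threshold.
function shouldThrottleTenant(events: DeliveryEvent[], nowMs: number): boolean {
  return bounceRate(events, nowMs) > BOUNCE_RATE_THRESHOLD;
}
```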

Query Handlers

ListUserInAppNotificationsQuery

  • Reads denormalized inapp_feed projection (Postgres + Redis cache for hot reads).
  • Pagination: cursor-based on (createdAt desc, id desc).
  • Filters: unread, category, since.
  • Read-models updated by projector on every notification.delivered.v1 where channel=inapp.
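Cursor pagination on (createdAt desc, id desc) can be sketched in memory as below. The cursor encoding (URL-encoded JSON) and all names are assumptions; against Postgres the filter would correspond to a row comparison such as `(created_at, id) < ($1, $2)`. Timestamps are assumed to be ISO-8601 UTC strings in one fixed format, so they compare correctly as strings.

```typescript
interface FeedItem {
  id: string;
  createdAt: string; // ISO-8601 UTC, fixed format
}

interface FeedCursor {
  createdAt: string;
  id: string;
}

function encodeCursor(cursor: FeedCursor): string {
  return encodeURIComponent(JSON.stringify(cursor));
}

function decodeCursor(raw: string): FeedCursor {
  return JSON.parse(decodeURIComponent(raw)) as FeedCursor;
}

// Sort order: newest first, with id as a descending tie-breaker.
function compareDesc(a: FeedCursor, b: FeedCursor): number {
  if (a.createdAt !== b.createdAt) return a.createdAt < b.createdAt ? 1 : -1;
  if (a.id !== b.id) return a.id < b.id ? 1 : -1;
  return 0;
}

function listPage(
  items: FeedItem[],
  limit: number,
  cursor?: string,
): { items: FeedItem[]; nextCursor: string | null } {
  const sorted = [...items].sort(compareDesc);
  const after = cursor === undefined ? null : decodeCursor(cursor);
  // Keep only rows that sort strictly after the cursor position.
  const remaining = after === null ? sorted : sorted.filter((item) => compareDesc(after, item) < 0);
  const page = remaining.slice(0, limit);
  const last = page[page.length - 1];
  const hasMore = remaining.length > limit;
  const nextCursor = last !== undefined && hasMore ? encodeCursor({ createdAt: last.createdAt, id: last.id }) : null;
  return { items: page, nextCursor };
}
```

Including `id` in the cursor keeps pages stable when several notifications share the same `createdAt`.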

GetNotificationAuditQuery

  • Admin-only with tenant scope.
  • Returns metadata + body-hash (body fetched separately only by authorized ops with explicit audit logging).
  • Backed by read-replica to avoid starving write workload.

Digest Engine Algorithm

State: digest_windows table keyed by (tenantId, userId, category, channel, windowId).

on notification enqueued with digestable=true:
  windowKey = (user, category, channel, floor(now / 15min))
  existing = load(windowKey)
  if !existing:
    existing = create(windowKey, TTL=30min)
    schedule flushDigest(windowKey) at now + 15min
  existing.append(notification)
  if existing.count >= CATEGORY_THRESHOLD[category]:
    flushDigest(windowKey) now

flushDigest(windowKey):
  window = load(windowKey)
  if window.count == 1:
    send the single notification as-is (no digest wrapping)
  else:
    render digest template "digest.{category}.{channel}" with list
    mark all rolled-in notifications as superseded
    enqueue digest notification as child
  delete window

Daily/weekly digests: cron-driven scheduler per tenant per user.
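The windowing logic in the pseudocode above can be sketched in memory as follows. This is illustrative only: the timer-based flush, the 30-minute TTL, persistence in `digest_windows`, and row locking are all omitted, and the class, type, and function names are hypothetical.

```typescript
const DIGEST_WINDOW_MS = 15 * 60 * 1000;

// Window id = floor(now / 15min), so all events in the same 15-minute bucket share a key.
function digestWindowKey(userId: string, category: string, channel: string, nowMs: number): string {
  return JSON.stringify([userId, category, channel, Math.floor(nowMs / DIGEST_WINDOW_MS)]);
}

type FlushResult =
  | { kind: "single"; notificationId: string } // sent as-is, no digest wrapping
  | { kind: "digest"; notificationIds: string[] }; // rolled into one digest

class DigestBuffer {
  private readonly windows = new Map<string, string[]>();
  private readonly thresholds: Record<string, number>;

  constructor(thresholds: Record<string, number>) {
    this.thresholds = thresholds;
  }

  // Appends to the current window; flushes immediately when the category threshold is hit.
  append(userId: string, category: string, channel: string, notificationId: string, nowMs: number): FlushResult | null {
    const key = digestWindowKey(userId, category, channel, nowMs);
    const ids = this.windows.get(key) ?? [];
    ids.push(notificationId);
    this.windows.set(key, ids);
    const threshold = this.thresholds[category] ?? Number.POSITIVE_INFINITY;
    return ids.length >= threshold ? this.flush(key) : null;
  }

  // Called by the scheduled flush (not modeled here) or by append on threshold.
  flush(key: string): FlushResult | null {
    const ids = this.windows.get(key);
    if (ids === undefined) return null; // already flushed
    this.windows.delete(key);
    const first = ids[0];
    if (ids.length === 1 && first !== undefined) {
      return { kind: "single", notificationId: first };
    }
    return { kind: "digest", notificationIds: ids };
  }
}
```

Returning `null` from a second `flush` of the same key mirrors the need for the flush to be safe when both the threshold and the timer fire.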

Outbound Worker Contract

Each channel worker implements:

interface OutboundWorker {
  readonly channel: Channel;
  readonly providerName: string;
  send(notification: Notification, rendered: RenderedMessage): Promise<ProviderResult>;
  // Must be idempotent on re-delivery (use providerMessageId if seen).
  // Must classify errors into: transient (retry) vs terminal (fail).
}

Error classification:

  • Transient: 429, 5xx, timeouts, DNS failures → retry with backoff.
  • Terminal: 4xx (except 429), invalid address, hard bounce → mark failed/bounced.
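The classification rules above translate fairly directly into code. In this sketch the `ProviderError` shape, the specific error-code strings, the default for unrecognized errors, and the backoff parameters are all assumptions chosen for illustration.

```typescript
type ErrorClass = "transient" | "terminal";

// Hypothetical normalized error produced by a provider adapter.
interface ProviderError {
  httpStatus?: number;
  code?: string;
}

// Network-level codes treated as transient (timeouts, DNS failures).
const TRANSIENT_CODES = new Set(["ETIMEDOUT", "ECONNRESET", "ENOTFOUND", "EAI_AGAIN"]);
// Hypothetical adapter codes for invalid addresses and hard bounces.
const TERMINAL_CODES = new Set(["INVALID_ADDRESS", "HARD_BOUNCE"]);

function classifyProviderError(err: ProviderError): ErrorClass {
  if (err.code !== undefined) {
    if (TERMINAL_CODES.has(err.code)) return "terminal";
    if (TRANSIENT_CODES.has(err.code)) return "transient";
  }
  const status = err.httpStatus;
  if (status !== undefined) {
    if (status === 429 || status >= 500) return "transient";
    if (status >= 400) return "terminal";
  }
  // Assumption: unknown failures are retried and bounded by a max-attempts policy.
  return "transient";
}

// Exponential backoff with a cap; attempt 0 is the first retry.
function backoffDelayMs(attempt: number, baseMs = 500, capMs = 60000): number {
  return Math.min(capMs, baseMs * Math.pow(2, attempt));
}
```

A worker would call `classifyProviderError` in its catch path, then either schedule a retry after `backoffDelayMs(attempt)` or mark the notification failed or bounced.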

Transactional Outbox Pattern

  • Writes to notifications and outbox tables happen in one Postgres transaction.
  • A relay process (or NATS source connector) ships outbox rows to JetStream.
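  • Targets exactly-once-to-broker semantics: the relay itself can republish a row after a crash, so this relies on broker-side deduplication (for example, JetStream's Nats-Msg-Id duplicate window). Provider send is at-least-once.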
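A minimal sketch of the relay side of the outbox, assuming a synchronous, hypothetical `BrokerPublisher` interface (a real JetStream publish is asynchronous). Each row's id is passed as a message id so the broker can discard a republished row; the relay marks a row as published only after an acknowledgement and stops at the first failure to preserve ordering.

```typescript
// Hypothetical outbox row written in the same transaction as the notification.
interface OutboxRow {
  id: string;
  subject: string;
  payload: string;
  published: boolean;
}

// Hypothetical publisher; returns true only when the broker acknowledged the message.
interface BrokerPublisher {
  publish(msgId: string, subject: string, payload: string): boolean;
}

// Ships pending rows in order. Returns how many rows were acknowledged in this pass.
function relayPending(rows: OutboxRow[], publisher: BrokerPublisher): number {
  let shipped = 0;
  for (const row of rows) {
    if (row.published) continue;
    const acked = publisher.publish(row.id, row.subject, row.payload);
    if (!acked) break; // retry this row on the next pass; it may already have reached the broker
    row.published = true;
    shipped += 1;
  }
  return shipped;
}
```

If an acknowledgement is lost, the same row is published again on the next pass, which is why the message id matters for deduplication downstream.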

Concurrency & Idempotency

  • Inbound NATS messages: idempotent via (eventId, consumerName) dedupe table, 48h window.
  • Outbound provider sends: idempotent via providerMessageId once returned.
  • In-app WebSocket delivery: idempotent via notificationId on client-side ack.
  • Digest flush: guarded by SELECT FOR UPDATE on window row.
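The inbound dedupe rule, keyed by (eventId, consumerName) with a 48h window, could be sketched as below. The in-memory `Map` stands in for the dedupe table, and the class and method names are hypothetical.

```typescript
const DEDUPE_WINDOW_MS = 48 * 60 * 60 * 1000;

class InboundDeduper {
  // key -> epoch ms at which the event was first seen
  private readonly seen = new Map<string, number>();

  // Returns true if the message should be processed, false if it is a duplicate within 48h.
  shouldProcess(eventId: string, consumerName: string, nowMs: number): boolean {
    const key = JSON.stringify([eventId, consumerName]);
    const firstSeen = this.seen.get(key);
    if (firstSeen !== undefined && nowMs - firstSeen < DEDUPE_WINDOW_MS) {
      return false;
    }
    this.seen.set(key, nowMs);
    return true;
  }

  // Drops entries older than the window; returns the number of entries removed.
  prune(nowMs: number): number {
    let removed = 0;
    this.seen.forEach((firstSeen, key) => {
      if (nowMs - firstSeen >= DEDUPE_WINDOW_MS) {
        this.seen.delete(key);
        removed += 1;
      }
    });
    return removed;
  }
}
```

Including `consumerName` in the key lets two different consumers each process the same event once.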

Clock & Timezone Handling

  • All domain timestamps in UTC.
  • Quiet hours and digest schedules stored in user's IANA timezone.
  • DST transitions: scheduled notifications re-computed on user tz change.
  • Use @js-temporal/polyfill for all tz math (or native Temporal on runtimes that ship it; Node 22 does not enable it by default).

Rate Limiting

  • Per tenant per channel: configurable, default 10,000/day email, 500/day SMS.
  • Per user per channel: hard cap 50/day email, 5/day SMS to prevent runaway.
  • Per template: optional cap for marketing templates.
  • Enforcement: Redis token-bucket with Lua script; fallback to Postgres counter if Redis down.
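For intuition, here is an in-process token bucket with the same shape as the enforcement described above. It is not the Redis Lua script: state lives in memory, and the class name, constructor parameters, and the choice to set capacity equal to the daily cap are assumptions.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

class TokenBucket {
  private tokens: number;
  private lastRefillMs: number;
  private readonly capacity: number;
  private readonly refillPerDay: number;

  // Starts full. capacity bounds bursts; refillPerDay sets the sustained rate.
  constructor(capacity: number, refillPerDay: number, nowMs: number) {
    this.capacity = capacity;
    this.refillPerDay = refillPerDay;
    this.tokens = capacity;
    this.lastRefillMs = nowMs;
  }

  // Refills proportionally to elapsed time, then tries to take `cost` tokens.
  tryConsume(nowMs: number, cost = 1): boolean {
    const elapsedMs = Math.max(0, nowMs - this.lastRefillMs);
    const refill = (elapsedMs * this.refillPerDay) / DAY_MS;
    this.tokens = Math.min(this.capacity, this.tokens + refill);
    this.lastRefillMs = nowMs;
    if (this.tokens < cost) return false;
    this.tokens -= cost;
    return true;
  }
}
```

With `new TokenBucket(5, 5, now)` this approximates the 5/day per-user SMS cap: five sends succeed immediately, the sixth is rejected, and capacity returns gradually over the following day.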

Graceful Degradation

| Failure | Degraded Mode |
|---|---|
| Redis unavailable | Fall back to Postgres for preference cache; rate limit resets to permissive. |
| Provider down (SES) | Flip feature flag to secondary provider; alert on-call; notifications accumulate in queue up to 6h before alerting critical. |
| Template-cache miss | Cold-compile template (30-100ms penalty); log metric. |
| NATS backpressure | Pause consumer; publishers get 503 on POST /send; dashboard shows lag. |
| Postgres primary down | Read-only mode from replica; commands return 503; outbox accumulates in app until replica promotes. |