
APPLICATION_LOGIC — notification-service

Siblings: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · SYNC_CONTRACT

Strategic anchors: 02 Enterprise Architecture §7 · 04 Event-Driven Architecture · 08 AI Architecture

The application layer of notification-service lives under src/application/ as use cases, ports (interfaces), DTOs, and orchestration scripts. It depends on the domain layer (src/domain/) and the port interfaces — never on adapter implementations or framework code. The composition root in src/infrastructure/config/ wires concrete adapters (Postgres repositories, Pub/Sub publisher, vendor SDKs) at startup.
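The dependency rule above can be illustrated with a minimal sketch, assuming only the Clock and IdGenerator port shapes from §1. StampNotificationUseCase, FixedClock, CounterIds and the compose* functions are hypothetical names, not the service's real classes; CounterIds is a deterministic test stand-in and does not produce real ULIDs.

```typescript
// Sketch only: a use case that sees ports, and composition roots that
// pick adapters. All class/function names here are hypothetical.

type ISODate = string;

// Port shapes copied from §1 (Clock, IdGenerator).
interface Clock { nowUtc(): ISODate; }
interface IdGenerator { ulid(prefix: string): string; }

// Application layer: depends on port interfaces only.
class StampNotificationUseCase {
  constructor(private readonly clock: Clock, private readonly ids: IdGenerator) {}

  execute(): { id: string; createdAt: ISODate } {
    return { id: this.ids.ulid("ntf"), createdAt: this.clock.nowUtc() };
  }
}

// Infrastructure layer: concrete adapters.
class SystemClock implements Clock {
  nowUtc(): ISODate { return new Date().toISOString(); }
}

class FixedClock implements Clock {
  constructor(private readonly at: ISODate) {}
  nowUtc(): ISODate { return this.at; }
}

// Hypothetical deterministic generator for tests; NOT a real ULID.
class CounterIds implements IdGenerator {
  private n = 0;
  ulid(prefix: string): string {
    this.n += 1;
    return `${prefix}_${String(this.n).padStart(4, "0")}`;
  }
}

// Composition roots: the only code that names concrete classes.
function composeForProduction(ids: IdGenerator): StampNotificationUseCase {
  return new StampNotificationUseCase(new SystemClock(), ids);
}

function composeForTest(at: ISODate): StampNotificationUseCase {
  return new StampNotificationUseCase(new FixedClock(at), new CounterIds());
}
```

Swapping FixedClock for SystemClock changes no line of the use case, which is the point of keeping adapters out of src/application/.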


1. Ports (interfaces only)

```typescript
// src/application/ports/repositories.ts
// Domain and shared types (Notification, TenantId, Result, Page, ISODate, ...)
// come from src/domain/; import lines are omitted here.
export interface NotificationRepository {
  save(n: Notification): Promise<Result<void, RepoError>>; // OCC-aware (version)
  findById(tenantId: TenantId, id: NotificationId): Promise<Notification | null>;
  findScheduledDue(now: ISODate, batchSize: number): Promise<Notification[]>;
  findRetryableQueued(now: ISODate, batchSize: number): Promise<Notification[]>;
  listFeed(
    tenantId: TenantId,
    recipientId: RecipientId,
    cursor?: string,
    limit?: number,
    unreadOnly?: boolean,
  ): Promise<Page<Notification>>;
  countUnreadInapp(tenantId: TenantId, recipientId: RecipientId): Promise<number>;
  markRead(tenantId: TenantId, ids: NotificationId[], at: ISODate): Promise<void>;
}

export interface TemplateRepository {
  saveTemplate(t: Template): Promise<void>;
  saveVersion(v: TemplateVersion): Promise<void>;
  findActiveByKey(
    tenantId: TenantId | null,
    key: TemplateKey,
  ): Promise<{ template: Template; activeVersion: TemplateVersion } | null>;
  findById(id: TemplateId, tenantId?: TenantId): Promise<Template | null>;
  findVersion(id: TemplateVersionId): Promise<TemplateVersion | null>;
  resolveActiveForTenant(
    tenantId: TenantId,
    key: TemplateKey,
  ): Promise<{ template: Template; activeVersion: TemplateVersion } | null>;
  listTriggerMap(tenantId?: TenantId): Promise<TriggerMapEntry[]>;
}

export interface RecipientRepository {
  upsertProjection(r: Recipient): Promise<void>;
  findById(tenantId: TenantId, id: RecipientId): Promise<Recipient | null>;
  findByIdentity(tenantId: TenantId, identityRef: GuestId | UserId): Promise<Recipient | null>;
  findByAddress(tenantId: TenantId, kind: ChannelKind, addressHash: string): Promise<Recipient | null>;
  saveOptOutToken(rcpId: RecipientId, token: OptOutToken): Promise<void>;
  findOptOutToken(tokenHash: string): Promise<OptOutToken | null>;
  consumeOptOutToken(tokenId: OptOutTokenId): Promise<void>;
}

export interface SuppressionRepository {
  isSuppressed(tenantId: TenantId, channel: ChannelKind, addressHash: string): Promise<boolean>;
  add(s: SuppressionRecord): Promise<void>;
  override(id: SuppressionRecordId, by: UserId, at: ISODate): Promise<void>;
  list(tenantId: TenantId, page: PageRequest): Promise<Page<SuppressionRecord>>;
}

export interface ChannelConfigRepository {
  getChannel(tenantId: TenantId, kind: ChannelKind): Promise<Channel | null>;
  saveChannel(c: Channel): Promise<void>;
  saveCredential(c: ChannelCredential): Promise<void>;
  flipHealth(channelId: ChannelId, status: 'active' | 'degraded' | 'down', reason: string): Promise<void>;
}

export interface WebhookInboundRepository {
  insert(w: WebhookInbound): Promise<void>;
  findByVendorMessageId(vendor: string, vendorMessageId: string): Promise<WebhookInbound | null>;
  markStatus(id: WebhookInboundId, status: WebhookInbound['status'], reason?: string): Promise<void>;
}

export interface DispatchBatchRepository {
  save(b: DispatchBatch): Promise<void>;
  findById(tenantId: TenantId, id: DispatchBatchId): Promise<DispatchBatch | null>;
  appendChild(batchId: DispatchBatchId, notificationId: NotificationId): Promise<void>;
}
```

```typescript
// src/application/ports/event-publisher.ts
export interface EventPublisher {
  publishOutbox(events: DomainEvent[], txnContext: TxnContext): Promise<void>; // outbox-backed
}
```

```typescript
// src/application/ports/channel-adapters.ts (one port per channel kind)
export interface EmailPort {
  send(req: EmailSendRequest): Promise<ChannelSendResult>;
}
export interface SmsPort {
  send(req: SmsSendRequest): Promise<ChannelSendResult>;
}
export interface WhatsAppPort {
  send(req: WhatsAppSendRequest): Promise<ChannelSendResult>;
}
export interface PushPort {
  send(req: PushSendRequest): Promise<ChannelSendResult>;
}
export interface InAppPort {
  publishToFeed(req: InAppPublishRequest): Promise<ChannelSendResult>;
  pushOverWebSocket(tenantId: TenantId, recipientId: RecipientId, payload: unknown): Promise<void>;
}
export interface VoicePort {
  placeCall(req: VoiceCallRequest): Promise<ChannelSendResult>;
}

export interface ChannelSendResult {
  outcome: 'accepted' | 'rejected' | 'timeout';
  vendorMessageId?: string;
  vendor: string;
  httpStatus?: number;
  errorCode?: string;
  errorMessage?: string;
  retryAfterSeconds?: number;
  latencyMs: number;
}
```

```typescript
// src/application/ports/clients.ts
export interface AIClient {
  // notification-service NEVER calls models directly; this client surfaces only
  // the ai-orchestrator HITL approval and the AI-drafted-content lookup.
  fetchAIDraftedContent(
    draftId: string,
  ): Promise<{ rendered: RenderedMessage; provenance: AIProvenance } | null>;
  requestApprovalRoute(
    templateVersionId: TemplateVersionId,
    draftId: string,
  ): Promise<{ approvalRequestId: string }>;
}

export interface IdentityResolver {
  resolveGuest(tenantId: TenantId, guestId: GuestId): Promise<GuestProjection | null>;
  resolveUser(tenantId: TenantId, userId: UserId): Promise<UserProjection | null>;
  resolveTenantAdmins(tenantId: TenantId): Promise<UserProjection[]>;
  resolveStaffByAssignment(tenantId: TenantId, assignmentRef: string): Promise<UserProjection[]>;
  resolveVendor(tenantId: TenantId, vendorRef: string): Promise<VendorProjection | null>;
}

export interface TenantConfigClient {
  getThemeTokens(tenantId: TenantId): Promise<ThemeTokens>;
  getNotificationPolicy(tenantId: TenantId): Promise<TenantNotificationPolicy>; // budgets, allowed categories, default locale
  getDomainSenderEmail(tenantId: TenantId): Promise<EmailSender | null>; // verified DKIM domain
}

export interface ReservationProjectionClient {
  getReservationSnapshot(tenantId: TenantId, reservationId: ReservationId): Promise<ReservationSnapshot | null>;
}

export interface BillingProjectionClient {
  getInvoiceAttachment(tenantId: TenantId, invoiceId: string): Promise<Attachment | null>;
}

export interface LockProjectionClient {
  getMobileKeyToken(tenantId: TenantId, keyCredentialId: KeyCredentialId): Promise<MobileKeyTokenSnapshot | null>;
}

export interface Clock { nowUtc(): ISODate; }
export interface IdGenerator { ulid(prefix: string): string; }
export interface Hasher { sha256(input: string): string; }
export interface HmacVerifier {
  verify(vendor: string, headers: Record<string, string>, body: Buffer): { valid: boolean; reason?: string };
}
```

2. Use-case catalog

| Use case | Trigger | Idempotency key | Emits |
|---|---|---|---|
| EnqueueNotificationUseCase | POST /api/v1/notifications (internal) or trigger-map projection of a consumed event | Idempotency-Key header, or (tenantId, sourceEvent.id, templateKey, recipientId) | requested.v1; possibly scheduled.v1 or suppressed.v1 |
| EnqueueBatchUseCase | POST /api/v1/notifications/batch | Idempotency-Key (per batch) | per child: requested.v1 |
| DispatchNotificationUseCase | dispatcher worker pulls queued rows | (notificationId, attemptNumber) | dispatched.v1; on terminal failure, failed.v1 |
| ProcessVendorWebhookUseCase | POST /api/v1/webhooks/vendors/:vendor | (vendor, vendorMessageId, eventType, occurredAt) | delivered.v1 / bounced.v1 / opened.v1 / clicked.v1 |
| ApplyConsumedEventUseCase | Pub/Sub subscriber on each consumed event | event id | one or more EnqueueNotificationUseCase invocations |
| ScheduleSendsTickUseCase | scheduler worker (every 30 s) | (notificationId) per tick | per row: dispatch invocation; emits requested.v1 if not yet enqueued |
| UpdatePreferencesUseCase | PATCH /api/v1/notification-preferences/:id | Idempotency-Key | preferences.updated.v1 |
| OptOutViaTokenUseCase | POST /api/v1/notification-preferences/opt-out/:token | the token id | opted_out.v1, preferences.updated.v1 |
| CreateTemplateVersionUseCase | POST /api/v1/notification-templates (or new version) | Idempotency-Key | none until publish |
| PublishTemplateVersionUseCase | POST /api/v1/notification-templates/:id/publish | (templateId, semver) | template.published.v1 |
| ArchiveTemplateUseCase | POST /api/v1/notification-templates/:id/archive | (templateId) | template.archived.v1 |
| PreviewTemplateUseCase | POST /api/v1/notification-templates/:id/preview | none (read-only) | none |
| TestSendTemplateUseCase | POST /api/v1/notification-templates/:id/test-send | Idempotency-Key | requested.v1 (synthetic) |
| RegisterAIDraftedTemplateUseCase | event ai.draft_content.ready.v1 from ai-orchestrator-service | event id | none until publish (HITL gated) |
| UpsertChannelConfigUseCase | PATCH /api/v1/notification-channels/:id | Idempotency-Key | none, with audit |
| RotateChannelCredentialUseCase | POST /api/v1/notification-channels/:id/credentials/:credId/rotate | Idempotency-Key | none, with audit |
| ProbeChannelHealthUseCase | health worker (every 60 s per channel) | (channelId, probeAt) | channel.health_changed.v1 on flip |
| IngestSuppressionUseCase | called by ProcessVendorWebhookUseCase on bounce/complaint | (tenantId, channel, addressHash, reason) | suppressed.v1 (per future blocked send) |
| ReleaseSuppressionUseCase | POST /api/v1/suppressions/:id/release | Idempotency-Key | none, with audit |
| ResendNotificationUseCase | POST /api/v1/notifications/:id/resend | Idempotency-Key | requested.v1 (sibling) |
| MarkReadUseCase | PATCH /api/v1/notifications/:id/read (in-app) | (notificationId) | none |
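The EnqueueNotificationUseCase row allows two idempotency sources. A minimal sketch of how a single key could be derived, assuming the explicit header takes precedence and that the four-part tuple is hashed into one string; the hashing, the "hdr:"/"evt:" prefixes and the function name are illustrative assumptions, not the service's storage format (§9 describes a unique constraint on the columns themselves).

```typescript
import { createHash } from "node:crypto";

// Hypothetical input shape for deriving an enqueue idempotency key.
interface EnqueueKeyInput {
  tenantId: string;
  idempotencyKeyHeader?: string; // from the HTTP Idempotency-Key header
  sourceEventId?: string;        // present on the event-driven path
  templateKey: string;
  recipientId: string;
}

// The explicit header wins; otherwise fall back to the catalog tuple.
function enqueueIdempotencyKey(i: EnqueueKeyInput): string {
  if (i.idempotencyKeyHeader) {
    return `hdr:${i.tenantId}:${i.idempotencyKeyHeader}`;
  }
  if (!i.sourceEventId) {
    throw new Error("need an Idempotency-Key header or a sourceEventId");
  }
  // JSON-encode the tuple so separators inside values cannot collide.
  const tuple = JSON.stringify([i.tenantId, i.sourceEventId, i.templateKey, i.recipientId]);
  return `evt:${createHash("sha256").update(tuple).digest("hex")}`;
}
```

Because the derived key depends only on the tuple, a redelivered Pub/Sub event maps to the same key and therefore to the same Notification.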

3. The two hot paths in detail

3.1 Event-driven enqueue → dispatch (booking confirmation)

```text
Pub/Sub event: melmastoon.reservation.confirmed.v1
      ↓
ApplyConsumedEventUseCase
  1. dedupe via inbox(consumer='notif.router', eventId)
  2. resolve trigger-map entries for eventType (platform + tenant overrides)
  3. for each entry × resolved recipient:
       EnqueueNotificationUseCase(input)
      ↓
EnqueueNotificationUseCase(input)
  1. resolve Recipient (cache → IdentityResolver fallback) → upsert projection
  2. resolve Template + active TemplateVersion via TemplateRepository
     (tenant override wins; else platform-global)
  3. resolve ThemeTokens from TenantConfigClient (cached 5 min)
  4. compute locale: input.locale || recipient.preferences.locale || tenant default
  5. PreferenceGate(notif draft, prefs, suppression, now)
       - send      → continue
       - suppress  → persist Notification(status='suppressed'); emit notification.suppressed.v1; STOP
       - defer(at) → continue with scheduledFor=at; status will land in 'scheduled'
  6. Render via TemplateRenderer(template, variables, locale, themeTokens, channel)
       - on RenderError → emit notification.failed.v1; persist with status='failed'; STOP
  7. SenderResolver(channel, recipient.address)
       - on missing → emit notification.failed.v1 with reason='sender_id_missing'; STOP
  8. RateLimiter.check on (tenant_channel_day, recipient_channel_day)
       - allow     → continue
       - defer(at) → schedule for that time
       - suppress  → status='suppressed', reason='rate_limit'; emit notification.suppressed.v1
  9. WhatsApp guard: if channel='whatsapp', verify whatsappTemplateRefs[locale].status='approved'
       - otherwise → fall back to SMS for transactional categories; defer for marketing
 10. Build Notification.create(...) with renderSnapshot and computed expiresAt (per retentionClass)
 11. In a single Postgres transaction:
       - notifications INSERT
       - delivery_attempts (no rows yet)
       - outbox INSERT (notification.requested.v1, optionally notification.scheduled.v1)
       - inbox UPSERT (consumer dedupe)
 12. If status='queued', enqueue a dispatch hint (non-transactional Pub/Sub topic 'notif.dispatch.requested')
```
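Step 5 (PreferenceGate) can be sketched as a pure function returning send, suppress, or defer. This is a minimal sketch under stated assumptions: the Prefs shape, UTC-only quiet hours, per-category opt-out, and the rule that the security category bypasses both opt-out and quiet hours (the trigger map only states the opt-out bypass) are hypothetical; suppression is treated as absolute.

```typescript
// Outcome of the gate, mirroring the three branches in step 5.
type Decision =
  | { kind: "send" }
  | { kind: "suppress"; reason: string }
  | { kind: "defer"; at: Date };

// Hypothetical preference shape; quiet hours are in UTC for simplicity.
interface Prefs {
  optedOutCategories: Set<string>;
  quietHours?: { startHourUtc: number; endHourUtc: number };
}

// Assumption: security messages (e.g. password reset) bypass opt-out and quiet hours.
const BYPASS_OPT_OUT = new Set(["security"]);

function preferenceGate(category: string, prefs: Prefs, suppressed: boolean, now: Date): Decision {
  // A suppressed address is never sent to, whatever the category.
  if (suppressed) return { kind: "suppress", reason: "suppression_list" };
  const bypass = BYPASS_OPT_OUT.has(category);
  if (!bypass && prefs.optedOutCategories.has(category)) {
    return { kind: "suppress", reason: "opted_out" };
  }
  const q = prefs.quietHours;
  if (q && !bypass && inWindow(now.getUTCHours(), q.startHourUtc, q.endHourUtc)) {
    // Defer to the next occurrence of the window's end hour.
    const at = new Date(now.getTime());
    at.setUTCHours(q.endHourUtc, 0, 0, 0);
    if (at.getTime() <= now.getTime()) at.setUTCDate(at.getUTCDate() + 1);
    return { kind: "defer", at };
  }
  return { kind: "send" };
}

// True if hour h is in [start, end), handling windows that wrap midnight.
function inWindow(h: number, start: number, end: number): boolean {
  return start <= end ? h >= start && h < end : h >= start || h < end;
}
```

Keeping the gate free of I/O (prefs and suppression are loaded beforehand) makes every branch testable with a fixed `now`.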

Outbox relay: periodically reads outbox rows where published_at IS NULL, publishes them to Pub/Sub, and then sets published_at. Delivery is at-least-once; consumers de-dupe by event id.
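One relay pass can be sketched as follows, assuming hypothetical OutboxStore and Publisher interfaces and an in-memory store; the batch size of 100 comes from §9. The ordering (publish first, mark afterwards) is what makes the relay at-least-once: a crash between the two steps republishes the same rows on the next pass.

```typescript
// Hypothetical outbox row and store/publisher shapes (not the real schema).
interface OutboxRow { id: string; topic: string; payload: string; publishedAt: string | null; }

interface OutboxStore {
  fetchUnpublished(limit: number): Promise<OutboxRow[]>;
  markPublished(ids: string[], at: string): Promise<void>;
}

interface Publisher { publish(topic: string, payload: string, eventId: string): Promise<void>; }

// Publish a batch, then mark it. If the process dies after publish but before
// markPublished, the rows are sent again next pass, so consumers must dedupe.
async function relayOnce(
  store: OutboxStore,
  pub: Publisher,
  batchSize = 100,
  now: () => string = () => new Date().toISOString(),
): Promise<number> {
  const rows = await store.fetchUnpublished(batchSize);
  for (const r of rows) await pub.publish(r.topic, r.payload, r.id);
  if (rows.length > 0) await store.markPublished(rows.map((r) => r.id), now());
  return rows.length;
}

// In-memory stand-in for the outbox table.
class MemoryOutbox implements OutboxStore {
  constructor(public rows: OutboxRow[]) {}
  async fetchUnpublished(limit: number): Promise<OutboxRow[]> {
    return this.rows.filter((r) => r.publishedAt === null).slice(0, limit);
  }
  async markPublished(ids: string[], at: string): Promise<void> {
    for (const r of this.rows) if (ids.includes(r.id)) r.publishedAt = at;
  }
}
```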

3.2 Dispatch worker (per channel)

```text
Source: Pub/Sub topic 'notif.dispatch.requested' OR Postgres polling for
        status='queued' AND (scheduledFor IS NULL OR scheduledFor <= now())
      ↓
DispatchNotificationUseCase(notificationId)
  1. Load Notification and lock the row (SELECT ... FOR UPDATE SKIP LOCKED)
  2. Re-check Channel.status (degraded → use fallback vendor; down → defer 60 s)
  3. Re-check rate limit (defensive)
  4. Build adapter request from renderSnapshot + Sender + recipientAddress
  5. Call vendor adapter (EmailPort/SmsPort/...) with a per-channel timeout
     (e.g., email 10 s, sms 8 s, whatsapp 8 s, push 5 s)
  6. ChannelSendResult →
       accepted → notif.vendorAccepted(attempt); status='dispatched'; outbox dispatched.v1
       rejected (retryable, e.g., 429/5xx) → notif.vendorRejectedRetryable; status='queued';
           schedule next attempt: now + jitter(2^attempt s, max 1024 s, honor Retry-After)
       rejected (terminal, e.g., 4xx invalid recipient) → notif.vendorRejectedTerminal;
           status='failed'; outbox failed.v1
           if invalid_recipient → IngestSuppressionUseCase(reason='invalid_address')
       timeout → same as retryable (counts as one attempt)
  7. Persist + outbox in a single transaction
  8. Update Channel.health.consecutiveFailures; flip status if >= 3 (emit channel.health_changed.v1)
```
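Step 6's outcome handling and backoff can be sketched as below. The document only says "jitter"; this sketch assumes full jitter (a uniform draw in [0, min(2^attempt, 1024)] seconds) with Retry-After as a floor. The maxAttempts cap is a hypothetical parameter, and a rejection without an HTTP status is treated as terminal by assumption.

```typescript
// Subset of ChannelSendResult from §1 that the decision needs.
interface ChannelSendResult {
  outcome: "accepted" | "rejected" | "timeout";
  httpStatus?: number;
  retryAfterSeconds?: number;
}

type Next =
  | { action: "dispatched" }
  | { action: "retry"; delaySeconds: number }
  | { action: "failed" };

const MAX_BACKOFF_S = 1024;

// Assumed full jitter: uniform integer in [0, min(2^attempt, 1024)],
// never earlier than a vendor-supplied Retry-After.
function backoffSeconds(attempt: number, retryAfter?: number, rand: () => number = Math.random): number {
  const cap = Math.min(2 ** attempt, MAX_BACKOFF_S);
  const jittered = Math.floor(rand() * (cap + 1));
  return Math.max(jittered, retryAfter ?? 0);
}

// Timeouts, 429 and 5xx are retryable; other rejections are terminal.
function isRetryable(r: ChannelSendResult): boolean {
  if (r.outcome === "timeout") return true;
  const s = r.httpStatus ?? 0;
  return s === 429 || s >= 500;
}

// maxAttempts is a hypothetical cap; the document does not state one.
function nextStep(r: ChannelSendResult, attempt: number, maxAttempts: number, rand?: () => number): Next {
  if (r.outcome === "accepted") return { action: "dispatched" };
  if (isRetryable(r) && attempt < maxAttempts) {
    return { action: "retry", delaySeconds: backoffSeconds(attempt, r.retryAfterSeconds, rand) };
  }
  return { action: "failed" };
}
```

Injecting `rand` keeps the jitter deterministic in tests while production uses `Math.random`.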

4. Vendor webhook ingestion

```text
POST /api/v1/webhooks/vendors/{vendor}   (no JWT; HMAC-validated)
  1. HmacVerifier.verify(vendor, headers, raw body); if invalid → 401 + WebhookInbound(status='rejected')
  2. Persist raw body to GCS; insert WebhookInbound(status='received', signatureValid=true)
  3. Parse vendor-specific payload → list of VendorEvent (one webhook can carry many; SendGrid sends arrays)
  4. For each VendorEvent:
       a. dedupe by (vendor, vendorMessageId, type, occurredAt)
       b. correlate to Notification via DeliveryAttempt.vendorMessageId
            - if not found, queue for late correlation (cap 24 h) into the NOTIF_LATE_CORRELATION DLQ
       c. apply state transition:
            'sent' / 'queued' (vendor-side) → no-op
            'delivered'               → notif.markDelivered(...); outbox delivered.v1
            'bounce' (hard|soft)      → notif.markBounced(...); IngestSuppressionUseCase if hard or soft-threshold reached
            'complaint'               → notif.markBounced('complaint'); IngestSuppressionUseCase('complaint')
            'open'                    → notif.markOpened(...)
            'click'                   → notif.markClicked(...)
            'failed' / 'undelivered'  → notif.vendorRejectedTerminal(...); failed.v1
  5. Mark WebhookInbound.status='applied'; return 204.
  6. Any unhandled exception → status='dlq' + alert (NOTIF_WEBHOOK_DLQ runbook).
```
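Step 1 can be sketched with Node's `createHmac` and `timingSafeEqual`, assuming a hypothetical per-vendor config that names a signature header carrying a hex HMAC-SHA256 of the raw body. Real vendors differ in header names, algorithms (some use public-key signatures instead of HMAC) and in what exactly is signed, so each adapter should follow its vendor's documentation. The return shape matches `HmacVerifier.verify` in §1.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical per-vendor configuration: header name and shared secret.
interface VendorHmacConfig { header: string; secret: string; }

function verifyHmacSha256(
  cfg: VendorHmacConfig,
  headers: Record<string, string>, // Node lowercases incoming header names
  body: Buffer,                    // raw body, before any JSON parsing
): { valid: boolean; reason?: string } {
  const provided = headers[cfg.header.toLowerCase()];
  if (!provided) return { valid: false, reason: "missing_signature" };
  const expected = createHmac("sha256", cfg.secret).update(body).digest("hex");
  const a = Buffer.from(provided, "utf8");
  const b = Buffer.from(expected, "utf8");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  if (a.length !== b.length || !timingSafeEqual(a, b)) {
    return { valid: false, reason: "signature_mismatch" };
  }
  return { valid: true };
}
```

Verifying against the raw bytes matters: re-serialising parsed JSON can change whitespace or key order and break the signature.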

Deduplication and idempotency are domain-level; replaying the same webhook produces the same final state.
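The replay guarantee can be sketched with an in-memory stand-in for the unique constraint on (vendor, vendorMessageId, eventType, occurredAt). The DeliveryState fold below is hypothetical and simplified; it shows why dedupe matters: open and click counters would double on a replay without it.

```typescript
type VendorEventType = "delivered" | "bounce" | "open" | "click";

interface VendorEvent {
  vendor: string;
  vendorMessageId: string;
  type: VendorEventType;
  occurredAt: string;
}

// In-memory stand-in for the unique constraint on webhook_inbound_events.
class WebhookDedupe {
  private seen = new Set<string>();
  // True only the first time a (vendor, vendorMessageId, type, occurredAt) tuple is seen.
  firstSeen(e: VendorEvent): boolean {
    const key = JSON.stringify([e.vendor, e.vendorMessageId, e.type, e.occurredAt]);
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    return true;
  }
}

// Hypothetical, simplified delivery state for one message.
interface DeliveryState { delivered: boolean; bounced: boolean; opens: number; clicks: number; }

function applyAll(events: VendorEvent[], dedupe = new WebhookDedupe()): DeliveryState {
  const s: DeliveryState = { delivered: false, bounced: false, opens: 0, clicks: 0 };
  for (const e of events) {
    if (!dedupe.firstSeen(e)) continue; // replayed event: no-op
    if (e.type === "delivered") s.delivered = true;
    else if (e.type === "bounce") s.bounced = true;
    else if (e.type === "open") s.opens++;
    else s.clicks++;
  }
  return s;
}
```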


5. Scheduler worker (pre-arrival, post-stay, dunning)

The scheduler has two sources of work:

  1. notifications rows with status='scheduled' AND scheduledFor <= now() — drives quiet-hour deferrals and pre-rendered scheduled sends.
  2. notification_scheduled table — rows projected from upstream events that need future enqueue. Schema in DATA_MODEL §3.10. Examples:
    • on reservation.confirmed.v1 with stay_start = D, insert notification_scheduled(run_after = D - 24h, kind='pre_arrival_reminder', payload={...}).
    • on reservation.checked_out.v1, insert notification_scheduled(run_after = checkedOutAt + 24h, kind='post_stay_thank_you', payload={...}).
    • on billing.subscription.payment_failed.v1, insert three notification_scheduled rows (T+0, T+72h, T+168h) for the dunning sequence.
```text
ScheduleSendsTickUseCase (every 30 s)
  for each row in notification_scheduled where run_after <= now() and processed_at IS NULL,
  batch=200, FOR UPDATE SKIP LOCKED:
    1. Re-validate context (e.g., is the reservation still active? if cancelled, mark obsolete and skip).
    2. EnqueueNotificationUseCase(payload + force=true)
    3. mark processed_at = now()
  for each row in notifications where status='scheduled' AND scheduledFor <= now(),
  batch=500, FOR UPDATE SKIP LOCKED:
    1. transition to 'queued'; emit dispatch hint
```

Cancellation propagation: when reservation.cancelled.v1 arrives, ApplyConsumedEventUseCase marks all matching notification_scheduled rows obsolete=true so they are skipped on next tick (no embarrassing post-stay thank-you to a cancelled booking).
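The projection rules above (pre-arrival at D - 24h, dunning at T+0/T+72h/T+168h, cancellation marking rows obsolete) can be sketched as plain functions. The ScheduledRow shape and the dunning kind names are hypothetical; the real schema is in DATA_MODEL §3.10.

```typescript
// Hypothetical row shape for notification_scheduled.
interface ScheduledRow {
  kind: string;
  runAfter: Date;
  reservationId?: string;
  obsolete: boolean;
  processedAt: Date | null;
}

const HOUR_MS = 3_600_000;

// reservation.confirmed.v1 with stay start D → reminder at D - 24h.
function preArrivalRow(reservationId: string, stayStart: Date): ScheduledRow {
  return {
    kind: "pre_arrival_reminder",
    runAfter: new Date(stayStart.getTime() - 24 * HOUR_MS),
    reservationId,
    obsolete: false,
    processedAt: null,
  };
}

// billing.subscription.payment_failed.v1 → three rows at T+0, T+72h, T+168h.
function dunningRows(failedAt: Date): ScheduledRow[] {
  return [0, 72, 168].map((h, i) => ({
    kind: `dunning_step_${i + 1}`, // hypothetical kind names
    runAfter: new Date(failedAt.getTime() + h * HOUR_MS),
    obsolete: false,
    processedAt: null,
  }));
}

// reservation.cancelled.v1 → flag pending rows for that reservation.
function markObsolete(rows: ScheduledRow[], reservationId: string): number {
  let n = 0;
  for (const r of rows) {
    if (r.reservationId === reservationId && !r.obsolete && r.processedAt === null) {
      r.obsolete = true;
      n++;
    }
  }
  return n;
}

// What a tick would pick up: due, unprocessed, and not obsolete.
function dueRows(rows: ScheduledRow[], now: Date): ScheduledRow[] {
  return rows.filter((r) => !r.obsolete && r.processedAt === null && r.runAfter.getTime() <= now.getTime());
}
```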


6. AI-drafted content path (HITL)

```text
ai-orchestrator-service emits melmastoon.ai.draft_content.ready.v1
  { tenantId, draftId,
    purpose: 'notification.template.copy' | 'notification.message.personalisation',
    templateKey, locale, channel, audienceSegment?, renderedBody, subject?, aiProvenance }
      ↓
RegisterAIDraftedTemplateUseCase
  Case A: purpose='notification.template.copy'
    1. Load Template (or create a new draft Template) with the key from the event
    2. Append a TemplateVersion(source='ai_drafted', semver=next, status='draft',
         aiProvenance=event.aiProvenance,
         locales={ [locale]: { body: event.renderedBody, bodyFormat: matches channel } })
    3. Notify tenant admins via in-app + email: 'AI-drafted template version awaiting review'
       (this uses the notification flow itself)
    4. Tenant admin opens backoffice → previews → POST /publish with approverId
    5. PublishTemplateVersionUseCase: if source=='ai_drafted', an approverId is required;
       without one it raises MELMASTOON.AI.HITL_REQUIRED

  Case B: purpose='notification.message.personalisation'
    1. The ai-orchestrator returns a per-recipient personalised body for an in-flight
       EnqueueNotificationUseCase (synchronous AIClient.fetchAIDraftedContent during enqueue;
       this service never makes a live model call)
    2. The Notification carries aiProvenance attached to the renderSnapshot
    3. On AIClient timeout / failure → fall back to deterministic template render
       (tenant-configurable degrade behaviour)
```

Per-tenant policy: AI personalisation is set to one of disabled, suggest_only, or auto_send. The default for new tenants is suggest_only: the AI draft is shown to staff in the backoffice for review before sending. auto_send is opt-in and limited to the marketing and reminder categories.
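The policy and the publish guard from Case A step 5 can be sketched as two small functions. Assumptions: when auto_send is configured but the category is neither marketing nor reminder, the sketch falls back to staff review (the document does not say what happens in that case); the source value 'human' and the HitlRequiredError class are hypothetical, while 'ai_drafted' and MELMASTOON.AI.HITL_REQUIRED come from the text.

```typescript
type AiPersonalisationPolicy = "disabled" | "suggest_only" | "auto_send";

// none: deterministic template only; review: staff approve in backoffice; send: no review.
function personalisationMode(policy: AiPersonalisationPolicy, category: string): "none" | "review" | "send" {
  if (policy === "disabled") return "none";
  if (policy === "auto_send" && (category === "marketing" || category === "reminder")) return "send";
  // Assumption: auto_send outside the allowed categories degrades to review.
  return "review";
}

// Hypothetical error class carrying the documented error code.
class HitlRequiredError extends Error {
  readonly code = "MELMASTOON.AI.HITL_REQUIRED";
}

// Publish guard: AI-drafted versions need an explicit approver.
function assertPublishable(version: { source: "human" | "ai_drafted" }, approverId?: string): void {
  if (version.source === "ai_drafted" && !approverId) {
    throw new HitlRequiredError("AI-drafted template version requires an approver");
  }
}
```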


7. Trigger map (data-driven projection table)

Stored in notification_trigger_map (see DATA_MODEL §3.11) with a 30-second hot-reload TTL in the application layer. Platform default below; tenants can append (additive only for regulated categories).

| Consumed event | Recipient resolver | Channels | Template key (per channel) | Schedule |
|---|---|---|---|---|
| melmastoon.reservation.confirmed.v1 | primary_guest | email (always), sms+whatsapp (per recipient pref) | reservation.confirmed.email, reservation.confirmed.sms, reservation.confirmed.whatsapp | immediate |
| melmastoon.reservation.confirmed.v1 (T-24h) | primary_guest | sms or whatsapp (whichever is opted in), email | | |
| melmastoon.reservation.cancelled.v1 | primary_guest | email, sms | reservation.cancelled.email, reservation.cancelled.sms | immediate |
| melmastoon.reservation.modified.v1 | primary_guest | email, whatsapp or sms | | |
| melmastoon.reservation.dates_changed.v1 | primary_guest | email, sms | reservation.dates_changed.email, reservation.dates_changed.sms | immediate |
| melmastoon.reservation.checked_in.v1 | primary_guest | inapp, whatsapp or sms | | |
| melmastoon.reservation.checked_out.v1 (T+24h) | primary_guest | email | reservation.post_stay.email (with invoice attachment) | scheduled checked_out_at + 24h |
| melmastoon.lock_integration.key_credential.issued.v1 | primary_guest | recipient's preferred channel for mobile_key, fallback sms | mobile_key.issued.whatsapp, mobile_key.issued.sms, mobile_key.issued.email | immediate |
| melmastoon.lock_integration.key_credential.revoked.v1 (when reason in {early_checkout, cancelled}) | primary_guest | sms or whatsapp | | |
| melmastoon.billing.invoice.generated.v1 | primary_guest | email | billing.invoice.email | immediate |
| melmastoon.billing.subscription.payment_failed.v1 | tenant_admins | email, inapp | billing.dunning.email (3 templates: T+0/T+3/T+7) | scheduled |
| melmastoon.iam.password.reset_requested.v1 | iam_subject | email | iam.password_reset.email (security category; bypasses opt-out) | immediate |
| melmastoon.iam.session.suspicious_login.v1 | iam_subject | email, sms | iam.suspicious_login.* | immediate |
| melmastoon.tenant.invitation.sent.v1 | iam_subject (invitee email projection) | email | tenant.invitation.email | immediate |
| melmastoon.maintenance.work_order.assigned.v1 | vendor_assignee | sms (always), whatsapp (if vendor opted in) | maintenance.work_order.assigned.sms | immediate |

Notes:

  • primary_guest resolves to Reservation.primaryGuest; the app reads the projection from Postgres (eventually consistent) or falls back to a synchronous ReservationProjectionClient call.
  • tenant_admins resolves to all users with role='OWNER'|'GM'|'BILLING_ADMIN' for the tenant.
  • iam_subject resolves the user identified in the event payload (with strict tenant scoping).
  • WhatsApp templates require pre-approval per locale; if the WhatsApp template is not approved, the trigger entry's WhatsApp channel falls back to SMS.
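The 30-second hot-reload can be sketched as a per-scope TTL cache over a hypothetical loader (standing in for a read of notification_trigger_map). The sketch models only the additive case (platform entries plus appended tenant entries); override semantics for non-regulated categories are not modelled. The clock is injected so the TTL can be tested deterministically.

```typescript
interface TriggerMapEntry {
  eventType: string;
  recipientResolver: string;
  channel: string;
  templateKey: string;
}

// Hypothetical loader: null means the platform-default scope.
type Loader = (tenantId: string | null) => Promise<TriggerMapEntry[]>;

class TriggerMapCache {
  private cache = new Map<string, { at: number; entries: TriggerMapEntry[] }>();

  constructor(
    private readonly load: Loader,
    private readonly ttlMs = 30_000,
    private readonly now: () => number = () => Date.now(),
  ) {}

  // Reload a scope at most once per TTL.
  private async scope(tenantId: string | null): Promise<TriggerMapEntry[]> {
    const key = tenantId ?? "__platform__";
    const hit = this.cache.get(key);
    if (hit && this.now() - hit.at < this.ttlMs) return hit.entries;
    const entries = await this.load(tenantId);
    this.cache.set(key, { at: this.now(), entries });
    return entries;
  }

  // Platform defaults plus tenant-appended entries for one event type.
  async resolve(tenantId: string, eventType: string): Promise<TriggerMapEntry[]> {
    const [platform, tenant] = await Promise.all([this.scope(null), this.scope(tenantId)]);
    return [...platform, ...tenant].filter((e) => e.eventType === eventType);
  }
}
```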

8. Saga participation (we never orchestrate; we observe and emit)

notification-service participates in two platform sagas:

| Saga | Our role |
|---|---|
| Booking saga (04 §7) | Consume reservation.confirmed.v1, reservation.cancelled.v1, reservation.checked_in.v1, reservation.checked_out.v1; emit notification lifecycle events. We are not a compensable step: a missed confirmation email does not reverse the booking. |
| Mobile-key delivery saga (09 §6) | Consume key_credential.issued.v1; deliver the mobile-key one-time link or QR code; emit notification.delivered.v1; on failure, emit notification.failed.v1 so lock-integration-service can re-issue or fall back to a mechanical key. |

We never publish events with causationId chains beyond the immediate trigger; this prevents accidental saga loops.
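One way to read the causationId rule, sketched under assumptions: an outgoing event records only the id of the event that directly triggered it, and never copies or extends the trigger's own causationId. The EventEnvelope shape and the emitCausedBy helper are hypothetical; the real envelope is defined in EVENT_SCHEMAS.

```typescript
// Hypothetical envelope; the real shape lives in EVENT_SCHEMAS.
interface EventEnvelope<T> {
  id: string;
  type: string;
  tenantId: string;
  causationId: string | null; // id of the event that directly caused this one
  payload: T;
}

// Only the immediate trigger's id is recorded; trigger.causationId is
// deliberately ignored, so no multi-hop chain is propagated.
function emitCausedBy<T>(trigger: EventEnvelope<unknown>, id: string, type: string, payload: T): EventEnvelope<T> {
  return { id, type, tenantId: trigger.tenantId, causationId: trigger.id, payload };
}
```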


9. Concurrency, idempotency, and outbox

  • Idempotency for HTTP: Idempotency-Key header on every POST/PATCH mutation. Keys are stored in idempotency_keys (see DATA_MODEL §3.12), keyed by (tenantId, scope, key), together with request_hash and the stored response. Reusing a key with the same body returns the same response; reusing it with a different body returns 409 MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED.
  • Idempotency for consumed events: consumed_events(consumer_name, event_id) with 7-day partition. The router sets consumer_name='notif.router.<topic>'.
  • Idempotency for vendor webhooks: (vendor, vendorMessageId, eventType, occurredAt) unique constraint on webhook_inbound_events.
  • Idempotency for trigger-map fan-out: (tenantId, sourceEventId, templateKey, recipientId) unique on notifications (partial — only when sourceEventId IS NOT NULL).
  • OCC on notifications, notification_preferences, templates, template_versions, channels via version column; mismatch → 412 MELMASTOON.GENERAL.PRECONDITION_FAILED.
  • The outbox is the only path by which domain events reach Pub/Sub; the outbox relay (OutboxRelayUseCase) publishes in batches, flushing every 250 ms or every 100 events.
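The HTTP idempotency rule can be sketched with an in-memory stand-in for idempotency_keys. This single-process sketch does not handle two concurrent requests racing on the same key; a Postgres implementation would typically insert a pending row first so the unique constraint arbitrates. The scope string used in the test is hypothetical.

```typescript
import { createHash } from "node:crypto";

interface StoredResponse { status: number; body: string; }

// In-memory stand-in for idempotency_keys keyed by (tenantId, scope, key).
class IdempotencyStore {
  private rows = new Map<string, { requestHash: string; response: StoredResponse }>();

  // Runs the handler once per key. A replay with the same body returns the
  // stored response; a different body under the same key returns 409.
  async execute(
    tenantId: string,
    scope: string,
    key: string,
    requestBody: string,
    handler: () => Promise<StoredResponse>,
  ): Promise<StoredResponse> {
    const id = JSON.stringify([tenantId, scope, key]);
    const requestHash = createHash("sha256").update(requestBody).digest("hex");
    const existing = this.rows.get(id);
    if (existing) {
      if (existing.requestHash !== requestHash) {
        return { status: 409, body: JSON.stringify({ code: "MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED" }) };
      }
      return existing.response;
    }
    const response = await handler();
    this.rows.set(id, { requestHash, response });
    return response;
  }
}
```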

10. Cross-aggregate orchestration patterns

  • Sender resolution requires Channel + tenant.settings: EnqueueNotificationUseCase loads both before constructing the dispatch. Channel is the source of truth for vendor selection; tenant.settings for branding and budgets.
  • Mobile-key flow: when consuming key_credential.issued.v1, the use case calls LockProjectionClient.getMobileKeyToken(...) to fetch the one-time-link payload and embeds it as a mobileKeyTokenRef on the Notification. The link is single-use; the WhatsApp/SMS body contains the URL but the JSON-LD-style payload includes its tokenHash so we can audit which token was sent in which message.
  • Invoice attach: when consuming billing.invoice.generated.v1, the use case calls BillingProjectionClient.getInvoiceAttachment(...) to fetch the GCS-stored PDF reference; the attachment is by reference (URI), never inline.
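The mobile-key audit reference can be sketched as follows, assuming the tokenHash is a hex SHA-256 of the one-time token; the MobileKeyTokenRef shape and helper names are hypothetical. Storing the hash in the structured payload lets an auditor confirm which token went into which message by hashing a candidate token and comparing.

```typescript
import { createHash } from "node:crypto";

// Hypothetical shape of the reference carried on the Notification.
interface MobileKeyTokenRef { keyCredentialId: string; tokenHash: string; }

// Assumption: tokenHash is the hex SHA-256 of the one-time token.
function buildMobileKeyRef(keyCredentialId: string, oneTimeToken: string): MobileKeyTokenRef {
  return { keyCredentialId, tokenHash: createHash("sha256").update(oneTimeToken).digest("hex") };
}

// Audit check: was this token the one referenced by this message?
function matchesAuditedToken(ref: MobileKeyTokenRef, candidateToken: string): boolean {
  return buildMobileKeyRef(ref.keyCredentialId, candidateToken).tokenHash === ref.tokenHash;
}
```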

11. Application-level error mapping

| Domain error | Application response | HTTP / event |
|---|---|---|
| InvalidIdError | reject at the boundary | 400 MELMASTOON.GENERAL.VALIDATION_FAILED |
| CrossTenantReferenceError | reject at the boundary | 422 MELMASTOON.GENERAL.CROSS_TENANT_REFERENCE |
| IllegalStateTransitionError | log + reject | 409 + map per concrete code |
| TemplateNotFoundError | enqueue path: emit notification.failed.v1; API path: 404 MELMASTOON.NOTIFICATION.TEMPLATE_NOT_FOUND | per call |
| SenderIdMissingError | enqueue path: emit notification.failed.v1 with reason and alert tenant admin; API path: 422 MELMASTOON.NOTIFICATION.SENDER_ID_MISSING | per call |
| WhatsAppTemplateNotApprovedError | fall back per policy (SMS for transactional); emit notification.failed.v1 if no fallback exists | per call |
| RateLimitExceededError | defer or suppress; emit notification.scheduled.v1 or notification.suppressed.v1 | none externally |
| BudgetExhaustedError | reject batch creation; 402 MELMASTOON.TENANT.PLAN_LIMIT_EXCEEDED (re-used) or 429 | per call |
| HITLApprovalRequiredError | publish path: 403 MELMASTOON.AI.HITL_REQUIRED | per call |
| WebhookSignatureInvalidError | 401, semantically equivalent to MELMASTOON.PAYMENT.WEBHOOK_SIGNATURE_INVALID; this service registers its own MELMASTOON.NOTIFICATION.WEBHOOK_SIGNATURE_INVALID in ERROR_CODES | per call |

The full list of error codes used by this service lives in API_CONTRACTS §4 and is registered in docs/standards/ERROR_CODES.md.
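For the API path, the table can be sketched as a lookup keyed by error class name. Assumptions: domain error subclasses set `name` to their class name; rows whose HTTP code depends on context (IllegalStateTransitionError, BudgetExhaustedError) are left out; the 500 fallback code MELMASTOON.GENERAL.INTERNAL_ERROR is hypothetical and not taken from ERROR_CODES.

```typescript
interface HttpError { status: number; code: string; }

// API-path mappings taken from the table above.
const API_ERROR_MAP = new Map<string, HttpError>([
  ["InvalidIdError", { status: 400, code: "MELMASTOON.GENERAL.VALIDATION_FAILED" }],
  ["CrossTenantReferenceError", { status: 422, code: "MELMASTOON.GENERAL.CROSS_TENANT_REFERENCE" }],
  ["TemplateNotFoundError", { status: 404, code: "MELMASTOON.NOTIFICATION.TEMPLATE_NOT_FOUND" }],
  ["SenderIdMissingError", { status: 422, code: "MELMASTOON.NOTIFICATION.SENDER_ID_MISSING" }],
  ["HITLApprovalRequiredError", { status: 403, code: "MELMASTOON.AI.HITL_REQUIRED" }],
  ["WebhookSignatureInvalidError", { status: 401, code: "MELMASTOON.NOTIFICATION.WEBHOOK_SIGNATURE_INVALID" }],
]);

// Relies on subclasses setting `name`; unknown errors map to a generic 500
// (hypothetical code name).
function toHttp(err: Error): HttpError {
  return API_ERROR_MAP.get(err.name) ?? { status: 500, code: "MELMASTOON.GENERAL.INTERNAL_ERROR" };
}
```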


12. Background workers

| Worker | Schedule | Purpose |
|---|---|---|
| outbox-relay | continuous (250 ms tick) | Publishes outbox rows to Pub/Sub |
| dispatch-worker-email | continuous (long-poll Pub/Sub notif.dispatch.requested, filter channel=email) | Calls EmailPort for queued email notifications |
| dispatch-worker-sms | continuous | Calls SmsPort |
| dispatch-worker-whatsapp | continuous | Calls WhatsAppPort |
| dispatch-worker-push | continuous | Calls PushPort |
| dispatch-worker-inapp | continuous | Calls InAppPort.publishToFeed + WS push |
| dispatch-worker-voice | continuous (Phase 3+) | Calls VoicePort |
| scheduler | every 30 s | Promotes scheduled → queued; reads notification_scheduled to enqueue future sends |
| webhook-late-correlator | every 60 s | Correlates webhooks whose Notification was not yet visible at receive time (cap 24 h) |
| channel-health-prober | every 60 s per channel | Calls a vendor-specific health probe; flips Channel.status |
| template-cache-warmer | every 30 s | Hot-reloads the trigger map and per-tenant template cache from Postgres |
| suppression-replicator | every 60 s | Pushes recent suppression rows into a Memorystore set for fast dispatch-time lookup |
| partition-maintainer (cron job) | hourly | pg_partman keeps notifications and delivery_attempts partitions current |
| archive-rotator (cron job) | daily | Moves rows older than 24 months to GCS Parquet for the BigQuery cold tier |
| dlq-monitor (cron job) | every 5 min | Counts DLQ rows; pages on threshold |

Each worker runs as a separate Cloud Run instance with its own scaling envelope (see DEPLOYMENT_TOPOLOGY).


13. Tracing and correlation

  • Every use case enters with a W3C traceparent from the calling context (HTTP gateway, Pub/Sub message attribute, scheduler tick).
  • Spans: notification.enqueue, notification.preference_gate, notification.render, notification.sender_resolve, notification.rate_limit, notification.dispatch.<channel>, notification.webhook_ingest.<vendor>, notification.scheduler.tick.
  • Span attributes (always): tenant.id, notification.id, notification.channel, notification.category, template.key, template.semver, recipient.id (hashed), vendor, attempt.number, result.outcome. Defined in OBSERVABILITY.
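Entering a use case with a W3C traceparent can be sketched as a small parser for the version-00 format (version-traceid-parentid-flags, lowercase hex, all-zero ids invalid), plus a hashed recipient id for the span attribute. Assumptions: only version 00 is accepted (the spec allows future versions with extra fields), and plain SHA-256 is used for recipient.id; a keyed hash would resist dictionary lookups better.

```typescript
import { createHash } from "node:crypto";

interface TraceContext { traceId: string; parentId: string; sampled: boolean; }

// Parses a version-00 traceparent, e.g.
// "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".
function parseTraceparent(header: string): TraceContext | null {
  const m = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header.trim());
  if (!m) return null;
  const [, traceId, parentId, flags] = m;
  // All-zero trace and parent ids are invalid per the spec.
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null;
  // Bit 0 of trace-flags is the "sampled" flag.
  return { traceId, parentId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}

// recipient.id span attribute, hashed (assumption: unkeyed SHA-256 hex).
function hashedRecipientId(recipientId: string): string {
  return createHash("sha256").update(recipientId).digest("hex");
}
```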