APPLICATION_LOGIC — notification-service
Sibling: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · SYNC_CONTRACT
Strategic anchors: 02 Enterprise Architecture §7 · 04 Event-Driven Architecture · 08 AI Architecture
The application layer of notification-service lives under src/application/ as use cases, ports (interfaces), DTOs, and orchestration scripts. It depends on the domain layer (src/domain/) and the port interfaces — never on adapter implementations or framework code. The composition root in src/infrastructure/config/ wires concrete adapters (Postgres repositories, Pub/Sub publisher, vendor SDKs) at startup.
1. Ports (interfaces only)
// src/application/ports/repositories.ts
export interface NotificationRepository {
save(n: Notification): Promise<Result<void, RepoError>>; // OCC-aware (version)
findById(tenantId: TenantId, id: NotificationId): Promise<Notification | null>;
findScheduledDue(now: ISODate, batchSize: number): Promise<Notification[]>;
findRetryableQueued(now: ISODate, batchSize: number): Promise<Notification[]>;
listFeed(tenantId: TenantId, recipientId: RecipientId, cursor?: string, limit?: number, unreadOnly?: boolean): Promise<Page<Notification>>;
countUnreadInapp(tenantId: TenantId, recipientId: RecipientId): Promise<number>;
markRead(tenantId: TenantId, ids: NotificationId[], at: ISODate): Promise<void>;
}
export interface TemplateRepository {
saveTemplate(t: Template): Promise<void>;
saveVersion(v: TemplateVersion): Promise<void>;
findActiveByKey(tenantId: TenantId | null, key: TemplateKey): Promise<{ template: Template; activeVersion: TemplateVersion } | null>;
findById(id: TemplateId, tenantId?: TenantId): Promise<Template | null>;
findVersion(id: TemplateVersionId): Promise<TemplateVersion | null>;
resolveActiveForTenant(tenantId: TenantId, key: TemplateKey): Promise<{ template: Template; activeVersion: TemplateVersion } | null>;
listTriggerMap(tenantId?: TenantId): Promise<TriggerMapEntry[]>;
}
export interface RecipientRepository {
upsertProjection(r: Recipient): Promise<void>;
findById(tenantId: TenantId, id: RecipientId): Promise<Recipient | null>;
findByIdentity(tenantId: TenantId, identityRef: GuestId | UserId): Promise<Recipient | null>;
findByAddress(tenantId: TenantId, kind: ChannelKind, addressHash: string): Promise<Recipient | null>;
saveOptOutToken(rcpId: RecipientId, token: OptOutToken): Promise<void>;
findOptOutToken(tokenHash: string): Promise<OptOutToken | null>;
consumeOptOutToken(tokenId: OptOutTokenId): Promise<void>;
}
export interface SuppressionRepository {
isSuppressed(tenantId: TenantId, channel: ChannelKind, addressHash: string): Promise<boolean>;
add(s: SuppressionRecord): Promise<void>;
override(id: SuppressionRecordId, by: UserId, at: ISODate): Promise<void>;
list(tenantId: TenantId, page: PageRequest): Promise<Page<SuppressionRecord>>;
}
export interface ChannelConfigRepository {
getChannel(tenantId: TenantId, kind: ChannelKind): Promise<Channel | null>;
saveChannel(c: Channel): Promise<void>;
saveCredential(c: ChannelCredential): Promise<void>;
flipHealth(channelId: ChannelId, status: 'active'|'degraded'|'down', reason: string): Promise<void>;
}
export interface WebhookInboundRepository {
insert(w: WebhookInbound): Promise<void>;
findByVendorMessageId(vendor: string, vendorMessageId: string): Promise<WebhookInbound | null>;
markStatus(id: WebhookInboundId, status: WebhookInbound['status'], reason?: string): Promise<void>;
}
export interface DispatchBatchRepository {
save(b: DispatchBatch): Promise<void>;
findById(tenantId: TenantId, id: DispatchBatchId): Promise<DispatchBatch | null>;
appendChild(batchId: DispatchBatchId, notificationId: NotificationId): Promise<void>;
}
// src/application/ports/event-publisher.ts
export interface EventPublisher {
publishOutbox(events: DomainEvent[], txnContext: TxnContext): Promise<void>; // outbox-backed
}
// src/application/ports/channel-adapters.ts (one port per channel kind)
export interface EmailPort {
send(req: EmailSendRequest): Promise<ChannelSendResult>;
}
export interface SmsPort {
send(req: SmsSendRequest): Promise<ChannelSendResult>;
}
export interface WhatsAppPort {
send(req: WhatsAppSendRequest): Promise<ChannelSendResult>;
}
export interface PushPort {
send(req: PushSendRequest): Promise<ChannelSendResult>;
}
export interface InAppPort {
publishToFeed(req: InAppPublishRequest): Promise<ChannelSendResult>;
pushOverWebSocket(tenantId: TenantId, recipientId: RecipientId, payload: unknown): Promise<void>;
}
export interface VoicePort {
placeCall(req: VoiceCallRequest): Promise<ChannelSendResult>;
}
export interface ChannelSendResult {
outcome: 'accepted' | 'rejected' | 'timeout';
vendorMessageId?: string;
vendor: string;
httpStatus?: number;
errorCode?: string;
errorMessage?: string;
retryAfterSeconds?: number;
latencyMs: number;
}
// src/application/ports/clients.ts
export interface AIClient {
// notification-service NEVER calls models directly; this client surfaces only
// the ai-orchestrator HITL approval and the AI-drafted-content lookup.
fetchAIDraftedContent(draftId: string): Promise<{ rendered: RenderedMessage; provenance: AIProvenance } | null>;
requestApprovalRoute(templateVersionId: TemplateVersionId, draftId: string): Promise<{ approvalRequestId: string }>;
}
export interface IdentityResolver {
resolveGuest(tenantId: TenantId, guestId: GuestId): Promise<GuestProjection | null>;
resolveUser(tenantId: TenantId, userId: UserId): Promise<UserProjection | null>;
resolveTenantAdmins(tenantId: TenantId): Promise<UserProjection[]>;
resolveStaffByAssignment(tenantId: TenantId, assignmentRef: string): Promise<UserProjection[]>;
resolveVendor(tenantId: TenantId, vendorRef: string): Promise<VendorProjection | null>;
}
export interface TenantConfigClient {
getThemeTokens(tenantId: TenantId): Promise<ThemeTokens>;
getNotificationPolicy(tenantId: TenantId): Promise<TenantNotificationPolicy>; // budgets, allowed categories, default locale
getDomainSenderEmail(tenantId: TenantId): Promise<EmailSender | null>; // verified DKIM domain
}
export interface ReservationProjectionClient {
getReservationSnapshot(tenantId: TenantId, reservationId: ReservationId): Promise<ReservationSnapshot | null>;
}
export interface BillingProjectionClient {
getInvoiceAttachment(tenantId: TenantId, invoiceId: string): Promise<Attachment | null>;
}
export interface LockProjectionClient {
getMobileKeyToken(tenantId: TenantId, keyCredentialId: KeyCredentialId): Promise<MobileKeyTokenSnapshot | null>;
}
export interface Clock { nowUtc(): ISODate; }
export interface IdGenerator { ulid(prefix: string): string; }
export interface Hasher { sha256(input: string): string; }
export interface HmacVerifier { verify(vendor: string, headers: Record<string,string>, body: Buffer): { valid: boolean; reason?: string } }
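Because use cases see only these port interfaces, they can be exercised with in-memory stand-ins instead of real adapters. The following is a minimal sketch, not the service's test harness: it assumes `ISODate` is an ISO-8601 string, and `FixedClock`, `SequentialIdGenerator`, `stampNewNotification`, and the `ntf` prefix are hypothetical names introduced only for illustration.

```typescript
// Assumption: ISODate is an ISO-8601 string; the real type lives in src/domain/.
type ISODate = string;

// Port shapes as declared in src/application/ports/.
interface Clock { nowUtc(): ISODate; }
interface IdGenerator { ulid(prefix: string): string; }

// Hypothetical test double: always reports the same instant.
class FixedClock implements Clock {
  private readonly instant: ISODate;
  constructor(instant: ISODate) { this.instant = instant; }
  nowUtc(): ISODate { return this.instant; }
}

// Hypothetical test double: predictable ids instead of real ULIDs.
class SequentialIdGenerator implements IdGenerator {
  private counter = 0;
  ulid(prefix: string): string {
    this.counter += 1;
    return `${prefix}_${this.counter}`;
  }
}

// Hypothetical application-layer helper; it depends on the two ports only.
function stampNewNotification(clock: Clock, ids: IdGenerator): { id: string; createdAt: ISODate } {
  return { id: ids.ulid("ntf"), createdAt: clock.nowUtc() };
}

const stamp = stampNewNotification(
  new FixedClock("2025-01-01T00:00:00.000Z"),
  new SequentialIdGenerator(),
);
console.log(stamp.id, stamp.createdAt);
```

In production the composition root would pass the concrete adapters in the same positions; the helper itself would not change.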
2. Use-case catalog
| Use case | Trigger | Idempotency key | Emits |
|---|---|---|---|
EnqueueNotificationUseCase | POST /api/v1/notifications (internal) or trigger-map projection of consumed event | Idempotency-Key header or (tenantId, sourceEvent.id, templateKey, recipientId) | requested.v1, possibly scheduled.v1 or suppressed.v1 |
EnqueueBatchUseCase | POST /api/v1/notifications/batch | Idempotency-Key (per batch) | per child: requested.v1 |
DispatchNotificationUseCase | dispatcher worker pulls queued rows | (notificationId, attemptNumber) | dispatched.v1, on failure failed.v1 (terminal) |
ProcessVendorWebhookUseCase | POST /api/v1/webhooks/vendors/:vendor | (vendor, vendorMessageId, eventType, occurredAt) | delivered.v1/bounced.v1/opened.v1/clicked.v1 |
ApplyConsumedEventUseCase | Pub/Sub subscriber on each consumed event | event id | one or more EnqueueNotificationUseCase invocations |
ScheduleSendsTickUseCase | scheduler worker (every 30 s) | (notificationId) per tick | per row: dispatch invocation; emits requested.v1 if not yet enqueued |
UpdatePreferencesUseCase | PATCH /api/v1/notification-preferences/:id | Idempotency-Key | preferences.updated.v1 |
OptOutViaTokenUseCase | POST /api/v1/notification-preferences/opt-out/:token | the token id | opted_out.v1, preferences.updated.v1 |
CreateTemplateVersionUseCase | POST /api/v1/notification-templates (or new version) | Idempotency-Key | none until publish |
PublishTemplateVersionUseCase | POST /api/v1/notification-templates/:id/publish | (templateId, semver) | template.published.v1 |
ArchiveTemplateUseCase | POST /api/v1/notification-templates/:id/archive | (templateId) | template.archived.v1 |
PreviewTemplateUseCase | POST /api/v1/notification-templates/:id/preview | none (read-only) | none |
TestSendTemplateUseCase | POST /api/v1/notification-templates/:id/test-send | Idempotency-Key | requested.v1 (synthetic) |
RegisterAIDraftedTemplateUseCase | event ai.draft_content.ready.v1 from ai-orchestrator-service | event id | none until publish (HITL gated) |
UpsertChannelConfigUseCase | PATCH /api/v1/notification-channels/:id | Idempotency-Key | none, with audit |
RotateChannelCredentialUseCase | POST /api/v1/notification-channels/:id/credentials/:credId/rotate | Idempotency-Key | none, with audit |
ProbeChannelHealthUseCase | health worker (every 60 s per channel) | (channelId, probeAt) | channel.health_changed.v1 on flip |
IngestSuppressionUseCase | called by ProcessVendorWebhookUseCase on bounce/complaint | (tenantId, channel, addressHash, reason) | suppressed.v1 (per future blocked send) |
ReleaseSuppressionUseCase | POST /api/v1/suppressions/:id/release | Idempotency-Key | none, with audit |
ResendNotificationUseCase | POST /api/v1/notifications/:id/resend | Idempotency-Key | requested.v1 (sibling) |
MarkReadUseCase | PATCH /api/v1/notifications/:id/read (in-app) | (notificationId) | none |
3. The two hot paths in detail
3.1 Event-driven enqueue → dispatch (booking confirmation)
Pub/Sub event: melmastoon.reservation.confirmed.v1
│
▼
ApplyConsumedEventUseCase
1. dedupe via inbox(consumer='notif.router', eventId)
2. resolve trigger-map entries for eventType (platform + tenant overrides)
3. for each entry × resolved recipient:
EnqueueNotificationUseCase(input)
│
▼
EnqueueNotificationUseCase(input)
1. resolve Recipient (cache → IdentityResolver fallback) → upsert projection
2. resolve Template + active TemplateVersion via TemplateRepository
(tenant override wins; else platform-global)
3. resolve ThemeTokens from TenantConfigClient (cached 5 min)
4. compute locale: input.locale || recipient.preferences.locale || tenant default
5. PreferenceGate(notif draft, prefs, suppression, now)
- send → continue
- suppress → persist Notification(status='suppressed'); emit notification.suppressed.v1; STOP
- defer(at) → continue with scheduledFor=at; status will land in 'scheduled'
6. Render via TemplateRenderer(template, variables, locale, themeTokens, channel)
- on RenderError → emit notification.failed.v1; persist with status='failed'; STOP
7. SenderResolver(channel, recipient.address)
- on missing → emit notification.failed.v1 with reason='sender_id_missing'; STOP
8. RateLimiter.check on (tenant_channel_day, recipient_channel_day)
- allow → continue
- defer(at) → schedule for that time
- suppress → status='suppressed' reason='rate_limit'; emit suppressed.v1
9. WhatsApp guard: if channel='whatsapp', verify whatsappTemplateRefs[locale].status='approved'
- else → fall back to SMS for transactional categories; for marketing → defer
10. Build Notification.create(...) with renderSnapshot, computed expiresAt (per retentionClass)
11. In a single Postgres transaction:
- notifications INSERT
- delivery_attempts (none yet)
- outbox INSERT (notification.requested.v1, optionally notification.scheduled.v1)
- inbox UPSERT (consumer dedupe)
12. If status='queued', enqueue dispatch hint (non-transactional Pub/Sub topic 'notif.dispatch.requested')
Outbox relay:
Periodically reads outbox where published_at IS NULL → publishes to Pub/Sub →
marks published_at. At-least-once; consumers de-dupe by event id.
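Step 5 of the enqueue flow (the PreferenceGate) can be sketched as a pure decision function that returns send, suppress, or defer. This is an illustrative sketch under stated assumptions, not the domain implementation: the `GateInput` shape, the UTC quiet-hours window, the rule that the suppression list wins over every category, and the rule that quiet hours apply to all non-security categories are assumptions. The security bypass of opt-out is taken from the trigger map.

```typescript
// Hypothetical shapes; the real PreferenceGate lives in the domain layer.
type GateDecision =
  | { kind: "send" }
  | { kind: "suppress"; reason: string }
  | { kind: "defer"; at: Date };

interface GateInput {
  category: "transactional" | "marketing" | "reminder" | "security";
  optedOut: boolean;   // recipient opted out of this channel/category
  suppressed: boolean; // address is on the suppression list
  quietHoursUtc?: { startHour: number; endHour: number }; // assumption: UTC hours, start != end
  now: Date;
}

// True when `hour` falls inside the window; handles windows that wrap midnight.
function inQuietHours(hour: number, startHour: number, endHour: number): boolean {
  return startHour < endHour
    ? hour >= startHour && hour < endHour
    : hour >= startHour || hour < endHour;
}

function preferenceGate(input: GateInput): GateDecision {
  // Assumption: a suppressed address is never sent to, whatever the category.
  if (input.suppressed) return { kind: "suppress", reason: "suppression_list" };
  // Security category bypasses opt-out (and, by assumption, quiet hours).
  if (input.category === "security") return { kind: "send" };
  if (input.optedOut) return { kind: "suppress", reason: "opted_out" };

  const quiet = input.quietHoursUtc;
  if (quiet && inQuietHours(input.now.getUTCHours(), quiet.startHour, quiet.endHour)) {
    // Defer to the next occurrence of the window's end hour.
    const end = new Date(Date.UTC(
      input.now.getUTCFullYear(), input.now.getUTCMonth(), input.now.getUTCDate(), quiet.endHour,
    ));
    if (end.getTime() <= input.now.getTime()) end.setUTCDate(end.getUTCDate() + 1);
    return { kind: "defer", at: end };
  }
  return { kind: "send" };
}

const gateExample = preferenceGate({
  category: "marketing",
  optedOut: false,
  suppressed: false,
  quietHoursUtc: { startHour: 22, endHour: 7 },
  now: new Date("2025-01-01T23:30:00Z"),
});
console.log(gateExample); // defer until 2025-01-02T07:00:00.000Z
```

Keeping the gate pure (no I/O, `now` passed in) makes each of the three branches in step 5 straightforward to unit-test.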
3.2 Dispatch worker (per channel)
Pub/Sub topic 'notif.dispatch.requested' OR Postgres polling for status='queued' AND scheduledFor IS NULL/elapsed
│
▼
DispatchNotificationUseCase(notificationId)
1. Load Notification, lock row (SELECT FOR UPDATE SKIP LOCKED)
2. Re-check Channel.status (degraded → use fallback vendor; down → defer 60s)
3. Re-check rate-limit (defensive)
4. Build adapter request from renderSnapshot + Sender + recipientAddress
5. Call vendor adapter (EmailPort/SmsPort/...) with timeout (per channel, e.g., email 10s, sms 8s, whatsapp 8s, push 5s)
6. ChannelSendResult →
accepted → notif.vendorAccepted(attempt); status='dispatched'; outbox dispatched.v1
rejected (retryable, e.g., 429/5xx) → notif.vendorRejectedRetryable; status='queued'
schedule next attempt: now + jitter(2^attempt s, max 1024s, honor Retry-After)
rejected (terminal, e.g., 4xx invalid recipient) → notif.vendorRejectedTerminal;
status='failed'; outbox failed.v1
if invalid_recipient → IngestSuppressionUseCase(reason='invalid_address')
timeout → same as retryable (counts as one attempt)
7. Persist + outbox in a single transaction
8. Update Channel.health.consecutiveFailures and flip status if >=3 (emit channel.health_changed.v1)
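Step 6 combines two decisions: classifying the `ChannelSendResult`, and computing the next attempt time for retryable outcomes. The sketch below is one possible reading of `jitter(2^attempt s, max 1024s, honor Retry-After)`. It assumes "full jitter" (a uniform draw between 0 and the capped exponential), treats Retry-After as a lower bound, and treats a rejection without an HTTP status as terminal; none of these choices is confirmed by the source. `SendResultSketch`, `classifySendResult`, and `nextAttemptDelaySeconds` are hypothetical names.

```typescript
// Subset of ChannelSendResult needed for the decision.
interface SendResultSketch {
  outcome: "accepted" | "rejected" | "timeout";
  httpStatus?: number;
  retryAfterSeconds?: number;
}

type Disposition = "dispatched" | "retry" | "failed";

function classifySendResult(result: SendResultSketch): Disposition {
  if (result.outcome === "accepted") return "dispatched";
  if (result.outcome === "timeout") return "retry"; // counts as one attempt
  const status = result.httpStatus;
  // 429 and 5xx are retryable; other rejections are terminal.
  // Assumption: a rejection with no HTTP status is treated as terminal.
  if (status !== undefined && (status === 429 || status >= 500)) return "retry";
  return "failed";
}

// `random` is injectable so the delay is deterministic in tests.
function nextAttemptDelaySeconds(
  attempt: number,
  retryAfterSeconds?: number,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(Math.pow(2, attempt), 1024); // 2^attempt s, capped at 1024 s
  const jittered = random() * ceiling;                  // assumption: full jitter
  return Math.max(jittered, retryAfterSeconds ?? 0);    // never earlier than Retry-After
}

const exampleResult: SendResultSketch = { outcome: "rejected", httpStatus: 429, retryAfterSeconds: 30 };
if (classifySendResult(exampleResult) === "retry") {
  console.log("retry in", nextAttemptDelaySeconds(3, exampleResult.retryAfterSeconds), "seconds");
}
```

With full jitter the delay for attempt 3 falls anywhere in [0, 8) seconds, so a vendor-supplied Retry-After of 30 seconds dominates in the example.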
4. Vendor webhook ingestion
POST /api/v1/webhooks/vendors/{vendor} (no JWT; HMAC-validated)
1. HmacVerifier.verify(vendor, headers, raw body) → if invalid → 401 + WebhookInbound(status='rejected')
2. Persist raw body to GCS; insert WebhookInbound(status='received', signatureValid=true)
3. Parse vendor-specific payload → list of VendorEvent (a webhook can carry many — SendGrid sends arrays)
4. For each VendorEvent:
a. dedupe by (vendor, vendorMessageId, type, occurredAt)
b. correlate to Notification via DeliveryAttempt.vendorMessageId
- if not found, queue for late-correlation (cap 24h) into NOTIF_LATE_CORRELATION DLQ
c. apply state transition:
'sent' / 'queued' (vendor-side) → no-op
'delivered' → notif.markDelivered(...); outbox delivered.v1
'bounce' (hard|soft) → notif.markBounced(...); IngestSuppressionUseCase if hard or soft-threshold
'complaint' → notif.markBounced('complaint'); IngestSuppressionUseCase('complaint')
'open' → notif.markOpened(...)
'click' → notif.markClicked(...)
'failed' / 'undelivered' → notif.vendorRejectedTerminal(...); failed.v1
5. Mark WebhookInbound.status='applied'; return 204.
6. Any unhandled exception → status='dlq' + alert (NOTIF_WEBHOOK_DLQ runbook).
Deduplication and idempotency are domain-level; replaying the same webhook produces the same final state.
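The replay-safety claim follows from the dedupe key in step 4a. A minimal in-memory sketch of that idea, assuming hypothetical names (`VendorEventSketch`, `webhookDedupeKey`, `InMemoryWebhookApplier`) and a deliberately simplified state model in which the last newly applied event type is recorded per message; the real service uses a unique constraint in Postgres and the Notification aggregate's own transitions.

```typescript
interface VendorEventSketch {
  vendor: string;
  vendorMessageId: string;
  type: "delivered" | "bounce" | "complaint" | "open" | "click";
  occurredAt: string; // ISO-8601 timestamp from the vendor
}

// Key mirrors (vendor, vendorMessageId, type, occurredAt).
function webhookDedupeKey(e: VendorEventSketch): string {
  return JSON.stringify([e.vendor, e.vendorMessageId, e.type, e.occurredAt]);
}

class InMemoryWebhookApplier {
  private readonly seenKeys = new Set<string>();
  private readonly stateByMessage = new Map<string, string>();

  // Applies only events not seen before; returns how many were newly applied.
  applyBatch(events: VendorEventSketch[]): number {
    let appliedCount = 0;
    for (const e of events) {
      const key = webhookDedupeKey(e);
      if (this.seenKeys.has(key)) continue; // replayed event: no-op
      this.seenKeys.add(key);
      this.stateByMessage.set(e.vendorMessageId, e.type); // simplified transition
      appliedCount += 1;
    }
    return appliedCount;
  }

  stateOf(vendorMessageId: string): string | undefined {
    return this.stateByMessage.get(vendorMessageId);
  }
}

const applier = new InMemoryWebhookApplier();
const webhookBatch: VendorEventSketch[] = [
  { vendor: "sendgrid", vendorMessageId: "m1", type: "delivered", occurredAt: "2025-01-01T10:00:00Z" },
  { vendor: "sendgrid", vendorMessageId: "m1", type: "open", occurredAt: "2025-01-01T10:05:00Z" },
];
console.log(applier.applyBatch(webhookBatch)); // 2
console.log(applier.applyBatch(webhookBatch)); // 0 (replay changes nothing)
```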
5. Scheduler worker (pre-arrival, post-stay, dunning)
The scheduler has two sources of work:
- `notifications` rows with `status='scheduled' AND scheduledFor <= now()`: drives quiet-hour deferrals and pre-rendered scheduled sends.
- `notification_scheduled` table: rows projected from upstream events that need future enqueue. Schema in DATA_MODEL §3.10. Examples:
  - on `reservation.confirmed.v1` with `stay_start = D`, insert `notification_scheduled(run_after = D - 24h, kind='pre_arrival_reminder', payload={...})`.
  - on `reservation.checked_out.v1`, insert `notification_scheduled(run_after = checkedOutAt + 24h, kind='post_stay_thank_you', payload={...})`.
  - on `billing.subscription.payment_failed.v1`, insert three `notification_scheduled` rows (T+0, T+72h, T+168h) for the dunning sequence.
ScheduleSendsTickUseCase (every 30 s)
for each row in notification_scheduled where run_after <= now() and processed_at IS NULL, batch=200, FOR UPDATE SKIP LOCKED:
1. Re-validate context (e.g., is the reservation still active? if cancelled, mark obsolete and skip).
2. EnqueueNotificationUseCase(payload + force=true)
3. mark processed_at = now()
for each row in notifications where status='scheduled' AND scheduledFor <= now(), batch=500, FOR UPDATE SKIP LOCKED:
1. transition to 'queued'; emit dispatch hint
Cancellation propagation: when reservation.cancelled.v1 arrives, ApplyConsumedEventUseCase marks all matching notification_scheduled rows obsolete=true so they are skipped on next tick (no embarrassing post-stay thank-you to a cancelled booking).
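The `run_after` arithmetic for the three projected examples, together with the tick's skip rules, can be sketched as follows. The offsets (24 h before arrival, 24 h after check-out, dunning at T+0, T+72h, and T+168h) come from this section; the `ScheduledRowSketch` shape, the helper names, and the `dunning_step` kind are hypothetical.

```typescript
interface ScheduledRowSketch {
  kind: string;
  runAfter: Date;
  obsolete?: boolean;  // set by cancellation propagation
  processedAt?: Date;  // set once the tick has enqueued the send
}

const HOUR_MS = 60 * 60 * 1000;

function hoursFrom(base: Date, hours: number): Date {
  return new Date(base.getTime() + hours * HOUR_MS);
}

function preArrivalReminder(stayStart: Date): ScheduledRowSketch {
  return { kind: "pre_arrival_reminder", runAfter: hoursFrom(stayStart, -24) };
}

function postStayThankYou(checkedOutAt: Date): ScheduledRowSketch {
  return { kind: "post_stay_thank_you", runAfter: hoursFrom(checkedOutAt, 24) };
}

// Three rows at T+0, T+72h, T+168h. The kind name is an assumption.
function dunningSequence(paymentFailedAt: Date): ScheduledRowSketch[] {
  return [0, 72, 168].map((h) => ({ kind: "dunning_step", runAfter: hoursFrom(paymentFailedAt, h) }));
}

// A row is picked up by the tick only if it is due, unprocessed, and not obsolete.
function isDue(row: ScheduledRowSketch, now: Date): boolean {
  return row.runAfter.getTime() <= now.getTime()
    && row.processedAt === undefined
    && row.obsolete !== true;
}

const dunningRows = dunningSequence(new Date("2025-03-01T00:00:00Z"));
console.log(dunningRows.map((r) => r.runAfter.toISOString()));
```

Marking a row `obsolete` rather than deleting it keeps an audit trail of sends that were planned and then cancelled.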
6. AI-drafted content path (HITL)
ai-orchestrator-service emits melmastoon.ai.draft_content.ready.v1
{ tenantId, draftId, purpose: 'notification.template.copy' | 'notification.message.personalisation',
templateKey, locale, channel, audienceSegment?, renderedBody, subject?, aiProvenance }
│
▼
RegisterAIDraftedTemplateUseCase
Case A: purpose='notification.template.copy'
1. Load Template (or create new draft Template) with key from event
2. Append a TemplateVersion(source='ai_drafted', semver=next, status='draft', aiProvenance=event.aiProvenance,
locales={ [locale]: { body: rendered, bodyFormat: matches channel } })
3. Notify tenant admins via in-app + email: 'AI-drafted template version awaiting review' (uses notification flow itself!)
4. Tenant admin opens backoffice → previews → POST /publish with approverId
5. PublishTemplateVersionUseCase enforces (source=='ai_drafted') => requires approver param; otherwise raises MELMASTOON.AI.HITL_REQUIRED
Case B: purpose='notification.message.personalisation'
1. The ai-orchestrator returns a per-recipient personalised body for an in-flight EnqueueNotificationUseCase
(synchronous AIClient.fetchAIDraftedContent during enqueue; never live model call from us)
2. The Notification carries aiProvenance attached to the renderSnapshot
3. On AIClient timeout / failure → fall back to deterministic template render (tenant-configurable degrade behaviour)
Per-tenant policy: AI personalisation is set to one of `disabled`, `suggest_only`, or `auto_send`. The default for new tenants is `suggest_only`: the AI draft is shown to staff in backoffice for review before send. `auto_send` is opt-in and limited to the marketing and reminder categories.
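The two HITL rules in this section (an approver is required to publish an AI-drafted version, and `auto_send` applies only to marketing and reminder categories) can be sketched as small guards. This is an illustration under assumptions: the `TemplateVersionSketch` shape, the `human` source value, and the function names are hypothetical; only the `ai_drafted` source, the policy values, and the error code `MELMASTOON.AI.HITL_REQUIRED` come from the text.

```typescript
type TemplateSource = "human" | "ai_drafted"; // 'human' is an assumed counterpart value

interface TemplateVersionSketch {
  source: TemplateSource;
  status: "draft" | "published";
  approvedBy?: string;
}

class HITLApprovalRequiredError extends Error {
  readonly code = "MELMASTOON.AI.HITL_REQUIRED";
  constructor() {
    super("AI-drafted template versions need a human approver before publish");
    this.name = "HITLApprovalRequiredError";
  }
}

// Publish guard: AI-drafted versions cannot be published without an approver.
function publishTemplateVersion(version: TemplateVersionSketch, approverId?: string): TemplateVersionSketch {
  if (version.source === "ai_drafted" && !approverId) {
    throw new HITLApprovalRequiredError();
  }
  const published: TemplateVersionSketch = { source: version.source, status: "published" };
  if (approverId) published.approvedBy = approverId;
  return published;
}

type AiPersonalisationMode = "disabled" | "suggest_only" | "auto_send";

// auto_send is honoured only for the marketing and reminder categories.
function canAutoSend(mode: AiPersonalisationMode, category: string): boolean {
  return mode === "auto_send" && (category === "marketing" || category === "reminder");
}

console.log(publishTemplateVersion({ source: "ai_drafted", status: "draft" }, "usr_approver").status);
console.log(canAutoSend("auto_send", "security")); // false
```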
7. Trigger map (data-driven projection table)
Stored in notification_trigger_map (see DATA_MODEL §3.11) with a 30-second hot-reload TTL in the application layer. Platform default below; tenants can append (additive only for regulated categories).
| Consumed event | Recipient resolver | Channels | Template key (per channel) | Schedule |
|---|---|---|---|---|
melmastoon.reservation.confirmed.v1 | primary_guest | email (always), sms+whatsapp (per recipient pref) | reservation.confirmed.email, reservation.confirmed.sms, reservation.confirmed.whatsapp | immediate |
melmastoon.reservation.confirmed.v1 (T-24h) | primary_guest | sms \| whatsapp (whichever opt-in), email | | |
melmastoon.reservation.cancelled.v1 | primary_guest | email, sms | reservation.cancelled.email, reservation.cancelled.sms | immediate |
melmastoon.reservation.modified.v1 | primary_guest | email, `whatsapp \| sms` | | |
melmastoon.reservation.dates_changed.v1 | primary_guest | email, sms | reservation.dates_changed.email, reservation.dates_changed.sms | immediate |
melmastoon.reservation.checked_in.v1 | primary_guest | inapp, `whatsapp \| sms` | | |
melmastoon.reservation.checked_out.v1 (T+24h) | primary_guest | email | reservation.post_stay.email (with invoice attachment) | scheduled checked_out_at + 24h |
melmastoon.lock_integration.key_credential.issued.v1 | primary_guest | recipient-pref preferred channel for mobile_key, fallback sms | mobile_key.issued.whatsapp, mobile_key.issued.sms, mobile_key.issued.email | immediate |
melmastoon.lock_integration.key_credential.revoked.v1 (when reason in {early_checkout,cancelled}) | primary_guest | sms \| whatsapp | | |
melmastoon.billing.invoice.generated.v1 | primary_guest | email | billing.invoice.email | immediate |
melmastoon.billing.subscription.payment_failed.v1 | tenant_admins | email, inapp | billing.dunning.email (3 templates: T+0, T+3d, T+7d) | scheduled |
melmastoon.iam.password.reset_requested.v1 | iam_subject | email | iam.password_reset.email (security category — bypasses opt-out) | immediate |
melmastoon.iam.session.suspicious_login.v1 | iam_subject | email, sms | iam.suspicious_login.* | immediate |
melmastoon.tenant.invitation.sent.v1 | iam_subject (invitee email projection) | email | tenant.invitation.email | immediate |
melmastoon.maintenance.work_order.assigned.v1 | vendor_assignee | sms (always), whatsapp (if vendor opted in) | maintenance.work_order.assigned.sms | immediate |
Notes:
- `primary_guest` resolves to `Reservation.primaryGuest`; the app reads the projection from Postgres (eventually consistent) or falls back to a synchronous `ReservationProjectionClient` call.
- `tenant_admins` resolves to all users with `role='OWNER'|'GM'|'BILLING_ADMIN'` for the tenant.
- `iam_subject` resolves the user identified in the event payload (with strict tenant scoping).
- WhatsApp templates require pre-approval per locale; if the WhatsApp template is not approved, the trigger entry's WhatsApp channel falls back to SMS.
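The WhatsApp fallback rule (here and in step 9 of the enqueue flow) can be sketched as a small guard: send over WhatsApp only when the locale's template is approved, otherwise fall back to SMS for transactional categories and defer for marketing. The type and function names below are hypothetical, and `pending` and `rejected` are assumed status values; only `approved` appears in this document.

```typescript
type WhatsAppTemplateStatus = "approved" | "pending" | "rejected"; // only 'approved' is from the source

interface WhatsAppTemplateRef { status: WhatsAppTemplateStatus; }

type WhatsAppGuardOutcome =
  | { action: "send"; channel: "whatsapp" | "sms" }
  | { action: "defer" };

function whatsappGuard(
  category: "transactional" | "marketing",
  refsByLocale: Record<string, WhatsAppTemplateRef>,
  locale: string,
): WhatsAppGuardOutcome {
  const ref = refsByLocale[locale];
  // A missing locale entry is treated the same as "not approved".
  if (ref !== undefined && ref.status === "approved") {
    return { action: "send", channel: "whatsapp" };
  }
  return category === "transactional"
    ? { action: "send", channel: "sms" } // transactional: fall back to SMS
    : { action: "defer" };               // marketing: wait for approval
}

const templateRefs: Record<string, WhatsAppTemplateRef> = {
  en: { status: "approved" },
  ps: { status: "pending" },
};
console.log(whatsappGuard("transactional", templateRefs, "ps")); // falls back to sms
```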
8. Saga participation (we never orchestrate; we observe and emit)
notification-service participates in two platform sagas:
| Saga | Our role |
|---|---|
| Booking saga (04 §7) | Consume reservation.confirmed.v1, reservation.cancelled.v1, reservation.checked_in.v1, reservation.checked_out.v1; emit notification lifecycle events. We are not a compensable step: a missed confirmation email does not reverse the booking. |
| Mobile-key delivery saga (09 §6) | Consume key_credential.issued.v1; deliver the mobile-key one-time-link or QR code; emit notification.delivered.v1; on failure, emit notification.failed.v1 so lock-integration-service can re-issue or fall back to mechanical key. |
We never publish events with causationId chains beyond the immediate trigger; this prevents accidental saga loops.
9. Concurrency, idempotency, and outbox
- Idempotency for HTTP: `Idempotency-Key` header on every `POST`/`PATCH` mutation. Stored in `idempotency_keys` (see DATA_MODEL §3.12) keyed `(tenantId, scope, key)` with `request_hash` and stored response. Reuse with same body returns the same response; reuse with a different body returns `409 MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED`.
- Idempotency for consumed events: `consumed_events(consumer_name, event_id)` with 7-day partition. The router sets `consumer_name='notif.router.<topic>'`.
- Idempotency for vendor webhooks: `(vendor, vendorMessageId, eventType, occurredAt)` unique constraint on `webhook_inbound_events`.
- Idempotency for trigger-map fan-out: `(tenantId, sourceEventId, templateKey, recipientId)` unique on `notifications` (partial: only when `sourceEventId IS NOT NULL`).
- OCC on `notifications`, `notification_preferences`, `templates`, `template_versions`, `channels` via `version` column; mismatch → `412 MELMASTOON.GENERAL.PRECONDITION_FAILED`.
- Outbox is the only path domain events reach Pub/Sub; the outbox relay (`OutboxRelayUseCase`) batches every 250 ms or 100 events.
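The HTTP idempotency rule (same key and same body replays the stored response; same key and a different body yields 409) can be sketched with an in-memory map standing in for the `idempotency_keys` table. `InMemoryIdempotencyStore`, `StoredResponseSketch`, and the caller-supplied `requestHash` are hypothetical; how the service actually hashes requests is not specified here.

```typescript
interface StoredResponseSketch { status: number; body: string; }
interface IdempotencyRow { requestHash: string; response: StoredResponseSketch; }

class InMemoryIdempotencyStore {
  // Stand-in for idempotency_keys, keyed (tenantId, scope, key).
  private readonly rows = new Map<string, IdempotencyRow>();

  execute(
    tenantId: string,
    scope: string,
    key: string,
    requestHash: string, // assumption: computed by the caller from the request body
    handler: () => StoredResponseSketch,
  ): StoredResponseSketch {
    const rowKey = JSON.stringify([tenantId, scope, key]);
    const existing = this.rows.get(rowKey);
    if (existing) {
      // Same body: replay the stored response without re-running the handler.
      if (existing.requestHash === requestHash) return existing.response;
      // Different body under the same key: conflict.
      return { status: 409, body: "MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED" };
    }
    const response = handler();
    this.rows.set(rowKey, { requestHash, response });
    return response;
  }
}

const idempotencyStore = new InMemoryIdempotencyStore();
const createOnce = (): StoredResponseSketch => ({ status: 201, body: "created" });
console.log(idempotencyStore.execute("tnt_1", "notifications.create", "key-1", "hash-a", createOnce).status);
console.log(idempotencyStore.execute("tnt_1", "notifications.create", "key-1", "hash-b", createOnce).status);
```

A real implementation would also need the lookup and insert to be atomic (for example via the table's unique constraint), which this single-threaded sketch does not model.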
10. Cross-aggregate orchestration patterns
- Sender resolution requires `Channel` + `tenant.settings`: `EnqueueNotificationUseCase` loads both before constructing the dispatch. `Channel` is the source of truth for vendor selection; `tenant.settings` for branding and budgets.
- Mobile-key flow: when consuming `key_credential.issued.v1`, the use case calls `LockProjectionClient.getMobileKeyToken(...)` to fetch the one-time-link payload and embeds it as a `mobileKeyTokenRef` on the Notification. The link is single-use; the WhatsApp/SMS body contains the URL but the JSON-LD-style payload includes its `tokenHash` so we can audit which token was sent in which message.
- Invoice attach: when consuming `billing.invoice.generated.v1`, the use case calls `BillingProjectionClient.getInvoiceAttachment(...)` to fetch the GCS-stored PDF reference; the attachment is by reference (URI), never inline.
11. Application-level error mapping
| Domain error | Application response | HTTP / event |
|---|---|---|
InvalidIdError | reject at the boundary | 400 MELMASTOON.GENERAL.VALIDATION_FAILED |
CrossTenantReferenceError | reject at the boundary | 422 MELMASTOON.GENERAL.CROSS_TENANT_REFERENCE |
IllegalStateTransitionError | log + reject | 409 + map per concrete code |
TemplateNotFoundError | enqueue path: emit notification.failed.v1; API path: 404 MELMASTOON.NOTIFICATION.TEMPLATE_NOT_FOUND | per call |
SenderIdMissingError | enqueue path: emit notification.failed.v1 with reason; alert tenant admin; API path: 422 MELMASTOON.NOTIFICATION.SENDER_ID_MISSING | per call |
WhatsAppTemplateNotApprovedError | fall back per policy; for transactional fall back to SMS; emit notification.failed.v1 if no fallback exists | per call |
RateLimitExceededError | defer or suppress; emit notification.scheduled.v1 or notification.suppressed.v1 | none externally |
BudgetExhaustedError | reject batch creation; 402 MELMASTOON.TENANT.PLAN_LIMIT_EXCEEDED (re-used) or 429 | per call |
HITLApprovalRequiredError | publish path: 403 MELMASTOON.AI.HITL_REQUIRED | per call |
WebhookSignatureInvalidError | 401 MELMASTOON.NOTIFICATION.WEBHOOK_SIGNATURE_INVALID (new code registered in ERROR_CODES; it mirrors the semantics of MELMASTOON.PAYMENT.WEBHOOK_SIGNATURE_INVALID) | per call |
The full list of error codes used by this service lives in API_CONTRACTS §4 and is registered in docs/standards/ERROR_CODES.md.
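For the API path, the rows of the table that carry a single fixed status and code can be expressed as a lookup keyed by error name. The sketch below is illustrative only: it assumes domain errors set `Error.name` to their class name, and the 500 fallback with `MELMASTOON.GENERAL.INTERNAL_ERROR` is a hypothetical default that is not taken from ERROR_CODES.

```typescript
interface ApiErrorSketch { status: number; code: string; }

// API-path rows from the error-mapping table that have one fixed status and code.
const API_ERROR_BY_NAME: Record<string, ApiErrorSketch> = {
  InvalidIdError: { status: 400, code: "MELMASTOON.GENERAL.VALIDATION_FAILED" },
  CrossTenantReferenceError: { status: 422, code: "MELMASTOON.GENERAL.CROSS_TENANT_REFERENCE" },
  TemplateNotFoundError: { status: 404, code: "MELMASTOON.NOTIFICATION.TEMPLATE_NOT_FOUND" },
  SenderIdMissingError: { status: 422, code: "MELMASTOON.NOTIFICATION.SENDER_ID_MISSING" },
  HITLApprovalRequiredError: { status: 403, code: "MELMASTOON.AI.HITL_REQUIRED" },
  WebhookSignatureInvalidError: { status: 401, code: "MELMASTOON.NOTIFICATION.WEBHOOK_SIGNATURE_INVALID" },
};

// Hypothetical fallback for unmapped errors; the code name is an assumption.
const UNMAPPED_ERROR: ApiErrorSketch = { status: 500, code: "MELMASTOON.GENERAL.INTERNAL_ERROR" };

function toApiError(err: Error): ApiErrorSketch {
  // hasOwnProperty guards against names such as "constructor" hitting the prototype.
  const known = Object.prototype.hasOwnProperty.call(API_ERROR_BY_NAME, err.name);
  const mapped = known ? API_ERROR_BY_NAME[err.name] : undefined;
  return mapped ?? UNMAPPED_ERROR;
}

// Helper for the example: builds an Error carrying the given name.
function namedError(name: string): Error {
  const e = new Error(name);
  e.name = name;
  return e;
}

console.log(toApiError(namedError("TemplateNotFoundError")));
```

Errors whose handling depends on the path or on policy (rate limits, WhatsApp fallback, budget exhaustion) are left out because the table maps them to behaviour rather than to a single response.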
12. Background workers
| Worker | Schedule | Purpose |
|---|---|---|
outbox-relay | continuous (250 ms tick) | Publishes outbox rows to Pub/Sub |
dispatch-worker-email | continuous (long-poll Pub/Sub notif.dispatch.requested filter channel=email) | Calls EmailPort for queued email notifications |
dispatch-worker-sms | continuous | Calls SmsPort |
dispatch-worker-whatsapp | continuous | Calls WhatsAppPort |
dispatch-worker-push | continuous | Calls PushPort |
dispatch-worker-inapp | continuous | Calls InAppPort.publishToFeed + WS push |
dispatch-worker-voice | continuous (Phase 3+) | Calls VoicePort |
scheduler | every 30 s | Promotes scheduled → queued; reads notification_scheduled to enqueue future sends |
webhook-late-correlator | every 60 s | Correlates webhooks whose Notification was not yet visible at receive-time (cap 24 h) |
channel-health-prober | every 60 s per channel | Calls a vendor-specific health probe; flips Channel.status |
template-cache-warmer | every 30 s | Hot-reloads trigger map and per-tenant template cache from Postgres |
suppression-replicator | every 60 s | Pushes recent suppression rows into Memorystore set for fast dispatch-time lookup |
partition-maintainer (cron job) | hourly | pg_partman keeps notifications and delivery_attempts partitions current |
archive-rotator (cron job) | daily | Moves >24-month rows to GCS Parquet for BigQuery cold tier |
dlq-monitor (cron job) | every 5 min | Counts DLQ rows; pages on threshold |
Each worker runs as a separate Cloud Run instance with its own scaling envelope (see DEPLOYMENT_TOPOLOGY).
13. Tracing and correlation
- Every use case enters with a W3C `traceparent` from the calling context (HTTP gateway, Pub/Sub message attribute, scheduler tick).
- Spans: `notification.enqueue`, `notification.preference_gate`, `notification.render`, `notification.sender_resolve`, `notification.rate_limit`, `notification.dispatch.<channel>`, `notification.webhook_ingest.<vendor>`, `notification.scheduler.tick`.
- Span attributes (always): `tenant.id`, `notification.id`, `notification.channel`, `notification.category`, `template.key`, `template.semver`, `recipient.id` (hashed), `vendor`, `attempt.number`, `result.outcome`. Defined in OBSERVABILITY.
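A use case entry point has to read the incoming `traceparent` before it can open a child span. The sketch below parses the version-00 header layout from the W3C Trace Context specification (`version-traceid-parentid-flags`, lowercase hex). `parseTraceparent` and `TraceContextSketch` are hypothetical names; in practice an OpenTelemetry propagator would normally do this, and the sketch rejects any header with extra fields, which is stricter than the specification requires for future versions.

```typescript
interface TraceContextSketch {
  version: string;
  traceId: string;  // 32 lowercase hex characters
  parentId: string; // 16 lowercase hex characters
  sampled: boolean; // bit 0 of the trace-flags byte
}

function parseTraceparent(header: string): TraceContextSketch | null {
  const match = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header.trim());
  if (!match) return null;
  const [, version, traceId, parentId, flags] = match;
  if (version === undefined || traceId === undefined || parentId === undefined || flags === undefined) {
    return null;
  }
  if (version === "ff") return null; // version ff is invalid per the specification
  // All-zero trace-id or parent-id is invalid per the specification.
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null;
  return { version, traceId, parentId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

const traceContext = parseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
console.log(traceContext);
```

When the header is missing or invalid (`null` here), the caller would start a new trace rather than fail the use case.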