
DLR Processor — Application Logic

Status: populated
Owner: Platform Engineering
Last updated: 2026-04-18
Companion: DOMAIN_MODEL · DATA_MODEL · FAILURE_MODES

1. Processing Pipeline

[NATS sms.dlr.inbound]
          │
          ▼
1. Deserialise + validate (Zod schema)
          │
          ▼
2. Idempotency check: look up operator_message_id in dlr.delivery_receipts
          │
     ┌────┴────┐
   exists  not found
     │         │
    ACK        ▼
             3. Normalise stat → DlrStatus
               │
               ▼
             4. Correlate: look up orch.sms_messages by operator_message_id
               │
        ┌──────┴──────┐
      found       not found
        │             │
        ▼             ▼
   5. Persist     5b. Write dlr.orphaned_receipts
   delivery_receipt   │
        │             ▼
        │         Publish sms.dlr.unmatched
        │             │
        │             ▼
        │            ACK
        ▼
   6. Update orch.sms_messages
        │
        ▼
   7. Publish billing.events (terminal status only)
        │
        ▼
   8. Publish webhook.dispatch
        │
        ▼
       ACK
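The branching above can be sketched as a single handler that returns the acknowledgement decision. This is a minimal sketch, not the service's code: the `Deps` interface and its method names are hypothetical, each standing in for one numbered step, and the catch-all mirrors the Nak rows of the error table in section 5.

```typescript
// Sketch of the pipeline's control flow. Deps and its method names are
// hypothetical; each method stands for one numbered step in this doc.
type Outcome = 'ack' | 'nak';

interface Deps<E, M> {
  validate(raw: Uint8Array): E | null;                // step 1 (null = malformed)
  receiptExists(event: E): Promise<boolean>;          // step 2
  correlate(event: E): Promise<M | null>;             // step 4 (null = no match)
  persistCorrelated(event: E, msg: M): Promise<void>; // steps 3 + 5-8, one transaction
  handleOrphan(event: E): Promise<void>;              // step 5b
}

async function processDlr<E, M>(raw: Uint8Array, deps: Deps<E, M>): Promise<Outcome> {
  const event = deps.validate(raw);
  if (event === null) return 'ack'; // malformed: discard, redelivery cannot fix it
  try {
    if (await deps.receiptExists(event)) return 'ack'; // duplicate receipt
    const msg = await deps.correlate(event);
    if (msg === null) {
      await deps.handleOrphan(event); // quarantine, then ack
      return 'ack';
    }
    await deps.persistCorrelated(event, msg);
    return 'ack';
  } catch {
    return 'nak'; // DB errors and unhandled exceptions: let NATS redeliver
  }
}
```

Injecting the steps keeps the Ack/Nak policy in one place and lets it be tested without NATS or PostgreSQL.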

2. Step Details

Step 1 — Deserialise + Validate

  • Parse the raw NATS message payload as UTF-8 JSON.
  • Validate it against the InboundDlrEvent Zod schema.
  • On validation failure: log WARN, increment the dlr_validation_errors_total counter, and Ack without retry. Malformed events are discarded permanently, since redelivery cannot make them valid.
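Step 1 can be sketched without the Zod dependency as a hand-written type guard. This is a stand-in, not the real InboundDlrEvent schema: the field names (`operatorId`, `operatorMessageId`, `stat`, `deliveredAt`, `errorCode`) are hypothetical, chosen to match the columns persisted in step 5.

```typescript
// Dependency-free stand-in for the InboundDlrEvent Zod schema. Field names are
// hypothetical (the real schema is not reproduced in this doc).
interface InboundDlrEvent {
  operatorId: string;
  operatorMessageId: string;
  stat: string;
  deliveredAt: string | null;
  errorCode: string | null;
}

type ParseResult =
  | { ok: true; event: InboundDlrEvent }
  | { ok: false; reason: string };

const isNonEmptyString = (v: unknown): v is string =>
  typeof v === 'string' && v.length > 0;
const isNullableString = (v: unknown): v is string | null =>
  v === null || typeof v === 'string';

function parseInbound(data: Uint8Array): ParseResult {
  let json: unknown;
  try {
    // fatal: true makes invalid UTF-8 throw instead of decoding to U+FFFD.
    json = JSON.parse(new TextDecoder('utf-8', { fatal: true }).decode(data));
  } catch {
    return { ok: false, reason: 'payload is not UTF-8 JSON' };
  }
  if (typeof json !== 'object' || json === null) {
    return { ok: false, reason: 'payload is not a JSON object' };
  }
  const o = json as Record<string, unknown>;
  const { operatorId, operatorMessageId, stat } = o;
  const deliveredAt = o.deliveredAt ?? null; // optional fields default to null
  const errorCode = o.errorCode ?? null;
  if (
    !isNonEmptyString(operatorId) ||
    !isNonEmptyString(operatorMessageId) ||
    !isNonEmptyString(stat) ||
    !isNullableString(deliveredAt) ||
    !isNullableString(errorCode)
  ) {
    return { ok: false, reason: 'schema mismatch' };
  }
  return { ok: true, event: { operatorId, operatorMessageId, stat, deliveredAt, errorCode } };
}
```

Returning a result object rather than throwing keeps "malformed, Ack and discard" distinct from exceptions that should lead to a Nak.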

Step 2 — Idempotency Check

SELECT receipt_id FROM dlr.delivery_receipts
WHERE operator_message_id = $1
LIMIT 1;

If found: log DEBUG dlr.duplicate, increment dlr_duplicates_total, Ack immediately.
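A sketch of the lookup, assuming a minimal query interface loosely modelled on node-postgres's `client.query(text, values)` returning `{ rows }`. The `Queryable` interface is declared here for self-containment and is not the service's actual DB layer.

```typescript
// Minimal query interface modelled loosely on node-postgres's
// client.query(text, values) -> { rows }; declared here so the sketch is
// self-contained (an assumption, not the service's actual DB layer).
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rows: unknown[] }>;
}

const DUPLICATE_SQL =
  'SELECT receipt_id FROM dlr.delivery_receipts WHERE operator_message_id = $1 LIMIT 1';

// True when a receipt with this operator_message_id has already been stored.
async function isDuplicate(db: Queryable, operatorMessageId: string): Promise<boolean> {
  const { rows } = await db.query(DUPLICATE_SQL, [operatorMessageId]);
  return rows.length > 0;
}
```

This check is a fast path only. Two redeliveries of the same receipt can both pass the SELECT before either inserts; the `ON CONFLICT (operator_message_id) DO NOTHING` in step 5 is what makes that race harmless.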

Step 3 — Status Normalisation

Stateless pure function:

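// Operator stat codes mapped to DlrStatus; built once, not on every call.
const STAT_MAP: Readonly<Record<string, DlrStatus>> = {
  DELIVRD: 'DELIVERED',
  UNDELIV: 'UNDELIVERED',
  EXPIRED: 'EXPIRED',
  DELETED: 'FAILED',
  ACCEPTD: 'UNKNOWN',
  REJECTD: 'REJECTED',
  UNKNOWN: 'UNKNOWN',
  FAILED: 'FAILED',
};

function normalise(stat: string): DlrStatus {
  // Case-insensitive lookup; unrecognised codes fall back to UNKNOWN.
  return STAT_MAP[stat.toUpperCase()] ?? 'UNKNOWN';
}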

Step 4 — Correlation

SELECT message_id, account_id, segment_count, to_number
FROM orch.sms_messages
WHERE operator_message_id = $1
LIMIT 1;

Uses index on orch.sms_messages(operator_message_id). Query timeout: 3 s.
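A sketch of the correlation query with the 3 s limit, assuming the same kind of minimal `Queryable` interface as above. How the service actually enforces the timeout is not stated here; this version uses a client-side `Promise.race`, which stops waiting but does not cancel the query on the server (a server-side `statement_timeout` would).

```typescript
// Self-contained query interface, shaped like a node-postgres client (assumption).
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rows: Record<string, unknown>[] }>;
}

interface CorrelatedMessage {
  messageId: string;
  accountId: string;
  segmentCount: number;
  toNumber: string;
}

const CORRELATE_SQL =
  'SELECT message_id, account_id, segment_count, to_number FROM orch.sms_messages ' +
  'WHERE operator_message_id = $1 LIMIT 1';

// Rejects if `p` has not settled within `ms`. Client-side only: the server keeps
// executing the query unless it is also cancelled there.
async function withTimeout<T>(p: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`query timed out after ${ms} ms`)), ms);
  });
  try {
    return await Promise.race([p, timeout]);
  } finally {
    clearTimeout(timer); // don't keep the event loop alive after p settles
  }
}

// Returns null when no outbound message matches (the step 5b orphan path).
async function correlate(db: Queryable, operatorMessageId: string): Promise<CorrelatedMessage | null> {
  const { rows } = await withTimeout(db.query(CORRELATE_SQL, [operatorMessageId]), 3000);
  if (rows.length === 0) return null;
  const r = rows[0];
  return {
    messageId: String(r.message_id),
    accountId: String(r.account_id),
    segmentCount: Number(r.segment_count),
    toNumber: String(r.to_number),
  };
}
```

A timeout rejection propagates like any other DB error, so under the error table it leads to a Nak and a retry rather than an orphan.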

Step 5 — Persist dlr.delivery_receipts

INSERT INTO dlr.delivery_receipts
(message_id, account_id, operator_id, operator_message_id,
raw_stat, dlr_status, delivered_at, error_code, correlated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,now())
ON CONFLICT (operator_message_id) DO NOTHING;
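A sketch of step 5 that binds the eight parameters in order and reports whether the row was actually inserted. The `ReceiptRow` field names and the `Executor` interface (returning `rowCount`, as node-postgres results do) are assumptions for illustration. Note that PostgreSQL only accepts `ON CONFLICT (operator_message_id)` if a unique index or constraint exists on that column.

```typescript
// Hypothetical shape of the data available at step 5; names are illustrative.
interface ReceiptRow {
  messageId: string;
  accountId: string;
  operatorId: string;
  operatorMessageId: string;
  rawStat: string;
  dlrStatus: string;
  deliveredAt: Date | null;
  errorCode: string | null;
}

// Minimal executor interface (assumption), shaped like a node-postgres client.
interface Executor {
  query(text: string, values: unknown[]): Promise<{ rowCount: number | null }>;
}

const INSERT_RECEIPT_SQL = `INSERT INTO dlr.delivery_receipts
  (message_id, account_id, operator_id, operator_message_id,
   raw_stat, dlr_status, delivered_at, error_code, correlated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,now())
ON CONFLICT (operator_message_id) DO NOTHING`;

// Values in $1..$8 order; correlated_at is filled by now() on the server.
function receiptParams(r: ReceiptRow): unknown[] {
  return [r.messageId, r.accountId, r.operatorId, r.operatorMessageId,
          r.rawStat, r.dlrStatus, r.deliveredAt, r.errorCode];
}

// Returns false when ON CONFLICT DO NOTHING skipped the row (already stored).
async function persistReceipt(db: Executor, r: ReceiptRow): Promise<boolean> {
  const { rowCount } = await db.query(INSERT_RECEIPT_SQL, receiptParams(r));
  return rowCount === 1;
}
```

A `false` result corresponds to the "DB duplicate key (idempotency)" row of the error table; one option is to stop there and Ack.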

Step 5b — Orphan Path

When correlation finds no matching message:

  1. Insert a row into dlr.orphaned_receipts containing the full rawPayload.
  2. Publish sms.dlr.unmatched to NATS.
  3. Ack: the orphan is accepted and quarantined, and retrying would not help.
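The three orphan steps can be sketched as below. The `OrphanDeps` interface is hypothetical, and publishing the raw payload as the body of sms.dlr.unmatched is an assumption; this doc does not define that message's format.

```typescript
// Hypothetical dependencies for the orphan path; names, and the choice to publish
// the raw payload on sms.dlr.unmatched, are illustrative assumptions.
interface OrphanDeps {
  insertOrphan(operatorMessageId: string, rawPayload: string): Promise<void>;
  publish(subject: string, data: Uint8Array): Promise<void>;
}

async function handleOrphan(
  deps: OrphanDeps,
  operatorMessageId: string,
  rawPayload: string,
): Promise<'ack'> {
  // 1. Quarantine: keep the full payload so the receipt can be inspected later.
  await deps.insertOrphan(operatorMessageId, rawPayload);
  // 2. Tell interested consumers that a receipt could not be matched.
  await deps.publish('sms.dlr.unmatched', new TextEncoder().encode(rawPayload));
  // 3. Ack. If either call above throws, the error propagates and the caller Naks.
  return 'ack';
}
```

Inserting before publishing means no sms.dlr.unmatched notification goes out for a receipt that was not quarantined.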

Step 6 — Update orch.sms_messages

Only for terminal DlrStatus (DELIVERED, FAILED, UNDELIVERED, EXPIRED, REJECTED):

UPDATE orch.sms_messages
SET status = $1, dlr_status = $2, dlr_received_at = $3, processed_at = now()
WHERE message_id = $4
AND status NOT IN ('DELIVERED','FAILED','UNDELIVERED','EXPIRED','REJECTED');

The WHERE guard prevents an already-terminal row from being overwritten, so the first terminal status recorded for a message wins.
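The same rule is handy in application code, for deciding whether steps 6 and 7 apply at all. A sketch follows; the in-memory check does not replace the SQL guard, which is what protects against concurrent updates. `'SENT'` in the usage is a hypothetical non-terminal orch.sms_messages status.

```typescript
// Terminal statuses as listed for step 6.
const TERMINAL_STATUSES: ReadonlySet<string> = new Set([
  'DELIVERED', 'FAILED', 'UNDELIVERED', 'EXPIRED', 'REJECTED',
]);

function isTerminal(status: string): boolean {
  return TERMINAL_STATUSES.has(status);
}

// Mirrors the UPDATE: only a terminal DLR status is written, and only onto a row
// that is not already terminal, so the first terminal status wins.
function shouldUpdateMessage(currentStatus: string, incoming: string): boolean {
  return isTerminal(incoming) && !isTerminal(currentStatus);
}
```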

Step 7 — Publish billing.events

Only for terminal statuses. Uses the transactional outbox: the event row is inserted into dlr.outbox in the same DB transaction as steps 5 and 6, and the outbox relay then publishes it to NATS.

Step 8 — Publish webhook.dispatch

Published for every status, including UNKNOWN, through the same outbox mechanism.
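Steps 7 and 8 together decide which outbox rows a receipt produces. A minimal sketch: payload contents are left to the caller, and the `eventId` field is assumed from the downstream idempotency note in section 3 rather than from a documented schema.

```typescript
// Which outbox rows a receipt produces (steps 7 and 8). The eventId field is an
// assumption based on downstream consumers deduplicating on eventId.
type Subject = 'billing.events' | 'webhook.dispatch';
interface OutboxRow { subject: Subject; payload: string; }
type EventPayload = { eventId: string; [key: string]: unknown };

const TERMINAL = new Set(['DELIVERED', 'FAILED', 'UNDELIVERED', 'EXPIRED', 'REJECTED']);

function outboxRowsFor(dlrStatus: string, billing: EventPayload, webhook: EventPayload): OutboxRow[] {
  const rows: OutboxRow[] = [];
  if (TERMINAL.has(dlrStatus)) {
    rows.push({ subject: 'billing.events', payload: JSON.stringify(billing) }); // step 7
  }
  rows.push({ subject: 'webhook.dispatch', payload: JSON.stringify(webhook) }); // step 8, every status
  return rows;
}
```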

3. Transactional Outbox Pattern

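Steps 5 to 8 execute within a single PostgreSQL transaction; steps 7 and 8 write outbox rows instead of publishing to NATS directly:

BEGIN;
INSERT INTO dlr.delivery_receipts ...        -- step 5
UPDATE orch.sms_messages ...                 -- step 6 (terminal statuses only)
INSERT INTO dlr.outbox (subject, payload)    -- steps 7 + 8
VALUES ('billing.events', $b),               -- billing row only for terminal statuses
       ('webhook.dispatch', $w);
COMMIT;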
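A sketch of the BEGIN/COMMIT/ROLLBACK wrapper, assuming a minimal `TxClient` interface. With node-postgres this must be one client checked out from the pool, not `pool.query()`, because the pool may run each statement on a different connection and therefore outside the transaction.

```typescript
// Minimal client interface (assumption). With node-postgres, pass a single client
// checked out from the pool, never the pool itself.
interface TxClient {
  query(text: string, values?: unknown[]): Promise<unknown>;
}

// Runs fn between BEGIN and COMMIT. On any error it rolls back and rethrows, so the
// receipt, the status update and the outbox rows are never half-written and the
// caller can Nak.
async function inTransaction<T>(client: TxClient, fn: (tx: TxClient) => Promise<T>): Promise<T> {
  await client.query('BEGIN');
  try {
    const result = await fn(client);
    await client.query('COMMIT');
    return result;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  }
}
```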

The outbox relay (a separate in-process worker) polls dlr.outbox and publishes each row to NATS with at-least-once semantics. Because a row can be published more than once, downstream consumers are idempotent on eventId.
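One relay pass can be sketched as below. The row shape (`id`, `subject`, `payload`) and the fetch/mark functions are assumptions; if several service instances ran relays, they would additionally need to avoid claiming the same rows (for example with PostgreSQL's `SELECT ... FOR UPDATE SKIP LOCKED`). The batch size of 100 is illustrative.

```typescript
// One polling pass of a hypothetical outbox relay; row shape and deps are assumptions.
interface OutboxRecord { id: number; subject: string; payload: string; }

interface RelayDeps {
  fetchUnpublished(limit: number): Promise<OutboxRecord[]>; // oldest first
  publish(subject: string, payload: string): Promise<void>;
  markPublished(id: number): Promise<void>;
}

// Publish first, mark second: a crash between the two republishes the row on the
// next pass (a duplicate, never a loss), which is where at-least-once comes from.
async function relayOnce(deps: RelayDeps, limit = 100): Promise<number> {
  const batch = await deps.fetchUnpublished(limit);
  let published = 0;
  for (const row of batch) {
    await deps.publish(row.subject, row.payload); // a throw ends the pass; rows stay queued
    await deps.markPublished(row.id);
    published++;
  }
  return published;
}
```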

4. Concurrency

  • NATS consumer MaxConcurrency: 10, so up to 10 messages are processed concurrently.
  • Each message is processed in an isolated async context; there is no shared mutable state.
  • PG connection pool: min 5, max 20.
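If the cap of 10 has to be enforced in application code rather than by the NATS client, a small semaphore is enough. This is an illustrative sketch, not a description of how MaxConcurrency is implemented. Presumably the pool maximum of 20 leaves headroom above 10 concurrent transactions for the outbox relay.

```typescript
// Caps in-flight handlers at `max`. Illustrative only: the NATS client may already
// enforce this limit itself.
class Semaphore {
  private active = 0;
  private readonly waiters: Array<() => void> = [];
  private readonly max: number;

  constructor(max: number) {
    this.max = max;
  }

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.active < this.max) {
      this.active++;
    } else {
      // Wait for a finishing task to hand over its slot (active stays unchanged).
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
    try {
      return await task();
    } finally {
      const next = this.waiters.shift();
      if (next) next(); // transfer the slot directly to the oldest waiter
      else this.active--;
    }
  }
}
```

Handing the slot straight to a waiter, instead of decrementing and letting it re-acquire, means a newly arriving task can never jump ahead and push concurrency above the limit.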

5. Error Handling

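| Error type | Action |
|---|---|
| Schema validation failure | Discard + Ack (no retry value) |
| DB transient error | Nak (NATS retries with backoff) |
| DB duplicate key (idempotency) | Ack (already processed) |
| NATS publish failure | Direct publish (sms.dlr.unmatched): Nak. Outbox relay: the row stays in dlr.outbox and is retried on the next poll, since the DLR transaction has already committed |
| Correlation not found | Write orphan + Ack |
| Unhandled exception | Nak; alert fires after 3 consecutive failures |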
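The table can be encoded as data, together with the consecutive-failure alert rule. In this sketch the error-kind labels are hypothetical names for the table rows. Which PostgreSQL errors count as "transient" is an assumption, not documented policy: connection exceptions (SQLSTATE class 08), serialization failure (40001), deadlock (40P01) and query_canceled (57014, which a statement timeout raises). "3 consecutive failures" is read here as three failed messages in a row.

```typescript
// The error table as data. Error-kind names are hypothetical labels for its rows.
type Action = 'ack' | 'nak';
type ErrorKind =
  | 'schema_validation' | 'db_transient' | 'db_duplicate_key'
  | 'nats_publish' | 'correlation_not_found' | 'unhandled';

const ACTION_FOR: Record<ErrorKind, Action> = {
  schema_validation: 'ack',
  db_transient: 'nak',
  db_duplicate_key: 'ack',
  nats_publish: 'nak', // direct publishes only; relay failures stay in the outbox
  correlation_not_found: 'ack',
  unhandled: 'nak',
};

// Assumed transient SQLSTATEs: class 08, serialization failure, deadlock, query_canceled.
const TRANSIENT_CODES = ['40001', '40P01', '57014'];
function isTransientSqlState(code: string): boolean {
  return code.startsWith('08') || TRANSIENT_CODES.indexOf(code) >= 0;
}

// Counts consecutive failed messages; returns true exactly when the streak reaches 3.
class FailureStreak {
  private streak = 0;
  record(failed: boolean): boolean {
    this.streak = failed ? this.streak + 1 : 0;
    return this.streak === 3;
  }
}
```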