DLR Processor — Application Logic
Status: populated
Owner: Platform Engineering
Last updated: 2026-04-18
Companion: DOMAIN_MODEL · DATA_MODEL · FAILURE_MODES
1. Processing Pipeline
[NATS sms.dlr.inbound]
          │
          ▼
1. Deserialise + validate (Zod schema)
          │
          ▼
2. Idempotency check: lookup operator_message_id in dlr.delivery_receipts
          │
     ┌────┴────┐
  exists    not found
     │         │
    ACK     3. Normalise stat → DlrStatus
               │
               ▼
            4. Correlate: lookup orch.sms_messages by operatorMessageId
               │
        ┌──────┴──────┐
      found       not found
        │             │
   5. Persist     5b. Write dlr.orphaned_receipts
   delivery_receipt   │
        │         Publish sms.dlr.unmatched
        │             │
        ▼            ACK
   6. Update orch.sms_messages
        │
        ▼
   7. Publish billing.events (terminal status only)
        │
        ▼
   8. Publish webhook.dispatch
        │
        ▼
       ACK
2. Step Details
Step 1 — Deserialise + Validate
- Parse raw NATS message payload as UTF-8 JSON.
- Validate against the InboundDlrEvent Zod schema.
- Validation failure: log WARN, increment the dlr_validation_errors_total counter, and Ack so the message is not redelivered (malformed events are permanently discarded; see the Error Handling table).
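The parse-then-validate decision in Step 1 can be sketched as follows. This is a minimal illustration, not the service's code: the two event fields shown are a hypothetical subset (the real InboundDlrEvent schema is not listed in this section), and a hand-written type guard stands in for the Zod schema so the snippet runs without dependencies. The names `ParseResult`, `isInboundDlrEvent`, and `parseInboundDlr` are assumptions made for the example.

```typescript
// Hypothetical subset of the inbound event; the real InboundDlrEvent schema may differ.
interface InboundDlrEvent {
  operatorMessageId: string;
  stat: string;
}

type ParseResult =
  | { ok: true; event: InboundDlrEvent }
  | { ok: false; reason: string };

// Hand-written guard standing in for the Zod schema check.
function isInboundDlrEvent(value: unknown): value is InboundDlrEvent {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.operatorMessageId === "string" &&
    v.operatorMessageId.length > 0 &&
    typeof v.stat === "string"
  );
}

// Decode the raw payload as UTF-8 JSON, then validate its shape.
function parseInboundDlr(raw: Uint8Array): ParseResult {
  let decoded: unknown;
  try {
    // fatal: true makes invalid UTF-8 throw instead of being silently replaced.
    const text = new TextDecoder("utf-8", { fatal: true }).decode(raw);
    decoded = JSON.parse(text);
  } catch {
    return { ok: false, reason: "payload is not valid UTF-8 JSON" };
  }
  if (!isInboundDlrEvent(decoded)) {
    return { ok: false, reason: "payload does not match the event schema" };
  }
  return { ok: true, event: decoded };
}
```

On a failed result, the caller would log, increment the validation counter, and discard the message as described in the bullets above; on success, the typed event flows into Step 2.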
Step 2 — Idempotency Check
SELECT receipt_id FROM dlr.delivery_receipts
WHERE operator_message_id = $1
LIMIT 1;
If found: log DEBUG dlr.duplicate, increment dlr_duplicates_total, Ack immediately.
Step 3 — Status Normalisation
Stateless pure function:
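// Maps the operator-supplied stat value to the internal DlrStatus.
function normalise(stat: string): DlrStatus {
  const map: Record<string, DlrStatus> = {
    DELIVRD: 'DELIVERED',
    UNDELIV: 'UNDELIVERED',
    EXPIRED: 'EXPIRED',
    DELETED: 'FAILED',
    ACCEPTD: 'UNKNOWN', // accepted but not yet final, so treated as non-terminal
    REJECTD: 'REJECTED',
    UNKNOWN: 'UNKNOWN',
    FAILED: 'FAILED',
  };
  // Case-insensitive lookup; unrecognised values fall back to 'UNKNOWN'.
  return map[stat.toUpperCase()] ?? 'UNKNOWN';
}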
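A short usage sketch of the mapping, combined with the terminal-status check that Steps 6 and 7 depend on. The mapping is repeated from the function above so the snippet runs on its own. The `DlrStatus` union is inferred from the map's values and may be incomplete, and `isTerminal` is a hypothetical helper name; the terminal set itself is the one listed in Step 6.

```typescript
// Assumed union, inferred from the values in the mapping table.
type DlrStatus =
  | "DELIVERED" | "UNDELIVERED" | "EXPIRED" | "FAILED" | "REJECTED" | "UNKNOWN";

const STAT_MAP: Record<string, DlrStatus> = {
  DELIVRD: "DELIVERED",
  UNDELIV: "UNDELIVERED",
  EXPIRED: "EXPIRED",
  DELETED: "FAILED",
  ACCEPTD: "UNKNOWN",
  REJECTD: "REJECTED",
  UNKNOWN: "UNKNOWN",
  FAILED: "FAILED",
};

function normalise(stat: string): DlrStatus {
  return STAT_MAP[stat.toUpperCase()] ?? "UNKNOWN";
}

// Terminal statuses as listed in Step 6; UNKNOWN is the only non-terminal value.
const TERMINAL_STATUSES: ReadonlySet<DlrStatus> = new Set<DlrStatus>([
  "DELIVERED", "FAILED", "UNDELIVERED", "EXPIRED", "REJECTED",
]);

// Hypothetical helper: true when the status should trigger Steps 6 and 7.
function isTerminal(status: DlrStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}

const examples = ["delivrd", "ACCEPTD", "SOMETHING_NEW"].map((stat) => {
  const status = normalise(stat);
  return { stat, status, terminal: isTerminal(status) };
});
console.log(examples);
```

Lower-case input is matched because of the `toUpperCase()` call, while ACCEPTD and any unrecognised value both normalise to UNKNOWN and are therefore non-terminal.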
Step 4 — Correlation
SELECT message_id, account_id, segment_count, to_number
FROM orch.sms_messages
WHERE operator_message_id = $1
LIMIT 1;
Uses index on orch.sms_messages(operator_message_id). Query timeout: 3 s.
Step 5 — Persist dlr.delivery_receipts
INSERT INTO dlr.delivery_receipts
(message_id, account_id, operator_id, operator_message_id,
raw_stat, dlr_status, delivered_at, error_code, correlated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,now())
ON CONFLICT (operator_message_id) DO NOTHING;
Step 5b — Orphan Path
When correlation finds no matching message:
- Insert into dlr.orphaned_receipts with full rawPayload.
- Publish sms.dlr.unmatched to NATS.
- Ack: the orphan is accepted and quarantined; retrying would not help.
Step 6 — Update orch.sms_messages
Only for terminal DlrStatus (DELIVERED, FAILED, UNDELIVERED, EXPIRED, REJECTED):
UPDATE orch.sms_messages
SET status = $1, dlr_status = $2, dlr_received_at = $3, processed_at = now()
WHERE message_id = $4
AND status NOT IN ('DELIVERED','FAILED','UNDELIVERED','EXPIRED','REJECTED');
Guard clause prevents re-updating already-terminal rows.
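The effect of the guard clause can be modelled in memory. The sketch below is an illustration only: `SmsMessageRow`, `applyTerminalDlr`, and the non-terminal `SENT` status are hypothetical, and it assumes `status` and `dlr_status` receive the same value, which the UPDATE above does not require.

```typescript
// Terminal statuses from the UPDATE's NOT IN list.
const TERMINAL = new Set(["DELIVERED", "FAILED", "UNDELIVERED", "EXPIRED", "REJECTED"]);

// Hypothetical in-memory stand-in for a row in orch.sms_messages.
interface SmsMessageRow {
  messageId: string;
  status: string;
  dlrStatus: string | null;
}

// Returns true when the row changed, analogous to the UPDATE affecting one row.
function applyTerminalDlr(row: SmsMessageRow, dlrStatus: string): boolean {
  // Step 6 runs only for terminal DLR statuses.
  if (!TERMINAL.has(dlrStatus)) return false;
  // Guard clause: a row that is already terminal is never overwritten.
  if (TERMINAL.has(row.status)) return false;
  row.status = dlrStatus;
  row.dlrStatus = dlrStatus;
  return true;
}

const demoRow: SmsMessageRow = { messageId: "m-1", status: "SENT", dlrStatus: null };
console.log(applyTerminalDlr(demoRow, "FAILED"));    // row moves to FAILED
console.log(applyTerminalDlr(demoRow, "DELIVERED")); // late receipt is ignored
```

The second call models a late or out-of-order receipt: because the row is already terminal, the first terminal status to arrive is the one that is kept.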
Step 7 — Publish billing.events
Only for terminal statuses. Uses transactional outbox: event inserted to dlr.outbox within same DB transaction as steps 5+6; outbox relay publishes to NATS.
Step 8 — Publish webhook.dispatch
Published for all statuses including UNKNOWN. Same outbox mechanism.
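The publishing rules of Steps 7 and 8 reduce to a small selection function. This sketch assumes an opaque string payload and a hypothetical `outboxEventsFor` helper; the real event payloads are not described in this section.

```typescript
type DlrStatus =
  | "DELIVERED" | "UNDELIVERED" | "EXPIRED" | "FAILED" | "REJECTED" | "UNKNOWN";

const TERMINAL: ReadonlySet<DlrStatus> = new Set<DlrStatus>([
  "DELIVERED", "FAILED", "UNDELIVERED", "EXPIRED", "REJECTED",
]);

interface OutboxEvent {
  subject: "billing.events" | "webhook.dispatch";
  payload: string; // opaque here; the real payload shape is an open detail
}

// Decide which outbox rows a receipt with the given status should produce.
function outboxEventsFor(status: DlrStatus, payload: string): OutboxEvent[] {
  const events: OutboxEvent[] = [];
  if (TERMINAL.has(status)) {
    // Step 7: billing is notified only once the outcome is final.
    events.push({ subject: "billing.events", payload });
  }
  // Step 8: webhooks are dispatched for every status, including UNKNOWN.
  events.push({ subject: "webhook.dispatch", payload });
  return events;
}

console.log(outboxEventsFor("DELIVERED", "{}").map((e) => e.subject));
console.log(outboxEventsFor("UNKNOWN", "{}").map((e) => e.subject));
```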
3. Transactional Outbox Pattern
Steps 5, 6, 7, 8 execute within a single PostgreSQL transaction:
BEGIN;
INSERT INTO dlr.delivery_receipts ...
UPDATE orch.sms_messages ...
INSERT INTO dlr.outbox (subject, payload) VALUES ('billing.events', $b), ('webhook.dispatch', $w);
COMMIT;
The outbox relay (separate in-process worker) polls dlr.outbox and publishes to NATS with at-least-once semantics. Downstream consumers are idempotent on eventId.
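One polling pass of such a relay might look like the sketch below. It uses an in-memory array in place of dlr.outbox and an injected `publish` function in place of the NATS client, so it makes no claims about either API. The `publishedAt` marker is an assumption, since the section only shows the `subject` and `payload` columns, and `relayOnce` is a hypothetical name.

```typescript
// Hypothetical in-memory view of a dlr.outbox row; publishedAt is an assumed marker column.
interface OutboxRow {
  id: number;
  subject: string;
  payload: string;
  publishedAt: Date | null;
}

// Injected publisher, standing in for the NATS client.
type Publish = (subject: string, payload: string) => Promise<void>;

// One polling pass: publish pending rows in order and return how many were published.
async function relayOnce(outbox: OutboxRow[], publish: Publish): Promise<number> {
  let published = 0;
  for (const row of outbox) {
    if (row.publishedAt !== null) continue;
    try {
      await publish(row.subject, row.payload);
    } catch {
      // Leave the row pending and stop; the next poll retries from here.
      break;
    }
    // Mark only after a successful publish. A crash between publish and mark
    // causes a duplicate on the next pass, which is what at-least-once means.
    row.publishedAt = new Date();
    published += 1;
  }
  return published;
}
```

Because a row is marked only after the publish succeeds, a message can be sent twice but never lost, which is why the downstream consumers need to deduplicate on eventId.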
4. Concurrency
- NATS consumer MaxConcurrency: 10 (up to 10 messages processed concurrently).
- Each message is processed in an isolated async context; no shared mutable state.
- PG connection pool: min: 5, max: 20.
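For illustration, a cap of 10 in-flight handlers can be enforced in-process with a small limiter like the one below. This is a generic sketch and not a description of how the NATS consumer setting is implemented; `createLimiter` is a hypothetical helper.

```typescript
// Returns a wrapper that runs at most maxConcurrent tasks at a time.
function createLimiter(maxConcurrent: number) {
  let active = 0; // slots currently held, including slots handed to a waiter
  const waiting: Array<() => void> = [];

  return async function run<T>(task: () => Promise<T>): Promise<T> {
    if (active >= maxConcurrent) {
      // Wait until a finishing task hands over its slot.
      await new Promise<void>((resolve) => waiting.push(resolve));
    } else {
      active += 1;
    }
    try {
      return await task();
    } finally {
      const next = waiting.shift();
      if (next) {
        next(); // pass the slot straight to the next waiter
      } else {
        active -= 1; // nobody is waiting, so free the slot
      }
    }
  };
}

// Usage: wrap each message handler so no more than 10 run concurrently.
const limitTo10 = createLimiter(10);
void limitTo10(async () => "handled").then((result) => console.log(result));
```

Handing the slot directly to the next waiter, rather than decrementing and re-incrementing the counter, avoids a window in which a new caller could take the slot before the waiter resumes. If each handler holds at most one connection at a time, 10 concurrent handlers stay within the pool's maximum of 20.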
5. Error Handling
| Error Type | Action |
|---|---|
| Schema validation failure | Discard + Ack (no retry value) |
| DB transient error | Nak (NATS retries with backoff) |
| DB duplicate key (idempotency) | Ack (already processed) |
| NATS publish failure | Transaction rolls back; Nak |
| Correlation not found | Write orphan + Ack |
| Unhandled exception | Nak; alert fired at 3 consecutive failures |
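The table above can be expressed as a single decision function, which keeps the Ack/Nak policy in one place. The `FailureKind` names and both helpers are hypothetical, and the counter assumes the alert fires when the third consecutive failure is recorded, which is one reading of the last row.

```typescript
// Hypothetical labels for the rows of the error-handling table.
type FailureKind =
  | "SCHEMA_VALIDATION"
  | "DB_TRANSIENT"
  | "DB_DUPLICATE_KEY"
  | "NATS_PUBLISH"
  | "CORRELATION_NOT_FOUND"
  | "UNHANDLED";

type AckAction = "ACK" | "NAK";

// Ack when a retry cannot change the outcome; Nak when it might.
function ackActionFor(kind: FailureKind): AckAction {
  switch (kind) {
    case "SCHEMA_VALIDATION":     // malformed event, discarded
    case "DB_DUPLICATE_KEY":      // already processed
    case "CORRELATION_NOT_FOUND": // orphan written and quarantined
      return "ACK";
    case "DB_TRANSIENT":
    case "NATS_PUBLISH":
    case "UNHANDLED":
      return "NAK";
  }
}

const ALERT_THRESHOLD = 3;

// Tracks consecutive failures; a success resets the streak.
class ConsecutiveFailureCounter {
  private count = 0;

  // Returns true when this failure is the one that reaches the threshold.
  recordFailure(): boolean {
    this.count += 1;
    return this.count === ALERT_THRESHOLD;
  }

  recordSuccess(): void {
    this.count = 0;
  }
}
```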