CDR Mediation Service — Application Logic
Version: 1.0 | Status: Draft | Owner: Commerce + Regulator Liaison | Last Updated: 2026-04-21 | Companion: DOMAIN_MODEL · API_CONTRACTS · DATA_MODEL · FAILURE_MODES
This document describes the use cases executed by the cdr-mediation-service, the ports they depend on, the concurrency model, and the latency budgets that their alerts are measured against. Each use case is framed as a discrete, testable flow.
1. Use Case Catalog
| ID | Use Case | Trigger | Latency Budget |
|---|---|---|---|
| UC-01 | IngestDlrSignal | NATS consumer on sms.dlr.inbound | P99 ≤ 10 s DLR → CDR row |
| UC-02 | RepriceCdr | NATS consumer on billing.pricing.snapshot.ready | P95 ≤ 5 s |
| UC-03 | GenerateHourlyRollup | Cron on HH:00 UTC, leader-only | Seal within 5 min of close |
| UC-04 | ComputeHashChain | Embedded in UC-03 | N/A (bounded by UC-03) |
| UC-05 | MirrorBucketToObjectStorage | Fired on cdr.bucket.sealed.v1 | ≤ 15 min after seal |
| UC-06 | IssueAdjustment | REST POST /v1/cdr/{cdrId}/adjustments | P95 ≤ 400 ms (single); bulk async |
| UC-07 | EncodeTapFile | Cron daily 02:00 Asia/Kabul, leader-only | Complete by 02:00 for D-1 |
| UC-08 | EncodeRapFile | Cron daily 02:30 Asia/Kabul, leader-only | Complete by 02:30 for D-1 |
| UC-09 | SignFileWithHsm | Embedded in UC-07 / UC-08 | P95 ≤ 200 ms per file |
| UC-10 | DropFileToSftp | Post-sign step | P95 ≤ 60 s per file |
| UC-11 | FallbackUploadToAtraRest | Invoked on SFTP failure | P95 ≤ 90 s per file |
| UC-12 | VerifyChainIntegrity | Nightly cron 03:00 UTC | Full walk ≤ 30 min |
| UC-13 | AnchorToTransparencyLog | Cron 00:15 Asia/Kabul | Day anchor submitted within 30 min of day-close |
| UC-14 | ResubmitQuarantined | REST POST /v1/cdr/quarantine/{id}/resubmit | P95 ≤ 400 ms |
| UC-15 | ArchiveToColdStorage | Monthly cron | Completes overnight |
| UC-16 | VerifyChainApi | REST POST /v1/cdr/chain/verify | P95 ≤ 1.5 s |
2. UC-01 — IngestDlrSignal (NATS consumer)
Trigger. Durable JetStream consumer cdr-mediation-dlr on subject sms.dlr.inbound, ackWait 30 s, maxDeliver 5.
Input. DlrEvent { messageId, tenantId, accountId, to, from, senderId, finalState, operatorId, smscId, messageReference, segmentCount, encoding, eventTimestamp, correlationId, traceId }.
Steps.
- Guard: if finalState ∉ {DELIVERED, FAILED, EXPIRED}, ACK and ignore the event; non-terminal DLRs are reprojected when a terminal DLR arrives.
- Idempotency: look up cdr.records where sourceEventId = eventId. On hit, ACK and return.
- Enrich pricing: call BillingPricingPort.GetPricingSnapshot(messageId):
  - On success, read {chargeAmount, currency, tapTariffClass, billingIndicator}.
  - On NOT_FOUND or UNAVAILABLE, proceed with chargeAmount=NULL, tapTariffClass=NULL, billingIndicator=UNKNOWN and schedule a reprice (UC-02) within 24 h.
- Enrich operator: call OperatorRegistryPort.GetRecordingEntity(operatorId) (Redis-cached 1 h) → returns recordingEntity, taxonomy.
- Hash MSISDNs: msisdnHashTo = sha256(to ‖ tenantSalt(tenantId)), and the same for from when it is an MSISDN.
- Determine bucketHour: date_trunc('hour', eventTimeStamp) in UTC.
- Compose canonical row: see DOMAIN_MODEL §2.1 field set.
- Compute chainHashPrev: SELECT rowHash FROM cdr.records WHERE bucketHour = $1 AND operatorId = $2 ORDER BY cdrSequence DESC LIMIT 1 FOR KEY SHARE (scoped per operator, as in ChainRepositoryPort.getLastRowHash). For the first row of a partition, use SELECT chainHash FROM cdr.rollups WHERE bucketHour = (HH - 1) AND operatorId = $2.
- Compute rowHash: sha256(chainHashPrev ‖ canonicalJson(row)), the same ordering as computeRowHash in UC-04 (RFC 8785 JCS; decimals rendered as fixed-point strings).
- Store raw MSISDN: encrypt to and (if MSISDN) from with the per-operator KEK via Vault Transit; insert into cdr.msisdn_vault keyed by cdrId.
- Persist in a single transaction:
  - INSERT cdr.records
  - INSERT cdr.outbox(subject='cdr.record.appended', payload=...)
- ACK NATS on commit.
Output. cdr.record.appended.v1 event published by outbox relay within 1 s.
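The hashing steps of UC-01 can be sketched in a few lines. This is a minimal illustration, not the service's implementation: it assumes UTF-8 string concatenation for the salted MSISDN hash, raw-byte concatenation of the previous hash with the canonical JSON (the UC-04 ordering), and it approximates RFC 8785 with sorted-key compact JSON, which only matches JCS for rows holding ASCII keys, strings, integers and nulls. All sample values are hypothetical.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone


def hash_msisdn(msisdn: str, tenant_salt: str) -> str:
    # sha256(msisdn || tenantSalt); the UTF-8 encoding is an assumption.
    return hashlib.sha256((msisdn + tenant_salt).encode("utf-8")).hexdigest()


def bucket_hour(event_ts: datetime) -> datetime:
    # Equivalent of date_trunc('hour', ts) in UTC; expects a timezone-aware value.
    return event_ts.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)


def canonical_json(row: dict) -> bytes:
    # Approximation of RFC 8785 for simple rows (see the assumptions above).
    return json.dumps(row, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")


def compute_row_hash(row: dict, chain_hash_prev: str) -> str:
    # rowHash is blanked so the hash never covers itself.
    body = dict(row, rowHash=None, chainHashPrev=chain_hash_prev)
    return hashlib.sha256(bytes.fromhex(chain_hash_prev) + canonical_json(body)).hexdigest()


# Hypothetical event received at 18:17 Kabul time (UTC+04:30).
kabul = timezone(timedelta(hours=4, minutes=30))
ts = datetime(2026, 4, 21, 18, 17, tzinfo=kabul)
prev = "00" * 32
row = {"messageId": "m-1", "bucketHour": bucket_hour(ts).isoformat(), "chargeAmount": "1.2500"}
row_hash = compute_row_hash(row, prev)
```

Rendering chargeAmount as a fixed-point string before canonicalisation keeps the hash independent of floating-point formatting.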
Latency budget (P99).
| Stage | Budget |
|---|---|
| Billing snapshot lookup | 150 ms (cache hit) / 400 ms (miss) |
| Operator registry lookup | 5 ms (Redis hit) / 50 ms (miss) |
| MSISDN hash + canonical JSON | 1 ms |
| Vault Transit encrypt | 20 ms |
| Postgres INSERT + outbox | 30 ms |
| NATS ACK | 5 ms |
| DLR → CDR end-to-end | ≤ 10 s P99 (per ADR-0004 §9) |
3. UC-02 — RepriceCdr (late-arriving pricing)
Trigger. NATS consumer on billing.pricing.snapshot.ready — fired by billing-service when a deferred pricing snapshot materialises (see billing-service/EP-BILL-09).
Steps.
- Read payload {messageId, tenantId, chargeAmount, currency, tapTariffClass, billingIndicator}.
- SELECT cdrId FROM cdr.records WHERE messageId = $1 AND chargeAmount IS NULL. If zero rows, ACK (nothing to do).
- Because rows are immutable, issue an internal IssueAdjustment (UC-06) with type=CORRECTION, correctedFields={chargeAmount, tapTariffClass, billingIndicator}, reason='late_pricing_arrival', ticketId='SYS-REPRICE'.
- Publish cdr.record.repriced.v1 in the same transaction.
Failure handling. If the original CDR was already included in a TAP file (state=EXPORTED/ACKED), the correction will be batched in the next RAP run (UC-08). Consumer ACKs only after transaction commit.
4. UC-03 — GenerateHourlyRollup (seal + chain)
Trigger. Cron @Cron('0 * * * *') at each HH:00 UTC. Leader-only via PG advisory lock pg_advisory_lock(hashtext('cdr.export.leader')).
Steps.
- Try to acquire the leader lock without blocking (pg_try_advisory_lock semantics, as exposed by LeaderElectionPort.tryAcquire); if it is already held, skip this run because another replica is running.
- For each operatorId active in the past hour:
  - SELECT rowHash FROM cdr.records WHERE bucketHour = HH-1 AND operatorId = $1 ORDER BY cdrSequence ASC FOR UPDATE
  - Compute bucketRoot = Merkle root over ordered rowHash[] (binary tree; padded with zero leaves to next power of 2; if empty, sha256('EMPTY:' || bucketHour || ':' || operatorId)).
  - Read prevBucketChainHash from the previous CdrRollup for the same operatorId.
  - chainHash = sha256(prevBucketChainHash ‖ bucketRoot).
  - Compute aggregates (moCount, mtCount, chargeableSum, ...).
  - INSERT into cdr.rollups with sealedAt = now().
  - UPDATE cdr.records SET state = 'AGGREGATED' WHERE bucketHour = HH-1 AND operatorId = $1 AND state = 'RAW'.
  - Publish cdr.bucket.sealed.v1 via outbox.
- Release leader lock.
- Emit cdr_rollup_seal_duration_seconds Prometheus histogram.
Failure. Each operator is sealed in its own transaction, so a crash halfway leaves the already-sealed operators intact and the next run picks up the remaining ones. If sealing is not completed by HH+10m, the CdrRollupBehind SEV1 alert fires and export jobs for D-1 are blocked until resolved.
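The per-operator seal can be pictured as a pure function over the bucket's rows. The sketch below is illustrative only: the dictionary shapes, the ISO string used for bucketHour inside the EMPTY sentinel, and deriving moCount/mtCount from a chargeType field are assumptions, and the Merkle function is passed in rather than defined here.

```python
import hashlib
from typing import Callable


def sha256(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def empty_bucket_root(bucket_hour: str, operator_id: str) -> bytes:
    # sha256('EMPTY:' || bucketHour || ':' || operatorId); string rendering is assumed.
    return sha256(f"EMPTY:{bucket_hour}:{operator_id}".encode("utf-8"))


def seal_bucket(
    bucket_hour: str,
    operator_id: str,
    rows: list[dict],
    prev_chain_hash: bytes,
    merkle_root: Callable[[list[bytes]], bytes],
) -> dict:
    # Rows are hashed in cdrSequence order so the root is reproducible.
    ordered = sorted(rows, key=lambda r: r["cdrSequence"])
    hashes = [bytes.fromhex(r["rowHash"]) for r in ordered]
    bucket_root = merkle_root(hashes) if hashes else empty_bucket_root(bucket_hour, operator_id)
    return {
        "bucketHour": bucket_hour,
        "operatorId": operator_id,
        "recordCount": len(ordered),
        "moCount": sum(1 for r in ordered if r["chargeType"] == "MO"),
        "mtCount": sum(1 for r in ordered if r["chargeType"] == "MT"),
        "bucketRoot": bucket_root.hex(),
        "chainHash": sha256(prev_chain_hash + bucket_root).hex(),
    }


# Hypothetical empty hour for operator "op1", chained onto an all-zero previous hash.
empty_rollup = seal_bucket("2026-04-21T13:00:00Z", "op1", [], bytes(32), lambda hs: hs[0])
```

Keeping the seal free of I/O makes it straightforward for the verifier to replay the same computation.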
5. UC-04 — ComputeHashChain (embedded invariant)
Pure function used by UC-01 and UC-03.
function computeRowHash(row, chainHashPrev):
    # rowHash is blanked so the hash never covers itself
    canonical = rfc8785(row with rowHash=null, chainHashPrev=chainHashPrev)
    return sha256(chainHashPrev || canonical)

function computeBucketRoot(orderedRowHashes):
    # precondition: non-empty; UC-03 substitutes the EMPTY sentinel for empty buckets
    leaves = orderedRowHashes
    while len(leaves) > 1:
        if len(leaves) is odd: append sha256('0'*64) to leaves   # pad before pairing so no leaf is dropped
        leaves = [sha256(leaves[2i] || leaves[2i+1]) for i in range(len(leaves) / 2)]
    return leaves[0]

function computeChainHash(prevChainHash, bucketRoot):
    return sha256(prevChainHash || bucketRoot)
Determinism is required — the verifier (UC-12) re-runs the same function and compares.
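A runnable rendering of the bucket-root and chain-hash functions might look like the following. It is a sketch under stated assumptions: hashes are handled as raw bytes, the pad leaf is read as SHA-256 over 64 ASCII zero characters, and padding is applied at each level whenever the count is odd, which is the rule written in the pseudocode.

```python
import hashlib

# Pad leaf: sha256('0'*64), interpreted here as 64 ASCII '0' characters (assumption).
PAD_LEAF = hashlib.sha256(b"0" * 64).digest()


def sha256(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def compute_bucket_root(ordered_row_hashes: list[bytes]) -> bytes:
    if not ordered_row_hashes:
        raise ValueError("empty buckets use the EMPTY sentinel computed by the caller")
    leaves = list(ordered_row_hashes)
    while len(leaves) > 1:
        if len(leaves) % 2 == 1:
            leaves.append(PAD_LEAF)  # pad before pairing so the last leaf is kept
        leaves = [sha256(leaves[i] + leaves[i + 1]) for i in range(0, len(leaves), 2)]
    return leaves[0]


def compute_chain_hash(prev_chain_hash: bytes, bucket_root: bytes) -> bytes:
    return sha256(prev_chain_hash + bucket_root)


# Three hypothetical row hashes: the third is paired with the pad leaf.
a, b, c = (sha256(bytes([i])) for i in range(3))
root = compute_bucket_root([a, b, c])
```

Because the functions take and return plain bytes, the nightly verifier can call them with no database or clock dependency.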
6. UC-05 — MirrorBucketToObjectStorage
Trigger. NATS consumer on cdr.bucket.sealed.v1.
Steps.
- Read bucket rows (SELECT * FROM cdr.records WHERE bucketHour = $1 AND operatorId = $2 ORDER BY cdrSequence); raw MSISDNs are fetched via cdr.msisdn_vault on behalf of the encoder into a transient in-memory projection.
- Write zstd-compressed JSON Lines to s3://ghasi-cdr-archive/v1/region={region}/operator={op}/year=YYYY/month=MM/day=DD/hour=HH/{seq}.cdr.jsonl.zst, 1000 rows per file.
- Set Object Lock: x-amz-object-lock-mode: COMPLIANCE, retain-until = sealedAt + 7 years (Class-IV retention per docs/security/data-classification.md).
- Write _MANIFEST.json per hour with {bucketHour, operatorId, recordCount, bucketRoot, chainHash, files:[{name, sha256, recordCount}], signerKeyId, schemaVersion}.
- UPDATE cdr.rollups SET mirroredAt = now(), objectStoreRoot = ....
- Publish cdr.archive.completed.v1.
Retry. Exponential backoff up to 24 h on upload error. After 24 h → SEV1 CdrArchiveFailed.
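The object layout, the 1000-row file split and the retention date can be illustrated as below. This is a sketch, not the mirroring code: the unpadded {seq} rendering and the handling of a 29 February seal date are assumptions, and the region and operator values are hypothetical.

```python
from datetime import datetime, timezone
from itertools import islice
from typing import Iterable, Iterator

ROWS_PER_FILE = 1000


def object_key(region: str, operator: str, bucket_hour: datetime, seq: int) -> str:
    # Key below the bucket root, following the partitioned path in the step above.
    return (
        f"v1/region={region}/operator={operator}/"
        f"year={bucket_hour:%Y}/month={bucket_hour:%m}/day={bucket_hour:%d}/hour={bucket_hour:%H}/"
        f"{seq}.cdr.jsonl.zst"
    )


def chunk_rows(rows: Iterable[dict], size: int = ROWS_PER_FILE) -> Iterator[list[dict]]:
    # Yields consecutive batches of at most `size` rows, preserving order.
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


def retain_until(sealed_at: datetime, years: int = 7) -> datetime:
    try:
        return sealed_at.replace(year=sealed_at.year + years)
    except ValueError:
        # 29 February sealed date with a non-leap target year: fall back to 28 February.
        return sealed_at.replace(year=sealed_at.year + years, day=28)


hour = datetime(2026, 4, 21, 13, tzinfo=timezone.utc)
key = object_key("kbl", "op1", hour, 0)
sizes = [len(batch) for batch in chunk_rows({"n": i} for i in range(2500))]
```

A bucket of 2500 rows therefore produces three files, and the manifest would list one entry per file.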
7. UC-06 — IssueAdjustment
Trigger. REST POST /v1/cdr/{cdrId}/adjustments (single) or POST /v1/cdr/adjustments/rerate (bulk).
Steps (single).
- Validate caller role (cdr:write); reject with 403 otherwise.
- Validate body: type ∈ {CORRECTION, VOID, RE_RATE}, non-empty reason, ticketId present.
- Load original CdrRecord; 404 if missing.
- Guard: if type=VOID and original has adjustmentType=VOID (already voided), return 409 ALREADY_VOIDED.
- Compose new row (see DOMAIN_MODEL §2.4). bucketHour = date_trunc('hour', now()): adjustments seal in the hour they were issued, not the original CDR's hour.
- For RE_RATE, call BillingPricingPort.GetPricingByTable(tenantId, messageId, newPricingTableId) → new chargeAmount, tapTariffClass.
- INSERT into cdr.records (shares the same table; adjustmentOf IS NOT NULL discriminates).
- INSERT into cdr.adjustments metadata (FK to the CDR row).
- Outbox cdr.adjustment.created.v1.
- Return { adjustmentId, originalCdrId, bucketHour }.
Steps (bulk re-rate).
- Validate role; if estimated impact (SELECT COUNT(*) FROM cdr.records WHERE filter) > 100,000, require X-Approver-Jwt header with second signature, else 403 FOUR_EYES_REQUIRED.
- Create a jobId; enqueue work in cdr.adjustment_jobs.
- Worker processes in 10,000-row batches. Each batch is a transaction: N inserts + 1 outbox row.
- On job completion, publish cdr.adjustment.batch.issued.v1 with {jobId, type, count, sumDelta}.
- GET /v1/cdr/adjustments/jobs/{jobId} returns {status, processed, total, startedAt, finishedAt, errors[]}.
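The validation order for a single adjustment, together with the bulk thresholds, can be sketched as follows. The 403, 404 and 409 ALREADY_VOIDED outcomes come from the steps above; the 400 status and the FORBIDDEN, INVALID_BODY and NOT_FOUND code strings are placeholders invented for this illustration.

```python
from dataclasses import dataclass
from typing import Optional

ADJUSTMENT_TYPES = {"CORRECTION", "VOID", "RE_RATE"}
FOUR_EYES_THRESHOLD = 100_000
BATCH_SIZE = 10_000


@dataclass
class Rejection:
    status: int
    code: str


def validate_single(roles: set[str], body: dict, original: Optional[dict]) -> Optional[Rejection]:
    # Checks run in the documented order: role, body, existence, VOID guard.
    if "cdr:write" not in roles:
        return Rejection(403, "FORBIDDEN")  # code string is hypothetical
    if body.get("type") not in ADJUSTMENT_TYPES or not body.get("reason") or not body.get("ticketId"):
        return Rejection(400, "INVALID_BODY")  # status and code are hypothetical
    if original is None:
        return Rejection(404, "NOT_FOUND")  # code string is hypothetical
    if body["type"] == "VOID" and original.get("adjustmentType") == "VOID":
        return Rejection(409, "ALREADY_VOIDED")
    return None


def requires_four_eyes(estimated_rows: int) -> bool:
    # Strictly greater than 100,000 rows needs the second approver.
    return estimated_rows > FOUR_EYES_THRESHOLD


def batch_count(total_rows: int) -> int:
    # Ceiling division: number of 10,000-row worker transactions.
    return -(-total_rows // BATCH_SIZE)


void_request = {"type": "VOID", "reason": "duplicate", "ticketId": "T-1"}
outcome = validate_single({"cdr:write"}, void_request, {"adjustmentType": "VOID"})
```

Returning a value instead of raising keeps the rule set easy to unit-test independently of the HTTP layer.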
8. UC-07 — EncodeTapFile
Trigger. Cron @Cron('0 22 * * *') (22:00 UTC = 02:30 Asia/Kabul, which is UTC+04:30), leader-only; produces files for settlement day D-1.
Steps.
- Pin schemaVariant = value of cdr.export.tap.schemaVariant at job start (not at CDR-creation time).
- For each (operatorId, recordingEntity) tuple:
  - Acquire next fileSequenceNumber via UPDATE cdr.tap_sequence SET last = last + 1 RETURNING last (atomic; rolled back on job failure).
  - Stream SELECT * FROM cdr.records WHERE operatorId = $1 AND bucketHour BETWEEN D-1T00:00 AND D-1T23:59 AND state IN ('AGGREGATED','EXPORTED') AND adjustmentOf IS NULL.
  - For each row, invoke TapEncoder.encode(row, schemaVariant):
    - Map to ASN.1 fields (see §9).
    - On ASN.1 encoding error → INSERT cdr.quarantine row with {cdrId, encoderError, encoderVariant, quarantinedAt} and continue.
  - BER-encode the resulting DataInterchange record batch.
  - Compute SHA-256 of the complete file.
  - Write to object-storage://ghasi-cdr-exports/pending/TAP_{recordingEntity}_{roamingPartner}_{seq:08d}.{YYYYMMDD}.tap.
  - Invoke UC-09 (SignFileWithHsm).
  - Upload signed file + sidecar to ATRA (UC-10 → UC-11 fallback).
  - INSERT cdr.exports + cdr.export_delivery_log.
Zero-record day. If no rows match for an operator, emit a NIL-filler TAP file per GSMA TD.57 §4.1 rules (contains only NotificationTrailerInformation).
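Two parts of the TAP job lend themselves to a small sketch: the file naming pattern and the quarantine-and-continue loop around the encoder. The code below is illustrative; the entity codes are hypothetical, fake_encode stands in for TapEncoder.encode, treating encoder failures as ValueError is an assumption, and which date fills the YYYYMMDD slot is left to the caller.

```python
from datetime import date
from typing import Callable


def tap_file_name(recording_entity: str, roaming_partner: str, seq: int, file_date: date) -> str:
    if not 0 <= seq <= 99_999_999:
        raise ValueError("fileSequenceNumber must fit in 8 digits")
    return f"TAP_{recording_entity}_{roaming_partner}_{seq:08d}.{file_date:%Y%m%d}.tap"


def encode_all(rows: list[dict], encode: Callable[[dict, str], bytes], schema_variant: str):
    # Encode every row; a failing row is quarantined and the loop continues.
    encoded, quarantined = [], []
    for row in rows:
        try:
            encoded.append(encode(row, schema_variant))
        except ValueError as err:
            quarantined.append(
                {"cdrId": row["cdrId"], "encoderError": str(err), "encoderVariant": schema_variant}
            )
    return encoded, quarantined


def fake_encode(row: dict, schema_variant: str) -> bytes:
    # Hypothetical stand-in for the real encoder: rejects rows without a recordingEntity.
    if not row.get("recordingEntity"):
        raise ValueError("recordingEntity missing")
    return f"{schema_variant}:{row['cdrId']}".encode("ascii")


name = tap_file_name("AFGAW", "AFGRS", 42, date(2026, 4, 20))
good, bad = encode_all(
    [{"cdrId": "c1", "recordingEntity": "AFGAW"}, {"cdrId": "c2"}], fake_encode, "tap-3.12"
)
```

One bad record therefore costs a quarantine row, not the whole file.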
9. TAP 3.12 Field Mapping (reference)
Canonical CDR field → TAP 3.12 ASN.1 leaf:
callingPartyAddress -> CallingNumber (BCDString, with address-nature/plan octets)
calledPartyAddress -> CalledNumber (BCDString)
serviceCenterAddress -> MessageCentreAddress
messageReference -> MessageReference (printable)
recordingEntity -> RecordingEntity (numeric, 15 digits)
eventTimeStamp -> TimeStamp (localTimeStamp + UTC offset octet)
chargeAmount (minor) -> ChargeDetail.charge (integer, minor units per local currency)
tapTariffClass -> TariffCode (Asn1Numeric, 4 digits)
segmentCount -> ChargeableUnits
chargeType MT -> BasicServiceUsed.telephonyBasicService (smsMT)
chargeType MO -> telephonyBasicService (smsMO)
messageType FLASH -> TeleServiceCode = 0x23
encoding GSM7 -> CharacterSetList: 0x00 (default 7-bit)
encoding UCS2 -> CharacterSetList: 0x08
Fixed headers:
- BatchControlInfo: operator id, TAP version 3.12, release 12, file available time, specification version.
- TransferBatch: sender, recipient, fileSequenceNumber, fileCreationTimeStamp, transferCutOffTimeStamp, fileAvailableTimeStamp.
RAP 1.5 (used by UC-08) wraps returned CallEventDetail elements in ReturnDetail referencing the original fileSequenceNumber + messageReference.
10. UC-08 — EncodeRapFile
Trigger. Cron @Cron('30 22 * * *') — 30 min after TAP.
Steps.
- Pin RAP schema variant (cdr.export.rap.schemaVariant).
- For each (operatorId, recordingEntity), select adjustments WHERE rapBatchedAt IS NULL AND issuedAt >= D-1T00:00 AND issuedAt < DT00:00 (half-open range, so adjustments issued in the last minute of D-1 are included).
- For each adjustment, compose a ReturnDetail element:
  - CORRECTION → returnCode = 100 (tariffing-error), include amended ChargeDetail.
  - VOID → returnCode = 200 (invalid-call), chargeInformation = 0.
  - RE_RATE → returnCode = 110 (tariff-change).
  - Link via originalFileSequenceNumber + originalMessageReference (both stored in cdr.records at original insert).
- BER-encode; compute SHA-256.
- Acquire next fileSequenceNumber per (recordingEntity, RAP).
- UPDATE adjustments SET rapBatchedAt = now(), rapFileId = $1 atomically with file write.
- Sign (UC-09), upload (UC-10/11), publish cdr.exported.v1.
- If no adjustments exist for day D-1 → skip file creation, log cdr.rap.no_adjustments (absence is logged, not alerted).
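The adjustment-to-ReturnDetail mapping is a plain lookup plus two type-specific fields. The sketch below uses dictionaries as a stand-in for the ASN.1 structures; the key names on the input adjustment (for example amendedChargeDetail) are assumptions made for the example.

```python
RETURN_CODES = {
    "CORRECTION": 100,  # tariffing-error
    "VOID": 200,        # invalid-call
    "RE_RATE": 110,     # tariff-change
}


def compose_return_detail(adjustment: dict) -> dict:
    # Every ReturnDetail links back to the original TAP file and message.
    detail = {
        "returnCode": RETURN_CODES[adjustment["type"]],
        "originalFileSequenceNumber": adjustment["originalFileSequenceNumber"],
        "originalMessageReference": adjustment["originalMessageReference"],
    }
    if adjustment["type"] == "CORRECTION":
        detail["chargeDetail"] = adjustment["amendedChargeDetail"]
    elif adjustment["type"] == "VOID":
        detail["chargeInformation"] = 0
    return detail


# Hypothetical VOID adjustment against TAP file 42.
void_detail = compose_return_detail(
    {"type": "VOID", "originalFileSequenceNumber": 42, "originalMessageReference": "mr-7"}
)
```

An unknown adjustment type raises KeyError here, which surfaces schema drift instead of silently emitting a wrong code.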
11. UC-09 — SignFileWithHsm
Steps.
- Compute fileSha256.
- Open PKCS#11 session to HSM via SunPKCS11/pkcs11-tool library; authenticate with CKU_USER PIN from Vault-mounted tmpfs.
- C_FindObjects for CKO_PRIVATE_KEY with label = cdr-export-signer-v1 (or active key id per config).
- C_SignInit with mechanism CKM_EDDSA (Ed25519; fallback CKM_EC_EDWARDS on older HSMs).
- C_Sign(fileSha256) → 64-byte signature.
- Close session.
- Emit sidecar <filename>.sig:
    KeyId: cdr-export-signer-v1
    Algorithm: Ed25519
    Signature: <base64(sig)>
    FileSha256: <hex>
    SignedAt: 2026-04-21T22:00:14Z
Budget. P95 ≤ 200 ms per file. HSM unavailable → exponential backoff up to 1 h, then SEV1 CdrHsmUnavailable (see FAILURE_MODES FM-05).
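Once the HSM has returned the signature, building the sidecar is pure formatting. The sketch below takes the 64-byte signature as an input and does not talk to an HSM; the one-field-per-line layout and the trailing newline are assumptions based on the field list in the steps above.

```python
import base64
import hashlib
from datetime import datetime, timezone


def render_sidecar(file_bytes: bytes, signature: bytes, key_id: str, signed_at: datetime) -> str:
    if len(signature) != 64:
        raise ValueError("Ed25519 signatures are 64 bytes")
    lines = [
        f"KeyId: {key_id}",
        "Algorithm: Ed25519",
        f"Signature: {base64.b64encode(signature).decode('ascii')}",
        f"FileSha256: {hashlib.sha256(file_bytes).hexdigest()}",
        # Timestamps are normalised to UTC with second precision.
        f"SignedAt: {signed_at.astimezone(timezone.utc):%Y-%m-%dT%H:%M:%SZ}",
    ]
    return "\n".join(lines) + "\n"


# Placeholder all-zero signature; a real one comes from C_Sign.
sidecar = render_sidecar(
    b"abc",
    bytes(64),
    "cdr-export-signer-v1",
    datetime(2026, 4, 21, 22, 0, 14, tzinfo=timezone.utc),
)
```

Keeping the renderer separate from the signer lets tests cover the sidecar format with SoftHSMv2 or with no HSM at all.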
12. UC-10 — DropFileToSftp
Steps.
- Verify SFTP host fingerprint against configured atra.sftp.hostFingerprintSha256; mismatch → SEV1 CdrSftpFingerprintMismatch and abort.
- Connect over SSH-2; port 2222; key-based auth (key loaded from Vault tmpfs); ciphers restricted to aes256-gcm@openssh.com, chacha20-poly1305@openssh.com; MACs hmac-sha2-512-etm@openssh.com.
- Upload <filename>.tmp (chunked), fstat, compare size + SHA-256 against local.
- rename <filename>.tmp <filename> (atomic server-side).
- Upload <filename>.sig.tmp; verify; rename.
- If a file with same (recordingEntity, fileSequenceNumber) already exists → abort with DUPLICATE_FILE, SEV1 (sequence corruption).
- Close connection.
- Record cdr.export_delivery_log with channel=SFTP, latency, bytes, receipt id (if ATRA returns one post-rename).
13. UC-11 — FallbackUploadToAtraRest
Trigger condition. Three consecutive SFTP failures within a 15-minute window.
Steps.
- Establish mTLS session to https://atra.gov.af/api/v1/cdr/upload; client cert pinned, cert expiry checked ≥ 7 d in advance (SEV2 alert if < 7 d).
- POST multipart/form-data: file=@<filename>, signature=@<filename>.sig, metadata={recordingEntity, fileSequenceNumber, fileSha256}.
- Expected 201 { receiptId, sha256 }.
- If sha256 != local → SEV1 CdrIntegrityMismatch, mark delivery INTEGRITY_MISMATCH, stop.
- If 5xx → exponential backoff with jitter, max 6 attempts over 1 h.
- Both channels failing > 2 h → publish cdr.export.rejected.v1 reason=DELIVERY_CHANNELS_EXHAUSTED, open P0 incident.
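The fallback trigger and the retry schedule can be expressed as two small functions. This is one possible reading, not the service's policy: it uses "full jitter" (each wait drawn uniformly between zero and a doubling cap) and picks the base so that the five waits between six attempts can never exceed one hour in total.

```python
import random
from datetime import datetime, timedelta

MAX_ATTEMPTS = 6
TOTAL_WINDOW_S = 3600.0


def backoff_delays(rng: random.Random) -> list[float]:
    # Five waits separate six attempts; the caps double and sum to the window.
    waits = MAX_ATTEMPTS - 1
    base = TOTAL_WINDOW_S / (2 ** waits - 1)
    return [rng.uniform(0, base * 2 ** n) for n in range(waits)]


def should_fall_back(
    consecutive_failures: list[datetime], window: timedelta = timedelta(minutes=15)
) -> bool:
    # Input: timestamps of the current run of consecutive SFTP failures, oldest first.
    last_three = consecutive_failures[-3:]
    return len(last_three) == 3 and last_three[-1] - last_three[0] <= window


start = datetime(2026, 4, 21, 22, 5)
delays = backoff_delays(random.Random(0))
fall_back = should_fall_back([start, start + timedelta(minutes=4), start + timedelta(minutes=10)])
```

Passing the random generator in makes the schedule reproducible in tests while production code can use an unseeded instance.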
14. UC-12 — VerifyChainIntegrity (nightly verifier)
Trigger. Cron @Cron('0 3 * * *').
Steps.
- For each operator, walk cdr.rollups ORDER BY bucketHour ASC from last verified checkpoint.
- For each rollup, recompute chainHash from stored prevBucketChainHash + bucketRoot; compare.
- For each row in bucket, recompute rowHash from canonical JSON + stored chainHashPrev; compare.
- On any mismatch → INSERT cdr.audit with entryType=CHAIN_BREAK_DETECTED, details={bucketHour, operatorId, firstMismatchAt, computedHash, storedHash} and fire SEV1 CdrChainBroken.
- On clean pass → INSERT cdr.audit with entryType=CHAIN_VERIFY_OK per operator.
- Full national walk target ≤ 30 min.
Incremental vs full. Incremental by default (last verified checkpoint forward). Full replay once monthly + on any chain-break recovery.
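The rollup-level part of the walk reduces to recomputing one hash per bucket and stopping at the first disagreement. The sketch below covers only that step, with rollups as hex-string dictionaries (an assumed shape); the row-level recomputation and the audit insert are left out.

```python
import hashlib
from typing import Optional


def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def expected_chain_hash(rollup: dict) -> str:
    # chainHash = sha256(prevBucketChainHash || bucketRoot), over raw bytes.
    return sha256_hex(
        bytes.fromhex(rollup["prevBucketChainHash"]) + bytes.fromhex(rollup["bucketRoot"])
    )


def first_chain_break(rollups: list[dict]) -> Optional[dict]:
    """rollups: one operator's rollups ordered by bucketHour ASC."""
    for rollup in rollups:
        computed = expected_chain_hash(rollup)
        if computed != rollup["chainHash"]:
            return {
                "bucketHour": rollup["bucketHour"],
                "computedHash": computed,
                "storedHash": rollup["chainHash"],
            }
    return None


# Hypothetical single-bucket chain, then the same bucket with a swapped root.
clean = {"bucketHour": "2026-04-21T00:00Z", "prevBucketChainHash": "00" * 32, "bucketRoot": sha256_hex(b"a")}
clean["chainHash"] = expected_chain_hash(clean)
tampered = dict(clean, bucketRoot=sha256_hex(b"b"))
```

Returning the first mismatch matches the details payload of the audit entry, which records where the break starts.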
15. UC-13 — AnchorToTransparencyLog
Trigger. Cron at 00:15 Asia/Kabul (19:45 UTC on the previous calendar day, since Asia/Kabul is UTC+04:30).
Steps.
- For each operator, fetch chainHash for hour 23 of prior day.
- Submit to configured Trillian transparency log endpoint (TRANSPARENCY_LOG_URL) as a leaf containing {operatorId, day, chainHash, signerKeyId}.
- On success, store returned SignedLogEntry in cdr.transparency_anchors.
- If the log returns a different chain hash → SEV1 (indicates supplier-side confusion or local tampering).
- GET /v1/cdr/transparency/{day} serves the SLE + inclusion proof.
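Because the schedule is expressed in Kabul local time while buckets are in UTC, it helps to compute the UTC instant explicitly. The sketch below uses a fixed UTC+04:30 offset (Afghanistan observes no daylight saving time) and builds the leaf as a plain dictionary; the ISO date rendering of day is an assumption.

```python
from datetime import date, datetime, time, timedelta, timezone

# Asia/Kabul as a fixed offset; no DST transitions to account for.
KABUL = timezone(timedelta(hours=4, minutes=30))


def anchor_run_utc(local_day: date) -> datetime:
    # 00:15 local time on local_day, expressed in UTC.
    return datetime.combine(local_day, time(0, 15), tzinfo=KABUL).astimezone(timezone.utc)


def build_leaf(operator_id: str, day: date, chain_hash: str, signer_key_id: str) -> dict:
    return {
        "operatorId": operator_id,
        "day": day.isoformat(),
        "chainHash": chain_hash,
        "signerKeyId": signer_key_id,
    }


run_at = anchor_run_utc(date(2026, 4, 22))
leaf = build_leaf("op1", date(2026, 4, 21), "ab" * 32, "cdr-export-signer-v1")
```

The run for local day 2026-04-22 anchors the chain state of the prior day, 2026-04-21.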
16. UC-14 — ResubmitQuarantined
Trigger. REST POST /v1/cdr/quarantine/{quarantineId}/resubmit (with optional correctedFields body).
Steps.
- Load cdr.quarantine row; 404 if missing.
- Apply any correctedFields over the original CDR's in-memory projection.
- Re-invoke TapEncoder.encode(...).
- On success, mark quarantine row resolvedAt, resolvedBy, resolution='RESUBMITTED'; enqueue inclusion in next day's TAP supplement file.
- On failure, update encoderError and keep row in queue.
- Publish cdr.quarantine.resubmitted.v1.
17. UC-15 — ArchiveToColdStorage
Trigger. Monthly cron @Cron('0 4 1 * *').
Steps.
- Identify partitions older than 13 months in cdr.records, cdr.adjustments, cdr.audit.
- Verify each partition has been mirrored to S3 (check cdr.rollups.mirroredAt IS NOT NULL).
- DROP partition.
- UPDATE cdr.archive_log with partition name, row count, S3 manifest URI, droppedAt.
- Publish cdr.archive.completed.v1.
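The partition selection can be sketched as a cutoff computation plus the mirroring check. This assumes monthly partitions identified by their first day, and reads "older than 13 months" as "the whole partition ends before the month that lies 13 months back"; both are interpretations, not facts from the data model.

```python
from datetime import date


def months_before(day: date, months: int) -> date:
    """First day of the month that lies `months` months before day's month."""
    index = day.year * 12 + (day.month - 1) - months
    return date(index // 12, index % 12 + 1, 1)


def droppable(partitions: dict[date, bool], today: date, retention_months: int = 13) -> list[date]:
    """partitions maps a partition's month start to whether all its rollups are mirrored."""
    cutoff = months_before(today, retention_months)
    # Unmirrored partitions are never returned, whatever their age.
    return sorted(start for start, mirrored in partitions.items() if start < cutoff and mirrored)


candidates = droppable(
    {date(2025, 2, 1): False, date(2025, 3, 1): True, date(2025, 4, 1): True},
    today=date(2026, 5, 1),
)
```

In this example only March 2025 qualifies: February is old enough but not mirrored, and April is still inside the retention window.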
18. UC-16 — VerifyChainApi (external-facing)
Trigger. REST POST /v1/cdr/chain/verify { bucketHour, operatorId, proofForCdrId? } from regulator-portal-service.
Steps.
- Role check: cdr:read or regulator-scoped JWT.
- SELECT * FROM cdr.rollups WHERE bucketHour = $1 AND operatorId = $2. 409 if not sealed.
- Recompute bucketRoot from stored rows (see UC-04). Compare with stored value → verified: true|false.
- If proofForCdrId present, build Merkle inclusion proof (sibling hash list + leaf index) and include in response.
- Return {bucketRoot, chainHash, prevChainHash, recordCount, sealedAt, signerKeyId, verified, inclusionProof?}.
Budget. P95 ≤ 1.5 s for any 24 h range (SERVICE_OVERVIEW §12).
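An inclusion proof of the kind returned for proofForCdrId consists of the sibling hash at each tree level plus the leaf index. The sketch below builds and checks such a proof using the same odd-count padding rule as the UC-04 pseudocode; the pad-leaf encoding and the raw-byte hashing are assumptions, and the response shape is not modelled.

```python
import hashlib

# Pad leaf: sha256('0'*64), read as 64 ASCII '0' characters (assumption).
PAD_LEAF = hashlib.sha256(b"0" * 64).digest()


def sha256(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_proof(leaves: list[bytes], index: int) -> tuple[bytes, list[bytes]]:
    """Return (root, sibling hashes from the leaf level upward) for leaves[index]."""
    if not 0 <= index < len(leaves):
        raise IndexError("leaf index out of range")
    level, siblings = list(leaves), []
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(PAD_LEAF)
        siblings.append(level[index ^ 1])  # neighbour within the same pair
        level = [sha256(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        index //= 2
    return level[0], siblings


def verify_proof(leaf: bytes, index: int, siblings: list[bytes], root: bytes) -> bool:
    node = leaf
    for sibling in siblings:
        # An odd index means the node is the right-hand child of its pair.
        node = sha256(sibling + node) if index % 2 else sha256(node + sibling)
        index //= 2
    return node == root


# Five hypothetical row hashes; prove the last one.
row_hashes = [sha256(bytes([i])) for i in range(5)]
root, proof = build_proof(row_hashes, 4)
```

A regulator holding only the leaf, the index, the sibling list and the sealed bucketRoot can run verify_proof without access to the other rows.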
19. Ports (hexagonal architecture)
| Port | Adapter(s) | Contract |
|---|---|---|
| CdrRepositoryPort | Postgres | insert, getByMessageId, listByBucket, streamForEncoding |
| ChainRepositoryPort | Postgres | getLastRowHash(bucketHour, operatorId), getRollup, insertRollup, walkChain |
| ObjectStorePort | MinIO / S3 | putObjectLocked, putManifest, getObject, headObject |
| HsmSignerPort | PKCS#11 (Thales Luna / SoftHSMv2 dev) | sign(keyId, sha256), listKeys, rotateKey |
| SftpClientPort | Apache Mina SSHD | uploadAtomic(localPath, remotePath), fingerprint() |
| AtraRestClientPort | mTLS HTTP | uploadMultipart(file, sig, meta), getReceipt |
| BillingPricingPort | gRPC → billing-service | getPricingSnapshot(messageId), getPricingByTable(...) |
| OperatorRegistryPort | gRPC → operator-management-service | getRecordingEntity(operatorId) |
| TransparencyLogPort | Trillian HTTPS | submitLeaf(leaf), getInclusionProof(leafIndex) |
| EventPublisherPort | NATS JetStream via outbox relay | publish(subject, payload) |
| LeaderElectionPort | Postgres advisory lock | tryAcquire(lockName), release(lockName) |
| MsisdnVaultPort | Vault Transit | encryptForOperator(operatorId, msisdn), decryptForEncoding(cdrId, operatorId) |
All ports are mockable in tests (see TESTING_STRATEGY §3).
20. Concurrency & Leadership
- Leader election: a single leader replica holds pg_advisory_lock(hashtext('cdr.export.leader')) and runs cron jobs (UC-03, UC-07, UC-08, UC-12, UC-13, UC-15). On leader crash the session-level lock is released once Postgres detects the dead connection (≤ 30 s), and another replica takes over.
- Ingest concurrency: UC-01 runs on all replicas; consumers are a JetStream queue group so each NATS message is delivered to exactly one consumer.
- Idempotency: every NATS consumer uses sourceEventId for dedup; every REST mutation accepts Idempotency-Key and stores outcomes for 24 h.
- Retry policy (ingest): JetStream redelivers up to 5× with exponential backoff capped at 5 min; after that the event moves to .deadletter with reason cdr_ingest_failed.
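The Idempotency-Key behaviour for REST mutations can be illustrated with a small outcome cache. This is an in-memory, single-process stand-in written for the example; a real deployment with several replicas would need a shared store, and the class and method names are hypothetical.

```python
import time
from typing import Any, Callable

TTL_SECONDS = 24 * 3600  # outcomes are replayed for 24 h


class IdempotencyStore:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._outcomes: dict[str, tuple[float, Any]] = {}

    def run_once(self, key: str, action: Callable[[], Any]) -> Any:
        # Replay the stored outcome while it is fresh; otherwise execute and record.
        now = self._clock()
        hit = self._outcomes.get(key)
        if hit is not None and now - hit[0] < TTL_SECONDS:
            return hit[1]
        outcome = action()
        self._outcomes[key] = (now, outcome)
        return outcome


# A controllable clock makes the 24 h expiry testable without waiting.
fake_now = [0.0]
store = IdempotencyStore(clock=lambda: fake_now[0])
calls: list[int] = []
first = store.run_once("key-1", lambda: calls.append(1) or {"adjustmentId": len(calls)})
replayed = store.run_once("key-1", lambda: calls.append(1) or {"adjustmentId": len(calls)})
```

A retried request with the same key gets the original response body back and triggers no second insert.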
21. Cross-References
- SERVICE_OVERVIEW §9 (TAP/RAP export sequence)
- DOMAIN_MODEL §4 (state machine)
- DATA_MODEL §2 (tables + outbox)
- FAILURE_MODES — degradation per port
- SECURITY_MODEL §3 (HSM + KMS)
- compliance-engine/APPLICATION_LOGIC.md — outbox + leader-election precedent
- dlr-processor/EVENT_SCHEMAS.md (sms.dlr.inbound schema)
- billing-service/API_CONTRACTS.md (GetPricingSnapshot)
End of APPLICATION_LOGIC.md