
CDR Mediation Service — Application Logic

Version: 1.0
Status: Draft
Owner: Commerce + Regulator Liaison
Last Updated: 2026-04-21
Companion: DOMAIN_MODEL · API_CONTRACTS · DATA_MODEL · FAILURE_MODES

This document describes the use cases executed by the cdr-mediation-service, the ports they depend on, the concurrency model, and the latency budgets that drive their alerts. Each use case is framed as a discrete, testable flow.


1. Use Case Catalog

| ID | Use Case | Trigger | Latency Budget |
| --- | --- | --- | --- |
| UC-01 | IngestDlrSignal | NATS consumer on sms.dlr.inbound | P99 ≤ 10 s DLR → CDR row |
| UC-02 | RepriceCdr | NATS consumer on billing.pricing.snapshot.ready | P95 ≤ 5 s |
| UC-03 | GenerateHourlyRollup | Cron on HH:00 UTC, leader-only | Seal within 5 min of close |
| UC-04 | ComputeHashChain | Embedded in UC-03 | N/A (bounded by UC-03) |
| UC-05 | MirrorBucketToObjectStorage | Fired on cdr.bucket.sealed.v1 | ≤ 15 min after seal |
| UC-06 | IssueAdjustment | REST POST /v1/cdr/{cdrId}/adjustments | P95 ≤ 400 ms (single); bulk async |
| UC-07 | EncodeTapFile | Cron daily 02:00 Asia/Kabul, leader-only | Complete by 02:00 for D-1 |
| UC-08 | EncodeRapFile | Cron daily 02:30 Asia/Kabul, leader-only | Complete by 02:30 for D-1 |
| UC-09 | SignFileWithHsm | Embedded in UC-07 / UC-08 | P95 ≤ 200 ms per file |
| UC-10 | DropFileToSftp | Post-sign step | P95 ≤ 60 s per file |
| UC-11 | FallbackUploadToAtraRest | Invoked on SFTP failure | P95 ≤ 90 s per file |
| UC-12 | VerifyChainIntegrity | Nightly cron 03:00 UTC | Full walk ≤ 30 min |
| UC-13 | AnchorToTransparencyLog | Cron 00:15 Asia/Kabul | Day anchor submitted within 30 min of day-close |
| UC-14 | ResubmitQuarantined | REST POST /v1/cdr/quarantine/{id}/resubmit | P95 ≤ 400 ms |
| UC-15 | ArchiveToColdStorage | Monthly cron | Completes overnight |
| UC-16 | VerifyChainApi | REST POST /v1/cdr/chain/verify | P95 ≤ 1.5 s |

2. UC-01 — IngestDlrSignal (NATS consumer)

Trigger. Durable JetStream consumer cdr-mediation-dlr on subject sms.dlr.inbound, ackWait 30 s, maxDeliver 5.

Input. DlrEvent { messageId, tenantId, accountId, to, from, senderId, finalState, operatorId, smscId, messageReference, segmentCount, encoding, eventTimestamp, correlationId, traceId }.

Steps.

  1. Guard — reject if finalState ∉ {DELIVERED, FAILED, EXPIRED}; non-terminal DLRs are ignored (ACKed) — they will be reprojected when a terminal DLR arrives.
  2. Idempotency — lookup cdr.records where sourceEventId = eventId. On hit, ACK and return.
  3. Enrich pricing — call BillingPricingPort.GetPricingSnapshot(messageId):
    • On success, read {chargeAmount, currency, tapTariffClass, billingIndicator}.
    • On NOT_FOUND or UNAVAILABLE, proceed with chargeAmount=NULL, tapTariffClass=NULL, billingIndicator=UNKNOWN and schedule a reprice (UC-02) within 24 h.
  4. Enrich operator — call OperatorRegistryPort.GetRecordingEntity(operatorId) (Redis-cached 1 h) → returns recordingEntity, taxonomy.
  5. Hash MSISDNs: msisdnHashTo = sha256(to ‖ tenantSalt(tenantId)); likewise for from when it is an MSISDN.
  6. Determine bucketHour: date_trunc('hour', eventTimeStamp) in UTC.
  7. Compose canonical row — see DOMAIN_MODEL §2.1 field set.
  8. Compute chainHashPrev: SELECT rowHash FROM cdr.records WHERE bucketHour = $1 AND operatorId = $2 ORDER BY cdrSequence DESC LIMIT 1 FOR KEY SHARE. For the first row of a partition, use SELECT chainHash FROM cdr.rollups WHERE bucketHour = (HH - 1) AND operatorId = $2.
  9. Compute rowHash: sha256(chainHashPrev ‖ canonicalJson(row)), in the same operand order as computeRowHash in UC-04 (RFC 8785 JCS; decimals rendered as fixed-point strings).
  10. Store raw MSISDN — encrypt to and (if MSISDN) from with per-operator KEK via Vault Transit; insert into cdr.msisdn_vault keyed by cdrId.
  11. Persist — single transaction:
    • INSERT cdr.records
    • INSERT cdr.outbox(subject='cdr.record.appended', payload=...)
  12. ACK NATS on commit.
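Steps 5 and 6 above can be sketched as follows. This is a minimal illustration, not the service's code: the function names, the UTF-8 encoding of the MSISDN, and the hex output format are assumptions, and the tenant salt is passed in rather than looked up.

```python
import hashlib
from datetime import datetime, timedelta, timezone


def msisdn_hash(msisdn: str, tenant_salt: bytes) -> str:
    """Step 5: sha256(msisdn ‖ tenantSalt). Hex output is an assumption."""
    return hashlib.sha256(msisdn.encode("utf-8") + tenant_salt).hexdigest()


def bucket_hour(event_ts: datetime) -> datetime:
    """Step 6: truncate to the hour in UTC, like date_trunc('hour', ...)."""
    if event_ts.tzinfo is None:
        raise ValueError("event timestamp must be timezone-aware")
    utc = event_ts.astimezone(timezone.utc)
    return utc.replace(minute=0, second=0, microsecond=0)


# An event stamped 02:47 at UTC+04:30 lands in the 22:00 UTC bucket
# of the previous calendar day.
kabul = timezone(timedelta(hours=4, minutes=30))
ts = datetime(2026, 4, 21, 2, 47, 13, tzinfo=kabul)
print(bucket_hour(ts).isoformat())  # 2026-04-20T22:00:00+00:00
```

Rejecting naive timestamps keeps the bucket assignment independent of the host's local timezone.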

Output. cdr.record.appended.v1 event published by outbox relay within 1 s.

Latency budget (P99).

| Stage | Budget |
| --- | --- |
| Billing snapshot lookup | 150 ms (cache hit) / 400 ms (miss) |
| Operator registry lookup | 5 ms (Redis hit) / 50 ms (miss) |
| MSISDN hash + canonical JSON | 1 ms |
| Vault Transit encrypt | 20 ms |
| Postgres INSERT + outbox | 30 ms |
| NATS ACK | 5 ms |
| DLR → CDR end-to-end | ≤ 10 s P99 (per ADR-0004 §9) |
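As a quick arithmetic check, the per-stage budgets can be summed and compared with the end-to-end target. Adding per-stage P99 values gives a conservative upper bound rather than a true end-to-end P99, and the dictionary keys below are illustrative names, not identifiers from the service.

```python
# Per-stage budgets from the table above, in milliseconds:
# (cache-hit path, cache-miss path).
STAGE_BUDGET_MS = {
    "billing_snapshot_lookup": (150, 400),
    "operator_registry_lookup": (5, 50),
    "msisdn_hash_canonical_json": (1, 1),
    "vault_transit_encrypt": (20, 20),
    "postgres_insert_outbox": (30, 30),
    "nats_ack": (5, 5),
}
END_TO_END_BUDGET_MS = 10_000

best = sum(hit for hit, _ in STAGE_BUDGET_MS.values())
worst = sum(miss for _, miss in STAGE_BUDGET_MS.values())
headroom = END_TO_END_BUDGET_MS - worst

print(best, worst, headroom)  # 211 506 9494
```

Even on the all-miss path the in-service stages account for about half a second, so most of the 10 s budget is left for queueing and redelivery upstream of the consumer.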

3. UC-02 — RepriceCdr (late-arriving pricing)

Trigger. NATS consumer on billing.pricing.snapshot.ready — fired by billing-service when a deferred pricing snapshot materialises (see billing-service/EP-BILL-09).

Steps.

  1. Read payload {messageId, tenantId, chargeAmount, currency, tapTariffClass, billingIndicator}.
  2. SELECT cdrId FROM cdr.records WHERE messageId = $1 AND chargeAmount IS NULL. If zero rows, ACK (nothing to do).
  3. Because rows are immutable, issue an internal IssueAdjustment (UC-06) with type=CORRECTION, correctedFields={chargeAmount, tapTariffClass, billingIndicator}, reason='late_pricing_arrival', ticketId='SYS-REPRICE'.
  4. Publish cdr.record.repriced.v1 in the same transaction.

Failure handling. If the original CDR was already included in a TAP file (state=EXPORTED/ACKED), the correction will be batched in the next RAP run (UC-08). Consumer ACKs only after transaction commit.


4. UC-03 — GenerateHourlyRollup (seal + chain)

Trigger. Cron @Cron('0 * * * *') at each HH:00 UTC. Leader-only via PG advisory lock pg_advisory_lock(hashtext('cdr.export.leader')).

Steps.

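  1. Try to acquire the leader lock without blocking (pg_try_advisory_lock; plain pg_advisory_lock would wait for the holder); if the lock is already held, skip this run because another replica is running.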
  2. For each operatorId active in the past hour:
    • SELECT rowHash FROM cdr.records WHERE bucketHour = HH-1 AND operatorId = $1 ORDER BY cdrSequence ASC FOR UPDATE
    • Compute bucketRoot = Merkle root over ordered rowHash[] (binary tree; padded with zero leaves to next power of 2; if empty, sha256('EMPTY:' || bucketHour || ':' || operatorId)).
    • Read prevBucketChainHash from the previous CdrRollup for same operatorId.
    • chainHash = sha256(prevBucketChainHash ‖ bucketRoot).
    • Compute aggregates (moCount, mtCount, chargeableSum, ...).
    • INSERT into cdr.rollups with sealedAt = now().
    • UPDATE cdr.records SET state = 'AGGREGATED' WHERE bucketHour = HH-1 AND operatorId = $1 AND state = 'RAW'.
    • Publish cdr.bucket.sealed.v1 via outbox.
  3. Release leader lock.
  4. Emit cdr_rollup_seal_duration_seconds Prometheus histogram.

Failure. Partial completion is transactional per operator. If the cron crashes halfway, the next run picks up remaining operators. If not completed by HH+10m, CdrRollupBehind SEV1 alert fires and export jobs for D-1 are blocked until resolved.


5. UC-04 — ComputeHashChain (embedded invariant)

Pure function used by UC-01 and UC-03.

function computeRowHash(row, chainHashPrev):
    canonical = rfc8785(row with rowHash=null, chainHashPrev=chainHashPrev)
    return sha256(chainHashPrev || canonical)

function computeBucketRoot(orderedRowHashes):
    # empty buckets use the EMPTY sentinel defined in UC-03 instead
    leaves = orderedRowHashes
    pad leaves with sha256('0'*64) up to the next power of 2
    while len(leaves) > 1:
        leaves = [sha256(leaves[2i] || leaves[2i+1]) for i in range(len(leaves)/2)]
    return leaves[0]

function computeChainHash(prevChainHash, bucketRoot):
    return sha256(prevChainHash || bucketRoot)

Determinism is required — the verifier (UC-12) re-runs the same function and compares.
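The three functions can be written as runnable Python along these lines. This is a sketch under stated assumptions: canonical_json only approximates RFC 8785 (adequate for rows holding strings, integers, booleans and nulls with ASCII keys), chainHashPrev is embedded as hex, and the leaf list is padded up front to the next power of two as UC-03 describes, using the pad value from the pseudocode.

```python
import hashlib
import json

# Pad value from the pseudocode: sha256 over 64 ASCII '0' characters.
PAD_LEAF = hashlib.sha256(b"0" * 64).digest()


def canonical_json(row: dict) -> bytes:
    # Assumption: a stand-in for RFC 8785 JCS. Decimals must already be
    # fixed-point strings; floats are not handled canonically here.
    return json.dumps(row, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False).encode("utf-8")


def compute_row_hash(row: dict, chain_hash_prev: bytes) -> bytes:
    # Hex encoding of chainHashPrev inside the JSON body is an assumption.
    body = {**row, "rowHash": None, "chainHashPrev": chain_hash_prev.hex()}
    return hashlib.sha256(chain_hash_prev + canonical_json(body)).digest()


def compute_bucket_root(ordered_row_hashes: list) -> bytes:
    if not ordered_row_hashes:
        raise ValueError("empty buckets use the EMPTY sentinel from UC-03")
    size = 1
    while size < len(ordered_row_hashes):
        size *= 2
    level = list(ordered_row_hashes) + [PAD_LEAF] * (size - len(ordered_row_hashes))
    while len(level) > 1:
        # Hash adjacent pairs; the level length is always even here.
        level = [hashlib.sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0]


def compute_chain_hash(prev_chain_hash: bytes, bucket_root: bytes) -> bytes:
    return hashlib.sha256(prev_chain_hash + bucket_root).digest()
```

Because every input is serialised with sorted keys and fixed separators, two rows that differ only in key order hash identically, which is the property the verifier relies on.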


6. UC-05 — MirrorBucketToObjectStorage

Trigger. NATS consumer on cdr.bucket.sealed.v1.

Steps.

  1. Read bucket rows (SELECT * FROM cdr.records WHERE bucketHour = $1 AND operatorId = $2 ORDER BY cdrSequence) — raw MSISDNs are fetched via cdr.msisdn_vault on behalf of the encoder into a transient in-memory projection.
  2. Write zstd-compressed JSON Lines to s3://ghasi-cdr-archive/v1/region={region}/operator={op}/year=YYYY/month=MM/day=DD/hour=HH/{seq}.cdr.jsonl.zst — 1000 rows per file.
  3. Set Object Lock: x-amz-object-lock-mode: COMPLIANCE, retain-until = sealedAt + 7 years (Class-IV retention per docs/security/data-classification.md).
  4. Write _MANIFEST.json per hour with {bucketHour, operatorId, recordCount, bucketRoot, chainHash, files:[{name, sha256, recordCount}], signerKeyId, schemaVersion}.
  5. UPDATE cdr.rollups SET mirroredAt = now(), objectStoreRoot = ....
  6. Publish cdr.archive.completed.v1.

Retry. Exponential backoff up to 24 h on upload error. After 24 h → SEV1 CdrArchiveFailed.
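The file layout and manifest entries from steps 2 and 4 could be planned as below. The sketch makes several assumptions: file sequence numbers start at 1, the sha256 is taken over the uncompressed JSON Lines (the real pipeline may hash the zstd bytes instead), and compression, Object Lock and upload are left out entirely.

```python
import hashlib
import json
from datetime import datetime, timezone

ROWS_PER_FILE = 1000  # from step 2


def object_key(region: str, operator: str, hour: datetime, seq: int) -> str:
    """Key below the bucket root, following the path template in step 2."""
    return (
        f"v1/region={region}/operator={operator}/"
        f"year={hour:%Y}/month={hour:%m}/day={hour:%d}/hour={hour:%H}/"
        f"{seq}.cdr.jsonl.zst"
    )


def plan_files(rows: list, region: str, operator: str, hour: datetime) -> list:
    """Split ordered rows into 1000-row files and build manifest entries."""
    files = []
    for seq, start in enumerate(range(0, len(rows), ROWS_PER_FILE), start=1):
        chunk = rows[start:start + ROWS_PER_FILE]
        body = "".join(json.dumps(r, sort_keys=True) + "\n" for r in chunk)
        files.append({
            "name": object_key(region, operator, hour, seq),
            "sha256": hashlib.sha256(body.encode("utf-8")).hexdigest(),
            "recordCount": len(chunk),
        })
    return files


hour = datetime(2026, 4, 21, 9, tzinfo=timezone.utc)
files = plan_files([{"cdrSequence": i} for i in range(2500)], "kbl", "op1", hour)
print([f["recordCount"] for f in files])  # [1000, 1000, 500]
```

The returned list maps directly onto the files array of _MANIFEST.json.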


7. UC-06 — IssueAdjustment

Trigger. REST POST /v1/cdr/{cdrId}/adjustments (single) or POST /v1/cdr/adjustments/rerate (bulk).

Steps (single).

  1. Validate caller role (cdr:write); reject with 403 otherwise.
  2. Validate body: type ∈ {CORRECTION, VOID, RE_RATE}, non-empty reason, ticketId present.
  3. Load original CdrRecord; 404 if missing.
  4. Guard: if type=VOID and original has adjustmentType=VOID (already voided), return 409 ALREADY_VOIDED.
  5. Compose new row (see DOMAIN_MODEL §2.4). bucketHour = date_trunc('hour', now()) — adjustments seal in the hour they were issued, not the original CDR's hour.
  6. For RE_RATE, call BillingPricingPort.GetPricingByTable(tenantId, messageId, newPricingTableId) → new chargeAmount, tapTariffClass.
  7. INSERT into cdr.records (shares the same table — adjustmentOf IS NOT NULL discriminates).
  8. INSERT into cdr.adjustments metadata (FK to the CDR row).
  9. Outbox cdr.adjustment.created.v1.
  10. Return { adjustmentId, originalCdrId, bucketHour }.
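The guard sequence in steps 1 to 4 might look like the following. Only 403, 404 and 409 ALREADY_VOIDED come from the steps above; the 400 and 201 status codes and the other reason strings are illustrative assumptions, as are the class and function names.

```python
from dataclasses import dataclass
from typing import Optional

ADJUSTMENT_TYPES = {"CORRECTION", "VOID", "RE_RATE"}


@dataclass
class AdjustmentRequest:
    type: str
    reason: str
    ticket_id: str


def validate_adjustment(caller_roles: set, req: AdjustmentRequest,
                        original: Optional[dict]) -> tuple:
    """Return (http_status, reason) in the order the steps are listed."""
    if "cdr:write" not in caller_roles:
        return 403, "FORBIDDEN"
    if (req.type not in ADJUSTMENT_TYPES
            or not req.reason.strip()
            or not req.ticket_id):
        return 400, "INVALID_BODY"  # assumed status for a bad body
    if original is None:
        return 404, "NOT_FOUND"
    if req.type == "VOID" and original.get("adjustmentType") == "VOID":
        return 409, "ALREADY_VOIDED"
    return 201, "OK"  # assumed success status


req = AdjustmentRequest(type="VOID", reason="duplicate delivery", ticket_id="T-1")
print(validate_adjustment({"cdr:write"}, req, {"adjustmentType": "VOID"}))
# (409, 'ALREADY_VOIDED')
```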

Steps (bulk re-rate).

  1. Validate role; if estimated impact (SELECT COUNT(*) FROM cdr.records WHERE filter) > 100,000 — require X-Approver-Jwt header with second signature, else 403 FOUR_EYES_REQUIRED.
  2. Create a jobId; enqueue work in cdr.adjustment_jobs.
  3. Worker processes in 10,000-row batches. Each batch is a transaction: N inserts + 1 outbox row.
  4. On job completion, publish cdr.adjustment.batch.issued.v1 with {jobId, type, count, sumDelta}.
  5. GET /v1/cdr/adjustments/jobs/{jobId} returns {status, processed, total, startedAt, finishedAt, errors[]}.
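For the bulk path, the four-eyes threshold and the batch split can be sketched as below. The function names are hypothetical, and verifying the approver JWT's signature is out of scope here; the sketch only checks that a token was supplied.

```python
from typing import Iterator, Optional, Sequence

FOUR_EYES_THRESHOLD = 100_000  # rows, from step 1
BATCH_SIZE = 10_000            # rows per transaction, from step 3


def check_four_eyes(estimated_rows: int, approver_jwt: Optional[str]) -> None:
    """Raise when the impact exceeds the threshold and no approver is given."""
    if estimated_rows > FOUR_EYES_THRESHOLD and not approver_jwt:
        raise PermissionError("FOUR_EYES_REQUIRED")


def batches(cdr_ids: Sequence[str], size: int = BATCH_SIZE) -> Iterator[Sequence[str]]:
    """Yield consecutive slices; each slice maps to one transaction."""
    for start in range(0, len(cdr_ids), size):
        yield cdr_ids[start:start + size]


ids = [f"cdr-{i}" for i in range(25_000)]
check_four_eyes(len(ids), approver_jwt=None)  # below threshold, no error
print([len(b) for b in batches(ids)])  # [10000, 10000, 5000]
```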

8. UC-07 — EncodeTapFile

Trigger. Cron @Cron('0 22 * * *') (22:00 UTC = 02:30 Asia/Kabul, which is UTC+04:30) leader-only; produces files for settlement day D-1.

Steps.

  1. Pin schemaVariant = value of cdr.export.tap.schemaVariant at job start (not at CDR-creation time).
  2. For each (operatorId, recordingEntity) tuple:
    • Acquire next fileSequenceNumber via UPDATE cdr.tap_sequence SET last = last + 1 RETURNING last (atomic; rolled back on job failure).
    • Stream SELECT * FROM cdr.records WHERE operatorId = $1 AND bucketHour BETWEEN D-1T00:00 AND D-1T23:59 AND state IN ('AGGREGATED','EXPORTED') AND adjustmentOf IS NULL.
    • For each row, invoke TapEncoder.encode(row, schemaVariant):
      • Map to ASN.1 fields (see §9).
      • On ASN.1 encoding error → INSERT cdr.quarantine row with {cdrId, encoderError, encoderVariant, quarantinedAt} and continue.
    • BER-encode the resulting DataInterchange record batch.
    • Compute SHA-256 of the complete file.
    • Write to object-storage://ghasi-cdr-exports/pending/TAP_{recordingEntity}_{roamingPartner}_{seq:08d}.{YYYYMMDD}.tap.
    • Invoke UC-09 (SignFileWithHsm).
    • Upload signed file + sidecar to ATRA (UC-10 → UC-11 fallback).
    • INSERT cdr.exports + cdr.export_delivery_log.

Zero-record day. If no rows match for an operator, emit a NIL-filler TAP file per GSMA TD.57 §4.1 rules (contains only NotificationTrailerInformation).
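Two details of the encoding loop lend themselves to a short sketch: the file-name template and the quarantine-and-continue behaviour. The encoder is passed in as a callable that raises ValueError on an unmappable row, which is an assumption about TapEncoder's error type; the operator codes in the usage lines are made up.

```python
from datetime import date
from typing import Callable


def tap_file_name(recording_entity: str, roaming_partner: str,
                  seq: int, file_date: date) -> str:
    """Format TAP_{recordingEntity}_{roamingPartner}_{seq:08d}.{YYYYMMDD}.tap."""
    return f"TAP_{recording_entity}_{roaming_partner}_{seq:08d}.{file_date:%Y%m%d}.tap"


def encode_rows(rows: list, encode: Callable[[dict], bytes]) -> tuple:
    """Encode every row; failed rows are collected for cdr.quarantine."""
    encoded, quarantine = [], []
    for row in rows:
        try:
            encoded.append(encode(row))
        except ValueError as exc:
            # One bad row must not abort the whole file.
            quarantine.append({"cdrId": row["cdrId"], "encoderError": str(exc)})
    return encoded, quarantine


def fake_encode(row: dict) -> bytes:
    # Stand-in encoder: rejects rows without a tariff class.
    if row.get("tapTariffClass") is None:
        raise ValueError("missing tapTariffClass")
    return row["cdrId"].encode("ascii")


rows = [{"cdrId": "a", "tapTariffClass": "0001"}, {"cdrId": "b", "tapTariffClass": None}]
ok, bad = encode_rows(rows, fake_encode)
print(tap_file_name("AFGAW", "AFGRO", 42, date(2026, 4, 20)))
# TAP_AFGAW_AFGRO_00000042.20260420.tap
```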


9. TAP 3.12 Field Mapping (reference)

Canonical CDR field → TAP 3.12 ASN.1 leaf:

callingPartyAddress -> CallingNumber (BCDString, with address-nature/plan octets)
calledPartyAddress -> CalledNumber (BCDString)
serviceCenterAddress -> MessageCentreAddress
messageReference -> MessageReference (printable)
recordingEntity -> RecordingEntity (numeric, 15 digits)
eventTimeStamp -> TimeStamp (localTimeStamp + UTC offset octet)
chargeAmount (minor) -> ChargeDetail.charge (integer, minor units per local currency)
tapTariffClass -> TariffCode (Asn1Numeric, 4 digits)
segmentCount -> ChargeableUnits
chargeType MT -> BasicServiceUsed.telephonyBasicService (smsMT)
chargeType MO -> telephonyBasicService (smsMO)
messageType FLASH -> TeleServiceCode = 0x23
encoding GSM7 -> CharacterSetList: 0x00 (default 7-bit)
encoding UCS2 -> CharacterSetList: 0x08

Fixed headers:

  • BatchControlInfo: operator id, TAP version 3.12, release 12, file available time, specification version.
  • TransferBatch: sender, recipient, fileSequenceNumber, fileCreationTimeStamp, transferCutOffTimeStamp, fileAvailableTimeStamp.

RAP 1.5 (used by UC-08) wraps returned CallEventDetail elements in ReturnDetail referencing the original fileSequenceNumber + messageReference.
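The enumerated parts of the mapping reduce to lookup tables. The sketch below covers only the constant-valued rows (encoding, chargeType, FLASH); the output key names are illustrative, and the BCD, timestamp and charge encodings are deliberately left out.

```python
CHARACTER_SET = {"GSM7": 0x00, "UCS2": 0x08}
BASIC_SERVICE = {"MT": "smsMT", "MO": "smsMO"}
FLASH_TELESERVICE_CODE = 0x23


def map_constants(row: dict) -> dict:
    """Map enum-like CDR fields; unknown values raise ValueError."""
    try:
        out = {
            "characterSet": CHARACTER_SET[row["encoding"]],
            "basicService": BASIC_SERVICE[row["chargeType"]],
        }
    except KeyError as exc:
        raise ValueError(f"unmappable field value: {exc}") from exc
    if row.get("messageType") == "FLASH":
        out["teleServiceCode"] = FLASH_TELESERVICE_CODE
    return out


print(map_constants({"encoding": "UCS2", "chargeType": "MT", "messageType": "FLASH"}))
# {'characterSet': 8, 'basicService': 'smsMT', 'teleServiceCode': 35}
```

Raising on unknown values, rather than defaulting, is what lets the encoder route such rows to quarantine.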


10. UC-08 — EncodeRapFile

Trigger. Cron @Cron('30 22 * * *') — 30 min after TAP.

Steps.

  1. Pin RAP schema variant (cdr.export.rap.schemaVariant).
  2. For each (operatorId, recordingEntity), select adjustments WHERE rapBatchedAt IS NULL AND issuedAt BETWEEN D-1T00:00 AND D-1T23:59.
  3. For each adjustment, compose a ReturnDetail element:
    • CORRECTION: returnCode = 100 (tariffing-error), include amended ChargeDetail.
    • VOID: returnCode = 200 (invalid-call), chargeInformation = 0.
    • RE_RATE: returnCode = 110 (tariff-change).
    • Link via originalFileSequenceNumber + originalMessageReference (both stored in cdr.records at original insert).
  4. BER-encode; compute SHA-256.
  5. Acquire next fileSequenceNumber per (recordingEntity, RAP).
  6. UPDATE adjustments SET rapBatchedAt = now(), rapFileId = $1 atomically with file write.
  7. Sign (UC-09), upload (UC-10/11), publish cdr.exported.v1.
  8. If no adjustments exist for day D-1 → skip file creation, log cdr.rap.no_adjustments (absence is logged, not alerted).
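Step 3 amounts to a small mapping from adjustment type to return code plus the link back to the original file. In this sketch the dictionary shape and the amendedChargeDetail key are assumptions; the return codes and link fields are taken from the step as written.

```python
RETURN_CODES = {"CORRECTION": 100, "VOID": 200, "RE_RATE": 110}


def return_detail(adj: dict) -> dict:
    """Compose a ReturnDetail-like dict for one adjustment."""
    detail = {
        "returnCode": RETURN_CODES[adj["type"]],
        "originalFileSequenceNumber": adj["originalFileSequenceNumber"],
        "originalMessageReference": adj["originalMessageReference"],
    }
    if adj["type"] == "VOID":
        detail["chargeInformation"] = 0
    elif adj["type"] == "CORRECTION":
        # Hypothetical key holding the amended ChargeDetail.
        detail["chargeDetail"] = adj["amendedChargeDetail"]
    return detail


void = return_detail({
    "type": "VOID",
    "originalFileSequenceNumber": 42,
    "originalMessageReference": "MR-7",
})
print(void["returnCode"], void["chargeInformation"])  # 200 0
```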

11. UC-09 — SignFileWithHsm

Steps.

  1. Compute fileSha256.
  2. Open PKCS#11 session to HSM via SunPKCS11 / pkcs11-tool library; authenticate with CKU_USER PIN from Vault-mounted tmpfs.
  3. C_FindObjects for CKO_PRIVATE_KEY with label = cdr-export-signer-v1 (or active key id per config).
  4. C_SignInit with mechanism CKM_EDDSA (Ed25519; fallback CKM_EC_EDWARDS on older HSMs).
  5. C_Sign(fileSha256) → 64-byte signature.
  6. Close session.
  7. Emit sidecar <filename>.sig:
    KeyId: cdr-export-signer-v1
    Algorithm: Ed25519
    Signature: <base64(sig)>
    FileSha256: <hex>
    SignedAt: 2026-04-21T22:00:14Z

Budget. P95 ≤ 200 ms per file. HSM unavailable → exponential backoff up to 1 h, then SEV1 CdrHsmUnavailable (see FAILURE_MODES FM-05).
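The sidecar from step 7 can be assembled as follows once the HSM has returned a signature. The PKCS#11 calls themselves are not shown; the signature is passed in as bytes, and the function name is hypothetical.

```python
import base64
import hashlib
from datetime import datetime, timezone


def build_sidecar(file_bytes: bytes, signature: bytes, key_id: str,
                  signed_at: datetime) -> str:
    """Render the .sig sidecar in the field order shown in step 7."""
    if len(signature) != 64:
        raise ValueError("Ed25519 signatures are 64 bytes")
    signed_utc = signed_at.astimezone(timezone.utc)
    lines = [
        f"KeyId: {key_id}",
        "Algorithm: Ed25519",
        f"Signature: {base64.b64encode(signature).decode('ascii')}",
        f"FileSha256: {hashlib.sha256(file_bytes).hexdigest()}",
        f"SignedAt: {signed_utc:%Y-%m-%dT%H:%M:%SZ}",
    ]
    return "\n".join(lines) + "\n"


sidecar = build_sidecar(
    b"example file body",
    b"\x01" * 64,  # placeholder, not a real signature
    "cdr-export-signer-v1",
    datetime(2026, 4, 21, 22, 0, 14, tzinfo=timezone.utc),
)
print(sidecar.splitlines()[4])  # SignedAt: 2026-04-21T22:00:14Z
```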


12. UC-10 — DropFileToSftp

Steps.

  1. Verify SFTP host fingerprint against configured atra.sftp.hostFingerprintSha256; mismatch → SEV1 CdrSftpFingerprintMismatch and abort.
  2. Connect over SSH-2; port 2222; key-based auth (key loaded from Vault tmpfs); ciphers restricted to aes256-gcm@openssh.com, chacha20-poly1305@openssh.com; MACs hmac-sha2-512-etm@openssh.com.
  3. Upload <filename>.tmp (chunked), fstat, compare size + SHA-256 against local.
  4. rename <filename>.tmp <filename> (atomic server-side).
  5. Upload <filename>.sig.tmp; verify; rename.
  6. If a file with same (recordingEntity, fileSequenceNumber) already exists → abort with DUPLICATE_FILE, SEV1 (sequence corruption).
  7. Close connection.
  8. Record cdr.export_delivery_log with channel=SFTP, latency, bytes, receipt id (if ATRA returns one post-rename).
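The fingerprint check in step 1 can be sketched as below, assuming the configured value is stored in the form OpenSSH prints (unpadded base64 of the SHA-256 digest of the host key blob, optionally prefixed with SHA256:). If the configuration holds hex instead, the comparison would need adapting.

```python
import base64
import hashlib
import hmac


def host_key_fingerprint(host_key_blob: bytes) -> str:
    """Unpadded base64 of sha256(key blob), as OpenSSH displays it."""
    digest = hashlib.sha256(host_key_blob).digest()
    return base64.b64encode(digest).decode("ascii").rstrip("=")


def fingerprint_matches(host_key_blob: bytes, configured: str) -> bool:
    expected = configured.removeprefix("SHA256:")
    # Constant-time comparison; both sides are ASCII strings.
    return hmac.compare_digest(host_key_fingerprint(host_key_blob), expected)


blob = b"ssh-ed25519 example key blob"  # placeholder, not a real host key
pinned = "SHA256:" + host_key_fingerprint(blob)
print(fingerprint_matches(blob, pinned))          # True
print(fingerprint_matches(b"other key", pinned))  # False
```

A mismatch should abort before any authentication attempt, so the private key is never offered to an unverified host.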

13. UC-11 — FallbackUploadToAtraRest

Trigger condition. Three consecutive SFTP failures within a 15-minute window.

Steps.

  1. Establish mTLS session to https://atra.gov.af/api/v1/cdr/upload — client cert pinned, cert expiry checked ≥ 7 d in advance (SEV2 alert if < 7 d).
  2. POST multipart/form-data: file=@<filename>, signature=@<filename>.sig, metadata={recordingEntity, fileSequenceNumber, fileSha256}.
  3. Expected 201 { receiptId, sha256 }.
  4. If sha256 != local → SEV1 CdrIntegrityMismatch, mark delivery INTEGRITY_MISMATCH, stop.
  5. If 5xx → exponential backoff with jitter, max 6 attempts over 1 h.
  6. Both channels failing > 2 h → publish cdr.export.rejected.v1 reason=DELIVERY_CHANNELS_EXHAUSTED, open P0 incident.
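The trigger condition and the retry schedule in step 5 could be expressed like this. The 60 s base delay and the full-jitter strategy are assumptions chosen so that six attempts always fit inside one hour; the document only fixes the attempt count and the overall window.

```python
import random
from typing import Optional


def should_fall_back(consecutive_failures: list, window_s: float = 900.0,
                     threshold: int = 3) -> bool:
    """True when the last `threshold` consecutive SFTP failures
    (timestamps in seconds, oldest first) span at most `window_s`."""
    recent = consecutive_failures[-threshold:]
    return len(recent) == threshold and recent[-1] - recent[0] <= window_s


def backoff_delays(attempts: int = 6, base_s: float = 60.0,
                   rng: Optional[random.Random] = None) -> list:
    """Waits between attempts: uniform in [0, base * 2**n] (full jitter)."""
    rng = rng or random.Random()
    return [rng.uniform(0, base_s * 2 ** n) for n in range(attempts - 1)]


# Worst case without jitter: 60 + 120 + 240 + 480 + 960 seconds.
print(sum(60 * 2 ** n for n in range(5)))  # 1860
print(should_fall_back([0.0, 300.0, 800.0]))  # True
```

With five waits capped at 1860 s in total, the sixth attempt starts well inside the one-hour limit even when every wait hits its ceiling.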

14. UC-12 — VerifyChainIntegrity (nightly verifier)

Trigger. Cron @Cron('0 3 * * *').

Steps.

  1. For each operator, walk cdr.rollups ORDER BY bucketHour ASC from last verified checkpoint.
  2. For each rollup, recompute chainHash from stored prevBucketChainHash + bucketRoot; compare.
  3. For each row in bucket, recompute rowHash from canonical JSON + stored chainHashPrev; compare.
  4. On any mismatch → INSERT cdr.audit with entryType=CHAIN_BREAK_DETECTED, details={bucketHour, operatorId, firstMismatchAt, computedHash, storedHash} and fire SEV1 CdrChainBroken.
  5. On clean pass → INSERT cdr.audit with entryType=CHAIN_VERIFY_OK per operator.
  6. Full national walk target ≤ 30 min.

Incremental vs full. Incremental by default (last verified checkpoint forward). Full replay once monthly + on any chain-break recovery.
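Step 2 of the walk, in sketch form: recompute each rollup's chainHash and stop at the first mismatch. The rollups are plain dicts here with raw-byte hashes, which is an assumption about the repository's return shape; row-level verification (step 3) is omitted.

```python
import hashlib
from typing import Optional


def first_chain_break(rollups: list) -> Optional[dict]:
    """Walk rollups in bucketHour order; return details of the first
    mismatch, or None when every stored chainHash recomputes cleanly."""
    for r in rollups:
        computed = hashlib.sha256(r["prevBucketChainHash"] + r["bucketRoot"]).digest()
        if computed != r["chainHash"]:
            return {
                "bucketHour": r["bucketHour"],
                "computedHash": computed.hex(),
                "storedHash": r["chainHash"].hex(),
            }
    return None


def make_rollup(prev: bytes, root: bytes, hour: str) -> dict:
    # Test helper: builds a rollup whose chainHash is consistent.
    return {"bucketHour": hour, "prevBucketChainHash": prev, "bucketRoot": root,
            "chainHash": hashlib.sha256(prev + root).digest()}


r1 = make_rollup(b"\x00" * 32, b"\x01" * 32, "2026-04-21T00")
r2 = make_rollup(r1["chainHash"], b"\x02" * 32, "2026-04-21T01")
print(first_chain_break([r1, r2]))  # None
```

The returned dict carries the fields needed for the CHAIN_BREAK_DETECTED audit entry.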


15. UC-13 — AnchorToTransparencyLog

Trigger. Cron at 00:15 Asia/Kabul (19:45 UTC on the previous day; Asia/Kabul is UTC+04:30).

Steps.

  1. For each operator, fetch chainHash for hour 23 of prior day.
  2. Submit to configured Trillian transparency log endpoint (TRANSPARENCY_LOG_URL) as a leaf containing {operatorId, day, chainHash, signerKeyId}.
  3. On success, store returned SignedLogEntry in cdr.transparency_anchors.
  4. If the log returns a different chain hash → SEV1 (indicates a supplier-side confusion or local tampering).
  5. GET /v1/cdr/transparency/{day} serves the SLE + inclusion proof.

16. UC-14 — ResubmitQuarantined

Trigger. REST POST /v1/cdr/quarantine/{quarantineId}/resubmit (with optional correctedFields body).

Steps.

  1. Load cdr.quarantine row; 404 if missing.
  2. Apply any correctedFields over the original CDR's in-memory projection.
  3. Re-invoke TapEncoder.encode(...).
  4. On success, mark quarantine row resolvedAt, resolvedBy, resolution='RESUBMITTED'; enqueue inclusion in next day's TAP supplement file.
  5. On failure, update encoderError and keep row in queue.
  6. Publish cdr.quarantine.resubmitted.v1.

17. UC-15 — ArchiveToColdStorage

Trigger. Monthly cron @Cron('0 4 1 * *').

Steps.

  1. Identify partitions older than 13 months in cdr.records, cdr.adjustments, cdr.audit.
  2. Verify each partition has been mirrored to S3 (check cdr.rollups.mirroredAt IS NOT NULL).
  3. DROP partition.
  4. UPDATE cdr.archive_log with partition name, row count, S3 manifest URI, droppedAt.
  5. Publish cdr.archive.completed.v1.
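The eligibility test in steps 1 and 2 can be sketched as a date comparison plus a mirror check. The sketch assumes monthly partitions identified by their first day and treats "older than 13 months" strictly, so a partition whose month starts exactly 13 months ago is kept for one more cycle.

```python
from datetime import date


def cutoff_month(today: date, months: int = 13) -> date:
    """First day of the month `months` months before today's month."""
    index = today.year * 12 + (today.month - 1) - months
    return date(index // 12, index % 12 + 1, 1)


def droppable(partition_month: date, mirrored_at: list, today: date) -> bool:
    """A partition may be dropped only if it is old enough and every
    rollup in it has a non-null mirroredAt."""
    old_enough = partition_month < cutoff_month(today)
    fully_mirrored = bool(mirrored_at) and all(m is not None for m in mirrored_at)
    return old_enough and fully_mirrored


today = date(2026, 5, 1)
print(cutoff_month(today))  # 2025-04-01
print(droppable(date(2025, 3, 1), ["2025-03-01T01:05Z"], today))  # True
print(droppable(date(2025, 3, 1), [None], today))                 # False
```

Treating an empty mirroredAt list as not mirrored errs on the side of keeping data.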

18. UC-16 — VerifyChainApi (external-facing)

Trigger. REST POST /v1/cdr/chain/verify { bucketHour, operatorId, proofForCdrId? } from regulator-portal-service.

Steps.

  1. Role check: cdr:read or regulator-scoped JWT.
  2. SELECT * FROM cdr.rollups WHERE bucketHour = $1 AND operatorId = $2. 409 if not sealed.
  3. Recompute bucketRoot from stored rows (see UC-04). Compare with stored value → verified: true|false.
  4. If proofForCdrId present, build Merkle inclusion proof (sibling hash list + leaf index) and include in response.
  5. Return {bucketRoot, chainHash, prevChainHash, recordCount, sealedAt, signerKeyId, verified, inclusionProof?}.

Budget. P95 ≤ 1.5 s for any 24 h range (SERVICE_OVERVIEW §12).
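A Merkle inclusion proof of the kind step 4 describes (sibling hash list plus leaf index) can be built and checked as below. The sketch assumes the tree is padded up front to the next power of two with sha256('0'*64), matching the padding described for the bucket root; the function names are hypothetical.

```python
import hashlib

PAD_LEAF = hashlib.sha256(b"0" * 64).digest()


def _pair(left: bytes, right: bytes) -> bytes:
    return hashlib.sha256(left + right).digest()


def inclusion_proof(leaves: list, index: int) -> tuple:
    """Return (siblings bottom-up, root) for the leaf at `index`."""
    if not 0 <= index < len(leaves):
        raise IndexError("leaf index out of range")
    size = 1
    while size < len(leaves):
        size *= 2
    level = list(leaves) + [PAD_LEAF] * (size - len(leaves))
    siblings = []
    while len(level) > 1:
        siblings.append(level[index ^ 1])  # neighbour within the pair
        level = [_pair(level[i], level[i + 1]) for i in range(0, len(level), 2)]
        index //= 2
    return siblings, level[0]


def verify_inclusion(leaf: bytes, index: int, siblings: list, root: bytes) -> bool:
    """Fold the siblings into the leaf; even positions are left children."""
    node = leaf
    for sibling in siblings:
        node = _pair(node, sibling) if index % 2 == 0 else _pair(sibling, node)
        index //= 2
    return node == root


leaves = [hashlib.sha256(bytes([i])).digest() for i in range(5)]
proof, root = inclusion_proof(leaves, 3)
print(len(proof), verify_inclusion(leaves[3], 3, proof, root))  # 3 True
```

The caller only needs the leaf's rowHash, its index and the sibling list to recompute the root, so the regulator portal can verify a single CDR without fetching the whole bucket.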


19. Ports (hexagonal architecture)

| Port | Adapter(s) | Contract |
| --- | --- | --- |
| CdrRepositoryPort | Postgres | insert, getByMessageId, listByBucket, streamForEncoding |
| ChainRepositoryPort | Postgres | getLastRowHash(bucketHour, operatorId), getRollup, insertRollup, walkChain |
| ObjectStorePort | MinIO / S3 | putObjectLocked, putManifest, getObject, headObject |
| HsmSignerPort | PKCS#11 (Thales Luna / SoftHSMv2 dev) | sign(keyId, sha256), listKeys, rotateKey |
| SftpClientPort | Apache Mina SSHD | uploadAtomic(localPath, remotePath), fingerprint() |
| AtraRestClientPort | mTLS HTTP | uploadMultipart(file, sig, meta), getReceipt |
| BillingPricingPort | gRPC → billing-service | getPricingSnapshot(messageId), getPricingByTable(...) |
| OperatorRegistryPort | gRPC → operator-management-service | getRecordingEntity(operatorId) |
| TransparencyLogPort | Trillian HTTPS | submitLeaf(leaf), getInclusionProof(leafIndex) |
| EventPublisherPort | NATS JetStream via outbox relay | publish(subject, payload) |
| LeaderElectionPort | Postgres advisory lock | tryAcquire(lockName), release(lockName) |
| MsisdnVaultPort | Vault Transit | encryptForOperator(operatorId, msisdn), decryptForEncoding(cdrId, operatorId) |

All ports are mockable in tests (see TESTING_STRATEGY §3).


20. Concurrency & Leadership

  • Leader election — single leader replica holds pg_advisory_lock(hashtext('cdr.export.leader')) and runs cron jobs (UC-03, UC-07, UC-08, UC-12, UC-13, UC-15). On leader crash the lock releases automatically within Postgres heartbeat (≤ 30 s) and another replica takes over.
  • Ingest concurrency — UC-01 runs on all replicas; consumers are a JetStream queue group so each NATS message is delivered to exactly one consumer.
  • Idempotency — every NATS consumer uses sourceEventId for dedup; every REST mutation accepts Idempotency-Key and stores outcomes for 24 h.
  • Retry policy — ingest: JetStream redelivers up to 5× with exponential backoff capped at 5 min; after that the event moves to .deadletter with reason cdr_ingest_failed.
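The Idempotency-Key behaviour for REST mutations can be illustrated with a small in-memory store. This is a single-process sketch with a hypothetical class name; a real deployment would need a shared store (for example a Postgres table) so that all replicas see the same outcomes.

```python
import time
from typing import Any, Callable


class IdempotencyStore:
    """Remembers the outcome of each key for a fixed TTL (24 h by default)."""

    def __init__(self, ttl_s: float = 24 * 3600,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_s
        self._clock = clock
        self._outcomes = {}  # key -> (stored_at, outcome)

    def run_once(self, key: str, action: Callable[[], Any]) -> Any:
        now = self._clock()
        hit = self._outcomes.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # replay the stored outcome, skip the action
        outcome = action()
        self._outcomes[key] = (now, outcome)
        return outcome


calls = []
store = IdempotencyStore()


def issue() -> dict:
    calls.append(1)
    return {"adjustmentId": f"adj-{len(calls)}"}


print(store.run_once("key-1", issue))  # {'adjustmentId': 'adj-1'}
print(store.run_once("key-1", issue))  # {'adjustmentId': 'adj-1'}
```

Injecting the clock keeps the expiry logic testable without waiting 24 hours.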

21. Cross-References

End of APPLICATION_LOGIC.md