
CDR Mediation Service — Event Schemas

Version: 1.0
Status: Draft
Owner: Commerce + Regulator Liaison
Last Updated: 2026-04-21
Companion: DOMAIN_MODEL · SYNC_CONTRACT · SECURITY_MODEL

All events are published to NATS JetStream via a transactional outbox pattern (see §6 and DATA_MODEL §2). Every event carries schemaVersion, eventId (UUIDv4), traceId, and occurredAt (RFC 3339). Bodies are PII-safe: raw MSISDNs never appear, only the hex-encoded SHA-256 hashes msisdnHashTo and msisdnHashFrom.
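As an illustration of the shared envelope, a consumer might run a shape check like the one below before dispatching on the subject. This is a minimal sketch: the names EventEnvelope and isEventEnvelope are hypothetical, and the timestamp pattern is a simplified RFC 3339 check rather than a full parser.

```typescript
// Envelope fields that every event in this document carries.
interface EventEnvelope {
  schemaVersion: string;
  eventId: string; // UUIDv4
  traceId: string;
  occurredAt: string; // RFC 3339 timestamp
}

// UUID version 4: version nibble is 4, variant nibble is 8, 9, a or b.
const UUID_V4 =
  /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

// Simplified RFC 3339 date-time: optional fraction, mandatory "Z" or numeric offset.
const RFC_3339 =
  /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})$/;

// Type guard: true only when all four envelope fields are present and well-formed.
function isEventEnvelope(value: unknown): value is EventEnvelope {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.schemaVersion === "string" &&
    typeof v.eventId === "string" &&
    UUID_V4.test(v.eventId) &&
    typeof v.traceId === "string" &&
    v.traceId.length > 0 &&
    typeof v.occurredAt === "string" &&
    RFC_3339.test(v.occurredAt)
  );
}
```

A consumer would typically reject, or dead-letter, any message that fails this check before attempting per-subject validation.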


1. Streams, subjects, retention

| Stream | Subjects | Hot retention | Cold retention | Replicas | Dedup window |
| --- | --- | --- | --- | --- | --- |
| CDR_EVENTS | cdr.record.appended.v1, cdr.record.repriced.v1, cdr.bucket.sealed.v1, cdr.generated.v1 | 13 months | 7 years (S3 archive) | 3 | 5 min |
| CDR_EXPORTS | cdr.exported.v1, cdr.export.acked.v1, cdr.export.rejected.v1 | 13 months | 7 years | 3 | 5 min |
| CDR_ADJUSTMENTS | cdr.adjustment.created.v1, cdr.adjustment.batch.issued.v1 | 13 months | 7 years | 3 | 5 min |
| CDR_AUDIT | cdr.audit.v1, cdr.archive.completed.v1, cdr.config.changed.v1, cdr.quarantine.resubmitted.v1 | 13 months | 7 years | 3 | 5 min |

Each stream has a .deadletter companion. Consumers are durable with explicit ack.

Messages on all subjects carry a partitionKey NATS header set to operatorId, so that JetStream subject-partition sharding gives per-operator ordering.
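The publish-side headers could be assembled roughly as follows. This is a sketch under assumptions: buildPublishHeaders and OutboundEvent are hypothetical names, and using eventId as the Nats-Msg-Id (the header JetStream uses for de-duplication inside the stream's dedup window) is an assumption, since this document does not say which value the relay uses.

```typescript
// Minimal view of an event for header construction (hypothetical type).
interface OutboundEvent {
  eventId: string;
  operatorId?: string | null;
}

// Builds the NATS headers for one outgoing message.
function buildPublishHeaders(event: OutboundEvent): Record<string, string> {
  const headers: Record<string, string> = {
    // JetStream discards a message whose Nats-Msg-Id was already seen
    // within the stream's dedup window (assumed here to be the eventId).
    "Nats-Msg-Id": event.eventId,
  };
  // partitionKey is only set when the event has an operator.
  if (event.operatorId) {
    headers["partitionKey"] = event.operatorId;
  }
  return headers;
}
```

With this shape, a relay retry that republishes the same row inside the dedup window would not produce a second message on the stream.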


2. Produced events

2.1 cdr.record.appended.v1

Emitted once per inserted CdrRecord (including adjustments). High volume (~1 per DLR). Consumers may opt for the hourly cdr.generated.v1 summary instead.

{
  "$id": "https://schemas.ghasi.af/cdr/record-appended-v1.json",
  "type": "object",
  "required": [
    "schemaVersion", "eventId", "cdrId", "bucketHour", "operatorId",
    "direction", "chargeType", "rowHash", "chainHashPrev",
    "tenantId", "messageId", "segmentCount", "traceId", "occurredAt"
  ],
  "properties": {
    "schemaVersion":    { "const": "1" },
    "eventId":          { "type": "string", "format": "uuid" },
    "cdrId":            { "type": "string", "pattern": "^cdr_[0-9A-HJKMNP-TV-Z]{26}$" },
    "bucketHour":       { "type": "string", "format": "date-time" },
    "operatorId":       { "type": "string", "enum": ["AWCC","MTN_AF","ETS","ROSHAN","SALAAM","INTL_IN","INTL_OUT"] },
    "tenantId":         { "type": ["string","null"], "format": "uuid" },
    "messageId":        { "type": "string", "format": "uuid" },
    "direction":        { "type": "string", "enum": ["MO","MT"] },
    "chargeType":       { "type": "string", "enum": ["MO","MT","BROADCAST","INTERNATIONAL_MT","P2P","A2P"] },
    "billingIndicator": { "type": "string", "enum": ["CHARGEABLE","FREE","REVERSE_CHARGED","CPP","MPP","UNKNOWN"] },
    "tapTariffClass":   { "type": ["string","null"], "pattern": "^[A-Z0-9]{4}$" },
    "chargeAmount":     { "type": ["string","null"], "pattern": "^-?\\d+\\.\\d{6}$" },
    "chargeCurrency":   { "type": "string", "enum": ["AFN","USD"] },
    "segmentCount":     { "type": "integer", "minimum": 1, "maximum": 255 },
    "encoding":         { "type": "string", "enum": ["GSM7","UCS2","GSM8_LATIN"] },
    "msisdnHashTo":     { "type": "string", "pattern": "^[0-9a-f]{64}$" },
    "msisdnHashFrom":   { "type": ["string","null"], "pattern": "^[0-9a-f]{64}$" },
    "rowHash":          { "type": "string", "pattern": "^[0-9a-f]{64}$" },
    "chainHashPrev":    { "type": "string", "pattern": "^[0-9a-f]{64}$" },
    "adjustmentOf":     { "type": ["string","null"], "pattern": "^cdr_[0-9A-HJKMNP-TV-Z]{26}$" },
    "adjustmentType":   { "type": ["string","null"], "enum": [null,"CORRECTION","VOID","RE_RATE"] },
    "traceId":          { "type": "string" },
    "occurredAt":       { "type": "string", "format": "date-time" }
  },
  "additionalProperties": false
}
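Because chargeAmount is a decimal string with exactly six fractional digits, consumers that do arithmetic on it are better off avoiding floating point. The sketch below shows one way to do that with BigInt micro-units, plus a check for the cdrId pattern. The helper names (isCdrId, chargeAmountToMicros) are hypothetical and not part of the schema.

```typescript
// Same patterns as the cdrId and chargeAmount properties in the schema above.
const CDR_ID = /^cdr_[0-9A-HJKMNP-TV-Z]{26}$/;
const CHARGE_AMOUNT = /^-?\d+\.\d{6}$/;

// True when the identifier is "cdr_" followed by 26 Crockford base32 characters.
function isCdrId(value: string): boolean {
  return CDR_ID.test(value);
}

// Converts a chargeAmount string into integer micro-units (1 unit = 1,000,000 micros).
// The pattern fixes six fractional digits, so dropping the decimal point
// yields the micro-unit value directly, with no rounding.
function chargeAmountToMicros(amount: string): bigint {
  if (!CHARGE_AMOUNT.test(amount)) {
    throw new Error(`invalid chargeAmount: ${amount}`);
  }
  return BigInt(amount.replace(".", ""));
}
```

For example, chargeAmountToMicros("12.500000") yields 12500000 micro-units, and sums over many records stay exact.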

2.2 cdr.generated.v1 (hourly summary)

Fired once per (bucketHour, operatorId) at rollup seal. Downstream consumers such as analytics-service prefer this coarser stream.

interface CdrGenerated {
  schemaVersion: "1";
  eventId: string;
  rollupId: string;            // "roll_..."
  bucketHour: string;          // RFC3339
  operatorId: string;
  recordCount: number;
  moCount: number;
  mtCount: number;
  broadcastCount: number;
  internationalMtCount: number;
  chargeableSumMinor: string;  // decimal(18,6) as string
  freeSumMinor: string;
  bucketRoot: string;          // 64-hex
  chainHash: string;           // 64-hex
  prevChainHash: string;       // 64-hex
  sealedAt: string;
  signerKeyId: string;
  traceId: string;
  occurredAt: string;
}

2.3 cdr.record.repriced.v1

Emitted after a deferred billing snapshot updates a previously NULL-priced CDR via a CORRECTION adjustment.

interface CdrRecordRepriced {
  schemaVersion: "1";
  eventId: string;
  originalCdrId: string;
  adjustmentId: string;
  chargeAmount: string;
  chargeCurrency: "AFN" | "USD";
  tapTariffClass: string;
  billingIndicator: string;
  traceId: string;
  occurredAt: string;
}

2.4 cdr.bucket.sealed.v1

interface CdrBucketSealed {
  schemaVersion: "1";
  eventId: string;
  bucketHour: string;
  operatorId: string;
  recordCount: number;
  bucketRoot: string;
  chainHash: string;
  prevChainHash: string;
  sealedAt: string;
  emptyBucket: boolean;
  signerKeyId: string;
  traceId: string;
  occurredAt: string;
}
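A consumer that tracks sealed buckets could check chain linkage along the following lines. This is only a sketch and rests on an assumption the document does not spell out: that, for a given operator, each bucket's prevChainHash equals the chainHash of the bucket sealed immediately before it. The names SealedBucketLink and findChainBreaks are hypothetical, and the check does not recompute any hash.

```typescript
// Subset of CdrBucketSealed needed for a linkage check (hypothetical type).
interface SealedBucketLink {
  bucketHour: string; // RFC 3339
  operatorId: string;
  chainHash: string;
  prevChainHash: string;
}

// Returns the bucketHour of every bucket whose prevChainHash does not match
// the chainHash of its predecessor. Expects buckets for a single operator.
function findChainBreaks(buckets: SealedBucketLink[]): string[] {
  // Sort a copy by bucket time so callers may pass events in arrival order.
  const ordered = [...buckets].sort(
    (a, b) => Date.parse(a.bucketHour) - Date.parse(b.bucketHour),
  );
  const breaks: string[] = [];
  let prev: SealedBucketLink | undefined;
  for (const current of ordered) {
    if (prev !== undefined && current.prevChainHash !== prev.chainHash) {
      breaks.push(current.bucketHour);
    }
    prev = current;
  }
  return breaks;
}
```

A missing hour would also surface as a break under this check, which may or may not be desired depending on how empty buckets are sealed.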

2.5 cdr.exported.v1

Emitted when a TAP/RAP file leaves the platform (SFTP or HTTPS).

interface CdrExported {
  schemaVersion: "1";
  eventId: string;
  exportId: string;
  exportType: "TAP_3_12" | "RAP_1_5" | "CSV_ANCILLARY";
  schemaVariant: string;
  recordingEntity: string;
  senderRoamingPartner: string | null;
  settlementDay: string;       // YYYY-MM-DD
  fileSequenceNumber: number;
  fileName: string;
  fileSha256: string;
  byteSize: number;
  recordsIncluded: number;
  quarantinedCount: number;
  signerKeyId: string;
  deliveryChannel: "SFTP" | "API";
  deliveryState: "DELIVERED_SFTP" | "DELIVERED_API";
  deliveredAt: string;
  atraReceiptId: string | null;
  traceId: string;
  occurredAt: string;
}

2.6 cdr.export.acked.v1

interface CdrExportAcked {
  schemaVersion: "1";
  eventId: string;
  exportId: string;
  atraReceiptId: string;
  atraVerificationStatus: "VERIFIED";
  ackedAt: string;
  traceId: string;
  occurredAt: string;
}

2.7 cdr.export.rejected.v1

interface CdrExportRejected {
  schemaVersion: "1";
  eventId: string;
  exportId: string;
  rejectionReason:
    | "SCHEMA_MISMATCH"
    | "INTEGRITY_MISMATCH"
    | "SIGNATURE_INVALID"
    | "SEQUENCE_OUT_OF_ORDER"
    | "DELIVERY_CHANNELS_EXHAUSTED";
  atraReceiptId: string | null;
  atraErrorPayload: unknown;   // raw ATRA response for diagnosis
  retryCount: number;
  rejectedAt: string;
  traceId: string;
  occurredAt: string;
}

2.8 cdr.adjustment.created.v1

interface CdrAdjustmentCreated {
  schemaVersion: "1";
  eventId: string;
  adjustmentId: string;
  originalCdrId: string;
  adjustmentType: "CORRECTION" | "VOID" | "RE_RATE";
  reasonCode: string;          // mapped from free-text reason
  voidReason: string | null;
  ticketId: string;
  issuedByUserId: string;
  approverUserId: string | null;
  correctedFields: Record<string, unknown> | null;
  newPricingTableId: string | null;
  bucketHour: string;
  traceId: string;
  occurredAt: string;
}

2.9 cdr.adjustment.batch.issued.v1

interface CdrAdjustmentBatch {
  schemaVersion: "1";
  eventId: string;
  jobId: string;
  adjustmentType: "RE_RATE" | "VOID";
  count: number;
  sumDeltaMinor: string;       // net price delta across batch, decimal string
  operatorId: string | null;
  tenantId: string | null;
  bucketHourFrom: string;
  bucketHourTo: string;
  issuedByUserId: string;
  approverUserId: string;
  ticketId: string;
  startedAt: string;
  finishedAt: string;
  traceId: string;
  occurredAt: string;
}

2.10 cdr.audit.v1

Emitted on chain-verify results, transparency anchor events, schema changes, HSM key rotations, export replays, and adjustment issuance (one entryType value per case).

interface CdrAudit {
  schemaVersion: "1";
  eventId: string;
  auditId: string;
  entryType:
    | "CHAIN_VERIFY_OK"
    | "CHAIN_BREAK_DETECTED"
    | "TRANSPARENCY_ANCHORED"
    | "SCHEMA_CHANGED"
    | "KEY_ROTATED"
    | "EXPORT_REPLAYED"
    | "ADJUSTMENT_ISSUED";
  bucketHour: string | null;
  operatorId: string | null;
  actorUserId: string | null;
  details: Record<string, unknown>;
  traceId: string;
  occurredAt: string;
}

2.11 cdr.archive.completed.v1

interface CdrArchiveCompleted {
  schemaVersion: "1";
  eventId: string;
  archiveId: string;
  partitionNames: string[];
  rowCount: number;
  byteSize: number;
  s3ManifestUri: string;
  droppedAt: string;
  traceId: string;
  occurredAt: string;
}

2.12 cdr.config.changed.v1

Fired when the active schemaVariant, signerKeyId, SFTP endpoint, or any regulator-facing config changes.

interface CdrConfigChanged {
  schemaVersion: "1";
  eventId: string;
  configKey: string;
  previousValue: string | null;
  newValue: string;
  actorUserId: string;
  approverUserId: string | null;
  reason: string;
  ticketId: string | null;
  traceId: string;
  occurredAt: string;
}

2.13 cdr.quarantine.resubmitted.v1

interface CdrQuarantineResubmitted {
  schemaVersion: "1";
  eventId: string;
  quarantineId: string;
  cdrId: string;
  resolution: "RESUBMITTED" | "DISCARDED";
  encoderError: string | null;
  resolvedBy: string;
  resolvedAt: string;
  traceId: string;
  occurredAt: string;
}

3. Consumed events

| Subject | Producer | Consumer action |
| --- | --- | --- |
| sms.dlr.inbound | dlr-processor | UC-01: project into CDR row |
| sms.events.status | sms-orchestrator | Ignored by default; reserved for non-DLR terminal transitions (e.g., EXPIRED before DLR) |
| compliance.audit.v1 | compliance-engine | Correlate with CDR for cdrAuditEntry.details.complianceEvaluationId; enriches regulator evidence |
| billing.events.v1 | billing-service | Cross-reference for reconciliation reports (not on hot path) |
| billing.pricing.snapshot.ready.v1 | billing-service | UC-02: late reprice |
| operator.registry.updated.v1 | operator-management-service | Invalidate OperatorRegistryPort cache |
| fraud.reversal.v1 | fraud-intel-service | Auto-issue VOID adjustment with voidReason='FRAUD_REVERSAL' |

Durable consumers: cdr-mediation-dlr (queue group, 3 pods), cdr-mediation-audit-sink, cdr-mediation-reprice, cdr-mediation-fraud-reversal. Each uses an ack-wait of 30 s and maxDeliver 5; a message still unacknowledged after the fifth delivery is dead-lettered.
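The consumer settings above might translate into a JetStream consumer configuration along these lines. This is a hedged sketch: the field names follow the JetStream consumer configuration JSON (durations in nanoseconds), while ConsumerConfigSketch, durableConsumer and isExhausted are hypothetical helpers. How a message actually reaches the .deadletter companion is not specified in this document, so the exhaustion check is an assumption about where that decision could live.

```typescript
// Subset of the JetStream consumer configuration used in this sketch.
interface ConsumerConfigSketch {
  durable_name: string;
  deliver_group?: string; // queue group, when pods share the work
  ack_policy: "explicit";
  ack_wait: number; // nanoseconds
  max_deliver: number;
}

const NANOS_PER_SECOND = 1e9;

// Builds a durable, explicit-ack consumer with ack-wait 30 s and maxDeliver 5.
function durableConsumer(name: string, queueGroup?: string): ConsumerConfigSketch {
  return {
    durable_name: name,
    ...(queueGroup ? { deliver_group: queueGroup } : {}),
    ack_policy: "explicit",
    ack_wait: 30 * NANOS_PER_SECOND,
    max_deliver: 5,
  };
}

// True when a failed message has used its last delivery attempt and should be
// handed to the dead-letter path instead of being redelivered.
function isExhausted(deliveryCount: number, cfg: ConsumerConfigSketch): boolean {
  return deliveryCount >= cfg.max_deliver;
}
```

For instance, durableConsumer("cdr-mediation-dlr", "cdr-mediation-dlr") would describe the queue-group consumer shared by the three DLR pods.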


4. Consumer contracts (by subscribing service)

| Service | Subjects | Purpose |
| --- | --- | --- |
| regulator-portal-service | cdr.generated.v1, cdr.exported.v1, cdr.export.acked.v1, cdr.export.rejected.v1, cdr.audit.v1 | Regulator-facing dashboard & compliance reports |
| analytics-service | cdr.generated.v1, cdr.adjustment.batch.issued.v1 | ClickHouse hot-tier ingestion (per EP-ANLYT-02) |
| billing-service | cdr.record.appended.v1, cdr.adjustment.created.v1, cdr.record.repriced.v1 | Reconciliation per EP-BILL-09 (billing-vs-CDR audit) |
| notification-service | cdr.export.rejected.v1, cdr.audit.v1 (CHAIN_BREAK_DETECTED) | Platform-operator notifications |
| admin-dashboard (SSE) | cdr.exported.v1, cdr.export.acked.v1, cdr.export.rejected.v1, cdr.adjustment.batch.issued.v1 | Live ops view |

5. PII & security rules for events

  • Raw MSISDN is forbidden in any event body. Only msisdnHashTo/msisdnHashFrom (hex SHA-256) appear.
  • Message body never appears; it is not stored in CDRs, which are metadata only.
  • Operator codes, recording entities, tariff class codes are internal classifications, not PII.
  • Event payloads are signed with the producer's RS256 outbox-relay key (platform-wide); critical consumers (regulator-portal, billing) verify opportunistically.
  • Event payloads are classified CONFIDENTIAL-INTERNAL; replication outside the platform requires Legal approval.
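The raw-MSISDN rule lends itself to an automated guard on the producer side. The sketch below walks a payload and reports any string value that looks like a phone number. It is a heuristic under stated assumptions: the pattern (optional "+", then 9 to 15 digits) and the name findRawMsisdnPaths are illustrative, not part of this specification.

```typescript
// Assumed shape of a raw MSISDN: optional "+", then 9 to 15 digits.
const MSISDN_LIKE = /^\+?\d{9,15}$/;

// Recursively collects the JSON paths of string values that look like raw MSISDNs.
function findRawMsisdnPaths(value: unknown, path: string = "$"): string[] {
  if (typeof value === "string") {
    return MSISDN_LIKE.test(value) ? [path] : [];
  }
  if (Array.isArray(value)) {
    return value.flatMap((item, i) => findRawMsisdnPaths(item, `${path}[${i}]`));
  }
  if (typeof value === "object" && value !== null) {
    return Object.entries(value).flatMap(([key, child]) =>
      findRawMsisdnPaths(child, `${path}.${key}`),
    );
  }
  // Numbers, booleans and null are not inspected.
  return [];
}
```

A producer could refuse to write an outbox row whenever this returns a non-empty list; the 64-character hex hashes do not match the pattern, so msisdnHashTo and msisdnHashFrom pass through.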

6. Outbox pattern

All CDR state changes write an event row to cdr.outbox in the same Postgres transaction as the aggregate write. A leader-elected relay process publishes each row to NATS, retrying until the publish is acknowledged. Outbox entries older than 7 days with published_at IS NULL raise a SEV1 through platform-wide outbox monitoring.

CREATE TABLE cdr.outbox (
  event_id      UUID PRIMARY KEY,
  subject       TEXT NOT NULL,
  partition_key TEXT,                 -- operatorId or null
  payload       JSONB NOT NULL,
  created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
  published_at  TIMESTAMPTZ
);

CREATE INDEX ix_cdr_outbox_unpublished
  ON cdr.outbox (created_at)
  WHERE published_at IS NULL;
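The relay's core loop can be sketched in memory as follows. This is illustrative only: OutboxRow, nextBatch, staleRows and relayOnce are hypothetical names, the rows stand in for the cdr.outbox table, and the injected publish function stands in for a JetStream publish that resolves once the broker acknowledges. Stamping published_at after the acknowledgement is an assumption consistent with the nullable column.

```typescript
// In-memory stand-in for a cdr.outbox row (hypothetical type).
interface OutboxRow {
  eventId: string;
  subject: string;
  createdAt: Date;
  publishedAt: Date | null;
}

const SEV1_AGE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days

// Mirrors the partial index: unpublished rows only, oldest first.
function nextBatch(rows: OutboxRow[], limit: number): OutboxRow[] {
  return rows
    .filter((r) => r.publishedAt === null)
    .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime())
    .slice(0, limit);
}

// Unpublished rows older than 7 days, i.e. the SEV1 condition.
function staleRows(rows: OutboxRow[], now: Date): OutboxRow[] {
  return rows.filter(
    (r) => r.publishedAt === null && now.getTime() - r.createdAt.getTime() > SEV1_AGE_MS,
  );
}

// One relay pass: publish pending rows in order and stamp each one only after
// the publish resolves. Stops at the first failure so ordering is preserved;
// the remaining rows are retried on the next pass.
async function relayOnce(
  rows: OutboxRow[],
  publish: (row: OutboxRow) => Promise<void>,
  now: Date,
  limit: number = 100,
): Promise<number> {
  let published = 0;
  for (const row of nextBatch(rows, limit)) {
    try {
      await publish(row);
    } catch {
      break;
    }
    row.publishedAt = now;
    published++;
  }
  return published;
}
```

Because a crash between publish and stamp leads to a republish on the next pass, this design is at-least-once; consumers are expected to tolerate duplicates or rely on stream de-duplication.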

7. Schema evolution

  • Additive fields with defaults are non-breaking within the same schemaVersion.
  • Breaking changes bump to cdr.<topic>.v2; subjects coexist during a 90-day deprecation window.
  • Enum values are additive. Consumers MUST treat unknown enum values as UNKNOWN and not fail.
  • A schema registry entry under cdr.* namespace is maintained; schemas-cli validate runs in CI on every PR.
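The enum rule above can be implemented on the consumer side with a small tolerant parser. The sketch below is one possible shape; parseEnum and the constant name are hypothetical, and the value list is copied from the billingIndicator enum in §2.1.

```typescript
// Known values at the time the consumer was built (from the billingIndicator enum).
const BILLING_INDICATORS = [
  "CHARGEABLE",
  "FREE",
  "REVERSE_CHARGED",
  "CPP",
  "MPP",
  "UNKNOWN",
] as const;

type BillingIndicator = (typeof BILLING_INDICATORS)[number];

// Returns the raw value when it is a known member, otherwise the fallback.
// Never throws, so a producer adding a new enum value cannot break the consumer.
function parseEnum<T extends string>(known: readonly T[], raw: string, fallback: T): T {
  return (known as readonly string[]).includes(raw) ? (raw as T) : fallback;
}

// Convenience wrapper for billingIndicator.
function parseBillingIndicator(raw: string): BillingIndicator {
  return parseEnum(BILLING_INDICATORS, raw, "UNKNOWN");
}
```

The same helper works for enums that have no UNKNOWN member in the schema, provided the consumer adds a local fallback value to its own type.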

8. Cross-References

End of EVENT_SCHEMAS.md