Analytics Service — Application Logic
Status: populated | Owner: Platform Engineering | Last updated: 2026-04-18 | Companion: DOMAIN_MODEL · API_CONTRACTS
1. Use Cases
1.1 ProcessBillingEventUseCase
Triggered by consuming billing.events (NATS durable consumer anlyt-billing-consumer).
1. Deserialize + validate event schema
2. Check dedup: SELECT 1 FROM anlyt.processed_events WHERE event_id = $1
   → If found: ACK and stop (idempotent replay)
3. Compute hour_bucket = date_trunc('hour', event.billedAt)
4. UPSERT anlyt.metrics_hourly for the row (hour_bucket, scope='ACCOUNT', scope_id=accountId):
   ON CONFLICT (hour_bucket, scope, scope_id)
   DO UPDATE SET
     total_messages = total_messages + EXCLUDED.total_messages,
     total_cost = total_cost + EXCLUDED.total_cost
5. UPSERT anlyt.metrics_hourly (scope='OPERATOR', scope_id=operatorId) — same pattern
6. UPSERT anlyt.metrics_hourly (scope='PLATFORM', scope_id='platform') — same pattern
7. INSERT anlyt.processed_events (event_id, event_type, processed_at)
8. ACK
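The steps above can be sketched against an in-memory store, which makes the dedup and additive-upsert behaviour easy to see. This is a minimal illustration, not the service's implementation: `InMemoryStore`, `processBillingEvent`, and every event field other than `billedAt`, `accountId`, and `operatorId` are assumed names, and hour buckets are assumed to be kept in UTC.

```typescript
// Hypothetical event shape; `messages` and `cost` are assumed field names.
interface BillingEvent {
  eventId: string;
  accountId: string;
  operatorId: string;
  billedAt: string; // ISO-8601 timestamp
  messages: number;
  cost: number;
}

interface HourlyMetrics {
  totalMessages: number;
  totalCost: number;
}

// In-memory stand-in for anlyt.processed_events and anlyt.metrics_hourly.
class InMemoryStore {
  processed = new Set<string>();
  hourly = new Map<string, HourlyMetrics>();
}

const HOUR_MS = 60 * 60 * 1000;

// Equivalent of date_trunc('hour', ts) when buckets are kept in UTC.
function hourBucket(iso: string): string {
  const ms = Date.parse(iso);
  return new Date(Math.floor(ms / HOUR_MS) * HOUR_MS).toISOString();
}

// Additive upsert: create the row if missing, otherwise add the deltas to it.
function upsertHourly(
  store: InMemoryStore,
  bucket: string,
  scope: string,
  scopeId: string,
  event: BillingEvent,
): void {
  const key = `${bucket}|${scope}|${scopeId}`;
  const row = store.hourly.get(key) ?? { totalMessages: 0, totalCost: 0 };
  row.totalMessages += event.messages;
  row.totalCost += event.cost;
  store.hourly.set(key, row);
}

function processBillingEvent(
  store: InMemoryStore,
  event: BillingEvent,
): "processed" | "duplicate" {
  // Step 2: a replayed event is acknowledged without being counted again.
  if (store.processed.has(event.eventId)) return "duplicate";

  const bucket = hourBucket(event.billedAt); // step 3
  upsertHourly(store, bucket, "ACCOUNT", event.accountId, event); // step 4
  upsertHourly(store, bucket, "OPERATOR", event.operatorId, event); // step 5
  upsertHourly(store, bucket, "PLATFORM", "platform", event); // step 6

  store.processed.add(event.eventId); // step 7
  return "processed"; // step 8: the caller ACKs in both outcomes
}
```

In a real adapter, steps 4 to 7 would presumably run in a single database transaction so that a crash between the upserts and the `processed_events` insert cannot leave a partially counted event; the source does not state this, so treat it as an assumption.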
1.2 ProcessDlrEventUseCase
Triggered by consuming sms.dlr.inbound (NATS durable consumer anlyt-dlr-consumer).
1. Deserialize + validate event schema
2. Dedup check (same as §1.1)
3. Compute hour_bucket from event.receivedAt
4. UPSERT anlyt.operator_performance:
   ON CONFLICT (hour_bucket, operator_id)
   DO UPDATE SET
     delivered_messages = CASE WHEN deliveryStatus='DELIVERED' THEN delivered_messages + 1 ELSE delivered_messages END,
     failed_messages = CASE WHEN deliveryStatus IN ('FAILED','UNDELIVERABLE') THEN failed_messages + 1 ELSE failed_messages END,
     latency_sum_ms = latency_sum_ms + EXCLUDED.latency_sum_ms,
     latency_sample_count = latency_sample_count + 1,
     peak_tps = GREATEST(peak_tps, EXCLUDED.peak_tps)
5. INSERT anlyt.processed_events
6. ACK
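The merge performed by the upsert in step 4 can be expressed as a pure function, which is convenient for unit tests. The sketch below is illustrative only: `OperatorPerformanceRow`, `DlrSample`, `mergeDlr`, and the `latencyMs` and `tpsSnapshot` field names are assumptions, and it assumes the inserted row carries a single event's values.

```typescript
// Mirrors the columns named in the upsert above.
interface OperatorPerformanceRow {
  deliveredMessages: number;
  failedMessages: number;
  latencySumMs: number;
  latencySampleCount: number;
  peakTps: number;
}

// Hypothetical per-event values extracted from the DLR event.
interface DlrSample {
  deliveryStatus: string;
  latencyMs: number;
  tpsSnapshot: number;
}

const FAILED_STATUSES = ["FAILED", "UNDELIVERABLE"];

// Applies one DLR sample to the existing hourly row (or to an empty row).
function mergeDlr(
  row: OperatorPerformanceRow | undefined,
  sample: DlrSample,
): OperatorPerformanceRow {
  const base = row ?? {
    deliveredMessages: 0,
    failedMessages: 0,
    latencySumMs: 0,
    latencySampleCount: 0,
    peakTps: 0,
  };
  const delivered = sample.deliveryStatus === "DELIVERED" ? 1 : 0;
  const failed = FAILED_STATUSES.indexOf(sample.deliveryStatus) >= 0 ? 1 : 0;
  return {
    deliveredMessages: base.deliveredMessages + delivered,
    failedMessages: base.failedMessages + failed,
    latencySumMs: base.latencySumMs + sample.latencyMs,
    latencySampleCount: base.latencySampleCount + 1,
    peakTps: Math.max(base.peakTps, sample.tpsSnapshot), // GREATEST(...)
  };
}

// Storing a sum and a count lets readers derive the mean without losing precision.
function averageLatencyMs(row: OperatorPerformanceRow): number | null {
  return row.latencySampleCount === 0
    ? null
    : row.latencySumMs / row.latencySampleCount;
}
```

Keeping `latency_sum_ms` and `latency_sample_count` separately, rather than a stored average, is what allows later aggregation across buckets to compute a correctly weighted mean.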
1.3 RollUpHourlyToDailyUseCase (scheduled)
Runs hourly (cron 0 * * * *).
1. Find hourly buckets with updated_at > last_rollup_at and collect the distinct days they fall in
2. For each affected (day, scope, scope_id):
   SELECT aggregated sums FROM anlyt.metrics_hourly WHERE date_trunc('day', hour_bucket) = day
3. UPSERT anlyt.metrics_daily (same idempotent pattern)
4. Update rollup checkpoint
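A minimal sketch of the grouping in step 2, assuming UTC buckets and hypothetical row shapes (`HourlyRow`, `DailyRow`). It also assumes the daily upsert overwrites each row with the recomputed sums rather than adding to it, since that is what would make re-running the job for the same day safe; the source only says the pattern is idempotent.

```typescript
interface HourlyRow {
  hourBucket: string; // ISO-8601 timestamp, start of the hour (UTC)
  scope: string;
  scopeId: string;
  totalMessages: number;
  totalCost: number;
}

interface DailyRow {
  dayBucket: string; // YYYY-MM-DD (UTC)
  scope: string;
  scopeId: string;
  totalMessages: number;
  totalCost: number;
}

// Equivalent of date_trunc('day', hour_bucket) for UTC timestamps.
function dayBucket(hourIso: string): string {
  return new Date(Date.parse(hourIso)).toISOString().slice(0, 10);
}

// Recomputes daily totals from ALL hourly rows of the affected days.
// The result replaces the existing daily rows, so running it twice
// yields the same numbers.
function rollUpToDaily(hourlyRows: HourlyRow[]): DailyRow[] {
  const byKey = new Map<string, DailyRow>();
  for (const r of hourlyRows) {
    const day = dayBucket(r.hourBucket);
    const key = `${day}|${r.scope}|${r.scopeId}`;
    const acc = byKey.get(key) ?? {
      dayBucket: day,
      scope: r.scope,
      scopeId: r.scopeId,
      totalMessages: 0,
      totalCost: 0,
    };
    acc.totalMessages += r.totalMessages;
    acc.totalCost += r.totalCost;
    byKey.set(key, acc);
  }
  return Array.from(byKey.values());
}
```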
1.4 GetPlatformSummaryUseCase
Triggered by GET /v1/internal/analytics/summary.
1. Parse + validate query params (from, to, granularity)
2. Validate window ≤ 90 days
3. SELECT rows FROM anlyt.metrics_daily WHERE day_bucket BETWEEN from AND to AND scope = 'PLATFORM' (or FROM anlyt.metrics_hourly, filtering on hour_bucket, when hourly granularity is requested)
4. Aggregate across rows: SUM totals, WEIGHTED AVG latency, MAX peakTps
5. Return DTO
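Steps 2 and 4 can be sketched as two small pure functions. The row shape below (`SummaryRow`) is hypothetical; in particular, it assumes each row exposes a latency sum and a sample count, so that the weighted average is the total of the sums divided by the total of the counts rather than a mean of per-row averages.

```typescript
// Hypothetical shape of the rows returned by step 3.
interface SummaryRow {
  totalMessages: number;
  totalCost: number;
  latencySumMs: number;
  latencySampleCount: number;
  peakTps: number;
}

interface PlatformSummary {
  totalMessages: number;
  totalCost: number;
  avgLatencyMs: number | null; // null when there are no latency samples
  peakTps: number;
}

const MAX_WINDOW_DAYS = 90;
const DAY_MS = 24 * 60 * 60 * 1000;

// Step 2: reject inverted, unparsable, or over-long windows.
function validateWindow(from: Date, to: Date): void {
  const spanMs = to.getTime() - from.getTime();
  if (Number.isNaN(spanMs) || spanMs < 0) {
    throw new RangeError("from must be a valid date not later than to");
  }
  if (spanMs > MAX_WINDOW_DAYS * DAY_MS) {
    throw new RangeError(`window exceeds ${MAX_WINDOW_DAYS} days`);
  }
}

// Step 4: SUM totals, weighted average latency, MAX peak TPS.
function summarize(rows: SummaryRow[]): PlatformSummary {
  let totalMessages = 0;
  let totalCost = 0;
  let latencySumMs = 0;
  let samples = 0;
  let peakTps = 0;
  for (const r of rows) {
    totalMessages += r.totalMessages;
    totalCost += r.totalCost;
    latencySumMs += r.latencySumMs;
    samples += r.latencySampleCount;
    peakTps = Math.max(peakTps, r.peakTps);
  }
  return {
    totalMessages,
    totalCost,
    avgLatencyMs: samples === 0 ? null : latencySumMs / samples,
    peakTps,
  };
}
```

For example, a row with one 100 ms sample and a row with three samples totalling 900 ms yield a weighted average of 250 ms, whereas averaging the two per-row means (100 and 300) would give 200 ms.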
1.5 GetOperatorPerformanceUseCase
Similar to §1.4 but queries anlyt.operator_performance for operatorId.
1.6 GetAccountUsageUseCase
Similar to §1.4 but queries anlyt.account_usage_daily for accountId. Also returns dailyBreakdown array.
1.7 GetThroughputUseCase
Queries hourly data and applies linear interpolation for sub-hour resolution. If resolution = 1m, the returned values are synthesized, not actual per-minute counts; this caveat is documented in the response.
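One possible reading of the interpolation, as a hedged sketch: each hourly bucket contributes a point at the start of its hour whose value is that hour's average per-minute rate, and per-minute values are drawn on the straight line between consecutive points. The names `HourlyPoint`, `ThroughputSample`, and `interpolatePerMinute` are illustrative, and the source does not say where within the hour each point is anchored.

```typescript
interface HourlyPoint {
  hourStartMs: number; // epoch milliseconds, start of the hour
  messagesPerMinute: number; // hourly total divided by 60
}

interface ThroughputSample {
  tsMs: number;
  messagesPerMinute: number;
  synthesized: true; // every value is derived from hourly data, never measured
}

const MINUTE_MS = 60 * 1000;

// Emits one sample per minute between the first and last hourly point.
function interpolatePerMinute(points: HourlyPoint[]): ThroughputSample[] {
  const out: ThroughputSample[] = [];
  for (let i = 0; i + 1 < points.length; i++) {
    const a = points[i];
    const b = points[i + 1];
    const span = b.hourStartMs - a.hourStartMs;
    for (let t = a.hourStartMs; t < b.hourStartMs; t += MINUTE_MS) {
      const frac = (t - a.hourStartMs) / span; // 0 at a, approaching 1 at b
      out.push({
        tsMs: t,
        messagesPerMinute:
          a.messagesPerMinute + (b.messagesPerMinute - a.messagesPerMinute) * frac,
        synthesized: true,
      });
    }
  }
  // Close the series with the final hourly point itself.
  if (points.length > 0) {
    const last = points[points.length - 1];
    out.push({
      tsMs: last.hourStartMs,
      messagesPerMinute: last.messagesPerMinute,
      synthesized: true,
    });
  }
  return out;
}
```

Note that interpolated values smooth over bursts inside an hour, which is why the response flags them as synthesized.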
1.8 GetDeliveryBreakdownUseCase
Aggregates byStatus and byOperator from anlyt.operator_performance and anlyt.metrics_hourly for the requested window.
2. Ports
| Port | Adapter |
|---|---|
| BillingEventSubscriber | NATS JetStream consumer (billing.events) |
| DlrEventSubscriber | NATS JetStream consumer (sms.dlr.inbound) |
| MetricsRepository | Prisma / PostgreSQL (anlyt schema) |
| ClickHouseRepository | ClickHouse HTTP adapter (optional, used for > 90 d queries) |
| Clock | System clock + test override |
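As an illustration of the port/adapter split, the `Clock` row could look like the sketch below. Only the port name comes from the table; the `now()` method and the `SystemClock` and `FixedClock` adapter names are assumptions.

```typescript
// Port: the only way use cases obtain the current time.
interface Clock {
  now(): Date;
}

// Production adapter: delegates to the system clock.
class SystemClock implements Clock {
  now(): Date {
    return new Date();
  }
}

// Test override: time only moves when the test says so.
class FixedClock implements Clock {
  private currentMs: number;

  constructor(start: Date) {
    this.currentMs = start.getTime();
  }

  now(): Date {
    return new Date(this.currentMs);
  }

  advance(ms: number): void {
    this.currentMs += ms;
  }
}
```

Injecting a `FixedClock` would let tests for the rollup checkpoint or for query windows run deterministically, without depending on wall-clock time.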
3. Concurrency
- Two independent NATS consumers (billing, DLR) process in parallel.
- Each consumer has a configurable in-flight limit (default 32 per pod).
- Upsert conflicts are resolved by the database (`ON CONFLICT DO UPDATE`); no application-level locking is needed.
- Same event replayed concurrently: the `processed_events` INSERT with `ON CONFLICT DO NOTHING` prevents double-counting.
4. Observability Hooks
- OTel span per use-case: `anlyt.process.billingEvent`, `anlyt.process.dlrEvent`, `anlyt.rollup.daily`.
- Pino logs: `eventId`, `eventType`, `bucketHour`, `durationMs`, `traceId`.
- Prometheus: `anlyt_events_processed_total{type, result}`, `anlyt_upsert_duration_seconds`, `anlyt_rollup_duration_seconds`, `anlyt_query_duration_seconds{endpoint}`.