Analytics Service — Application Logic

Status: populated
Owner: Platform Engineering
Last updated: 2026-04-18
Companion: DOMAIN_MODEL · API_CONTRACTS

1. Use Cases

1.1 ProcessBillingEventUseCase

Triggered by consuming billing.events (NATS durable consumer anlyt-billing-consumer).

1. Deserialize + validate event schema
2. Check dedup: SELECT 1 FROM anlyt.processed_events WHERE event_id = $1
   → If found: ACK and stop (idempotent replay)
3. Compute hour_bucket = date_trunc('hour', event.billedAt)
4. UPSERT anlyt.metrics_hourly (scope='ACCOUNT', scope_id=accountId):
   ON CONFLICT (hour_bucket, scope, scope_id)
   DO UPDATE SET
     total_messages = metrics_hourly.total_messages + EXCLUDED.total_messages,
     total_cost = metrics_hourly.total_cost + EXCLUDED.total_cost
5. UPSERT anlyt.metrics_hourly (scope='OPERATOR', scope_id=operatorId) — same pattern
6. UPSERT anlyt.metrics_hourly (scope='PLATFORM', scope_id='platform') — same pattern
7. INSERT anlyt.processed_events (event_id, event_type, processed_at)
8. ACK
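
The steps above can be sketched with an in-memory stand-in for the two tables. This is a minimal illustration, not the service's implementation: the event fields eventId, messages and cost, the InMemoryMetrics class, and the string key format are assumptions made for the example. Only billedAt, accountId, operatorId and the three scopes come from the steps themselves.

```typescript
// Hypothetical event shape; only billedAt, accountId and operatorId are
// named in the use case, the remaining fields are assumed for this sketch.
interface BillingEvent {
  eventId: string;
  accountId: string;
  operatorId: string;
  billedAt: string; // ISO-8601 timestamp
  messages: number;
  cost: number;
}

type Scope = "ACCOUNT" | "OPERATOR" | "PLATFORM";

interface HourlyRow {
  totalMessages: number;
  totalCost: number;
}

// Equivalent of date_trunc('hour', ts), evaluated in UTC.
function hourBucket(ts: string): string {
  const d = new Date(ts);
  d.setUTCMinutes(0, 0, 0);
  return d.toISOString();
}

// In-memory stand-in for anlyt.processed_events and anlyt.metrics_hourly.
class InMemoryMetrics {
  readonly processed = new Set<string>();
  readonly hourly = new Map<string, HourlyRow>();

  // Additive upsert keyed on (hour_bucket, scope, scope_id).
  private upsert(bucket: string, scope: Scope, scopeId: string, e: BillingEvent): void {
    const key = `${bucket}|${scope}|${scopeId}`;
    const row = this.hourly.get(key) ?? { totalMessages: 0, totalCost: 0 };
    row.totalMessages += e.messages;
    row.totalCost += e.cost;
    this.hourly.set(key, row);
  }

  // Returns false when the event was already processed (idempotent replay).
  process(e: BillingEvent): boolean {
    if (this.processed.has(e.eventId)) return false;
    const bucket = hourBucket(e.billedAt);
    this.upsert(bucket, "ACCOUNT", e.accountId, e);
    this.upsert(bucket, "OPERATOR", e.operatorId, e);
    this.upsert(bucket, "PLATFORM", "platform", e);
    this.processed.add(e.eventId);
    return true;
  }
}
```

Each event touches exactly three hourly rows, one per scope, and a replay with the same eventId leaves all of them unchanged.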

1.2 ProcessDlrEventUseCase

Triggered by consuming sms.dlr.inbound (NATS durable consumer anlyt-dlr-consumer).

1. Deserialize + validate event schema
2. Dedup check (same as §1.1)
3. Compute hour_bucket from event.receivedAt
4. UPSERT anlyt.operator_performance:
   ON CONFLICT (hour_bucket, operator_id)
   DO UPDATE SET
     delivered_messages = CASE WHEN deliveryStatus='DELIVERED' THEN operator_performance.delivered_messages + 1 ELSE operator_performance.delivered_messages END,
     failed_messages = CASE WHEN deliveryStatus IN ('FAILED','UNDELIVERABLE') THEN operator_performance.failed_messages + 1 ELSE operator_performance.failed_messages END,
     latency_sum_ms = operator_performance.latency_sum_ms + EXCLUDED.latency_sum_ms,
     latency_sample_count = operator_performance.latency_sample_count + 1,
     peak_tps = GREATEST(operator_performance.peak_tps, EXCLUDED.peak_tps)
5. INSERT anlyt.processed_events
6. ACK
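
The per-row effect of the upsert in step 4 can be expressed as a pure function. The sketch below is illustrative only: the DlrSample shape and the applyDlr name are assumptions, while the counters and the status values mirror the columns and CASE branches listed above.

```typescript
// Mirrors the anlyt.operator_performance counters named in step 4.
interface OperatorPerformanceRow {
  deliveredMessages: number;
  failedMessages: number;
  latencySumMs: number;
  latencySampleCount: number;
  peakTps: number;
}

// Hypothetical projection of one DLR event onto the fields the upsert needs.
interface DlrSample {
  deliveryStatus: string;
  latencyMs: number;
  tpsSnapshot: number;
}

const EMPTY_PERFORMANCE_ROW: OperatorPerformanceRow = {
  deliveredMessages: 0,
  failedMessages: 0,
  latencySumMs: 0,
  latencySampleCount: 0,
  peakTps: 0,
};

// Returns the row as it would look after applying one DLR sample.
function applyDlr(row: OperatorPerformanceRow, s: DlrSample): OperatorPerformanceRow {
  const delivered = s.deliveryStatus === "DELIVERED" ? 1 : 0;
  const failed =
    s.deliveryStatus === "FAILED" || s.deliveryStatus === "UNDELIVERABLE" ? 1 : 0;
  return {
    deliveredMessages: row.deliveredMessages + delivered,
    failedMessages: row.failedMessages + failed,
    latencySumMs: row.latencySumMs + s.latencyMs,
    latencySampleCount: row.latencySampleCount + 1,
    peakTps: Math.max(row.peakTps, s.tpsSnapshot), // GREATEST(...)
  };
}
```

Storing the latency sum and sample count separately, rather than a running average, is what lets later queries compute a correct weighted average across buckets.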

1.3 RollUpHourlyToDailyUseCase (scheduled)

Runs hourly (cron 0 * * * *).

1. Find hourly buckets with updated_at > last_rollup_at and collect the distinct days they belong to
2. For each affected (day, scope, scope_id):
   SELECT aggregated sums FROM anlyt.metrics_hourly WHERE date_trunc('day', hour_bucket) = day_bucket
3. UPSERT anlyt.metrics_daily (same idempotent pattern)
4. Update rollup checkpoint
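
A minimal sketch of the aggregation in step 2 follows. It assumes the daily row is replaced with sums recomputed from the hourly rows rather than incremented, which is one way to keep a re-run of the job idempotent. The type names, the rollUpToDaily function, and the ISO-8601 UTC string buckets are assumptions for illustration.

```typescript
interface HourlyMetric {
  hourBucket: string; // ISO-8601 UTC timestamp, truncated to the hour
  scope: string;
  scopeId: string;
  totalMessages: number;
  totalCost: number;
}

interface DailyMetric {
  dayBucket: string;
  scope: string;
  scopeId: string;
  totalMessages: number;
  totalCost: number;
}

// Equivalent of date_trunc('day', hour_bucket), evaluated in UTC.
function dayBucketOf(hourBucket: string): string {
  const d = new Date(hourBucket);
  d.setUTCHours(0, 0, 0, 0);
  return d.toISOString();
}

// Groups hourly rows by (day, scope, scope_id) and sums their totals.
// The result is a full recomputation, so writing it with an overwriting
// upsert yields the same daily rows no matter how often the job runs.
function rollUpToDaily(rows: HourlyMetric[]): DailyMetric[] {
  const byKey = new Map<string, DailyMetric>();
  for (const r of rows) {
    const dayBucket = dayBucketOf(r.hourBucket);
    const key = `${dayBucket}|${r.scope}|${r.scopeId}`;
    const acc = byKey.get(key) ?? {
      dayBucket,
      scope: r.scope,
      scopeId: r.scopeId,
      totalMessages: 0,
      totalCost: 0,
    };
    acc.totalMessages += r.totalMessages;
    acc.totalCost += r.totalCost;
    byKey.set(key, acc);
  }
  return Array.from(byKey.values());
}
```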

1.4 GetPlatformSummaryUseCase

Triggered by GET /v1/internal/analytics/summary.

1. Parse + validate query params (from, to, granularity)
2. Validate window ≤ 90 days
3. SELECT FROM anlyt.metrics_daily WHERE day_bucket BETWEEN from AND to AND scope = 'PLATFORM' (or anlyt.metrics_hourly, filtering on hour_bucket instead)
4. Aggregate across rows: SUM totals, sample-weighted AVG latency (total latency sum / total sample count), MAX peakTps
5. Return DTO
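
Steps 2 and 4 can be sketched as follows. The row shape assumes the queried rows carry a latency sum and a sample count alongside the totals, as anlyt.operator_performance does; the names SummaryRow, PlatformSummary, validateWindow and summarize are hypothetical.

```typescript
// Assumed shape of one row returned by step 3.
interface SummaryRow {
  totalMessages: number;
  totalCost: number;
  latencySumMs: number;
  latencySampleCount: number;
  peakTps: number;
}

// Hypothetical response DTO.
interface PlatformSummary {
  totalMessages: number;
  totalCost: number;
  avgLatencyMs: number | null; // null when the window has no latency samples
  peakTps: number;
}

const MAX_WINDOW_DAYS = 90;
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Step 2: reject inverted windows and windows longer than 90 days.
function validateWindow(from: Date, to: Date): void {
  const spanMs = to.getTime() - from.getTime();
  if (spanMs < 0) throw new RangeError("'to' must not precede 'from'");
  if (spanMs / MS_PER_DAY > MAX_WINDOW_DAYS) {
    throw new RangeError("window must not exceed 90 days");
  }
}

// Step 4: sum totals, weight latency by sample count, take the max peak.
function summarize(rows: SummaryRow[]): PlatformSummary {
  let totalMessages = 0;
  let totalCost = 0;
  let latencySumMs = 0;
  let latencySampleCount = 0;
  let peakTps = 0;
  for (const r of rows) {
    totalMessages += r.totalMessages;
    totalCost += r.totalCost;
    latencySumMs += r.latencySumMs;
    latencySampleCount += r.latencySampleCount;
    peakTps = Math.max(peakTps, r.peakTps);
  }
  const avgLatencyMs = latencySampleCount > 0 ? latencySumMs / latencySampleCount : null;
  return { totalMessages, totalCost, avgLatencyMs, peakTps };
}
```

Dividing the summed latency by the summed sample count weights each bucket by its traffic; averaging the per-bucket averages would let a quiet hour count as much as a busy one.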

1.5 GetOperatorPerformanceUseCase

Similar to §1.4 but queries anlyt.operator_performance for operatorId.

1.6 GetAccountUsageUseCase

Similar to §1.4 but queries anlyt.account_usage_daily for accountId. Also returns dailyBreakdown array.

1.7 GetThroughputUseCase

Queries hourly data and applies linear interpolation for sub-hour resolution. At resolution = 1m the returned values are synthesized, not actual per-minute counts; the response documents this caveat.
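
One possible reading of the interpolation is sketched below. It assumes each hourly value is treated as a sample at the start of its hour, that the input is sorted by time, and that points between two hourly samples are filled in linearly. The synthesized flag and the function name are hypothetical; the source only states that the caveat is documented in the response.

```typescript
interface ThroughputPoint {
  at: number;           // epoch milliseconds
  value: number;        // throughput at that instant
  synthesized: boolean; // hypothetical flag: true when the point was interpolated
}

// Fills the gaps between consecutive hourly samples at the requested step.
// Assumes `hourly` is sorted ascending by `at`.
function interpolateThroughput(
  hourly: { at: number; value: number }[],
  stepMs: number,
): ThroughputPoint[] {
  if (stepMs <= 0) throw new RangeError("stepMs must be positive");
  const out: ThroughputPoint[] = [];
  for (let i = 0; i + 1 < hourly.length; i++) {
    const a = hourly[i];
    const b = hourly[i + 1];
    for (let t = a.at; t < b.at; t += stepMs) {
      // Fraction of the way from sample a to sample b.
      const fraction = (t - a.at) / (b.at - a.at);
      out.push({
        at: t,
        value: a.value + fraction * (b.value - a.value),
        synthesized: t !== a.at,
      });
    }
  }
  // The final hourly sample is emitted as-is.
  const last = hourly[hourly.length - 1];
  if (last !== undefined) out.push({ at: last.at, value: last.value, synthesized: false });
  return out;
}
```

With a one-minute step, two adjacent hourly samples produce 61 points, of which only the first and last are measured values.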

1.8 GetDeliveryBreakdownUseCase

Aggregates byStatus and byOperator from anlyt.operator_performance and anlyt.metrics_hourly for the requested window.

2. Ports

Port                      Adapter
BillingEventSubscriber    NATS JetStream consumer (billing.events)
DlrEventSubscriber        NATS JetStream consumer (sms.dlr.inbound)
MetricsRepository         Prisma / PostgreSQL (anlyt schema)
ClickHouseRepository      ClickHouse HTTP adapter (optional, used for queries spanning more than 90 days)
Clock                     System clock + test override

3. Concurrency

  • Two independent NATS consumers (billing, DLR) process in parallel.
  • Each consumer has a configurable in-flight limit (default 32 per pod).
  • Upsert conflicts are resolved by the database ON CONFLICT DO UPDATE — no application-level locking needed.
  • If the same event is replayed concurrently, the processed_events INSERT with ON CONFLICT DO NOTHING prevents a double count.

4. Observability Hooks

  • OTel span per use-case: anlyt.process.billingEvent, anlyt.process.dlrEvent, anlyt.rollup.daily.
  • Pino logs: eventId, eventType, bucketHour, durationMs, traceId.
  • Prometheus: anlyt_events_processed_total{type, result}, anlyt_upsert_duration_seconds, anlyt_rollup_duration_seconds, anlyt_query_duration_seconds{endpoint}.