Analytics Service — Data Model
Status: populated
Owner: Platform Engineering
Last updated: 2026-04-18
Companion: DOMAIN_MODEL
Schema: anlyt. Owned exclusively by analytics-service. No cross-service writes.
1. Tables
anlyt.metrics_hourly
Hourly aggregates across all scopes (platform, operator, account).
CREATE TABLE anlyt.metrics_hourly (
hour_bucket TIMESTAMPTZ NOT NULL, -- date_trunc('hour', ...)
scope TEXT NOT NULL, -- 'PLATFORM'|'OPERATOR'|'ACCOUNT'
scope_id TEXT NOT NULL, -- operatorId, accountId, or 'platform'
total_messages BIGINT NOT NULL DEFAULT 0,
delivered_messages BIGINT NOT NULL DEFAULT 0,
failed_messages BIGINT NOT NULL DEFAULT 0,
pending_messages BIGINT NOT NULL DEFAULT 0,
total_cost NUMERIC(18,6) NOT NULL DEFAULT 0,
latency_sum_ms BIGINT NOT NULL DEFAULT 0,
latency_sample_count BIGINT NOT NULL DEFAULT 0,
peak_tps INTEGER NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (hour_bucket, scope, scope_id)
) PARTITION BY RANGE (hour_bucket);
-- Monthly partitions; 90-day hot retention
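The monthly partitions mentioned above have to be created before rows for that month arrive. A minimal sketch of a DDL generator follows. The helper name `monthlyPartitionDdl`, the partition naming scheme (`<table>_yYYYYmMM`), and the UTC month boundaries are all assumptions; this document does not specify them.

```typescript
// Hypothetical helper: builds the DDL for one monthly partition.
// Assumptions (not from this document): partition names follow
// <table>_yYYYYmMM and month boundaries are expressed in UTC.
export function monthlyPartitionDdl(
  table: 'metrics_hourly' | 'operator_performance',
  year: number,
  month: number, // 1-12
): string {
  if (!Number.isInteger(month) || month < 1 || month > 12) {
    throw new RangeError(`month must be 1-12, got ${month}`);
  }
  const pad = (n: number): string => String(n).padStart(2, '0');

  // The upper bound of a PostgreSQL range partition is exclusive,
  // so it is simply the first instant of the following month.
  const nextYear = month === 12 ? year + 1 : year;
  const nextMonth = month === 12 ? 1 : month + 1;

  const from = `${year}-${pad(month)}-01 00:00:00+00`;
  const to = `${nextYear}-${pad(nextMonth)}-01 00:00:00+00`;
  const name = `${table}_y${year}m${pad(month)}`;

  return (
    `CREATE TABLE IF NOT EXISTS anlyt.${name} ` +
    `PARTITION OF anlyt.${table} ` +
    `FOR VALUES FROM ('${from}') TO ('${to}');`
  );
}
```

Calling `monthlyPartitionDdl('metrics_hourly', 2026, 12)` produces a partition covering 2026-12-01 up to, but not including, 2027-01-01.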
Computed fields (not stored, derived on read):
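delivery_rate = delivered_messages::numeric / NULLIF(total_messages, 0)
avg_latency_ms = latency_sum_ms::numeric / NULLIF(latency_sample_count, 0)
error_rate = failed_messages::numeric / NULLIF(total_messages, 0)
The ::numeric cast is needed in PostgreSQL because BIGINT / BIGINT truncates to an integer. Each expression yields NULL when its denominator is 0.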
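The same derivation can be done in application code on read. The sketch below mirrors the `NULLIF` behaviour by returning `null` for a zero denominator. The names `MetricCounters`, `DerivedMetrics`, `ratio`, and `deriveMetrics` are hypothetical, and converting `bigint` to `number` is assumed to be acceptable for display-level ratios.

```typescript
// Subset of a metrics row needed for the derived fields (hypothetical type).
interface MetricCounters {
  totalMessages: bigint;
  deliveredMessages: bigint;
  failedMessages: bigint;
  latencySumMs: bigint;
  latencySampleCount: bigint;
}

interface DerivedMetrics {
  deliveryRate: number | null;
  avgLatencyMs: number | null;
  errorRate: number | null;
}

// Mirrors x / NULLIF(y, 0): a zero denominator yields null instead of Infinity/NaN.
function ratio(numerator: bigint, denominator: bigint): number | null {
  if (denominator === BigInt(0)) return null;
  // Number() loses precision above 2^53; assumed acceptable for ratios.
  return Number(numerator) / Number(denominator);
}

export function deriveMetrics(row: MetricCounters): DerivedMetrics {
  return {
    deliveryRate: ratio(row.deliveredMessages, row.totalMessages),
    avgLatencyMs: ratio(row.latencySumMs, row.latencySampleCount),
    errorRate: ratio(row.failedMessages, row.totalMessages),
  };
}
```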
anlyt.metrics_daily
Daily roll-up. Same columns as metrics_hourly, except that day_bucket replaces hour_bucket. Unlike metrics_hourly, this table is not partitioned.
CREATE TABLE anlyt.metrics_daily (
day_bucket TIMESTAMPTZ NOT NULL, -- date_trunc('day', ...)
scope TEXT NOT NULL,
scope_id TEXT NOT NULL,
total_messages BIGINT NOT NULL DEFAULT 0,
delivered_messages BIGINT NOT NULL DEFAULT 0,
failed_messages BIGINT NOT NULL DEFAULT 0,
pending_messages BIGINT NOT NULL DEFAULT 0,
total_cost NUMERIC(18,6) NOT NULL DEFAULT 0,
latency_sum_ms BIGINT NOT NULL DEFAULT 0,
latency_sample_count BIGINT NOT NULL DEFAULT 0,
peak_tps INTEGER NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (day_bucket, scope, scope_id)
);
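This document does not define how hourly values combine into a daily row, so the following is only a sketch under stated assumptions: message counters are summed, `peak_tps` takes the maximum across hours, and `total_cost` is summed exactly in fixed-point micro-units so that the NUMERIC(18,6) string never passes through a float. The names `HourlySlice`, `toMicros`, `fromMicros`, and `rollUpDay` are hypothetical.

```typescript
// Only the fields needed to illustrate the three aggregation styles.
interface HourlySlice {
  totalMessages: bigint;
  totalCost: string; // NUMERIC(18,6) as string, e.g. "12.500000"
  peakTps: number;
}

const SCALE = 6; // fractional digits of NUMERIC(18,6)

// Parses a decimal string into an integer count of 10^-6 units.
function toMicros(value: string): bigint {
  const negative = value.startsWith('-');
  const [whole, frac = ''] = (negative ? value.slice(1) : value).split('.');
  const micros = BigInt(whole + frac.padEnd(SCALE, '0').slice(0, SCALE));
  return negative ? -micros : micros;
}

// Formats micro-units back into a string with exactly six fractional digits.
function fromMicros(micros: bigint): string {
  const negative = micros < BigInt(0);
  const digits = (negative ? -micros : micros).toString().padStart(SCALE + 1, '0');
  return `${negative ? '-' : ''}${digits.slice(0, -SCALE)}.${digits.slice(-SCALE)}`;
}

// Assumed roll-up rules: sum the counters and cost, take the max of peak TPS.
export function rollUpDay(hours: HourlySlice[]): HourlySlice {
  let totalMessages = BigInt(0);
  let costMicros = BigInt(0);
  let peakTps = 0;
  for (const h of hours) {
    totalMessages += h.totalMessages;
    costMicros += toMicros(h.totalCost);
    peakTps = Math.max(peakTps, h.peakTps);
  }
  return { totalMessages, totalCost: fromMicros(costMicros), peakTps };
}
```

Taking the maximum for `peak_tps` keeps the daily value meaningful as a peak; summing it would not. Whether `pending_messages` should be summed or taken from the last hour is left open here because the source does not say.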
anlyt.operator_performance
Dedicated hourly operator-level table with richer latency tracking.
CREATE TABLE anlyt.operator_performance (
hour_bucket TIMESTAMPTZ NOT NULL,
operator_id UUID NOT NULL,
total_messages BIGINT NOT NULL DEFAULT 0,
delivered_messages BIGINT NOT NULL DEFAULT 0,
failed_messages BIGINT NOT NULL DEFAULT 0,
latency_sum_ms BIGINT NOT NULL DEFAULT 0,
latency_sample_count BIGINT NOT NULL DEFAULT 0,
latency_p95_ms INTEGER, -- updated by periodic percentile recompute job
peak_tps INTEGER NOT NULL DEFAULT 0,
error_count BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (hour_bucket, operator_id)
) PARTITION BY RANGE (hour_bucket);
CREATE INDEX ix_anlyt_op_perf_operator_hour ON anlyt.operator_performance(operator_id, hour_bucket DESC);
anlyt.account_usage_daily
Daily account-level cost and volume summary.
CREATE TABLE anlyt.account_usage_daily (
day_bucket TIMESTAMPTZ NOT NULL,
account_id UUID NOT NULL,
messages_sent BIGINT NOT NULL DEFAULT 0,
messages_delivered BIGINT NOT NULL DEFAULT 0,
messages_failed BIGINT NOT NULL DEFAULT 0,
total_cost NUMERIC(18,6) NOT NULL DEFAULT 0,
total_segments BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (day_bucket, account_id)
);
CREATE INDEX ix_anlyt_acct_usage_account ON anlyt.account_usage_daily(account_id, day_bucket DESC);
anlyt.processed_events
Deduplication table. Prevents double-counting on NATS redelivery.
CREATE TABLE anlyt.processed_events (
event_id TEXT PRIMARY KEY,
event_type TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ix_anlyt_processed_at ON anlyt.processed_events(processed_at);
-- Purge job: DELETE FROM anlyt.processed_events WHERE processed_at < now() - interval '48 hours'
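A consumer can use this table as an insert-first guard: try to insert the event ID and apply the metric update only if the insert created a row. The sketch below assumes a hypothetical `SqlClient` interface loosely modeled on node-postgres-style clients; the function names are illustrative and the actual driver may differ.

```typescript
// Hypothetical minimal client interface; adapt to the real driver in use.
export interface SqlClient {
  query(text: string, values: unknown[]): Promise<{ rowCount: number | null }>;
}

// Returns true if the event is new, false if it was already processed
// (for example on a NATS redelivery).
export async function markProcessed(
  db: SqlClient,
  eventId: string,
  eventType: string,
): Promise<boolean> {
  const result = await db.query(
    `INSERT INTO anlyt.processed_events (event_id, event_type)
     VALUES ($1, $2)
     ON CONFLICT (event_id) DO NOTHING`,
    [eventId, eventType],
  );
  // ON CONFLICT DO NOTHING reports zero affected rows for a duplicate.
  return result.rowCount === 1;
}

// Deletes dedup entries older than the 48-hour window; returns rows removed.
export async function purgeProcessedEvents(db: SqlClient): Promise<number> {
  const result = await db.query(
    `DELETE FROM anlyt.processed_events
     WHERE processed_at < now() - interval '48 hours'`,
    [],
  );
  return result.rowCount ?? 0;
}
```

For the guard to prevent double-counting reliably, the dedup insert and the metric upsert would presumably need to run in the same transaction, so that a crash between the two cannot leave an event marked but uncounted.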
2. Retention
| Table | Hot retention | Archive |
|---|---|---|
| metrics_hourly | 90 days | ClickHouse (optional) |
| metrics_daily | 365 days | ClickHouse (optional) |
| operator_performance | 90 days | ClickHouse (optional) |
| account_usage_daily | 365 days | ClickHouse (optional) |
| processed_events | 48 hours (purge job) | Not archived |
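The retention windows above can be captured as a single lookup so that purge and archive jobs agree on the cutoff. This is a sketch only; `HOT_RETENTION_MS`, `AnlytTable`, and `retentionCutoff` are hypothetical names, and a day is treated as a fixed 24 hours.

```typescript
const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;

// Hot retention per table, taken from the retention table in this section.
export const HOT_RETENTION_MS = {
  metrics_hourly: 90 * DAY_MS,
  metrics_daily: 365 * DAY_MS,
  operator_performance: 90 * DAY_MS,
  account_usage_daily: 365 * DAY_MS,
  processed_events: 48 * HOUR_MS,
} as const;

export type AnlytTable = keyof typeof HOT_RETENTION_MS;

// Rows with a bucket (or processed_at) earlier than the returned instant
// are outside the hot window.
export function retentionCutoff(table: AnlytTable, now: Date = new Date()): Date {
  return new Date(now.getTime() - HOT_RETENTION_MS[table]);
}
```

For example, with `now` set to 2026-04-18T00:00:00Z the cutoff for `metrics_hourly` is 2026-01-18T00:00:00Z.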
3. ClickHouse (Optional, Beyond Hot Retention)
Tables mirror PostgreSQL structure. ETL job (anlyt-etl) copies rows nightly.
-- ClickHouse example
CREATE TABLE anlyt.metrics_daily (
day_bucket DateTime,
scope LowCardinality(String),
scope_id String,
total_messages UInt64,
...
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(day_bucket)
ORDER BY (scope, scope_id, day_bucket);
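When the ETL job copies a row, the PostgreSQL values need a ClickHouse-friendly encoding. The sketch below shows one possible mapping to a JSONEachRow line. It assumes the ClickHouse server interprets `DateTime` text as UTC and accepts 64-bit integers as quoted strings in JSON input; the names `DailyArchiveRow`, `toClickHouseDateTime`, and `toClickHouseJsonLine` are hypothetical, and only the columns shown in the example above are mapped.

```typescript
// Only the columns visible in the ClickHouse example; the rest are elided there.
interface DailyArchiveRow {
  dayBucket: Date;
  scope: string;
  scopeId: string;
  totalMessages: bigint;
}

// Formats a Date as "YYYY-MM-DD hh:mm:ss" in UTC (assumed server timezone).
export function toClickHouseDateTime(d: Date): string {
  // toISOString() is always UTC, e.g. "2026-04-18T00:00:00.000Z".
  return d.toISOString().slice(0, 19).replace('T', ' ');
}

// Serializes one row as a single JSON line.
export function toClickHouseJsonLine(row: DailyArchiveRow): string {
  return JSON.stringify({
    day_bucket: toClickHouseDateTime(row.dayBucket),
    scope: row.scope,
    scope_id: row.scopeId,
    // JSON.stringify cannot serialize bigint, so the counter is sent as a string.
    total_messages: row.totalMessages.toString(),
  });
}
```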
4. TypeScript Interfaces
export interface MetricsHourlyRow {
hourBucket: Date;
scope: 'PLATFORM' | 'OPERATOR' | 'ACCOUNT';
scopeId: string;
totalMessages: bigint;
deliveredMessages: bigint;
failedMessages: bigint;
pendingMessages: bigint;
totalCost: string; // NUMERIC as string
latencySumMs: bigint;
latencySampleCount: bigint;
peakTps: number;
updatedAt: Date;
}
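A raw database row usually needs a small mapping step before it matches this interface. The sketch below assumes the driver returns BIGINT and NUMERIC columns as strings (as node-postgres does by default) and timestamps as `Date`. `RawMetricsHourlyRow`, `parseScope`, and `mapMetricsHourlyRow` are hypothetical names, and the returned object is intended to be structurally compatible with `MetricsHourlyRow`.

```typescript
type Scope = 'PLATFORM' | 'OPERATOR' | 'ACCOUNT';

// Assumed shape of a row as returned by the driver (snake_case, 64-bit ints as strings).
interface RawMetricsHourlyRow {
  hour_bucket: Date;
  scope: string;
  scope_id: string;
  total_messages: string;
  delivered_messages: string;
  failed_messages: string;
  pending_messages: string;
  total_cost: string;
  latency_sum_ms: string;
  latency_sample_count: string;
  peak_tps: number;
  updated_at: Date;
}

// Narrows the TEXT column to the three documented scope values.
function parseScope(value: string): Scope {
  if (value === 'PLATFORM' || value === 'OPERATOR' || value === 'ACCOUNT') {
    return value;
  }
  throw new Error(`unexpected scope: ${value}`);
}

export function mapMetricsHourlyRow(raw: RawMetricsHourlyRow) {
  return {
    hourBucket: raw.hour_bucket,
    scope: parseScope(raw.scope),
    scopeId: raw.scope_id,
    totalMessages: BigInt(raw.total_messages),
    deliveredMessages: BigInt(raw.delivered_messages),
    failedMessages: BigInt(raw.failed_messages),
    pendingMessages: BigInt(raw.pending_messages),
    totalCost: raw.total_cost, // kept as a string to preserve NUMERIC precision
    latencySumMs: BigInt(raw.latency_sum_ms),
    latencySampleCount: BigInt(raw.latency_sample_count),
    peakTps: raw.peak_tps,
    updatedAt: raw.updated_at,
  };
}
```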