APPLICATION_LOGIC — analytics-service

Sibling: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · DATA_MODEL · platform anchors: docs/02 §4 Service template, docs/05 API design, docs/standards/SERVICE_TEMPLATE

The application layer is NestJS-based and follows the four-layer architecture (presentation → application → domain → infrastructure). It coordinates use cases, owns transactions on the metadata DB, and brokers BigQuery jobs and Pub/Sub I/O. It contains no business rules; those live in domain/.


1. Source layout

services/analytics-service/src/
├─ presentation/
│  ├─ http/         # NestJS controllers
│  ├─ pubsub/       # push subscription handlers (raw landing is managed; this handles signals + DQ)
│  └─ scheduler/    # Cloud Scheduler / Workflows entrypoints
├─ application/
│  ├─ use-cases/
│  └─ ports/        # Repository + Client interfaces
├─ domain/          # Pure TS aggregates + invariants
└─ infrastructure/
   ├─ persistence/  # Drizzle (Postgres) repos
   ├─ warehouse/    # BigQueryClient adapter
   ├─ messaging/    # Pub/Sub publisher/consumer wrappers
   ├─ ai/           # AIClient binding
   └─ scheduling/   # Cloud Scheduler / Workflows

2. Ports

export interface ProjectionRepository {
  save(p: Projection): Promise<void>;
  findById(id: ProjectionId): Promise<Projection | null>;
  findByKey(key: string): Promise<Projection | null>;
  list(filter?: { archived?: boolean }): Promise<Projection[]>;
}

export interface MetricDefinitionRepository {
  save(m: MetricDefinition): Promise<void>;
  findByKeyVersion(key: string, version: number): Promise<MetricDefinition | null>;
  list(tenantId: TenantId, filter?: { archived?: boolean }): Promise<MetricDefinition[]>;
}

export interface DashboardRepository {
  save(d: Dashboard): Promise<void>;
  findById(id: DashboardId, tenantId: TenantId): Promise<Dashboard | null>;
  list(tenantId: TenantId, ownerUserId?: UserId): Promise<Dashboard[]>;
}

export interface WidgetRepository {
  save(w: Widget): Promise<void>;
  findById(id: WidgetId, tenantId: TenantId): Promise<Widget | null>;
  listByDashboard(dashboardId: DashboardId, tenantId: TenantId): Promise<Widget[]>;
}

export interface QueryRepository {
  save(q: Query): Promise<void>;
  findById(id: QueryId, tenantId: TenantId): Promise<Query | null>;
}

export interface ETLRepository {
  saveJob(j: ETLJob): Promise<void>;
  saveRun(r: ETLRun): Promise<void>;
  findRunById(id: ETLRunId): Promise<ETLRun | null>;
  findDueJobs(now: Date): Promise<ETLJob[]>;
}

export interface DataQualityRepository {
  saveCheck(c: DataQualityCheck): Promise<void>;
  saveResult(r: DataQualityResult): Promise<void>;
  list(filter: { tableLike?: string; enabled?: boolean }): Promise<DataQualityCheck[]>;
}

export interface BigQueryClient {
  runQuery(input: {
    projectId?: string;
    sql: string;
    params: Record<string, unknown>;
    location: string;
    byteCap: number;
    labels: Record<string, string>;
    deadlineMs: number;
  }): Promise<{
    rows: unknown[];
    bytesScanned: number;
    bytesBilled: number;
    slotMs: number;
    jobId: string;
    cacheHit: boolean;
  }>;

  runMerge(input: {
    targetTable: string;
    mergeSql: string;
    location: string;
    byteCap: number;
    labels: Record<string, string>;
  }): Promise<{
    bytesScanned: number;
    bytesBilled: number;
    slotMs: number;
    rowsAffected: number;
    jobId: string;
  }>;

  describeTable(table: string): Promise<{ schema: BigQuerySchema; partitioning: unknown; clustering: string[] }>;
}

export interface CacheClient {
  get<T>(key: string): Promise<T | null>;
  set<T>(key: string, value: T, ttlSeconds: number): Promise<void>;
  invalidate(prefix: string): Promise<void>;
}

export interface EventPublisher {
  publish<T>(envelope: EventEnvelope<T>): Promise<void>;
}

export interface AIClient {
  invoke<TInput, TOutput>(req: {
    capability: string;
    tenantId: TenantId;
    correlationId: string;
    input: TInput;
    options?: { maxLatencyMs?: number; maxCostUsdMicros?: number };
  }): Promise<{ output: TOutput; provenance: AIProvenance }>;
}

export interface IdentityResolver {
  resolveTenantUserPropertyAccess(jwt: VerifiedJwt): Promise<{
    tenantId: TenantId;
    userId: UserId;
    propertyAccess: readonly PropertyId[];
    permissions: readonly string[];
    aiSettings: { capabilities: Record<string, boolean> };
  }>;
}

Production bindings: DrizzleProjectionRepository, BigQuerySdkClient, RedisCacheClient, PubsubEventPublisher, OrchestratorAIClient, JwtIdentityResolver. Tests use in-memory adapters; BigQueryClient is faked with a deterministic SQL stub.
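As an illustration of the in-memory adapters used in tests, here is a minimal sketch of a CacheClient fake. The class name InMemoryCacheClient and the injected nowMs clock are assumptions for this example, not names taken from the codebase; the port is copied locally so the snippet stands alone.

```typescript
// Local copy of the CacheClient port from section 2.
interface CacheClient {
  get<T>(key: string): Promise<T | null>;
  set<T>(key: string, value: T, ttlSeconds: number): Promise<void>;
  invalidate(prefix: string): Promise<void>;
}

// Hypothetical test adapter. The clock is injected so TTL expiry is deterministic.
class InMemoryCacheClient implements CacheClient {
  private readonly entries = new Map<string, { value: unknown; expiresAtMs: number }>();
  private readonly nowMs: () => number;

  constructor(nowMs: () => number = () => Date.now()) {
    this.nowMs = nowMs;
  }

  async get<T>(key: string): Promise<T | null> {
    const entry = this.entries.get(key);
    if (!entry) return null;
    if (entry.expiresAtMs <= this.nowMs()) {
      this.entries.delete(key); // lazily evict expired entries
      return null;
    }
    return entry.value as T;
  }

  async set<T>(key: string, value: T, ttlSeconds: number): Promise<void> {
    this.entries.set(key, { value, expiresAtMs: this.nowMs() + ttlSeconds * 1000 });
  }

  async invalidate(prefix: string): Promise<void> {
    // Snapshot the keys first so deletion does not interfere with iteration.
    Array.from(this.entries.keys())
      .filter((k) => k.startsWith(prefix))
      .forEach((k) => this.entries.delete(k));
  }
}
```

Injecting the clock keeps cache-expiry tests free of real timers; a Redis-backed binding would rely on server-side TTLs instead.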


3. Cross-cutting middleware

  • RequestContextMiddleware — extracts correlationId, tenantId, userId from JWT/headers; stamps app.tenant_id GUC on the metadata Postgres connection.
  • ByteCapInterceptor — clamps BigQueryClient.runQuery byte caps to the lesser of metric/query/widget caps.
  • IdempotencyInterceptor — for POST endpoints accepting Idempotency-Key; persists (tenantId, idempotencyKey, requestHash, responseDigest).
  • OptimisticConcurrencyInterceptor — translates etag (header If-Match or body expectedVersion) to repository OCC and returns 412 on mismatch.
  • RateLimitGuard — token-bucket per (tenantId, action); backed by Memorystore.
  • AuditEmitter — emits analytics.* audit events post-success.
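To make the RateLimitGuard behavior concrete, the following is a rough sketch of a token bucket keyed by (tenantId, action). It keeps state in process memory purely for illustration, whereas the guard described above is backed by Memorystore; the class name, constructor parameters, and refill policy are all assumptions.

```typescript
interface BucketState {
  tokens: number;
  lastRefillMs: number;
}

// Hypothetical in-process token bucket; one bucket per "tenantId:action" key.
class TokenBucketLimiter {
  private readonly buckets = new Map<string, BucketState>();
  private readonly capacity: number;
  private readonly refillPerSecond: number;

  constructor(capacity: number, refillPerSecond: number) {
    this.capacity = capacity;
    this.refillPerSecond = refillPerSecond;
  }

  // Returns true and consumes one token if the call is allowed at time nowMs.
  tryAcquire(tenantId: string, action: string, nowMs: number): boolean {
    const key = `${tenantId}:${action}`;
    const state = this.buckets.get(key) ?? { tokens: this.capacity, lastRefillMs: nowMs };

    // Refill proportionally to elapsed time, never exceeding capacity.
    const elapsedSeconds = Math.max(0, nowMs - state.lastRefillMs) / 1000;
    state.tokens = Math.min(this.capacity, state.tokens + elapsedSeconds * this.refillPerSecond);
    state.lastRefillMs = nowMs;

    const allowed = state.tokens >= 1;
    if (allowed) state.tokens -= 1;
    this.buckets.set(key, state);
    return allowed;
  }
}
```

With a shared store, the refill-and-consume step would need to be atomic across instances; this sketch does not attempt that.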

4. Use cases

4.1 RunWidgetDataUseCase

Reads widget data, applying cache, byte caps, and tenant scope.

export class RunWidgetDataUseCase {
  constructor(
    private readonly widgets: WidgetRepository,
    private readonly metrics: MetricDefinitionRepository,
    private readonly queries: QueryRepository,
    private readonly bq: BigQueryClient,
    private readonly cache: CacheClient,
    private readonly publisher: EventPublisher,
    private readonly clock: Clock,
  ) {}

  async execute(input: {
    tenantId: TenantId; userId: UserId; propertyAccess: readonly PropertyId[];
    widgetId: WidgetId; params?: Record<string, unknown>;
    correlationId: string;
  }): Promise<{ rows: unknown[]; provenance: Provenance; cacheHit: boolean }> {
    const widget = await this.widgets.findById(input.widgetId, input.tenantId);
    if (!widget) throw new NotFoundError('widget', input.widgetId);

    const cacheKey = makeWidgetCacheKey(widget, input.params);
    const cached = await this.cache.get<{ rows: unknown[]; provenance: Provenance }>(cacheKey);
    if (cached) return { ...cached, cacheHit: true };

    const sql = await this.compileWidgetSql(widget, input.params, input.tenantId, input.propertyAccess);
    const r = await this.bq.runQuery({
      sql,
      params: bindParams(widget, input.params, input.tenantId),
      location: tenantLocation(input.tenantId),
      byteCap: byteCapFor(widget),
      labels: { service: 'analytics', tenant_id: input.tenantId, widget_id: input.widgetId },
      deadlineMs: 5_000,
    });
    const provenance: Provenance = {
      computedAt: this.clock.nowIso(),
      computedBy: 'on_demand',
      bytesScanned: r.bytesScanned, slotMs: r.slotMs, warehouseJobId: r.jobId,
    };
    await this.cache.set(cacheKey, { rows: r.rows, provenance }, ttlForWidget(widget));
    await this.publisher.publish(envelopeFor('query.executed.v1', /* … */));
    return { rows: r.rows, provenance, cacheHit: false };
  }
}

The compileWidgetSql step:

  1. Resolves the metric or saved query by (key, version) or id.
  2. Re-validates that referenced tables match the metric/query's schemaVersion (else throws ProjectionSchemaMismatchError).
  3. Wraps the SQL in WITH session_tenant_scope AS (SELECT '<tenantId>' AS tenant_id) … and joins on it. This is defense-in-depth, applied on top of the BigQuery authorized-view RLS.
  4. Applies propertyAccess filter.
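Steps 3 and 4 above could look roughly like the sketch below. It is not the service's actual compiler: the function name applyTenantScope, the property_id column, and the use of named query parameters (@tenantId, @propertyIds) in place of an inlined literal are all assumptions made for illustration.

```typescript
interface ScopedQuery {
  sql: string;
  params: Record<string, unknown>;
}

// Hypothetical helper: wraps already-compiled SQL in a tenant-scope CTE and
// restricts rows to the caller's properties. Assumes the inner query exposes
// tenant_id and property_id columns.
function applyTenantScope(
  innerSql: string,
  tenantId: string,
  propertyAccess: readonly string[],
): ScopedQuery {
  const sql = [
    "WITH session_tenant_scope AS (SELECT @tenantId AS tenant_id)",
    "SELECT q.*",
    `FROM (${innerSql}) AS q`,
    "JOIN session_tenant_scope AS s ON q.tenant_id = s.tenant_id",
    "WHERE q.property_id IN UNNEST(@propertyIds)",
  ].join("\n");
  // Values travel as parameters, so tenant and property ids never enter the SQL text.
  return { sql, params: { tenantId, propertyIds: propertyAccess.slice() } };
}
```

Binding the tenant id as a parameter rather than interpolating it keeps the scope clause safe even if an id ever contained quote characters.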

4.2 RunSavedQueryUseCase

Follows the same pattern as RunWidgetDataUseCase, but requires the stricter analytics.author permission and sets byteCap = min(query.byteCap, perTenantRemainingDailyBudget).
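The byte-cap rule can be expressed as a small pure function. The sketch below assumes both values are in bytes and that an exhausted budget should reject the query before it reaches BigQuery; the function name and the rejection behavior are assumptions, not documented behavior.

```typescript
// Hypothetical helper: the effective cap is the smaller of the query's own cap
// and whatever remains of the tenant's daily budget.
function effectiveByteCap(queryByteCap: number, remainingDailyBudgetBytes: number): number {
  const cap = Math.min(queryByteCap, remainingDailyBudgetBytes);
  if (cap <= 0) {
    // Assumption: fail fast rather than submit a job that cannot scan anything.
    throw new Error("tenant daily byte budget exhausted");
  }
  return cap;
}
```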

4.3 ComputeMetricUseCase

Computes one metric for a window. It is called by ETL jobs or on demand by the AI orchestrator.

async execute(input: {
  tenantId: TenantId; metricKey: string; metricVersion: number;
  windowFrom: string; windowTo: string;
  filters?: Record<string, unknown>;
  correlationId: string;
}): Promise<{ value: number; unit: MetricUnit; provenance: Provenance }>

After compute, publishes melmastoon.analytics.metric.computed.v1 with the value, unit, dimensions, and provenance.

4.4 RunETLJobUseCase

Orchestrates one projection refresh:

  1. Resolve Projection → MERGE SQL.
  2. Determine the incremental window: it starts at the latest business_date already merged, minus the reprocess overlap, and ends at now.
  3. Acquire a per-projection lock in Postgres (SELECT … FOR UPDATE) so that only one run executes at a time.
  4. Persist ETLRun with status='running'.
  5. Call BigQueryClient.runMerge.
  6. Persist results, emit etl.completed.v1 or etl.failed.v1.
  7. Trigger downstream DQ checks scoped to the table window.

Concurrency: per-projection lock guarantees serialization; jobs across projections run in parallel up to the worker concurrency limit.
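The window calculation in step 2 can be sketched as a pure function. The names below, the day-granularity overlap, and the initialFrom fallback for a projection that has never been merged are assumptions for illustration only.

```typescript
interface EtlWindow {
  from: Date;
  to: Date;
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Hypothetical helper: start at the latest merged business date minus the
// reprocess overlap; fall back to initialFrom when nothing has been merged yet.
function incrementalWindow(
  maxMergedBusinessDate: Date | null,
  reprocessOverlapDays: number,
  now: Date,
  initialFrom: Date,
): EtlWindow {
  const from =
    maxMergedBusinessDate === null
      ? initialFrom
      : new Date(maxMergedBusinessDate.getTime() - reprocessOverlapDays * DAY_MS);
  return { from, to: now };
}
```

Because the overlap re-reads rows that were already merged, this only works if the MERGE itself is idempotent for those rows.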

4.5 EnsureProjectionUseCase

Idempotently creates or updates the BigQuery target table to match the projection definition (via BigQueryClient.describeTable + applyDdlIfNeeded); this runs on deploy and on projection.published.
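One way to picture the idempotent part of this step is a planner that diffs the existing columns against the desired ones and emits DDL only for what is missing. The internals of applyDdlIfNeeded are not described here, so the sketch below is a guess at the shape; the Column type and planAddColumns are hypothetical, and it covers added columns only, not type changes, partitioning, or clustering.

```typescript
interface Column {
  name: string;
  type: string;
}

// Hypothetical planner: returns one ALTER TABLE statement per missing column.
// When the table already matches, the result is empty and nothing is applied.
function planAddColumns(
  table: string,
  existing: readonly Column[],
  desired: readonly Column[],
): string[] {
  const existingNames = existing.map((c) => c.name.toLowerCase());
  return desired
    .filter((c) => existingNames.indexOf(c.name.toLowerCase()) === -1)
    .map((c) => `ALTER TABLE \`${table}\` ADD COLUMN IF NOT EXISTS ${c.name} ${c.type}`);
}
```

Running the planner a second time against the updated table yields an empty plan, which is what makes repeated deploys safe.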

4.6 RunDataQualityCheckUseCase

Executes the DQ rule (template per DQKind), persists DataQualityResult, and emits data_quality.alert.v1 when status ∈ {warn, critical}.

4.7 CreateDashboardUseCase, AddWidgetUseCase, UpdateWidgetUseCase, ShareDashboardUseCase

Standard CRUD with OCC and authorization. ShareDashboardUseCase may issue a Looker Studio embed token (signed JWT) and persists the share row.
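The OCC check behind these use cases can be illustrated with a small in-memory store. This is a sketch under stated assumptions: the integer version counter starting at 1, the DashboardRow shape, and the result type are invented for the example; only the 412-on-mismatch behavior comes from the middleware description in section 3.

```typescript
interface DashboardRow {
  id: string;
  title: string;
  version: number;
}

type UpdateResult =
  | { ok: true; row: DashboardRow }
  | { ok: false; status: 412; currentVersion: number };

// Hypothetical store showing compare-and-bump on a version counter.
class InMemoryDashboardStore {
  private readonly rows = new Map<string, DashboardRow>();

  create(id: string, title: string): DashboardRow {
    const row: DashboardRow = { id, title, version: 1 };
    this.rows.set(id, row);
    return row;
  }

  update(id: string, expectedVersion: number, title: string): UpdateResult {
    const current = this.rows.get(id);
    if (!current) throw new Error(`dashboard ${id} not found`);
    if (current.version !== expectedVersion) {
      // Stale writer: report the mismatch instead of overwriting.
      return { ok: false, status: 412, currentVersion: current.version };
    }
    const next: DashboardRow = { id, title, version: current.version + 1 };
    this.rows.set(id, next);
    return { ok: true, row: next };
  }
}
```

In a SQL-backed repository the same check is typically an UPDATE with the expected version in the WHERE clause, treating zero affected rows as the mismatch.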

4.8 IngestForecastWritebackUseCase

Consumer for melmastoon.ai.forecast.produced.v1. Validates payload tenant + property scope, then MERGEs into fact_demand_forecast via BigQueryClient.runMerge. Emits projection.refreshed.v1 for fact_demand_forecast.

4.9 PurgeTenantUseCase

Consumer for melmastoon.tenant.deleted.v1. Pipeline:

  1. Drop tenant's authorized view bindings.
  2. DELETE rows from curated tables WHERE tenant_id = ….
  3. DELETE rows from events_raw.* WHERE tenant_id = … (operational classes).
  4. Anonymize regulated-class events (replace payload with { "anonymized": true }).
  5. Invalidate cache prefix widget:tnt_…:*.
  6. Emit audit event.
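Steps 3 and 4 treat raw events differently by data class. The sketch below models that decision over an in-memory list rather than BigQuery DML; the RawEvent shape and the dataClass field name are assumptions, while the two class names and the { "anonymized": true } replacement payload come from the pipeline above.

```typescript
type DataClass = "operational" | "regulated";

interface RawEvent {
  eventId: string;
  tenantId: string;
  dataClass: DataClass;
  payload: unknown;
}

// Hypothetical model of the purge: operational events of the tenant are dropped,
// regulated events are kept with their payload replaced, other tenants are untouched.
function purgeTenantEvents(events: readonly RawEvent[], tenantId: string): RawEvent[] {
  const kept: RawEvent[] = [];
  events.forEach((e) => {
    if (e.tenantId !== tenantId) {
      kept.push(e);
      return;
    }
    if (e.dataClass === "regulated") {
      kept.push({ ...e, payload: { anonymized: true } });
    }
    // Operational events of the purged tenant fall through and are not kept.
  });
  return kept;
}
```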

5. Saga participation

analytics-service is a conformist to upstream events; it does not orchestrate sagas. It does emit data_quality.alert.v1, which notification-service consumes to fan out to tenant admins.


6. Authorization decisions

export class AnalyticsAuthorizer {
  decide(
    perm: readonly string[],
    action: AuthAction,
    ctx: { propertyAccess: readonly PropertyId[]; ai: { capabilities: Record<string, boolean> } },
  ): AuthorizationDecision {
    switch (action.kind) {
      case 'view_dashboard': return needs(perm, 'analytics.viewer');
      case 'edit_dashboard': return needs(perm, 'analytics.author');
      case 'run_query': return needs(perm, 'analytics.author');
      case 'run_metric': return needs(perm, 'analytics.viewer');
      case 'manage_projection': return needs(perm, 'analytics.admin');
      case 'manage_dq': return needs(perm, 'analytics.admin');
    }
  }
}

Property-scope intersection is performed at SQL compile time, in compileWidgetSql.
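The needs helper and the property-scope intersection are not defined in this section, so the following is only a sketch of what they might look like. The AuthorizationDecision shape, exact-match permission semantics (no role hierarchy where author implies viewer), and the intersectPropertyScope name are all assumptions.

```typescript
// Hypothetical decision shape; the real AuthorizationDecision may carry more detail.
interface AuthorizationDecision {
  allowed: boolean;
  missing?: string;
}

// Exact-match check: the caller must hold the required permission string.
function needs(perm: readonly string[], required: string): AuthorizationDecision {
  return perm.indexOf(required) !== -1
    ? { allowed: true }
    : { allowed: false, missing: required };
}

// Keeps only the requested properties the caller is allowed to see.
function intersectPropertyScope(
  requested: readonly string[],
  propertyAccess: readonly string[],
): string[] {
  return requested.filter((p) => propertyAccess.indexOf(p) !== -1);
}
```

Keeping the permission check and the property intersection separate mirrors the split described above: the authorizer decides whether the action may run at all, and the SQL compiler decides which rows it may touch.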


7. Cross-references