APPLICATION_LOGIC — analytics-service
Sibling: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · DATA_MODEL · platform anchors: docs/02 §4 Service template, docs/05 API design, docs/standards/SERVICE_TEMPLATE
The application layer is NestJS-based and follows the four-layer architecture (presentation → application → domain → infrastructure). It coordinates use cases, owns transactions on the metadata DB, and brokers BigQuery jobs and Pub/Sub I/O. It contains no business rules; those live in domain/.
1. Source layout
services/analytics-service/src/
├─ presentation/
│ ├─ http/ # NestJS controllers
│ ├─ pubsub/ # push subscription handlers (raw landing is managed; this handles signals + DQ)
│ └─ scheduler/ # Cloud Scheduler / Workflows entrypoints
├─ application/
│ ├─ use-cases/
│ └─ ports/ # Repository + Client interfaces
├─ domain/ # Pure TS aggregates + invariants
└─ infrastructure/
├─ persistence/ # Drizzle (Postgres) repos
├─ warehouse/ # BigQueryClient adapter
├─ messaging/ # Pub/Sub publisher/consumer wrappers
├─ ai/ # AIClient binding
└─ scheduling/ # Cloud Scheduler / Workflows
2. Ports
export interface ProjectionRepository {
save(p: Projection): Promise<void>;
findById(id: ProjectionId): Promise<Projection | null>;
findByKey(key: string): Promise<Projection | null>;
list(filter?: { archived?: boolean }): Promise<Projection[]>;
}
export interface MetricDefinitionRepository {
save(m: MetricDefinition): Promise<void>;
findByKeyVersion(key: string, version: number): Promise<MetricDefinition | null>;
list(tenantId: TenantId, filter?: { archived?: boolean }): Promise<MetricDefinition[]>;
}
export interface DashboardRepository {
save(d: Dashboard): Promise<void>;
findById(id: DashboardId, tenantId: TenantId): Promise<Dashboard | null>;
list(tenantId: TenantId, ownerUserId?: UserId): Promise<Dashboard[]>;
}
export interface WidgetRepository {
save(w: Widget): Promise<void>;
findById(id: WidgetId, tenantId: TenantId): Promise<Widget | null>;
listByDashboard(dashboardId: DashboardId, tenantId: TenantId): Promise<Widget[]>;
}
export interface QueryRepository {
save(q: Query): Promise<void>;
findById(id: QueryId, tenantId: TenantId): Promise<Query | null>;
}
export interface ETLRepository {
saveJob(j: ETLJob): Promise<void>;
saveRun(r: ETLRun): Promise<void>;
findRunById(id: ETLRunId): Promise<ETLRun | null>;
findDueJobs(now: Date): Promise<ETLJob[]>;
}
export interface DataQualityRepository {
saveCheck(c: DataQualityCheck): Promise<void>;
saveResult(r: DataQualityResult): Promise<void>;
list(filter: { tableLike?: string; enabled?: boolean }): Promise<DataQualityCheck[]>;
}
export interface BigQueryClient {
runQuery(input: {
projectId?: string;
sql: string;
params: Record<string, unknown>;
location: string;
byteCap: number;
labels: Record<string, string>;
deadlineMs: number;
}): Promise<{
rows: unknown[];
bytesScanned: number;
bytesBilled: number;
slotMs: number;
jobId: string;
cacheHit: boolean;
}>;
runMerge(input: {
targetTable: string;
mergeSql: string;
location: string;
byteCap: number;
labels: Record<string, string>;
}): Promise<{
bytesScanned: number;
bytesBilled: number;
slotMs: number;
rowsAffected: number;
jobId: string;
}>;
describeTable(table: string): Promise<{ schema: BigQuerySchema; partitioning: unknown; clustering: string[] }>;
}
export interface CacheClient {
get<T>(key: string): Promise<T | null>;
set<T>(key: string, value: T, ttlSeconds: number): Promise<void>;
invalidate(prefix: string): Promise<void>;
}
export interface EventPublisher {
publish<T>(envelope: EventEnvelope<T>): Promise<void>;
}
export interface AIClient {
invoke<TInput, TOutput>(req: {
capability: string;
tenantId: TenantId;
correlationId: string;
input: TInput;
options?: { maxLatencyMs?: number; maxCostUsdMicros?: number };
}): Promise<{ output: TOutput; provenance: AIProvenance }>;
}
export interface IdentityResolver {
resolveTenantUserPropertyAccess(jwt: VerifiedJwt): Promise<{
tenantId: TenantId;
userId: UserId;
propertyAccess: readonly PropertyId[];
permissions: readonly string[];
aiSettings: { capabilities: Record<string, boolean> };
}>;
}
Production bindings: DrizzleProjectionRepository, BigQuerySdkClient, RedisCacheClient, PubsubEventPublisher, OrchestratorAIClient, JwtIdentityResolver. Tests use in-memory adapters; BigQueryClient is faked with a deterministic SQL stub.
3. Cross-cutting middleware
RequestContextMiddleware— extractscorrelationId,tenantId,userIdfrom JWT/headers; stampsapp.tenant_idGUC on the metadata Postgres connection.ByteCapInterceptor— clampsBigQueryClient.runQuerybyte caps to the lesser of metric/query/widget caps.IdempotencyInterceptor— forPOSTendpoints acceptingIdempotency-Key; persists(tenantId, idempotencyKey, requestHash, responseDigest).OptimisticConcurrencyInterceptor— translatesetag(headerIf-Matchor bodyexpectedVersion) to repository OCC and returns412on mismatch.RateLimitGuard— token-bucket per(tenantId, action); backed by Memorystore.AuditEmitter— emitsanalytics.*audit events post-success.
4. Use cases
4.1 RunWidgetDataUseCase
Reads widget data, applying cache, byte caps, and tenant scope.
export class RunWidgetDataUseCase {
constructor(
private readonly widgets: WidgetRepository,
private readonly metrics: MetricDefinitionRepository,
private readonly queries: QueryRepository,
private readonly bq: BigQueryClient,
private readonly cache: CacheClient,
private readonly publisher: EventPublisher,
private readonly clock: Clock,
) {}
async execute(input: {
tenantId: TenantId; userId: UserId; propertyAccess: readonly PropertyId[];
widgetId: WidgetId; params?: Record<string, unknown>;
correlationId: string;
}): Promise<{ rows: unknown[]; provenance: Provenance; cacheHit: boolean }> {
const widget = await this.widgets.findById(input.widgetId, input.tenantId);
if (!widget) throw new NotFoundError('widget', input.widgetId);
const cacheKey = makeWidgetCacheKey(widget, input.params);
const cached = await this.cache.get<{ rows: unknown[]; provenance: Provenance }>(cacheKey);
if (cached) return { ...cached, cacheHit: true };
const sql = await this.compileWidgetSql(widget, input.params, input.tenantId, input.propertyAccess);
const r = await this.bq.runQuery({
sql,
params: bindParams(widget, input.params, input.tenantId),
location: tenantLocation(input.tenantId),
byteCap: byteCapFor(widget),
labels: { service: 'analytics', tenant_id: input.tenantId, widget_id: input.widgetId },
deadlineMs: 5_000,
});
const provenance: Provenance = {
computedAt: this.clock.nowIso(),
computedBy: 'on_demand',
bytesScanned: r.bytesScanned, slotMs: r.slotMs, warehouseJobId: r.jobId,
};
await this.cache.set(cacheKey, { rows: r.rows, provenance }, ttlForWidget(widget));
await this.publisher.publish(envelopeFor('query.executed.v1', /* … */));
return { rows: r.rows, provenance, cacheHit: false };
}
}
The compileWidgetSql step:
- Resolves the metric or saved query by
(key, version)or id. - Re-validates that referenced tables match the metric/query's
schemaVersion(else throwsProjectionSchemaMismatchError). - Wraps the SQL in
WITH session_tenant_scope AS (SELECT '<tenantId>' AS tenant_id) …and joins on it; this is on top of the BigQuery authorized-view RLS, defense-in-depth. - Applies
propertyAccessfilter.
4.2 RunSavedQueryUseCase
Same pattern, with stricter analytics.author permission and byteCap = min(query.byteCap, perTenantRemainingDailyBudget).
4.3 ComputeMetricUseCase
Computes one metric for a window; called by ETL or on-demand by AI orchestrator.
async execute(input: {
tenantId: TenantId; metricKey: string; metricVersion: number;
windowFrom: string; windowTo: string;
filters?: Record<string, unknown>;
correlationId: string;
}): Promise<{ value: number; unit: MetricUnit; provenance: Provenance }>
After compute, publishes melmastoon.analytics.metric.computed.v1 with the value, unit, dimensions, and provenance.
4.4 RunETLJobUseCase
Orchestrates one projection refresh:
- Resolve
Projection→ MERGE SQL. - Determine the incremental window: from
max(business_date)already merged minus reprocess overlap, tonow. - Acquire a per-projection lock in Postgres (
SELECT … FOR UPDATE) so only one run at a time. - Persist
ETLRunwithstatus='running'. - Call
BigQueryClient.runMerge. - Persist results, emit
etl.completed.v1oretl.failed.v1. - Trigger downstream DQ checks scoped to the table window.
Concurrency: per-projection lock guarantees serialization; jobs across projections run in parallel up to the worker concurrency limit.
4.5 EnsureProjectionUseCase
Idempotently creates or updates the BigQuery target table to match the projection definition (via BigQueryClient.describeTable + applyDdlIfNeeded); this runs on deploy and on projection.published.
4.6 RunDataQualityCheckUseCase
Executes the DQ rule (template per DQKind), persists DataQualityResult, and emits data_quality.alert.v1 when status ∈ {warn, critical}.
4.7 CreateDashboardUseCase, AddWidgetUseCase, UpdateWidgetUseCase, ShareDashboardUseCase
Standard CRUD with OCC and authorization. ShareDashboardUseCase may issue a Looker Studio embed token (signed JWT) and persists the share row.
4.8 IngestForecastWritebackUseCase
Consumer for melmastoon.ai.forecast.produced.v1. Validates payload tenant + property scope, then MERGEs into fact_demand_forecast via BigQueryClient.runMerge. Emits projection.refreshed.v1 for fact_demand_forecast.
4.9 PurgeTenantUseCase
Consumer for melmastoon.tenant.deleted.v1. Pipeline:
- Drop tenant's authorized view bindings.
- DELETE rows from curated tables
WHERE tenant_id = …. - DELETE rows from
events_raw.*WHERE tenant_id = …(operational classes). - Anonymize regulated-class events (replace payload with
{ "anonymized": true }). - Invalidate cache prefix
widget:tnt_…:*. - Emit audit event.
5. Saga participation
analytics-service is a conformist to upstream events; it does not orchestrate sagas. It does emit data_quality.alert.v1 which notification-service consumes to fan out to tenant admins.
6. Authorization decisions
export class AnalyticsAuthorizer {
decide(perm: readonly string[], action: AuthAction, ctx: { propertyAccess: readonly PropertyId[]; ai: { capabilities: Record<string, boolean> } }): AuthorizationDecision {
switch (action.kind) {
case 'view_dashboard': return needs(perm, 'analytics.viewer');
case 'edit_dashboard': return needs(perm, 'analytics.author');
case 'run_query': return needs(perm, 'analytics.author');
case 'run_metric': return needs(perm, 'analytics.viewer');
case 'manage_projection': return needs(perm, 'analytics.admin');
case 'manage_dq': return needs(perm, 'analytics.admin');
}
}
}
Property-scope intersection is performed at compile-sql time in compileWidgetSql.
7. Cross-references
- Domain types: DOMAIN_MODEL
- Error codes: API_CONTRACTS §0.4, docs/standards/ERROR_CODES
- Outbox/inbox patterns: 04 §6
- Test scaffolding: TESTING_STRATEGY