
APPLICATION_LOGIC — reporting-service

Sibling: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · SECURITY_MODEL

Strategic anchors: 02 §7.4 Use cases & ports · 04 §6 Outbox / Inbox · standards/SERVICE_TEMPLATE

The application layer is the only place where domain aggregates meet I/O. Every entry point is a use case (<verb-noun>.use-case.ts) constructed by the NestJS DI container with ports (interfaces). All adapters live in the infrastructure layer; the application layer never imports pg, @google-cloud/*, or puppeteer directly.


1. Ports declared by this service

// src/application/ports/

export interface ReportRepository { /* save, findById, findByIdempotency */ }
export interface TemplateRepository { /* save, findById, findByKeyVersion, listByCategory */ }
export interface ScheduleRepository { /* save, findById, findDueByTenant, listActive */ }
export interface SubscriptionRepository { /* save, findById, findByReport, findActiveByReport */ }
export interface ArtifactRepository { /* save, findById, listByRun */ }
export interface RegulatorySubmissionRepository { /* save, findById, findByRun, findDueForRetry */ }

export interface EventPublisher {
  publish(events: DomainEvent[], opts: { tenantId: TenantId; orderingKey: string; }): Promise<void>;
}

export interface AnalyticsClient {
  runQuery(spec: { queryRef: string; params: Record<string, unknown>; tenantId: TenantId; deadlineMs: number; }): Promise<QueryResult>;
}
export interface BillingReadClient { /* getFolioRollup, getDailyCashDrawer, getInvoiceList */ }
export interface ReservationReadClient { /* getArrivalsForDate, getDeparturesForDate, getInHouse */ }
export interface InventoryReadClient { /* getOccupancyByDay */ }
export interface HousekeepingReadClient { /* getProductivityByStaff, getCleaningTimes */ }
export interface StaffReadClient { /* getHoursWorked, getShiftsByDay */ }

export interface RendererPort {
  renderPdf(payload: RenderPayload, opts: RenderOpts): Promise<Buffer>;
  renderXlsx(payload: RenderPayload, opts: RenderOpts): Promise<Buffer>;
  renderCsvStream(payload: RenderPayload, opts: RenderOpts): NodeJS.ReadableStream;
}

export interface ArtifactStorage {
  upload(input: {
    bucket: string; objectPath: string; body: Buffer | NodeJS.ReadableStream;
    contentType: string; sha256: string;
    retentionClass: RetentionClass; tenantId: TenantId;
  }): Promise<{ sizeBytes: number; }>;
  signV4(input: { bucket: string; objectPath: string; ttlSeconds: number; }): Promise<{ url: string; expiresAt: Date; }>;
  setObjectLockUntil(input: { bucket: string; objectPath: string; until: Date; }): Promise<void>;
}

export interface NotificationClient {
  send(input: NotificationSendInput): Promise<{ deliveryId: string; }>;
}

export interface AIClient {
  generateQuerySuggestion(input: AIQuerySuggestionInput): Promise<AIQuerySuggestionOutput>;
  detectAnomalies(input: AIAnomalyInput): Promise<AIAnomalyOutput>;
}

export interface RegulatorySubmissionPort {
  capabilities(): Array<{ adapterRef: string; jurisdictionCode: string; }>;
  submit(input: RegulatorySubmitInput): Promise<RegulatorySubmitResult>;
}

export interface IdentityResolver {
  resolveActor(req: { jwt: string; tenantHeader: string; }): Promise<ActorRef>;
  isPlatformAdmin(actor: ActorRef): boolean;
  isMemberOfTenant(actor: ActorRef, tenantId: TenantId): Promise<boolean>;
}

export interface Clock { now(): Date; }
export interface IdGenerator { ulid(seed?: Date): string; }

PricingClient, LockClient, etc. are not declared here — reporting reads only from analytics & service-read clients.
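Because use cases receive Clock and IdGenerator as ports, a test can substitute deterministic adapters. The sketch below is illustrative only: FixedClock, SequentialIdGenerator and stampRun are hypothetical names, and the generated strings are predictable placeholders rather than real ULIDs.

```typescript
// Ports as declared above.
interface Clock { now(): Date; }
interface IdGenerator { ulid(seed?: Date): string; }

// Hypothetical test double: always returns the same instant.
class FixedClock implements Clock {
  private readonly instant: Date;
  constructor(instant: Date) { this.instant = instant; }
  now(): Date { return new Date(this.instant.getTime()); }
}

// Hypothetical test double: NOT a real ULID, just a predictable,
// sortable string built from the seed time and a counter.
class SequentialIdGenerator implements IdGenerator {
  private counter = 0;
  ulid(seed: Date = new Date(0)): string {
    this.counter += 1;
    const time = seed.getTime().toString(36).padStart(10, "0");
    const seq = this.counter.toString(36).padStart(6, "0");
    return `${time}-${seq}`.toUpperCase();
  }
}

// A use-case-shaped function that depends only on the two ports.
function stampRun(clock: Clock, idgen: IdGenerator): { id: string; queuedAt: Date } {
  const now = clock.now();
  return { id: idgen.ulid(now), queuedAt: now };
}

const fixedClock = new FixedClock(new Date("2024-01-01T00:00:00.000Z"));
const sequentialIds = new SequentialIdGenerator();
const first = stampRun(fixedClock, sequentialIds);
const second = stampRun(fixedClock, sequentialIds);
```

With these doubles, two calls at the same instant yield distinct ids that sort in creation order, which keeps assertions on queuedAt and id stable across test runs.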


2. Use cases (entry points)

| Use case | Trigger | Output |
| --- | --- | --- |
| PublishTemplateVersionUseCase | POST /api/v1/reports/templates/:id/versions | TemplateVersion + template.published.v1 |
| ArchiveTemplateUseCase | DELETE /api/v1/reports/templates/:id | template.archived.v1 |
| CreateReportUseCase | POST /api/v1/reports | Report |
| UpdateReportUseCase | PATCH /api/v1/reports/:id | updated Report |
| RequestReportRunUseCase | POST /api/v1/reports/runs | ReportRun(queued) + report.requested.v1 |
| StartReportRunUseCase | inbox: report.run.queued (worker) | side-effects + report.started.v1 |
| CompleteReportRunUseCase | end of render pipeline (in worker) | artifacts persisted + report.completed.v1 |
| FailReportRunUseCase | render error / data error | report.failed.v1; possibly re-queues |
| CancelReportRunUseCase | POST /api/v1/reports/runs/:id/cancel | report.cancelled.v1 |
| MintArtifactSignedUrlUseCase | GET /api/v1/reports/runs/:id/artifacts/:artId/download | 302 with V4 signed URL |
| CreateScheduleUseCase | POST /api/v1/reports/schedules | ReportSchedule + schedule.created.v1 |
| UpdateScheduleUseCase | PATCH /api/v1/reports/schedules/:id | updated schedule |
| DisableScheduleUseCase | DELETE /api/v1/reports/schedules/:id or auto-after-failures | schedule.disabled.v1 |
| FireScheduleUseCase | inbox: schedule.fired.v1 (translated from Cloud Scheduler push) | dispatches a RequestReportRunUseCase invocation |
| CreateSubscriptionUseCase | POST /api/v1/reports/subscriptions | subscription.created.v1 |
| CancelSubscriptionUseCase | DELETE /api/v1/reports/subscriptions/:id | subscription.cancelled.v1 |
| RecordSubscriptionDeliveryUseCase | inbox: notification.delivery.recorded.v1 | flips lastDeliveryStatus, may emit report.delivered.v1 |
| DispatchRegulatorySubmissionUseCase | inbox: report.completed.v1 (when regulatory) | regulatory.submission_succeeded/failed.v1 |
| RetryRegulatorySubmissionUseCase | scheduler tick or POST /regulatory/submissions/:id/retry | re-attempts adapter |
| PurgeTenantUseCase | inbox: tenant.deleted.v1 | tombstone aggregates per retention rules |

3. Use case detail — RequestReportRunUseCase

3.1 Inputs

export interface RequestReportRunCommand {
  tenantId: TenantId;
  reportId: ReportId;
  templateVersionPin?: number | null;  // override Report's pin for this run
  filters: Record<string, unknown>;    // pre-validated as ResolvedFilterSet
  formats: RenderFormat[];             // 1..N
  locale?: Locale;
  requestedBy: ActorRef;
  idempotencyKey: string;              // required
  correlationId: string;
}

3.2 Algorithm

export class RequestReportRunUseCase {
  constructor(
    private readonly reports: ReportRepository,
    private readonly templates: TemplateRepository,
    private readonly runs: ReportRepository,
    private readonly events: EventPublisher,
    private readonly identity: IdentityResolver,
    private readonly clock: Clock,
    private readonly idgen: IdGenerator,
  ) {}

  async execute(cmd: RequestReportRunCommand): Promise<ReportRun> {
    // 1. authorize
    if (!(await this.identity.isMemberOfTenant(cmd.requestedBy, cmd.tenantId))) {
      throw new PermissionDeniedError();
    }

    // 2. idempotency: same key + same body returns the existing run
    const existing = await this.runs.findByIdempotency(cmd.tenantId, cmd.idempotencyKey);
    if (existing) {
      assertSameBody(existing, cmd); // throws IDEMPOTENCY_KEY_REUSED if differing
      return existing;
    }

    // 3. resolve template version
    const report = await this.reports.findById(cmd.tenantId, cmd.reportId);
    const versionNumber = cmd.templateVersionPin ?? report.templateVersionPin
      ?? (await this.templates.latestVersion(report.templateId)).versionNumber;
    const tv = await this.templates.findByKeyVersion(report.templateId, versionNumber);

    // 4. validate filters + formats
    const resolved = FilterValidator.validate(cmd.filters, tv.filters);
    if (!cmd.formats.every(f => tv.supportedFormats.includes(f))) {
      throw new FormatNotSupportedError();
    }

    // 5. create + persist the run + outbox in one transaction
    const run = ReportRun.createQueued({
      id: ReportRunId.generate(this.clock, this.idgen),
      tenantId: cmd.tenantId,
      reportId: report.id,
      templateId: tv.templateId,
      templateVersionId: tv.id,
      templateVersionNumber: tv.versionNumber,
      resolvedFilters: resolved,
      requestedFormats: cmd.formats,
      requestedBy: cmd.requestedBy,
      correlationId: cmd.correlationId,
      idempotencyKey: cmd.idempotencyKey,
      queuedAt: this.clock.now(),
    });

    const requested = ReportRequestedEvent.from(run);
    const queued = ReportRunQueuedInternal.from(run); // internal subject for worker queue

    await this.runs.saveWithEvents(run, [requested, queued]);
    await this.events.publish([requested, queued], {
      tenantId: cmd.tenantId,
      orderingKey: `${cmd.tenantId}:${run.id}`,
    });
    return run;
  }
}
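Step 2 relies on assertSameBody, which this document does not define. One possible shape, offered as a sketch under assumptions: RunRequestBody is a hypothetical subset of the command fields, the comparison uses a key-sorted serialisation so that property order does not matter, and array order (for example in formats) is treated as significant.

```typescript
// Hypothetical subset of the fields that define "the same request body".
interface RunRequestBody {
  reportId: string;
  templateVersionPin: number | null;
  filters: Record<string, unknown>;
  formats: string[];
}

class IdempotencyKeyReusedError extends Error {
  readonly code = "MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED";
}

// Serialise with object keys sorted so that property order is irrelevant.
function canonicalize(value: unknown): string {
  if (value === undefined) return "null";
  if (Array.isArray(value)) {
    return `[${value.map(canonicalize).join(",")}]`;
  }
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const entries = Object.keys(obj)
      .sort()
      .map((k) => `${JSON.stringify(k)}:${canonicalize(obj[k])}`);
    return `{${entries.join(",")}}`;
  }
  return JSON.stringify(value);
}

// Throws when the replayed request differs from the stored one.
function assertSameBody(existing: RunRequestBody, incoming: RunRequestBody): void {
  if (canonicalize(existing) !== canonicalize(incoming)) {
    throw new IdempotencyKeyReusedError("idempotency key reused with a different body");
  }
}

const stored: RunRequestBody = {
  reportId: "r1",
  templateVersionPin: null,
  filters: { from: "2024-01-01", property: "p1" },
  formats: ["pdf"],
};
// Same content, different key order: accepted as a replay.
const replay: RunRequestBody = { ...stored, filters: { property: "p1", from: "2024-01-01" } };
assertSameBody(stored, replay);
```

A production variant might compare a stored hash of the canonical form instead of the full body; that choice is not specified here.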

3.3 Edge cases

| Case | Handling |
| --- | --- |
| Same idempotency key + different body | MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED |
| Template archived | MELMASTOON.REPORTING.TEMPLATE_LOCKED (404 surface; caller saw archived) |
| Format not in supported set | MELMASTOON.REPORTING.FORMAT_NOT_SUPPORTED |
| Filters invalid | MELMASTOON.REPORTING.FILTER_INVALID with errors[] enumerating each field |
| Tenant suspended | MELMASTOON.TENANT.SUSPENDED (gate at controller guard) |
| Per-tenant concurrency cap exceeded | MELMASTOON.GENERAL.RATE_LIMITED with retryAfter (default cap: 20 in-flight runs per tenant) |
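The concurrency-cap case can be pictured as a small admission check. This is a sketch, not the service's code: admitRun and AdmissionDecision are hypothetical names, the 30-second retryAfter is an assumed value, and how the in-flight count is obtained (for instance a repository query) is left open.

```typescript
// Default cap taken from the edge-case list above.
const DEFAULT_MAX_IN_FLIGHT_RUNS = 20;

type AdmissionDecision =
  | { admitted: true }
  | { admitted: false; code: "MELMASTOON.GENERAL.RATE_LIMITED"; retryAfterSeconds: number };

// Decide whether a tenant may queue another run.
// retryAfterSeconds = 30 is an assumption, not a documented default.
function admitRun(
  inFlightRuns: number,
  cap: number = DEFAULT_MAX_IN_FLIGHT_RUNS,
  retryAfterSeconds: number = 30,
): AdmissionDecision {
  if (inFlightRuns < cap) {
    return { admitted: true };
  }
  return { admitted: false, code: "MELMASTOON.GENERAL.RATE_LIMITED", retryAfterSeconds };
}

const underCap = admitRun(19); // one slot left under the default cap
const atCap = admitRun(20);    // cap reached, caller should back off
```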

4. Use case detail — StartReportRunUseCase (worker)

This runs in the render-worker Cloud Run service. It receives a queue message through a Pub/Sub push subscription, restores the trace context, and executes:

4.1 Algorithm

export class StartReportRunUseCase {
  constructor(
    private readonly runs: ReportRepository,
    private readonly templates: TemplateRepository,
    private readonly artifacts: ArtifactRepository,
    private readonly events: EventPublisher,
    private readonly storage: ArtifactStorage,
    private readonly renderer: RendererPort,
    private readonly analytics: AnalyticsClient,
    private readonly billingRead: BillingReadClient,
    private readonly reservationRead: ReservationReadClient,
    private readonly inventoryRead: InventoryReadClient,
    private readonly hkRead: HousekeepingReadClient,
    private readonly staffRead: StaffReadClient,
    private readonly ai: AIClient,
    private readonly clock: Clock,
    private readonly idgen: IdGenerator,
  ) {}

  async execute(input: { runId: ReportRunId; tenantId: TenantId; }): Promise<void> {
    const run = await this.runs.findById(input.tenantId, input.runId);
    if (run.status !== 'queued' && run.status !== 'failed') return; // idempotent re-delivery
    run.start(this.clock.now());
    await this.runs.saveWithEvents(run, [ReportStartedEvent.from(run)]);

    try {
      // 1. fetch inputs
      const tv = await this.templates.findById(run.templateVersionId);
      const data = await this.fetchInputs(tv, run);

      // 2. optional AI anomaly callouts
      const anomalies = tv.layout.some(b => b.kind === 'narrative' && /\{\{anomalies/.test((b.html as any).default))
        ? await this.ai.detectAnomalies({ tenantId: run.tenantId, templateKey: tv.templateId, data })
        : null;

      // 3. render every requested format
      const payload: RenderPayload = { templateVersion: tv, data, anomalies, locale: run.resolvedFilters.locale ?? tv.defaultLocale };
      run.markRendering(this.clock.now());
      await this.runs.save(run);

      for (const fmt of run.requestedFormats) {
        const buf = fmt === 'pdf' ? await this.renderer.renderPdf(payload, {})
          : fmt === 'xlsx' ? await this.renderer.renderXlsx(payload, {})
          : null;
        const stream = fmt === 'csv' ? this.renderer.renderCsvStream(payload, {}) : null;
        const { sha256, sizeBytes, objectPath } = await this.upload(run, fmt, buf ?? stream!, payload);
        const art = ExportArtifact.create({
          id: ExportArtifactId.generate(this.clock, this.idgen),
          tenantId: run.tenantId, runId: run.id, format: fmt, locale: payload.locale,
          bucket: BUCKET_FOR_REGION(run.tenantId), objectPath, sha256, sizeBytes,
          retentionClass: RetentionClassifier.classify(tv, /* tenantRegion */ ''),
          producedAt: this.clock.now(),
        });
        await this.artifacts.save(art);
        run.attachArtifact(art.id);
      }

      // 4. deliveries are dispatched by the inbox handler on report.completed.v1; here we only mark the run completed
      run.markCompleted(this.clock.now());
      const completed = ReportCompletedEvent.from(run); // built after markCompleted so the event reflects the final state
      await this.runs.saveWithEvents(run, [completed]);
      await this.events.publish([completed], { tenantId: run.tenantId, orderingKey: `${run.tenantId}:${run.id}` });
    } catch (err) {
      const failure = classifyFailure(err);
      run.markFailed(this.clock.now(), failure.code, failure.detail, failure.retriable);
      const event = ReportFailedEvent.from(run);
      await this.runs.saveWithEvents(run, [event]);
      await this.events.publish([event], { tenantId: run.tenantId, orderingKey: `${run.tenantId}:${run.id}` });
      if (failure.retriable && run.retryCount < run.maxRetries) {
        await this.events.publish([ReportRunRequeueInternal.fromBackoff(run)], { tenantId: run.tenantId, orderingKey: `${run.tenantId}:${run.id}` });
      }
    }
  }

  private async fetchInputs(tv: TemplateVersion, run: ReportRun): Promise<unknown> { /* dispatch by DataSourceSpec.kind */ }
  private async upload(run: ReportRun, fmt: RenderFormat, body: any, payload: RenderPayload): Promise<{ sha256: string; sizeBytes: number; objectPath: string; }> { /* … */ }
}
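The catch branch depends on classifyFailure and on a backoff for the requeue message, neither of which is spelled out here. The following sketch shows one way the two could fit together. WorkerError, planRetry, the EXAMPLE.* codes and the exponential schedule (1 s base, 60 s ceiling) are all illustrative assumptions; only RENDER_BUDGET_EXCEEDED being non-retriable comes from this document.

```typescript
interface FailureClassification {
  code: string;
  detail: string;
  retriable: boolean;
}

type WorkerErrorKind = "upstream_timeout" | "render_budget_exceeded";

// Hypothetical error type; the real worker errors are not shown in this document.
class WorkerError extends Error {
  readonly kind: WorkerErrorKind;
  constructor(kind: WorkerErrorKind, message: string) {
    super(message);
    this.kind = kind;
  }
}

// Map an arbitrary thrown value to a code and a retriable flag.
function classifyFailure(err: unknown): FailureClassification {
  const kind = (err as { kind?: unknown } | null | undefined)?.kind;
  const detail = err instanceof Error ? err.message : String(err);
  if (kind === "render_budget_exceeded") {
    return { code: "MELMASTOON.REPORTING.RENDER_BUDGET_EXCEEDED", detail, retriable: false };
  }
  if (kind === "upstream_timeout") {
    return { code: "EXAMPLE.UPSTREAM_TIMEOUT", detail, retriable: true }; // placeholder code
  }
  // Unknown failures are treated as non-retriable in this sketch.
  return { code: "EXAMPLE.UNCLASSIFIED", detail, retriable: false }; // placeholder code
}

// Mirrors the guard `failure.retriable && run.retryCount < run.maxRetries`,
// adding an assumed exponential delay for the requeue message.
function planRetry(
  failure: FailureClassification,
  retryCount: number,
  maxRetries: number,
  baseDelayMs: number = 1_000,
  maxDelayMs: number = 60_000,
): { requeue: boolean; delayMs: number } {
  if (!failure.retriable || retryCount >= maxRetries) {
    return { requeue: false, delayMs: 0 };
  }
  return { requeue: true, delayMs: Math.min(maxDelayMs, baseDelayMs * 2 ** retryCount) };
}

const timeoutFailure = classifyFailure(new WorkerError("upstream_timeout", "analytics deadline hit"));
const secondRetry = planRetry(timeoutFailure, 1, 3); // requeued after 2 000 ms under these assumptions
```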

4.2 Streaming guard for large datasets

  • For tabular formats above templateVersion.rowCap (default 50 000) the renderer must stream — renderCsvStream always streams; renderXlsx switches to exceljs.stream.xlsx.WorkbookWriter; renderPdf paginates table blocks at 200 rows per page using Puppeteer's print-page CSS.
  • The worker enforces a hard MAX_DATA_ROWS=500_000 and a RENDER_TIMEOUT_MS=180_000. Exceeding either fails the run with MELMASTOON.REPORTING.RENDER_BUDGET_EXCEEDED — non-retriable; staff must split the filter.
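The two rules above can be read as a single planning step that runs before rendering. The sketch below uses the constants from the text; planRender and RenderPlan are hypothetical names, and computing a PDF page count for every PDF (rather than only above rowCap) is a simplifying assumption.

```typescript
const DEFAULT_ROW_CAP = 50_000;      // templateVersion.rowCap default
const MAX_DATA_ROWS = 500_000;       // hard worker limit
const RENDER_TIMEOUT_MS = 180_000;   // hard worker limit
const PDF_ROWS_PER_PAGE = 200;       // table pagination for PDF

type RenderFormat = "pdf" | "xlsx" | "csv";

type RenderPlan =
  | { ok: true; streaming: boolean; pdfTablePages: number | null }
  | { ok: false; code: "MELMASTOON.REPORTING.RENDER_BUDGET_EXCEEDED"; retriable: false };

function planRender(
  format: RenderFormat,
  rowCount: number,
  elapsedMs: number,
  rowCap: number = DEFAULT_ROW_CAP,
): RenderPlan {
  // Exceeding either hard budget fails the run, non-retriable.
  if (rowCount > MAX_DATA_ROWS || elapsedMs > RENDER_TIMEOUT_MS) {
    return { ok: false, code: "MELMASTOON.REPORTING.RENDER_BUDGET_EXCEEDED", retriable: false };
  }
  if (format === "csv") {
    return { ok: true, streaming: true, pdfTablePages: null }; // CSV always streams
  }
  if (format === "xlsx") {
    return { ok: true, streaming: rowCount > rowCap, pdfTablePages: null };
  }
  // PDF does not stream; table blocks are paginated instead.
  return { ok: true, streaming: false, pdfTablePages: Math.ceil(rowCount / PDF_ROWS_PER_PAGE) };
}

const largeXlsx = planRender("xlsx", 60_000, 1_000);   // above rowCap, so streaming
const oversizedCsv = planRender("csv", 500_001, 1_000); // above MAX_DATA_ROWS, so rejected
```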

5. Use case detail — DispatchRegulatorySubmissionUseCase

Triggered by the inbox handler for melmastoon.reporting.report.completed.v1. Only runs whose template version has regulatory == true are processed; all others return immediately.

export class DispatchRegulatorySubmissionUseCase {
  async execute(input: { runId: ReportRunId; tenantId: TenantId; }) {
    const run = await this.runs.findById(input.tenantId, input.runId);
    const tv = await this.templates.findById(run.templateVersionId);
    if (!tv.regulatory) return;

    const tenant = await this.tenantRead.find(input.tenantId);
    const adapterRef = `${tenant.jurisdictionCode}.${tv.dataSourceSpec.primary.queryRef}`;
    if (!this.regulatory.capabilities().some(c => c.adapterRef === adapterRef)) {
      throw new RegulatoryAdapterMissingError();
    }

    const sub = RegulatorySubmission.createPending({
      id: RegulatorySubmissionId.generate(this.clock, this.idgen),
      tenantId: input.tenantId, runId: run.id,
      artifactId: pickArtifactByFormat(run, tv.preferredSubmissionFormat),
      jurisdictionCode: tenant.jurisdictionCode,
      adapterRef,
      maxAttempts: 5,
    });
    await this.regulatorRepo.save(sub);

    try {
      const result = await this.regulatory.submit({
        adapterRef, jurisdictionCode: tenant.jurisdictionCode,
        artifactSignedUrl: await this.storage.signV4({ /* … */ }),
        tenant, run, tv,
      });
      sub.markSucceeded(this.clock.now(), result.proof);
      await this.regulatorRepo.save(sub);
      await this.events.publish([RegulatorySucceededEvent.from(sub)], { tenantId: input.tenantId, orderingKey: `${input.tenantId}:${sub.id}` });
    } catch (err) {
      const cls = classifySubmissionFailure(err);
      sub.markFailedAttempt(this.clock.now(), cls.code, cls.detail, cls.retriable);
      await this.regulatorRepo.save(sub);
      if (sub.status === 'failed') {
        await this.events.publish([RegulatoryFailedEvent.from(sub)], { tenantId: input.tenantId, orderingKey: `${input.tenantId}:${sub.id}` });
        await this.events.publish([RegulatorySubmissionDueEscalation.from(sub)], { tenantId: input.tenantId, orderingKey: `${input.tenantId}:${sub.id}` });
      } else {
        // schedule retry via Cloud Scheduler internal-jobs queue; nextAttemptAt set by aggregate
      }
    }
  }
}
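The use case leaves attempt accounting to the aggregate: markFailedAttempt either moves the submission to failed or sets nextAttemptAt. A minimal sketch of that bookkeeping follows. SubmissionAttempts is a hypothetical stand-in for the relevant part of RegulatorySubmission, and the doubling schedule starting at one minute is an assumption, not a documented policy.

```typescript
type SubmissionStatus = "pending" | "succeeded" | "failed";

// Hypothetical stand-in for the retry-related state of RegulatorySubmission.
class SubmissionAttempts {
  status: SubmissionStatus = "pending";
  attempts = 0;
  nextAttemptAt: Date | null = null;
  readonly maxAttempts: number;

  constructor(maxAttempts: number) {
    this.maxAttempts = maxAttempts;
  }

  markFailedAttempt(now: Date, retriable: boolean): void {
    this.attempts += 1;
    // Terminal when the error is non-retriable or the attempt budget is spent.
    if (!retriable || this.attempts >= this.maxAttempts) {
      this.status = "failed";
      this.nextAttemptAt = null;
      return;
    }
    // Assumed schedule: 1 min, 2 min, 4 min, ... after each failed attempt.
    const delayMs = 60_000 * 2 ** (this.attempts - 1);
    this.nextAttemptAt = new Date(now.getTime() + delayMs);
  }
}

const attemptTime = new Date("2024-01-01T00:00:00.000Z");
const demoSubmission = new SubmissionAttempts(5); // maxAttempts: 5, as in the use case above
demoSubmission.markFailedAttempt(attemptTime, true); // stays pending, retry due one minute later
```

Under this model the use case's `sub.status === 'failed'` branch is reached on the fifth retriable failure or on the first non-retriable one.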

6. Authorization decisions (RBAC + ABAC)

Resolved by an AuthorizationDecider invoked at controller guard time. Full RBAC matrix lives in SECURITY_MODEL §2. Highlights:

| Action | Roles allowed | Attribute checks |
| --- | --- | --- |
| PublishTemplateVersion (platform-shared, tenantId==null) | platform_admin | none |
| PublishTemplateVersion (tenant-private) | tenant_admin, gm, owner | tenant member |
| RequestReportRun | staff_front_desk, gm, finance, owner; system for scheduled | tenant member; property scope from filters |
| MintArtifactSignedUrl | requester of the run, gm, owner, finance, auditor; platform_admin only via break-glass audit | tenant member; URL TTL hard-capped at 15 min |
| CreateSchedule | gm, owner, finance | tenant member |
| RetryRegulatorySubmission | gm, owner, finance, compliance_officer | tenant member; written justification required |
| PurgeTenantUseCase | system (event-driven only) | event signed by tenant-service IAM principal |

Every authorization denial emits audit.permission_denied.v1.
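A decider of this kind is often table-driven: a role check (RBAC) followed by attribute checks (ABAC). The sketch below covers three of the actions listed above. The Actor shape, the decide function and the reason strings are hypothetical, and the system role for scheduled runs and the property-scope check are left out for brevity; the authoritative matrix remains SECURITY_MODEL §2.

```typescript
type Action = "RequestReportRun" | "CreateSchedule" | "RetryRegulatorySubmission";

// Hypothetical actor shape; the real ActorRef is defined elsewhere.
interface Actor { id: string; roles: string[]; tenantIds: string[]; }
interface Decision { allowed: boolean; reason: string; }

// Role sets copied from the highlights above (human roles only).
const ALLOWED_ROLES: Record<Action, readonly string[]> = {
  RequestReportRun: ["staff_front_desk", "gm", "finance", "owner"],
  CreateSchedule: ["gm", "owner", "finance"],
  RetryRegulatorySubmission: ["gm", "owner", "finance", "compliance_officer"],
};

function decide(actor: Actor, action: Action, tenantId: string, justification?: string): Decision {
  // RBAC: the actor needs at least one role allowed for the action.
  const hasRole = actor.roles.some((role) => ALLOWED_ROLES[action].includes(role));
  if (!hasRole) return { allowed: false, reason: "role_not_allowed" };

  // ABAC: every listed action requires tenant membership.
  if (!actor.tenantIds.includes(tenantId)) return { allowed: false, reason: "not_tenant_member" };

  // ABAC: regulatory retries also need a written justification.
  if (action === "RetryRegulatorySubmission" && (justification ?? "").trim() === "") {
    return { allowed: false, reason: "justification_required" };
  }
  return { allowed: true, reason: "ok" };
}

const frontDesk: Actor = { id: "u1", roles: ["staff_front_desk"], tenantIds: ["t1"] };
const runDecision = decide(frontDesk, "RequestReportRun", "t1");    // allowed
const scheduleDecision = decide(frontDesk, "CreateSchedule", "t1"); // denied: role_not_allowed
```

A guard built on this would translate any `allowed: false` result into the audit.permission_denied.v1 event mentioned above.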


7. Saga participation

reporting-service does not orchestrate sagas. It participates only as:

  • A producer of melmastoon.reporting.regulatory.submission_due.v1, which prompts notification-service to alert tenant.owner.
  • A consumer of melmastoon.tenant.deleted.v1 (purge cascade) and melmastoon.notification.delivery.recorded.v1 (subscription delivery confirmation).

There is no compensating action across services: a failed report does not trigger compensation in billing or reservations, because reporting only reads from those services.


8. Inbox handler routing

| Subject | Handler | Use case |
| --- | --- | --- |
| internal report.run.queued | WorkerInbox.onReportRunQueued | StartReportRunUseCase |
| melmastoon.reporting.report.completed.v1 | Self.onReportCompleted | DispatchSubscriptionDeliveriesUseCase + (if regulatory) DispatchRegulatorySubmissionUseCase |
| melmastoon.notification.delivery.recorded.v1 | Self.onNotificationDelivered | RecordSubscriptionDeliveryUseCase |
| melmastoon.tenant.deleted.v1 | Self.onTenantDeleted | PurgeTenantUseCase |
| melmastoon.tenant.settings.changed.v1 | Self.onTenantSettingsChanged | invalidate tenant cache (jurisdiction, branding) |
| melmastoon.theme_config.theme.updated.v1 | Self.onThemeUpdated | invalidate tenant branding cache |
| melmastoon.analytics.projection.refreshed.v1 | Self.onProjectionRefreshed | (optional) trigger near-realtime template re-render for streaming reports |
| Cloud Scheduler push /internal/jobs/schedule-fire | SchedulerInbox.onSchedulerHit | FireScheduleUseCase → RequestReportRunUseCase |
| Cloud Scheduler push /internal/jobs/regulatory-retry | SchedulerInbox.onRegulatoryRetryDue | RetryRegulatorySubmissionUseCase |

All handlers run inside InboxIdempotency middleware that checks the inbox_processed table by (subject, messageId) to make Pub/Sub at-least-once delivery safe.
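The deduplication idea can be sketched with an in-memory stand-in for the inbox_processed table. Everything named here (InMemoryInboxProcessed, handleOnce) is hypothetical, the handler is synchronous to keep the example short, and a real middleware would presumably record the key in the same database transaction as the handler's writes.

```typescript
interface InboxMessage { subject: string; messageId: string; }

// Composite key over (subject, messageId); JSON avoids delimiter clashes.
function inboxKey(message: InboxMessage): string {
  return JSON.stringify([message.subject, message.messageId]);
}

// Hypothetical in-memory stand-in for the inbox_processed table.
class InMemoryInboxProcessed {
  private readonly seen = new Set<string>();
  has(message: InboxMessage): boolean { return this.seen.has(inboxKey(message)); }
  record(message: InboxMessage): void { this.seen.add(inboxKey(message)); }
}

// Run the handler at most once per (subject, messageId).
function handleOnce(
  store: InMemoryInboxProcessed,
  message: InboxMessage,
  handler: (message: InboxMessage) => void,
): "processed" | "duplicate" {
  if (store.has(message)) return "duplicate";
  handler(message);
  // Recorded only when the handler did not throw, so a failed
  // delivery can be redelivered and retried.
  store.record(message);
  return "processed";
}

const inboxStore = new InMemoryInboxProcessed();
const delivery: InboxMessage = { subject: "melmastoon.tenant.deleted.v1", messageId: "m-1" };
let purgeCalls = 0;
const firstDelivery = handleOnce(inboxStore, delivery, () => { purgeCalls += 1; });
const redelivery = handleOnce(inboxStore, delivery, () => { purgeCalls += 1; });
```

Here the second delivery of the same message is reported as a duplicate and the handler body runs once.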


9. Cross-references