APPLICATION_LOGIC — reporting-service
Sibling: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · SECURITY_MODEL
Strategic anchors: 02 §7.4 Use cases & ports · 04 §6 Outbox / Inbox · standards/SERVICE_TEMPLATE
The application layer is the only place where domain aggregates meet I/O. Every entry point is a use case (<verb-noun>.use-case.ts) constructed by the NestJS DI container with ports (interfaces). All adapters live in the infrastructure layer; the application layer never imports pg, @google-cloud/*, or puppeteer directly.
1. Ports declared by this service
// src/application/ports/
export interface ReportRepository { /* save, findById, findByIdempotency */ }
export interface TemplateRepository { /* save, findById, findByKeyVersion, listByCategory */ }
export interface ScheduleRepository { /* save, findById, findDueByTenant, listActive */ }
export interface SubscriptionRepository { /* save, findById, findByReport, findActiveByReport */ }
export interface ArtifactRepository { /* save, findById, listByRun */ }
export interface RegulatorySubmissionRepository { /* save, findById, findByRun, findDueForRetry */ }
export interface EventPublisher {
publish(events: DomainEvent[], opts: { tenantId: TenantId; orderingKey: string; }): Promise<void>;
}
export interface AnalyticsClient {
runQuery(spec: { queryRef: string; params: Record<string, unknown>; tenantId: TenantId; deadlineMs: number; }): Promise<QueryResult>;
}
export interface BillingReadClient { /* getFolioRollup, getDailyCashDrawer, getInvoiceList */ }
export interface ReservationReadClient { /* getArrivalsForDate, getDeparturesForDate, getInHouse */ }
export interface InventoryReadClient { /* getOccupancyByDay */ }
export interface HousekeepingReadClient { /* getProductivityByStaff, getCleaningTimes */ }
export interface StaffReadClient { /* getHoursWorked, getShiftsByDay */ }
export interface RendererPort {
renderPdf(payload: RenderPayload, opts: RenderOpts): Promise<Buffer>;
renderXlsx(payload: RenderPayload, opts: RenderOpts): Promise<Buffer>;
renderCsvStream(payload: RenderPayload, opts: RenderOpts): NodeJS.ReadableStream;
}
export interface ArtifactStorage {
upload(input: { bucket: string; objectPath: string; body: Buffer | NodeJS.ReadableStream;
contentType: string; sha256: string;
retentionClass: RetentionClass; tenantId: TenantId; }): Promise<{ sizeBytes: number; }>;
signV4(input: { bucket: string; objectPath: string; ttlSeconds: number; }): Promise<{ url: string; expiresAt: Date; }>;
setObjectLockUntil(input: { bucket: string; objectPath: string; until: Date; }): Promise<void>;
}
export interface NotificationClient {
send(input: NotificationSendInput): Promise<{ deliveryId: string; }>;
}
export interface AIClient {
generateQuerySuggestion(input: AIQuerySuggestionInput): Promise<AIQuerySuggestionOutput>;
detectAnomalies(input: AIAnomalyInput): Promise<AIAnomalyOutput>;
}
export interface RegulatorySubmissionPort {
capabilities(): Array<{ adapterRef: string; jurisdictionCode: string; }>;
submit(input: RegulatorySubmitInput): Promise<RegulatorySubmitResult>;
}
export interface IdentityResolver {
resolveActor(req: { jwt: string; tenantHeader: string; }): Promise<ActorRef>;
isPlatformAdmin(actor: ActorRef): boolean;
isMemberOfTenant(actor: ActorRef, tenantId: TenantId): Promise<boolean>;
}
export interface Clock { now(): Date; }
export interface IdGenerator { ulid(seed?: Date): string; }
PricingClient, LockClient, etc. are not declared here — reporting reads only from analytics & service-read clients.
2. Use cases (entry points)
| Use case | Trigger | Output |
|---|---|---|
PublishTemplateVersionUseCase | POST /api/v1/reports/templates/:id/versions | TemplateVersion + template.published.v1 |
ArchiveTemplateUseCase | DELETE /api/v1/reports/templates/:id | template.archived.v1 |
CreateReportUseCase | POST /api/v1/reports | Report |
UpdateReportUseCase | PATCH /api/v1/reports/:id | updated Report |
RequestReportRunUseCase | POST /api/v1/reports/runs | ReportRun(queued) + report.requested.v1 |
StartReportRunUseCase | inbox: report.run.queued (worker) | side-effects + report.started.v1 |
CompleteReportRunUseCase | end of render pipeline (in worker) | artifacts persisted + report.completed.v1 |
FailReportRunUseCase | render error / data error | report.failed.v1; possibly re-queues |
CancelReportRunUseCase | POST /api/v1/reports/runs/:id/cancel | report.cancelled.v1 |
MintArtifactSignedUrlUseCase | GET /api/v1/reports/runs/:id/artifacts/:artId/download | 302 with V4 signed URL |
CreateScheduleUseCase | POST /api/v1/reports/schedules | ReportSchedule + schedule.created.v1 |
UpdateScheduleUseCase | PATCH /api/v1/reports/schedules/:id | updated schedule |
DisableScheduleUseCase | DELETE /api/v1/reports/schedules/:id or auto-after-failures | schedule.disabled.v1 |
FireScheduleUseCase | inbox: schedule.fired.v1 (translated from Cloud Scheduler push) | dispatches a RequestReportRunUseCase invocation |
CreateSubscriptionUseCase | POST /api/v1/reports/subscriptions | subscription.created.v1 |
CancelSubscriptionUseCase | DELETE /api/v1/reports/subscriptions/:id | subscription.cancelled.v1 |
RecordSubscriptionDeliveryUseCase | inbox: notification.delivery.recorded.v1 | flips lastDeliveryStatus, may emit report.delivered.v1 |
DispatchRegulatorySubmissionUseCase | inbox: report.completed.v1 (when regulatory) | regulatory.submission_succeeded/failed.v1 |
RetryRegulatorySubmissionUseCase | scheduler tick or POST /regulatory/submissions/:id/retry | re-attempts adapter |
PurgeTenantUseCase | inbox: tenant.deleted.v1 | tombstone aggregates per retention rules |
3. Use case detail — RequestReportRunUseCase
3.1 Inputs
export interface RequestReportRunCommand {
tenantId: TenantId;
reportId: ReportId;
templateVersionPin?: number | null; // override Report's pin for this run
filters: Record<string, unknown>; // pre-validated as ResolvedFilterSet
formats: RenderFormat[]; // 1..N
locale?: Locale;
requestedBy: ActorRef;
idempotencyKey: string; // required
correlationId: string;
}
3.2 Algorithm
export class RequestReportRunUseCase {
constructor(
private readonly reports: ReportRepository,
private readonly templates: TemplateRepository,
private readonly runs: ReportRepository,
private readonly events: EventPublisher,
private readonly identity: IdentityResolver,
private readonly clock: Clock,
private readonly idgen: IdGenerator,
) {}
async execute(cmd: RequestReportRunCommand): Promise<ReportRun> {
// 1. authorize
if (!(await this.identity.isMemberOfTenant(cmd.requestedBy, cmd.tenantId))) {
throw new PermissionDeniedError();
}
// 2. idempotency — same key + same body returns the existing run
const existing = await this.runs.findByIdempotency(cmd.tenantId, cmd.idempotencyKey);
if (existing) {
assertSameBody(existing, cmd); // throws IDEMPOTENCY_KEY_REUSED if differing
return existing;
}
// 3. resolve template version
const report = await this.reports.findById(cmd.tenantId, cmd.reportId);
const versionNumber = cmd.templateVersionPin ?? report.templateVersionPin
?? (await this.templates.latestVersion(report.templateId)).versionNumber;
const tv = await this.templates.findByKeyVersion(report.templateId, versionNumber);
// 4. validate filters + formats
const resolved = FilterValidator.validate(cmd.filters, tv.filters);
if (!cmd.formats.every(f => tv.supportedFormats.includes(f))) {
throw new FormatNotSupportedError();
}
// 5. create + persist the run + outbox in one transaction
const run = ReportRun.createQueued({
id: ReportRunId.generate(this.clock, this.idgen),
tenantId: cmd.tenantId,
reportId: report.id,
templateId: tv.templateId,
templateVersionId: tv.id,
templateVersionNumber: tv.versionNumber,
resolvedFilters: resolved,
requestedFormats: cmd.formats,
requestedBy: cmd.requestedBy,
correlationId: cmd.correlationId,
idempotencyKey: cmd.idempotencyKey,
queuedAt: this.clock.now(),
});
const requested = ReportRequestedEvent.from(run);
const queued = ReportRunQueuedInternal.from(run); // internal subject for worker queue
await this.runs.saveWithEvents(run, [requested, queued]);
await this.events.publish([requested, queued], {
tenantId: cmd.tenantId,
orderingKey: `${cmd.tenantId}:${run.id}`,
});
return run;
}
}
3.3 Edge cases
| Case | Handling |
|---|---|
| Same idempotency key + different body | MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED |
| Template archived | MELMASTOON.REPORTING.TEMPLATE_LOCKED (404 surface — caller saw archived) |
| Format not in supported set | MELMASTOON.REPORTING.FORMAT_NOT_SUPPORTED |
| Filters invalid | MELMASTOON.REPORTING.FILTER_INVALID with errors[] enumerating each field |
| Tenant suspended | MELMASTOON.TENANT.SUSPENDED (gate at controller guard) |
| Per-tenant concurrency cap exceeded | MELMASTOON.GENERAL.RATE_LIMITED with retryAfter (default cap: 20 in-flight runs per tenant) |
4. Use case detail — StartReportRunUseCase (worker)
This runs in the render-worker Cloud Run service. It pulls a queue message via Pub/Sub push subscription, restores the trace context, and executes:
4.1 Algorithm
export class StartReportRunUseCase {
constructor(
private readonly runs: ReportRepository,
private readonly templates: TemplateRepository,
private readonly artifacts: ArtifactRepository,
private readonly events: EventPublisher,
private readonly storage: ArtifactStorage,
private readonly renderer: RendererPort,
private readonly analytics: AnalyticsClient,
private readonly billingRead: BillingReadClient,
private readonly reservationRead: ReservationReadClient,
private readonly inventoryRead: InventoryReadClient,
private readonly hkRead: HousekeepingReadClient,
private readonly staffRead: StaffReadClient,
private readonly ai: AIClient,
private readonly clock: Clock,
private readonly idgen: IdGenerator,
) {}
async execute(input: { runId: ReportRunId; tenantId: TenantId; }): Promise<void> {
const run = await this.runs.findById(input.tenantId, input.runId);
if (run.status !== 'queued' && run.status !== 'failed') return; // idempotent re-delivery
run.start(this.clock.now());
await this.runs.saveWithEvents(run, [ReportStartedEvent.from(run)]);
try {
// 1. fetch inputs
const tv = await this.templates.findById(run.templateVersionId);
const data = await this.fetchInputs(tv, run);
// 2. optional AI anomaly callouts
const anomalies = tv.layout.some(b => b.kind === 'narrative' && /\{\{anomalies/.test((b.html as any).default))
? await this.ai.detectAnomalies({ tenantId: run.tenantId, templateKey: tv.templateId, data })
: null;
// 3. render every requested format
const payload: RenderPayload = { templateVersion: tv, data, anomalies, locale: run.resolvedFilters.locale ?? tv.defaultLocale };
run.markRendering(this.clock.now());
await this.runs.save(run);
for (const fmt of run.requestedFormats) {
const buf = fmt === 'pdf' ? await this.renderer.renderPdf(payload, {})
: fmt === 'xlsx' ? await this.renderer.renderXlsx(payload, {})
: null;
const stream = fmt === 'csv' ? this.renderer.renderCsvStream(payload, {}) : null;
const { sha256, sizeBytes, objectPath } = await this.upload(run, fmt, buf ?? stream!, payload);
const art = ExportArtifact.create({
id: ExportArtifactId.generate(this.clock, this.idgen),
tenantId: run.tenantId, runId: run.id, format: fmt, locale: payload.locale,
bucket: BUCKET_FOR_REGION(run.tenantId), objectPath, sha256, sizeBytes,
retentionClass: RetentionClassifier.classify(tv, /* tenantRegion */ ''),
producedAt: this.clock.now(),
});
await this.artifacts.save(art);
run.attachArtifact(art.id);
}
// 4. dispatch deliveries handled by inbox handler on report.completed.v1; we just mark completed
const completed = ReportCompletedEvent.from(run);
run.markCompleted(this.clock.now());
await this.runs.saveWithEvents(run, [completed]);
await this.events.publish([completed], { tenantId: run.tenantId, orderingKey: `${run.tenantId}:${run.id}` });
} catch (err) {
const failure = classifyFailure(err);
run.markFailed(this.clock.now(), failure.code, failure.detail, failure.retriable);
const event = ReportFailedEvent.from(run);
await this.runs.saveWithEvents(run, [event]);
await this.events.publish([event], { tenantId: run.tenantId, orderingKey: `${run.tenantId}:${run.id}` });
if (failure.retriable && run.retryCount < run.maxRetries) {
await this.events.publish([ReportRunRequeueInternal.fromBackoff(run)], { tenantId: run.tenantId, orderingKey: `${run.tenantId}:${run.id}` });
}
}
}
private async fetchInputs(tv: TemplateVersion, run: ReportRun): Promise<unknown> { /* dispatch by DataSourceSpec.kind */ }
private async upload(run: ReportRun, fmt: RenderFormat, body: any, payload: RenderPayload): Promise<{ sha256: string; sizeBytes: number; objectPath: string; }> { /* … */ }
}
4.2 Streaming guard for large datasets
- For tabular formats above
templateVersion.rowCap(default 50 000) the renderer must stream —renderCsvStreamalways streams;renderXlsxswitches toexceljs.stream.xlsx.WorkbookWriter;renderPdfpaginates table blocks at 200 rows per page using Puppeteer's print-page CSS. - The worker enforces a hard
MAX_DATA_ROWS=500_000and aRENDER_TIMEOUT_MS=180_000. Exceeding either fails the run withMELMASTOON.REPORTING.RENDER_BUDGET_EXCEEDED— non-retriable; staff must split the filter.
5. Use case detail — DispatchRegulatorySubmissionUseCase
Triggered by inbox handler on melmastoon.reporting.report.completed.v1. Filters to runs whose template is regulatory == true.
export class DispatchRegulatorySubmissionUseCase {
async execute(input: { runId: ReportRunId; tenantId: TenantId; }) {
const run = await this.runs.findById(input.tenantId, input.runId);
const tv = await this.templates.findById(run.templateVersionId);
if (!tv.regulatory) return;
const tenant = await this.tenantRead.find(input.tenantId);
const adapterRef = `${tenant.jurisdictionCode}.${tv.dataSourceSpec.primary.queryRef}`;
if (!this.regulatory.capabilities().some(c => c.adapterRef === adapterRef)) {
throw new RegulatoryAdapterMissingError();
}
const sub = RegulatorySubmission.createPending({
id: RegulatorySubmissionId.generate(this.clock, this.idgen),
tenantId: input.tenantId, runId: run.id,
artifactId: pickArtifactByFormat(run, tv.preferredSubmissionFormat),
jurisdictionCode: tenant.jurisdictionCode,
adapterRef,
maxAttempts: 5,
});
await this.regulatorRepo.save(sub);
try {
const result = await this.regulatory.submit({
adapterRef, jurisdictionCode: tenant.jurisdictionCode,
artifactSignedUrl: await this.storage.signV4({ /* … */ }),
tenant, run, tv,
});
sub.markSucceeded(this.clock.now(), result.proof);
await this.regulatorRepo.save(sub);
await this.events.publish([RegulatorySucceededEvent.from(sub)], { tenantId: input.tenantId, orderingKey: `${input.tenantId}:${sub.id}` });
} catch (err) {
const cls = classifySubmissionFailure(err);
sub.markFailedAttempt(this.clock.now(), cls.code, cls.detail, cls.retriable);
await this.regulatorRepo.save(sub);
if (sub.status === 'failed') {
await this.events.publish([RegulatoryFailedEvent.from(sub)], { tenantId: input.tenantId, orderingKey: `${input.tenantId}:${sub.id}` });
await this.events.publish([RegulatorySubmissionDueEscalation.from(sub)], { tenantId: input.tenantId, orderingKey: `${input.tenantId}:${sub.id}` });
} else {
// schedule retry via Cloud Scheduler internal-jobs queue; nextAttemptAt set by aggregate
}
}
}
}
6. Authorization decisions (RBAC + ABAC)
Resolved by an AuthorizationDecider invoked at controller guard time. Full RBAC matrix lives in SECURITY_MODEL §2. Highlights:
| Action | Roles allowed | Attribute checks |
|---|---|---|
PublishTemplateVersion (platform-shared, tenantId==null) | platform_admin | none |
PublishTemplateVersion (tenant-private) | tenant_admin, gm, owner | tenant member |
RequestReportRun | staff_front_desk, gm, finance, owner; system for scheduled | tenant member; property scope from filters |
MintArtifactSignedUrl | requester of the run, gm, owner, finance, auditor; platform_admin only via break-glass audit | tenant member; URL TTL hard-capped at 15 min |
CreateSchedule | gm, owner, finance | tenant member |
RetryRegulatorySubmission | gm, owner, finance, compliance_officer | tenant member; written justification required |
PurgeTenantUseCase | system (event-driven only) | event signed by tenant-service IAM principal |
Every authorization denial emits audit.permission_denied.v1.
7. Saga participation
reporting-service does not orchestrate sagas. It participates only as:
- A producer of
melmastoon.reporting.regulatory.submission_due.v1— alertsnotification-serviceto alarm tenant.owner. - A consumer of
melmastoon.tenant.deleted.v1(purge cascade) andmelmastoon.notification.delivery.recorded.v1(subscription delivery confirmation).
There is no compensating action across services — a failed report does not trigger compensation in billing or reservations because reporting is read-only of those services.
8. Inbox handler routing
| Subject | Handler | Use case |
|---|---|---|
internal report.run.queued | WorkerInbox.onReportRunQueued | StartReportRunUseCase |
melmastoon.reporting.report.completed.v1 | Self.onReportCompleted | DispatchSubscriptionDeliveriesUseCase + (if regulatory) DispatchRegulatorySubmissionUseCase |
melmastoon.notification.delivery.recorded.v1 | Self.onNotificationDelivered | RecordSubscriptionDeliveryUseCase |
melmastoon.tenant.deleted.v1 | Self.onTenantDeleted | PurgeTenantUseCase |
melmastoon.tenant.settings.changed.v1 | Self.onTenantSettingsChanged | invalidate tenant cache (jurisdiction, branding) |
melmastoon.theme_config.theme.updated.v1 | Self.onThemeUpdated | invalidate tenant branding cache |
melmastoon.analytics.projection.refreshed.v1 | Self.onProjectionRefreshed | (optional) trigger near-realtime template re-render for streaming reports |
Cloud Scheduler push /internal/jobs/schedule-fire | SchedulerInbox.onSchedulerHit | FireScheduleUseCase → RequestReportRunUseCase |
Cloud Scheduler push /internal/jobs/regulatory-retry | SchedulerInbox.onRegulatoryRetryDue | RetryRegulatorySubmissionUseCase |
All handlers run inside InboxIdempotency middleware that checks the inbox_processed table by (subject, messageId) to make Pub/Sub at-least-once delivery safe.
9. Cross-references
- Domain shapes & invariants: DOMAIN_MODEL
- REST endpoints: API_CONTRACTS
- Event payloads: EVENT_SCHEMAS
- AI flows used here: AI_INTEGRATION
- Tests required: TESTING_STRATEGY