
file-storage-service — APPLICATION_LOGIC

Companion: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · SECURITY_MODEL · DATA_MODEL

This document defines the application layer: ports, use cases (commands), queries, orchestration flows, saga participation, and the precise sequencing of every cross-system effect. The application layer is the only place that talks to ports; the domain layer never depends on infrastructure.


1. Ports

```ts
// Referenced types (TenantId, FileObjectId, ObjectKey, VariantPreset, DomainEvent, Tx,
// and the request/result types) are defined elsewhere and not repeated here.

// application/ports/blob-storage.port.ts
export interface BlobStoragePort {
  issueSignedUploadUrl(req: SignedUploadRequest): Promise<SignedUploadResult>;
  issueSignedDownloadUrl(req: SignedDownloadRequest): Promise<SignedDownloadResult>;
  head(key: ObjectKey): Promise<HeadResult | null>; // size, sha256 (if in metadata), generation
  copy(src: ObjectKey, dst: ObjectKey): Promise<void>;
  delete(key: ObjectKey): Promise<void>; // hard delete
  abortResumable(uploadId: string): Promise<void>;
}

// application/ports/scan.port.ts
export interface ScanPort {
  requestScan(req: {
    tenantId: TenantId;
    fileObjectId: FileObjectId;
    objectKey: ObjectKey;
    scanner: 'clamav' | 'cloud_dlp';
  }): Promise<void>;
}

// application/ports/image-optimizer.port.ts
export interface ImageOptimizerPort {
  enqueue(req: { tenantId: TenantId; fileObjectId: FileObjectId; presets: VariantPreset[] }): Promise<void>;
}

// application/ports/cdn-invalidation.port.ts
export interface CdnInvalidationPort {
  invalidate(paths: string[]): Promise<void>; // CDN-relative paths (e.g., /tenants/.../property_photo/...)
}

// application/ports/quota.port.ts
export interface QuotaPort {
  current(tenantId: TenantId): Promise<{ bytes: bigint; objects: number; capBytes: bigint; capObjects: number }>;
  reserve(tenantId: TenantId, bytes: bigint, objects: number): Promise<void>;
  release(tenantId: TenantId, bytes: bigint, objects: number): Promise<void>;
}

// application/ports/event-publisher.port.ts
export interface EventPublisher {
  publishWithinTx(events: DomainEvent[], tx: Tx): Promise<void>;
}

// application/ports/clock.port.ts
export interface Clock {
  now(): Date;
}
```

Every port has at least one in-memory test double in application/__test__/.
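For illustration, a minimal sketch of two such doubles: a controllable FakeClock and an InMemoryQuotaPort. Both names are hypothetical, TenantId is simplified to string, and bigint is replaced by number for brevity. reserve performs the cap check and the increment in one step, mirroring the single-shot CAS in §7, so a rejected reservation leaves usage untouched.

```typescript
// Hypothetical test doubles. Simplifications: TenantId is a string, bigint is a number.
type TenantId = string;

interface Clock { now(): Date; }

class FakeClock implements Clock {
  private current: Date;
  constructor(start: Date) { this.current = new Date(start.getTime()); }
  now(): Date { return new Date(this.current.getTime()); }
  // Moves time forward so tests can cross expiry windows deterministically.
  advance(ms: number): void { this.current = new Date(this.current.getTime() + ms); }
}

class QuotaExceededError extends Error {}

type Usage = { bytes: number; objects: number };

class InMemoryQuotaPort {
  private readonly usage = new Map<TenantId, Usage>();
  constructor(private readonly capBytes: number, private readonly capObjects: number) {}

  async current(tenantId: TenantId) {
    const u = this.usage.get(tenantId) ?? { bytes: 0, objects: 0 };
    return { ...u, capBytes: this.capBytes, capObjects: this.capObjects };
  }

  // Check and add in one synchronous step: either both counters move or nothing changes.
  async reserve(tenantId: TenantId, bytes: number, objects: number): Promise<void> {
    const u = this.usage.get(tenantId) ?? { bytes: 0, objects: 0 };
    if (u.bytes + bytes > this.capBytes || u.objects + objects > this.capObjects) {
      throw new QuotaExceededError(`quota exceeded for ${tenantId}`);
    }
    this.usage.set(tenantId, { bytes: u.bytes + bytes, objects: u.objects + objects });
  }

  async release(tenantId: TenantId, bytes: number, objects: number): Promise<void> {
    const u = this.usage.get(tenantId) ?? { bytes: 0, objects: 0 };
    // Clamp at zero so a double release in a test cannot drive usage negative.
    this.usage.set(tenantId, {
      bytes: Math.max(0, u.bytes - bytes),
      objects: Math.max(0, u.objects - objects),
    });
  }
}
```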


2. Use Cases (Commands)

2.1 InitiateUploadUseCase

Input:

```ts
type InitiateUploadCommand = {
  tenantId: TenantId;
  actor: ActorRef;
  scope: Scope;
  contentType: string;
  bytes: number; // declared by client; verified at confirm
  ownerScopeRefs: OwnerScopeRefs;
  retentionPolicyName?: RetentionPolicyName; // optional override; resolved per scope+jurisdiction otherwise
  preferredLocale?: Locale;
  filenameHint?: string;
  resumable?: boolean; // default true if bytes > 8 MiB
  idempotencyKey: IdempotencyKey;
};
```

Flow:

  1. Resolve Bucket and DataClass from scope (table in SECURITY_MODEL §2).
  2. Resolve RetentionPolicy (caller hint → per-scope default → per-jurisdiction override from tenant-service cache).
  3. Construct ContentType.from(contentType, scope) and ByteSize.from(bytes, scope) (domain validation).
  4. Check QuotaPort.current(tenantId); if bytes + currentBytes > capBytes → QuotaExceededError. Otherwise reserve the bytes speculatively (released on session expiry / abort).
  5. Mint FileObjectId med_…, build ObjectKey.for(tenantId, scope, dateShardedRelPath(filenameHint, fileObjectId)).
  6. FileObject.initiate({...}) (domain).
  7. BlobStoragePort.issueSignedUploadUrl({ key, contentType, expiresIn: 600s, resumable }).
  8. Persist FileObject + UploadSession + Outbox(file.upload.initiated.v1) in a single transaction.
  9. Return { fileObjectId, uploadSessionId, signedUrl, requiredHeaders, expiresAt, resumableSessionUri? }.
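A minimal sketch of the pure decisions in steps 4 and 7 plus the resumable default, assuming the quota figures have already been read via QuotaPort.current. planUpload and the UploadPlan shape are hypothetical; the 8 MiB threshold and the 600 s expiry come from the command and flow above.

```typescript
// Hypothetical helper covering step 4 (quota check), step 7 (URL expiry) and the resumable default.
const MiB = 1024 * 1024;
const SIGNED_UPLOAD_TTL_MS = 600 * 1000; // expiresIn: 600 s

type QuotaSnapshot = { bytes: number; capBytes: number };

type UploadPlan =
  | { ok: true; resumable: boolean; reserveBytes: number; expiresAt: Date }
  | { ok: false; error: 'QuotaExceededError' };

function planUpload(
  declaredBytes: number,
  resumable: boolean | undefined,
  quota: QuotaSnapshot,
  now: Date,
): UploadPlan {
  // Reject before minting any IDs if the declared size would exceed the cap.
  if (quota.bytes + declaredBytes > quota.capBytes) {
    return { ok: false, error: 'QuotaExceededError' };
  }
  return {
    ok: true,
    // Caller's choice wins; otherwise resumable only when the upload exceeds 8 MiB.
    resumable: resumable ?? declaredBytes > 8 * MiB,
    reserveBytes: declaredBytes,
    expiresAt: new Date(now.getTime() + SIGNED_UPLOAD_TTL_MS),
  };
}
```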

Idempotency: the (tenantId, idempotencyKey) pair returns the existing session if an active one exists.
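The replay rule can be sketched with an in-memory index. SessionIndex and initiateOrReplay are hypothetical names; the sketch assumes "active" means an open session whose signed URL has not yet expired, and that create() is where the use case would mint the FileObject and reserve quota, so a replay must not call it again.

```typescript
// Hypothetical stand-in for a lookup on (tenant_id, idempotency_key).
type Session = { uploadSessionId: string; status: 'open' | 'closed' | 'aborted'; expiresAt: Date };

class SessionIndex {
  private readonly byKey = new Map<string, Session>();
  private key(tenantId: string, idempotencyKey: string): string {
    return `${tenantId}:${idempotencyKey}`;
  }
  // Returns the stored session only while it is still active (open and unexpired).
  findActive(tenantId: string, idempotencyKey: string, now: Date): Session | undefined {
    const s = this.byKey.get(this.key(tenantId, idempotencyKey));
    return s && s.status === 'open' && s.expiresAt.getTime() > now.getTime() ? s : undefined;
  }
  remember(tenantId: string, idempotencyKey: string, s: Session): void {
    this.byKey.set(this.key(tenantId, idempotencyKey), s);
  }
}

function initiateOrReplay(
  idx: SessionIndex,
  tenantId: string,
  idempotencyKey: string,
  now: Date,
  create: () => Session,
): Session {
  const existing = idx.findActive(tenantId, idempotencyKey, now);
  if (existing) return existing; // replay: same session, create() is not called again
  const fresh = create();
  idx.remember(tenantId, idempotencyKey, fresh);
  return fresh;
}
```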

Error mapping: see DOMAIN_MODEL §11 plus quota-specific codes.

2.2 ConfirmUploadUseCase

Input:

```ts
type ConfirmUploadCommand = {
  tenantId: TenantId;
  uploadSessionId: UploadSessionId;
  declaredSha256: Sha256;
  actor: ActorRef;
};
```

Flow:

  1. Load UploadSession (must be open, must belong to caller tenant).
  2. BlobStoragePort.head(session.objectKey) to get bytes and, if available, sha256 from object metadata. If the hash is absent, the GCS adapter falls back to computing a streaming SHA-256 over the object body (capped at scopeMaxBytes) and comparing it with declaredSha256.
  3. Verify hash and size; on mismatch, mark the session aborted, soft-delete the partial object, release the reserved quota, and raise MELMASTOON.FILE.HASH_MISMATCH.
  4. Dedupe: look up (tenantId, scope, sha256) in file_objects. If a ready row exists, mark this FileObject as alias (asAliasOf(canonicalId)), copy the canonical's variants by reference, skip scan + optimization, jump straight to ready. Return { fileObjectId: canonicalId, alias: true }.
  5. Otherwise, FileObject.confirmUpload({byteSize, sha256, now}) and FileObject.beginScan(now).
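  6. ScanPort.requestScan(...) (Pub/Sub message). As in §3.1, the request travels as file.scan.requested.v1 through the outbox, so it is only published after the step 7 transaction commits.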
  7. Persist FileObject + close UploadSession + emit file.upload.completed.v1 + file.scan.requested.v1 in one tx.
  8. Note: image optimization is NOT enqueued until file.scan.passed.v1 arrives (saga §3.1).
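Steps 3 and 4 reduce to a pure decision once head() and the dedupe lookup have returned. This is a hypothetical sketch (decideConfirm and ConfirmDecision are illustrative names); it assumes hex-encoded digests compared case-insensitively and takes the expected size from the upload session.

```typescript
// Hypothetical pure decision; head(), the dedupe lookup and persistence stay in the use case.
type HeadInfo = { bytes: number; sha256: string };

type ConfirmDecision =
  | { kind: 'hash_mismatch' }                  // step 3: abort, soft-delete, release quota
  | { kind: 'alias'; canonicalId: string }     // step 4: reuse the ready canonical row
  | { kind: 'scan' };                          // steps 5-7: confirm and request a scan

function decideConfirm(
  head: HeadInfo,
  declaredSha256: string,
  sessionBytes: number,               // size declared at initiate time
  readyCanonicalId: string | null,    // result of the (tenantId, scope, sha256) lookup
): ConfirmDecision {
  const hashOk = head.sha256.toLowerCase() === declaredSha256.toLowerCase();
  if (!hashOk || head.bytes !== sessionBytes) return { kind: 'hash_mismatch' };
  return readyCanonicalId !== null
    ? { kind: 'alias', canonicalId: readyCanonicalId }
    : { kind: 'scan' };
}
```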

2.3 AbortUploadUseCase

Marks the UploadSession aborted, calls BlobStoragePort.abortResumable (best-effort), soft-deletes the FileObject (which never reached ready), releases the reserved quota, and emits file.upload.failed.v1 with reason: 'aborted_by_caller' | 'session_expired'.

2.4 IssueDownloadUrlUseCase

Input:

```ts
type IssueDownloadUrlCommand = {
  tenantId: TenantId;
  fileObjectId: FileObjectId;
  variant?: VariantPreset; // default: original (file_objects.objectKey)
  ttlSeconds?: number; // 60 ≤ ttl ≤ 3600; default 300
  actor: ActorRef;
  purpose: 'view' | 'download' | 'attach' | 'embed';
  callerIp: string | null;
  callerUserAgent: string | null;
};
```

Flow:

  1. Load FileObject (RLS gates by tenantId).
  2. Resolve to canonical if alias.
  3. Status gate:
    • ready → continue.
    • scanning / uploaded / initiated → ScanPendingError.
    • quarantined → QuarantinedReadError.
    • archived / purged → ResourceNotFoundError (mapped to 404).
  4. If variant requested, look up Variant row; if not ready, fall back to original and emit a degraded_mode log line.
  5. BlobStoragePort.issueSignedDownloadUrl({ key, ttl, responseHeaders: { 'Content-Disposition': ... } }).
  6. Insert AccessGrant row (id, fingerprint, ttl, actor, ip, ua).
  7. Cache the URL in Redis under signedurl:{tenantId}:{fileObjectId}:{variant}:{ttlBucket} with TTL = ttl − 30 s; reuse for repeat callers within the bucket.
  8. Return { url, headers, expiresAt, accessGrantId }.

Cross-tenant defense: the objectKey used to sign is derived from the loaded FileObject (RLS-scoped), never from caller input.
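Steps 3, 5 and 7 hold the decision logic worth unit-testing. This hypothetical sketch assumes the status names listed in step 3, rejects (rather than clamps) an out-of-range TTL, and takes ttlBucket as an input because its derivation is not specified here; 'original' as the default variant label is also an assumption.

```typescript
// Hypothetical helpers for the status gate, TTL validation and the signed-URL cache entry.
type FileStatus = 'initiated' | 'uploaded' | 'scanning' | 'ready' | 'quarantined' | 'archived' | 'purged';
type GateResult = 'continue' | 'ScanPendingError' | 'QuarantinedReadError' | 'ResourceNotFoundError';

function statusGate(status: FileStatus): GateResult {
  switch (status) {
    case 'ready': return 'continue';
    case 'initiated': case 'uploaded': case 'scanning': return 'ScanPendingError';
    case 'quarantined': return 'QuarantinedReadError';
    case 'archived': case 'purged': return 'ResourceNotFoundError'; // surfaced as 404
  }
}

const DEFAULT_TTL_S = 300;

function resolveTtl(ttlSeconds?: number): number {
  const ttl = ttlSeconds ?? DEFAULT_TTL_S;
  if (!Number.isInteger(ttl) || ttl < 60 || ttl > 3600) {
    throw new RangeError(`ttlSeconds out of range: ${ttl}`);
  }
  return ttl;
}

function cacheEntry(tenantId: string, fileObjectId: string, variant: string | undefined, ttlBucket: string, ttl: number) {
  return {
    key: `signedurl:${tenantId}:${fileObjectId}:${variant ?? 'original'}:${ttlBucket}`,
    // Cached entry expires 30 s before the URL, so a hit always has at least 30 s of validity left.
    cacheTtlSeconds: ttl - 30,
  };
}
```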

2.5 DeleteFileUseCase

  1. Load FileObject.
  2. softDelete(actor, now) (domain).
  3. CdnInvalidationPort.invalidate([cdnPathFor(objectKey), ...variantPaths]).
  4. Persist + emit file.deleted.v1.
  5. The hard purge happens later, in the retention sweep (by default after the 30-day archive window), unless an erasure command shortens it.

2.6 RestoreFileUseCase

Restores a file within the 30-day archive window. Re-validates that the retention policy has not expired. The CDN cache is refilled on the first read after restore.
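A hypothetical sketch of the restore guard, assuming the archive window is measured from the soft-delete timestamp and that the file's retention expiry is available as a date (null when the policy sets none). Function and reason names are illustrative.

```typescript
// Hypothetical guard for RestoreFileUseCase.
const DAY_MS = 24 * 60 * 60 * 1000;
const ARCHIVE_WINDOW_DAYS = 30;

type RestoreCheck =
  | { ok: true }
  | { ok: false; reason: 'archive_window_elapsed' | 'retention_expired' };

function canRestore(deletedAt: Date, retentionExpiresAt: Date | null, now: Date): RestoreCheck {
  if (now.getTime() - deletedAt.getTime() > ARCHIVE_WINDOW_DAYS * DAY_MS) {
    return { ok: false, reason: 'archive_window_elapsed' };
  }
  // Re-validate retention: a file whose policy has already expired stays deleted.
  if (retentionExpiresAt !== null && retentionExpiresAt.getTime() <= now.getTime()) {
    return { ok: false, reason: 'retention_expired' };
  }
  return { ok: true };
}
```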

2.7 EraseByGuestUseCase (GDPR)

Input: { tenantId, guestId: GuestId, requestedBy, reason }.

Flow (saga):

  1. Query all file_objects WHERE tenant_id = ? AND owner_scope_refs->>'guestId' = ? AND status NOT IN ('purged').
  2. For each row:
    1. If retention class is regulated and current age < minRetentionDays, defer the row into retention_holds and continue.
    2. Else: BlobStoragePort.delete(objectKey), plus delete every variant; hardPurge('erasure', now).
  3. CdnInvalidationPort.invalidate(allTouchedPaths).
  4. Emit one file.erasure.completed.v1 per row + a single melmastoon.file.erasure.batch_completed.v1 summary.
  5. Persist an erasure certificate row (counts, byte total, deferred-IDs) and return it; consumed by the audit / compliance dashboard.
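The per-row branch in step 2 and the certificate in step 5 can be sketched as pure functions. Field and function names are hypothetical; ageDays is assumed to be precomputed, and bytesPurged assumes every planned purge succeeded (failures would be recorded in errors[], per §8).

```typescript
// Hypothetical planner for EraseByGuestUseCase.
type ErasureRow = {
  id: string;
  bytes: number;
  regulated: boolean;       // retention class is regulated
  ageDays: number;
  minRetentionDays: number;
};

type ErasurePlan = { purge: ErasureRow[]; deferred: ErasureRow[] };

function planErasure(rows: ErasureRow[]): ErasurePlan {
  const plan: ErasurePlan = { purge: [], deferred: [] };
  for (const r of rows) {
    // Regulated files younger than their minimum retention go to retention_holds instead.
    if (r.regulated && r.ageDays < r.minRetentionDays) plan.deferred.push(r);
    else plan.purge.push(r);
  }
  return plan;
}

// Counts, byte total and deferred IDs as in step 5; errors[] as in §8.
function certificate(plan: ErasurePlan, errors: string[] = []) {
  return {
    purgedCount: plan.purge.length,
    deferredCount: plan.deferred.length,
    bytesPurged: plan.purge.reduce((sum, r) => sum + r.bytes, 0),
    deferredIds: plan.deferred.map((r) => r.id),
    errors,
  };
}
```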

2.8 EraseByTenantUseCase

Same shape but scoped by tenantId; runs after the legal retention window declared in tenant.deleted.v1 (the sweeper enqueues it on the appointed date).

2.9 ApplyRetentionUseCase

Background sweeper. Runs every 5 min:

  1. Find file_objects past their hardDeleteAfter. Per row: BlobStoragePort.delete, hardPurge('retention_expired', now), emit file.retention.expired.v1.
  2. Find files whose policy declares redactionAfterDays and current age > threshold, scope = guest_id_scan. Trigger OcrRedactSaga (see §3.2).
  3. Find quarantined files older than 30 d → hard purge (forensic export already taken).
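One sweeper tick can be framed as classifying each candidate row. This hypothetical sketch assumes the precedence order shown, measures quarantine age from a quarantinedAt timestamp, and uses the guest_id_scan scope named in step 2; none of the field names are taken from the real schema.

```typescript
// Hypothetical classifier for one ApplyRetentionUseCase tick.
type SweepRow = {
  id: string;
  status: string;
  scope: string;
  createdAt: Date;
  hardDeleteAfter: Date | null;
  redactionAfterDays: number | null;
  quarantinedAt: Date | null;
};

type SweepAction = 'hard_purge_retention' | 'ocr_redact' | 'hard_purge_quarantine' | 'none';

const DAY = 86_400_000;

function classify(row: SweepRow, now: Date): SweepAction {
  const t = now.getTime();
  if (row.status === 'purged') return 'none';
  // Step 1: past hardDeleteAfter.
  if (row.hardDeleteAfter !== null && t > row.hardDeleteAfter.getTime()) return 'hard_purge_retention';
  // Step 3: quarantined for more than 30 days.
  if (row.status === 'quarantined' && row.quarantinedAt !== null && t - row.quarantinedAt.getTime() > 30 * DAY) {
    return 'hard_purge_quarantine';
  }
  // Step 2: ID scans older than their redaction threshold are handed to OcrRedactSaga.
  if (row.scope === 'guest_id_scan' && row.redactionAfterDays !== null &&
      t - row.createdAt.getTime() > row.redactionAfterDays * DAY) {
    return 'ocr_redact';
  }
  return 'none';
}
```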

2.10 OverrideQuarantineUseCase

Security-reviewer-only. Moves a file from quarantined → archived. Emits file.access.denied.v1 reversal-audit event with reviewer context.


3. Sagas (Choreography)

3.1 Upload → Scan → Optimize → Ready

```text
[client] POST /uploads                 → InitiateUploadUseCase
[client] PUT signedUrl (bytes)         → no service involvement
[client] POST /uploads/{ups}/confirm   → ConfirmUploadUseCase
                                         ├─ emits file.upload.completed.v1
                                         └─ emits file.scan.requested.v1 (via outbox)

[scan worker] consumes file.scan.requested.v1
              runs ClamAV/DLP
              POST /internal/v1/files/scan-callback (mTLS)

[file-storage-service] InternalScanCallbackUseCase
              → recordScanPassed | recordScanFailed
                passed: emit file.scan.passed.v1
                        + ImageOptimizerPort.enqueue(presets per scope)
                failed: emit file.scan.failed.v1
                        (FileObject is now quarantined; reads blocked)

[optimizer worker] consumes optimize job
              produces N variants in tenants/{tid}/{scope}/.../variants/...
              POST /internal/v1/files/optimize-callback (mTLS) per variant

[file-storage-service] InternalOptimizeCallbackUseCase
              → upsert Variant
                on the LAST variant of the requested preset set:
                  emit file.optimization.completed.v1
                  (Property/Theme consumers flip their photo to 'ready')
```

This is a choreography — no central saga runner. Each service reacts to events. The file_objects.status is the durable source of truth for ready-ness.
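The "LAST variant" rule has to fire exactly once even when callbacks are retried. A minimal in-memory sketch, assuming a failed variant counts as settled and that completedEmitted would in practice be persisted under the row lock described in §7; VariantTracker and the preset names in the test are hypothetical.

```typescript
// Hypothetical in-memory model of InternalOptimizeCallbackUseCase's completion check.
type VariantStatus = 'pending' | 'ready' | 'failed';

class VariantTracker {
  private readonly variants = new Map<string, VariantStatus>();
  private completedEmitted = false;

  constructor(private readonly requested: string[]) {
    requested.forEach((p) => this.variants.set(p, 'pending'));
  }

  // Upsert keyed by preset (like ON CONFLICT (file_object_id, preset) DO UPDATE).
  // Returns true only for the callback that settles the last pending preset.
  onCallback(preset: string, status: Exclude<VariantStatus, 'pending'>): boolean {
    if (!this.variants.has(preset)) return false; // not part of the requested set
    this.variants.set(preset, status);
    const allSettled = this.requested.every((p) => this.variants.get(p) !== 'pending');
    if (allSettled && !this.completedEmitted) {
      this.completedEmitted = true; // makes duplicate callbacks harmless
      return true; // caller emits file.optimization.completed.v1
    }
    return false;
  }
}
```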

3.2 OCR Redaction (PII)

Triggered by ApplyRetentionUseCase when pii_id_scan files cross redactionAfterDays:

  1. AIClient.callOcrRedact(fileObjectId) → DLP / Vertex returns redacted bytes.
  2. Upload redacted bytes as a new FileObject with aliasOfFileId = original.id and tags += ['redacted'].
  3. Replace Photo / IdScan reference at the consumer (event consumed by reservation-service).
  4. Original is hard-purged.
  5. AIProvenance recorded on the redacted file.

3.3 GDPR Erasure

See §2.7 for the row-level orchestration. The producer of the tenant.guest.erasure_requested.v1 event (tenant-service) runs an outer choreography that fans out to every PII-touching service and gathers each service's erasure certificate.

3.4 Tenant Closure

  1. tenant.deleted.v1 consumed → register a RetentionHold(tenantId, holdUntil = legalWindow).
  2. Sweeper checks holds; on release, runs EraseByTenantUseCase.

4. Queries

4.1 GetFileMetadataQuery

```ts
type Result = {
  id: FileObjectId;
  tenantId: TenantId;
  status: FileStatus;
  contentType: string;
  bytes: number | null;
  sha256: string | null;
  scope: Scope;
  ownerScopeRefs: OwnerScopeRefs;
  retentionPolicyName: string;
  createdAt: string;
  updatedAt: string;
  variants: { preset: VariantPreset; status: 'pending' | 'ready' | 'failed'; widthPx?: number; heightPx?: number }[];
  scanResult: { verdict: 'passed' | 'failed' | 'inconclusive' | 'pending'; scannedAt?: string; threats?: string[] };
  altText?: I18nString;
  tags: string[];
  aiProvenance?: AIProvenance;
};
```

4.2 ListVariantsQuery

Cursor-paginated list of all variants for one file. Used by bff-tenant-booking-service to render srcset.
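A minimal keyset-pagination sketch, assuming variants are ordered by preset name and the cursor is simply the last preset returned (a real cursor would usually be opaque). listVariants is a hypothetical name.

```typescript
// Hypothetical keyset pagination over one file's variants.
type VariantRow = { preset: string; status: 'pending' | 'ready' | 'failed' };

function listVariants(all: VariantRow[], limit: number, cursor: string | null) {
  if (!Number.isInteger(limit) || limit < 1) throw new RangeError('limit must be a positive integer');
  const sorted = [...all].sort((a, b) => (a.preset < b.preset ? -1 : a.preset > b.preset ? 1 : 0));
  // Resume strictly after the last preset returned on the previous page.
  const remaining = sorted.filter((v) => cursor === null || v.preset > cursor);
  const items = remaining.slice(0, limit);
  const nextCursor = remaining.length > limit ? items[items.length - 1].preset : null;
  return { items, nextCursor };
}
```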

4.3 GetQuotaQuery

Returns { bytesUsed, objectsUsed, capBytes, capObjects, byScope: { ... } }.

4.4 GetAccessLogQuery

Returns audited issuances (paginated) for one file. Restricted to Tenant.Owner and Platform.Admin roles.

4.5 ListErasureCertificatesQuery

Returns past erasure certificates by tenant + date range. For compliance officers.


5. Cross-Cutting Concerns

5.1 Idempotency

Every write endpoint requires an Idempotency-Key header. The handler stores (routeHash, tenantId, key) → (requestHash, responseHash) in idempotency_records (TTL 24 h). Re-submitting the same key with a different body (requestHash mismatch) returns 409 MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED.
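Detecting a reused key with a different body requires keeping a fingerprint of the original request, and replaying requires the original response. This in-memory sketch stores both; IdempotencyStore is hypothetical, the key separator is arbitrary, and concurrent in-flight requests with the same key are not handled.

```typescript
// Hypothetical in-memory stand-in for idempotency_records.
type IdemRecord = { requestHash: string; response: string; storedAt: number };
type IdemOutcome =
  | { kind: 'execute' }
  | { kind: 'replay'; response: string }
  | { kind: 'conflict'; code: string };

const TTL_MS = 24 * 60 * 60 * 1000;

class IdempotencyStore {
  private readonly records = new Map<string, IdemRecord>();

  check(routeHash: string, tenantId: string, key: string, requestHash: string, nowMs: number): IdemOutcome {
    const rec = this.records.get(`${routeHash}|${tenantId}|${key}`);
    if (!rec || nowMs - rec.storedAt > TTL_MS) return { kind: 'execute' }; // unknown or expired key
    if (rec.requestHash !== requestHash) {
      return { kind: 'conflict', code: 'MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED' }; // HTTP 409
    }
    return { kind: 'replay', response: rec.response };
  }

  save(routeHash: string, tenantId: string, key: string, requestHash: string, response: string, nowMs: number): void {
    this.records.set(`${routeHash}|${tenantId}|${key}`, { requestHash, response, storedAt: nowMs });
  }
}
```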

5.2 Optimistic Concurrency

PATCH operations on FileObject (rare; mostly tag/altText updates) require If-Match: <version>; mismatch raises MELMASTOON.GENERAL.PRECONDITION_FAILED.
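A hypothetical sketch of the If-Match check, assuming the version is an integer, optionally quoted like an ETag, and that a missing header is also treated as a failed precondition (the text above only specifies the mismatch case).

```typescript
// Hypothetical optimistic-concurrency check for PATCH on FileObject.
class PreconditionFailedError extends Error {
  readonly code = 'MELMASTOON.GENERAL.PRECONDITION_FAILED';
}

function parseIfMatch(header: string | undefined): number {
  // Accepts a bare version or a quoted ETag-style value, e.g. 7 or "7".
  const raw = (header ?? '').trim().replace(/^"(.*)"$/, '$1');
  const v = Number(raw);
  if (raw === '' || !Number.isInteger(v)) throw new PreconditionFailedError('If-Match missing or malformed');
  return v;
}

function applyPatch<T extends { version: number }>(current: T, ifMatch: string | undefined, patch: Partial<T>): T {
  if (parseIfMatch(ifMatch) !== current.version) throw new PreconditionFailedError('version mismatch');
  // Successful writes bump the version so the next PATCH must present the new value.
  return { ...current, ...patch, version: current.version + 1 };
}
```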

5.3 Outbox Pattern

Every use case that emits domain events writes them to the outbox table in the same transaction as the aggregate. A separate outbox-relay worker tails the table and publishes to Pub/Sub, setting published_at only after Pub/Sub acknowledges the publish. The mandatory outbox.spec.ts integration test proves at-least-once delivery and no loss on crash.
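A minimal single-pass relay sketch over an in-memory table, assuming publish resolves only when the broker acks. Stopping at the first failure to preserve insertion order is a choice made in this sketch, not something stated above; relayOnce is a hypothetical name.

```typescript
// Hypothetical single-pass outbox relay.
type OutboxRow = { id: number; topic: string; payload: string; publishedAt: Date | null };

async function relayOnce(
  rows: OutboxRow[],
  publish: (topic: string, payload: string) => Promise<void>, // resolves on broker ack
  now: () => Date,
): Promise<number> {
  let published = 0;
  for (const row of rows) {
    if (row.publishedAt !== null) continue;
    try {
      await publish(row.topic, row.payload);
    } catch {
      // No ack: published_at stays null, so the next pass retries the row (at-least-once).
      break;
    }
    // Marked only after the ack. A crash between ack and this write re-publishes the row,
    // which is why consumers dedupe by messageId.
    row.publishedAt = now();
    published++;
  }
  return published;
}
```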

5.4 Inbox Pattern

Consumed events (tenant.deleted.v1, tenant.guest.erasure_requested.v1, property.photo.removed.v1, tenant.plan_changed.v1, tenant.settings.changed.v1) are deduped by messageId in inbox. Handlers are idempotent by (messageId, handler) and run inside a transaction with the inbox row.
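A hypothetical in-memory sketch of (messageId, handler) dedupe. forget simulates the transaction rollback: because the inbox row and the handler's writes commit together, a handler that throws leaves no inbox row and the redelivered message is processed again. Inbox and handleOnce are illustrative names.

```typescript
// Hypothetical inbox keyed by (messageId, handler).
class Inbox {
  private readonly seen = new Set<string>();
  private key(messageId: string, handler: string): string { return `${handler}|${messageId}`; }

  // Returns false when this handler already processed the message (duplicate delivery).
  tryRecord(messageId: string, handler: string): boolean {
    const k = this.key(messageId, handler);
    if (this.seen.has(k)) return false;
    this.seen.add(k);
    return true;
  }

  // Stands in for the rollback that discards the inbox row when the handler fails.
  forget(messageId: string, handler: string): void { this.seen.delete(this.key(messageId, handler)); }
}

function handleOnce(inbox: Inbox, messageId: string, handler: string, effect: () => void): boolean {
  if (!inbox.tryRecord(messageId, handler)) return false; // duplicate: skip
  try {
    effect();
  } catch (err) {
    inbox.forget(messageId, handler);
    throw err;
  }
  return true;
}
```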

5.5 Tenant Context

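Every request boundary (HTTP, Pub/Sub consumer, scheduled job) opens an AsyncLocalStorage scope carrying tenantId. Every transaction starts by executing SET LOCAL app.tenant_id = '<uuid>' (or the parameterizable equivalent, SELECT set_config('app.tenant_id', $1, true)). SET LOCAL has no effect outside a transaction block, so issuing it once after connection checkout is not sufficient. Direct cross-tenant calls are guarded in the domain (CrossTenantReferenceError) and at the DB by RLS.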
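A hypothetical sketch of a tenant-scoped transaction wrapper. Queryable is an assumed minimal driver interface (not a specific library's API), and tenantId would come from the AsyncLocalStorage scope. It uses PostgreSQL's set_config(name, value, is_local) with is_local = true, which has SET LOCAL semantics but accepts a bind parameter instead of string interpolation.

```typescript
// Hypothetical minimal driver interface; real drivers differ in detail.
interface Queryable { query(sql: string, params?: unknown[]): Promise<unknown>; }

async function withTenantTx<T>(db: Queryable, tenantId: string, work: (tx: Queryable) => Promise<T>): Promise<T> {
  await db.query('BEGIN');
  try {
    // Transaction-local setting read by the RLS policies; it disappears at COMMIT/ROLLBACK.
    await db.query("SELECT set_config('app.tenant_id', $1, true)", [tenantId]);
    const result = await work(db);
    await db.query('COMMIT');
    return result;
  } catch (err) {
    await db.query('ROLLBACK');
    throw err;
  }
}
```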

5.6 Caller Surface

The HTTP layer asserts the caller surface (backoffice / tenant-booking / consumer / internal) matches the route's expectations. Internal callbacks (/internal/v1/files/...) require mTLS via the platform service-mesh.

5.7 Rate Limits

Per-tenant token buckets guard issueDownloadUrl (default 600/min/tenant) and initiateUpload (default 120/min/tenant); the buckets are kept in Memorystore. Exceeding a limit returns 429 MELMASTOON.GENERAL.RATE_LIMITED with Retry-After.
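A minimal in-process token bucket for illustration. TokenBucket is hypothetical, and the burst capacity and the whole-second Retry-After rounding are assumptions. In Memorystore, the refill-and-take step would have to run atomically on the server (for example in a Lua script) so concurrent replicas cannot overspend a bucket.

```typescript
// Hypothetical in-process token bucket; time is passed in explicitly for testability.
class TokenBucket {
  private tokens: number;
  private lastMs: number;

  constructor(private readonly capacity: number, private readonly refillPerMs: number, nowMs: number) {
    this.tokens = capacity;
    this.lastMs = nowMs;
  }

  // Returns 0 when a token was taken, otherwise the Retry-After delay in whole seconds.
  take(nowMs: number): number {
    const elapsed = Math.max(0, nowMs - this.lastMs);
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillPerMs);
    this.lastMs = nowMs;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return 0;
    }
    return Math.ceil((1 - this.tokens) / this.refillPerMs / 1000);
  }
}
```

For example, the download limit of 600/min corresponds to new TokenBucket(600, 600 / 60_000, Date.now()).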


6. Sequence (Initiate + Confirm + Scan + Optimize)

```text
Participants: Caller, file-storage-service (FSS), GCS, Pub/Sub, Scan Worker, Optimizer

Caller      → FSS          POST /uploads
FSS                        initiate (domain); persist + outbox
FSS         → Caller       signed PUT URL
Caller      → GCS          PUT bytes
FSS         → Pub/Sub      outbox publish: file.upload.initiated.v1
Caller      → FSS          POST /uploads/{ups}/confirm
FSS         → GCS          head + verify hash
FSS                        confirm + scan request; persist + outbox
FSS         → Caller       200 OK
FSS         → Pub/Sub      outbox publish: file.upload.completed.v1 + file.scan.requested.v1
Pub/Sub     → Scan Worker  file.scan.requested.v1
Scan Worker                scan
Scan Worker → FSS          POST /internal/v1/files/scan-callback
FSS                        recordScanPassed; outbox publish
FSS         → Pub/Sub      file.scan.passed.v1
FSS         → Optimizer    enqueue optimize
Optimizer                  build variants
Optimizer   → FSS          POST /internal/v1/files/optimize-callback (per variant)
FSS                        upsert Variant; outbox publish
FSS         → Pub/Sub      file.optimization.completed.v1 (on the LAST variant)
```

7. Concurrency Model

  • Aggregate locks: pessimistic row lock on file_objects.id during scan/optimize callbacks (idempotent re-entry, no double-count).
  • Quota updates: atomic UPDATE quotas SET bytes_used = bytes_used + ? WHERE tenant_id = ? AND bytes_used + ? <= cap_bytes RETURNING bytes_used (single-shot CAS; zero rows returned means the reservation would exceed the cap).
  • Variant upsert: INSERT ... ON CONFLICT (file_object_id, preset) DO UPDATE keyed by (file_object_id, preset).
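With the row lock held, idempotent re-entry comes down to making the transition conditional on the current status. This hypothetical sketch assumes a passed scan moves the file straight to ready (consistent with §8, where the original stays usable while variants are pending) and simply ignores any callback that arrives after the file has left scanning.

```typescript
// Hypothetical transition applied while holding the file_objects row lock.
type ScanState = 'scanning' | 'ready' | 'quarantined';
type ScanVerdict = 'passed' | 'failed';
type ScanTransition = { next: ScanState; emit: 'file.scan.passed.v1' | 'file.scan.failed.v1' | null };

function applyScanCallback(current: ScanState, verdict: ScanVerdict): ScanTransition {
  if (current !== 'scanning') {
    // Re-entry after the first callback already moved the file: no state change, no second event.
    return { next: current, emit: null };
  }
  return verdict === 'passed'
    ? { next: 'ready', emit: 'file.scan.passed.v1' }
    : { next: 'quarantined', emit: 'file.scan.failed.v1' };
}
```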

8. Failure & Compensation

| Step | Failure | Compensation |
|---|---|---|
| Initiate upload: GCS rejects | rare; treat as 503 | release reserved quota, return 503 with retryAfter |
| Confirm upload: hash mismatch | client error | abort session, soft-delete partial, release quota |
| Scan callback: idempotent re-entry | duplicate Pub/Sub | inbox dedupe by messageId |
| Optimizer never returns | timeout | sweeper after 60 min marks variant failed; file remains ready (original usable) |
| CDN invalidation fails | retry | exponential backoff 5×; alert at backlog > 50 |
| Erasure partial failure | row-level | record errors[] in certificate; cron retries deferred rows |
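A hypothetical retry helper for the CDN row, reading "5×" as five attempts in total. Base and cap delays are parameters because the text does not give them, jitter is omitted, and sleep is injected so tests run instantly.

```typescript
// Hypothetical exponential-backoff helper for CdnInvalidationPort.invalidate.
function backoffDelaysMs(attempts: number, baseMs: number, capMs: number): number[] {
  // Delay before retry i (i = 1..attempts-1) is base * 2^(i-1), capped at capMs.
  const delays: number[] = [];
  for (let i = 1; i < attempts; i++) delays.push(Math.min(capMs, baseMs * 2 ** (i - 1)));
  return delays;
}

async function retryWithBackoff<T>(
  op: () => Promise<T>,
  attempts: number,
  baseMs: number,
  capMs: number,
  sleep: (ms: number) => Promise<void>,
): Promise<T> {
  const delays = backoffDelaysMs(attempts, baseMs, capMs);
  for (let i = 0; ; i++) {
    try {
      return await op();
    } catch (err) {
      if (i >= delays.length) throw err; // attempts exhausted: surface it for the backlog alert
      await sleep(delays[i]);
    }
  }
}
```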

9. Module Wiring (DI sketch)

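```ts
import { Module } from '@nestjs/common';
// Imports of the infrastructure modules, repositories, adapters, use cases and
// controllers referenced below are omitted.

// The ports in §1 are TypeScript interfaces, which are erased at compile time and
// therefore cannot be used as Nest provider tokens. The Symbol tokens below are
// illustrative names; use cases would receive the ports via @Inject(BLOB_STORAGE_PORT), etc.
// Repository and AIClient tokens are assumed to be abstract classes, which do exist at runtime.
export const QUOTA_PORT = Symbol('QuotaPort');
export const BLOB_STORAGE_PORT = Symbol('BlobStoragePort');
export const SCAN_PORT = Symbol('ScanPort');
export const IMAGE_OPTIMIZER_PORT = Symbol('ImageOptimizerPort');
export const CDN_INVALIDATION_PORT = Symbol('CdnInvalidationPort');
export const EVENT_PUBLISHER = Symbol('EventPublisher');
export const CLOCK = Symbol('Clock');

@Module({
  imports: [PostgresModule, RedisModule, PubSubModule, GcsModule, KmsModule],
  providers: [
    { provide: FileObjectRepository, useClass: FileObjectRepositoryPg },
    { provide: UploadSessionRepository, useClass: UploadSessionRepositoryPg },
    { provide: VariantRepository, useClass: VariantRepositoryPg },
    { provide: ScanResultRepository, useClass: ScanResultRepositoryPg },
    { provide: AccessGrantRepository, useClass: AccessGrantRepositoryPg },
    { provide: RetentionPolicyRepository, useClass: RetentionPolicyRepositoryPg },
    { provide: QUOTA_PORT, useClass: QuotaRepositoryPg },
    { provide: BLOB_STORAGE_PORT, useClass: GcsBlobStorageAdapter },
    { provide: SCAN_PORT, useClass: ClamAvScanAdapter },
    { provide: IMAGE_OPTIMIZER_PORT, useClass: PubSubImageOptimizerAdapter },
    { provide: CDN_INVALIDATION_PORT, useClass: CloudCdnInvalidationAdapter },
    { provide: EVENT_PUBLISHER, useClass: EventPublisherPubSub },
    { provide: AIClient, useClass: AIClientHttpAdapter },
    { provide: CLOCK, useFactory: () => ({ now: () => new Date() }) },
    InitiateUploadUseCase,
    ConfirmUploadUseCase,
    AbortUploadUseCase,
    IssueDownloadUrlUseCase,
    DeleteFileUseCase,
    RestoreFileUseCase,
    EraseByGuestUseCase,
    EraseByTenantUseCase,
    ApplyRetentionUseCase,
    OverrideQuarantineUseCase,
    InternalScanCallbackUseCase,
    InternalOptimizeCallbackUseCase,
  ],
  controllers: [
    UploadsController,
    FilesController,
    DownloadsController,
    QuotasController,
    ErasureController,
    InternalCallbacksController,
    HealthController,
  ],
})
export class FileStorageModule {}
```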