file-storage-service — APPLICATION_LOGIC
Companion: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · SECURITY_MODEL · DATA_MODEL
This document defines the application layer: ports, use cases (commands), queries, orchestration flows, saga participation, and the precise sequencing of every cross-system effect. The application layer is the only place that talks to ports; the domain layer never depends on infrastructure.
1. Ports
// application/ports/blob-storage.port.ts
export interface BlobStoragePort {
issueSignedUploadUrl(req: SignedUploadRequest): Promise<SignedUploadResult>;
issueSignedDownloadUrl(req: SignedDownloadRequest): Promise<SignedDownloadResult>;
head(key: ObjectKey): Promise<HeadResult | null>; // size, sha256 (only if present in object metadata), generation
copy(src: ObjectKey, dst: ObjectKey): Promise<void>;
delete(key: ObjectKey): Promise<void>; // hard delete
abortResumable(uploadId: string): Promise<void>;
}
// application/ports/scan.port.ts
export interface ScanPort {
requestScan(req: { tenantId: TenantId; fileObjectId: FileObjectId; objectKey: ObjectKey; scanner: 'clamav' | 'cloud_dlp' }): Promise<void>;
}
// application/ports/image-optimizer.port.ts
export interface ImageOptimizerPort {
enqueue(req: { tenantId: TenantId; fileObjectId: FileObjectId; presets: VariantPreset[] }): Promise<void>;
}
// application/ports/cdn-invalidation.port.ts
export interface CdnInvalidationPort {
invalidate(paths: string[]): Promise<void>; // CDN-relative paths (e.g., /tenants/.../property_photo/...)
}
// application/ports/quota.port.ts
export interface QuotaPort {
current(tenantId: TenantId): Promise<{ bytes: bigint; objects: number; capBytes: bigint; capObjects: number }>;
reserve(tenantId: TenantId, bytes: bigint, objects: number): Promise<void>;
release(tenantId: TenantId, bytes: bigint, objects: number): Promise<void>;
}
// application/ports/event-publisher.port.ts
export interface EventPublisher {
publishWithinTx(events: DomainEvent[], tx: Tx): Promise<void>;
}
// application/ports/clock.port.ts
export interface Clock { now(): Date; }
Every port has at least one in-memory test double in application/__test__/.
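A minimal sketch of two such doubles for the simplest ports, assuming the Clock and CdnInvalidationPort shapes above. The class names FakeClock and InMemoryCdnInvalidation and the failNext switch are hypothetical.

```typescript
// Local copies of the §1 port shapes so this sketch compiles on its own.
interface Clock { now(): Date; }
interface CdnInvalidationPort { invalidate(paths: string[]): Promise<void>; }

// Deterministic clock: time moves only when a test calls advance().
class FakeClock implements Clock {
  private currentMs: number;
  constructor(start: Date) {
    this.currentMs = start.getTime();
  }
  now(): Date {
    return new Date(this.currentMs); // fresh Date so callers cannot mutate internal state
  }
  advance(ms: number): void {
    this.currentMs += ms;
  }
}

// Records invalidation requests instead of calling a CDN.
class InMemoryCdnInvalidation implements CdnInvalidationPort {
  readonly calls: string[][] = [];
  failNext = false; // set by a test to simulate one transient CDN failure

  async invalidate(paths: string[]): Promise<void> {
    if (this.failNext) {
      this.failNext = false;
      throw new Error('simulated CDN failure');
    }
    this.calls.push([...paths]);
  }
}
```

Because the recording doubles keep every call, a test can assert exactly which paths a use case invalidated without touching Cloud CDN.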
2. Use Cases (Commands)
2.1 InitiateUploadUseCase
Input:
type InitiateUploadCommand = {
tenantId: TenantId;
actor: ActorRef;
scope: Scope;
contentType: string;
bytes: number; // declared by client; verified at confirm
ownerScopeRefs: OwnerScopeRefs;
retentionPolicyName?: RetentionPolicyName; // optional override; resolved per scope+jurisdiction otherwise
preferredLocale?: Locale;
filenameHint?: string;
resumable?: boolean; // default true if bytes > 8 MiB
idempotencyKey: IdempotencyKey;
};
Flow:
- Resolve Bucket and DataClass from scope (table in SECURITY_MODEL §2).
- Resolve RetentionPolicy (caller hint → per-scope default → per-jurisdiction override from tenant-service cache).
- Construct ContentType.from(contentType, scope) and ByteSize.from(bytes, scope) (domain validation).
- Check QuotaPort.current(tenantId); if bytes + currentBytes > capBytes → QuotaExceededError. Reserve speculatively (released on session expiry / abort).
- Mint FileObjectId med_…, build ObjectKey.for(tenantId, scope, dateShardedRelPath(filenameHint, fileObjectId)).
- FileObject.initiate({...}) (domain).
- BlobStoragePort.issueSignedUploadUrl({ key, contentType, expiresIn: 600s, resumable }).
- Persist FileObject + UploadSession + Outbox(file.upload.initiated.v1) in a single transaction.
- Return { fileObjectId, uploadSessionId, signedUrl, requiredHeaders, expiresAt, resumableSessionUri? }.
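The pre-persistence checks in this flow can be sketched as pure helpers. Quota values are plain numbers here, whereas QuotaPort uses bigint. The helper names are hypothetical, and so is the object-count check: the flow only names capBytes.

```typescript
const MIB = 1024 * 1024;

interface QuotaSnapshot { bytes: number; objects: number; capBytes: number; capObjects: number; }

class QuotaExceededError extends Error {}

// An explicit caller choice wins; otherwise uploads larger than 8 MiB default to resumable.
function resolveResumable(bytes: number, requested?: boolean): boolean {
  return requested ?? bytes > 8 * MIB;
}

// Rejects the upload if it would push the tenant past its byte cap.
// Assumption: the object-count cap is enforced the same way.
function assertWithinQuota(q: QuotaSnapshot, bytes: number): void {
  if (q.bytes + bytes > q.capBytes) throw new QuotaExceededError('byte cap exceeded');
  if (q.objects + 1 > q.capObjects) throw new QuotaExceededError('object cap exceeded');
}

// Signed upload URLs are issued with expiresIn = 600 s.
function uploadUrlExpiresAt(now: Date): Date {
  return new Date(now.getTime() + 600 * 1000);
}
```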
Idempotency: the (tenantId, idempotencyKey) pair returns the existing session if an active one exists.
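A minimal in-memory sketch of that rule follows. The UploadSession shape, the ups_ id prefix, and the assumption that a session lives as long as its 600 s signed URL are all hypothetical.

```typescript
type SessionStatus = 'open' | 'confirmed' | 'aborted' | 'expired';
interface UploadSession {
  id: string;
  tenantId: string;
  idempotencyKey: string;
  status: SessionStatus;
  expiresAtMs: number;
}

class InMemorySessionStore {
  private readonly byKey = new Map<string, UploadSession>();
  private static key(tenantId: string, idempotencyKey: string): string {
    return JSON.stringify([tenantId, idempotencyKey]); // unambiguous composite key
  }
  findActive(tenantId: string, idempotencyKey: string, nowMs: number): UploadSession | undefined {
    const s = this.byKey.get(InMemorySessionStore.key(tenantId, idempotencyKey));
    return s !== undefined && s.status === 'open' && s.expiresAtMs > nowMs ? s : undefined;
  }
  save(s: UploadSession): void {
    this.byKey.set(InMemorySessionStore.key(s.tenantId, s.idempotencyKey), s);
  }
}

let nextId = 0;
function initiateIdempotently(store: InMemorySessionStore, tenantId: string, idempotencyKey: string, nowMs: number): UploadSession {
  const existing = store.findActive(tenantId, idempotencyKey, nowMs);
  if (existing) return existing; // replay: no second session and no second quota reservation
  const session: UploadSession = {
    id: `ups_${++nextId}`,
    tenantId,
    idempotencyKey,
    status: 'open',
    expiresAtMs: nowMs + 600 * 1000, // assumption: session lifetime matches the signed URL
  };
  store.save(session);
  return session;
}
```

Once a session expires or is closed, the same key starts a fresh session. This matches the "if an active one exists" wording.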
Error mapping: see DOMAIN_MODEL §11 plus quota-specific codes.
2.2 ConfirmUploadUseCase
Input:
type ConfirmUploadCommand = {
tenantId: TenantId;
uploadSessionId: UploadSessionId;
declaredSha256: Sha256;
actor: ActorRef;
};
Flow:
- Load UploadSession (must be open, must belong to the caller tenant).
- BlobStoragePort.head(session.objectKey) to get bytes + (if available) sha256 from object metadata. If sha256 is absent, the GCS adapter falls back to computing a streaming SHA-256 digest over the object body (capped at scopeMaxBytes) and comparing it with declaredSha256.
- Verify hash and size; on mismatch → mark the session aborted, softDelete the partial object, release the reserved quota, and raise MELMASTOON.FILE.HASH_MISMATCH.
- Dedupe: look up (tenantId, scope, sha256) in file_objects. If a ready row exists, mark this FileObject as an alias (asAliasOf(canonicalId)), copy the canonical's variants by reference, skip scan + optimization, and jump straight to ready. Return { fileObjectId: canonicalId, alias: true }.
- Otherwise, FileObject.confirmUpload({ byteSize, sha256, now }) and FileObject.beginScan(now).
- ScanPort.requestScan(...) (Pub/Sub message).
- Persist FileObject + close UploadSession + emit file.upload.completed.v1 + file.scan.requested.v1 in one tx.
- Note: image optimization is NOT enqueued until file.scan.passed.v1 arrives (saga step 3.1).
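The branch taken at confirm time can be isolated in a pure decision function, sketched below. The shapes are hypothetical simplifications of the domain value objects. The sketch also assumes that digests are hex strings compared case-insensitively, and that the size check is against the bytes declared at initiate.

```typescript
interface ObservedObject { bytes: number; sha256Hex: string; }
interface ExistingFile { id: string; status: string; }

type ConfirmDecision =
  | { kind: 'hash_mismatch' } // abort session, soft-delete partial, release quota
  | { kind: 'alias'; canonicalId: string } // dedupe onto an existing ready file
  | { kind: 'scan' }; // normal path: confirmUpload + beginScan

// findBySha256 stands in for the (tenantId, scope, sha256) lookup in file_objects.
function decideConfirm(
  declaredSha256Hex: string,
  declaredBytes: number,
  observed: ObservedObject,
  findBySha256: (sha256Hex: string) => ExistingFile | undefined,
): ConfirmDecision {
  const declared = declaredSha256Hex.toLowerCase();
  const actual = observed.sha256Hex.toLowerCase();
  if (declared !== actual || observed.bytes !== declaredBytes) return { kind: 'hash_mismatch' };
  const existing = findBySha256(actual);
  if (existing !== undefined && existing.status === 'ready') {
    return { kind: 'alias', canonicalId: existing.id };
  }
  return { kind: 'scan' };
}
```

Only a ready canonical is aliased; a duplicate that is still scanning takes the normal scan path.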
2.3 AbortUploadUseCase
Marks the UploadSession aborted, calls BlobStoragePort.abortResumable (best-effort), soft-deletes the FileObject (which never reached ready), releases the reserved quota, and emits file.upload.failed.v1 with reason: 'aborted_by_caller' | 'session_expired'.
2.4 IssueDownloadUrlUseCase
Input:
type IssueDownloadUrlCommand = {
tenantId: TenantId;
fileObjectId: FileObjectId;
variant?: VariantPreset; // default: original (file_objects.objectKey)
ttlSeconds?: number; // 60 ≤ ttl ≤ 3600; default 300
actor: ActorRef;
purpose: 'view' | 'download' | 'attach' | 'embed';
callerIp: string | null;
callerUserAgent: string | null;
};
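The TTL rules for this command (the input bounds, plus the Redis cache entry described in the flow) can be sketched as below. Two choices are assumptions: rejecting out-of-range values instead of clamping them, and computing ttlBucket per whole minute. The document does not define ttlBucket.

```typescript
const TTL_MIN_S = 60;
const TTL_MAX_S = 3600;
const TTL_DEFAULT_S = 300;
const CACHE_MARGIN_S = 30; // cached URLs expire 30 s before the signed URL itself

// Validates ttlSeconds against 60 <= ttl <= 3600, defaulting to 300.
function normalizeTtl(ttlSeconds?: number): number {
  if (ttlSeconds === undefined) return TTL_DEFAULT_S;
  if (!Number.isInteger(ttlSeconds) || ttlSeconds < TTL_MIN_S || ttlSeconds > TTL_MAX_S) {
    throw new RangeError(`ttlSeconds must be an integer in [${TTL_MIN_S}, ${TTL_MAX_S}]`);
  }
  return ttlSeconds;
}

// Hypothetical bucketing: requested TTLs are grouped per whole minute.
function ttlBucketOf(ttlSeconds: number): number {
  return Math.floor(ttlSeconds / 60);
}

function signedUrlCacheKey(tenantId: string, fileObjectId: string, variant: string, ttlSeconds: number): string {
  return `signedurl:${tenantId}:${fileObjectId}:${variant}:${ttlBucketOf(ttlSeconds)}`;
}

function cacheTtlSeconds(ttlSeconds: number): number {
  return ttlSeconds - CACHE_MARGIN_S;
}
```

The 30 s margin ensures a cached URL is never handed out in its last seconds of validity.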
Flow:
- Load FileObject (RLS gates by tenantId).
- Resolve to canonical if alias.
- Status gate:
  - ready → continue.
  - scanning / uploaded / initiated → ScanPendingError.
  - quarantined → QuarantinedReadError.
  - archived / purged → ResourceNotFoundError (mapped to 404).
- If a variant is requested, look up the Variant row; if it is not ready, fall back to original and emit a degraded_mode log line.
- BlobStoragePort.issueSignedDownloadUrl({ key, ttl, responseHeaders: { 'Content-Disposition': ... } }).
- Insert an AccessGrant row (id, fingerprint, ttl, actor, ip, ua).
- Cache the URL in Redis under signedurl:{tenantId}:{fileObjectId}:{variant}:{ttlBucket} with TTL = ttl − 30 s; reuse it for repeat callers within the bucket.
- Return { url, headers, expiresAt, accessGrantId }.
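The status gate and the variant fallback can be sketched as two pure functions. The gate returns an outcome tag, and the use case maps each tag to the error named above. The status union lists only the statuses named in the gate; the domain enum may contain more. The VariantRow shape and the log format are hypothetical.

```typescript
type FileStatus = 'initiated' | 'uploaded' | 'scanning' | 'ready' | 'quarantined' | 'archived' | 'purged';
type GateOutcome = 'ok' | 'scan_pending' | 'quarantined' | 'not_found';

// ok / ScanPendingError / QuarantinedReadError / ResourceNotFoundError (404).
function downloadGate(status: FileStatus): GateOutcome {
  switch (status) {
    case 'ready':
      return 'ok';
    case 'initiated':
    case 'uploaded':
    case 'scanning':
      return 'scan_pending';
    case 'quarantined':
      return 'quarantined';
    case 'archived':
    case 'purged':
      return 'not_found';
    default: {
      const unreachable: never = status; // compile-time exhaustiveness check
      throw new Error(`unknown status ${String(unreachable)}`);
    }
  }
}

type VariantStatus = 'pending' | 'ready' | 'failed';
interface VariantRow { objectKey: string; status: VariantStatus; }

// Picks the key to sign from loaded rows only; falls back to the original if the variant is not ready.
function keyToSign(
  originalKey: string,
  requestedPreset: string | undefined,
  variants: Map<string, VariantRow>,
  log: (line: string) => void,
): string {
  if (requestedPreset === undefined) return originalKey;
  const v = variants.get(requestedPreset);
  if (v !== undefined && v.status === 'ready') return v.objectKey;
  log(`degraded_mode preset=${requestedPreset} reason=${v === undefined ? 'missing' : v.status}`);
  return originalKey;
}
```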
Cross-tenant defense: the objectKey used to sign is derived from the loaded FileObject (RLS-scoped), never from caller input.
2.5 DeleteFileUseCase
- Load FileObject.
- softDelete(actor, now) (domain).
- CdnInvalidationPort.invalidate([cdnPathFor(objectKey), ...variantPaths]).
- Persist + emit file.deleted.v1.
- The hard purge happens later in the retention sweep (default: after the 30-day archive window) unless an erasure command shortens it.
2.6 RestoreFileUseCase
Restores a file within the 30-day archive window, after re-validating that its retention policy has not expired. The CDN cache is re-filled on the first read after the restore.
2.7 EraseByGuestUseCase (GDPR)
Input: { tenantId, guestId: GuestId, requestedBy, reason }.
Flow (saga):
- Query all file_objects WHERE tenant_id = ? AND owner_scope_refs->>'guestId' = ? AND status NOT IN ('purged').
- For each row:
  - If the retention class is regulated and the current age < minRetentionDays, defer the row into retention_holds and continue.
  - Else: BlobStoragePort.delete(objectKey), plus delete every variant; hardPurge('erasure', now).
- CdnInvalidationPort.invalidate(allTouchedPaths).
- Emit one file.erasure.completed.v1 per row + a single melmastoon.file.erasure.batch_completed.v1 summary.
- Persist an erasure certificate row (counts, byte total, deferred IDs) and return it; the certificate is consumed by the audit / compliance dashboard.
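The per-row branching can be sketched as a planning pass that builds the certificate totals. The actual blob deletes and hardPurge calls would run against the returned plan. Three things here are hypothetical: the row shape, the 'standard' retention class, and measuring age from creation time.

```typescript
interface ErasureRow {
  id: string;
  retentionClass: string;
  createdAtMs: number;
  minRetentionDays: number;
  bytes: number;
  paths: string[]; // CDN paths of the original and its variants
}
interface ErasureCertificate { purgedCount: number; purgedBytes: number; deferredIds: string[]; touchedPaths: string[]; }

const DAY_MS = 24 * 60 * 60 * 1000;

function planGuestErasure(rows: ErasureRow[], nowMs: number): ErasureCertificate {
  const cert: ErasureCertificate = { purgedCount: 0, purgedBytes: 0, deferredIds: [], touchedPaths: [] };
  for (const row of rows) {
    const ageDays = (nowMs - row.createdAtMs) / DAY_MS;
    if (row.retentionClass === 'regulated' && ageDays < row.minRetentionDays) {
      cert.deferredIds.push(row.id); // recorded in retention_holds, erased later
      continue;
    }
    cert.purgedCount += 1;
    cert.purgedBytes += row.bytes;
    cert.touchedPaths.push(...row.paths); // fed into one CdnInvalidationPort.invalidate call
  }
  return cert;
}
```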
2.8 EraseByTenantUseCase
Same shape but scoped by tenantId; runs after the legal retention window declared in tenant.deleted.v1 (the sweeper enqueues it on the appointed date).
2.9 ApplyRetentionUseCase
Background sweeper. Runs every 5 min:
- Find file_objects past their hardDeleteAfter. Per row: BlobStoragePort.delete, hardPurge('retention_expired', now), emit file.retention.expired.v1.
- Find files whose policy declares redactionAfterDays and whose current age > threshold, scope = guest_id_scan. Trigger OcrRedactSaga (see §3.2).
- Find quarantined files older than 30 d → hard purge (forensic export already taken).
2.10 OverrideQuarantineUseCase
Security-reviewer-only. Moves a file from quarantined → archived and emits a file.access.denied.v1 reversal-audit event carrying the reviewer context.
3. Sagas (Choreography)
3.1 Upload → Scan → Optimize → Ready
[client] POST /uploads → InitiateUploadUseCase
[client] PUT signedUrl (bytes) → no service involvement
[client] POST /uploads/{ups}/confirm → ConfirmUploadUseCase
├─ emits file.upload.completed.v1
└─ emits file.scan.requested.v1 (via outbox)
[scan worker] consumes file.scan.requested.v1
runs ClamAV/DLP
POST /internal/v1/files/scan-callback (mTLS)
[file-storage-service] InternalScanCallbackUseCase
→ recordScanPassed | recordScanFailed
passed: emit file.scan.passed.v1
+ ImageOptimizerPort.enqueue(presets per scope)
failed: emit file.scan.failed.v1
(FileObject is now quarantined; reads blocked)
[optimizer worker] consumes optimize job
produces N variants in tenants/{tid}/{scope}/.../variants/...
POST /internal/v1/files/optimize-callback (mTLS) per variant
[file-storage-service] InternalOptimizeCallbackUseCase
→ upsert Variant
on the LAST variant of the requested preset set:
emit file.optimization.completed.v1
(Property/Theme consumers flip their photo to 'ready')
This is a choreography — no central saga runner. Each service reacts to events. The file_objects.status is the durable source of truth for ready-ness.
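Detecting "the LAST variant of the requested preset set" can be sketched as follows. This assumes a failed variant also counts as finished, consistent with the optimizer-timeout row in §8. It also assumes callbacks for one file are serialized by the row lock in §7. Preset names in the usage are hypothetical.

```typescript
type VariantStatus = 'pending' | 'ready' | 'failed';

// Assumption: a preset is finished once it is ready OR failed.
function allPresetsTerminal(requested: string[], variants: Map<string, VariantStatus>): boolean {
  return requested.every((preset) => {
    const s = variants.get(preset);
    return s === 'ready' || s === 'failed';
  });
}

// Applies one optimize callback (an upsert keyed by preset) and returns true only when this
// callback is the one that completes the set, so file.optimization.completed.v1 is emitted once
// even if Pub/Sub or the optimizer redelivers a callback.
function applyOptimizeCallback(
  requested: string[],
  variants: Map<string, VariantStatus>,
  preset: string,
  status: VariantStatus,
): boolean {
  const wasComplete = allPresetsTerminal(requested, variants);
  variants.set(preset, status);
  return !wasComplete && allPresetsTerminal(requested, variants);
}
```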
3.2 OCR Redaction (PII)
Triggered by ApplyRetentionUseCase when pii_id_scan files cross redactionAfterDays:
- AIClient.callOcrRedact(fileObjectId) → DLP / Vertex returns redacted bytes.
- Upload the redacted bytes as a new FileObject with aliasOfFileId = original.id and tags += ['redacted'].
- Replace the Photo/IdScan reference at the consumer (event consumed by reservation-service).
- Hard-purge the original.
- Record AIProvenance on the redacted file.
3.3 GDPR Erasure
See §2.7 for the row-level orchestration. The producer of the tenant.guest.erasure_requested.v1 event (tenant-service) runs an outer choreography that fans out to every PII-touching service and gathers each service's erasure certificate.
3.4 Tenant Closure
- tenant.deleted.v1 consumed → register a RetentionHold(tenantId, holdUntil = legalWindow).
- Sweeper checks holds; on release, runs EraseByTenantUseCase.
4. Queries
4.1 GetFileMetadataQuery
type Result = {
id: FileObjectId;
tenantId: TenantId;
status: FileStatus;
contentType: string;
bytes: number | null;
sha256: string | null;
scope: Scope;
ownerScopeRefs: OwnerScopeRefs;
retentionPolicyName: string;
createdAt: string;
updatedAt: string;
variants: { preset: VariantPreset; status: 'pending' | 'ready' | 'failed'; widthPx?: number; heightPx?: number }[];
scanResult: { verdict: 'passed' | 'failed' | 'inconclusive' | 'pending'; scannedAt?: string; threats?: string[] };
altText?: I18nString;
tags: string[];
aiProvenance?: AIProvenance;
};
4.2 ListVariantsQuery
Cursor-paginated list of all variants for one file. Used by bff-tenant-booking-service to render srcset.
4.3 GetQuotaQuery
Returns { bytesUsed, objectsUsed, capBytes, capObjects, byScope: { ... } }.
4.4 GetAccessLogQuery
Returns audited issuances (paginated) for one file. Restricted to Tenant.Owner and Platform.Admin roles.
4.5 ListErasureCertificatesQuery
Returns past erasure certificates by tenant + date range. For compliance officers.
5. Cross-Cutting Concerns
5.1 Idempotency
Every write endpoint requires an Idempotency-Key header. The handler stores (routeHash, tenantId, key) → (requestHash, response) in idempotency_records (TTL 24 h). A retry with the same key and the same body replays the stored response; re-submitting the same key with a different body returns 409 MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED.
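A minimal in-memory model of that record lookup is sketched below. The class and field names are hypothetical. It does not cover two first requests racing with the same key; the real table needs a unique constraint or an in-progress marker for that.

```typescript
interface StoredResponse { status: number; body: string; }
interface IdempotencyRecord { requestHash: string; response: StoredResponse; expiresAtMs: number; }
type IdempotencyOutcome =
  | { kind: 'execute' }
  | { kind: 'replay'; response: StoredResponse }
  | { kind: 'conflict' }; // maps to 409 MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED

const RECORD_TTL_MS = 24 * 60 * 60 * 1000;

class InMemoryIdempotencyRecords {
  private readonly records = new Map<string, IdempotencyRecord>();

  private static key(routeHash: string, tenantId: string, idempotencyKey: string): string {
    return JSON.stringify([routeHash, tenantId, idempotencyKey]);
  }

  check(routeHash: string, tenantId: string, idempotencyKey: string, requestHash: string, nowMs: number): IdempotencyOutcome {
    const rec = this.records.get(InMemoryIdempotencyRecords.key(routeHash, tenantId, idempotencyKey));
    if (rec === undefined || rec.expiresAtMs <= nowMs) return { kind: 'execute' };
    if (rec.requestHash !== requestHash) return { kind: 'conflict' };
    return { kind: 'replay', response: rec.response };
  }

  save(routeHash: string, tenantId: string, idempotencyKey: string, requestHash: string, response: StoredResponse, nowMs: number): void {
    this.records.set(InMemoryIdempotencyRecords.key(routeHash, tenantId, idempotencyKey), {
      requestHash,
      response,
      expiresAtMs: nowMs + RECORD_TTL_MS,
    });
  }
}
```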
5.2 Optimistic Concurrency
PATCH operations on FileObject (rare; mostly tag/altText updates) require If-Match: <version>; mismatch raises MELMASTOON.GENERAL.PRECONDITION_FAILED.
5.3 Outbox Pattern
Every use case that emits domain events writes to outbox in the same transaction as the aggregate. A separate outbox-relay worker tails the table, publishes to Pub/Sub, and sets published_at on a row only after Pub/Sub acknowledges the publish. The mandatory outbox.spec.ts integration test verifies at-least-once delivery and no loss on crash.
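One relay pass can be sketched over an in-memory table. Two simplifications apply: publish is synchronous here, whereas real Pub/Sub publishing is asynchronous, and rows are assumed to arrive in insertion order. Stopping at the first failure, to preserve that order, is a design choice of this sketch.

```typescript
interface OutboxRow { id: number; topic: string; payload: string; publishedAt: number | null; }

// `publish` returns true once the broker acknowledges the message.
// Rows are marked only after the ack, so a crash between ack and marking leads to a
// re-publish on the next pass (at-least-once), never to a lost event.
function relayOnce(rows: OutboxRow[], publish: (row: OutboxRow) => boolean, nowMs: number): number {
  let marked = 0;
  for (const row of rows) {
    if (row.publishedAt !== null) continue; // acknowledged on an earlier pass
    let acked = false;
    try {
      acked = publish(row);
    } catch {
      acked = false;
    }
    if (!acked) break; // retry this row next pass before publishing anything after it
    row.publishedAt = nowMs;
    marked += 1;
  }
  return marked;
}
```

Consumers must tolerate the resulting duplicates, which is what the inbox pattern provides.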
5.4 Inbox Pattern
Consumed events (tenant.deleted.v1, tenant.guest.erasure_requested.v1, property.photo.removed.v1, tenant.plan_changed.v1, tenant.settings.changed.v1) are deduped by messageId in inbox. Handlers are idempotent by (messageId, handler) and run inside a transaction with the inbox row.
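The (messageId, handler) dedupe can be modelled in memory as follows. In the service, the inbox insert and the handler share one DB transaction. Here, "record only after the handler succeeds" stands in for that. The handler names in the usage are hypothetical.

```typescript
class InMemoryInbox {
  private readonly processed = new Set<string>();

  // Runs the handler the first time (messageId, handlerName) is seen; returns false for duplicates.
  handle(messageId: string, handlerName: string, handler: () => void): boolean {
    const key = JSON.stringify([messageId, handlerName]);
    if (this.processed.has(key)) return false;
    handler(); // if this throws, nothing is recorded, like a rolled-back transaction
    this.processed.add(key);
    return true;
  }
}
```

Keying on the handler as well as the messageId lets two handlers in this service each process the same event exactly once.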
5.5 Tenant Context
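Every request boundary (HTTP, Pub/Sub consumer, scheduled job) opens an AsyncLocalStorage scope with tenantId. Every DB transaction executes SET LOCAL app.tenant_id = '<uuid>' immediately after BEGIN. SET LOCAL lasts only until the end of the current transaction and has no effect when issued outside one, so it cannot be run once at connection checkout. Direct cross-tenant calls are guarded in the domain (CrossTenantReferenceError) and at the DB by RLS.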
5.6 Caller Surface
The HTTP layer asserts the caller surface (backoffice / tenant-booking / consumer / internal) matches the route's expectations. Internal callbacks (/internal/v1/files/...) require mTLS via the platform service-mesh.
5.7 Rate Limits
Per-tenant token-bucket on issueDownloadUrl (default 600/min/tenant) and on initiateUpload (default 120/min/tenant). Buckets in Memorystore. Exceeding → 429 MELMASTOON.GENERAL.RATE_LIMITED with Retry-After.
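The token-bucket arithmetic is sketched below as a single in-process bucket. In the service the state lives in Memorystore and would need an atomic update (for example a Redis script), which is not shown. The sketch assumes the burst capacity equals the per-minute limit and that refill is continuous.

```typescript
class TokenBucket {
  private tokens: number;
  private lastMs: number;

  // Refills `capacity` tokens evenly over `intervalMs`.
  constructor(private readonly capacity: number, private readonly intervalMs: number, nowMs: number) {
    this.tokens = capacity;
    this.lastMs = nowMs;
  }

  // Returns 0 when the request is admitted, otherwise the Retry-After value in whole seconds.
  tryTake(nowMs: number): number {
    const elapsed = Math.max(0, nowMs - this.lastMs);
    this.tokens = Math.min(this.capacity, this.tokens + (elapsed * this.capacity) / this.intervalMs);
    this.lastMs = nowMs;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return 0;
    }
    const waitMs = ((1 - this.tokens) * this.intervalMs) / this.capacity;
    return Math.ceil(waitMs / 1000);
  }
}
```

For the issueDownloadUrl default this would be new TokenBucket(600, 60000, now), one bucket per tenant.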
6. Sequence (Initiate + Confirm + Scan + Optimize)
Caller file-storage-service GCS Pub/Sub Scan Worker Optimizer
│ │ │ │ │ │
│ POST /uploads │ │ │ │ │
│──────────────────────►│ │ │ │ │
│ │ initiate (domain) │ │ │ │
│ │ persist + outbox │ │ │ │
│ ◄──── signed PUT URL │ │ │ │ │
│ │ │ │ │ │
│ PUT bytes │ │ │ │ │
│──────────────────────────────────────────►│ │ │ │
│ │ outbox publish │ │ │ │
│ │ ────────────────►│ initiated.v1│ │ │
│ │ │ │ │ │
│ POST /uploads/{ups}/confirm │ │ │ │
│──────────────────────►│ │ │ │ │
│ │ head + verify hash│ │ │ │
│ │ confirm + scan req│ │ │ │
│ │ persist + outbox │ │ │ │
│ ◄──── 200 OK │ ────────────────►│ completed.v1 + scan.requested │ │
│ │ │ │ ────────────────►│ │
│ │ │ │ │ scan + callback │
│ │ ◄──────────────────── POST /internal/scan-callback │
│ │ recordScanPassed │ │ │ │
│ │ outbox publish │ │ │ │
│ │ ────────────────►│ scan.passed.v1 │ │
│ │ enqueue optimize │ │ ────────────────────────────────────►│
│ │ │ │ │ build variants │
│ │ ◄────────────────────── POST /internal/optimize-callback (per variant)
│ │ upsert Variant │ │ │ │
│ │ outbox publish │ │ │ │
│ │ ────────────────►│ optimization.completed.v1 (LAST)│ │
7. Concurrency Model
- Aggregate locks: pessimistic row lock on file_objects.id during scan/optimize callbacks (idempotent re-entry, no double-count).
- Quota updates: atomic UPDATE quotas SET bytes_used = bytes_used + ? WHERE tenant_id = ? AND bytes_used + ? <= cap_bytes RETURNING bytes_used (single-shot conditional update; zero rows returned means the cap would be exceeded).
- Variant upsert: INSERT ... ON CONFLICT (file_object_id, preset) DO UPDATE.
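The semantics of the quota statement can be modelled in memory as below. In PostgreSQL the cap check and the write form a single statement. A concurrent UPDATE on the same row waits for the row lock and then re-evaluates its WHERE clause against the committed value, so two reservations cannot both pass the check against a stale total. The floor-at-zero release is an assumption, not stated above.

```typescript
interface QuotaRow { bytesUsed: number; capBytes: number; }

// Models: UPDATE quotas SET bytes_used = bytes_used + $1
//         WHERE tenant_id = $2 AND bytes_used + $1 <= cap_bytes RETURNING bytes_used
// Returns the new total, or null when zero rows would match (cap exceeded).
function reserveBytes(row: QuotaRow, delta: number): number | null {
  if (row.bytesUsed + delta > row.capBytes) return null;
  row.bytesUsed += delta;
  return row.bytesUsed;
}

// Assumption: release floors at zero, e.g. SET bytes_used = GREATEST(bytes_used - $1, 0).
function releaseBytes(row: QuotaRow, delta: number): number {
  row.bytesUsed = Math.max(0, row.bytesUsed - delta);
  return row.bytesUsed;
}
```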
8. Failure & Compensation
| Step | Failure | Compensation |
|---|---|---|
| Initiate-upload, GCS rejects | rare; treat as 503 | release reserved quota, return 503 with retryAfter |
| Confirm-upload, hash mismatch | client error | abort session, soft-delete partial, release quota |
| Scan callback, idempotent re-entry | duplicate Pub/Sub | inbox dedupe by messageId |
| Optimizer never returns | timeout | sweeper after 60 min marks variant failed, file remains ready (original usable) |
| CDN invalidation fails | retry | exponential backoff 5×; alert at backlog > 50 |
| Erasure partial failure | row-level | record errors[] in certificate; cron retries deferred rows |
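The optimizer-timeout compensation from the table can be sketched as a sweep over variant rows. The requestedAtMs field is hypothetical; the document does not say which timestamp the 60-minute timeout is measured from.

```typescript
type VariantStatus = 'pending' | 'ready' | 'failed';
interface VariantRow { fileObjectId: string; preset: string; status: VariantStatus; requestedAtMs: number; }

const OPTIMIZER_TIMEOUT_MS = 60 * 60 * 1000;

// Marks pending variants older than 60 min as failed and returns how many changed.
// The owning FileObject is deliberately not touched: it stays ready and the original stays servable.
function sweepStaleVariants(rows: VariantRow[], nowMs: number): number {
  let changed = 0;
  for (const v of rows) {
    if (v.status === 'pending' && nowMs - v.requestedAtMs > OPTIMIZER_TIMEOUT_MS) {
      v.status = 'failed';
      changed += 1;
    }
  }
  return changed;
}
```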
9. Module Wiring (DI sketch)
import { Module } from '@nestjs/common';
// Repository, adapter, controller, and use-case imports omitted.

// The §1 ports are TypeScript interfaces, which are erased at compile time and cannot be
// provider tokens. String tokens named after each port are used instead (naming is illustrative);
// consumers inject them with @Inject('BlobStoragePort'), @Inject('Clock'), and so on.
// Repositories and AIClient are assumed to be abstract classes, which are valid tokens.
@Module({
  imports: [PostgresModule, RedisModule, PubSubModule, GcsModule, KmsModule],
  providers: [
    { provide: FileObjectRepository, useClass: FileObjectRepositoryPg },
    { provide: UploadSessionRepository, useClass: UploadSessionRepositoryPg },
    { provide: VariantRepository, useClass: VariantRepositoryPg },
    { provide: ScanResultRepository, useClass: ScanResultRepositoryPg },
    { provide: AccessGrantRepository, useClass: AccessGrantRepositoryPg },
    { provide: RetentionPolicyRepository, useClass: RetentionPolicyRepositoryPg },
    { provide: 'QuotaPort', useClass: QuotaRepositoryPg },
    { provide: 'BlobStoragePort', useClass: GcsBlobStorageAdapter },
    { provide: 'ScanPort', useClass: ClamAvScanAdapter },
    { provide: 'ImageOptimizerPort', useClass: PubSubImageOptimizerAdapter },
    { provide: 'CdnInvalidationPort', useClass: CloudCdnInvalidationAdapter },
    { provide: 'EventPublisher', useClass: EventPublisherPubSub },
    { provide: AIClient, useClass: AIClientHttpAdapter },
    { provide: 'Clock', useFactory: () => ({ now: () => new Date() }) },
    InitiateUploadUseCase,
    ConfirmUploadUseCase,
    AbortUploadUseCase,
    IssueDownloadUrlUseCase,
    DeleteFileUseCase,
    RestoreFileUseCase,
    EraseByGuestUseCase,
    EraseByTenantUseCase,
    ApplyRetentionUseCase,
    OverrideQuarantineUseCase,
    // Callback use cases referenced in §3.1 and served by InternalCallbacksController.
    InternalScanCallbackUseCase,
    InternalOptimizeCallbackUseCase,
  ],
  controllers: [UploadsController, FilesController, DownloadsController, QuotasController, ErasureController, InternalCallbacksController, HealthController],
})
export class FileStorageModule {}