ai-orchestrator-service — Application Logic
Companion to: DOMAIN_MODEL.md · API_CONTRACTS.md · EVENT_SCHEMAS.md · Canonical: 08 AI Architecture
The application layer orchestrates use cases through ports. The four-layer split is non-negotiable: domain → application → infrastructure + presentation. Use cases are stateless, single-responsibility, and return discriminated unions for success/failure.
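The sketch below shows what the discriminated-union convention looks like in practice, together with an exhaustive consumer. The variant names (`completed`, `refused`, `fallback`, `failed`) are assumptions modelled on the return paths of `RunInferenceUseCase`. The real `RunInferenceResult` is not reproduced in this section. The HTTP status mapping is illustrative only and is not the API contract.

```typescript
// Hypothetical result shape; variant names mirror the return paths of RunInferenceUseCase.
type InferenceResult =
  | { kind: 'completed'; cached: boolean; output: unknown; provenanceId: string }
  | { kind: 'refused'; stage: 'input' | 'output' }
  | { kind: 'fallback'; reason: 'budget_hard_cap' | 'all_providers_unhealthy' | 'moderation_block' }
  | { kind: 'failed'; errorCode: string };

// Only reachable if a variant is left unhandled; the `never` parameter makes that a type error.
function assertNever(value: never): never {
  throw new Error(`Unhandled result variant: ${JSON.stringify(value)}`);
}

// Illustrative presentation-layer mapping; the status codes are assumptions.
function httpStatusFor(result: InferenceResult): number {
  switch (result.kind) {
    case 'completed':
    case 'fallback': // a deterministic fallback still answers the caller
      return 200;
    case 'refused':
      return 422;
    case 'failed':
      return 502;
    default:
      return assertNever(result);
  }
}
```

With the `assertNever` default, forgetting to handle a newly added variant becomes a compile-time error in the presentation layer rather than a silent success response.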
1. Ports (src/application/ports/)
```typescript
export interface AIProviderPort {
  readonly name: ProviderName;
  complete(req: ProviderCompletionRequest): Promise<ProviderCompletionResponse>;
  embed(req: ProviderEmbeddingRequest): Promise<ProviderEmbeddingResponse>;
  vision(req: ProviderVisionRequest): Promise<ProviderVisionResponse>;
  transcribe(req: ProviderSttRequest): Promise<ProviderSttResponse>;
  moderate(req: ProviderModerationRequest): Promise<ProviderModerationResponse>;
  capabilities(): ProviderCapabilities;
  isHealthy(): boolean;
}

export interface CapabilityCatalogPort {
  get(key: string): Promise<Capability>;
  list(filter: CapabilityFilter): Promise<Capability[]>;
  upsert(input: UpsertCapabilityInput): Promise<Capability>;
}

export interface PromptRegistryPort {
  getActive(promptIdentity: PromptIdentity): Promise<PromptVersion>;
  getById(id: PromptVersionId): Promise<PromptVersion>;
  pinForCall(capabilityKey: string, tenantId: TenantId, abBucket: number): Promise<PromptVersion>;
  draft(input: DraftPromptInput): Promise<PromptVersion>;
  promote(id: PromptVersionId, evalRunId: EvalRunId): Promise<PromptVersion>;
  deprecate(id: PromptVersionId): Promise<PromptVersion>;
  retire(id: PromptVersionId): Promise<PromptVersion>;
  publishedSnapshotForDesktop(tenantId: TenantId): Promise<PromptSnapshot>;
}

export interface ModelCatalogPort {
  get(ref: ModelRef): Promise<Model>;
  list(filter: ModelFilter): Promise<Model[]>;
  costFor(model: Model, tokens: TokenCount, calls?: number): CostUsd;
}

export interface ProviderRegistryPort {
  get(name: ProviderName): AIProviderPort;
  health(name: ProviderName): Promise<ProviderHealth>;
  recordSuccess(name: ProviderName): Promise<void>;
  recordFailure(name: ProviderName, errorCode: string): Promise<void>;
}

export interface BudgetPort {
  snapshot(tenantId: TenantId, scope: BudgetScope, periodKey: string): Promise<BudgetCounter>;
  reserve(tenantId: TenantId, scope: BudgetScope, estimatedTokens: number, requestId: RequestId): Promise<BudgetReservation>;
  commit(reservation: BudgetReservation, actualTokens: TokenCount, actualCost: CostUsd): Promise<void>;
  release(reservation: BudgetReservation): Promise<void>;
}

export interface ModerationPort {
  preCall(input: string, capability: Capability): Promise<SafetyVerdict>;
  postCall(output: string, capability: Capability): Promise<SafetyVerdict>;
}

export interface RedactionPort {
  redact(input: unknown): { redacted: unknown; spans: RedactedSpan[] };
}

export interface CachePort {
  lookup(key: InputHash): Promise<CachedResult | null>;
  put(key: InputHash, result: CachedResult, ttlSeconds: number): Promise<void>;
  invalidate(prefix: string): Promise<void>;
}

export interface RagPort {
  query(corpusId: RAGCorpusId, queryEmbedding: Float32Array, topK: number, filter?: RagFilter): Promise<RagHit[]>;
  ingest(corpusId: RAGCorpusId, chunks: RagChunkInput[]): Promise<{ ingested: number; replaced: number }>;
  purgeForTenant(tenantId: TenantId): Promise<void>;
}

export interface HitlPort {
  open(input: OpenHitlInput): Promise<HitlGate>;
  decide(input: DecideHitlInput): Promise<HitlDecision>;
  list(filter: HitlListFilter): Promise<HitlGate[]>;
  expireOverdue(now: ISODate): Promise<HitlGate[]>; // scheduled job
}

export interface ProvenancePort {
  stamp(input: StampProvenanceInput): Promise<AIProvenance>;
  get(id: ProvenanceId): Promise<AIProvenance>;
}

export interface OutboxPort {
  publish(events: DomainEvent[]): Promise<void>; // transactional with main write
}

export interface EdgeManifestPort {
  current(): Promise<EdgeModelManifest>;
  publish(input: PublishEdgeManifestInput): Promise<EdgeModelManifest>;
  signWithKms(canonicalBytes: Uint8Array): Promise<{ algorithm: 'RSASSA_PSS_SHA_256'; valueB64: string; kmsKeyId: string }>;
}

export interface EvalHarnessPort {
  run(input: { suiteId: EvalSuiteId; promptVersionId: PromptVersionId; modelRef: ModelRef }): Promise<EvalRun>;
  scheduleNightly(): Promise<void>;
  drift(promptVersionId: PromptVersionId): Promise<DriftReport>;
}

export interface TelemetryPort {
  recordCall(input: TelemetryRecord): Promise<void>; // BigQuery streaming insert
  metric(name: string, value: number, labels: Record<string, string>): void;
}

export interface ClockPort { now(): ISODate }
export interface IdGeneratorPort { ulid(prefix: string): string }
```
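Because every collaborator is a port, use cases can be unit-tested against in-memory adapters. Below is a minimal sketch of one such adapter for `CachePort`. It uses simple local stand-ins for `InputHash`, `CachedResult` and `ClockPort`; the real domain types may carry more fields. Expiry is computed from the injected clock, so TTL behaviour is deterministic in tests.

```typescript
type InputHash = string;

interface CachedResult {
  output: unknown;
  tokens: number;
  cost: number;
}

interface ClockPort {
  now(): string; // ISO-8601 timestamp
}

interface CachePort {
  lookup(key: InputHash): Promise<CachedResult | null>;
  put(key: InputHash, result: CachedResult, ttlSeconds: number): Promise<void>;
  invalidate(prefix: string): Promise<void>;
}

// In-memory adapter for unit tests; expiry is judged against the injected clock, not wall time.
class InMemoryCache implements CachePort {
  private readonly entries = new Map<InputHash, { result: CachedResult; expiresAtMs: number }>();
  private readonly clock: ClockPort;

  constructor(clock: ClockPort) {
    this.clock = clock;
  }

  private nowMs(): number {
    return Date.parse(this.clock.now());
  }

  async lookup(key: InputHash): Promise<CachedResult | null> {
    const entry = this.entries.get(key);
    if (!entry) return null;
    if (entry.expiresAtMs <= this.nowMs()) {
      this.entries.delete(key); // lazy eviction on read
      return null;
    }
    return entry.result;
  }

  async put(key: InputHash, result: CachedResult, ttlSeconds: number): Promise<void> {
    this.entries.set(key, { result, expiresAtMs: this.nowMs() + ttlSeconds * 1000 });
  }

  async invalidate(prefix: string): Promise<void> {
    for (const key of Array.from(this.entries.keys())) {
      if (key.startsWith(prefix)) this.entries.delete(key);
    }
  }
}

// Controllable clock: tests advance time explicitly.
class FakeClock implements ClockPort {
  private ms: number;

  constructor(startIso: string) {
    this.ms = Date.parse(startIso);
  }

  now(): string {
    return new Date(this.ms).toISOString();
  }

  advance(seconds: number): void {
    this.ms += seconds * 1000;
  }
}
```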
2. Use Case — RunInferenceUseCase
The hero use case. Every POST /api/v1/ai/complete request lands here.
```typescript
export class RunInferenceUseCase {
  constructor(
    private readonly capabilities: CapabilityCatalogPort,
    private readonly prompts: PromptRegistryPort,
    private readonly models: ModelCatalogPort,
    private readonly providers: ProviderRegistryPort,
    private readonly budget: BudgetPort,
    private readonly moderation: ModerationPort,
    private readonly redaction: RedactionPort,
    private readonly cache: CachePort,
    private readonly hitl: HitlPort,
    private readonly provenance: ProvenancePort,
    private readonly outbox: OutboxPort,
    private readonly telemetry: TelemetryPort,
    private readonly clock: ClockPort,
    private readonly ids: IdGeneratorPort,
  ) {}

  async execute(cmd: RunInferenceCommand): Promise<RunInferenceResult> {
    // 1. Resolve capability
    const capability = await this.capabilities.get(cmd.capabilityKey);
    assertActive(capability);

    // 2. Pin prompt version (A/B sticky-by-tenant)
    const promptVersion = await this.prompts.pinForCall(
      cmd.capabilityKey,
      cmd.tenantId,
      abBucketFor(cmd.tenantId, cmd.capabilityKey),
    );

    // 3. Pre-call moderation (input)
    const inputText = renderInputForModeration(promptVersion, cmd.input);
    const inputVerdict = await this.moderation.preCall(inputText, capability);
    if (inputVerdict === 'block') {
      return refuseSafety(cmd, 'input');
    }

    // 4. PII redaction
    const { redacted, spans } = this.redaction.redact(cmd.input);

    // 5. Compose hash + cache lookup
    const inputHash = sha256({
      capability: cmd.capabilityKey,
      promptVersionId: promptVersion.id,
      tenantId: cmd.tenantId,
      input: redacted,
    });
    if (capability.cacheTtlSeconds) {
      const cached = await this.cache.lookup(inputHash);
      if (cached) {
        const provenance = await this.provenance.stamp(buildCachedProvenance(cached, cmd, promptVersion));
        await this.outbox.publish([buildCachedHitEvent(cmd, provenance)]);
        return { kind: 'completed', cached: true, output: cached.output, provenance };
      }
    }

    // 6. Budget reservation
    const estimatedTokens = estimateTokens(promptVersion, redacted);
    const reservation = await this.budget.reserve(
      cmd.tenantId,
      { kind: 'capability', capabilityKey: capability.key },
      estimatedTokens,
      cmd.correlation.requestId,
    );
    if (reservation.kind === 'denied_hard_cap') {
      return executeDeterministicFallback(capability, cmd, 'budget_hard_cap');
    }
    if (reservation.kind === 'soft_cap_warning') {
      await this.outbox.publish([buildBudgetWarningEvent(cmd, reservation)]);
    }

    // 7. Pick provider
    const decision = pickProvider(capability, cmd.context, this.providers);
    if (decision.kind === 'no_healthy_provider') {
      await this.budget.release(reservation);
      return executeDeterministicFallback(capability, cmd, 'all_providers_unhealthy');
    }

    // 8. Compose request
    const providerRequest = composeProviderRequest(promptVersion, redacted, capability, decision.modelRef);

    // 9. Call provider with timeout + retries (jittered)
    let providerResult;
    try {
      providerResult = await callWithRetry(decision.provider, providerRequest, capability);
      await this.providers.recordSuccess(decision.provider.name);
    } catch (err) {
      await this.providers.recordFailure(decision.provider.name, errorCode(err));
      const fallback = await tryFallbackChain(capability, cmd, decision, err);
      if (fallback.kind === 'unrecoverable') {
        await this.budget.release(reservation);
        return failureResult(cmd, fallback.errorCode);
      }
      providerResult = fallback.result;
    }

    // 10. Post-call moderation (output)
    const outputVerdict = await this.moderation.postCall(stringifyOutput(providerResult), capability);
    if (outputVerdict === 'block') {
      // Settle the reservation, as every other early exit after step 6 does; otherwise it leaks.
      await this.budget.release(reservation);
      await this.outbox.publish([buildModerationFlaggedEvent(cmd, providerResult, 'output')]);
      return executeDeterministicFallback(capability, cmd, 'moderation_block');
    }

    // 11. Schema validation (with one repair attempt)
    const validated = await validateOutput(providerResult, capability.outputSchemaJson, decision.provider, providerRequest);
    if (validated.kind === 'invalid') {
      await this.budget.release(reservation);
      return failureResult(cmd, 'MELMASTOON.AI.OUTPUT_INVALID');
    }

    // 12. Stamp provenance
    const provenance = await this.provenance.stamp(
      buildProvenance({
        cmd, providerResult, decision, promptVersion, inputVerdict, outputVerdict, latencyMs: providerResult.latencyMs,
      }),
    );

    // 13. Commit budget
    await this.budget.commit(reservation, providerResult.tokens, providerResult.cost);

    // 14. Cache the result
    if (capability.cacheTtlSeconds) {
      await this.cache.put(inputHash, { output: validated.value, tokens: providerResult.tokens, cost: providerResult.cost }, capability.cacheTtlSeconds);
    }

    // 15. HITL gate (if configured)
    let hitlGate: HitlGate | undefined;
    if (shouldOpenHitl(capability, validated.value)) {
      hitlGate = await this.hitl.open({
        tenantId: cmd.tenantId,
        capability,
        artifactRef: { kind: capability.key, id: provenance.id },
        draftJson: validated.value,
        correlation: cmd.correlation,
      });
    }

    // 16. Outbox events (transactional)
    await this.outbox.publish([
      buildInferenceCompletedEvent(cmd, provenance, validated.value, hitlGate),
      ...maybeCapabilitySpecificEvent(capability, validated.value, provenance),
      ...(hitlGate ? [buildHitlGateOpenedEvent(hitlGate, provenance)] : []),
    ]);

    // 17. Telemetry
    await this.telemetry.recordCall({
      tenantId: cmd.tenantId,
      capability: capability.key,
      model: decision.modelRef,
      tokens: providerResult.tokens,
      cost: providerResult.cost,
      latencyMs: providerResult.latencyMs,
      cacheHit: false,
      hitlOpened: !!hitlGate,
    });

    return { kind: 'completed', cached: false, output: validated.value, provenance, hitlGate };
  }
}
```
Failure modes mapped to error codes are catalogued in FAILURE_MODES.md. Every numbered step above is independently unit-tested.
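`execute` calls two helpers that this section does not define: `abBucketFor` (step 2) and `callWithRetry` (step 9). The sketch below gives one plausible shape for each and rests on three assumptions. First, buckets are integers in [0, 100) derived from a SHA-256 hash of the tenant and capability key. Second, a draft at 5% A/B traffic would therefore serve buckets below 5. Third, retries use the "full jitter" variant of exponential backoff with a per-attempt timeout. The `RetryPolicy` fields are hypothetical. This `callWithRetry` also takes a thunk rather than the `(provider, request, capability)` triple used above.

```typescript
import { createHash } from 'node:crypto';

// Step 2: sticky A/B bucket (hypothetical contract: integer in [0, 100)).
function abBucketFor(tenantId: string, capabilityKey: string): number {
  // NUL separator keeps ("a:b", "c") and ("a", "b:c") from hashing identically.
  const digest = createHash('sha256').update(`${tenantId}\u0000${capabilityKey}`).digest();
  return digest.readUInt32BE(0) % 100; // modulo bias over 2^32 values is negligible
}

// A candidate prompt at `rolloutPercent` serves tenants whose bucket is below the threshold.
function servesCandidate(bucket: number, rolloutPercent: number): boolean {
  return bucket < rolloutPercent;
}

// Step 9: retries with a per-attempt timeout and full-jitter backoff.
interface RetryPolicy {
  maxAttempts: number; // total attempts, including the first
  baseDelayMs: number;
  maxDelayMs: number;
  timeoutMs: number; // per attempt
}

class TimeoutError extends Error {
  constructor(ms: number) {
    super(`timed out after ${ms} ms`);
    this.name = 'TimeoutError';
  }
}

// Rejects after `ms`; the underlying promise keeps running (no cancellation).
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new TimeoutError(ms)), ms);
    promise.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (err: unknown) => { clearTimeout(timer); reject(err); },
    );
  });
}

const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms));

async function callWithRetry<T>(
  call: () => Promise<T>,
  policy: RetryPolicy,
  isRetryable: (err: unknown) => boolean = () => true,
  random: () => number = Math.random,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < policy.maxAttempts; attempt++) {
    try {
      return await withTimeout(call(), policy.timeoutMs);
    } catch (err) {
      lastError = err;
      if (attempt === policy.maxAttempts - 1 || !isRetryable(err)) break;
      // Full jitter: sleep a uniform random time in [0, min(maxDelay, base * 2^attempt)).
      const ceiling = Math.min(policy.maxDelayMs, policy.baseDelayMs * 2 ** attempt);
      await sleep(Math.floor(random() * ceiling));
    }
  }
  throw lastError;
}
```

`withTimeout` only stops waiting; it does not cancel the in-flight provider request. A production version would pass an `AbortSignal` to the provider client wherever one is supported.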
3. Use Case — RunEmbeddingUseCase
```typescript
export class RunEmbeddingUseCase {
  async execute(cmd: RunEmbeddingCommand): Promise<RunEmbeddingResult> {
    // Same pipeline minus prompt + HITL: capability lookup, redact, budget reserve, route
    // (Vertex text-embedding-004 default; local MiniLM if cmd.context.local), call, validate dim, stamp provenance, commit, outbox.
  }
}
```
Batched embeddings (POST /api/v1/ai/embed with inputs: string[]) flow through a chunking helper that respects per-call provider limits (Vertex: 250 inputs per call) and keeps a per-batch correlation id for telemetry.
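Here is a minimal sketch of the chunking helper. It uses the 250-inputs-per-call limit stated above. The `<requestId>.b<n>` format for per-batch correlation ids is hypothetical. Each batch records its offset, so vectors can be stitched back into input order even when batches complete out of order.

```typescript
// Hypothetical batch descriptor.
interface EmbeddingBatch {
  correlationId: string; // per-batch id for telemetry, derived from the request id
  offset: number; // index of this batch's first input in the original array
  inputs: string[];
}

const VERTEX_MAX_INPUTS_PER_CALL = 250; // per-call limit quoted above

function toEmbeddingBatches(
  requestId: string,
  inputs: readonly string[],
  maxPerCall: number = VERTEX_MAX_INPUTS_PER_CALL,
): EmbeddingBatch[] {
  if (!Number.isInteger(maxPerCall) || maxPerCall < 1) {
    throw new RangeError('maxPerCall must be a positive integer');
  }
  const batches: EmbeddingBatch[] = [];
  for (let offset = 0; offset < inputs.length; offset += maxPerCall) {
    batches.push({
      correlationId: `${requestId}.b${batches.length}`,
      offset,
      inputs: inputs.slice(offset, offset + maxPerCall),
    });
  }
  return batches;
}

// Re-assemble per-batch vectors in original input order, whatever order the batches finished in.
function stitch(total: number, results: ReadonlyArray<{ offset: number; vectors: Float32Array[] }>): Float32Array[] {
  const out = new Array<Float32Array>(total);
  for (const { offset, vectors } of results) {
    vectors.forEach((vector, i) => {
      out[offset + i] = vector;
    });
  }
  return out;
}
```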
4. Use Case — RunRagQueryUseCase
```typescript
export class RunRagQueryUseCase {
  async execute(cmd: RunRagQueryCommand): Promise<RunRagQueryResult> {
    // 1. Resolve corpus, assert tenant ownership (throws CrossTenantReferenceError on mismatch)
    // 2. Embed the query (delegates to RunEmbeddingUseCase)
    // 3. In one transaction, set `app.tenant_id` transaction-locally (e.g. set_config('app.tenant_id', $1, true))
    //    so a pooled connection never carries another tenant's context, and `SET LOCAL hnsw.ef_search = 40`
    // 4. ragPort.query(...) under the tenant context
    // 5. Return ranked hits with chunk text + source uri + score
  }
}
```
RagFilter supports namespace, language, and metadata equality; never accepts arbitrary SQL.
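One way to honour the "never arbitrary SQL" rule is to compile `RagFilter` into a `WHERE` fragment where both metadata keys and values are passed as bind parameters. The column names (`namespace`, `language`, and `metadata` as `jsonb`) are assumptions about the embeddings table, as is the starting placeholder index. They are not its documented schema.

```typescript
// Assumed shape: only the three predicates RagFilter supports.
interface RagFilter {
  namespace?: string;
  language?: string;
  metadata?: Record<string, string>; // equality on metadata ->> key
}

interface SqlFragment {
  sql: string;
  params: string[];
}

// Compiles a filter into a WHERE fragment; caller-supplied text only ever appears in `params`.
function ragFilterToSql(filter: RagFilter | undefined, firstParam: number): SqlFragment {
  const f: RagFilter = filter ?? {};
  const clauses: string[] = [];
  const params: string[] = [];
  const bind = (value: string): string => {
    params.push(value);
    return `$${firstParam + params.length - 1}`;
  };
  if (f.namespace !== undefined) clauses.push(`namespace = ${bind(f.namespace)}`);
  if (f.language !== undefined) clauses.push(`language = ${bind(f.language)}`);
  for (const [key, value] of Object.entries(f.metadata ?? {})) {
    clauses.push(`metadata ->> ${bind(key)} = ${bind(value)}`);
  }
  return { sql: clauses.length > 0 ? clauses.join(' AND ') : 'TRUE', params };
}
```

Only placeholders are emitted, so any injection attempt inside a value stays data. The fragment can be appended to the HNSW query in the same transaction that sets the tenant context.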
5. Use Case — OpenHitlGateUseCase and DecideHitlGateUseCase
```typescript
export class OpenHitlGateUseCase {
  async execute(cmd: OpenHitlGateCommand): Promise<HitlGate> {
    // Persist gate row + outbox `hitl.gate_opened.v1`; notification-service receives it and dispatches
    // an in-app + (optional) push alert to users with one of the configured reviewerRoles in this tenant.
  }
}

export class DecideHitlGateUseCase {
  async execute(cmd: DecideHitlGateCommand): Promise<HitlDecision> {
    // 1. Load gate; assert status === 'open' and reviewer holds an allowed role (claim from JWT)
    // 2. Persist decision; flip gate.status = 'decided' (or 'closed')
    // 3. Outbox `hitl.gate_decided.v1` carrying decisionId
    // 4. If outcome === 'modified', persist modifiedJson; if 'rejected', justification required
    // 5. The original capability emits its state-change event (e.g., pricing-service publishing the price)
    //    only after observing this decision event, so HITL is the gating fact.
  }
}
```
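The preconditions of `DecideHitlGateUseCase` reduce to a pure check that runs before anything is persisted. The minimal sketch below assumes three things that this section does not confirm. It adds an `accepted` outcome alongside `modified` and `rejected`. It uses hypothetical rejection codes. It carries an `expectedVersion` token for the optimistic concurrency on `HitlGate.version` described in section 15.

```typescript
type Outcome = 'accepted' | 'modified' | 'rejected';

interface GateState {
  status: 'open' | 'decided' | 'closed';
  reviewerRoles: string[]; // roles allowed to decide this gate
  version: number; // optimistic-concurrency counter
}

interface DecisionInput {
  reviewerRoles: string[]; // taken from the reviewer's JWT claims
  outcome: Outcome;
  expectedVersion: number; // version the reviewer's UI loaded
  modifiedJson?: unknown;
  justification?: string;
}

// Hypothetical rejection codes; the service maps these onto its own error catalogue.
type DecisionCheck =
  | { ok: true; nextVersion: number }
  | { ok: false; code: 'NOT_OPEN' | 'ROLE_NOT_ALLOWED' | 'STALE_VERSION' | 'MISSING_MODIFIED_JSON' | 'MISSING_JUSTIFICATION' };

function checkDecision(gate: GateState, input: DecisionInput): DecisionCheck {
  if (gate.status !== 'open') return { ok: false, code: 'NOT_OPEN' };
  if (!input.reviewerRoles.some((role) => gate.reviewerRoles.includes(role))) {
    return { ok: false, code: 'ROLE_NOT_ALLOWED' };
  }
  if (input.expectedVersion !== gate.version) return { ok: false, code: 'STALE_VERSION' };
  if (input.outcome === 'modified' && input.modifiedJson === undefined) {
    return { ok: false, code: 'MISSING_MODIFIED_JSON' };
  }
  if (input.outcome === 'rejected' && !input.justification?.trim()) {
    return { ok: false, code: 'MISSING_JUSTIFICATION' };
  }
  // The persisting UPDATE would still guard with `WHERE version = expectedVersion`.
  return { ok: true, nextVersion: gate.version + 1 };
}
```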
Scheduled job ExpireOverdueHitlGatesJob runs every minute, calls hitl.expireOverdue(now), and submits a default decision (outcome: cap.hitl.defaultOnTimeout, auto: true).
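The selection half of the expiry job can be kept pure and clock-driven. This sketch assumes each gate carries an `expiresAt` deadline and its capability's `defaultOnTimeout` outcome; neither field name is confirmed here. Persisting the resulting decisions and emitting `hitl.gate_decided.v1` would then follow the normal decide path.

```typescript
interface ExpirableGate {
  id: string;
  status: 'open' | 'decided' | 'closed';
  expiresAt: string; // ISO-8601 deadline (assumed field)
  defaultOnTimeout: 'accepted' | 'rejected'; // copied from cap.hitl.defaultOnTimeout (assumed values)
}

interface AutoDecision {
  gateId: string;
  outcome: ExpirableGate['defaultOnTimeout'];
  auto: true;
  decidedAt: string;
}

// Pure: given "now", return the default decision for every open gate whose deadline has passed.
function defaultDecisionsForOverdue(gates: readonly ExpirableGate[], nowIso: string): AutoDecision[] {
  const nowMs = Date.parse(nowIso);
  return gates
    .filter((gate) => gate.status === 'open' && Date.parse(gate.expiresAt) <= nowMs)
    .map((gate): AutoDecision => ({
      gateId: gate.id,
      outcome: gate.defaultOnTimeout,
      auto: true,
      decidedAt: nowIso,
    }));
}
```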
6. Use Case — PromptLifecycleUseCases
```typescript
export class DraftPromptVersionUseCase { /* admin only; creates new pmv_ in 'draft' */ }

export class PromotePromptVersionUseCase {
  async execute(cmd: PromotePromptCommand): Promise<PromptVersion> {
    // 1. Load draft version + the current active version (if any)
    // 2. Verify a green EvalRun exists against the cited eval suite
    // 3. Verify ≥7 days of A/B traffic at 5% with no production-metric regression beyond tolerance
    // 4. Atomically: flip current 'active' → 'deprecated' with deprecatedAt = now
    //                flip draft → 'active' with activatedAt = now
    // 5. Outbox `prompt.version_published.v1`
    // 6. Invalidate prompt cache prefix; emit `edge_model.manifest_updated.v1`
    //    if the new prompt also implies an edge prompt-snapshot bump.
  }
}

export class DeprecatePromptVersionUseCase { /* explicit deprecate; sets deprecatedAt */ }

export class RetirePromptVersionUseCase {
  async execute(cmd: RetirePromptCommand): Promise<PromptVersion> {
    // Refuse if (now - deprecatedAt) < 14 days. Sets retiredAt = now.
  }
}
```
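The lifecycle rules above form a small state machine. The minimal sketch below assumes the only legal edges are draft → active → deprecated → retired; the section does not say whether a deprecated version may be reactivated. It also encodes the 14-day retirement guard from `RetirePromptVersionUseCase`.

```typescript
type PromptStatus = 'draft' | 'active' | 'deprecated' | 'retired';

interface PromptVersionState {
  status: PromptStatus;
  deprecatedAt?: string; // ISO-8601, set when leaving 'active'
  retiredAt?: string;
}

const DAY_MS = 24 * 60 * 60 * 1000;
const RETIREMENT_GRACE_DAYS = 14; // from RetirePromptVersionUseCase

// Assumed legal edges; anything not listed is rejected.
const ALLOWED_TRANSITIONS: Record<PromptStatus, readonly PromptStatus[]> = {
  draft: ['active'],
  active: ['deprecated'],
  deprecated: ['retired'],
  retired: [],
};

function canTransition(from: PromptStatus, to: PromptStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}

function retire(version: PromptVersionState, nowIso: string): PromptVersionState {
  if (!canTransition(version.status, 'retired')) {
    throw new Error(`cannot retire a '${version.status}' version`);
  }
  if (!version.deprecatedAt) throw new Error('deprecated version has no deprecatedAt');
  const deprecatedForMs = Date.parse(nowIso) - Date.parse(version.deprecatedAt);
  if (deprecatedForMs < RETIREMENT_GRACE_DAYS * DAY_MS) {
    throw new Error(`refused: deprecated for less than ${RETIREMENT_GRACE_DAYS} days`);
  }
  return { ...version, status: 'retired', retiredAt: nowIso };
}
```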
7. Use Case — EvalRunUseCase
```typescript
export class TriggerEvalRunUseCase {
  async execute(cmd: TriggerEvalRunCommand): Promise<EvalRun> {
    // 1. Load suite (golden set) + prompt version + model
    // 2. Enqueue an asynchronous EvalWorker job; return EvalRun row in 'queued'
  }
}

export class EvalWorker {
  async run(runId: EvalRunId): Promise<void> {
    // For each example in the suite:
    //   - Render prompt with example input
    //   - Call provider (bypasses budget; uses 'eval' tenant scope)
    //   - Score against rubric (exact-match | regex | LLM-as-judge | embedding-similarity | classification-metric)
    // Aggregate metrics; persist; outbox `eval.run_completed.v1`; trigger PromotionGate if green.
  }
}
```
ScheduledNightlyEvalJob enumerates all active prompt versions and runs an abbreviated suite to detect drift.
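This sketch covers the three deterministic scorers named in `EvalWorker`. LLM-as-judge and classification metrics are omitted because they need a judge model or a label set. The rubric shapes are assumptions, and so is the idea of comparing a pass rate against a suite threshold. The embedding comparison uses cosine similarity.

```typescript
// Assumed rubric shapes for the deterministic scorers.
type Rubric =
  | { kind: 'exact-match'; expected: string }
  | { kind: 'regex'; pattern: string; flags?: string }
  | { kind: 'embedding-similarity'; expected: number[]; threshold: number };

interface ScoredExample {
  output: string; // model output for this golden example
  outputEmbedding?: number[]; // needed only for embedding-similarity
  rubric: Rubric;
}

function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error('dimension mismatch');
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return normA === 0 || normB === 0 ? 0 : dot / Math.sqrt(normA * normB);
}

function passes(example: ScoredExample): boolean {
  const rubric = example.rubric;
  switch (rubric.kind) {
    case 'exact-match':
      return example.output.trim() === rubric.expected.trim();
    case 'regex':
      return new RegExp(rubric.pattern, rubric.flags).test(example.output);
    case 'embedding-similarity':
      return (
        example.outputEmbedding !== undefined &&
        cosineSimilarity(example.outputEmbedding, rubric.expected) >= rubric.threshold
      );
    default: {
      const unknownRubric: never = rubric;
      throw new Error(`unknown rubric: ${JSON.stringify(unknownRubric)}`);
    }
  }
}

// Fraction of examples that pass; compared against a suite threshold to call a run green.
function passRate(examples: readonly ScoredExample[]): number {
  if (examples.length === 0) return 0;
  return examples.filter(passes).length / examples.length;
}
```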
8. Use Case — RagIngestionUseCase
```typescript
export class IngestRagDocumentsUseCase {
  async execute(cmd: IngestDocumentsCommand): Promise<IngestRagResult> {
    // 1. Assert corpus tenant ownership
    // 2. Chunk per corpus.chunkStrategy (fixed-token with overlap by default; semantic via splitter for structured docs)
    // 3. Batch-embed (RunEmbeddingUseCase, capability='internal.rag_ingest')
    // 4. Upsert into embeddings table with deterministic chunk ids (sha256 of content) so re-ingest replaces duplicate chunks
    // 5. Update corpus.chunkCount + lastReindexAt
  }
}
```
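Here is a minimal sketch of the default fixed-token strategy with overlap, plus content-addressed chunk ids. As a simplification it treats whitespace-separated words as tokens. A real implementation would count tokens with the embedding model's tokenizer.

```typescript
import { createHash } from 'node:crypto';

interface Chunk {
  id: string; // sha256 of the chunk text
  index: number; // position within the document
  text: string;
}

// Fixed-size windows with overlap. Simplification: whitespace-separated words stand in for tokens.
function chunkFixedWithOverlap(text: string, chunkTokens: number, overlapTokens: number): Chunk[] {
  if (chunkTokens < 1 || overlapTokens < 0 || overlapTokens >= chunkTokens) {
    throw new RangeError('require chunkTokens >= 1 and 0 <= overlapTokens < chunkTokens');
  }
  const tokens = text.split(/\s+/).filter((t) => t.length > 0);
  const stride = chunkTokens - overlapTokens;
  const chunks: Chunk[] = [];
  for (let start = 0; start < tokens.length; start += stride) {
    const body = tokens.slice(start, start + chunkTokens).join(' ');
    chunks.push({
      id: createHash('sha256').update(body).digest('hex'),
      index: chunks.length,
      text: body,
    });
    if (start + chunkTokens >= tokens.length) break; // this window already reached the end
  }
  return chunks;
}
```

Because the id is the SHA-256 of the chunk text, re-ingesting an unchanged document yields the same ids. That is what lets the upsert in step 4 replace chunks rather than duplicate them.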
9. Use Case — PublishEdgeManifestUseCase
```typescript
export class PublishEdgeManifestUseCase {
  async execute(cmd: PublishEdgeManifestCommand): Promise<EdgeModelManifest> {
    // 1. Validate every model entry: file exists in gs://melmastoon-ai-artifacts/edge-models/, sha256 verified
    // 2. Canonicalize models + version (sorted keys, deterministic JSON)
    // 3. Sign with KMS asymmetric key
    // 4. Persist as 'published'; flip prior 'published' → 'superseded' with supersededAt = now
    // 5. Outbox `edge_model.manifest_updated.v1`; sync-service includes in next desktop snapshot
  }
}
```
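Step 2 needs a serialization in which logically equal manifests always produce identical bytes; otherwise a signature computed before a round trip will not verify after it. Below is a minimal sketch of recursive key sorting over plain JSON values. It assumes manifests contain only JSON-safe values (no `undefined`, `Date`, or non-finite numbers). It has not been checked against any formal canonicalization standard.

```typescript
import { createHash } from 'node:crypto';

type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Deterministic JSON: object keys sorted recursively, arrays kept in order, no whitespace.
function canonicalJson(value: Json): string {
  if (Array.isArray(value)) {
    return `[${value.map((item) => canonicalJson(item)).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const obj: { [key: string]: Json } = value;
    const members = Object.keys(obj)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${canonicalJson(obj[key])}`);
    return `{${members.join(',')}}`;
  }
  return JSON.stringify(value); // null, boolean, number, or string
}

// The bytes that would be handed to EdgeManifestPort.signWithKms(...).
function canonicalBytes(value: Json): Uint8Array {
  return new TextEncoder().encode(canonicalJson(value));
}

function sha256Hex(bytes: Uint8Array): string {
  return createHash('sha256').update(bytes).digest('hex');
}
```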
10. Use Case — BudgetEnforcementUseCase
```typescript
export class GetBudgetSnapshotUseCase { /* read-side */ }

export class RollOverBudgetPeriodJob {
  // Cron at 00:00 UTC on the 1st of each month for monthly periods.
  // Resets used = 0, clears warnedAt/trippedAt; any in-flight reservation for the prior period is settled first.
}
```
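This sketch shows the arithmetic behind reservation outcomes and monthly period keys. It assumes caps are expressed in tokens. It counts in-flight reservations against the cap. It still allows a request that lands exactly on a cap. In the service this check would run while holding the per-`(tenantId, scope, periodKey)` advisory lock described in section 15.

```typescript
type ReservationOutcome = 'granted' | 'soft_cap_warning' | 'denied_hard_cap';

interface BudgetCounterState {
  usedTokens: number; // committed this period
  reservedTokens: number; // held by in-flight reservations
  softCapTokens: number;
  hardCapTokens: number;
}

// Pure decision; callers must hold the per-(tenantId, scope, periodKey) lock around read + write.
function decideReservation(counter: BudgetCounterState, estimatedTokens: number): ReservationOutcome {
  const projected = counter.usedTokens + counter.reservedTokens + estimatedTokens;
  if (projected > counter.hardCapTokens) return 'denied_hard_cap';
  if (projected > counter.softCapTokens) return 'soft_cap_warning'; // still granted, but warns
  return 'granted';
}

// Monthly period key in UTC, e.g. "2024-03".
function monthlyPeriodKey(nowIso: string): string {
  const d = new Date(nowIso);
  return `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}`;
}

// 00:00 UTC on the 1st of the following month; Date.UTC rolls month 12 into January.
function nextMonthlyRollover(nowIso: string): string {
  const d = new Date(nowIso);
  return new Date(Date.UTC(d.getUTCFullYear(), d.getUTCMonth() + 1, 1)).toISOString();
}
```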
11. Orchestration Flow — A capability call from a sibling service
```text
caller-service             ai-orchestrator      provider             hitl reviewer (UI)
│                          │                    │                    │
│ POST /api/v1/ai/complete │                    │                    │
│ { capability, tenantId,  │                    │                    │
│   input, correlation }   │                    │                    │
├─────────────────────────▶│                    │                    │
│                          │ moderate(input)    │                    │
│                          │ redact(input)      │                    │
│                          │ cache lookup → MISS│                    │
│                          │ budget reserve     │                    │
│                          │ pick provider      │                    │
│                          │ call ─────────────▶│                    │
│                          │                    │                    │
│                          │◀── output ─────────│                    │
│                          │ moderate(output)   │                    │
│                          │ schema validate    │                    │
│                          │ stamp provenance   │                    │
│                          │ budget commit      │                    │
│                          │ cache put          │                    │
│                          │ HITL open ─────────────────────────────▶│ in-app + push notification
│                          │ outbox events      │                    │
│ 200 OK { output,         │                    │                    │
│          provenanceId,   │                    │                    │
│          hitlGateId }    │                    │                    │
│◀─────────────────────────│                    │                    │
│                          │                    │                    ◀────────────── decision (accept/modify/reject)
│                          │ outbox             │                    │
│                          │ `hitl.gate_decided`│                    │
caller-service consumes    │                    │                    │
the decision event and    ◀┤                    │                    │
applies the state change   │                    │                    │
```
12. Orchestration Flow — Pub/Sub event-driven inference
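For capabilities with high-volume triggers (anomaly.detect, upsell.recommend), callers can publish a request to the Pub/Sub topic melmastoon.ai_orchestrator.inference.requested.v1 instead of calling REST. ai-orchestrator processes these requests asynchronously and emits the same inference.completed and capability-specific events. A capability can also be triggered directly by a domain event: in the flow below, reservation.confirmed.v1 triggers upsell.recommend.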
```text
reservation-service                        ai-orchestrator             pubsub
│                                          │                           │
│ outbox: reservation.confirmed.v1         │                           │
├─────────────────────────────────────────────────────────────────────▶│
│                                          │ consumes (subscriber:     │
│                                          │ ai-orchestrator.melmast.. │
│                                          │ reservation.confirmed.v1) │
│                                          │◀──────────────────────────│
│                                          │ trigger upsell.recommend  │
│                                          │ (RunInferenceUseCase)     │
│                                          │ outbox: upsell.recommended│
│                                          │ ─────────────────────────▶│
notification-service consumes              │                           │
upsell.recommended.v1 to send pre-arrival ◀┤                           │
```
13. Saga participation
| Saga | Role | Action |
|---|---|---|
| Guest erasure (GDPR) | Participant | On melmastoon.tenant.guest.erasure_requested.v1: enumerate per-tenant embeddings + cached prompt artifacts + redacted inference rows referencing the guest, purge or anonymize within 7 days, emit melmastoon.ai_orchestrator.guest.erasure_acknowledged.v1 (out-of-scope event published via the audit envelope) |
| Tenant suspension | Participant | On melmastoon.tenant.suspended.v1: flip per-tenant capability allowlist to read-only catalog reads; refuse new inference with MELMASTOON.TENANT.SUSPENDED |
| Prompt rollout | Coordinator | Draft → 5% A/B → eval green → promote → outbox event |
| HITL gate | Coordinator | Open → notify → decision or timeout → close → emit decision event |
14. Idempotency
| Operation | Key | Window |
|---|---|---|
| POST /api/v1/ai/complete | Idempotency-Key header (ULID); result cached for the window | 24 h |
| POST /api/v1/ai/embed | Idempotency-Key per batch | 24 h |
| Pub/Sub event consumption | Pub/Sub message id → inbox table | 7 d |
| POST /api/v1/ai/hitl/gates/:gateId/decision | gate id + reviewer id; a second submit returns MELMASTOON.GENERAL.PRECONDITION_FAILED | n/a |
| TriggerEvalRunUseCase | (suiteId, promptVersionId, modelRef) per day | 24 h |
| Budget increment | (reservationId, requestId) | period |
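Below is a minimal in-memory sketch of `Idempotency-Key` handling with the 24 h window. It relies on a request fingerprint (for example a hash of the request body). That way, reusing a key with a different payload is reported as a conflict instead of replaying an unrelated response. The conflict rule is an assumption; it is not part of the table above.

```typescript
const IDEMPOTENCY_WINDOW_MS = 24 * 60 * 60 * 1000; // 24 h, per the table above

type IdempotencyOutcome =
  | { kind: 'proceed' } // unseen key, or seen outside the window
  | { kind: 'replay'; body: unknown } // same key, same request: return the stored response
  | { kind: 'conflict' }; // same key, different request (assumed rule)

interface StoredResponse {
  fingerprint: string; // e.g. a hash of the request body
  body: unknown;
  storedAtMs: number;
}

class InMemoryIdempotencyStore {
  private readonly entries = new Map<string, StoredResponse>();

  check(key: string, fingerprint: string, nowMs: number): IdempotencyOutcome {
    const hit = this.entries.get(key);
    if (!hit || nowMs - hit.storedAtMs >= IDEMPOTENCY_WINDOW_MS) return { kind: 'proceed' };
    return hit.fingerprint === fingerprint ? { kind: 'replay', body: hit.body } : { kind: 'conflict' };
  }

  save(key: string, fingerprint: string, body: unknown, nowMs: number): void {
    this.entries.set(key, { fingerprint, body, storedAtMs: nowMs });
  }
}
```

In a multi-instance deployment, check and save must become one atomic claim, for example an insert-if-absent in a shared store. Otherwise two concurrent requests with the same key can both see `proceed`.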
15. Concurrency
- Budget reservation uses an atomic Postgres advisory lock per `(tenantId, scope, periodKey)` to avoid races at the soft/hard cap boundary.
- HITL decision submission uses optimistic concurrency on `HitlGate.version`.
- Prompt promotion uses an exclusive lock on the `Prompt` row (logical lock via `SELECT … FOR UPDATE`) to prevent two admins promoting different drafts simultaneously.
- Provider circuit-breaker state is updated via atomic `INCREMENT consecutive_errors RETURNING …` so concurrent calls can't double-count.
- RAG ingest uses deterministic chunk ids (sha256 of content) so concurrent re-ingest produces an idempotent set.
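Postgres advisory locks are keyed by a 64-bit integer, so the `(tenantId, scope, periodKey)` triple has to be mapped to one. The sketch below takes the first 8 bytes of a SHA-256 digest as a signed `bigint`. The string form of `scope` is hypothetical. A hash collision would only serialize two unrelated scopes, which costs latency but not correctness.

```typescript
import { createHash } from 'node:crypto';

// Postgres advisory locks take a signed 64-bit key; derive it deterministically from the lock scope.
function advisoryLockKey(tenantId: string, scope: string, periodKey: string): bigint {
  const digest = createHash('sha256')
    .update(`${tenantId}\u0000${scope}\u0000${periodKey}`)
    .digest();
  return digest.readBigInt64BE(0); // first 8 bytes, interpreted as a signed int64
}

// Transaction-scoped: released automatically at COMMIT or ROLLBACK, so no explicit unlock is needed.
const LOCK_SQL = 'SELECT pg_advisory_xact_lock($1::bigint)';

// Parameters for LOCK_SQL; the key is sent in its decimal string form.
function lockParams(tenantId: string, scope: string, periodKey: string): [string] {
  return [advisoryLockKey(tenantId, scope, periodKey).toString()];
}
```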
16. Background workers
| Worker | Cadence | Purpose |
|---|---|---|
| EvalNightlyJob | 04:00 UTC | Drift detection on every active prompt version |
| BudgetRollOverJob | 00:00 UTC at each period boundary | Reset counters; settle in-flight reservations |
| HitlExpiryJob | every 60 s | Close overdue gates with the capability's conservative default |
| ProviderProbeJob | every 30 s | Half-open probe for unhealthy providers |
| CacheCleanupJob | hourly | Audit cache entries for orphaned provenance references (Memorystore already evicts TTL-expired entries) |
| EmbeddingReindexJob | when corpus growth > 25% since last build | Trigger HNSW rebuild |
| EdgeManifestPromotionJob | on demand (admin) | Validate + sign + publish manifest |
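`ProviderProbeJob` implies a three-state circuit breaker per provider. Below is a single-process sketch with two hypothetical parameters: a consecutive-error threshold and an open-state cool-down. Live traffic is routed only while the breaker is closed. The probe job moves an open breaker to half-open, then closes or re-opens it based on a single probe. The service itself keeps this state in a shared store updated by an atomic increment, as noted in section 15.

```typescript
type BreakerState = 'closed' | 'open' | 'half_open';

// Hypothetical tuning knobs.
interface BreakerConfig {
  failureThreshold: number; // consecutive errors that open the breaker
  openForMs: number; // cool-down before a probe is allowed
}

class CircuitBreaker {
  private state: BreakerState = 'closed';
  private consecutiveErrors = 0;
  private openedAtMs = 0;
  private readonly config: BreakerConfig;

  constructor(config: BreakerConfig) {
    this.config = config;
  }

  getState(): BreakerState {
    return this.state;
  }

  // Live traffic goes only to closed breakers; half-open is reserved for the probe.
  isRoutable(): boolean {
    return this.state === 'closed';
  }

  recordSuccess(): void {
    this.state = 'closed';
    this.consecutiveErrors = 0;
  }

  recordFailure(nowMs: number): void {
    this.consecutiveErrors += 1;
    if (this.state === 'half_open' || this.consecutiveErrors >= this.config.failureThreshold) {
      this.state = 'open';
      this.openedAtMs = nowMs;
    }
  }

  // Moves open -> half_open once the cool-down has elapsed; returns whether a probe may run.
  tryBeginProbe(nowMs: number): boolean {
    if (this.state !== 'open' || nowMs - this.openedAtMs < this.config.openForMs) return false;
    this.state = 'half_open';
    return true;
  }
}

// One ProviderProbeJob tick for one provider; `probe` stands in for a cheap health request.
async function probeTick(breaker: CircuitBreaker, probe: () => Promise<void>, nowMs: number): Promise<void> {
  if (!breaker.tryBeginProbe(nowMs)) return;
  try {
    await probe();
    breaker.recordSuccess();
  } catch {
    breaker.recordFailure(nowMs);
  }
}
```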