Application Logic
:::info Source
Sourced from services/assessment-service/APPLICATION_LOGIC.md in the documentation repo.
:::
Layer: Application (Use-Cases, Command/Query Handlers, Orchestration) Companion: DOMAIN_MODEL.md · API_CONTRACTS.md · EVENT_SCHEMAS.md
This doc describes the use-cases — the orchestration that sits between the HTTP/NATS inbound edge and the pure domain. Use-cases own transactions, outbox writes, port invocations, and observability spans.
1. Use-Case catalogue
| ID | Use-case | Invoked by | Aggregate touched |
|---|---|---|---|
| UC-01 | CreateQuizBank | HTTP POST /quiz-banks | QuizBank |
| UC-02 | UpdateQuizBank | HTTP PATCH /quiz-banks/{id} | QuizBank |
| UC-03 | AddQuestion | HTTP POST /quiz-banks/{id}/questions | QuizBank |
| UC-04 | UpdateQuestion | HTTP PATCH /questions/{id} | QuizBank |
| UC-05 | PublishQuizBank | HTTP POST /quiz-banks/{id}/publish | QuizBank |
| UC-06 | ArchiveQuizBank | HTTP POST /quiz-banks/{id}/archive | QuizBank |
| UC-07 | ServePresentation | HTTP GET /quiz-banks/{id}/questions | QuizBank (read) |
| UC-08 | SubmitResponse | HTTP POST /attempts/{id}/submit-response | (buffered, no aggregate write) |
| UC-09 | ScoreAttempt | HTTP POST /attempts/{id}/score | AttemptResult |
| UC-10 | ReconcileOfflineAttempt | NATS sync.offline_artifact.received.v1 | AttemptResult |
| UC-11 | CreateScenario | HTTP POST /branching-scenarios | BranchingScenario |
| UC-12 | UpdateScenarioNode | HTTP PATCH /branching-scenarios/{id}/nodes/{nid} | BranchingScenario |
| UC-13 | PublishScenario | HTTP POST /branching-scenarios/{id}/publish | BranchingScenario |
| UC-14 | Navigate | HTTP POST /branching-scenarios/{id}/navigate | (read) |
| UC-15 | RequestAIQuizGeneration | HTTP POST /quiz-banks/{id}/ai/generate | QuizBank (draft) |
| UC-16 | RequestAIBranchingGeneration | HTTP POST /branching-scenarios/{id}/ai/generate | BranchingScenario (draft) |
| UC-17 | GradeShortAnswerWithAI | Internal (async, triggered by UC-09) | AttemptResult |
| UC-18 | HumanReviewShortAnswer | HTTP POST /attempts/{id}/responses/{qid}/human-grade | AttemptResult |
| UC-19 | RegradeAttempt | HTTP POST /attempts/{id}/regrade | AttemptResult (new) |
| UC-20 | ProjectBlockAdded | NATS authoring.block.added.v1 | QuizBank (link) |
| UC-21 | HandleGDPRSubjectRequest | NATS gdpr.subject_request.received.v1 | all attempts (redact) |
| UC-22 | PreloadQuestionsForPlaySession | NATS delivery.play_session.started.v1 | QuizBank (warm cache) |
2. Canonical use-case shape
Every use-case follows this shape (NestJS-style, but framework-agnostic):
@UseCase()
class CreateQuizBank {
constructor(
private readonly repo: QuizBankRepo, // port
private readonly outbox: OutboxPort, // port
private readonly tenants: TenantScopePort, // port
private readonly clock: ClockPort, // port
private readonly tracer: Tracer, // OTel
private readonly policies: PolicyEnginePort, // ABAC
) {}
async execute(
cmd: CreateQuizBankCommand,
ctx: RequestContext,
): Promise<Result<QuizBankId, DomainError>> {
return this.tracer.span('assessment.create_quiz_bank', async (span) => {
// 1. authorize
const decision = await this.policies.check(ctx.principal, 'quiz_bank:create', { tenantId: ctx.tenantId });
if (!decision.allow) return err(new Forbidden(decision.reasonCode));
// 2. domain validation & construction
const bank = createQuizBank({
id: ULID.generate(),
tenantId: ctx.tenantId,
title: cmd.title,
questions: [],
gradingRule: cmd.gradingRule,
createdBy: ctx.principal.userId,
now: this.clock.now(),
});
// 3. transactional write: aggregate + outbox, single Postgres tx
await this.repo.withTx(async (tx) => {
await this.repo.save(bank, tx);
await this.outbox.enqueue(tx, {
topic: 'assessment.quiz_bank.created.v1',
payload: serializeQuizBankCreated(bank),
partitionKey: bank.id,
correlationId: ctx.correlationId,
tenantId: bank.tenantId,
actor: ctx.principal,
});
});
// 4. telemetry
span.setAttributes({ 'quiz_bank.id': bank.id, 'tenant.id': bank.tenantId });
return ok(bank.id);
});
}
}
Rules every use-case follows:
- One Postgres transaction per write use-case.
- Outbox write in same tx as aggregate write (no separate "publish" step).
- Authorization at use-case entry, not only at route layer (defense in depth).
- Clock via port (testable).
- ULID generation via port (seedable for tests).
- Returns
Result<T, DomainError>— never throws domain errors.
3. High-signal use-cases explained
3.1 UC-07 ServePresentation (GET questions for a learner)
learner ─▶ delivery-service ─▶ assessment-service (this UC)
Steps:
- Resolve
QuizBankby ID (withtenantIdfilter from RLS). - Assert
state === 'published'. - Determine
seed:seedStrategy = 'attemptId'→seed = attemptIdseedStrategy = 'userIdAndAttemptId'→seed = hash(userId + attemptId)seedStrategy = 'random'→seed = ULID()
- Apply
PoolConfigsampling (deterministic given seed) → shortlist of questions. - Invoke
QuestionSerializer.toPresentationForm(q, seed)for each → answer keys stripped, options shuffled if configured. - Return
PresentationPayloadwith locale-resolved prompts. - Emit span
assessment.serve_presentationwithquizBank.id,questions.count,seed.hash.
Security invariants:
PresentationPayloadis asserted free ofisCorrect,correctIndex,acceptedAnswers,expectedfields via a content-shape linter in unit tests (AssertNoCorrectAnswer).- If
ctx.principalis a learner andstate !== 'published'→ 403.
Caching: Per-(quizBankId, version, locale) the stripped, unshuffled payload is cached in Redis (15 min TTL); the shuffle is applied per-request from the seed.
3.2 UC-09 ScoreAttempt
player ─▶ POST /attempts/{id}/score { responses: [...] }
Steps:
- Load
QuizBank(orBranchingScenariofor scenario attempts). - Call
scoreAttempt(qb, responses, scoringContext)pure function. - Inspect result:
- All deterministic →
state = 'final'. - Any short-answer with rubric + AI grading enabled →
state = 'pending_human_review'for those questions; queueGradeShortAnswerWithAI(UC-17) async per response.
- All deterministic →
- Persist
AttemptResult+ outboxassessment.attempt_result.scored.v1in one tx. - If
pending_human_review, also emitassessment.attempt.pending_human_review.v1. - Return summary DTO to caller.
Offline reconciliation path: if offlineScored === true the client claimed score is also included; ScoreReconciliation is computed. If mismatch > configured tolerance (default 0.001), emit assessment.score_mismatch_detected.v1 but still use server score as authoritative.
Latency target: p99 < 350 ms deterministic, < 2.5 s if AI-graded response scheduled.
3.3 UC-15 RequestAIQuizGeneration
author ─▶ POST /quiz-banks/{id}/ai/generate
{ lessonId, questionCount, types, difficulty }
Steps:
- Fetch lesson content via
AuthoringReadPort.getLesson(lessonId)(read-only, idempotent). - Call
AIClient.generate({ promptId: 'quiz_generation/v3', context, constraints }). - Validate structured output against JSON Schema
QuizGenerationOutput/v3.json. Reject if invalid. - Produce draft
Question[]withaiProvenancestamped (model, prompt version, traceId). - Return draft to caller — does not persist yet; authoring UI shows side-by-side with Accept/Edit/Reject actions.
- On accept: author posts
PATCH /quiz-banks/{id}which runs UC-02 (normal path) with the approved questions. HITL enforced:aiProvenance.reviewedByandreviewedAtset by UC-02.
Cost guard: per-tenant token budget checked before call. Exceeded budget → 402 with costBudget.exceeded error code.
Safety eval: The prompt quiz_generation/v3 has a locked eval set (300 items, multiple domains); any prompt change requires ≥ 95% pass on the eval before promotion per doc 15 §P5.
3.4 UC-17 GradeShortAnswerWithAI
Runs asynchronously after UC-09 schedules it.
Steps:
- Fetch
AttemptResult(state =pending_human_review). - For each response marked
gradedBy: 'ai':- Build grading context: question prompt, rubric, learner response.
- Call
AIClient.grade({ promptId: 'rubric_grading/v2', rubric, response }). - Parse
AIGradeOutcome(pointsPerCriterion, confidence, rationale). - If
confidence < rubric.humanReviewThreshold→ keephumanReviewRequired: true, do not promote to final. - If
confidence >= threshold→ mark response withgradedBy: 'ai', points,aiConfidence, rationale stored.
- After all responses processed:
- If all AI-graded responses meet threshold and no human override pending → recompute
scaledScore, setstate = 'final', emitattempt_result.scored.v1. - Else → stay in
pending_human_review, emitassessment.short_answer.ai_graded.v1for telemetry.
- If all AI-graded responses meet threshold and no human override pending → recompute
- Record full
aiProvenanceonAttemptResult.
Anti-gaming: Learner-submitted text is sanitized before being sent to AI (strip control sequences, nested prompts). The system prompt is isolated at the gateway — assessment-service never puts learner text directly in the system message.
3.5 UC-10 ReconcileOfflineAttempt
Triggered when sync-service delivers a learner's offline attempt.
Steps:
- Deduplicate on
(attemptId, clientMutationId)via inbox table. - Extract
ClaimedAttemptpayload:responses[],clientScaledScore, device signature, timestamps. - Verify device signature via
identity-servicedevice pubkey cache. - Load
QuizBankat thequizBankVersionthe client used; if version is archived, prefer the version the client bundled. - Call UC-09
ScoreAttemptinternally withscoringMode='deterministic', markingofflineScored = true. - Compare client vs server scaled score →
ScoreReconciliation. - Persist
AttemptResult, emit appropriate events. - If mismatch > tolerance, also trigger notification via
notification-serviceto compliance officer queue.
Idempotency: Re-delivery of the same (attemptId, clientMutationId) returns the previously scored result (200 OK, same payload).
3.6 UC-22 PreloadQuestionsForPlaySession
Reactive to delivery.play_session.started.v1.
Steps:
- Parse event; extract
courseVersionId,userId. - For each quiz block in the session:
- Prewarm Redis cache with the
PresentationPayload(unshuffled variant) for that bank + locale.
- Prewarm Redis cache with the
- Emit telemetry metric
assessment.preload.hit_rate.
Non-blocking — failures only log warnings.
4. Ports (abstractions the use-cases depend on)
interface QuizBankRepo {
findById(id: QuizBankId): Promise<QuizBank | null>;
findByIds(ids: QuizBankId[]): Promise<QuizBank[]>;
save(qb: QuizBank, tx: Tx): Promise<void>;
listByTenant(tenantId: TenantId, page: Pagination): Promise<Page<QuizBank>>;
withTx<T>(fn: (tx: Tx) => Promise<T>): Promise<T>;
}
interface BranchingScenarioRepo { /* analogous */ }
interface AttemptResultRepo {
findByAttemptId(id: AttemptId): Promise<AttemptResult | null>;
save(ar: AttemptResult, tx: Tx): Promise<void>;
withTx<T>(fn: (tx: Tx) => Promise<T>): Promise<T>;
}
interface OutboxPort {
enqueue(tx: Tx, entry: OutboxEntry): Promise<void>;
}
interface AIClient {
generate(req: AIGenerationRequest): Promise<AIGenerationResponse>;
grade(req: AIGradingRequest): Promise<AIGradingResponse>;
}
interface MediaClient {
resolveAssetRef(ref: MediaRef): Promise<MediaDescriptor>;
}
interface AuthoringReadPort {
getLesson(id: LessonId): Promise<LessonReadModel>;
getBlockRef(ref: BlockRef): Promise<BlockReadModel>;
}
interface PolicyEnginePort {
check(principal: Principal, action: string, resource: Resource): Promise<Decision>;
}
interface TenantScopePort {
currentTenantId(): TenantId;
}
interface ClockPort { now(): ISODate; }
interface IdempotencyStore {
claim(key: string): Promise<'new' | 'duplicate' | { cached: any }>;
commit(key: string, result: any): Promise<void>;
}
interface ULIDGenerator { next(): ULID; }
interface NotificationClient {
notify(req: NotifyRequest): Promise<void>;
}
5. Transactional boundaries
| Concern | Mechanism |
|---|---|
| Aggregate write + event emission | Single Postgres transaction via withTx; outbox row is inserted in same tx |
| Outbox → NATS publication | Separate background worker (OutboxPublisher) with SELECT … FOR UPDATE SKIP LOCKED |
| Cross-service calls | Only authoring-read and AI-grading are cross-service; both use Saga-light compensations (best-effort) |
| AI grading retries | Exponential backoff, max 3 attempts, DLQ on final failure |
| Consumer idempotency | Every inbound event checked against inbox_events unique (eventId) |
6. Error taxonomy (application layer)
| Code | Reason | HTTP | Example |
|---|---|---|---|
quiz_bank.not_found | QuizBank not found or wrong tenant | 404 | Serve presentation for missing bank |
quiz_bank.draft_not_servable | Attempted to serve a draft | 409 | Learner tries to take unpublished quiz |
quiz_bank.invariant_violation | Domain invariant failed | 422 | No correct answer on MCQ |
attempt.already_scored | Attempt already scored | 409 | Duplicate score call |
attempt.expired | Time limit elapsed before submit | 422 | Late submission |
ai.budget_exceeded | Tenant AI budget hit | 402 | Request blocked |
ai.output_invalid | AI returned schema-invalid data | 502 | Retry or reject |
policy.forbidden | ABAC denied | 403 | Learner POSTs create-bank |
idempotency.replay_mismatch | Same Idempotency-Key, different body | 409 | Client bug |
concurrency.stale_version | If-Match mismatch | 412 | Two authors editing |
All errors conform to doc 05 §8 problem+json.
7. Concurrency & locking
- Optimistic concurrency on QuizBank / BranchingScenario via
versioncolumn +If-Matchheader. - AttemptResult uses
attemptIdas primary key — collisions resolved by idempotency (same ID returns cached result). - Outbox publisher uses row-level lock
SELECT … FOR UPDATE SKIP LOCKED LIMIT 100to allow N parallel publishers.
8. Observability hooks (application layer)
Every use-case emits:
- Span:
assessment.<use_case_name>with attributestenant.id, aggregate ID, actor type. - Metric counter:
assessment.usecase.invocations_total{name, outcome}. - Metric histogram:
assessment.usecase.duration_ms{name}. - Structured log: input hash (not plaintext), result outcome, error code if any.
AI use-cases additionally emit:
ai.prompt.id,ai.model,ai.tokens.in,ai.tokens.out,ai.cost.microUSD,ai.confidence,ai.decision.accepted.
See OBSERVABILITY.md for the full taxonomy.
9. Policy decisions encoded at application layer
| Decision | Rule |
|---|---|
| Can learner see correct answers? | Per gradingRule.showCorrectAnswers |
| Can learner retake? | Per gradingRule.maxAttempts and enrollment policy (delivery-service owns attempt count) |
| Is AI grading allowed for this tenant? | tenant.ai.rubric_grading.enabled flag via tenant-service |
| Is prompt injection suspected? | AI gateway's classifier returns suspected: true → reject with ai.prompt_injection_suspected |
10. Saga summary
Assessment-service participates in one multi-step workflow:
"AI-assisted quiz authoring" saga (orchestrated by authoring-service)
1. Author clicks "Generate quiz" ─▶ authoring-service
2. authoring-service ─▶ assessment-service UC-15 (RequestAIQuizGeneration)
3. assessment-service ─▶ ai-gateway-service (grade)
4. assessment-service returns draft ─▶ authoring-service
5. Author reviews; on approval ─▶ authoring-service publishes block
6. authoring.block.added.v1 ─▶ assessment-service UC-20 (ProjectBlockAdded) links bank to block
No compensations needed — UC-15 does not persist; the draft is ephemeral until UC-20 links it.
11. References
- Ports for adapters: DATA_MODEL.md, EVENT_SCHEMAS.md
- Security decisions: SECURITY_MODEL.md
- AI contracts: AI_INTEGRATION.md