Skip to main content

Application Logic

:::info Source Sourced from services/assignment-service/APPLICATION_LOGIC.md in the documentation repo. :::

Companion: DOMAIN_MODEL · EVENT_SCHEMAS · API_CONTRACTS


1. Use Cases

UCActorUse caseTriggerSlice
UC-AS-01Admin (HTTP)Create assignment (draft)POST /api/v1/assignmentsS4
UC-AS-02Admin (HTTP)Activate assignmentPOST /{id}/activateS4
UC-AS-03Admin (HTTP)Pause / resume assignmentPOST /{id}/pauseS4
UC-AS-04Admin (HTTP)Add / remove targetsPOST /{id}/targetsS4
UC-AS-05System (cron)Materialise compliance windowsRRULEMaterializer jobS4
UC-AS-06System (consumer)Window → in_progressenrollment.created.v1S4
UC-AS-07System (consumer)Window → completedprogress.completion.recorded.v1S4
UC-AS-08System (cron)Window → overdueOverdueSweeper jobS4
UC-AS-09System (cron)Window → closed_missedClosedMissedSweeper jobS4
UC-AS-10System (scheduler)Dispatch remindersReminderDispatcher jobS4
UC-AS-11System (scheduler)Fire escalation actionsEscalationRunnerS4
UC-AS-12Admin / Auditor (HTTP)Compliance reportGET /compliance-reportS4
UC-AS-13System (consumer)Rebind targets on dynamic-group updatetenant.dynamic_group.evaluated.v1S4
UC-AS-14System (consumer)Attach new member to active assignmentstenant.membership_activated.v1S4
UC-AS-15System (consumer)GDPR purgegdpr.subject_request.received.v1M3
UC-AS-16AI Gateway (sync call)Suggest assignmentsPOST /assignments/suggestS5

2. Command Model

All write operations flow through CQRS-style commands → handlers → repositories.

interface CreateAssignmentCommand {
tenantId: TenantId;
createdBy: UserId;
title: I18nString;
description?: I18nString;
courseId: CourseId;
courseVersionPolicy: 'pin' | 'latest';
pinnedVersionId?: CourseVersionId;
targets: AssignmentTarget[];
rrule?: RRULEString;
startDate: ISODate;
dueOffset: ISODuration;
gracePeriod: ISODuration;
escalation: EscalationPolicy;
reminderPolicy: ReminderPolicy;
idempotencyKey: string;
}

interface ActivateAssignmentCommand { id: AssignmentId; tenantId: TenantId; actor: UserId; }
interface PauseAssignmentCommand { id: AssignmentId; tenantId: TenantId; actor: UserId; reason?: string; }
interface AddTargetsCommand { id: AssignmentId; tenantId: TenantId; actor: UserId; add: AssignmentTarget[]; }

3. Handler Flows

3.1 CreateAssignment

HTTP POST → validate body (Zod) → authorize (role: compliance_admin | tenant_admin)
→ deduplicate by idempotencyKey (Redis SETNX 24h)
→ resolve targets (sanity check only; real expansion at activation + materialize)
→ new Assignment(state='draft') → repo.save
→ outbox.publish('assignment.created.v1')
→ respond 201

3.2 ActivateAssignment (critical path)

validate state=draft → verify RRULE syntactic → verify pinnedVersionId exists (catalog projection)
→ Assignment.activate() → set activatedAt=now
→ outbox.publish('assignment.activated.v1')
→ enqueue MaterializeWindowsJob(assignmentId, horizonUntil=now+90d)
→ respond 200

3.3 MaterializeWindowsJob (UC-AS-05)

Runs on activation AND daily cron (0 */1 * * * — hourly look-ahead).

for each Assignment where state='active' AND (rrule IS NOT NULL OR not yet materialized):
occurrences ← RRULEEngine.occurrencesBetween(
rrule, startDate, now+90d, tenantTZ)
members ← TargetResolver.expand(targets, at=occurrence.start)
existingKeys ← repo.existingKeysForAssignment(assignmentId)
newWindows ← {(a,u,occ) for each (u,occ) not in existingKeys}
in batches of 1,000:
tx:
repo.insertMany(newWindows)
outbox.insertMany(window.opened.v1 events)
commit

Idempotency: unique index on (assignment_id, user_id, occurrence_start).

3.4 WindowTransition consumers

On enrollment.created.v1:

if event.source.kind === 'assignment' AND ref points to our window:
w = repo.findById(ref.windowId)
if w.state == 'open': w.state='in_progress'; w.enrollmentId=event.enrollmentId
outbox.publish('assignment.window.in_progress.v1')
else: log stale transition, metric++

On progress.completion.recorded.v1:

if event.enrollmentId maps to a window we own AND passed==true:
w = repo.findByEnrollmentId(event.enrollmentId)
if w.state in (in_progress, overdue):
w.state='completed'; w.completedAt=event.recordedAt
late = w.completedAt > w.dueAt
outbox.publish('assignment.window.completed.v1', {late})

3.5 OverdueSweeper (UC-AS-08)

every 5 minutes:
cursor-based scan of windows WHERE state IN ('open','in_progress') AND due_at <= now()
for each in batch of 500:
w.state = 'overdue'; w.overdueAt = now()
outbox.publish('assignment.window.overdue.v1')
enqueue EscalationRunner(windowId, trigger='on_overdue')

3.6 ClosedMissedSweeper (UC-AS-09)

every 15 minutes:
scan WHERE state='overdue' AND grace_until <= now()
w.state='closed_missed'; w.closedAt=now()
outbox.publish('assignment.window.closed_missed.v1')

3.7 ReminderDispatcher (UC-AS-10)

Reads windows with open|in_progress|overdue and upcoming reminder triggers, emits notification.dispatch.requested.v1 (owned by notification-service schema), and increments remindersSent.

3.8 EscalationRunner (UC-AS-11)

Stateless worker. Input = (windowId, trigger). Evaluates EscalationPolicy.steps, for each pending step emits assignment.escalation.triggered.v1 with the action payload (notification-service consumes), records escalationLevel.

3.9 DynamicGroupRebind (UC-AS-13)

on tenant.dynamic_group.evaluated.v1:
for each active assignment targeting this groupId:
added = newMembers - previousMembers
removed = previousMembers - newMembers
for each added userId:
synthesise windows for upcoming occurrences within horizon
for each removed userId:
for each open window of (assignmentId, userId): close with reason 'target_removed'

3.10 AI Suggested Assignments (UC-AS-16, S5)

Synchronous admin-facing endpoint. Service calls AI Gateway (authoring/assignment_suggest prompt), returns a proposed CreateAssignmentCommand with aiSuggested=true and aiProvenance. Admin reviews and accepts (HITL). No auto-activation. Acceptance → standard UC-AS-01 flow.

4. Transactional Outbox

Every event publication goes through an outbox pattern to guarantee exactly-once-ish semantics:

tx BEGIN
UPDATE assignment / compliance_window …
INSERT INTO outbox (subject, payload, headers, tenant_id) VALUES (…)
tx COMMIT
────────────────────────────────────
OutboxDispatcher (background):
SELECT … FROM outbox WHERE published_at IS NULL LIMIT 500 FOR UPDATE SKIP LOCKED
publish to NATS JetStream (Ack required)
UPDATE outbox SET published_at = now() WHERE id IN (…)

5. Concurrency Model

  • Assignment aggregate — optimistic locking via version column; 409 on conflict.
  • ComplianceWindow — optimistic locking via version column. Sweepers always WHERE state=expected ensuring CAS.
  • Materializer — advisory lock pg_advisory_xact_lock(hash(assignmentId)) during materialization batch.
  • Escalation & reminders — stateless; idempotent by (windowId, stepLevel, actionHash).

6. Idempotency

OperationKey
CreateAssignmentidempotency:create:{tenantId}:{idempotencyKey} (24h)
Window materializationUnique DB index (assignmentId, userId, occurrenceStart)
Escalation firing(windowId, stepLevel) uniqueness in escalation_log
Reminder dispatch(windowId, reminderTriggerHash) uniqueness
GDPR purgesubjectRequestId

7. Authorization Matrix

ActionRoleNotes
Create assignmenttenant_admin, compliance_admin
Activatesame as create
Pause/resumesame
Edit RRULE / dueOffsetsame, while draft onlyActivated assignments are immutable re: schedule
Read own windowslearnerscoped to userId === me
Read all windowscompliance_admin, manager (scoped to OU)RLS
Compliance reportcompliance_admin, auditor
AI suggestcompliance_admin, tenant_adminrate-limited

8. Error Handling

All handlers emit typed domain errors → mapped to RFC 7807 problem responses (see API_CONTRACTS). Retryable vs non-retryable is explicit:

ErrorRetry?
DuplicateIdempotencyKeyno
InvalidRRULEno
TargetGroupNotFoundno
CourseVersionNotFoundno
OutboxFullyes (backoff)
DependencyUnavailableyes
ConflictVersionMismatchyes (refetch + retry at caller)

9. Extension Points

  • Custom escalation actions — plugin registry keyed by action.kind (reserved for custom_webhook plugin per tenant).
  • Custom reminder channels — delegated to notification-service.
  • RRULE extensions — use BYSETPOS, EXDATE; non-standard extensions rejected at validation.