Application Logic
:::info Source
Sourced from services/assignment-service/APPLICATION_LOGIC.md in the documentation repo.
:::
Companion: DOMAIN_MODEL · EVENT_SCHEMAS · API_CONTRACTS
1. Use Cases
| UC | Actor | Use case | Trigger | Slice |
|---|---|---|---|---|
| UC-AS-01 | Admin (HTTP) | Create assignment (draft) | POST /api/v1/assignments | S4 |
| UC-AS-02 | Admin (HTTP) | Activate assignment | POST /{id}/activate | S4 |
| UC-AS-03 | Admin (HTTP) | Pause / resume assignment | POST /{id}/pause | S4 |
| UC-AS-04 | Admin (HTTP) | Add / remove targets | POST /{id}/targets | S4 |
| UC-AS-05 | System (cron) | Materialise compliance windows | RRULEMaterializer job | S4 |
| UC-AS-06 | System (consumer) | Window → in_progress | enrollment.created.v1 | S4 |
| UC-AS-07 | System (consumer) | Window → completed | progress.completion.recorded.v1 | S4 |
| UC-AS-08 | System (cron) | Window → overdue | OverdueSweeper job | S4 |
| UC-AS-09 | System (cron) | Window → closed_missed | ClosedMissedSweeper job | S4 |
| UC-AS-10 | System (scheduler) | Dispatch reminders | ReminderDispatcher job | S4 |
| UC-AS-11 | System (scheduler) | Fire escalation actions | EscalationRunner | S4 |
| UC-AS-12 | Admin / Auditor (HTTP) | Compliance report | GET /compliance-report | S4 |
| UC-AS-13 | System (consumer) | Rebind targets on dynamic-group update | tenant.dynamic_group.evaluated.v1 | S4 |
| UC-AS-14 | System (consumer) | Attach new member to active assignments | tenant.membership_activated.v1 | S4 |
| UC-AS-15 | System (consumer) | GDPR purge | gdpr.subject_request.received.v1 | M3 |
| UC-AS-16 | AI Gateway (sync call) | Suggest assignments | POST /assignments/suggest | S5 |
2. Command Model
All write operations flow through CQRS-style commands → handlers → repositories.
interface CreateAssignmentCommand {
  tenantId: TenantId;
  createdBy: UserId;
  title: I18nString;
  description?: I18nString;
  courseId: CourseId;
  courseVersionPolicy: 'pin' | 'latest';
  pinnedVersionId?: CourseVersionId;
  targets: AssignmentTarget[];
  rrule?: RRULEString;
  startDate: ISODate;
  dueOffset: ISODuration;
  gracePeriod: ISODuration;
  escalation: EscalationPolicy;
  reminderPolicy: ReminderPolicy;
  idempotencyKey: string;
}
interface ActivateAssignmentCommand { id: AssignmentId; tenantId: TenantId; actor: UserId; }
interface PauseAssignmentCommand { id: AssignmentId; tenantId: TenantId; actor: UserId; reason?: string; }
interface AddTargetsCommand { id: AssignmentId; tenantId: TenantId; actor: UserId; add: AssignmentTarget[]; }
3. Handler Flows
3.1 CreateAssignment
HTTP POST → validate body (Zod) → authorize (role: compliance_admin | tenant_admin)
→ deduplicate by idempotencyKey (Redis SETNX 24h)
→ resolve targets (sanity check only; real expansion at activation + materialize)
→ new Assignment(state='draft') → repo.save
→ outbox.publish('assignment.created.v1')
→ respond 201
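The deduplication step in this flow can be sketched as follows. This is a minimal illustration, not the service's implementation: `InMemoryKeyStore`, `setIfAbsent`, and `claimIdempotencyKey` are hypothetical names, and the in-memory map only stands in for Redis set-if-absent with a TTL. The key format follows the Idempotency table in section 6.

```typescript
// Hypothetical stand-in for a Redis "set if not exists, with expiry" call.
class InMemoryKeyStore {
  private readonly expiries = new Map<string, number>();

  // Returns true only if the key was absent (or expired) and is now claimed.
  setIfAbsent(key: string, ttlMs: number, nowMs: number): boolean {
    const expiresAt = this.expiries.get(key);
    if (expiresAt !== undefined && expiresAt > nowMs) return false;
    this.expiries.set(key, nowMs + ttlMs);
    return true;
  }
}

const IDEMPOTENCY_TTL_MS = 24 * 60 * 60 * 1000; // 24h, as stated in the flow

function claimIdempotencyKey(
  store: InMemoryKeyStore,
  tenantId: string,
  idempotencyKey: string,
  nowMs: number,
): boolean {
  const key = `idempotency:create:${tenantId}:${idempotencyKey}`;
  return store.setIfAbsent(key, IDEMPOTENCY_TTL_MS, nowMs);
}

const keyStore = new InMemoryKeyStore();
// First request claims the key and proceeds to repo.save.
const firstClaim = claimIdempotencyKey(keyStore, "tenant-1", "abc", 0);
// A replay inside the TTL is rejected (DuplicateIdempotencyKey).
const replayClaim = claimIdempotencyKey(keyStore, "tenant-1", "abc", 1);
// Once the TTL has elapsed the key can be claimed again.
const lateClaim = claimIdempotencyKey(keyStore, "tenant-1", "abc", IDEMPOTENCY_TTL_MS);
```

Scoping the key by tenant keeps two tenants that happen to send the same client-generated key from colliding.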
3.2 ActivateAssignment (critical path)
validate state=draft → verify RRULE syntax → verify pinnedVersionId exists (catalog projection)
→ Assignment.activate() → set activatedAt=now
→ outbox.publish('assignment.activated.v1')
→ enqueue MaterializeWindowsJob(assignmentId, horizonUntil=now+90d)
→ respond 200
3.3 MaterializeWindowsJob (UC-AS-05)
Runs on activation and on an hourly cron (0 */1 * * *) that looks ahead over the materialization horizon.
for each Assignment where state='active' AND (rrule IS NOT NULL OR not yet materialized):
  occurrences ← RRULEEngine.occurrencesBetween(
    rrule, startDate, now+90d, tenantTZ)
  existingKeys ← repo.existingKeysForAssignment(assignmentId)
  newWindows ← ∅
  for each occ in occurrences:
    members ← TargetResolver.expand(targets, at=occ.start)
    newWindows ← newWindows ∪ {(a,u,occ) for each u in members where (u,occ) not in existingKeys}
  in batches of 1,000:
    tx:
      repo.insertMany(batch of newWindows)
      outbox.insertMany(window.opened.v1 events)
      commit
Idempotency: unique index on (assignment_id, user_id, occurrence_start).
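The diff against existing keys can be illustrated with a small sketch. It assumes string identifiers and ISO timestamps; `WindowKey`, `keyOf`, `diffNewWindows`, and `inBatches` are hypothetical helpers, and the real types live in DOMAIN_MODEL. The unique index remains the actual safety net; the in-memory diff only avoids attempting inserts that would be rejected.

```typescript
// Hypothetical shape mirroring the unique index (assignment_id, user_id, occurrence_start).
interface WindowKey {
  assignmentId: string;
  userId: string;
  occurrenceStart: string; // ISO timestamp
}

const keyOf = (k: WindowKey): string =>
  `${k.assignmentId}|${k.userId}|${k.occurrenceStart}`;

// Returns the windows that still need inserting, skipping keys that already exist.
function diffNewWindows(
  assignmentId: string,
  occurrences: string[],
  membersAt: (occurrenceStart: string) => string[],
  existingKeys: Set<string>,
): WindowKey[] {
  const seen = new Set(existingKeys);
  const result: WindowKey[] = [];
  for (const occurrenceStart of occurrences) {
    // Membership is resolved per occurrence, since groups can change over time.
    for (const userId of membersAt(occurrenceStart)) {
      const candidate = { assignmentId, userId, occurrenceStart };
      const key = keyOf(candidate);
      if (seen.has(key)) continue;
      seen.add(key);
      result.push(candidate);
    }
  }
  return result;
}

// Splits the work into fixed-size batches, one transaction per batch.
function inBatches<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) batches.push(items.slice(i, i + size));
  return batches;
}

const jan = "2025-01-01T00:00:00Z";
const feb = "2025-02-01T00:00:00Z";
const existing = new Set([keyOf({ assignmentId: "a1", userId: "u1", occurrenceStart: jan })]);
// u1 already has the January window, so three windows remain to be inserted.
const fresh = diffNewWindows("a1", [jan, feb], () => ["u1", "u2"], existing);
```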
3.4 WindowTransition consumers
On enrollment.created.v1:
  if event.source.kind === 'assignment' AND ref points to our window:
    w = repo.findById(ref.windowId)
    if w.state == 'open':
      w.state='in_progress'; w.enrollmentId=event.enrollmentId
      outbox.publish('assignment.window.in_progress.v1')
    else: log stale transition, metric++
On progress.completion.recorded.v1:
  if event.enrollmentId maps to a window we own AND passed==true:
    w = repo.findByEnrollmentId(event.enrollmentId)
    if w.state in (in_progress, overdue):
      w.state='completed'; w.completedAt=event.recordedAt
      late = w.completedAt > w.dueAt
      outbox.publish('assignment.window.completed.v1', {late})
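The two consumers above amount to guarded state transitions. The sketch below expresses them as pure functions, assuming ISO-8601 UTC timestamps; `ComplianceWindowSketch`, `TransitionResult`, `onEnrollmentCreated`, and `onCompletionRecorded` are hypothetical names, and persistence and the outbox write are left to the caller.

```typescript
type WindowState = "open" | "in_progress" | "completed" | "overdue" | "closed_missed";

interface ComplianceWindowSketch {
  id: string;
  state: WindowState;
  dueAt: string; // ISO-8601 UTC
  enrollmentId?: string;
  completedAt?: string;
}

interface TransitionResult {
  window: ComplianceWindowSketch;
  // Absent when the transition was skipped as stale.
  emitted?: { subject: string; late?: boolean };
}

function onEnrollmentCreated(w: ComplianceWindowSketch, enrollmentId: string): TransitionResult {
  // Only an open window may move to in_progress; anything else is a stale event.
  if (w.state !== "open") return { window: w };
  return {
    window: { ...w, state: "in_progress", enrollmentId },
    emitted: { subject: "assignment.window.in_progress.v1" },
  };
}

function onCompletionRecorded(
  w: ComplianceWindowSketch,
  passed: boolean,
  recordedAt: string,
): TransitionResult {
  if (!passed || (w.state !== "in_progress" && w.state !== "overdue")) return { window: w };
  // Completion after the due date is still accepted but flagged as late.
  const late = Date.parse(recordedAt) > Date.parse(w.dueAt);
  return {
    window: { ...w, state: "completed", completedAt: recordedAt },
    emitted: { subject: "assignment.window.completed.v1", late },
  };
}

const opened: ComplianceWindowSketch = { id: "w1", state: "open", dueAt: "2025-03-01T00:00:00Z" };
const started = onEnrollmentCreated(opened, "e1");
const finished = onCompletionRecorded(started.window, true, "2025-03-02T09:00:00Z");
```

Returning the unchanged window for a stale event keeps redelivered messages harmless, which matters because the outbox delivers at least once.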
3.5 OverdueSweeper (UC-AS-08)
every 5 minutes:
  cursor-based scan of windows WHERE state IN ('open','in_progress') AND due_at <= now()
  for each in batch of 500:
    w.state = 'overdue'; w.overdueAt = now()
    outbox.publish('assignment.window.overdue.v1')
    enqueue EscalationRunner(windowId, trigger='on_overdue')
3.6 ClosedMissedSweeper (UC-AS-09)
every 15 minutes:
  scan WHERE state='overdue' AND grace_until <= now()
    w.state='closed_missed'; w.closedAt=now()
    outbox.publish('assignment.window.closed_missed.v1')
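Both sweepers follow the same shape: select by expected state and deadline, transition, emit. A minimal in-memory sketch, assuming epoch-millisecond timestamps; `SweepableWindow`, `SweepRule`, and `sweep` are hypothetical names, and the real jobs scan the database in batches rather than an array.

```typescript
type SweepState = "open" | "in_progress" | "overdue" | "closed_missed" | "completed";

interface SweepableWindow {
  id: string;
  state: SweepState;
  dueAt: number;      // epoch milliseconds
  graceUntil: number; // epoch milliseconds
}

interface SweepRule {
  from: SweepState[];
  to: SweepState;
  deadline: (w: SweepableWindow) => number;
  subject: string;
}

const OVERDUE_RULE: SweepRule = {
  from: ["open", "in_progress"],
  to: "overdue",
  deadline: (w) => w.dueAt,
  subject: "assignment.window.overdue.v1",
};

const CLOSED_MISSED_RULE: SweepRule = {
  from: ["overdue"],
  to: "closed_missed",
  deadline: (w) => w.graceUntil,
  subject: "assignment.window.closed_missed.v1",
};

function sweep(windows: SweepableWindow[], nowMs: number, rule: SweepRule) {
  const events: string[] = [];
  const swept = windows.map((w): SweepableWindow => {
    // Skip rows not in the expected state or whose deadline has not passed yet.
    if (rule.from.indexOf(w.state) === -1 || rule.deadline(w) > nowMs) return w;
    events.push(`${rule.subject}:${w.id}`);
    return { ...w, state: rule.to };
  });
  return { windows: swept, events };
}

const before: SweepableWindow[] = [
  { id: "w1", state: "open", dueAt: 100, graceUntil: 200 },
  { id: "w2", state: "overdue", dueAt: 50, graceUntil: 90 },
  { id: "w3", state: "completed", dueAt: 10, graceUntil: 20 },
];
const overduePass = sweep(before, 100, OVERDUE_RULE);
const closedPass = sweep(overduePass.windows, 100, CLOSED_MISSED_RULE);
```

In this run w1 becomes overdue but stays inside its grace period, w2 is closed as missed, and the completed w3 is never touched.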
3.7 ReminderDispatcher (UC-AS-10)
Reads windows in state open, in_progress, or overdue that have upcoming reminder triggers, emits notification.dispatch.requested.v1 (schema owned by notification-service), and increments remindersSent.
3.8 EscalationRunner (UC-AS-11)
Stateless worker. Input = (windowId, trigger). Evaluates EscalationPolicy.steps; for each pending step it emits assignment.escalation.triggered.v1 with the action payload (consumed by notification-service) and records escalationLevel.
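A rough sketch of the runner's core loop follows. The shape of an escalation step is not given in this document, so `EscalationStepSketch` (with `level`, `trigger`, and `action.kind`) is an assumption, as are the `notify_manager` and `notify_hr` action kinds and the `on_closed_missed` trigger. The `Set` stands in for the escalation_log uniqueness constraint on (windowId, stepLevel).

```typescript
// Assumed step shape; the real EscalationPolicy is defined in DOMAIN_MODEL.
interface EscalationStepSketch {
  level: number;
  trigger: string;
  action: { kind: string };
}

interface EscalationEventSketch {
  subject: "assignment.escalation.triggered.v1";
  windowId: string;
  level: number;
  action: { kind: string };
}

function runEscalation(
  windowId: string,
  trigger: string,
  steps: EscalationStepSketch[],
  escalationLog: Set<string>,
): EscalationEventSketch[] {
  const emitted: EscalationEventSketch[] = [];
  for (const step of steps) {
    if (step.trigger !== trigger) continue;
    const logKey = `${windowId}:${step.level}`;
    // A step that already fired for this window is not pending any more.
    if (escalationLog.has(logKey)) continue;
    escalationLog.add(logKey);
    emitted.push({
      subject: "assignment.escalation.triggered.v1",
      windowId,
      level: step.level,
      action: step.action,
    });
  }
  return emitted;
}

const policySteps: EscalationStepSketch[] = [
  { level: 1, trigger: "on_overdue", action: { kind: "notify_manager" } },
  { level: 2, trigger: "on_closed_missed", action: { kind: "notify_hr" } },
];
const escalationLog = new Set<string>();
const firstPass = runEscalation("w1", "on_overdue", policySteps, escalationLog);
// Re-running with the same input emits nothing, so job retries are safe.
const secondPass = runEscalation("w1", "on_overdue", policySteps, escalationLog);
```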
3.9 DynamicGroupRebind (UC-AS-13)
on tenant.dynamic_group.evaluated.v1:
  for each active assignment targeting this groupId:
    added = newMembers - previousMembers
    removed = previousMembers - newMembers
    for each added userId:
      synthesise windows for upcoming occurrences within horizon
    for each removed userId:
      for each open window of (assignmentId, userId): close with reason 'target_removed'
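The added/removed computation is a plain set difference. A minimal sketch, assuming user IDs are strings; `diffMembers` is a hypothetical helper name.

```typescript
interface MembershipDiff {
  added: string[];
  removed: string[];
}

function diffMembers(previousMembers: string[], newMembers: string[]): MembershipDiff {
  const previous = new Set(previousMembers);
  const current = new Set(newMembers);
  return {
    // In the new evaluation but not the previous one: needs windows synthesised.
    added: newMembers.filter((userId) => !previous.has(userId)),
    // In the previous evaluation but not the new one: open windows get closed.
    removed: previousMembers.filter((userId) => !current.has(userId)),
  };
}

const rebind = diffMembers(["u1", "u2"], ["u2", "u3"]);
// rebind.added is ["u3"]; rebind.removed is ["u1"]; u2 is untouched.
```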
3.10 AI Suggested Assignments (UC-AS-16, S5)
Synchronous admin-facing endpoint. Service calls AI Gateway (authoring/assignment_suggest prompt), returns a proposed CreateAssignmentCommand with aiSuggested=true and aiProvenance. Admin reviews and accepts (HITL). No auto-activation. Acceptance → standard UC-AS-01 flow.
4. Transactional Outbox
Every event publication goes through a transactional outbox, which guarantees at-least-once delivery; consumers must deduplicate to get effectively-once processing:
tx BEGIN
  UPDATE assignment / compliance_window …
  INSERT INTO outbox (subject, payload, headers, tenant_id) VALUES (…)
tx COMMIT
────────────────────────────────────
OutboxDispatcher (background):
  SELECT … FROM outbox WHERE published_at IS NULL LIMIT 500 FOR UPDATE SKIP LOCKED
  publish to NATS JetStream (Ack required)
  UPDATE outbox SET published_at = now() WHERE id IN (…)
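The dispatcher loop can be sketched in memory to show why delivery is at-least-once: a row is marked published only after the broker acknowledges it, so a failed or unacknowledged publish leaves the row pending for the next run. `OutboxRow`, `dispatchOnce`, and the boolean-returning `publish` callback are hypothetical; the real dispatcher talks to Postgres and NATS JetStream.

```typescript
interface OutboxRow {
  id: number;
  subject: string;
  payload: string;
  publishedAt: number | null;
}

// Publishes pending rows in order; returns how many were acknowledged.
function dispatchOnce(
  outbox: OutboxRow[],
  publish: (row: OutboxRow) => boolean,
  nowMs: number,
  limit = 500,
): number {
  const pending = outbox.filter((row) => row.publishedAt === null).slice(0, limit);
  let published = 0;
  for (const row of pending) {
    // No ack: stop here so the row (and everything after it) is retried later.
    if (!publish(row)) break;
    row.publishedAt = nowMs;
    published++;
  }
  return published;
}

const outboxRows: OutboxRow[] = [
  { id: 1, subject: "assignment.created.v1", payload: "{}", publishedAt: null },
  { id: 2, subject: "assignment.activated.v1", payload: "{}", publishedAt: null },
];

// Simulated broker that fails to ack row 2 on the first attempt only.
let failNext = true;
const flakyPublish = (row: OutboxRow): boolean => {
  if (row.id === 2 && failNext) {
    failNext = false;
    return false;
  }
  return true;
};

const firstRun = dispatchOnce(outboxRows, flakyPublish, 1000);
const secondRun = dispatchOnce(outboxRows, flakyPublish, 2000);
```

If the process crashes after the broker acks but before published_at is written, the same row is published again on the next run, which is why consumers need to be idempotent.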
5. Concurrency Model
- Assignment aggregate: optimistic locking via version column; 409 on conflict.
- ComplianceWindow: optimistic locking via version column. Sweepers always update with WHERE state=expected, which gives compare-and-set (CAS) semantics.
- Materializer: advisory lock pg_advisory_xact_lock(hash(assignmentId)) during each materialization batch.
- Escalation & reminders: stateless; idempotent by (windowId, stepLevel, actionHash).
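The optimistic-locking rule can be illustrated with an in-memory table. This is a sketch under assumptions: `InMemoryWindowTable` and `compareAndSet` are hypothetical, and the method mimics an UPDATE that matches on id, version, and expected state and reports whether a row was affected.

```typescript
interface VersionedRow {
  id: string;
  state: string;
  version: number;
}

class InMemoryWindowTable {
  private readonly rows = new Map<string, VersionedRow>();

  insert(row: VersionedRow): void {
    this.rows.set(row.id, { ...row });
  }

  // Returns a copy, like a fresh SELECT.
  read(id: string): VersionedRow | undefined {
    const row = this.rows.get(id);
    return row ? { ...row } : undefined;
  }

  // Succeeds only if nobody changed the row since it was read.
  compareAndSet(id: string, expectedVersion: number, expectedState: string, newState: string): boolean {
    const row = this.rows.get(id);
    if (!row || row.version !== expectedVersion || row.state !== expectedState) return false;
    this.rows.set(id, { id, state: newState, version: row.version + 1 });
    return true;
  }
}

const table = new InMemoryWindowTable();
table.insert({ id: "w1", state: "in_progress", version: 1 });

// The completion consumer and the overdue sweeper both read version 1.
const consumerWon = table.compareAndSet("w1", 1, "in_progress", "completed");
// The sweeper's update now matches no row; it skips (an HTTP handler would answer 409).
const sweeperWon = table.compareAndSet("w1", 1, "in_progress", "overdue");
```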
6. Idempotency
| Operation | Key |
|---|---|
| CreateAssignment | idempotency:create:{tenantId}:{idempotencyKey} (24h) |
| Window materialization | Unique DB index (assignmentId, userId, occurrenceStart) |
| Escalation firing | (windowId, stepLevel) uniqueness in escalation_log |
| Reminder dispatch | (windowId, reminderTriggerHash) uniqueness |
| GDPR purge | subjectRequestId |
7. Authorization Matrix
| Action | Role | Notes |
|---|---|---|
| Create assignment | tenant_admin, compliance_admin | — |
| Activate | same as create | — |
| Pause/resume | same | — |
| Edit RRULE / dueOffset | same, while draft only | Activated assignments are immutable re: schedule |
| Read own windows | learner | scoped to userId === me |
| Read all windows | compliance_admin, manager (scoped to OU) | RLS |
| Compliance report | compliance_admin, auditor | |
| AI suggest | compliance_admin, tenant_admin | rate-limited |
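The matrix can be encoded as data plus one guard. A minimal sketch: the action identifiers and `isAllowed` are hypothetical names, and row-level scoping (a learner's own windows, a manager's OU, RLS) and rate limiting are deliberately out of scope here.

```typescript
type AssignmentRole = "tenant_admin" | "compliance_admin" | "manager" | "auditor" | "learner";

type AssignmentAction =
  | "create_assignment"
  | "activate"
  | "pause_resume"
  | "edit_schedule"
  | "read_own_windows"
  | "read_all_windows"
  | "compliance_report"
  | "ai_suggest";

const ADMIN_ROLES: AssignmentRole[] = ["tenant_admin", "compliance_admin"];

// One entry per row of the matrix above.
const ALLOWED_ROLES: Record<AssignmentAction, AssignmentRole[]> = {
  create_assignment: ADMIN_ROLES,
  activate: ADMIN_ROLES,
  pause_resume: ADMIN_ROLES,
  edit_schedule: ADMIN_ROLES,
  read_own_windows: ["learner"],
  read_all_windows: ["compliance_admin", "manager"],
  compliance_report: ["compliance_admin", "auditor"],
  ai_suggest: ["compliance_admin", "tenant_admin"],
};

function isAllowed(role: AssignmentRole, action: AssignmentAction, assignmentState?: string): boolean {
  if (ALLOWED_ROLES[action].indexOf(role) === -1) return false;
  // The schedule (RRULE / dueOffset) is editable only while the assignment is a draft.
  if (action === "edit_schedule") return assignmentState === "draft";
  return true;
}

const canEditDraft = isAllowed("tenant_admin", "edit_schedule", "draft");   // true
const canEditActive = isAllowed("tenant_admin", "edit_schedule", "active"); // false
```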
8. Error Handling
All handlers emit typed domain errors → mapped to RFC 7807 problem responses (see API_CONTRACTS). Retryable vs non-retryable is explicit:
| Error | Retry? |
|---|---|
| DuplicateIdempotencyKey | no |
| InvalidRRULE | no |
| TargetGroupNotFound | no |
| CourseVersionNotFound | no |
| OutboxFull | yes (backoff) |
| DependencyUnavailable | yes |
| ConflictVersionMismatch | yes (refetch + retry at caller) |
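One way to carry the retry flag alongside an RFC 7807 body is an extension member. The sketch below is an assumption about shape, not the contract in API_CONTRACTS: `toProblem` and the `retryable` member are hypothetical, `about:blank` is the RFC's default problem type, and the HTTP status is passed in by the caller because this document only fixes 409 for version conflicts.

```typescript
type DomainErrorCode =
  | "DuplicateIdempotencyKey"
  | "InvalidRRULE"
  | "TargetGroupNotFound"
  | "CourseVersionNotFound"
  | "OutboxFull"
  | "DependencyUnavailable"
  | "ConflictVersionMismatch";

// Mirrors the Retry? column of the table above.
const RETRYABLE: Record<DomainErrorCode, boolean> = {
  DuplicateIdempotencyKey: false,
  InvalidRRULE: false,
  TargetGroupNotFound: false,
  CourseVersionNotFound: false,
  OutboxFull: true,
  DependencyUnavailable: true,
  ConflictVersionMismatch: true,
};

interface ProblemDetails {
  type: string;
  title: string;
  status: number;
  detail?: string;
  retryable: boolean; // extension member, allowed by RFC 7807
}

function toProblem(code: DomainErrorCode, status: number, detail?: string): ProblemDetails {
  const problem: ProblemDetails = {
    type: "about:blank",
    title: code,
    status,
    retryable: RETRYABLE[code],
  };
  if (detail !== undefined) problem.detail = detail;
  return problem;
}

const conflictProblem = toProblem(
  "ConflictVersionMismatch",
  409,
  "assignment was modified concurrently; refetch and retry",
);
```

Typing `RETRYABLE` as a full `Record` makes the compiler flag any new error code that lacks a retry decision.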
9. Extension Points
- Custom escalation actions: plugin registry keyed by action.kind (reserved for custom_webhook plugin per tenant).
- Custom reminder channels: delegated to notification-service.
- RRULE extensions: use BYSETPOS, EXDATE; non-standard extensions rejected at validation.