Application Logic
:::info Source
Sourced from services/catalog-service/APPLICATION_LOGIC.md in the documentation repo.
:::
Companion: DOMAIN_MODEL · EVENT_SCHEMAS · API_CONTRACTS
1. Application Architecture
Hexagonal / Ports & Adapters. Domain layer is pure; all side effects go through ports.
┌───────────────────────────────────────────────────────────────┐
│                       Inbound Adapters                        │
│  HTTP (Fastify)        NATS Subscriber        Admin CLI       │
└─────────┬─────────────────┬───────────────────┬───────────────┘
          ▼                 ▼                   ▼
┌───────────────────────────────────────────────────────────────┐
│                       Application Layer                       │
│  UseCases (command handlers)     Query handlers (CQRS-lite)   │
│  Saga coordinators               Event projectors             │
└─────────┬──────────────────────────────────────┬──────────────┘
          ▼                                      ▼
┌─────────────────────────┐    ┌────────────────────────────────┐
│ Domain Layer            │    │ Ports (interfaces)             │
│ Course, CourseVersion   │    │ CourseRepo, TaxonomyRepo       │
│ Taxonomy invariants     │    │ EventPublisher, MediaClient    │
│ Policies, State machines│    │ Cache, TenantRegistry          │
└─────────────────────────┘    └──────────────┬─────────────────┘
                                              ▼
┌───────────────────────────────────────────────────────────────┐
│                       Outbound Adapters                       │
│  Postgres    Redis    NATS Publisher    media-service HTTP    │
└───────────────────────────────────────────────────────────────┘
2. Use Cases (Command Handlers)
2.1 RegisterCourse (system-internal, from event)
Trigger: authoring.course_draft.published.v1 where no (tenantId, slug) exists.
Inputs : { tenantId, slug, title, description, defaultLocale, authors,
visibility, taxonomy, cover, sourceDraftId, eventId }
Output : { courseId }
Errors : SlugAlreadyExistsError, TenantNotFoundError
Steps:
1. Inbox check: if eventId already processed → return existing courseId.
2. Validate payload (Zod).
3. Load TenantFeatureFlags via TenantRegistry port.
4. If visibility in {'marketplace','public'} and feature flag disabled → downgrade to 'org' + log warning.
5. Begin Tx
6. INSERT Course (id=ulid(), status='active', latestVersionId=null)
7. INSERT outbox(event=catalog.course.registered.v1)
8. INSERT inbox(eventId=input.eventId)
9. Commit Tx
10. Return courseId
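The idempotency and downgrade steps above can be sketched in memory. This is a minimal illustration, not the service's implementation: `InMemoryCatalog`, the `TenantFlags` shape, and the counter-based id (standing in for `ulid()`) are assumptions, and the three writes that share a transaction in steps 5 to 9 are plain map updates here.

```typescript
type Visibility = "private" | "org" | "marketplace" | "public";

interface RegisterCourseInput {
  tenantId: string;
  slug: string;
  visibility: Visibility;
  eventId: string;
}

// Hypothetical flag shape; the real TenantFeatureFlags type is not shown in this document.
interface TenantFlags {
  marketplacePublish: boolean;
  publicCatalog: boolean;
}

class SlugAlreadyExistsError extends Error {
  constructor(slug: string) {
    super("slug already exists: " + slug);
    this.name = "SlugAlreadyExistsError";
  }
}

// In-memory stand-in for the courses, outbox, and inbox tables.
class InMemoryCatalog {
  readonly courses = new Map<string, { courseId: string; visibility: Visibility }>();
  readonly outbox: string[] = [];
  readonly inbox = new Map<string, string>(); // eventId -> courseId
  private nextId = 1;

  registerCourse(input: RegisterCourseInput, flags: TenantFlags): { courseId: string } {
    // Step 1: a redelivered event returns the courseId recorded the first time.
    const seen = this.inbox.get(input.eventId);
    if (seen !== undefined) return { courseId: seen };

    // Step 4: downgrade to 'org' when the matching feature flag is off.
    let visibility = input.visibility;
    if ((visibility === "marketplace" && !flags.marketplacePublish) ||
        (visibility === "public" && !flags.publicCatalog)) {
      visibility = "org";
    }

    const slugKey = input.tenantId + "/" + input.slug;
    if (this.courses.has(slugKey)) throw new SlugAlreadyExistsError(input.slug);

    // Steps 5-9: course row, outbox row, and inbox row are written together.
    const courseId = "course-" + this.nextId++; // stand-in for ulid()
    this.courses.set(slugKey, { courseId, visibility });
    this.outbox.push("catalog.course.registered.v1");
    this.inbox.set(input.eventId, courseId);
    return { courseId };
  }
}
```

Replaying the same `eventId` returns the original `courseId` and adds nothing to the outbox, which is the property the inbox check exists to guarantee.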
2.2 UpdateCourseMetadata (PATCH /courses/{id}/metadata)
Inputs : { courseId, tenantId, actor, If-Match(etag), changes: Partial<Course> }
Output : { course }
Errors : NotFoundError, PreconditionFailedError, ForbiddenError, ValidationError
Steps:
1. Load course, tenant-scoped (RLS).
2. Verify course.etag === If-Match; else 412.
3. Verify actor has catalog.course.edit on tenantId.
4. Merge allowed fields: title, description, cover, taxonomy, tags, defaultLocale.
5. Run domain invariants (slug never changes via this endpoint).
6. Begin Tx
7. UPDATE courses SET …, etag = ulid()
8. INSERT outbox(catalog.course.metadata_updated.v1 with diff)
9. Commit
10. Invalidate cache keys (course:{id}, browse:{tenantId}:*)
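Steps 2, 4, and 7 can be illustrated as a pure function. This is a sketch under assumptions: `CourseRecord`, `applyMetadataPatch`, and the choice to silently skip non-patchable fields (such as `slug`) are hypothetical, and the fresh etag is passed in rather than generated as a ULID.

```typescript
// Fields the endpoint may change (step 4); slug is deliberately absent.
const PATCHABLE = new Set<string>([
  "title", "description", "cover", "taxonomy", "tags", "defaultLocale",
]);

type CourseRecord = { id: string; slug: string; etag: string } & Record<string, unknown>;

interface FieldChange { from: unknown; to: unknown }

class PreconditionFailedError extends Error {
  constructor(readonly currentEtag: string) {
    super("etag mismatch");
    this.name = "PreconditionFailedError";
  }
}

function applyMetadataPatch(
  course: CourseRecord,
  ifMatch: string,
  changes: Record<string, unknown>,
  newEtag: string, // the service re-rolls a ULID; any fresh unique value works here
): { course: CourseRecord; diff: Record<string, FieldChange> } {
  // Step 2: optimistic concurrency check, surfaced to the client as HTTP 412.
  if (course.etag !== ifMatch) throw new PreconditionFailedError(course.etag);

  const next: CourseRecord = { ...course };
  const diff: Record<string, FieldChange> = {};
  for (const key of Object.keys(changes)) {
    if (!PATCHABLE.has(key)) continue; // assumption: non-patchable fields are skipped
    if (next[key] === changes[key]) continue; // unchanged values stay out of the diff
    diff[key] = { from: next[key], to: changes[key] };
    next[key] = changes[key];
  }
  next.etag = newEtag; // step 7: the etag changes on every write
  return { course: next, diff };
}
```

The returned `diff` is the kind of payload the `catalog.course.metadata_updated.v1` outbox row would carry.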
2.3 SetVisibility (PATCH /courses/{id}/visibility)
Inputs : { courseId, tenantId, actor, from, to, reason? }
Validation:
- to in {'private','org','marketplace','public'}
- if to in {'marketplace','public'}: feature flag + marketplace.listing exists (via event replay cache)
- actor has catalog.course.visibility
Emits : catalog.course.visibility_changed.v1
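The validation rules above might be expressed as a single check that reports every violated rule. The sketch below is an assumption-laden illustration: `VisibilityContext` and `checkVisibilityChange` are hypothetical names, the flag-to-visibility mapping is taken from the feature flag table in section 11, and the listing requirement is applied to both `marketplace` and `public` because that is the literal reading of the rule.

```typescript
type Visibility = "private" | "org" | "marketplace" | "public";

interface VisibilityContext {
  flags: { [flag: string]: boolean }; // keyed by flag name, e.g. "feature.public_catalog"
  hasMarketplaceListing: boolean;     // resolved from the event replay cache in the real service
  actorPermissions: string[];
}

function requiredFlag(to: Visibility): string | null {
  if (to === "marketplace") return "feature.marketplace_publish";
  if (to === "public") return "feature.public_catalog";
  return null;
}

// Returns the violated rules; an empty list means the change is allowed.
function checkVisibilityChange(to: string, ctx: VisibilityContext): string[] {
  const allowed = ["private", "org", "marketplace", "public"];
  if (allowed.indexOf(to) === -1) return ["unknown visibility: " + to];

  const problems: string[] = [];
  if (ctx.actorPermissions.indexOf("catalog.course.visibility") === -1) {
    problems.push("forbidden");
  }
  const flag = requiredFlag(to as Visibility);
  if (flag !== null) {
    if (ctx.flags[flag] !== true) problems.push("flag disabled: " + flag);
    if (!ctx.hasMarketplaceListing) problems.push("no marketplace listing");
  }
  return problems;
}
```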
2.4 PublishCourseVersion (system-internal, from event)
Trigger: content.play_package.built.v1.
Inputs : { tenantId, courseId, versionLabel, publishedBy, durationMinutes,
locales, moduleSummaries, playPackage:{ id, sha256, format }, eventId }
Preconditions:
- Course exists and status='active'
- versionLabel SemVer > current latest (else create but do not advance latestVersionId)
- playPackage.sha256 MUST equal event payload value (integrity check)
Steps:
1. Inbox check.
2. Load Course.
3. If Course.status='archived' → DLQ with code CATALOG_ARCHIVED_PUBLISH.
4. Begin Tx
5. INSERT course_versions (status='published')
6. IF course.latestVersionId IS NULL OR SemVer(versionLabel) > SemVer(label of the version referenced by course.latestVersionId)
UPDATE courses.latestVersionId, version_count++
7. INSERT outbox(catalog.course_version.published.v1)
8. INSERT inbox(eventId)
9. Commit
10. Warm cache (course:{id}, version:{vid})
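The decision in step 6 hinges on SemVer ordering rather than string ordering ("1.10.0" is newer than "1.9.0"). A minimal sketch follows; it handles plain `MAJOR.MINOR.PATCH` labels only, and treating a null current latest as "always advance" is an assumption based on `RegisterCourse` creating the course with `latestVersionId=null`. The function names are hypothetical.

```typescript
// Parses "MAJOR.MINOR.PATCH"; pre-release and build metadata are out of scope for this sketch.
function parseSemVer(label: string): [number, number, number] {
  const match = /^(\d+)\.(\d+)\.(\d+)$/.exec(label);
  if (match === null) throw new Error("not a plain SemVer label: " + label);
  return [Number(match[1]), Number(match[2]), Number(match[3])];
}

// Negative if a < b, zero if equal, positive if a > b.
function compareSemVer(a: string, b: string): number {
  const [aMajor, aMinor, aPatch] = parseSemVer(a);
  const [bMajor, bMinor, bPatch] = parseSemVer(b);
  return aMajor - bMajor || aMinor - bMinor || aPatch - bPatch;
}

// True when the newly published version should become latestVersionId.
function shouldAdvanceLatest(currentLatestLabel: string | null, candidateLabel: string): boolean {
  if (currentLatestLabel === null) return true; // assumption: first published version always wins
  return compareSemVer(candidateLabel, currentLatestLabel) > 0;
}
```

When `shouldAdvanceLatest` returns false the version row is still inserted; only the `latestVersionId` pointer stays where it is.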
2.5 DeprecateCourseVersion / WithdrawCourseVersion
Inputs : { courseVersionId, tenantId, actor, reason? }
State machine:
- Deprecate: published → deprecated
- Withdraw: published | deprecated → withdrawn (terminal)
Steps: guard state; update; outbox; invalidate cache; notify search-service via event.
Side effect: if withdrawn version was latestVersionId, recompute latest to the newest published
sibling (SemVer max). If none exists, latestVersionId = null and Course.status stays 'active'
(cannot be discovered, but not archived).
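The state machine and the latest-version side effect can be sketched as two small functions. `nextStatus`, `recomputeLatest`, and `VersionRow` are illustrative names, and the SemVer comparator is injected by the caller rather than defined here.

```typescript
type VersionStatus = "published" | "deprecated" | "withdrawn";
type VersionAction = "deprecate" | "withdraw";

// Guard for the transitions listed above; 'withdrawn' is terminal.
function nextStatus(current: VersionStatus, action: VersionAction): VersionStatus {
  if (action === "deprecate" && current === "published") return "deprecated";
  if (action === "withdraw" && (current === "published" || current === "deprecated")) {
    return "withdrawn";
  }
  throw new Error("illegal transition: " + action + " from " + current);
}

interface VersionRow { id: string; label: string; status: VersionStatus }

// After a withdrawal, pick the newest remaining published sibling, or null if none is left.
function recomputeLatest(
  versions: VersionRow[],
  compareLabels: (a: string, b: string) => number, // hypothetical SemVer comparator
): string | null {
  let best: VersionRow | null = null;
  for (const version of versions) {
    if (version.status !== "published") continue;
    if (best === null || compareLabels(version.label, best.label) > 0) best = version;
  }
  return best === null ? null : best.id;
}
```

A null result corresponds to the case described above: the course stays `active` but has no discoverable version.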
2.6 ArchiveCourse
Inputs : { courseId, tenantId, actor }
Effect : status='active' → 'archived'; emits catalog.course.archived.v1.
Guard : no pending publishes in the last 24h (event-sourced check against inbox).
2.7 UpsertTaxonomy
Inputs : { namespace, tenantId?|null, tree, expectedVersion? }
Concurrency: optimistic lock on version field.
Authorization:
- tenantId=null → platform-admin only
- tenantId=self → tenant-admin with catalog.taxonomy.edit
Steps:
1. Validate tree (no duplicate paths, max depth 6, ≤ 1000 nodes).
2. If expectedVersion mismatches → 409 with current version in body.
3. Compute diff.
4. UPDATE taxonomies SET tree=$, version=version+1.
5. Emit catalog.taxonomy.updated.v1 with diff.
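The tree validation in step 1 could look like the sketch below. The `TaxonomyNode` shape (a `slug` plus optional `children`) is an assumption, as is counting root-level nodes as depth 1; the limits themselves come from the step text.

```typescript
// Hypothetical node shape; the real tree schema lives in the domain model.
interface TaxonomyNode { slug: string; children?: TaxonomyNode[] }

const MAX_DEPTH = 6;
const MAX_NODES = 1000;

// Returns a list of violations; an empty list means the tree is valid.
function validateTree(roots: TaxonomyNode[]): string[] {
  const errors: string[] = [];
  const seenPaths = new Set<string>();
  let nodeCount = 0;
  let deepest = 0;

  // Explicit stack instead of recursion, so hostile input cannot overflow the call stack.
  const stack: { node: TaxonomyNode; path: string; depth: number }[] = [];
  for (const root of roots) stack.push({ node: root, path: "/" + root.slug, depth: 1 });

  while (stack.length > 0) {
    const { node, path, depth } = stack.pop()!;
    nodeCount++;
    if (depth > deepest) deepest = depth;
    if (seenPaths.has(path)) errors.push("duplicate path: " + path);
    seenPaths.add(path);
    for (const child of node.children || []) {
      stack.push({ node: child, path: path + "/" + child.slug, depth: depth + 1 });
    }
  }

  if (deepest > MAX_DEPTH) errors.push("depth " + deepest + " exceeds " + MAX_DEPTH);
  if (nodeCount > MAX_NODES) errors.push("node count " + nodeCount + " exceeds " + MAX_NODES);
  return errors;
}
```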
3. Query Handlers (CQRS-lite)
| Query | Impl | Cache |
|---|---|---|
| `ListCourses(tenantId, filters, cursor)` | SQL on `courses` + `course_versions_summary` join | Redis list cache, key `browse:{tenantId}:{hash(filters)}`, TTL 30 s |
| `GetCourse(id)` | SQL single-row | Redis, key `course:{id}`, TTL 300 s, invalidated on write |
| `ListCourseVersions(courseId)` | SQL order by `published_at` | Redis, TTL 60 s |
| `GetCourseVersion(vid)` | SQL single-row | Redis, TTL 3600 s (immutable) |
| `BrowseByTaxonomy(taxId, nodePath)` | SQL via `course_taxonomy` | Redis, TTL 30 s |
| `GetTaxonomy(id)` | SQL | Redis, TTL 300 s |
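
The `browse:{tenantId}:{hash(filters)}` key only works as a cache key if logically equal filter sets hash to the same value. The sketch below canonicalises filters by sorting their names before hashing. The hash function is not specified in this document; the FNV-1a style hash over UTF-16 code units used here is an assumption chosen because it is small and dependency-free, and `browseCacheKey` is a hypothetical helper.

```typescript
type FilterValue = string | number | boolean;

// FNV-1a style 32-bit hash over UTF-16 code units, rendered as 8 hex digits.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return ("0000000" + hash.toString(16)).slice(-8);
}

function browseCacheKey(tenantId: string, filters: { [name: string]: FilterValue }): string {
  // Sort filter names so that property order does not change the key.
  const canonical = Object.keys(filters).sort().map((name) => [name, filters[name]]);
  return "browse:" + tenantId + ":" + fnv1a(JSON.stringify(canonical));
}
```

Because every key for a tenant shares the `browse:{tenantId}:` prefix, a single prefix delete can invalidate all filter combinations for that tenant.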
4. Saga / Event Choreography
4.1 Publish saga
authoring.course_draft.published.v1
│
├─▶ catalog.RegisterCourse (idempotent)
│ └─▶ emits catalog.course.registered.v1
│ └─▶ search-service indexes
│
content.play_package.built.v1
│
└─▶ catalog.PublishCourseVersion
└─▶ emits catalog.course_version.published.v1
├─▶ search-service re-indexes
├─▶ marketplace-service (if visibility=marketplace)
├─▶ delivery-service prewarms
└─▶ notification-service (author notice)
Compensation: failed PublishCourseVersion → event moves to CATALOG.dlq; manual operator replay via admin endpoint; no rollback of upstream content package.
4.2 Withdraw saga
POST /courses/{id}/versions/{vid}/withdraw
│
└─▶ UPDATE version + outbox
└─▶ catalog.course_version.withdrawn.v1
├─▶ search-service removes
├─▶ delivery-service blocks new launches
└─▶ progress-service marks read-only (no compensation)
5. Inbox / Outbox Pattern
- Outbox table: `catalog.outbox`, with columns `outboxId ULID PK, subject, payload JSONB, envelope JSONB, createdAt, publishedAt nullable`.
- Outbox relay: a background worker polls `publishedAt IS NULL` every 200 ms (batched) and publishes to NATS with `Nats-Msg-Id = outboxId` (dedup). On ack, `UPDATE publishedAt = now()`.
- Inbox table: `catalog.inbox`, with columns `eventId ULID PK, subject, processedAt`. UNIQUE on `eventId`.
- Consumer contract: handlers MUST be idempotent via an inbox table check at the top of the transaction. Duplicate events are no-ops.
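One polling pass of the outbox relay might look like the sketch below. It is an illustration under assumptions: rows live in an array instead of Postgres, `Publish` is a hypothetical callback standing in for the NATS publisher, the batch size of 100 is invented, and stopping at the first failure (to keep publish order) is a design choice the document does not state.

```typescript
interface OutboxRow {
  outboxId: string;
  subject: string;
  payload: unknown;
  publishedAt: Date | null;
}

// Hypothetical publisher signature; resolves once the broker has acknowledged the message.
type Publish = (subject: string, payload: unknown, headers: { [name: string]: string }) => Promise<void>;

// Publishes pending rows in order and returns how many were acknowledged.
async function relayBatch(
  rows: OutboxRow[],
  publish: Publish,
  now: () => Date,
  batchSize: number = 100,
): Promise<number> {
  const pending = rows.filter((row) => row.publishedAt === null).slice(0, batchSize);
  let published = 0;
  for (const row of pending) {
    try {
      // The outboxId doubles as the dedup id, so a retry after a lost ack is harmless.
      await publish(row.subject, row.payload, { "Nats-Msg-Id": row.outboxId });
      row.publishedAt = now(); // equivalent of UPDATE publishedAt = now()
      published++;
    } catch {
      break; // row stays unpublished and is picked up by the next poll
    }
  }
  return published;
}
```

If the process crashes between the broker ack and the `publishedAt` update, the row is published again on the next poll, and the `Nats-Msg-Id` header lets the broker discard the duplicate.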
6. Concurrency & Locking
- Optimistic concurrency on `Course` via `etag` (ULID re-rolled on every write).
- Optimistic concurrency on `Taxonomy` via `version: int`.
- Advisory Postgres lock on `(tenantId, courseId)` during `PublishCourseVersion` to serialise concurrent version inserts (fast, ≤ 10 ms).
7. Ports (interfaces)
interface CourseRepo {
findById(id: CourseId): Promise<Course | null>;
findBySlug(tenantId: TenantId, slug: string): Promise<Course | null>;
list(tenantId: TenantId, filter: CourseFilter, cursor?: string): Promise<Page<Course>>;
save(course: Course, expectedEtag?: string): Promise<Course>;
archive(id: CourseId): Promise<void>;
}
interface CourseVersionRepo { findById; listByCourse; save; updateStatus; }
interface TaxonomyRepo { findById; findByNamespace; list; save(t, expectedVersion); }
interface EventPublisher { publish(envelope: EventEnvelope): Promise<void>; }
interface MediaClient { resolveCover(ref: MediaRef, tenantId: TenantId): Promise<{ signedUrl, width, height }>; }
interface TenantRegistry { getFlags(tenantId: TenantId): Promise<TenantFeatureFlags>; }
interface Cache { get<T>(k): Promise<T|null>; set(k,v,ttl); del(pattern); }
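Because the domain only sees ports, an adapter can be swapped for a test double. The sketch below is a hypothetical in-memory adapter for the `Cache` port. The port is restated with explicit parameter and return types, which are assumptions (the listing above leaves `set` and `del` untyped), and `del` supports only exact keys or a single trailing `*`.

```typescript
interface Cache {
  get<T>(key: string): Promise<T | null>;
  set(key: string, value: unknown, ttlSeconds: number): Promise<void>;
  del(pattern: string): Promise<number>;
}

class InMemoryCache implements Cache {
  private entries = new Map<string, { value: unknown; expiresAt: number }>();

  // The clock is injectable so tests can move time forward without sleeping.
  constructor(private now: () => number = () => Date.now()) {}

  async get<T>(key: string): Promise<T | null> {
    const entry = this.entries.get(key);
    if (entry === undefined) return null;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // lazily evict expired entries
      return null;
    }
    return entry.value as T;
  }

  async set(key: string, value: unknown, ttlSeconds: number): Promise<void> {
    this.entries.set(key, { value, expiresAt: this.now() + ttlSeconds * 1000 });
  }

  // Accepts an exact key or a prefix pattern such as "browse:t1:*"; returns the number removed.
  async del(pattern: string): Promise<number> {
    const isPrefix = pattern.charAt(pattern.length - 1) === "*";
    const prefix = isPrefix ? pattern.slice(0, -1) : pattern;
    let removed = 0;
    this.entries.forEach((_entry, key) => {
      const hit = isPrefix ? key.indexOf(prefix) === 0 : key === prefix;
      if (hit) {
        this.entries.delete(key);
        removed++;
      }
    });
    return removed;
  }
}
```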
8. Error Handling Policy
| Class | HTTP | Action |
|---|---|---|
| `ValidationError` | 400 | Return problem+json `validation` |
| `NotFoundError` | 404 | Return problem+json `not_found` |
| `PreconditionFailedError` | 412 | Return current etag in body |
| `ConflictError` (slug dup, SemVer regress) | 409 | Return existing resource |
| `ForbiddenError` | 403 | Log authz failure |
| `TransientError` (DB, NATS) | 503 | Retry with jitter; outbox absorbs |
| `PoisonEventError` | n/a | DLQ with metadata |
All errors are logged with traceId, tenantId, courseId?, and categorised for alerting.
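The HTTP half of the table can be captured as a lookup plus a problem+json builder. This is a sketch, not the service's error middleware: the `kind` strings, the `urn:catalog:error:` type scheme, and the `traceId` extension member are assumptions. `PoisonEventError` is omitted because it has no HTTP mapping.

```typescript
type CatalogErrorKind =
  | "validation"
  | "not_found"
  | "precondition_failed"
  | "conflict"
  | "forbidden"
  | "transient";

// Status codes from the error handling table.
const STATUS_BY_KIND: { [K in CatalogErrorKind]: number } = {
  validation: 400,
  not_found: 404,
  precondition_failed: 412,
  conflict: 409,
  forbidden: 403,
  transient: 503,
};

// Core problem+json members plus a traceId extension member.
interface ProblemJson {
  type: string;
  title: string;
  status: number;
  detail: string;
  traceId: string;
}

function toProblem(kind: CatalogErrorKind, detail: string, traceId: string): ProblemJson {
  return {
    type: "urn:catalog:error:" + kind, // hypothetical URN scheme
    title: kind.replace(/_/g, " "),
    status: STATUS_BY_KIND[kind],
    detail,
    traceId,
  };
}
```

Typing the lookup as a mapped type over `CatalogErrorKind` means adding a new error kind without a status code fails at compile time.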
9. Retry / Backoff
- HTTP: no automatic retries server-side; clients must retry idempotent reads.
- Outbox: exponential backoff starting at 200 ms and doubling on each retry (200 ms, 400 ms, 800 ms, 1.6 s, 3.2 s, 6.4 s, and so on, capped at 60 s), max 10 attempts; then DLQ.
- Event consumers: NATS durable consumer, max redelivery 5, ack-wait 30 s; on exhaustion → DLQ (`CATALOG.dlq`).
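The outbox schedule can be written as a small function. The sketch assumes the delay is indexed by the number of failed attempts so far and that the tenth failure sends the row to the DLQ; `outboxRetryDelayMs` is a hypothetical name, and no jitter is applied because the outbox rule does not mention any.

```typescript
const BASE_DELAY_MS = 200;
const MAX_DELAY_MS = 60000;
const MAX_ATTEMPTS = 10;

// Delay to wait after `failedAttempts` failures, or null once the row should move to the DLQ.
function outboxRetryDelayMs(failedAttempts: number): number | null {
  if (failedAttempts < 1) throw new RangeError("failedAttempts must be at least 1");
  if (failedAttempts >= MAX_ATTEMPTS) return null;
  const exponential = BASE_DELAY_MS * Math.pow(2, failedAttempts - 1);
  return Math.min(exponential, MAX_DELAY_MS);
}
```

Under this reading the longest delay actually used is 51.2 s (after the ninth failure), so the 60 s cap acts as a safety net that would only bind if the attempt limit were raised.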
10. Cache Invalidation Rules
| On event | Invalidate |
|---|---|
| `course.metadata_updated.v1` | `course:{id}`, `browse:{tenantId}:*`, `search:*` (nudge search-service) |
| `course_version.published.v1` | `course:{id}`, `versions:{id}`, `browse:{tenantId}:*` |
| `course.visibility_changed.v1` | `course:{id}`, `browse:*` (all tenants, since cross-tenant discoverability is affected) |
| `course_version.withdrawn.v1` | `course:{id}`, `version:{vid}`, `browse:{tenantId}:*` |
| `taxonomy.updated.v1` | `taxonomy:{id}`, `browse:*` |
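
The table translates directly into a function from event to key patterns. The event shapes below are simplified assumptions (the real envelopes are defined in EVENT_SCHEMAS), and `keysToInvalidate` is a hypothetical helper.

```typescript
// Simplified event shapes carrying only the ids needed to build cache keys.
type CatalogEvent =
  | { type: "course.metadata_updated.v1"; tenantId: string; courseId: string }
  | { type: "course_version.published.v1"; tenantId: string; courseId: string }
  | { type: "course.visibility_changed.v1"; courseId: string }
  | { type: "course_version.withdrawn.v1"; tenantId: string; courseId: string; versionId: string }
  | { type: "taxonomy.updated.v1"; taxonomyId: string };

function keysToInvalidate(event: CatalogEvent): string[] {
  switch (event.type) {
    case "course.metadata_updated.v1":
      return ["course:" + event.courseId, "browse:" + event.tenantId + ":*", "search:*"];
    case "course_version.published.v1":
      return ["course:" + event.courseId, "versions:" + event.courseId, "browse:" + event.tenantId + ":*"];
    case "course.visibility_changed.v1":
      // Visibility affects cross-tenant discoverability, so every tenant's browse cache is dropped.
      return ["course:" + event.courseId, "browse:*"];
    case "course_version.withdrawn.v1":
      return ["course:" + event.courseId, "version:" + event.versionId, "browse:" + event.tenantId + ":*"];
    case "taxonomy.updated.v1":
      return ["taxonomy:" + event.taxonomyId, "browse:*"];
  }
}
```

Each returned pattern can be passed to the `Cache` port's `del(pattern)`.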
11. Feature Flags Honored
| Flag | Effect |
|---|---|
| `feature.marketplace_publish` | Allows `visibility=marketplace` |
| `feature.public_catalog` | Allows `visibility=public` |
| `feature.ai_localize_metadata` | Enables `POST /courses/{id}/metadata/localize` |
| `feature.taxonomy_custom` | Allows tenant-scoped taxonomy creation |