
Application Logic

:::info Source
Sourced from services/catalog-service/APPLICATION_LOGIC.md in the documentation repo.
:::

Companion: DOMAIN_MODEL · EVENT_SCHEMAS · API_CONTRACTS

1. Application Architecture

Hexagonal / Ports & Adapters. Domain layer is pure; all side effects go through ports.

┌───────────────────────────────────────────────────────────────┐
│                       Inbound Adapters                        │
│   HTTP (Fastify)     NATS Subscriber      Admin CLI           │
└─────────┬─────────────────┬───────────────────┬───────────────┘
          ▼                 ▼                   ▼
┌───────────────────────────────────────────────────────────────┐
│                       Application Layer                       │
│  UseCases (command handlers)    Query handlers (CQRS-lite)    │
│  Saga coordinators              Event projectors              │
└─────────┬──────────────────────────────────────┬──────────────┘
          ▼                                      ▼
┌─────────────────────────┐    ┌────────────────────────────────┐
│ Domain Layer            │    │ Ports (interfaces)             │
│ Course, CourseVersion   │    │ CourseRepo, TaxonomyRepo       │
│ Taxonomy invariants     │    │ EventPublisher, MediaClient    │
│ Policies, State machines│    │ Cache, TenantRegistry          │
└─────────────────────────┘    └──────────────┬─────────────────┘
                                              ▼
┌───────────────────────────────────────────────────────────────┐
│                       Outbound Adapters                       │
│   Postgres    Redis    NATS Publisher    media-service HTTP   │
└───────────────────────────────────────────────────────────────┘

2. Use Cases (Command Handlers)

2.1 RegisterCourse (system-internal, from event)

Trigger: authoring.course_draft.published.v1 where no (tenantId, slug) exists.

Inputs : { tenantId, slug, title, description, defaultLocale, authors,
visibility, taxonomy, cover, sourceDraftId, eventId }
Output : { courseId }
Errors : SlugAlreadyExistsError, TenantNotFoundError

Steps:
1. Inbox check: if eventId already processed → return existing courseId.
2. Validate payload (Zod).
3. Load TenantFeatureFlags via TenantRegistry port.
4. If visibility in {'marketplace','public'} and feature flag disabled → downgrade to 'org' + log warning.
5. Begin Tx
6. INSERT Course (id=ulid(), status='active', latestVersionId=null)
7. INSERT outbox(event=catalog.course.registered.v1)
8. INSERT inbox(eventId=input.eventId)
9. Commit Tx
10. Return courseId
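
The steps above can be sketched roughly as follows. This is a minimal illustration, not the service's actual handler: `InMemoryStore`, `effectiveVisibility` and `registerCourse` are hypothetical names, in-memory maps stand in for the Postgres tables and the transaction, Zod validation is omitted, and a counter replaces `ulid()`. The `TenantFeatureFlags` shape is assumed from the flag names in section 11.

```typescript
type Visibility = "private" | "org" | "marketplace" | "public";

interface RegisterCourseInput {
  tenantId: string;
  slug: string;
  title: string;
  visibility: Visibility;
  eventId: string;
}

// Assumed shape; mirrors feature.marketplace_publish and feature.public_catalog.
interface TenantFeatureFlags {
  marketplacePublish: boolean;
  publicCatalog: boolean;
}

interface CourseRow {
  id: string;
  tenantId: string;
  slug: string;
  title: string;
  visibility: Visibility;
  status: "active" | "archived";
  latestVersionId: string | null;
}

// Hypothetical in-memory stand-in for the courses, inbox and outbox tables.
class InMemoryStore {
  courses = new Map<string, CourseRow>(); // keyed by `${tenantId}/${slug}`
  inbox = new Map<string, string>(); // eventId -> courseId
  outbox: { subject: string; courseId: string }[] = [];
  warnings: string[] = [];
  private seq = 0;

  nextId(): string {
    this.seq += 1;
    return `course-${this.seq}`; // the real handler uses ulid()
  }
}

// Step 4: downgrade to 'org' when the matching flag is disabled.
function effectiveVisibility(requested: Visibility, flags: TenantFeatureFlags): Visibility {
  if (requested === "marketplace" && !flags.marketplacePublish) return "org";
  if (requested === "public" && !flags.publicCatalog) return "org";
  return requested;
}

function registerCourse(
  store: InMemoryStore,
  input: RegisterCourseInput,
  flags: TenantFeatureFlags,
): { courseId: string } {
  // Step 1: the inbox check turns a redelivered event into a no-op.
  const alreadyProcessed = store.inbox.get(input.eventId);
  if (alreadyProcessed !== undefined) return { courseId: alreadyProcessed };

  const key = `${input.tenantId}/${input.slug}`;
  if (store.courses.has(key)) throw new Error(`SlugAlreadyExistsError: ${key}`);

  const visibility = effectiveVisibility(input.visibility, flags);
  if (visibility !== input.visibility) {
    store.warnings.push(`visibility downgraded to 'org' for ${key}`);
  }

  // Steps 5-9: in Postgres these three writes share one transaction.
  const courseId = store.nextId();
  store.courses.set(key, {
    id: courseId,
    tenantId: input.tenantId,
    slug: input.slug,
    title: input.title,
    visibility,
    status: "active",
    latestVersionId: null,
  });
  store.outbox.push({ subject: "catalog.course.registered.v1", courseId });
  store.inbox.set(input.eventId, courseId);
  return { courseId };
}
```

Calling `registerCourse` twice with the same `eventId` returns the same `courseId` and writes only one outbox row, which is the property the inbox check is meant to provide.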

2.2 UpdateCourseMetadata (PATCH /courses/{id}/metadata)

Inputs : { courseId, tenantId, actor, If-Match(etag), changes: Partial<Course> }
Output : { course }
Errors : NotFoundError, PreconditionFailedError, ForbiddenError, ValidationError

Steps:
1. Load course, tenant-scoped (RLS).
2. Verify course.etag === If-Match; else 412.
3. Verify actor has catalog.course.edit on tenantId.
4. Merge allowed fields: title, description, cover, taxonomy, tags, defaultLocale.
5. Run domain invariants (slug never changes via this endpoint).
6. Begin Tx
7. UPDATE courses SET …, etag = ulid()
8. INSERT outbox(catalog.course.metadata_updated.v1 with diff)
9. Commit
10. Invalidate cache keys (course:{id}, browse:{tenantId}:*)
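
Steps 2, 4 and 5 can be illustrated with a small pure function. This is a sketch under assumptions: the `Course` shape is simplified and hypothetical, `updateCourseMetadata` and `pickEditable` are illustrative names, fields outside the allow-list are silently dropped (the source does not say whether they should be rejected instead), and the authorization check, transaction and cache invalidation are left out.

```typescript
// Assumed, simplified Course shape; the real aggregate is defined in DOMAIN_MODEL.
interface Course {
  id: string;
  slug: string;
  title: string;
  description: string;
  cover: string | null;
  taxonomy: string[];
  tags: string[];
  defaultLocale: string;
  etag: string;
}

const EDITABLE_FIELDS = ["title", "description", "cover", "taxonomy", "tags", "defaultLocale"] as const;
type EditableField = (typeof EDITABLE_FIELDS)[number];
type MetadataDiff = Partial<Pick<Course, EditableField>>;

// Step 4: keep only allow-listed fields that were actually supplied.
function pickEditable(changes: Partial<Course>): MetadataDiff {
  const out: Partial<Record<EditableField, unknown>> = {};
  for (const field of EDITABLE_FIELDS) {
    if (changes[field] !== undefined) out[field] = changes[field];
  }
  return out as MetadataDiff;
}

function updateCourseMetadata(
  course: Course,
  ifMatch: string,
  changes: Partial<Course>,
  nextEtag: () => string, // ulid() in the real service
): { course: Course; diff: MetadataDiff } {
  // Step 2: optimistic concurrency check against the If-Match header.
  if (course.etag !== ifMatch) throw new Error("PreconditionFailedError (412)");
  // Step 5: the slug never changes via this endpoint.
  if (changes.slug !== undefined && changes.slug !== course.slug) {
    throw new Error("ValidationError: slug cannot be changed here");
  }
  const diff = pickEditable(changes);
  // Step 7: every successful write re-rolls the etag.
  return { course: { ...course, ...diff, etag: nextEtag() }, diff };
}
```

The returned `diff` is what step 8 would place in the `catalog.course.metadata_updated.v1` payload.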

2.3 SetVisibility (PATCH /courses/{id}/visibility)

Inputs : { courseId, tenantId, actor, from, to, reason? }
Validation:
- to in {'private','org','marketplace','public'}
- if to in {'marketplace','public'}: feature flag + marketplace.listing exists (via event replay cache)
- actor has catalog.course.visibility
Emits : catalog.course.visibility_changed.v1
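
The validation rules above could be expressed as a function that collects violations. This is only a sketch: `VisibilityContext` and `validateSetVisibility` are hypothetical names, and the mapping of each target visibility to its flag is inferred from section 11.

```typescript
interface VisibilityContext {
  flags: { marketplacePublish: boolean; publicCatalog: boolean }; // assumed shape
  hasMarketplaceListing: boolean; // answered from the event replay cache
  actorPermissions: string[];
}

const VISIBILITIES: readonly string[] = ["private", "org", "marketplace", "public"];

// Returns the violated rules; an empty list means the change may proceed.
function validateSetVisibility(to: string, ctx: VisibilityContext): string[] {
  const errors: string[] = [];
  if (VISIBILITIES.indexOf(to) === -1) errors.push(`unknown visibility '${to}'`);
  if (ctx.actorPermissions.indexOf("catalog.course.visibility") === -1) {
    errors.push("actor lacks catalog.course.visibility");
  }
  if (to === "marketplace" || to === "public") {
    const flagOn = to === "marketplace" ? ctx.flags.marketplacePublish : ctx.flags.publicCatalog;
    if (!flagOn) errors.push(`feature flag for '${to}' is disabled`);
    if (!ctx.hasMarketplaceListing) errors.push("no marketplace.listing found");
  }
  return errors;
}
```

Unlike RegisterCourse, which downgrades silently, this sketch reports a disabled flag as a violation, on the assumption that an explicit user request should fail visibly.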

2.4 PublishCourseVersion (system-internal, from event)

Trigger: content.play_package.built.v1.

Inputs : { tenantId, courseId, versionLabel, publishedBy, durationMinutes,
locales, moduleSummaries, playPackage:{ id, sha256, format }, eventId }
Preconditions:
- Course exists and status='active'
- versionLabel SemVer > current latest (else create but do not advance latestVersionId)
- playPackage.sha256 MUST equal event payload value (integrity check)
Steps:
1. Inbox check.
2. Load Course.
3. If Course.status='archived' → DLQ with code CATALOG_ARCHIVED_PUBLISH.
4. Begin Tx
5. INSERT course_versions (status='published')
6. IF SemVer(versionLabel) > SemVer(course.latestVersionId.label)
UPDATE courses.latestVersionId, version_count++
7. INSERT outbox(catalog.course_version.published.v1)
8. INSERT inbox(eventId)
9. Commit
10. Warm cache (course:{id}, version:{vid})
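
Step 6 depends on a SemVer comparison. A minimal sketch of that decision follows, assuming plain `major.minor.patch` labels (pre-release and build tags are not handled) and a hypothetical `CourseState` that carries the label of the current latest version alongside its id.

```typescript
function parseSemVer(label: string): [number, number, number] {
  const m = /^(\d+)\.(\d+)\.(\d+)$/.exec(label);
  if (m === null) throw new Error(`not a plain SemVer label: ${label}`);
  return [Number(m[1]), Number(m[2]), Number(m[3])];
}

// Negative if a < b, zero if equal, positive if a > b.
function compareSemVer(a: string, b: string): number {
  const [aMajor, aMinor, aPatch] = parseSemVer(a);
  const [bMajor, bMinor, bPatch] = parseSemVer(b);
  if (aMajor !== bMajor) return aMajor - bMajor;
  if (aMinor !== bMinor) return aMinor - bMinor;
  return aPatch - bPatch;
}

// Hypothetical projection of the columns touched by step 6.
interface CourseState {
  latestVersionId: string | null;
  latestLabel: string | null;
  versionCount: number;
}

// The version row is inserted either way (step 5); only the pointer is conditional.
function applyPublishedVersion(course: CourseState, versionId: string, versionLabel: string): CourseState {
  const advances =
    course.latestLabel === null || compareSemVer(versionLabel, course.latestLabel) > 0;
  if (!advances) return course;
  // version_count is bumped in the same branch, following step 6 as written.
  return { latestVersionId: versionId, latestLabel: versionLabel, versionCount: course.versionCount + 1 };
}
```

Comparing numerically matters here: a string comparison would rank `1.9.3` above `1.10.0`.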

2.5 DeprecateCourseVersion / WithdrawCourseVersion

Inputs : { courseVersionId, tenantId, actor, reason? }
State machine:
- Deprecate: published → deprecated
- Withdraw: published | deprecated → withdrawn (terminal)
Steps: guard state; update; outbox; invalidate cache; notify search-service via event.

Side effect: if withdrawn version was latestVersionId, recompute latest to the newest published
sibling (SemVer max). If none exists, latestVersionId = null and Course.status stays 'active'
(cannot be discovered, but not archived).
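
The state machine and the latest-version side effect can be sketched as two small functions. The names `nextStatus`, `recomputeLatest` and `VersionSummary` are illustrative, and labels are assumed to be plain `major.minor.patch` strings.

```typescript
type VersionStatus = "published" | "deprecated" | "withdrawn";
type VersionAction = "deprecate" | "withdraw";

const ALLOWED: Record<VersionAction, { from: VersionStatus[]; to: VersionStatus }> = {
  deprecate: { from: ["published"], to: "deprecated" },
  withdraw: { from: ["published", "deprecated"], to: "withdrawn" }, // terminal
};

// Guard: throws when the transition is not permitted from the current state.
function nextStatus(current: VersionStatus, action: VersionAction): VersionStatus {
  const rule = ALLOWED[action];
  if (rule.from.indexOf(current) === -1) {
    throw new Error(`cannot ${action} a ${current} version`);
  }
  return rule.to;
}

interface VersionSummary {
  id: string;
  label: string;
  status: VersionStatus;
}

function compareLabels(a: string, b: string): number {
  const pa = a.split(".").map((p) => parseInt(p, 10));
  const pb = b.split(".").map((p) => parseInt(p, 10));
  for (let i = 0; i < 3; i++) {
    const d = (pa[i] ?? 0) - (pb[i] ?? 0);
    if (d !== 0) return d;
  }
  return 0;
}

// Picks the SemVer-max published sibling, or null when none remains.
function recomputeLatest(versions: VersionSummary[]): string | null {
  let best: VersionSummary | null = null;
  for (const v of versions) {
    if (v.status !== "published") continue;
    if (best === null || compareLabels(v.label, best.label) > 0) best = v;
  }
  return best === null ? null : best.id;
}
```

Deprecated versions are skipped by `recomputeLatest`, since the text restricts the fallback to published siblings.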

2.6 ArchiveCourse

Inputs : { courseId, tenantId, actor }
Effect : status='active' → 'archived'; emits catalog.course.archived.v1.
Guard : no pending publishes in the last 24h (event-sourced check against inbox).

2.7 UpsertTaxonomy

Inputs : { namespace, tenantId?|null, tree, expectedVersion? }
Concurrency: optimistic lock on version field.
Authorization:
- tenantId=null → platform-admin only
- tenantId=self → tenant-admin with catalog.taxonomy.edit
Steps:
1. Validate tree (no duplicate paths, max depth 6, ≤ 1000 nodes).
2. If expectedVersion mismatches → 409 with current version in body.
3. Compute diff.
4. UPDATE taxonomies SET tree=$, version=version+1.
5. Emit catalog.taxonomy.updated.v1 with diff.
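
Step 1 can be sketched as a single tree walk. The node shape is an assumption: the source does not define the `tree` format, so this example uses nested nodes with a `slug`, derives each path by joining slugs with `/`, and counts root nodes as depth 1.

```typescript
// Hypothetical node shape; the real tree format is not specified here.
interface TaxonomyNode {
  slug: string;
  children?: TaxonomyNode[];
}

const MAX_DEPTH = 6;
const MAX_NODES = 1000;

// Returns a list of violations; an empty list means the tree is acceptable.
function validateTaxonomyTree(roots: TaxonomyNode[]): string[] {
  const errors: string[] = [];
  const seenPaths = new Set<string>();
  let nodeCount = 0;
  let maxDepth = 0;

  // Explicit stack instead of recursion, so deep input cannot overflow the call stack.
  const stack = roots.map((node) => ({ node, path: node.slug, depth: 1 }));
  while (stack.length > 0) {
    const { node, path, depth } = stack.pop()!;
    nodeCount += 1;
    if (depth > maxDepth) maxDepth = depth;
    if (seenPaths.has(path)) errors.push(`duplicate path: ${path}`);
    seenPaths.add(path);
    for (const child of node.children ?? []) {
      stack.push({ node: child, path: `${path}/${child.slug}`, depth: depth + 1 });
    }
  }

  if (maxDepth > MAX_DEPTH) errors.push(`depth ${maxDepth} exceeds ${MAX_DEPTH}`);
  if (nodeCount > MAX_NODES) errors.push(`${nodeCount} nodes exceeds ${MAX_NODES}`);
  return errors;
}
```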

3. Query Handlers (CQRS-lite)

| Query | Implementation | Cache |
| --- | --- | --- |
| ListCourses(tenantId, filters, cursor) | SQL on courses + course_versions_summary join | Redis list cache, key browse:{tenantId}:{hash(filters)}, TTL 30 s |
| GetCourse(id) | SQL single-row | Redis, key course:{id}, TTL 300 s, invalidated on write |
| ListCourseVersions(courseId) | SQL order by published_at | Redis, TTL 60 s |
| GetCourseVersion(vid) | SQL single-row | Redis, TTL 3600 s (immutable) |
| BrowseByTaxonomy(taxId, nodePath) | SQL via course_taxonomy | Redis, TTL 30 s |
| GetTaxonomy(id) | SQL | Redis, TTL 300 s |
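
The read path for these queries follows a cache-aside pattern, which might look like the sketch below. Several things are assumptions: the hash behind `hash(filters)` is not specified in the source, so FNV-1a over a key-sorted JSON encoding is used as a stand-in; `TtlCache` is a synchronous in-memory substitute for Redis; and the clock is injected so expiry can be exercised without waiting.

```typescript
type Filters = Record<string, string | number | boolean>;

// FNV-1a (32-bit), used here only as a placeholder for the unspecified hash.
function fnv1a(text: string): string {
  let h = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    h ^= text.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h.toString(16);
}

// Keys are sorted first so that equivalent filters map to the same cache key.
function browseKey(tenantId: string, filters: Filters): string {
  const canonical = JSON.stringify(
    Object.keys(filters).sort().map((k) => [k, filters[k]]),
  );
  return `browse:${tenantId}:${fnv1a(canonical)}`;
}

// Hypothetical in-memory stand-in for the Redis-backed Cache port.
class TtlCache {
  private entries = new Map<string, { value: unknown; expiresAt: number }>();

  constructor(private now: () => number) {}

  // Cache-aside: return a fresh hit, otherwise load, store and return.
  getOrLoad<T>(key: string, ttlSeconds: number, load: () => T): T {
    const hit = this.entries.get(key);
    if (hit !== undefined && hit.expiresAt > this.now()) return hit.value as T;
    const value = load();
    this.entries.set(key, { value, expiresAt: this.now() + ttlSeconds * 1000 });
    return value;
  }

  del(key: string): void {
    this.entries.delete(key);
  }
}
```

A ListCourses handler would then call something like `cache.getOrLoad(browseKey(tenantId, filters), 30, runSqlQuery)`, where `runSqlQuery` is a placeholder for the SQL read.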

4. Saga / Event Choreography

4.1 Publish saga

authoring.course_draft.published.v1
        │
        ├─▶ catalog.RegisterCourse (idempotent)
        │       └─▶ emits catalog.course.registered.v1
        │               └─▶ search-service indexes
        │
content.play_package.built.v1
        │
        └─▶ catalog.PublishCourseVersion
                └─▶ emits catalog.course_version.published.v1
                        ├─▶ search-service re-indexes
                        ├─▶ marketplace-service (if visibility=marketplace)
                        ├─▶ delivery-service prewarms
                        └─▶ notification-service (author notice)

Compensation: failed PublishCourseVersion → event moves to CATALOG.dlq; manual operator replay via admin endpoint; no rollback of upstream content package.

4.2 Withdraw saga

POST /courses/{id}/versions/{vid}/withdraw
        │
        └─▶ UPDATE version + outbox
                └─▶ catalog.course_version.withdrawn.v1
                        ├─▶ search-service removes
                        ├─▶ delivery-service blocks new launches
                        └─▶ progress-service marks read-only (no compensation)

5. Inbox / Outbox Pattern

  • Outbox table: catalog.outbox — columns outboxId ULID PK, subject, payload JSONB, envelope JSONB, createdAt, publishedAt nullable.
  • Outbox relay: background worker polls publishedAt IS NULL every 200 ms (batched) and publishes to NATS with Nats-Msg-Id = outboxId (dedup). On ack, UPDATE publishedAt = now().
  • Inbox table: catalog.inbox — columns eventId ULID PK, subject, processedAt. UNIQUE on eventId.
  • Consumer contract: handlers MUST be idempotent via inbox table check at the top of the transaction. Duplicate events are no-ops.
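
One polling pass of the outbox relay might look like the following sketch. `Broker`, `DedupBroker` and `relayOnce` are hypothetical names; the broker here only imitates dedup on the message id in memory, and the real relay would talk to Postgres and NATS asynchronously.

```typescript
interface OutboxRow {
  outboxId: string;
  subject: string;
  payload: unknown;
  createdAt: number;
  publishedAt: number | null;
}

interface Broker {
  // Returns true when the broker acknowledged the message.
  publish(subject: string, payload: unknown, msgId: string): boolean;
}

// In-memory broker that drops messages whose msgId it has already seen.
class DedupBroker implements Broker {
  delivered: string[] = [];
  private seen = new Set<string>();

  publish(_subject: string, _payload: unknown, msgId: string): boolean {
    if (!this.seen.has(msgId)) {
      this.seen.add(msgId);
      this.delivered.push(msgId);
    }
    return true;
  }
}

// One batch: publish unpublished rows oldest first, mark each one on ack.
function relayOnce(rows: OutboxRow[], broker: Broker, now: number, batchSize = 100): number {
  const pending = rows
    .filter((r) => r.publishedAt === null)
    .sort((a, b) => a.createdAt - b.createdAt)
    .slice(0, batchSize);
  let published = 0;
  for (const row of pending) {
    const acked = broker.publish(row.subject, row.payload, row.outboxId);
    if (!acked) break; // leave the rest for the next poll
    row.publishedAt = now; // UPDATE publishedAt = now()
    published += 1;
  }
  return published;
}
```

If the worker crashes after publishing but before marking a row, the next pass republishes it with the same `outboxId`, and dedup on the message id keeps downstream consumers from seeing it twice.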

6. Concurrency & Locking

  • Optimistic concurrency on Course via etag (ULID re-rolled on every write).
  • Optimistic concurrency on Taxonomy via version: int.
  • Advisory Postgres lock on (tenantId, courseId) during PublishCourseVersion to serialise concurrent version inserts (fast, ≤ 10 ms).

7. Ports (interfaces)

interface CourseRepo {
  findById(id: CourseId): Promise<Course | null>;
  findBySlug(tenantId: TenantId, slug: string): Promise<Course | null>;
  list(tenantId: TenantId, filter: CourseFilter, cursor?: string): Promise<Page<Course>>;
  save(course: Course, expectedEtag?: string): Promise<Course>;
  archive(id: CourseId): Promise<void>;
}
interface CourseVersionRepo { findById; listByCourse; save; updateStatus; }
interface TaxonomyRepo { findById; findByNamespace; list; save(t, expectedVersion); }
interface EventPublisher { publish(envelope: EventEnvelope): Promise<void>; }
interface MediaClient { resolveCover(ref: MediaRef, tenantId: TenantId): Promise<{ signedUrl, width, height }>; }
interface TenantRegistry { getFlags(tenantId: TenantId): Promise<TenantFeatureFlags>; }
interface Cache { get<T>(k): Promise<T|null>; set(k,v,ttl); del(pattern); }

8. Error Handling Policy

| Class | HTTP | Action |
| --- | --- | --- |
| ValidationError | 400 | Return problem+json validation |
| NotFoundError | 404 | Return problem+json not_found |
| PreconditionFailedError | 412 | Return current etag in body |
| ConflictError (slug dup, SemVer regress) | 409 | Return existing resource |
| ForbiddenError | 403 | Log authz failure |
| TransientError (DB, NATS) | 503 | Retry with jitter; outbox absorbs |
| PoisonEventError | n/a | DLQ with metadata |

All errors are logged with traceId, tenantId, courseId?, and categorised for alerting.
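
The policy table maps naturally onto a lookup. In the sketch below, `toProblem` and the `Problem` shape are illustrative; the field names follow the general problem+json layout, and the `type` value is a placeholder because the source does not list the service's problem type URIs.

```typescript
type ErrorClass =
  | "ValidationError"
  | "NotFoundError"
  | "PreconditionFailedError"
  | "ConflictError"
  | "ForbiddenError"
  | "TransientError"
  | "PoisonEventError";

// null means the error never reaches an HTTP client (the event goes to the DLQ).
const HTTP_STATUS: Record<ErrorClass, number | null> = {
  ValidationError: 400,
  NotFoundError: 404,
  PreconditionFailedError: 412,
  ConflictError: 409,
  ForbiddenError: 403,
  TransientError: 503,
  PoisonEventError: null,
};

interface Problem {
  type: string;
  title: string;
  status: number;
  detail: string;
}

function toProblem(errorClass: ErrorClass, detail: string): Problem | null {
  const status = HTTP_STATUS[errorClass];
  if (status === null) return null;
  // "about:blank" is a placeholder type, not the service's real identifier.
  return { type: "about:blank", title: errorClass, status, detail };
}
```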

9. Retry / Backoff

  • HTTP: no automatic retries server-side; clients must retry idempotent reads.
  • Outbox: exponential backoff starting at 200 ms and doubling per attempt (200 ms, 400 ms, 800 ms, 1.6 s, 3.2 s, 6.4 s, ...), capped at 60 s; max 10 attempts, then DLQ.
  • Event consumers: NATS durable consumer, max redelivery 5, ack-wait 30 s, on exhaustion → DLQ (CATALOG.dlq).
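
The outbox schedule can be computed from the attempt number. A small sketch, assuming attempts are numbered from 1 and that the delay is applied before each retry (`outboxDelayMs` is an illustrative name):

```typescript
const BASE_DELAY_MS = 200;
const MAX_DELAY_MS = 60_000;
const MAX_ATTEMPTS = 10;

// Delay for a given attempt: base * 2^(attempt - 1), capped at 60 s.
function outboxDelayMs(attempt: number): number {
  if (attempt < 1 || attempt > MAX_ATTEMPTS) {
    throw new Error(`attempt must be between 1 and ${MAX_ATTEMPTS}`);
  }
  return Math.min(MAX_DELAY_MS, BASE_DELAY_MS * Math.pow(2, attempt - 1));
}

// Full schedule for all ten attempts, in milliseconds.
function outboxSchedule(): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
    delays.push(outboxDelayMs(attempt));
  }
  return delays;
}
```

With these constants the cap only takes effect on the tenth attempt, where the uncapped value would be 102.4 s.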

10. Cache Invalidation Rules

| On event | Invalidate |
| --- | --- |
| course.metadata_updated.v1 | course:{id}, browse:{tenantId}:*, search:* (nudge search-service) |
| course_version.published.v1 | course:{id}, versions:{id}, browse:{tenantId}:* |
| course.visibility_changed.v1 | course:{id}, browse:* (all tenants; cross-tenant discoverability affected) |
| course_version.withdrawn.v1 | course:{id}, version:{vid}, browse:{tenantId}:* |
| taxonomy.updated.v1 | taxonomy:{id}, browse:* |
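
These rules can be kept in one place as a function from event to key patterns. In this sketch the event shapes are simplified assumptions (the real envelopes are defined in EVENT_SCHEMAS), the subjects use the full `catalog.` prefix seen elsewhere in this document, and `keysToInvalidate` is an illustrative name.

```typescript
type CatalogEvent =
  | { subject: "catalog.course.metadata_updated.v1"; tenantId: string; courseId: string }
  | { subject: "catalog.course_version.published.v1"; tenantId: string; courseId: string }
  | { subject: "catalog.course.visibility_changed.v1"; tenantId: string; courseId: string }
  | { subject: "catalog.course_version.withdrawn.v1"; tenantId: string; courseId: string; versionId: string }
  | { subject: "catalog.taxonomy.updated.v1"; taxonomyId: string };

// Returns the cache key patterns to pass to Cache.del for a given event.
function keysToInvalidate(event: CatalogEvent): string[] {
  switch (event.subject) {
    case "catalog.course.metadata_updated.v1":
      return [`course:${event.courseId}`, `browse:${event.tenantId}:*`, "search:*"];
    case "catalog.course_version.published.v1":
      return [`course:${event.courseId}`, `versions:${event.courseId}`, `browse:${event.tenantId}:*`];
    case "catalog.course.visibility_changed.v1":
      // Visibility affects cross-tenant discoverability, so every tenant's browse cache goes.
      return [`course:${event.courseId}`, "browse:*"];
    case "catalog.course_version.withdrawn.v1":
      return [`course:${event.courseId}`, `version:${event.versionId}`, `browse:${event.tenantId}:*`];
    case "catalog.taxonomy.updated.v1":
      return [`taxonomy:${event.taxonomyId}`, "browse:*"];
  }
}
```

Because the switch is exhaustive over the union, adding a new event subject without an invalidation rule becomes a compile-time error.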

11. Feature Flags Honored

| Flag | Effect |
| --- | --- |
| feature.marketplace_publish | Allows visibility=marketplace |
| feature.public_catalog | Allows visibility=public |
| feature.ai_localize_metadata | Enables POST /courses/{id}/metadata/localize |
| feature.taxonomy_custom | Allows tenant-scoped taxonomy creation |