
Event-Driven Architecture

:::info Source
Sourced from docs/04-event-driven-architecture.md in the documentation repo.
:::

Companion: 02 DDD & Bounded Contexts · 03 Microservices · 05 API Design · 12 Data Models

This document is the authoritative source for NATS subjects, event schemas, outbox/inbox patterns, saga state machines, replay, DLQ, and schema evolution. Every service doc in 03 references this one.

1. Principles

  1. Events are the only approved cross-service integration mechanism. The exceptions (02 §9) are limited to tenant-resolution and AI gateway requests, each capped at one hop.
  2. At-least-once delivery, exactly-once application. Producers use the transactional outbox (§5); consumers use the inbox (§6).
  3. Consumers are always idempotent: a unique constraint on eventId in the inbox table rejects duplicates.
  4. Versioned subjects and schemas. Breaking changes require a new vN and a dual-publish window (§10).
  5. Tenant-isolated streams where justified: per-tenant subject-level ACLs, plus per-tenant stream partitions for the largest customers.
  6. Audit and replay run over the same log. Analytics, compliance, and sync all consume the canonical log.

2. Transport — NATS JetStream

  • Broker: NATS with JetStream persistence, multi-AZ replication.
  • Streams: one per service domain (AUTHORING, DELIVERY, PROGRESS, AI_GATEWAY, SYNC, …).
  • Subjects: {service}.{aggregate}.{event}.v{N} with an optional trailing partition suffix.
  • Retention:

| Stream class | Hot (JetStream) | Cold archive (S3/R2) |
|---|---|---|
| Operational (authoring, delivery, marketplace, billing, catalog, tenant, sync ops) | 30 days | 13 months |
| Regulated (progress, certification, ai_gateway audit, billing audit, identity audit) | 180 days | 7 years |
| Analytics firehose | 30 days | 5 years in warehouse |

  • Delivery: at-least-once; one durable consumer per projector.
  • Ordering: FIFO per subject; the partition is chosen by hashing aggregateId.
  • Replay: consumer can be reset to seq or timestamp.
  • DLQ: {stream}.dlq with poison-message metadata; alert on non-empty.

3. Subject Naming Grammar

```text
subject   := service "." aggregate "." event "." version [ "." partition ]
service   := "identity" | "tenant" | "catalog" | "authoring" | "content"
           | "marketplace" | "billing" | "enrollment" | "assignment"
           | "delivery" | "progress" | "assessment" | "certification"
           | "notification" | "media" | "search" | "analytics" | "ai" | "sync"
aggregate := snake_case noun (e.g. "course_draft", "play_session")
event     := snake_case verb-past-tense (e.g. "registered", "published", "completed")
version   := "v" digit+
partition := string (optional; aggregateId hash bucket)
```
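The following is a minimal TypeScript sketch of the grammar above. It assumes callers want a strict check. formatSubject and parseSubject are hypothetical helper names, and versions are assumed to start at v1, as eventVersion does.

Applied strictly, this grammar rejects some subjects listed in §9:

- subjects with extra dot-separated segments, such as authoring.draft.block.added.v1;
- subjects whose prefix is outside the service list, such as gdpr and audit.

A production parser would need an explicit rule for those cases.

```typescript
// Sketch: strict builder/parser for service.aggregate.event.vN[.partition].
const SERVICES = [
  "identity", "tenant", "catalog", "authoring", "content",
  "marketplace", "billing", "enrollment", "assignment",
  "delivery", "progress", "assessment", "certification",
  "notification", "media", "search", "analytics", "ai", "sync",
] as const;
type Service = (typeof SERVICES)[number];

interface Subject {
  service: Service;
  aggregate: string;
  event: string;
  version: number;
  partition?: string;
}

const SNAKE_CASE = /^[a-z][a-z0-9]*(_[a-z0-9]+)*$/;
const VERSION = /^v([1-9][0-9]*)$/; // assumption: versions start at v1, like eventVersion

function isService(s: string): s is Service {
  return (SERVICES as readonly string[]).includes(s);
}

function formatSubject(s: Subject): string {
  if (!SNAKE_CASE.test(s.aggregate) || !SNAKE_CASE.test(s.event)) {
    throw new Error(`not snake_case: ${s.aggregate}.${s.event}`);
  }
  if (!Number.isInteger(s.version) || s.version < 1) {
    throw new Error(`bad version: ${s.version}`);
  }
  const base = `${s.service}.${s.aggregate}.${s.event}.v${s.version}`;
  return s.partition === undefined ? base : `${base}.${s.partition}`;
}

// Returns null for anything outside the grammar instead of guessing.
function parseSubject(subject: string): Subject | null {
  const parts = subject.split(".");
  if (parts.length !== 4 && parts.length !== 5) return null;
  const [service, aggregate, event, version, partition] = parts;
  const m = VERSION.exec(version);
  if (!m || !isService(service) || !SNAKE_CASE.test(aggregate) || !SNAKE_CASE.test(event)) {
    return null;
  }
  if (partition === "") return null; // trailing dot with no partition
  return { service, aggregate, event, version: Number(m[1]), partition };
}
```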

4. Event Envelope (Mandatory)

Every event, domain or integration, uses the following envelope. Its schema is registered at /event-schemas/envelope/v1.json.

```ts
interface EventEnvelope<T = unknown> {
  // Identity
  eventId: ULID; // globally unique
  eventType: string; // '{service}.{aggregate}.{event}' (no version; carried separately)
  eventVersion: number; // 1, 2, …
  schemaUri: string; // URI incl. content-hash: 'schemas://authoring/course_draft/published/v1#sha256-XXXX'
  // Origin
  source: { service: string; instance: string; commit: string };
  // Causality
  occurredAt: ISODate;
  ingestedAt: ISODate;
  causationId?: ULID; // the eventId that caused this
  correlationId: ULID; // trace group; usually propagated W3C traceparent -> traceId
  // Tenancy & actor
  tenantId: TenantId;
  actor: { type: 'user' | 'system' | 'api_key' | 'service_account'; id: string };
  // Payload
  payload: T;
  // Routing / replay hints
  partitionKey: string; // usually aggregateId
  // Outbox
  outbox?: { dbWriteTs: ISODate; outboxId: ULID };
  // Legal hold / compliance
  retentionClass: 'operational' | 'regulated' | 'audit';
  dataResidency: 'us' | 'eu' | 'me' | 'ap';
}
```

Every payload is validated against its JSON Schema at produce time and at consume time; mismatch moves the event to the service's DLQ and raises an alert.
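The causality fields are easiest to get wrong when a consumer emits a follow-up event. The sketch below shows the intended chaining. It assumes a hypothetical followUp helper and uses local string aliases in place of the branded types from 12 Data Models.

- causationId points at the direct parent.
- correlationId is copied unchanged, so the whole chain shares one trace group.

```typescript
// Local stand-ins; the real branded types come from 12 Data Models.
type ULID = string;
type ISODate = string;

// The subset of EventEnvelope fields involved in causality.
interface CausalFields {
  eventId: ULID;
  eventType: string;
  eventVersion: number;
  occurredAt: ISODate;
  causationId?: ULID;
  correlationId: ULID;
  tenantId: string;
  partitionKey: string;
}

// Hypothetical helper: fields for an event emitted while handling `cause`.
// ID generation and the clock are injected so results are deterministic in tests.
function followUp(
  cause: CausalFields,
  next: Pick<CausalFields, "eventType" | "eventVersion" | "partitionKey">,
  newId: () => ULID,
  now: () => Date = () => new Date(),
): CausalFields {
  return {
    eventId: newId(),
    eventType: next.eventType,
    eventVersion: next.eventVersion,
    occurredAt: now().toISOString(),
    causationId: cause.eventId,         // direct parent in the causation chain
    correlationId: cause.correlationId, // shared by every event in the chain
    tenantId: cause.tenantId,           // assumption: follow-ups stay in the cause's tenant
    partitionKey: next.partitionKey,    // usually the new event's aggregateId
  };
}
```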

5. Outbox Pattern — Producer Side

Every state change that must emit an event uses transactional outbox:

```sql
CREATE TABLE outbox (
  id           uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  occurred_at  timestamptz NOT NULL DEFAULT now(),
  tenant_id    uuid NOT NULL,
  topic        text NOT NULL,
  envelope     jsonb NOT NULL,  -- full EventEnvelope serialized
  published_at timestamptz,
  attempts     int NOT NULL DEFAULT 0,
  last_error   text
);
CREATE INDEX outbox_unpublished_idx ON outbox (occurred_at) WHERE published_at IS NULL;
```

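A single OutboxRelay worker per service publishes rows in occurred_at order, marks them published, and retries failures with exponential backoff. A relay crash between publishing a row and marking it is safe. On restart the row is published again, and consumers absorb the duplicate because they deduplicate on eventId (§6).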
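The following is a minimal in-memory sketch of one OutboxRelay pass. It makes three assumptions:

- publish is a synchronous callback.
- A hypothetical nextAttemptAt column schedules the backoff. The table above has no such column; a real relay might derive it from attempts and a last-attempt timestamp.
- The delay constants are placeholders.

```typescript
// In-memory stand-in for a row of the outbox table.
interface OutboxRow {
  id: string;
  occurredAt: number;    // epoch ms (occurred_at)
  topic: string;
  envelope: { eventId: string };
  publishedAt?: number;  // published_at; undefined = unpublished
  attempts: number;
  lastError?: string;
  nextAttemptAt: number; // hypothetical column, not in the table above
}

// Publishes one message; throws on broker failure.
type Publish = (topic: string, envelope: { eventId: string }) => void;

const BASE_DELAY_MS = 500; // assumed values
const MAX_DELAY_MS = 60_000;

function backoffMs(attempts: number): number {
  return Math.min(MAX_DELAY_MS, BASE_DELAY_MS * 2 ** (attempts - 1));
}

// One relay pass over unpublished rows, oldest first. It stops at the first
// failure so a later row never overtakes an earlier one.
function relayOnce(rows: OutboxRow[], publish: Publish, now: number): number {
  const pending = rows
    .filter((r) => r.publishedAt === undefined)
    .sort((a, b) => a.occurredAt - b.occurredAt);
  let published = 0;
  for (const row of pending) {
    if (row.nextAttemptAt > now) break; // head row is still backing off
    try {
      publish(row.topic, row.envelope);
      // A crash right here re-publishes the row on restart; consumers dedupe on eventId.
      row.publishedAt = now;
      published++;
    } catch (err) {
      row.attempts++;
      row.lastError = String(err);
      row.nextAttemptAt = now + backoffMs(row.attempts);
      break;
    }
  }
  return published;
}
```

Stopping at the first failure keeps strict occurred_at order, at the cost of stalling the relay behind one bad row. A relay that only needs per-aggregate order could skip rows belonging to other aggregates instead.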

6. Inbox Pattern — Consumer Side

Every consumer maintains an inbox to deduplicate deliveries:

```sql
CREATE TABLE inbox (
  event_id     ulid PRIMARY KEY,  -- ulid is not a built-in Postgres type; requires an extension or domain
  consumer     text NOT NULL,
  processed_at timestamptz NOT NULL DEFAULT now(),
  result       text
);
```

Processing is wrapped in a transaction:

  1. SELECT the inbox row for the event_id; if it exists, skip the event.
  2. Do the work (update the domain, write new outbox rows).
  3. INSERT INTO inbox within the same transaction.
  4. Commit. On a primary-key conflict (duplicate event_id), roll back: another worker has already processed the event.
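The four steps can be sketched with an in-memory stand-in for the database. FakeDb, FakeTx, and the balance-crediting domain work are hypothetical and exist only to show the control flow.

The conflict surfaces at a different point than in Postgres:

- In Postgres, a second INSERT of the same event_id waits for the first transaction to finish, then fails with SQLSTATE 23505 (unique_violation).
- The fake raises that same error code, but defers the check to commit.

```typescript
// Postgres reports a primary-key conflict as SQLSTATE 23505 (unique_violation).
const UNIQUE_VIOLATION = "23505";

function isUniqueViolation(err: unknown): boolean {
  return typeof err === "object" && err !== null && (err as { code?: unknown }).code === UNIQUE_VIOLATION;
}

// Hypothetical in-memory stand-in for the database.
class FakeDb {
  inbox = new Map<string, string>();    // event_id -> consumer
  balances = new Map<string, number>(); // example domain table
}

// Stages writes and applies them all-or-nothing on commit.
class FakeTx {
  private readonly db: FakeDb;
  private readonly inboxWrites = new Map<string, string>();
  private readonly balanceWrites = new Map<string, number>();

  constructor(db: FakeDb) {
    this.db = db;
  }
  inboxHas(eventId: string): boolean {
    return this.db.inbox.has(eventId) || this.inboxWrites.has(eventId);
  }
  insertInbox(eventId: string, consumer: string): void {
    this.inboxWrites.set(eventId, consumer);
  }
  getBalance(account: string): number {
    return this.balanceWrites.get(account) ?? this.db.balances.get(account) ?? 0;
  }
  setBalance(account: string, value: number): void {
    this.balanceWrites.set(account, value);
  }
  commit(): void {
    for (const id of this.inboxWrites.keys()) {
      if (this.db.inbox.has(id)) {
        // Nothing has been applied yet, so this behaves like a rollback.
        throw Object.assign(new Error(`duplicate key ${id}`), { code: UNIQUE_VIOLATION });
      }
    }
    this.inboxWrites.forEach((consumer, id) => this.db.inbox.set(id, consumer));
    this.balanceWrites.forEach((value, account) => this.db.balances.set(account, value));
  }
}

interface CreditEvent { eventId: string; account: string; amount: number }

// Steps 1-3. Returns null if the event is already in the inbox.
function prepare(db: FakeDb, consumer: string, e: CreditEvent): FakeTx | null {
  const tx = new FakeTx(db);
  if (tx.inboxHas(e.eventId)) return null;                       // 1. already processed
  tx.setBalance(e.account, tx.getBalance(e.account) + e.amount); // 2. domain work
  tx.insertInbox(e.eventId, consumer);                           // 3. inbox row, same tx
  return tx;
}

// Step 4. A conflict at commit means another worker won the race.
function commitOrSkip(tx: FakeTx): "processed" | "duplicate" {
  try {
    tx.commit();
    return "processed";
  } catch (err) {
    if (isUniqueViolation(err)) return "duplicate";
    throw err;
  }
}
```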

7. Domain vs Integration Events

  • Domain events are facts internal to a context (e.g., authoring.draft.block.added.v1). Their shape is optimized for that context; other contexts treat them as data, not commands.
  • Integration events are business outcomes contracted across contexts (e.g., marketplace.license.granted.v1, content.play_package.built.v1, progress.completion.recorded.v1). These have the strictest schema-evolution rules and are Pact-tested.

8. Saga State Machines

Every saga is implemented as a state-driven process manager. Saga state is persisted in a Postgres sagas table; timeouts produce explicit timeout events. Compensations are explicit, not "just undo".

8.1 Purchase Saga (owned by marketplace-service)

| Step | From → To | Trigger | Compensation on failure |
|---|---|---|---|
| 1 | started → awaiting_payment | marketplace.order.placed.v1 | |
| 2 | awaiting_payment → licensing | billing.payment.succeeded.v1 | marketplace.order.failed.v1 (on 30-min timeout or billing.payment.failed.v1) |
| 3 | licensing → enrolling | marketplace.license.granted.v1 | refund payment; emit marketplace.license.revoked.v1 |
| 4 | enrolling → fulfilled | enrollment.created.v1 | revoke license; refund |
| 5 (terminal-fail) | * → failed | any compensation completes | emit marketplace.order.failed.v1 with reason |
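The following sketches the purchase saga as a pure transition function. It assumes string-valued actions, which a process manager would execute and persist in the sagas table.

- The compensating state is an assumption. The terminal-fail row implies a phase in which compensations run before the saga reaches failed, but the table does not name it.
- Events that do not match the current state's trigger are ignored. These include duplicates and late arrivals.

```typescript
type PurchaseState =
  | "started" | "awaiting_payment" | "licensing" | "enrolling" | "fulfilled"
  | "compensating" // assumed intermediate state; not named in the table
  | "failed";

interface PurchaseSaga {
  state: PurchaseState;
  failedFrom?: PurchaseState;
  actions: string[]; // side effects the process manager must perform next
}

// Forward rows of the table: state -> [trigger, next state].
const FORWARD: Partial<Record<PurchaseState, [string, PurchaseState]>> = {
  started: ["marketplace.order.placed.v1", "awaiting_payment"],
  awaiting_payment: ["billing.payment.succeeded.v1", "licensing"],
  licensing: ["marketplace.license.granted.v1", "enrolling"],
  enrolling: ["enrollment.created.v1", "fulfilled"],
};

// Compensation column, keyed by the state that failed.
const COMPENSATIONS: Partial<Record<PurchaseState, string[]>> = {
  licensing: ["refund payment", "emit marketplace.license.revoked.v1"],
  enrolling: ["revoke license", "refund payment"],
};

const PAYMENT_TIMEOUT_MS = 30 * 60 * 1000;

function failed(from: PurchaseState | undefined, reason: string): PurchaseSaga {
  return { state: "failed", failedFrom: from, actions: [`emit marketplace.order.failed.v1 reason=${reason}`] };
}

function fail(saga: PurchaseSaga, reason: string): PurchaseSaga {
  const comps = COMPENSATIONS[saga.state] ?? [];
  return comps.length === 0
    ? failed(saga.state, reason)
    : { state: "compensating", failedFrom: saga.state, actions: comps };
}

function onEvent(saga: PurchaseSaga, eventType: string): PurchaseSaga {
  const fwd = FORWARD[saga.state];
  if (fwd !== undefined && fwd[0] === eventType) return { state: fwd[1], actions: [] };
  if (saga.state === "awaiting_payment" && eventType === "billing.payment.failed.v1") {
    return fail(saga, "payment_failed");
  }
  return saga; // duplicate or unexpected in this state: ignore
}

// Invoked when the explicit payment-timeout event fires.
function onPaymentTimeout(saga: PurchaseSaga, enteredAt: number, now: number): PurchaseSaga {
  return saga.state === "awaiting_payment" && now - enteredAt >= PAYMENT_TIMEOUT_MS
    ? fail(saga, "payment_timeout")
    : saga;
}

function onCompensationsDone(saga: PurchaseSaga, reason: string): PurchaseSaga {
  return saga.state === "compensating" ? failed(saga.failedFrom, reason) : saga;
}
```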

8.2 Course Publish Saga (owned by authoring-service)

| Step | From → To | Trigger | Compensation |
|---|---|---|---|
| 1 | draft_approved → building | authoring.course_draft.published.v1 | |
| 2 | building → cataloging | content.play_package.built.v1 | discard package artifacts; draft → approved |
| 3 | cataloging → bundling | catalog.course_version.published.v1 | unregister CourseVersion; discard artifacts |
| 4 | bundling → ready | content.play_package.bundle.published.v1 | revoke bundle, unregister, discard |
| 5 (terminal-fail) | * → failed | saga timeout (15 min) or any compensation completes | draft → editing with explicit error block |

8.3 Compliance Window Saga (owned by assignment-service)

| Step | From → To | Trigger | Compensation |
|---|---|---|---|
| 1 | (none) → open | RRULE materialization | |
| 2 | open → in_progress | enrollment.created.v1 (for that window) | |
| 3 | in_progress → completed | progress.completion.recorded.v1 (passing) | |
| 4 | * → overdue | time > dueAt | escalation fires |
| 5 | overdue → closed_missed | time > graceUntil without completion | notify + analytics |
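Unlike the other sagas, this one is driven largely by the clock. The sketch below assumes a periodic sweep that recomputes each window's state from recorded facts and fires side effects when the state changes. windowState and onTransition are hypothetical names.

The table does not say whether an overdue window can still complete. This sketch assumes that a passing completion recorded by graceUntil does complete it.

```typescript
type WindowState = "open" | "in_progress" | "completed" | "overdue" | "closed_missed";

interface ComplianceWindow {
  dueAt: Date;
  graceUntil: Date;
  enrolled: boolean; // enrollment.created.v1 seen for this window
  passedAt?: Date;   // progress.completion.recorded.v1 with a passing outcome
}

// Derives the current state from recorded facts and the clock.
function windowState(w: ComplianceWindow, now: Date): WindowState {
  const t = now.getTime();
  const grace = w.graceUntil.getTime();
  // Assumption: a passing completion recorded by graceUntil completes the window.
  if (w.passedAt !== undefined && w.passedAt.getTime() <= grace) return "completed";
  if (t > grace) return "closed_missed";
  if (t > w.dueAt.getTime()) return "overdue";
  return w.enrolled ? "in_progress" : "open";
}

// Side effects to fire when a sweep observes a state change.
function onTransition(prev: WindowState, next: WindowState): string[] {
  if (prev === next) return [];
  switch (next) {
    case "overdue":
      return ["escalate", "emit assignment.compliance_window.overdue.v1"];
    case "closed_missed":
      return ["notify", "record in analytics"];
    case "completed":
      return ["emit assignment.compliance_window.completed.v1"];
    default:
      return [];
  }
}
```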

8.4 GDPR Erasure Saga (platform-wide)

| Step | From → To | Trigger | Compensation |
|---|---|---|---|
| 1 | received → fanning_out | gdpr.subject_request.received.v1 | |
| 2 | fanning_out → awaiting_acks | per-service fan-out | |
| 3 | awaiting_acks → completed | all services ACK within 30 d | emit gdpr.subject_request.completed.v1 |
| fail | * → escalated | any service fails or times out | operator intervention queue |

Services in scope: identity, tenant, authoring, content, marketplace, billing, enrollment, assignment, delivery, progress, assessment, certification, notification, media, search, analytics, ai-gateway, sync. Each emits gdpr.subject_request.acknowledged.v1 per subject.
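The completion check for the erasure saga reduces to comparing acknowledgements against the scope list. The sketch below uses a hypothetical evaluate function and makes two assumptions:

- The 30-day clock starts at receipt. The text does not say whether it starts at receipt or at fan-out.
- An acknowledgement that arrives after the deadline does not count.

```typescript
// Services in scope, copied from the list above.
const GDPR_SCOPE = [
  "identity", "tenant", "authoring", "content", "marketplace", "billing",
  "enrollment", "assignment", "delivery", "progress", "assessment",
  "certification", "notification", "media", "search", "analytics",
  "ai-gateway", "sync",
] as const;

const ACK_DEADLINE_MS = 30 * 24 * 60 * 60 * 1000; // "within 30 d"

type ErasureState = "awaiting_acks" | "completed" | "escalated";

interface ErasureRequest {
  receivedAt: number;        // epoch ms; assumed start of the 30-day clock
  acks: Map<string, number>; // service -> arrival time of its acknowledged.v1
  failures: Set<string>;     // services that reported a failure
}

function evaluate(req: ErasureRequest, now: number): { state: ErasureState; missing: string[] } {
  const deadline = req.receivedAt + ACK_DEADLINE_MS;
  // A service counts only if it acknowledged by the deadline.
  const missing = GDPR_SCOPE.filter((s) => {
    const at = req.acks.get(s);
    return at === undefined || at > deadline;
  });
  if (req.failures.size > 0) return { state: "escalated", missing };
  if (missing.length === 0) return { state: "completed", missing };
  if (now > deadline) return { state: "escalated", missing };
  return { state: "awaiting_acks", missing };
}
```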

8.5 Data Residency Migration Saga (owned by tenant-service)

Freeze writes → snapshot each service → copy to target region → verify checksums → rebuild search + vector indices in target → unfreeze → emit tenant.data_residency.changed.v1. Compensations: unfreeze at source; discard target copies.
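The residency migration is a linear saga with compensations attached to specific steps. The sketch below is a generic, in-process version. It assumes synchronous steps that throw on failure. runSaga and the step names are hypothetical.

Completed steps are compensated in reverse order, so target copies are discarded before the source is unfrozen. A real process manager would also persist its progress in the sagas table and make each compensation idempotent.

```typescript
// A forward step plus its optional compensation. run throws on failure.
interface SagaStep {
  name: string;
  run: () => void;
  compensate?: () => void;
}

interface SagaResult {
  ok: boolean;
  failedStep?: string;
  compensated: string[];
}

// Runs steps in order; on the first failure, compensates completed steps
// most recent first and reports what was undone.
function runSaga(steps: SagaStep[]): SagaResult {
  const done: SagaStep[] = [];
  for (const step of steps) {
    try {
      step.run();
      done.push(step);
    } catch {
      const compensated: string[] = [];
      for (const prev of [...done].reverse()) {
        if (prev.compensate) {
          prev.compensate();
          compensated.push(prev.name);
        }
      }
      return { ok: false, failedStep: step.name, compensated };
    }
  }
  return { ok: true, compensated: [] };
}
```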

8.6 Offline Bundle Publish Saga (owned by content-service)

Build package → sign → encrypt per (tenant, device) → upload → emit content.play_package.bundle.published.v1 → sync-service projects the delta to target devices → devices ack via sync push. Failure: revoke and emit content.play_package.bundle.revoked.v1.

9. Event Catalog with TS Payload Schemas

A representative sample of events, mostly integration events. The full schemas live in the Git-backed registry (§17), and every service doc references them. Every event wraps the payload shape below in the envelope (§4). IDs are branded strings, per 12 Data Models.
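Branded strings make IDs of different kinds incompatible at compile time, while keeping them plain strings at runtime. The sketch below shows the pattern, assuming a __brand phantom property. The actual brand definitions and any format validation belong to 12 Data Models, so the non-empty check here is only a placeholder.

```typescript
// A phantom property that exists only at the type level.
type Brand<T, B extends string> = T & { readonly __brand: B };

type UserId = Brand<string, "UserId">;
type TenantId = Brand<string, "TenantId">;

// Placeholder validation; the real format rules live in 12 Data Models.
function makeId<B extends string>(kind: B, raw: string): Brand<string, B> {
  if (raw.trim() === "") throw new Error(`${kind} must be non-empty`);
  return raw as Brand<string, B>;
}

const userId = (raw: string): UserId => makeId("UserId", raw);
const tenantId = (raw: string): TenantId => makeId("TenantId", raw);

// Hypothetical slice of a payload such as RoleAssignedV1.
interface RoleAssignedSketch {
  tenantId: TenantId;
  userId: UserId;
}

function describeAssignment(e: RoleAssignedSketch): string {
  return `${e.tenantId}/${e.userId}`;
}

// const wrong: TenantId = userId("u-2"); // compile error: the "UserId" brand is not "TenantId"
```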

9.1 Identity

// identity.user.registered.v1
interface UserRegisteredV1 { userId: UserId; primaryEmail: Email; homeTenantId?: TenantId; createdAt: ISODate; registrationSource: 'self' | 'sso_jit' | 'invite' | 'bulk_import'; }
// identity.user.logged_in.v1
interface UserLoggedInV1 { userId: UserId; tenantId: TenantId; deviceId?: DeviceId; amr: string[]; ip: string; ua: string; at: ISODate; }
// identity.device.bound_for_offline.v1
interface DeviceBoundV1 { userId: UserId; tenantId: TenantId; deviceId: DeviceId; fingerprint: string; certExpiresAt: ISODate; }
// identity.session.revoked.v1
interface SessionRevokedV1 { sessionId: SessionId; userId: UserId; reason: 'logout' | 'rotation_reuse' | 'admin_wipe' | 'password_reset' | 'security_incident'; at: ISODate; }
// identity.password.reset_requested.v1
interface PasswordResetRequestedV1 { userId: UserId; at: ISODate; ip: string; ua: string; }

9.2 Tenant

// tenant.org.provisioned.v1
interface OrgProvisionedV1 { tenantId: TenantId; type: 'org' | 'provider' | 'individual' | 'org+provider'; slug: string; homeRegion: 'us'|'eu'|'me'|'ap'; planId: string; createdBy: UserId; createdAt: ISODate; }
// tenant.user.invited.v1
interface UserInvitedV1 { tenantId: TenantId; email: Email; invitedBy: UserId; roleIds: RoleId[]; orgUnitIds: OrgUnitId[]; expiresAt: ISODate; }
// tenant.role.assigned.v1
interface RoleAssignedV1 { tenantId: TenantId; userId: UserId; roleIds: RoleId[]; by: UserId; at: ISODate; }
// tenant.dynamic_group.evaluated.v1
interface DynamicGroupEvaluatedV1 { tenantId: TenantId; groupId: string; size: number; evaluatedAt: ISODate; memberDigest: SHA256; }
// tenant.data_residency.changed.v1
interface DataResidencyChangedV1 { tenantId: TenantId; fromRegion: string; toRegion: string; completedAt: ISODate; }

9.3 Catalog

// catalog.course.registered.v1
interface CourseRegisteredV1 { tenantId: TenantId; courseId: CourseId; slug: string; title: I18nString; defaultLocale: Locale; visibility: 'private'|'org'|'marketplace'|'public'; }
// catalog.course_version.published.v1
interface CourseVersionPublishedV1 { tenantId: TenantId; courseId: CourseId; courseVersionId: CourseVersionId; versionLabel: SemVer; durationMinutes: number; locales: Locale[]; playPackageId: PlayPackageId; publishedAt: ISODate; publishedBy: UserId; }
// catalog.course_version.withdrawn.v1
interface CourseVersionWithdrawnV1 { tenantId: TenantId; courseVersionId: CourseVersionId; reason: string; at: ISODate; }

9.4 Authoring

// authoring.draft.created.v1
interface DraftCreatedV1 { tenantId: TenantId; courseDraftId: CourseDraftId; title: I18nString; defaultLocale: Locale; createdBy: UserId; at: ISODate; }
// authoring.draft.block.added.v1
interface BlockAddedV1 { tenantId: TenantId; courseDraftId: CourseDraftId; lessonId: LessonId; blockId: BlockId; kind: string; status: 'draft' | 'draft_ai'; aiProvenance?: AIProvenance; at: ISODate; }
// authoring.draft.ai.block_generated.v1
interface BlockAIGeneratedV1 { tenantId: TenantId; courseDraftId: CourseDraftId; blockId: BlockId; promptId: string; promptVersion: SemVer; traceId: string; local: boolean; at: ISODate; }
// authoring.draft.ai.block_accepted.v1
interface BlockAIAcceptedV1 { tenantId: TenantId; blockId: BlockId; reviewedBy: UserId; decisionId: string; at: ISODate; }
// authoring.course_draft.published.v1
interface CourseDraftPublishedV1 { tenantId: TenantId; courseDraftId: CourseDraftId; courseId: CourseId; draftVersion: number; publishedBy: UserId; at: ISODate; }

9.5 Content-Packaging

// content.play_package.built.v1
interface PlayPackageBuiltV1 { tenantId: TenantId; playPackageId: PlayPackageId; courseVersionId: CourseVersionId; locale: Locale; hash: SHA256; signature: JWS; builtAt: ISODate; }
// content.play_package.bundle.published.v1
interface BundlePublishedV1 { tenantId: TenantId; bundleId: BundleId; playPackageId: PlayPackageId; sizeBytes: number; signature: JWS; at: ISODate; }
// content.play_package.bundle.license_issued.v1
interface BundleLicenseIssuedV1 { tenantId: TenantId; bundleId: BundleId; enrollmentId: EnrollmentId; userId: UserId; deviceId: DeviceId; expiresAt: ISODate; features: { aiTutor: boolean; assessments: boolean; certificate: boolean; copyDownloadable: boolean }; }
// content.play_package.bundle.revoked.v1
interface BundleRevokedV1 { tenantId: TenantId; bundleId: BundleId; reason: 'withdrawn' | 'license_revoked' | 'tamper' | 'rebuild' | 'manual'; at: ISODate; }
// content.play_package.tamper_detected.v1
interface BundleTamperDetectedV1 { tenantId: TenantId; bundleId: BundleId; deviceId: DeviceId; clientReportedHash: SHA256; expectedHash: SHA256; at: ISODate; }

9.6 Marketplace

// marketplace.listing.approved.v1
interface ListingApprovedV1 { tenantId: TenantId; listingId: ListingId; courseId: CourseId; approvedBy: UserId; at: ISODate; }
// marketplace.order.placed.v1
interface OrderPlacedV1 { tenantId: TenantId; orderId: OrderId; buyerUserId: UserId; lines: { listingId: ListingId; planId: string; qty: number; amountMicro: number; currency: ISO4217 }[]; total: Money; at: ISODate; }
// marketplace.license.granted.v1
interface LicenseGrantedV1 { tenantId: TenantId; licenseId: LicenseId; listingId: ListingId; courseId: CourseId; seats: number; validFrom: ISODate; validUntil?: ISODate; source: 'purchase' | 'gift' | 'manual'; sourceRef: string; }
// marketplace.license.assigned.v1
interface LicenseAssignedV1 { tenantId: TenantId; licenseId: LicenseId; assigneeUserId: UserId; assignedBy: UserId; at: ISODate; }
// marketplace.license.revoked.v1
interface LicenseRevokedV1 { tenantId: TenantId; licenseId: LicenseId; reason: string; at: ISODate; }

9.7 Billing

// billing.payment.succeeded.v1
interface PaymentSucceededV1 { tenantId: TenantId; paymentId: PaymentId; orderId?: OrderId; invoiceId?: InvoiceId; amount: Money; processor: string; processorRef: string; at: ISODate; }
// billing.payment.failed.v1
interface PaymentFailedV1 { tenantId: TenantId; paymentId: PaymentId; reason: string; code: string; at: ISODate; }
// billing.subscription.started.v1
interface SubscriptionStartedV1 { tenantId: TenantId; subscriptionId: SubscriptionId; planId: string; trialEnd?: ISODate; at: ISODate; }
// billing.subscription.changed.v1
interface SubscriptionChangedV1 { tenantId: TenantId; subscriptionId: SubscriptionId; fromPlanId: string; toPlanId: string; at: ISODate; }

9.8 Enrollment

// enrollment.created.v1
interface EnrollmentCreatedV1 { tenantId: TenantId; enrollmentId: EnrollmentId; userId: UserId; courseId: CourseId; courseVersionId: CourseVersionId; source: { kind: 'assignment'|'purchase'|'manual'|'self_signup'; ref: string }; at: ISODate; }
// enrollment.completed.v1
interface EnrollmentCompletedV1 { tenantId: TenantId; enrollmentId: EnrollmentId; completedAt: ISODate; attemptId: AttemptId; score?: number; }
// enrollment.revoked.v1
interface EnrollmentRevokedV1 { tenantId: TenantId; enrollmentId: EnrollmentId; reason: string; at: ISODate; }

9.9 Assignment

// assignment.compliance_window.opened.v1
interface ComplianceWindowOpenedV1 { tenantId: TenantId; assignmentId: AssignmentId; windowId: string; userId: UserId; occurrenceStart: ISODate; dueAt: ISODate; graceUntil: ISODate; }
// assignment.compliance_window.completed.v1
interface ComplianceWindowCompletedV1 { tenantId: TenantId; assignmentId: AssignmentId; windowId: string; userId: UserId; completedAt: ISODate; attemptId: AttemptId; }
// assignment.compliance_window.overdue.v1
interface ComplianceWindowOverdueV1 { tenantId: TenantId; assignmentId: AssignmentId; windowId: string; userId: UserId; graceUntil: ISODate; }
// assignment.recommendation.generated.v1
interface AssignmentRecommendationV1 { tenantId: TenantId; recommendationId: string; scope: { orgUnitId?: OrgUnitId; roleId?: RoleId }; items: { courseId: CourseId; rrule?: RRULEString; rationale: string }[]; aiProvenance: AIProvenance; }

9.10 Delivery

// delivery.play_session.started.v1
interface PlaySessionStartedV1 { tenantId: TenantId; playSessionId: PlaySessionId; userId: UserId; enrollmentId: EnrollmentId; courseVersionId: CourseVersionId; deviceId: DeviceId; offlineMountId?: string; at: ISODate; }
// delivery.navigation.cursor_moved.v1
interface NavigationCursorMovedV1 { tenantId: TenantId; playSessionId: PlaySessionId; cursor: { moduleId: string; lessonId: string; blockId?: string; branchPath?: string[] }; at: ISODate; }
// delivery.assistant.turn.completed.v1
interface AssistantTurnCompletedV1 { tenantId: TenantId; playSessionId: PlaySessionId; turnId: string; promptExcerpt: string; aiProvenance: AIProvenance; rating?: 'helpful' | 'unhelpful'; at: ISODate; }
// delivery.offline.bundle.tamper_detected.v1
interface OfflineTamperV1 { tenantId: TenantId; bundleId: BundleId; deviceId: DeviceId; at: ISODate; }

9.11 Progress

// progress.statement.recorded.v1
interface StatementRecordedV1 { tenantId: TenantId; attemptId: AttemptId; statementId: StatementId; actor: Actor; verb: Verb; object: Activity | StatementRef; result?: Result; timestamp: ISODate; stored: ISODate; cmi5?: { sessionId: string; registration: string }; }
// progress.attempt.closed.v1
interface AttemptClosedV1 { tenantId: TenantId; attemptId: AttemptId; outcome: 'passed'|'failed'|'incomplete'|'abandoned'; score?: number; closedAt: ISODate; }
// progress.completion.recorded.v1
interface CompletionRecordedV1 { tenantId: TenantId; enrollmentId: EnrollmentId; attemptId: AttemptId; outcome: 'passed'|'failed'; score?: number; completedAt: ISODate; }

9.12 Assessment

// assessment.attempt.scored.v1
interface AttemptScoredV1 { tenantId: TenantId; attemptId: AttemptId; quizBankId?: string; scenarioId?: string; scaledScore: number; passed: boolean; durationSeconds: number; at: ISODate; }
// assessment.short_answer.ai_graded.v1
interface ShortAnswerAIGradedV1 { tenantId: TenantId; attemptId: AttemptId; questionId: string; score: number; confidence: number; rationale: string; aiProvenance: AIProvenance; at: ISODate; }

9.13 Certification

// certification.certificate.issued.v1
interface CertificateIssuedV1 { tenantId: TenantId; certificateId: CertificateId; userId: UserId; courseVersionId: CourseVersionId; enrollmentId: EnrollmentId; templateId: string; expiresAt?: ISODate; proof: JWS; at: ISODate; }
// certification.offline_claim.verified.v1
interface OfflineClaimVerifiedV1 { tenantId: TenantId; claimId: string; certificateId: CertificateId; at: ISODate; }
// certification.certificate.revoked.v1
interface CertificateRevokedV1 { tenantId: TenantId; certificateId: CertificateId; reason: string; at: ISODate; }

9.14 Notification

// notification.sent.v1
interface NotificationSentV1 { tenantId: TenantId; notificationId: string; userId: UserId; channel: 'email'|'sms'|'push'|'inapp'|'webhook'; templateKey: string; at: ISODate; }
// notification.failed.v1
interface NotificationFailedV1 { tenantId: TenantId; notificationId: string; reason: string; terminal: boolean; at: ISODate; }

9.15 Media

// media.asset.ready.v1
interface MediaAssetReadyV1 { tenantId: TenantId; assetId: MediaAssetId; kind: 'image'|'audio'|'video'|'document'|'subtitle'|'ai_image'|'ai_audio'; variants: string[]; at: ISODate; }
// media.ai.image.generated.v1
interface MediaAIImageGeneratedV1 { tenantId: TenantId; assetId: MediaAssetId; aiProvenance: AIProvenance; at: ISODate; }
// media.asset.quarantined.v1
interface MediaQuarantinedV1 { tenantId: TenantId; assetId: MediaAssetId; reason: 'virus'|'nsfw'|'copyright'|'manual'; at: ISODate; }
9.16 Search

// search.recommendation.generated.v1
interface RecommendationGeneratedV1 { tenantId: TenantId; userId: UserId; scope: string; items: { itemId: string; itemType: string; score: number; reason: string }[]; aiProvenance: AIProvenance; at: ISODate; }

9.17 Analytics

// analytics.export.completed.v1
interface AnalyticsExportCompletedV1 { tenantId: TenantId; jobId: string; format: 'csv'|'parquet'|'jsonl'; rowCount: number; deliveryUrl: string; at: ISODate; }
// analytics.insight.generated.v1
interface AnalyticsInsightV1 { tenantId: TenantId; insightId: string; dashboardId?: string; narrative: string; sources: string[]; aiProvenance: AIProvenance; at: ISODate; }

9.18 AI Gateway

// ai.gateway.call.completed.v1
interface AICallCompletedV1 { tenantId: TenantId; completionId: CompletionId; userId: UserId; promptId?: string; promptVersion?: SemVer; modelId: string; tokens: { in: number; out: number }; costMicroUSD: number; cacheHit: boolean; safety: { input: SafetyVerdict; output: SafetyVerdict }; latencyMs: number; traceId: string; decisionId?: string; at: ISODate; }
// ai.gateway.call.refused.v1
interface AICallRefusedV1 { tenantId: TenantId; userId: UserId; reason: 'safety.input'|'safety.output'|'budget'|'policy'|'provider_unavailable'; category?: string; at: ISODate; }
// ai.inference.local.completed.v1
interface AILocalInferenceV1 { tenantId: TenantId; userId: UserId; deviceId: DeviceId; promptHash: SHA256; modelId: string; latencyMs: number; at: ISODate; }
// ai.safety.flag.raised.v1
interface AISafetyFlagV1 { tenantId: TenantId; userId: UserId; category: 'sexual'|'violence'|'hate'|'self_harm'|'illegal'|'prompt_injection'|'pii'; action: 'warn'|'block'; at: ISODate; }
// ai.budget.exceeded.v1
interface AIBudgetExceededV1 { tenantId: TenantId; period: 'day'|'month'; limitMicroUSD: number; at: ISODate; }

9.19 Sync

// sync.push.applied.v1
interface SyncPushAppliedV1 { tenantId: TenantId; userId: UserId; deviceId: DeviceId; appliedCount: number; cursorLamport: number; at: ISODate; }
// sync.conflict.detected.v1
interface SyncConflictDetectedV1 { tenantId: TenantId; userId: UserId; deviceId: DeviceId; conflictId: string; entityType: string; entityId: string; policy: 'lww' | 'crdt_yjs' | 'server_authoritative'; at: ISODate; }
// sync.conflict.resolved.v1
interface SyncConflictResolvedV1 { tenantId: TenantId; userId: UserId; conflictId: string; resolution: 'kept_server'|'kept_client'|'merged'; resolvedBy: UserId; at: ISODate; }
// sync.device.wiped.v1
interface SyncDeviceWipedV1 { tenantId: TenantId; userId: UserId; deviceId: DeviceId; by: UserId; at: ISODate; }

9.20 Platform Cross-Cutting

// gdpr.subject_request.received.v1
interface GDPRSubjectRequestV1 { tenantId: TenantId; subjectUserId: UserId; kind: 'export' | 'erasure' | 'rectification' | 'restriction' | 'objection'; at: ISODate; }
// gdpr.subject_request.completed.v1
interface GDPRSubjectRequestCompletedV1 { tenantId: TenantId; subjectUserId: UserId; kind: string; completedAt: ISODate; deliveryUrl?: string; }
// audit.merkle.anchored.v1
interface AuditMerkleAnchoredV1 { tenantId?: TenantId; date: ISODate; rootHash: SHA256; anchorRef: string; }

10. Schema Evolution Rules

| Change kind | Allowed within vN | Required |
|---|---|---|
| Add optional field | yes | bump minor; update JSON Schema (additionalProperties: false still valid) |
| Add required field | no (breaking) | new vN + dual-publish window |
| Remove field | no (breaking) | new vN + dual-publish window |
| Rename field | no (breaking) | new vN |
| Narrow enum | no (breaking) | new vN |
| Widen enum | yes | bump minor |
| Change semantics | no (breaking) | new vN |
| Add new event type | yes | new subject |
| Deprecate event | yes (parallel publish ≥ 1 release cycle) | explicit deprecation header |

Dual-publish window: producers emit both versions for a minimum of 1 release cycle; consumers declare which versions they accept; CI blocks removal of the old producer until no consumer registers for it.
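A CI gate for the table above can be sketched over a simplified field model, rather than full JSON Schema. Each field has a required flag and an optional enum. SchemaSpec and compare are hypothetical names.

- Renames show up as a removal plus an addition, so they are reported as breaking.
- Semantic changes cannot be detected mechanically.
- Cases the table does not cover are not flagged, for example relaxing a required field to optional.

```typescript
interface FieldSpec {
  required: boolean;
  enum?: string[];
}
type SchemaSpec = Record<string, FieldSpec>;

interface Change {
  breaking: boolean;
  why: string;
}

function compare(oldS: SchemaSpec, newS: SchemaSpec): { breaking: boolean; changes: Change[] } {
  const changes: Change[] = [];
  for (const [name, prev] of Object.entries(oldS)) {
    const next = newS[name];
    if (next === undefined) {
      changes.push({ breaking: true, why: `remove ${name}` }); // also how a rename appears
      continue;
    }
    if (!prev.required && next.required) changes.push({ breaking: true, why: `make ${name} required` });
    const pe = prev.enum;
    const ne = next.enum;
    if (ne && (!pe || pe.some((v) => !ne.includes(v)))) {
      changes.push({ breaking: true, why: `narrow enum ${name}` });
    } else if (pe && (!ne || ne.some((v) => !pe.includes(v)))) {
      changes.push({ breaking: false, why: `widen enum ${name}` });
    }
  }
  for (const [name, next] of Object.entries(newS)) {
    if (name in oldS) continue;
    changes.push({ breaking: next.required, why: `add ${next.required ? "required" : "optional"} ${name}` });
  }
  return { breaking: changes.some((c) => c.breaking), changes };
}
```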

11. Idempotency Guarantees

  • Producer: outbox keyed by outboxId; same transaction as the domain mutation. No dual-write.
  • Consumer: inbox event_id PRIMARY KEY; duplicate deliveries are silent no-ops.
  • Cross-boundary HTTP: Idempotency-Key required on writes (05 API §10).
  • Sync push: clientMutationId is the idempotency key; repeated submission returns original outcome.

12. Ordering Guarantees

  • Per partition (aggregateId): FIFO. Events for the same aggregate are delivered in order.
  • Across partitions: no ordering guarantee. Sagas must tolerate out-of-order arrival via state machines that wait for causal predecessors.
  • Causation chains: carried via causationId; consumers may buffer briefly if the causation ancestor is missing.
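Buffering on a missing causation ancestor can be sketched as a small parking structure. CausalBuffer is a hypothetical name, and the sketch assumes that every causationId refers to an eventId this consumer will eventually see.

It keeps two simplifications that a real consumer would replace:

- The seen set lives only in memory. A real consumer would persist it, and the inbox already records processed eventIds.
- Parked events never expire. A real consumer would release or dead-letter them after the brief wait the text allows.

```typescript
interface CausalEvent {
  eventId: string;
  causationId?: string;
}

// Parks events whose causation ancestor has not been applied yet and
// releases them, in causal order, once it arrives.
class CausalBuffer<E extends CausalEvent> {
  private readonly seen = new Set<string>();
  private readonly waiting = new Map<string, E[]>(); // missing causationId -> parked events
  private readonly onApply: (e: E) => void;

  constructor(onApply: (e: E) => void) {
    this.onApply = onApply;
  }

  offer(e: E): void {
    const cause = e.causationId;
    if (cause !== undefined && !this.seen.has(cause)) {
      const parked = this.waiting.get(cause) ?? [];
      parked.push(e);
      this.waiting.set(cause, parked);
      return;
    }
    this.release(e);
  }

  // Number of parked events; a real consumer would bound how long these wait.
  pendingCount(): number {
    let n = 0;
    this.waiting.forEach((parked) => {
      n += parked.length;
    });
    return n;
  }

  // Applies `first`, then anything that was waiting on it, transitively.
  private release(first: E): void {
    const queue: E[] = [first];
    while (queue.length > 0) {
      const e = queue.shift()!;
      if (this.seen.has(e.eventId)) continue; // duplicate delivery
      this.seen.add(e.eventId);
      this.onApply(e);
      const children = this.waiting.get(e.eventId);
      if (children !== undefined) {
        this.waiting.delete(e.eventId);
        queue.push(...children);
      }
    }
  }
}
```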

13. Replay & Recovery

  • Every durable consumer can be reset to seq=X or startTime=T.
  • Read models persist a lastEventId watermark and snapshot every N events.
  • Monthly game-day: wipe a non-prod read-model; measure rebuild time; record in runbook.
  • Audit + compliance consumers never replay backwards (append-only semantics).

14. DLQ Playbook

  1. Message arrives on {stream}.dlq after N retries (N = 10, exponential backoff).
  2. On-call receives PagerDuty alert on non-zero DLQ depth.
  3. Triage: schema-invalid → escalate to producer team; consumer-bug → fix + replay; transient external dependency → let normal retry drain; poison content → delete with operator sign-off + audit.
  4. Replay endpoint POST /admin/events/replay (platform-admin only; audited).
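The retry-then-DLQ decision in step 1 can be sketched as a pure function. It assumes that "after N retries" means a message goes to the DLQ once the first delivery and all ten retries have failed. The base delay, cap, and header names are placeholders.

JetStream consumers can also express much of this natively, through their max_deliver and backoff settings. This sketch computes the schedule in application code instead.

```typescript
const MAX_RETRIES = 10;            // N = 10, per step 1
const BASE_DELAY_MS = 1_000;       // assumed
const MAX_DELAY_MS = 5 * 60_000;   // assumed cap

type FailureDecision =
  | { kind: "retry"; delayMs: number }
  | { kind: "dlq"; subject: string; headers: Record<string, string> };

// failures: failed attempts so far, including the one just observed.
function onHandlerFailure(stream: string, eventId: string, failures: number, error: string): FailureDecision {
  if (failures > MAX_RETRIES) {
    return {
      kind: "dlq",
      subject: `${stream}.dlq`,
      // Poison-message metadata; header names are hypothetical.
      headers: { "event-id": eventId, failures: String(failures), "last-error": error },
    };
  }
  return { kind: "retry", delayMs: Math.min(MAX_DELAY_MS, BASE_DELAY_MS * 2 ** (failures - 1)) };
}
```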

15. Stream Partitioning

  • Default: 12 partitions per stream, selected by hashing partitionKey.
  • Per-tenant partitioning for top N tenants (configurable).
  • Backpressure: consumer lag alerts at 60 s, 5 min, 30 min thresholds.
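The sketch below covers partition selection and lag severity. It rests on these assumptions:

- The hash is FNV-1a. The text does not name one; any stable hash shared by all producers works.
- Tenants on a dedicated partition get a hypothetical t_<tenantId> suffix.
- Lag severity is the number of thresholds crossed.

Hashing partitionKey keeps every event of an aggregate on one partition. The per-partition FIFO guarantee in §12 relies on this. This variant hashes UTF-16 code units, so it matches byte-wise FNV-1a only for ASCII keys.

```typescript
// 32-bit FNV-1a over UTF-16 code units (assumed hash choice).
function fnv1a(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return h >>> 0;
}

interface PartitionConfig {
  partitions: number;            // default 12
  dedicatedTenants: Set<string>; // the configurable top N tenants
}

// Returns the partition suffix for the subject.
function partitionFor(cfg: PartitionConfig, tenantId: string, partitionKey: string): string {
  if (cfg.dedicatedTenants.has(tenantId)) return `t_${tenantId}`; // hypothetical naming
  return String(fnv1a(partitionKey) % cfg.partitions);
}

const LAG_THRESHOLDS_S = [60, 5 * 60, 30 * 60]; // 60 s, 5 min, 30 min

// 0 = no alert; 1..3 = number of thresholds crossed (assumed severity mapping).
function lagSeverity(lagSeconds: number): number {
  return LAG_THRESHOLDS_S.filter((t) => lagSeconds >= t).length;
}
```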

16. Subject-Level ACL (NATS Accounts)

  • Each service has its own JetStream account.
  • Cross-service publishes gated by explicit account grants.
  • Consumer accounts granted read-only on upstream subjects they consume.
  • Tenant-level ACL mirrored in application-layer authorization when webhooks are tenant-subscribed (05 §13).

17. Schema Registry

  • Git repo /event-schemas/ with service/aggregate/event/vN.json.
  • CI validates every published subject against its schema on both produce + consume.
  • Registry publishes a SHA-256 content hash per file; envelope schemaUri references it.
  • At boot, a consumer service warns if any event subject it registers for lacks a matching registry entry.
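The sketch below shows how the envelope's schemaUri could be derived from a registry file. It assumes Node's crypto module and a lowercase hex digest. The example in §4 shows only sha256-XXXX, so the encoding is an assumption.

Hashing the exact file bytes means even a whitespace edit changes the URI. That is what lets a consumer detect that its copy of the schema has drifted.

```typescript
import { createHash } from "node:crypto";

// Lowercase hex SHA-256 of the exact registry file bytes.
function contentHash(fileBytes: string | Uint8Array): string {
  return createHash("sha256").update(fileBytes).digest("hex");
}

// schemas://{service}/{aggregate}/{event}/v{N}#sha256-{hex}, following the §4 example.
function schemaUri(
  service: string,
  aggregate: string,
  event: string,
  version: number,
  fileBytes: string | Uint8Array,
): string {
  return `schemas://${service}/${aggregate}/${event}/v${version}#sha256-${contentHash(fileBytes)}`;
}

const HASH_MARKER = "#sha256-";

// True if the URI's hash matches the registry file a consumer has loaded.
function matchesRegistry(uri: string, fileBytes: string | Uint8Array): boolean {
  const i = uri.lastIndexOf(HASH_MARKER);
  return i >= 0 && uri.slice(i + HASH_MARKER.length) === contentHash(fileBytes);
}
```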

18. Testing

  • Contract tests (Pact): per consumer-producer pair. Pact broker runs as platform infra; CI blocks producer changes that break any registered consumer.
  • Schema lint: every payload validated at publish + consume.
  • Sagas: integration tests with timeout + fault injection for each saga path.
  • Replay tests: curated stream replayed nightly against read-model rebuild; assert idempotency.
  • Chaos: consumer killed mid-batch; on restart asserts no double-apply, no missed events.
  • Load (k6): 10k events/s sustained through OutboxRelay on representative hardware.

19. Observability

  • Metrics: outbox_lag_seconds, consumer_lag_messages, dlq_depth, event_publish_rate, event_consume_rate, schema_validation_failures.
  • Traces: traceparent propagated through envelope → consumer → downstream publish; saga spans named after state transitions.
  • Logs: structured JSON; PII redacted; eventId/correlationId/tenantId always included.

20. Why This Design

Events decouple 19 services without RPC spaghetti. The outbox and inbox patterns guarantee exactly-once application over at-least-once delivery. Versioned subjects give consumers stable contracts. The schema registry catches drift at publish time. Sagas make multi-service workflows explicit and testable. Sync-service consumes the same log to project per-device deltas, so offline-first is a natural extension rather than a parallel mechanism.