Event-Driven Architecture
:::info Source
Sourced from docs/04-event-driven-architecture.md in the documentation repo.
:::
Companion: 02 DDD & Bounded Contexts · 03 Microservices · 05 API Design · 12 Data Models
This document is the authoritative source for NATS subjects, event schemas, outbox/inbox patterns, saga state machines, replay, DLQ, and schema evolution. Every service doc in 03 references this one.
1. Principles
- Events are the only approved cross-service integration mechanism. Exceptions (02 §9) are limited to tenant-resolution and AI gateway requests, each capped at 1 hop.
- At-least-once delivery; exactly-once application. Producers use outbox; consumers use inbox.
- Idempotent consumers always. `eventId` unique-constraint in inbox table.
- Versioned subjects + schemas. Breaking changes require a new `vN` and a dual-publish window.
- Tenant-isolated streams where justified. Per-tenant subject-level ACLs; per-tenant stream partitions for largest customers.
- Audit + replay over the same log. Analytics + compliance + sync all consume the canonical log.
2. Transport — NATS JetStream
- Broker: NATS with JetStream persistence, multi-AZ replication.
- Streams: one per service domain (`AUTHORING`, `DELIVERY`, `PROGRESS`, `AI_GATEWAY`, `SYNC`, …).
- Subjects: `{service}.{aggregate}.{event}.v{N}` with optional trailing partition suffix.
- Retention:
| Stream class | Hot (JetStream) | Cold archive (S3/R2) |
|---|---|---|
| Operational (authoring, delivery, marketplace, billing, catalog, tenant, sync ops) | 30 days | 13 months |
| Regulated (progress, certification, ai_gateway audit, billing audit, identity audit) | 180 days | 7 years |
| Analytics firehose | 30 days | 5 years in warehouse |
- Delivery: at-least-once; durable consumer per projector.
- Ordering: FIFO per subject (partition hash = `aggregateId`).
- Replay: consumer can be reset to `seq` or `timestamp`.
- DLQ: `{stream}.dlq` with poison-message metadata; alert on non-empty.
3. Subject Naming Grammar
subject := service "." aggregate "." event "." version [ "." partition ]
service := "identity" | "tenant" | "catalog" | "authoring" | "content"
| "marketplace" | "billing" | "enrollment" | "assignment"
| "delivery" | "progress" | "assessment" | "certification"
| "notification" | "media" | "search" | "analytics" | "ai" | "sync"
aggregate := snake_case noun (e.g. "course_draft", "play_session")
event := snake_case verb-past-tense (e.g. "registered", "published", "completed")
version := "v" digit+
partition := string (optional; aggregateId hash bucket)
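The grammar above can be checked mechanically. The following is a minimal sketch, not the platform's validator: `parseSubject` and `ParsedSubject` are hypothetical names, and the partition token is assumed to contain no dots or whitespace. It follows the grammar literally, so a subject with a different segment count (for example `enrollment.created.v1`) is rejected by it.

```typescript
// Services enumerated in the grammar above.
const SERVICES = [
  "identity", "tenant", "catalog", "authoring", "content",
  "marketplace", "billing", "enrollment", "assignment",
  "delivery", "progress", "assessment", "certification",
  "notification", "media", "search", "analytics", "ai", "sync",
] as const;
type Service = (typeof SERVICES)[number];

interface ParsedSubject {
  service: Service;
  aggregate: string;
  event: string;
  version: number;
  partition?: string;
}

// snake_case token: lowercase words joined by single underscores.
const SNAKE = "[a-z][a-z0-9]*(?:_[a-z0-9]+)*";
const SUBJECT_RE = new RegExp(
  `^(${SERVICES.join("|")})\\.(${SNAKE})\\.(${SNAKE})\\.v(\\d+)(?:\\.([^.\\s]+))?$`
);

// Hypothetical helper: returns null when the subject does not match the grammar.
function parseSubject(subject: string): ParsedSubject | null {
  const m = SUBJECT_RE.exec(subject);
  if (m === null) return null;
  const parsed: ParsedSubject = {
    service: m[1] as Service,
    aggregate: m[2],
    event: m[3],
    version: Number(m[4]),
  };
  // The optional partition group is undefined when the suffix is absent.
  if (m[5] !== undefined) parsed.partition = m[5];
  return parsed;
}
```

A consumer could call `parseSubject` on each incoming subject and route a `null` result to the DLQ rather than guessing at the event type.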
4. Event Envelope (Mandatory)
Every event — domain or integration — uses the following envelope. Schema-registered at /event-schemas/envelope/v1.json.
interface EventEnvelope<T = unknown> {
// Identity
eventId: ULID; // globally unique
eventType: string; // '{service}.{aggregate}.{event}' (no version; carried separately)
eventVersion: number; // 1, 2, …
schemaUri: string; // URI incl. content-hash: 'schemas://authoring/course_draft/published/v1#sha256-XXXX'
// Origin
source: { service: string; instance: string; commit: string };
// Causality
occurredAt: ISODate;
ingestedAt: ISODate;
causationId?: ULID; // the eventId that caused this
correlationId: ULID; // trace group; usually propagated W3C traceparent -> traceId
// Tenancy & actor
tenantId: TenantId;
actor: { type: 'user' | 'system' | 'api_key' | 'service_account'; id: string };
// Payload
payload: T;
// Routing / replay hints
partitionKey: string; // usually aggregateId
// Outbox
outbox?: { dbWriteTs: ISODate; outboxId: ULID };
// Legal hold / compliance
retentionClass: 'operational' | 'regulated' | 'audit';
dataResidency: 'us' | 'eu' | 'me' | 'ap';
}
Every payload is validated against its JSON Schema at produce time and at consume time; mismatch moves the event to the service's DLQ and raises an alert.
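As an illustration of the envelope's invariants, here is a small structural pre-check. It is a sketch only and does not replace the JSON Schema validation described above; `envelopeProblems` is a hypothetical helper, and the ULID pattern assumes the canonical 26-character Crockford base32 encoding.

```typescript
// ULID: 26 Crockford base32 characters; the first is 0-7 so the value fits in 128 bits.
const ULID_RE = /^[0-7][0-9A-HJKMNP-TV-Z]{25}$/;
const RETENTION_CLASSES = ["operational", "regulated", "audit"];
const REGIONS = ["us", "eu", "me", "ap"];

// Hypothetical pre-check: returns human-readable problems, empty when none are found.
function envelopeProblems(e: Record<string, unknown>): string[] {
  const problems: string[] = [];
  const isUlid = (v: unknown): boolean => typeof v === "string" && ULID_RE.test(v);
  const oneOf = (v: unknown, allowed: string[]): boolean =>
    typeof v === "string" && allowed.indexOf(v) !== -1;
  const isIsoDate = (v: unknown): boolean =>
    typeof v === "string" && !Number.isNaN(Date.parse(v));

  if (!isUlid(e.eventId)) problems.push("eventId must be a ULID");
  if (!isUlid(e.correlationId)) problems.push("correlationId must be a ULID");
  if (e.causationId !== undefined && !isUlid(e.causationId)) {
    problems.push("causationId must be a ULID when present");
  }
  // The version is carried in eventVersion, never as a suffix of eventType.
  const eventType = e.eventType;
  if (typeof eventType !== "string" || /\.v\d+$/.test(eventType)) {
    problems.push("eventType must be a string without a version suffix");
  }
  const eventVersion = e.eventVersion;
  if (typeof eventVersion !== "number" || !Number.isInteger(eventVersion) || eventVersion < 1) {
    problems.push("eventVersion must be a positive integer");
  }
  if (!isIsoDate(e.occurredAt)) problems.push("occurredAt must be an ISO date");
  if (!oneOf(e.retentionClass, RETENTION_CLASSES)) problems.push("retentionClass is not a known class");
  if (!oneOf(e.dataResidency, REGIONS)) problems.push("dataResidency is not a known region");
  return problems;
}
```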
5. Outbox Pattern — Producer Side
Every state change that must emit an event uses transactional outbox:
CREATE TABLE outbox (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
occurred_at timestamptz NOT NULL DEFAULT now(),
tenant_id uuid NOT NULL,
topic text NOT NULL,
envelope jsonb NOT NULL, -- full EventEnvelope serialized
published_at timestamptz,
attempts int NOT NULL DEFAULT 0,
last_error text
);
CREATE INDEX outbox_unpublished_idx ON outbox (occurred_at) WHERE published_at IS NULL;
A single OutboxRelay worker per service publishes rows in occurred_at order, marks them published, and retries on failure with exponential backoff. A relay crash is safe: a row that was published but not yet marked is simply published again, and consumers deduplicate on eventId.
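The relay loop described above can be sketched against an in-memory array standing in for the outbox table. This is an illustration under stated assumptions, not the OutboxRelay implementation: `relayOnce`, `backoffMs`, the `nextAttemptAt` field (not a column in the table above), and the 200 ms base and 60 s cap are all hypothetical, and `publish` is synchronous here whereas a real broker publish would be asynchronous.

```typescript
interface OutboxRow {
  id: string;
  occurredAt: number;          // epoch ms; stands in for occurred_at
  topic: string;
  envelope: unknown;
  publishedAt: number | null;  // stands in for published_at
  attempts: number;
  lastError: string | null;
  nextAttemptAt: number;       // assumption: extra bookkeeping for backoff
}

// Exponential backoff with a cap; base and cap are illustrative values.
function backoffMs(attempts: number, baseMs = 200, capMs = 60000): number {
  return Math.min(capMs, baseMs * 2 ** attempts);
}

// One relay pass: publish unpublished rows in occurred_at order.
// Returns the number of rows published in this pass.
function relayOnce(
  rows: OutboxRow[],
  publish: (topic: string, envelope: unknown) => void,
  now: number
): number {
  const pending = rows
    .filter((r) => r.publishedAt === null)
    .sort((a, b) => a.occurredAt - b.occurredAt);
  let published = 0;
  for (const row of pending) {
    // Never skip ahead of a row that is still backing off: that would reorder events.
    if (row.nextAttemptAt > now) break;
    try {
      publish(row.topic, row.envelope);
      row.publishedAt = now;
      published += 1;
    } catch (err) {
      row.attempts += 1;
      row.lastError = err instanceof Error ? err.message : String(err);
      row.nextAttemptAt = now + backoffMs(row.attempts);
      break; // stop the pass so later rows are not published out of order
    }
  }
  return published;
}
```

Stopping at the first failed row trades throughput for ordering; a relay that partitions by aggregate could instead continue with rows from other partitions.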
6. Inbox Pattern — Consumer Side
Every consumer maintains an inbox to deduplicate:
CREATE TABLE inbox (
event_id text PRIMARY KEY, -- ULID string; stock Postgres has no built-in ulid type
consumer text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now(),
result text
);
Processing is wrapped in a transaction:
- `SELECT` existing row; if present, skip.
- Do work (update domain, emit new outbox events).
- `INSERT INTO inbox` within same transaction.
- Commit. On conflict (duplicate `event_id`), rollback: another worker already processed.
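The steps above can be sketched with an in-memory set standing in for one consumer's inbox table. `InMemoryInbox` and `handle` are hypothetical names; a real consumer would run the same sequence inside a single database transaction, and the unique constraint, not an in-process check, would arbitrate between concurrent workers.

```typescript
// In-memory stand-in for one consumer's inbox table.
class InMemoryInbox {
  private readonly processed = new Set<string>();

  // Returns true when the work ran, false when the event was a duplicate.
  handle<T>(eventId: string, payload: T, work: (payload: T) => void): boolean {
    // Step 1: an existing row means the event was already applied.
    if (this.processed.has(eventId)) return false;
    // Step 2: domain update. A throw leaves the inbox untouched, which
    // mirrors a rollback: the event will be redelivered and retried.
    work(payload);
    // Steps 3 and 4: record the event id, then commit.
    this.processed.add(eventId);
    return true;
  }
}
```

Because the id is recorded only after the work succeeds, a failed attempt stays retryable while a successful one turns every later delivery into a no-op.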
7. Domain vs Integration Events
- Domain events are facts internal to a context (e.g., `authoring.draft.block.added.v1`). Shape optimized for that context; other contexts treat them as data, not commands.
- Integration events are business outcomes contracted across contexts (e.g., `marketplace.license.granted.v1`, `content.play_package.built.v1`, `progress.completion.recorded.v1`). These have the strictest schema-evolution rules and are Pact-tested.
8. Saga State Machines
Every saga is implemented as a state-driven process manager. Saga state is persisted in a Postgres sagas table; timeouts produce explicit timeout events. Compensations are explicit, not "just undo".
8.1 Purchase Saga (owned by marketplace-service)
| Step | From → To | Trigger | Compensation on failure |
|---|---|---|---|
| 1 | started → awaiting_payment | marketplace.order.placed.v1 | — |
| 2 | awaiting_payment → licensing | billing.payment.succeeded.v1 | marketplace.order.failed.v1 (on timeout 30 min or billing.payment.failed.v1) |
| 3 | licensing → enrolling | marketplace.license.granted.v1 | refund payment; emit marketplace.license.revoked.v1 |
| 4 | enrolling → fulfilled | enrollment.created.v1 | revoke license; refund |
| 5 (terminal-fail) | * → failed | any compensation completes | emit marketplace.order.failed.v1 with reason |
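The Purchase Saga table can be read as a pure transition function. The sketch below is one interpretation, not the marketplace-service code: `step`, `FORWARD`, and `COMPENSATIONS` are hypothetical names, `saga.timeout` is a made-up name for the explicit timeout event, compensations are keyed by the state the saga was in when the failure arrived, and the intermediate "compensating" phase is collapsed into a direct move to `failed`.

```typescript
type PurchaseState =
  | "started" | "awaiting_payment" | "licensing" | "enrolling" | "fulfilled" | "failed";

interface Transition { from: PurchaseState; to: PurchaseState }

// Forward transitions, keyed by the triggering event (steps 1-4 of the table).
const FORWARD: { [trigger: string]: Transition | undefined } = {
  "marketplace.order.placed.v1": { from: "started", to: "awaiting_payment" },
  "billing.payment.succeeded.v1": { from: "awaiting_payment", to: "licensing" },
  "marketplace.license.granted.v1": { from: "licensing", to: "enrolling" },
  "enrollment.created.v1": { from: "enrolling", to: "fulfilled" },
};

// "saga.timeout" is a hypothetical name for the explicit timeout event.
const FAILURES = ["billing.payment.failed.v1", "saga.timeout"];

// Compensations by the state in which the failure arrived (an assumed reading).
const COMPENSATIONS: { [state: string]: string[] | undefined } = {
  licensing: ["refund payment", "emit marketplace.license.revoked.v1"],
  enrolling: ["revoke license", "refund payment"],
};

function step(
  state: PurchaseState,
  trigger: string
): { state: PurchaseState; actions: string[] } {
  const t = FORWARD[trigger];
  if (t !== undefined && t.from === state) return { state: t.to, actions: [] };
  const terminal = state === "fulfilled" || state === "failed";
  if (!terminal && FAILURES.indexOf(trigger) !== -1) {
    const undo = COMPENSATIONS[state] || [];
    // Step 5: every failure path ends by emitting the order-failed event.
    return { state: "failed", actions: undo.concat("emit marketplace.order.failed.v1") };
  }
  // Duplicate or out-of-order trigger: ignore it and keep the current state.
  return { state, actions: [] };
}
```

Keeping the function pure makes each saga path testable without a broker: persist the returned state in the sagas table and execute the returned actions.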
8.2 Course Publish Saga (owned by authoring-service)
| Step | From → To | Trigger | Compensation |
|---|---|---|---|
| 1 | draft_approved → building | authoring.course_draft.published.v1 | — |
| 2 | building → cataloging | content.play_package.built.v1 | discard package artifacts; draft → approved |
| 3 | cataloging → bundling | catalog.course_version.published.v1 | unregister CourseVersion; discard artifacts |
| 4 | bundling → ready | content.play_package.bundle.published.v1 | revoke bundle, unregister, discard |
| 5 (terminal-fail) | * → failed | saga timeout (15 min) or any compensation completes | draft → editing with explicit error block |
8.3 Compliance Window Saga (owned by assignment-service)
| Step | From → To | Trigger | Compensation |
|---|---|---|---|
| 1 | — → open | RRULE materialization | — |
| 2 | open → in_progress | enrollment.created.v1 (for that window) | — |
| 3 | in_progress → completed | progress.completion.recorded.v1 (passing) | — |
| 4 | * → overdue | time > dueAt | escalation fires |
| 5 | overdue → closed_missed | time > graceUntil without completion | notify + analytics |
8.4 GDPR Erasure Saga (platform-wide)
| Step | From → To | Trigger | Compensation |
|---|---|---|---|
| 1 | received → fanning_out | gdpr.subject_request.received.v1 | — |
| 2 | fanning_out → awaiting_acks | per-service fan-out | — |
| 3 | awaiting_acks → completed | all services ACK within 30 d | emit gdpr.subject_request.completed.v1 |
| fail | * → escalated | any service fails or times out | operator intervention queue |
Services in scope: identity, tenant, authoring, content, marketplace, billing, enrollment, assignment, delivery, progress, assessment, certification, notification, media, search, analytics, ai-gateway, sync. Each emits gdpr.subject_request.acknowledged.v1 per subject.
8.5 Data Residency Migration Saga (owned by tenant-service)
Freeze writes → snapshot each service → copy to target region → verify checksums → rebuild search + vector indices in target → unfreeze → emit tenant.data_residency.changed.v1. Compensations: unfreeze at source; discard target copies.
8.6 Offline Bundle Publish Saga (owned by content-service)
Build package → sign → encrypt per-(tenant,device) → upload → emit content.play_package.bundle.published.v1 → sync projects delta to target devices → device acks via sync push. Failure: revoke + emit content.play_package.bundle.revoked.v1.
9. Event Catalog with TS Payload Schemas
Representative sample covering integration events (full schemas live in the Git-backed registry; every service doc references them). Every event has an envelope (§4) wrapping the payload shape below. IDs are branded strings, per 12 Data Models.
9.1 Identity
// identity.user.registered.v1
interface UserRegisteredV1 { userId: UserId; primaryEmail: Email; homeTenantId?: TenantId; createdAt: ISODate; registrationSource: 'self' | 'sso_jit' | 'invite' | 'bulk_import'; }
// identity.user.logged_in.v1
interface UserLoggedInV1 { userId: UserId; tenantId: TenantId; deviceId?: DeviceId; amr: string[]; ip: string; ua: string; at: ISODate; }
// identity.device.bound_for_offline.v1
interface DeviceBoundV1 { userId: UserId; tenantId: TenantId; deviceId: DeviceId; fingerprint: string; certExpiresAt: ISODate; }
// identity.session.revoked.v1
interface SessionRevokedV1 { sessionId: SessionId; userId: UserId; reason: 'logout' | 'rotation_reuse' | 'admin_wipe' | 'password_reset' | 'security_incident'; at: ISODate; }
// identity.password.reset_requested.v1
interface PasswordResetRequestedV1 { userId: UserId; at: ISODate; ip: string; ua: string; }
9.2 Tenant
// tenant.org.provisioned.v1
interface OrgProvisionedV1 { tenantId: TenantId; type: 'org' | 'provider' | 'individual' | 'org+provider'; slug: string; homeRegion: 'us'|'eu'|'me'|'ap'; planId: string; createdBy: UserId; createdAt: ISODate; }
// tenant.user.invited.v1
interface UserInvitedV1 { tenantId: TenantId; email: Email; invitedBy: UserId; roleIds: RoleId[]; orgUnitIds: OrgUnitId[]; expiresAt: ISODate; }
// tenant.role.assigned.v1
interface RoleAssignedV1 { tenantId: TenantId; userId: UserId; roleIds: RoleId[]; by: UserId; at: ISODate; }
// tenant.dynamic_group.evaluated.v1
interface DynamicGroupEvaluatedV1 { tenantId: TenantId; groupId: string; size: number; evaluatedAt: ISODate; memberDigest: SHA256; }
// tenant.data_residency.changed.v1
interface DataResidencyChangedV1 { tenantId: TenantId; fromRegion: string; toRegion: string; completedAt: ISODate; }
9.3 Catalog
// catalog.course.registered.v1
interface CourseRegisteredV1 { tenantId: TenantId; courseId: CourseId; slug: string; title: I18nString; defaultLocale: Locale; visibility: 'private'|'org'|'marketplace'|'public'; }
// catalog.course_version.published.v1
interface CourseVersionPublishedV1 { tenantId: TenantId; courseId: CourseId; courseVersionId: CourseVersionId; versionLabel: SemVer; durationMinutes: number; locales: Locale[]; playPackageId: PlayPackageId; publishedAt: ISODate; publishedBy: UserId; }
// catalog.course_version.withdrawn.v1
interface CourseVersionWithdrawnV1 { tenantId: TenantId; courseVersionId: CourseVersionId; reason: string; at: ISODate; }
9.4 Authoring
// authoring.draft.created.v1
interface DraftCreatedV1 { tenantId: TenantId; courseDraftId: CourseDraftId; title: I18nString; defaultLocale: Locale; createdBy: UserId; at: ISODate; }
// authoring.draft.block.added.v1
interface BlockAddedV1 { tenantId: TenantId; courseDraftId: CourseDraftId; lessonId: LessonId; blockId: BlockId; kind: string; status: 'draft' | 'draft_ai'; aiProvenance?: AIProvenance; at: ISODate; }
// authoring.draft.ai.block_generated.v1
interface BlockAIGeneratedV1 { tenantId: TenantId; courseDraftId: CourseDraftId; blockId: BlockId; promptId: string; promptVersion: SemVer; traceId: string; local: boolean; at: ISODate; }
// authoring.draft.ai.block_accepted.v1
interface BlockAIAcceptedV1 { tenantId: TenantId; blockId: BlockId; reviewedBy: UserId; decisionId: string; at: ISODate; }
// authoring.course_draft.published.v1
interface CourseDraftPublishedV1 { tenantId: TenantId; courseDraftId: CourseDraftId; courseId: CourseId; draftVersion: number; publishedBy: UserId; at: ISODate; }
9.5 Content-Packaging
// content.play_package.built.v1
interface PlayPackageBuiltV1 { tenantId: TenantId; playPackageId: PlayPackageId; courseVersionId: CourseVersionId; locale: Locale; hash: SHA256; signature: JWS; builtAt: ISODate; }
// content.play_package.bundle.published.v1
interface BundlePublishedV1 { tenantId: TenantId; bundleId: BundleId; playPackageId: PlayPackageId; sizeBytes: number; signature: JWS; at: ISODate; }
// content.play_package.bundle.license_issued.v1
interface BundleLicenseIssuedV1 { tenantId: TenantId; bundleId: BundleId; enrollmentId: EnrollmentId; userId: UserId; deviceId: DeviceId; expiresAt: ISODate; features: { aiTutor: boolean; assessments: boolean; certificate: boolean; copyDownloadable: boolean }; }
// content.play_package.bundle.revoked.v1
interface BundleRevokedV1 { tenantId: TenantId; bundleId: BundleId; reason: 'withdrawn' | 'license_revoked' | 'tamper' | 'rebuild' | 'manual'; at: ISODate; }
// content.play_package.tamper_detected.v1
interface BundleTamperDetectedV1 { tenantId: TenantId; bundleId: BundleId; deviceId: DeviceId; clientReportedHash: SHA256; expectedHash: SHA256; at: ISODate; }
9.6 Marketplace
// marketplace.listing.approved.v1
interface ListingApprovedV1 { tenantId: TenantId; listingId: ListingId; courseId: CourseId; approvedBy: UserId; at: ISODate; }
// marketplace.order.placed.v1
interface OrderPlacedV1 { tenantId: TenantId; orderId: OrderId; buyerUserId: UserId; lines: { listingId: ListingId; planId: string; qty: number; amountMicro: number; currency: ISO4217 }[]; total: Money; at: ISODate; }
// marketplace.license.granted.v1
interface LicenseGrantedV1 { tenantId: TenantId; licenseId: LicenseId; listingId: ListingId; courseId: CourseId; seats: number; validFrom: ISODate; validUntil?: ISODate; source: 'purchase' | 'gift' | 'manual'; sourceRef: string; }
// marketplace.license.assigned.v1
interface LicenseAssignedV1 { tenantId: TenantId; licenseId: LicenseId; assigneeUserId: UserId; assignedBy: UserId; at: ISODate; }
// marketplace.license.revoked.v1
interface LicenseRevokedV1 { tenantId: TenantId; licenseId: LicenseId; reason: string; at: ISODate; }
9.7 Billing
// billing.payment.succeeded.v1
interface PaymentSucceededV1 { tenantId: TenantId; paymentId: PaymentId; orderId?: OrderId; invoiceId?: InvoiceId; amount: Money; processor: string; processorRef: string; at: ISODate; }
// billing.payment.failed.v1
interface PaymentFailedV1 { tenantId: TenantId; paymentId: PaymentId; reason: string; code: string; at: ISODate; }
// billing.subscription.started.v1
interface SubscriptionStartedV1 { tenantId: TenantId; subscriptionId: SubscriptionId; planId: string; trialEnd?: ISODate; at: ISODate; }
// billing.subscription.changed.v1
interface SubscriptionChangedV1 { tenantId: TenantId; subscriptionId: SubscriptionId; fromPlanId: string; toPlanId: string; at: ISODate; }
9.8 Enrollment
// enrollment.created.v1
interface EnrollmentCreatedV1 { tenantId: TenantId; enrollmentId: EnrollmentId; userId: UserId; courseId: CourseId; courseVersionId: CourseVersionId; source: { kind: 'assignment'|'purchase'|'manual'|'self_signup'; ref: string }; at: ISODate; }
// enrollment.completed.v1
interface EnrollmentCompletedV1 { tenantId: TenantId; enrollmentId: EnrollmentId; completedAt: ISODate; attemptId: AttemptId; score?: number; }
// enrollment.revoked.v1
interface EnrollmentRevokedV1 { tenantId: TenantId; enrollmentId: EnrollmentId; reason: string; at: ISODate; }
9.9 Assignment
// assignment.compliance_window.opened.v1
interface ComplianceWindowOpenedV1 { tenantId: TenantId; assignmentId: AssignmentId; windowId: string; userId: UserId; occurrenceStart: ISODate; dueAt: ISODate; graceUntil: ISODate; }
// assignment.compliance_window.completed.v1
interface ComplianceWindowCompletedV1 { tenantId: TenantId; assignmentId: AssignmentId; windowId: string; userId: UserId; completedAt: ISODate; attemptId: AttemptId; }
// assignment.compliance_window.overdue.v1
interface ComplianceWindowOverdueV1 { tenantId: TenantId; assignmentId: AssignmentId; windowId: string; userId: UserId; graceUntil: ISODate; }
// assignment.recommendation.generated.v1
interface AssignmentRecommendationV1 { tenantId: TenantId; recommendationId: string; scope: { orgUnitId?: OrgUnitId; roleId?: RoleId }; items: { courseId: CourseId; rrule?: RRULEString; rationale: string }[]; aiProvenance: AIProvenance; }
9.10 Delivery
// delivery.play_session.started.v1
interface PlaySessionStartedV1 { tenantId: TenantId; playSessionId: PlaySessionId; userId: UserId; enrollmentId: EnrollmentId; courseVersionId: CourseVersionId; deviceId: DeviceId; offlineMountId?: string; at: ISODate; }
// delivery.navigation.cursor_moved.v1
interface NavigationCursorMovedV1 { tenantId: TenantId; playSessionId: PlaySessionId; cursor: { moduleId: string; lessonId: string; blockId?: string; branchPath?: string[] }; at: ISODate; }
// delivery.assistant.turn.completed.v1
interface AssistantTurnCompletedV1 { tenantId: TenantId; playSessionId: PlaySessionId; turnId: string; promptExcerpt: string; aiProvenance: AIProvenance; rating?: 'helpful' | 'unhelpful'; at: ISODate; }
// delivery.offline.bundle.tamper_detected.v1
interface OfflineTamperV1 { tenantId: TenantId; bundleId: BundleId; deviceId: DeviceId; at: ISODate; }
9.11 Progress
// progress.statement.recorded.v1
interface StatementRecordedV1 { tenantId: TenantId; attemptId: AttemptId; statementId: StatementId; actor: Actor; verb: Verb; object: Activity | StatementRef; result?: Result; timestamp: ISODate; stored: ISODate; cmi5?: { sessionId: string; registration: string }; }
// progress.attempt.closed.v1
interface AttemptClosedV1 { tenantId: TenantId; attemptId: AttemptId; outcome: 'passed'|'failed'|'incomplete'|'abandoned'; score?: number; closedAt: ISODate; }
// progress.completion.recorded.v1
interface CompletionRecordedV1 { tenantId: TenantId; enrollmentId: EnrollmentId; attemptId: AttemptId; outcome: 'passed'|'failed'; score?: number; completedAt: ISODate; }
9.12 Assessment
// assessment.attempt.scored.v1
interface AttemptScoredV1 { tenantId: TenantId; attemptId: AttemptId; quizBankId?: string; scenarioId?: string; scaledScore: number; passed: boolean; durationSeconds: number; at: ISODate; }
// assessment.short_answer.ai_graded.v1
interface ShortAnswerAIGradedV1 { tenantId: TenantId; attemptId: AttemptId; questionId: string; score: number; confidence: number; rationale: string; aiProvenance: AIProvenance; at: ISODate; }
9.13 Certification
// certification.certificate.issued.v1
interface CertificateIssuedV1 { tenantId: TenantId; certificateId: CertificateId; userId: UserId; courseVersionId: CourseVersionId; enrollmentId: EnrollmentId; templateId: string; expiresAt?: ISODate; proof: JWS; at: ISODate; }
// certification.offline_claim.verified.v1
interface OfflineClaimVerifiedV1 { tenantId: TenantId; claimId: string; certificateId: CertificateId; at: ISODate; }
// certification.certificate.revoked.v1
interface CertificateRevokedV1 { tenantId: TenantId; certificateId: CertificateId; reason: string; at: ISODate; }
9.14 Notification
// notification.sent.v1
interface NotificationSentV1 { tenantId: TenantId; notificationId: string; userId: UserId; channel: 'email'|'sms'|'push'|'inapp'|'webhook'; templateKey: string; at: ISODate; }
// notification.failed.v1
interface NotificationFailedV1 { tenantId: TenantId; notificationId: string; reason: string; terminal: boolean; at: ISODate; }
9.15 Media
// media.asset.ready.v1
interface MediaAssetReadyV1 { tenantId: TenantId; assetId: MediaAssetId; kind: 'image'|'audio'|'video'|'document'|'subtitle'|'ai_image'|'ai_audio'; variants: string[]; at: ISODate; }
// media.ai.image.generated.v1
interface MediaAIImageGeneratedV1 { tenantId: TenantId; assetId: MediaAssetId; aiProvenance: AIProvenance; at: ISODate; }
// media.asset.quarantined.v1
interface MediaQuarantinedV1 { tenantId: TenantId; assetId: MediaAssetId; reason: 'virus'|'nsfw'|'copyright'|'manual'; at: ISODate; }
9.16 Search
// search.recommendation.generated.v1
interface RecommendationGeneratedV1 { tenantId: TenantId; userId: UserId; scope: string; items: { itemId: string; itemType: string; score: number; reason: string }[]; aiProvenance: AIProvenance; at: ISODate; }
9.17 Analytics
// analytics.export.completed.v1
interface AnalyticsExportCompletedV1 { tenantId: TenantId; jobId: string; format: 'csv'|'parquet'|'jsonl'; rowCount: number; deliveryUrl: string; at: ISODate; }
// analytics.insight.generated.v1
interface AnalyticsInsightV1 { tenantId: TenantId; insightId: string; dashboardId?: string; narrative: string; sources: string[]; aiProvenance: AIProvenance; at: ISODate; }
9.18 AI Gateway
// ai.gateway.call.completed.v1
interface AICallCompletedV1 { tenantId: TenantId; completionId: CompletionId; userId: UserId; promptId?: string; promptVersion?: SemVer; modelId: string; tokens: { in: number; out: number }; costMicroUSD: number; cacheHit: boolean; safety: { input: SafetyVerdict; output: SafetyVerdict }; latencyMs: number; traceId: string; decisionId?: string; at: ISODate; }
// ai.gateway.call.refused.v1
interface AICallRefusedV1 { tenantId: TenantId; userId: UserId; reason: 'safety.input'|'safety.output'|'budget'|'policy'|'provider_unavailable'; category?: string; at: ISODate; }
// ai.inference.local.completed.v1
interface AILocalInferenceV1 { tenantId: TenantId; userId: UserId; deviceId: DeviceId; promptHash: SHA256; modelId: string; latencyMs: number; at: ISODate; }
// ai.safety.flag.raised.v1
interface AISafetyFlagV1 { tenantId: TenantId; userId: UserId; category: 'sexual'|'violence'|'hate'|'self_harm'|'illegal'|'prompt_injection'|'pii'; action: 'warn'|'block'; at: ISODate; }
// ai.budget.exceeded.v1
interface AIBudgetExceededV1 { tenantId: TenantId; period: 'day'|'month'; limitMicroUSD: number; at: ISODate; }
9.19 Sync
// sync.push.applied.v1
interface SyncPushAppliedV1 { tenantId: TenantId; userId: UserId; deviceId: DeviceId; appliedCount: number; cursorLamport: number; at: ISODate; }
// sync.conflict.detected.v1
interface SyncConflictDetectedV1 { tenantId: TenantId; userId: UserId; deviceId: DeviceId; conflictId: string; entityType: string; entityId: string; policy: 'lww' | 'crdt_yjs' | 'server_authoritative'; at: ISODate; }
// sync.conflict.resolved.v1
interface SyncConflictResolvedV1 { tenantId: TenantId; userId: UserId; conflictId: string; resolution: 'kept_server'|'kept_client'|'merged'; resolvedBy: UserId; at: ISODate; }
// sync.device.wiped.v1
interface SyncDeviceWipedV1 { tenantId: TenantId; userId: UserId; deviceId: DeviceId; by: UserId; at: ISODate; }
9.20 Platform Cross-Cutting
// gdpr.subject_request.received.v1
interface GDPRSubjectRequestV1 { tenantId: TenantId; subjectUserId: UserId; kind: 'export' | 'erasure' | 'rectification' | 'restriction' | 'objection'; at: ISODate; }
// gdpr.subject_request.completed.v1
interface GDPRSubjectRequestCompletedV1 { tenantId: TenantId; subjectUserId: UserId; kind: string; completedAt: ISODate; deliveryUrl?: string; }
// audit.merkle.anchored.v1
interface AuditMerkleAnchoredV1 { tenantId?: TenantId; date: ISODate; rootHash: SHA256; anchorRef: string; }
10. Schema Evolution Rules
| Change Kind | Allowed within vN | Required |
|---|---|---|
| Add optional field | yes | bump minor; update the JSON Schema so payloads with the new field still validate under additionalProperties: false |
| Add required field | no (breaking) | new vN + dual-publish window |
| Remove field | no (breaking) | new vN + dual-publish window |
| Rename field | no (breaking) | new vN |
| Narrow enum | no (breaking) | new vN |
| Widen enum | yes | bump minor |
| Change semantics | no (breaking) | new vN |
| Add new event type | yes | new subject |
| Deprecate event | yes (parallel publish ≥ 1 release cycle) | explicit deprecation header |
Dual-publish window: producers emit both versions for a minimum of 1 release cycle; consumers declare which versions they accept; CI blocks removal of the old producer until no consumer registers for it.
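The CI gate on the dual-publish window can be illustrated with a small check over consumer registrations. This is a sketch under assumptions: the `ConsumerRegistration` shape and both function names are hypothetical, since the registration format is not specified here.

```typescript
interface ConsumerRegistration {
  consumer: string;
  eventType: string;           // '{service}.{aggregate}.{event}', no version
  acceptedVersions: number[];
}

// Consumers that still accept the given version and therefore block its removal.
function blockersForRemoval(
  eventType: string,
  version: number,
  registrations: ConsumerRegistration[]
): string[] {
  return registrations
    .filter((r) => r.eventType === eventType && r.acceptedVersions.indexOf(version) !== -1)
    .map((r) => r.consumer);
}

// Versions a producer must emit: the latest, plus every older version
// that at least one consumer still declares.
function versionsToPublish(
  eventType: string,
  latest: number,
  registrations: ConsumerRegistration[]
): number[] {
  const versions = new Set<number>([latest]);
  for (const r of registrations) {
    if (r.eventType !== eventType) continue;
    for (const v of r.acceptedVersions) {
      if (v <= latest) versions.add(v);
    }
  }
  return Array.from(versions).sort((a, b) => a - b);
}
```

A CI job could fail the build whenever `blockersForRemoval` returns a non-empty list for a version whose producer code is being deleted.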
11. Idempotency Guarantees
- Producer: outbox keyed by `outboxId`; same transaction as the domain mutation. No dual-write.
- Consumer: inbox `event_id PRIMARY KEY`; duplicate deliveries are silent no-ops.
- Cross-boundary HTTP: `Idempotency-Key` required on writes (05 API §10).
- Sync push: `clientMutationId` is the idempotency key; repeated submission returns original outcome.
12. Ordering Guarantees
- Per partition (aggregateId): FIFO. Events for the same aggregate are delivered in order.
- Across partitions: no ordering guarantee. Sagas must tolerate out-of-order arrival via state machines that wait for causal predecessors.
- Causation chains: carried via `causationId`; consumers may buffer briefly if the causation ancestor is missing.
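Buffering on a missing causation ancestor might look like the sketch below. `CausalBuffer` is a hypothetical class, and it deliberately omits the time bound that "briefly" implies: a real consumer would need a timeout after which a still-waiting event is applied anyway or escalated, because the ancestor may live on a stream this consumer never sees.

```typescript
interface CausalEvent { eventId: string; causationId?: string }

class CausalBuffer {
  private readonly applied = new Set<string>();
  // Buffered events, keyed by the ancestor id they are waiting for.
  private readonly waiting = new Map<string, CausalEvent[]>();
  private readonly apply: (e: CausalEvent) => void;

  constructor(apply: (e: CausalEvent) => void) {
    this.apply = apply;
  }

  offer(e: CausalEvent): void {
    const parent = e.causationId;
    if (parent !== undefined && !this.applied.has(parent)) {
      // Ancestor not seen yet: hold the event until it arrives.
      const queue = this.waiting.get(parent) || [];
      queue.push(e);
      this.waiting.set(parent, queue);
      return;
    }
    // Apply the event, then any buffered descendants it unblocks.
    const ready: CausalEvent[] = [e];
    while (ready.length > 0) {
      const next = ready.shift() as CausalEvent;
      this.apply(next);
      this.applied.add(next.eventId);
      const dependents = this.waiting.get(next.eventId);
      if (dependents !== undefined) {
        this.waiting.delete(next.eventId);
        for (const d of dependents) ready.push(d);
      }
    }
  }

  // Number of events still held back.
  pending(): number {
    let n = 0;
    this.waiting.forEach((q) => { n += q.length; });
    return n;
  }
}
```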
13. Replay & Recovery
- Every durable consumer can be reset to `seq=X` or `startTime=T`.
- Read-models persist `lastEventId` watermark and snapshot every N events.
- Monthly game-day: wipe a non-prod read-model; measure rebuild time; record in runbook.
- Audit + compliance consumers never replay backwards (append-only semantics).
14. DLQ Playbook
- Message arrives on `{stream}.dlq` after N retries (N = 10, exponential backoff).
- On-call receives PagerDuty alert on non-zero DLQ depth.
- Triage: schema-invalid → escalate to producer team; consumer-bug → fix + replay; transient external dependency → let normal retry drain; poison content → delete with operator sign-off + audit.
- Replay endpoint `POST /admin/events/replay` (platform-admin only; audited).
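The retry-then-DLQ rule in the first step can be sketched as a decision function. Only N = 10 and the `{stream}.dlq` subject come from the playbook; `onConsumeFailure`, the 500 ms base, and the 5-minute cap are assumptions for illustration.

```typescript
const MAX_RETRIES = 10; // N from the playbook

type FailureDecision =
  | { kind: "retry"; delayMs: number }
  | { kind: "dlq"; subject: string };

// failedDeliveries counts failed attempts so far, including the one that just
// failed: 1 after the initial delivery fails, 11 after the tenth retry fails.
function onConsumeFailure(
  stream: string,
  failedDeliveries: number,
  baseMs = 500,      // assumed base delay
  capMs = 300000     // assumed cap (5 minutes)
): FailureDecision {
  if (failedDeliveries > MAX_RETRIES) {
    return { kind: "dlq", subject: `${stream}.dlq` };
  }
  // Exponential backoff: the delay doubles on each retry until it hits the cap.
  const delayMs = Math.min(capMs, baseMs * 2 ** (failedDeliveries - 1));
  return { kind: "retry", delayMs };
}
```

With these example values the delays run 500 ms, 1 s, 2 s and so on, and the eleventh failure sends the message to the DLQ.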
15. Stream Partitioning
- Default: 12 partitions per stream; hash of `partitionKey`.
- Per-tenant partitioning for top N tenants (configurable).
- Backpressure: consumer lag alerts at 60 s, 5 min, 30 min thresholds.
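Mapping a `partitionKey` to one of the 12 default partitions could be done as below. The hash function is not specified in this document, so FNV-1a (computed here over UTF-16 code units) is purely an assumption, and `partitionFor` is a hypothetical helper.

```typescript
// 32-bit FNV-1a over the string's UTF-16 code units (assumed hash choice).
function fnv1a32(s: string): number {
  let h = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193); // FNV prime, 32-bit multiply
  }
  return h >>> 0; // force an unsigned 32-bit result
}

// Stable bucket in [0, partitions) for a given key.
function partitionFor(partitionKey: string, partitions = 12): number {
  return fnv1a32(partitionKey) % partitions;
}
```

Because the bucket depends only on the key, every event for one aggregate lands in the same partition, which is what the per-partition FIFO guarantee relies on. Changing the partition count remaps keys, so it needs a migration plan.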
16. Subject-Level ACL (NATS Accounts)
- Each service has its own JetStream account.
- Cross-service publishes gated by explicit account grants.
- Consumer accounts granted read-only on upstream subjects they consume.
- Tenant-level ACL mirrored in application-layer authorization when webhooks are tenant-subscribed (05 §13).
17. Schema Registry
- Git repo `/event-schemas/` with `service/aggregate/event/vN.json`.
- CI validates every published subject against its schema on both produce + consume.
- Registry publishes a SHA-256 content hash per file; envelope `schemaUri` references it.
- Consumer service boot warns if any registered event subject lacks a matching registry entry.
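Resolving an envelope's `schemaUri` to a registry file and checking its content hash can be sketched as follows. The URI shape follows the example in §4; the hash encoding (64 lowercase hex characters) is an assumption, and `parseSchemaUri` and `schemaMatches` are hypothetical helpers.

```typescript
interface SchemaRef {
  service: string;
  aggregate: string;
  event: string;
  version: number;
  sha256: string;
  registryPath: string; // file location inside the Git-backed registry
}

// Assumption: the content hash is 64 lowercase hex characters after "sha256-".
const SCHEMA_URI_RE =
  /^schemas:\/\/([a-z0-9_]+)\/([a-z0-9_]+)\/([a-z0-9_]+)\/v(\d+)#sha256-([0-9a-f]{64})$/;

function parseSchemaUri(uri: string): SchemaRef | null {
  const m = SCHEMA_URI_RE.exec(uri);
  if (m === null) return null;
  const version = Number(m[4]);
  return {
    service: m[1],
    aggregate: m[2],
    event: m[3],
    version,
    sha256: m[5],
    registryPath: `/event-schemas/${m[1]}/${m[2]}/${m[3]}/v${version}.json`,
  };
}

// True when the hash pinned in the envelope equals the registry's published hash.
function schemaMatches(uri: string, registryHashHex: string): boolean {
  const ref = parseSchemaUri(uri);
  return ref !== null && ref.sha256 === registryHashHex.toLowerCase();
}
```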
18. Testing
- Contract tests (Pact): per consumer-producer pair. Pact broker runs as platform infra; CI blocks producer changes that break any registered consumer.
- Schema lint: every payload validated at publish + consume.
- Sagas: integration tests with timeout + fault injection for each saga path.
- Replay tests: curated stream replayed nightly against read-model rebuild; assert idempotency.
- Chaos: consumer killed mid-batch; on restart asserts no double-apply, no missed events.
- Load (k6): 10 k events/sec sustained through OutboxRelay on representative hardware.
19. Observability
- Metrics: `outbox_lag_seconds`, `consumer_lag_messages`, `dlq_depth`, `event_publish_rate`, `event_consume_rate`, `schema_validation_failures`.
- Traces: `traceparent` propagated through envelope → consumer → downstream publish; saga spans named after state transitions.
- Logs: structured JSON; PII redacted; `eventId` / `correlationId` / `tenantId` always included.
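Propagating `traceparent` across a consume-then-publish hop can be sketched with a small parser for the W3C Trace Context header. Only version `00` of the header is handled; `parseTraceparent`, `childTraceparent`, and the caller-supplied span id are hypothetical, and in practice a tracing SDK would normally do this.

```typescript
interface TraceContext {
  traceId: string;   // 32 lowercase hex characters
  parentId: string;  // 16 lowercase hex characters
  sampled: boolean;  // lowest bit of the trace-flags byte
}

// version "00" - trace-id - parent-id - trace-flags
const TRACEPARENT_RE = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceparent(header: string): TraceContext | null {
  const m = TRACEPARENT_RE.exec(header.trim());
  if (m === null) return null;
  const traceId = m[1];
  const parentId = m[2];
  // All-zero ids are invalid per the W3C Trace Context specification.
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null;
  return { traceId, parentId, sampled: (parseInt(m[3], 16) & 1) === 1 };
}

// Header for a downstream publish: same trace id, new span id as the parent.
function childTraceparent(ctx: TraceContext, newSpanId: string): string {
  return `00-${ctx.traceId}-${newSpanId}-${ctx.sampled ? "01" : "00"}`;
}
```

Keeping the trace id constant while replacing the parent id is what lets a tracing backend stitch producer, consumer, and downstream publish into one trace.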
20. Why This Design
Events decouple 19 services without RPC-spaghetti. Outbox + inbox patterns guarantee exactly-once application over at-least-once delivery. Versioned subjects give consumers stable contracts. Schema registry catches drift at publish time. Sagas make multi-service workflows explicit and testable. Sync-service consumes the same log to project per-device deltas, meaning offline-first is a natural extension — not a parallel mechanism.