
Events

:::info Source
Sourced from services/search-service/EVENT_SCHEMAS.md in the documentation repo.
:::

Inherits envelope, subject grammar, outbox/inbox, DLQ, and replay semantics from docs/04-event-driven-architecture.md.

Search-service consumes many, publishes few. It is a read-side service and never owns domain truth.

1. Streams & Subjects

1.1 Subjects consumed

| Subject | Producer | Projector | Purpose |
|---|---|---|---|
| catalog.course_version.published.v1 | catalog-service | CatalogProjector | Index course |
| catalog.course.deleted.v1 | catalog-service | CatalogProjector | Tombstone |
| catalog.course.visibility_changed.v1 | catalog-service | CatalogProjector | Re-apply visibility |
| authoring.block.updated.v1 | authoring-service | AuthoringProjector | Index block content |
| authoring.lesson.updated.v1 | authoring-service | AuthoringProjector | Index lesson |
| authoring.lesson.deleted.v1 | authoring-service | AuthoringProjector | Tombstone |
| marketplace.listing.approved.v1 | marketplace-service | MarketplaceProjector | Index listing |
| marketplace.listing.withdrawn.v1 | marketplace-service | MarketplaceProjector | Tombstone |
| certification.certificate.issued.v1 | certification-service | CertificationProjector | Index certificate |
| certification.certificate.revoked.v1 | certification-service | CertificationProjector | Tombstone |
| tenant.membership_activated.v1 | tenant-service | TenantProjector | Index user |
| tenant.membership_deactivated.v1 | tenant-service | TenantProjector | Tombstone |
| tenant.policy.updated.v1 | tenant-service | TenantProjector | Invalidate IndexPolicy cache |
| progress.completion.recorded.v1 | progress-service | RecommendationProjector | Invalidate rec cache |
| ai.embedding.model.rotated.v1 | ai-gateway-service | RebuildTrigger | Schedule embedding rebuild |
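
The consumed subjects can be read as a routing table from subject to projector. The following is a minimal sketch of that lookup, assuming exact-match routing; the names `PROJECTOR_BY_SUBJECT` and `projector_for` are hypothetical and do not come from the service code.

```python
# Subject -> projector routing, transcribed from the "Subjects consumed" table.
# PROJECTOR_BY_SUBJECT and projector_for are illustrative names only.
PROJECTOR_BY_SUBJECT = {
    "catalog.course_version.published.v1": "CatalogProjector",
    "catalog.course.deleted.v1": "CatalogProjector",
    "catalog.course.visibility_changed.v1": "CatalogProjector",
    "authoring.block.updated.v1": "AuthoringProjector",
    "authoring.lesson.updated.v1": "AuthoringProjector",
    "authoring.lesson.deleted.v1": "AuthoringProjector",
    "marketplace.listing.approved.v1": "MarketplaceProjector",
    "marketplace.listing.withdrawn.v1": "MarketplaceProjector",
    "certification.certificate.issued.v1": "CertificationProjector",
    "certification.certificate.revoked.v1": "CertificationProjector",
    "tenant.membership_activated.v1": "TenantProjector",
    "tenant.membership_deactivated.v1": "TenantProjector",
    "tenant.policy.updated.v1": "TenantProjector",
    "progress.completion.recorded.v1": "RecommendationProjector",
    "ai.embedding.model.rotated.v1": "RebuildTrigger",
}


def projector_for(subject: str) -> str:
    """Return the projector for a consumed subject; reject anything else."""
    try:
        return PROJECTOR_BY_SUBJECT[subject]
    except KeyError:
        raise ValueError(f"search-service does not consume {subject!r}") from None
```

Rejecting unknown subjects loudly, rather than ignoring them, is a design choice of this sketch; it makes a misconfigured filter_subject visible early.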

1.2 Subjects published

| Subject | Stream | Retention class | Purpose |
|---|---|---|---|
| search.document.indexed.v1 | SEARCH | operational | analytics/audit |
| search.document.tombstoned.v1 | SEARCH | operational | analytics/audit |
| search.reindex.started.v1 | SEARCH | operational | observability |
| search.reindex.completed.v1 | SEARCH | operational | observability + sync invalidation |
| search.reindex.failed.v1 | SEARCH | operational | alerts |
| search.recommendation.generated.v1 | SEARCH | operational | analytics + sync prefetch |
| search.recommendation.feedback.recorded.v1 | SEARCH | operational | L2R training pipeline |

2. Consumer Configuration

All consumers are durable pull consumers with:

  • ack_policy = explicit
  • max_deliver = 5
  • ack_wait = 60s
  • filter_subject = per above
  • deliver_policy = all (except backfill consumers → by_start_sequence)
  • DLQ subject = <stream>.dlq.search

Per-consumer throughput target: 1k msgs/s.
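
The shared settings above could be captured in one place so that every consumer starts from the same defaults. This is a sketch under assumptions: `ConsumerSettings`, `as_backfill`, the durable name, and the stream name `CATALOG` are hypothetical, and the class is a plain dataclass rather than any NATS client type.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ConsumerSettings:
    """Defaults mirror the list above; field names are illustrative."""
    durable_name: str
    stream: str
    filter_subject: str
    ack_policy: str = "explicit"
    max_deliver: int = 5
    ack_wait_seconds: int = 60
    deliver_policy: str = "all"

    @property
    def dlq_subject(self) -> str:
        # DLQ subject = <stream>.dlq.search
        return f"{self.stream}.dlq.search"


def as_backfill(settings: ConsumerSettings, suffix: str = "-backfill") -> ConsumerSettings:
    """Derive a backfill consumer: same settings, but by_start_sequence delivery."""
    return replace(
        settings,
        durable_name=settings.durable_name + suffix,
        deliver_policy="by_start_sequence",
    )


# Hypothetical consumer for one consumed subject.
catalog_published = ConsumerSettings(
    durable_name="search-catalog-published",
    stream="CATALOG",  # assumed stream name, not stated in this document
    filter_subject="catalog.course_version.published.v1",
)
```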

3. Idempotency

Every projector inserts an inbox row keyed by eventId (ULID) before applying the event. A unique constraint on eventId rejects the insert for a redelivered event, so each event is applied at most once. See DATA_MODEL.md §4.3.
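
The inbox pattern can be sketched with SQLite from the Python standard library. The table layout, `open_inbox`, and `apply_once` are assumptions made for illustration; the real inbox table is defined in DATA_MODEL.md §4.3 and may differ.

```python
import sqlite3
from typing import Callable


def open_inbox(path: str = ":memory:") -> sqlite3.Connection:
    """Create a connection with a hypothetical single-column inbox table."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS inbox (event_id TEXT PRIMARY KEY)")
    return conn


def apply_once(conn: sqlite3.Connection, event_id: str, apply: Callable[[], None]) -> bool:
    """Record the event in the inbox, then apply it, in one transaction.

    Returns False when the event was already recorded (a redelivery).
    If apply() raises, the transaction rolls back and the inbox row is
    discarded, so a later redelivery can try again.
    """
    with conn:  # commits on normal exit, rolls back on exception
        try:
            conn.execute("INSERT INTO inbox (event_id) VALUES (?)", (event_id,))
        except sqlite3.IntegrityError:
            return False  # primary-key violation: duplicate eventId
        apply()
    return True
```

Writing the inbox row and the projection in the same transaction is what keeps a failed apply from permanently marking the event as seen.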

4. Published Event Schemas

4.1 search.document.indexed.v1

{
  "$id": "schemas://search/document/indexed/v1",
  "type": "object",
  "required": ["id", "type", "sourceService", "sourceAggregateId", "indexedAt"],
  "properties": {
    "id": { "type": "string" },
    "type": { "enum": ["course", "lesson", "block", "listing", "user", "assignment", "certificate"] },
    "sourceService": { "type": "string" },
    "sourceAggregateId": { "type": "string" },
    "aggregateVersion": { "type": "integer", "minimum": 0 },
    "indexedAt": { "type": "string", "format": "date-time" },
    "hadEmbedding": { "type": "boolean" },
    "embeddingModelId": { "type": "string" }
  }
}
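
To make the schema concrete, here is a hand-rolled check of a few of its constraints against a sample payload. It is only a sketch: it covers `required`, the `type` enum, and the `aggregateVersion` minimum, and it is not a substitute for a real JSON Schema validator. All payload values are invented for illustration.

```python
from typing import Any, Dict, List

REQUIRED = ("id", "type", "sourceService", "sourceAggregateId", "indexedAt")
DOCUMENT_TYPES = {"course", "lesson", "block", "listing", "user", "assignment", "certificate"}


def validation_errors(payload: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means these checks pass."""
    errors = [f"missing required field: {name}" for name in REQUIRED if name not in payload]
    if "type" in payload and payload["type"] not in DOCUMENT_TYPES:
        errors.append(f"type not in enum: {payload['type']!r}")
    version = payload.get("aggregateVersion")
    if version is not None:
        # bool is a subclass of int in Python, so exclude it explicitly.
        is_int = isinstance(version, int) and not isinstance(version, bool)
        if not is_int or version < 0:
            errors.append("aggregateVersion must be an integer >= 0")
    return errors


# Hypothetical payload; identifiers and timestamps are made up.
EXAMPLE_PAYLOAD = {
    "id": "doc-123",
    "type": "course",
    "sourceService": "catalog-service",
    "sourceAggregateId": "course-456",
    "aggregateVersion": 3,
    "indexedAt": "2025-01-01T12:00:00Z",
    "hadEmbedding": True,
    "embeddingModelId": "example-model",
}
```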

4.2 search.document.tombstoned.v1

{
  "$id": "schemas://search/document/tombstoned/v1",
  "type": "object",
  "required": ["id", "tombstonedAt", "reason"],
  "properties": {
    "id": { "type": "string" },
    "tombstonedAt": { "type": "string", "format": "date-time" },
    "reason": { "enum": ["source-deleted", "visibility-demoted", "tenant-erasure", "policy-restricted"] }
  }
}

4.3 search.reindex.started.v1

{
  "type": "object",
  "required": ["jobId", "scope", "scopeTargetId", "requestedBy", "startedAt"],
  "properties": {
    "jobId": { "type": "string" },
    "scope": { "enum": ["tenant", "global"] },
    "scopeTargetId": { "type": "string" },
    "requestedBy": { "type": "string" },
    "startedAt": { "type": "string", "format": "date-time" },
    "reason": { "type": "string" }
  }
}

4.4 search.reindex.completed.v1

{
  "type": "object",
  "required": ["jobId", "documentCount", "startedAt", "completedAt"],
  "properties": {
    "jobId": { "type": "string" },
    "scope": { "enum": ["tenant", "global"] },
    "scopeTargetId": { "type": "string" },
    "documentCount": { "type": "integer" },
    "startedAt": { "type": "string", "format": "date-time" },
    "completedAt": { "type": "string", "format": "date-time" },
    "durationSec": { "type": "number" },
    "embeddedCount": { "type": "integer" }
  }
}

4.5 search.reindex.failed.v1

{
  "type": "object",
  "required": ["jobId", "failedAt", "error"],
  "properties": {
    "jobId": { "type": "string" },
    "failedAt": { "type": "string", "format": "date-time" },
    "phase": { "enum": ["snapshot", "replay", "alias-swap"] },
    "error": { "type": "string" },
    "retriable": { "type": "boolean" }
  }
}

4.6 search.recommendation.generated.v1

{
  "type": "object",
  "required": ["generationId", "userId", "generatedAt", "itemCount", "modelVersion"],
  "properties": {
    "generationId": { "type": "string" },
    "userId": { "type": "string" },
    "context": { "enum": ["home", "post-completion", "marketplace", "next-step"] },
    "generatedAt": { "type": "string", "format": "date-time" },
    "expiresAt": { "type": "string", "format": "date-time" },
    "itemCount": { "type": "integer" },
    "modelVersion": { "type": "string" },
    "source": { "enum": ["fresh", "cached"] }
  }
}

4.7 search.recommendation.feedback.recorded.v1

{
  "type": "object",
  "required": ["generationId", "userId", "itemId", "action", "recordedAt"],
  "properties": {
    "generationId": { "type": "string" },
    "userId": { "type": "string" },
    "itemId": { "type": "string" },
    "action": { "enum": ["click", "dismiss", "convert", "not_interested"] },
    "position": { "type": "integer" },
    "recordedAt": { "type": "string", "format": "date-time" }
  }
}

5. Schema Evolution

  • Breaking change → new vN+1 subject, with a 90-day dual-publish window during which consumers are migrated explicitly.
  • Non-breaking (additive) change → bump the minor in the schemaUri hash but keep the subject; the validator permits unknown fields.
  • Every schema is stored in schema-registry/search/** and referenced via schemaUri in the envelope.
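
The dual-publish rule for breaking changes can be sketched as a date check. This assumes the 90-day window starts on the day the vN+1 subject goes live (the `cutover` date); the document does not state when the window begins, and `subjects_to_publish` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import List

DUAL_PUBLISH_WINDOW = timedelta(days=90)


def subjects_to_publish(base: str, old_version: int, cutover: date, today: date) -> List[str]:
    """Return the subjects a publisher should emit to on a given day.

    base is the subject without its version suffix,
    e.g. "search.document.indexed".
    """
    old = f"{base}.v{old_version}"
    new = f"{base}.v{old_version + 1}"
    if today < cutover:
        return [old]  # breaking change not live yet
    if today < cutover + DUAL_PUBLISH_WINDOW:
        return [old, new]  # inside the dual-publish window
    return [new]  # window closed: old subject retired
```

For example, with a cutover of 2025-01-01, both subjects are published through 2025-03-31, and only the new subject from 2025-04-01 onward.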

6. Replay Policy

Search-service supports two replay flows:

  1. Full reindex: command-triggered; a scratch consumer reads with deliver_policy=by_start_sequence from sequence 0.
  2. Targeted repair: an admin tool replays events for a specific aggregate ID using a NATS subject filter plus deliver_policy=by_start_time.

Both flows publish search.reindex.started, search.reindex.completed, and search.reindex.failed events for observability.
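
The difference between the two flows is which slice of the stream gets re-read. The sketch below models that selection over an in-memory list instead of a real stream; `StoredEvent` and its fields are hypothetical stand-ins for the envelope, and how the aggregate ID is actually matched (subject token or envelope field) is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List


@dataclass(frozen=True)
class StoredEvent:
    """Illustrative stand-in for a stored envelope."""
    sequence: int
    subject: str
    aggregate_id: str
    published_at: datetime


def full_reindex(events: Iterable[StoredEvent]) -> List[StoredEvent]:
    """Full reindex: every event from the start of the stream, in order."""
    return sorted(events, key=lambda e: e.sequence)


def targeted_repair(
    events: Iterable[StoredEvent],
    subject: str,
    aggregate_id: str,
    since: datetime,
) -> List[StoredEvent]:
    """Targeted repair: one subject, one aggregate, from a start time onward."""
    return [
        e
        for e in full_reindex(events)
        if e.subject == subject
        and e.aggregate_id == aggregate_id
        and e.published_at >= since
    ]
```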

7. Poison Message Handling

  • After max_deliver=5, the envelope is routed to <stream>.dlq.search.
  • DLQ consumer runs every 5m, writes rows to search_dlq table with error context.
  • Dashboard alert fires when DLQ row count > 0.
  • Manual replay path: admin triggers POST /search/dlq/{id}/replay after a fix.
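
The per-delivery decision and the alert rule above reduce to two small predicates. This is a sketch only: `next_action` and `should_alert` are hypothetical names, the action strings are invented labels, and the actual routing mechanism is inherited from docs/04-event-driven-architecture.md.

```python
MAX_DELIVER = 5


def dlq_subject(stream: str) -> str:
    """DLQ subject for a source stream: <stream>.dlq.search."""
    return f"{stream}.dlq.search"


def next_action(delivery_count: int, handled_ok: bool) -> str:
    """Decide what happens after the Nth delivery attempt (1-based)."""
    if handled_ok:
        return "ack"
    if delivery_count >= MAX_DELIVER:
        return "dead-letter"  # route the envelope to the DLQ subject
    return "retry"  # leave unacked so it is redelivered


def should_alert(dlq_row_count: int) -> bool:
    """Dashboard alert fires as soon as search_dlq holds any row."""
    return dlq_row_count > 0
```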

8. Tenant Isolation of Streams

For the 10 largest tenants, search-service uses dedicated durable consumers on per-tenant subject partitions (catalog.course_version.published.v1.tenant_01H...). All other tenants share a single consumer with partition-by-aggregateId ordering.
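
One way to picture the split is a function that picks a consumer, and for shared tenants a partition, per event. Everything named here is an assumption for illustration: the consumer names, the partition count of 8, and the use of a SHA-256 hash of the aggregate ID are not specified by this document.

```python
import hashlib
from typing import FrozenSet, Optional, Tuple

SHARED_CONSUMER = "search-shared"  # hypothetical durable name
SHARED_PARTITIONS = 8  # hypothetical partition count


def consumer_for(
    tenant_id: str,
    aggregate_id: str,
    dedicated_tenants: FrozenSet[str],
) -> Tuple[str, Optional[int]]:
    """Return (consumer name, partition); partition is None for dedicated tenants."""
    if tenant_id in dedicated_tenants:
        return (f"search-tenant-{tenant_id}", None)
    # A stable hash keeps every event of one aggregate on the same partition,
    # which preserves per-aggregate ordering on the shared consumer.
    digest = hashlib.sha256(aggregate_id.encode("utf-8")).digest()
    partition = int.from_bytes(digest[:8], "big") % SHARED_PARTITIONS
    return (SHARED_CONSUMER, partition)
```

Python's built-in `hash()` is avoided here because it is salted per process for strings, which would scatter one aggregate across partitions after a restart.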