Application Logic
:::info Source
Sourced from services/search-service/APPLICATION_LOGIC.md in the documentation repo.
:::
Companion: DOMAIN_MODEL.md · API_CONTRACTS.md · EVENT_SCHEMAS.md
This document describes the use cases, command/query handlers, ports, adapters, and domain workflows of search-service.
1. Hexagonal Layout
```text
services/search-service/
├── src/
│   ├── domain/                  # Pure domain model (no I/O)
│   │   ├── document.ts
│   │   ├── recommendation.ts
│   │   ├── index-policy.ts
│   │   └── ranking.ts
│   ├── application/
│   │   ├── use-cases/
│   │   │   ├── index-document.ts
│   │   │   ├── bulk-index.ts
│   │   │   ├── delete-document.ts
│   │   │   ├── search-query.ts
│   │   │   ├── suggest.ts
│   │   │   ├── recommend-for-user.ts
│   │   │   ├── reindex-tenant.ts
│   │   │   └── rebuild-embeddings.ts
│   │   └── projectors/
│   │       ├── catalog-projector.ts
│   │       ├── authoring-projector.ts
│   │       ├── marketplace-projector.ts
│   │       ├── certification-projector.ts
│   │       └── tenant-projector.ts
│   ├── ports/                   # Outbound interface contracts
│   │   ├── lexical-index.port.ts
│   │   ├── vector-index.port.ts
│   │   ├── embedding.port.ts
│   │   ├── event-publisher.port.ts
│   │   ├── recommendation-cache.port.ts
│   │   └── clock.port.ts
│   ├── adapters/
│   │   ├── opensearch/
│   │   ├── pgvector/            # via ai-gateway HTTP client
│   │   ├── nats/
│   │   ├── redis-cache/
│   │   └── ai-gateway-client/
│   ├── interfaces/
│   │   ├── http/                # Fastify routes
│   │   ├── consumers/           # NATS consumers
│   │   └── admin-cli/           # reindex / rebuild commands
│   └── infra/
│       ├── config.ts
│       ├── telemetry.ts
│       └── bootstrap.ts
└── test/
```
2. Use Cases
2.1 IndexDocument (command)
Trigger: projector receives a domain event.
Input: SearchableDocument candidate.
Output: void.
```ts
class IndexDocumentUseCase {
  constructor(
    private lex: LexicalIndexPort,
    private vec: VectorIndexPort,
    private embed: EmbeddingPort,
    private clock: ClockPort,
    private publisher: EventPublisherPort,
  ) {}

  async execute(cmd: IndexDocumentCommand): Promise<void> {
    // 1. Validate invariants D1..D8
    validateDocument(cmd.doc);

    // 2. Skip stale writes (incoming aggregateVersion <= currently indexed version).
    //    `!= null` keeps a stored version of 0 from being treated as "not indexed".
    const current = await this.lex.getVersion(cmd.doc.tenantId, cmd.doc.id);
    if (current != null && current >= cmd.doc.source.aggregateVersion) return;

    // 3. Embed only content-relevant types, and only when the content hash changed
    if (needsEmbedding(cmd.doc)) {
      const hash = contentHash(cmd.doc);
      if (hash !== cmd.doc.embeddingHash) {
        const { vector, modelId } = await this.embed.embed(
          buildEmbeddingInput(cmd.doc),
          cmd.doc.tenantId,
        );
        cmd.doc.embedding = vector;
        cmd.doc.embeddingModelId = modelId;
        cmd.doc.embeddingHash = hash;
      }
    }

    // 4. Dual-write: lexical always, vector only when the doc has an embedding
    await Promise.all([
      this.lex.upsert(cmd.doc),
      cmd.doc.embedding ? this.vec.upsert(cmd.doc) : Promise.resolve(),
    ]);

    // 5. Emit internal event
    await this.publisher.publish({
      eventType: 'search.document.indexed',
      tenantId: cmd.doc.tenantId,
      payload: { id: cmd.doc.id, type: cmd.doc.type, indexedAt: this.clock.now() },
    });
  }
}
```
Failure handling: Any failure → the projector leaves the message unacknowledged and NATS redelivers it. After `maxDeliver=5` deliveries the message goes to the DLQ. On partial success (lexical write succeeded, vector write failed), a `needsEmbeddingRebuild=true` flag is recorded on the document so that a reconciler can fix it asynchronously.
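The split between "retry the whole message" and "flag and reconcile later" can be sketched as below. This is a minimal illustration, not the service's code: `Doc`, `Writer` and `dualWrite` are hypothetical names, and the writes run sequentially here (unlike the parallel `Promise.all` in `IndexDocumentUseCase`) so the two failure paths are easy to see.

```ts
// Hypothetical, simplified shapes for this sketch (not the service's real types).
interface Doc {
  id: string;
  tenantId: string;
  embedding?: number[];
  needsEmbeddingRebuild?: boolean;
}

interface Writer {
  upsert(doc: Doc): Promise<void>;
}

type DualWriteOutcome = "ok" | "flagged";

// Lexical first, then vector.
// - A lexical failure propagates, so the projector does not ack and NATS redelivers.
// - A vector-only failure is absorbed: the lexical copy is re-upserted with
//   needsEmbeddingRebuild=true so a reconciler can repair the embedding later.
async function dualWrite(lex: Writer, vec: Writer, doc: Doc): Promise<DualWriteOutcome> {
  await lex.upsert(doc);
  if (!doc.embedding) return "ok";
  try {
    await vec.upsert(doc);
    return "ok";
  } catch {
    await lex.upsert({ ...doc, needsEmbeddingRebuild: true });
    return "flagged";
  }
}
```

Keeping the lexical write as the one that decides redelivery means search stays available (lexically) even while pgvector is down, which matches the `SEARCH_VECTOR_UNAVAILABLE` degraded mode in the Error Model.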
2.2 BulkIndex (command)
Used for backfills. Accepts a stream of documents and processes them in batches of 500 with a parallelism of 4. Embedding calls are coalesced via `embed.embedBatch(inputs[])`.
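A minimal sketch of the batching and bounded parallelism, assuming one `embedBatch` call per batch. `BatchEmbedder`, `batches`, `bulkIndex` and the `sink` callback (standing in for the lexical/vector upserts) are hypothetical; documents are reduced to their embedding-input strings for brevity.

```ts
// Hypothetical batch embedder: one call embeds a whole batch of inputs.
interface BatchEmbedder {
  embedBatch(inputs: string[]): Promise<number[][]>;
}

// Groups an async stream into arrays of at most `size` items.
async function* batches<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let buf: T[] = [];
  for await (const item of source) {
    buf.push(item);
    if (buf.length === size) {
      yield buf;
      buf = [];
    }
  }
  if (buf.length > 0) yield buf;
}

// `parallelism` workers pull from one shared generator (async generators queue
// concurrent next() calls), so at most `parallelism` batches are in flight.
async function bulkIndex(
  source: AsyncIterable<string>,
  embedder: BatchEmbedder,
  sink: (inputs: string[], vectors: number[][]) => Promise<void>,
  batchSize = 500,
  parallelism = 4,
): Promise<number> {
  const it = batches(source, batchSize);
  let batchCount = 0;
  const worker = async (): Promise<void> => {
    for (;;) {
      const next = await it.next();
      if (next.done) return;
      batchCount++;
      const vectors = await embedder.embedBatch(next.value);
      await sink(next.value, vectors);
    }
  };
  await Promise.all(Array.from({ length: parallelism }, () => worker()));
  return batchCount;
}
```

A worker pool over a shared iterator keeps memory bounded to roughly `parallelism × batchSize` documents, since the source is only read as fast as workers free up.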
2.3 DeleteDocument (command)
Soft delete → sets `deletedAt`, clears the body to free index space, and retains `id` and `tenantId` for tombstone lookups. A background sweeper hard-deletes the document after 30 days.
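The tombstone and the sweeper's 30-day rule can be sketched as pure functions. `IndexedDoc`, `softDelete`, `isDueForHardDelete` and `sweep` are hypothetical names; the sketch assumes other document fields are kept untouched and only `body` is cleared.

```ts
// Hypothetical document shape; only the fields DeleteDocument touches.
interface IndexedDoc {
  id: string;
  tenantId: string;
  body?: string;
  deletedAt?: Date;
}

const HARD_DELETE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; // 30 days

// Soft delete: mark deletedAt and clear the body; id and tenantId stay for tombstone lookups.
function softDelete(doc: IndexedDoc, now: Date): IndexedDoc {
  return { ...doc, body: undefined, deletedAt: now };
}

// Sweeper predicate: a tombstone becomes eligible for hard delete 30 days after deletedAt.
function isDueForHardDelete(doc: IndexedDoc, now: Date): boolean {
  return doc.deletedAt !== undefined && now.getTime() - doc.deletedAt.getTime() >= HARD_DELETE_AFTER_MS;
}

// One sweep pass: returns the ids the sweeper would hard-delete.
function sweep(docs: IndexedDoc[], now: Date): string[] {
  return docs.filter((d) => isDueForHardDelete(d, now)).map((d) => d.id);
}
```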
2.4 SearchQuery (query)
Input: `SearchQueryInput`:
```ts
interface SearchQueryInput {
  tenantId: TenantId;
  actor: Actor;
  q?: string;
  types?: DocumentType[];
  filters?: Filter[];
  facets?: string[];
  semantic?: 'off' | 'hybrid' | 'semantic-only';
  hybridAlpha?: number; // in [0, 1]
  page?: { size: number; cursor?: string };
  sort?: SortSpec[];
  locale?: BCP47;
}
```
Execution flow:
- Build an authorization filter from the actor's roles and append it to every subquery (`visibility IN ...`, `tenantId = ...`).
- If `semantic = off` → lexical OpenSearch query.
- If `semantic = semantic-only` → embed the query text → pgvector kNN.
- If `semantic = hybrid` (default for authenticated queries) → run lexical and semantic retrieval in parallel, merge via RRF (reciprocal rank fusion), then re-rank with the L2R model.
- Apply facet aggregations.
- Hydrate snippets via the OpenSearch highlight API.
- Return the page and the next cursor.
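The mode selection in the flow above can be sketched as a small planning function. `planQuery`, `QueryPlan` and `PLANS` are hypothetical; two behaviors are assumptions not stated in this document: anonymous callers default to `off`, and a request without `q` runs only the lexical path.

```ts
type SemanticMode = "off" | "hybrid" | "semantic-only";

interface QueryPlan {
  lexical: boolean;  // run the OpenSearch lexical query
  semantic: boolean; // embed q and run pgvector kNN
  fuse: boolean;     // merge both candidate lists with RRF, then re-rank
}

const MAX_QUERY_CHARS = 512; // SEARCH_QUERY_TOO_LONG threshold (Error Model)

const PLANS: Record<SemanticMode, QueryPlan> = {
  off: { lexical: true, semantic: false, fuse: false },
  "semantic-only": { lexical: false, semantic: true, fuse: false },
  hybrid: { lexical: true, semantic: true, fuse: true },
};

function planQuery(
  q: string | undefined,
  requested: SemanticMode | undefined,
  authenticated: boolean,
): QueryPlan {
  // Note: length counts UTF-16 code units.
  if (q !== undefined && q.length > MAX_QUERY_CHARS) throw new Error("SEARCH_QUERY_TOO_LONG");
  // Documented default: hybrid for authenticated callers. 'off' for anonymous ones is an assumption.
  const mode: SemanticMode = requested ?? (authenticated ? "hybrid" : "off");
  // Assumption: with no query text there is nothing to embed, so only the lexical path runs.
  if (!q) return PLANS.off;
  return PLANS[mode];
}
```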
2.5 Suggest (query)
Completion suggester: OpenSearch completion field fed from title + top-K tags. Results filtered by tenant + visibility. Cached in Redis for 30s by (tenantId, prefix, type).
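The 30-second suggest cache can be sketched with an in-memory stand-in for Redis. `TtlCache`, `suggestKey` and `cachedSuggest` are hypothetical names; the clock is injected (in the spirit of `ClockPort`) so expiry is testable without waiting, and `lookup` stands for the already tenant- and visibility-filtered completion query.

```ts
// In-memory stand-in for the Redis suggest cache, with an injected clock.
class TtlCache<V> {
  private readonly entries = new Map<string, { value: V; expiresAt: number }>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  get(key: string): V | undefined {
    const hit = this.entries.get(key);
    if (hit === undefined) return undefined;
    if (this.now() >= hit.expiresAt) {
      this.entries.delete(key); // lazily evict the expired entry
      return undefined;
    }
    return hit.value;
  }

  set(key: string, value: V): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }
}

// Key on the (tenantId, prefix, type) tuple; JSON encoding avoids delimiter collisions.
const suggestKey = (tenantId: string, prefix: string, type: string): string =>
  JSON.stringify(["suggest", tenantId, prefix, type]);

async function cachedSuggest(
  cache: TtlCache<string[]>,
  lookup: () => Promise<string[]>,
  tenantId: string,
  prefix: string,
  type: string,
): Promise<string[]> {
  const key = suggestKey(tenantId, prefix, type);
  const cached = cache.get(key);
  if (cached !== undefined) return cached;
  const fresh = await lookup();
  cache.set(key, fresh);
  return fresh;
}
```

Because `tenantId` is part of the key, two tenants typing the same prefix never share cached suggestions, which matters since results are visibility-filtered per tenant.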
2.6 RecommendForUser (query)
User profile embedding = mean of last 20 completed course embeddings + user skills vector, cached per user.
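One reading of the profile formula, as a sketch. `profileEmbedding` is a hypothetical name, and three things are assumptions: "+" means element-wise addition, all vectors share one dimension, and the completions are passed newest first. Returning the skills vector alone when there are no completions is also an assumption.

```ts
// Mean of the newest `window` completed-course embeddings, plus the skills vector.
function profileEmbedding(completedNewestFirst: number[][], skills: number[], window = 20): number[] {
  const recent = completedNewestFirst.slice(0, window); // last N completions
  const out = skills.slice();
  if (recent.length === 0) return out; // assumption: skills vector only
  for (const v of recent) {
    if (v.length !== out.length) throw new Error("embedding dimension mismatch");
    // Adding v / N for each of the N vectors accumulates their mean onto `out`.
    for (let i = 0; i < out.length; i++) out[i] += v[i] / recent.length;
  }
  return out;
}
```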
2.7 ReindexTenant (command, admin)
Steps:
- Create a new physical index `tenant_X_YYYY-MM-DD`.
- Call each source service's `/snapshot` endpoint (paginated).
- Apply projector logic to every snapshot row.
- Replay NATS from `t-5m` to catch up.
- Atomically swap the alias.
- Delete the old physical index after a 1h cooldown.
- Publish `search.reindex.completed.v1`.
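The step order can be sketched as an orchestration function over an injected dependency surface. `ReindexDeps`, its method names and the alias name `tenant_<tenantId>` are hypothetical, and `t` is assumed to be the time the reindex started.

```ts
// Hypothetical dependency surface for this sketch; not the service's real ports.
interface ReindexDeps {
  createIndex(name: string): Promise<void>;
  snapshotPages(source: string): AsyncIterable<unknown[]>; // paginated /snapshot
  project(row: unknown, index: string): Promise<void>; // projector logic per row
  replayFrom(sinceMs: number, index: string): Promise<void>; // NATS replay
  swapAlias(alias: string, index: string): Promise<string | undefined>; // returns previous index
  scheduleDelete(index: string, afterMs: number): Promise<void>;
  publish(eventType: string, payload: object): Promise<void>;
}

const REPLAY_LOOKBACK_MS = 5 * 60 * 1000; // t-5m
const OLD_INDEX_COOLDOWN_MS = 60 * 60 * 1000; // 1h

async function reindexTenant(
  deps: ReindexDeps,
  tenantId: string,
  sources: string[],
  startedAtMs: number, // assumption: this is "t"
): Promise<string> {
  const day = new Date(startedAtMs).toISOString().slice(0, 10); // YYYY-MM-DD (UTC)
  const index = `tenant_${tenantId}_${day}`;
  await deps.createIndex(index);

  for (const source of sources) {
    for await (const page of deps.snapshotPages(source)) {
      for (const row of page) await deps.project(row, index);
    }
  }

  // Catch up on events that raced with the snapshot.
  await deps.replayFrom(startedAtMs - REPLAY_LOOKBACK_MS, index);

  const previous = await deps.swapAlias(`tenant_${tenantId}`, index); // alias name is hypothetical
  if (previous !== undefined) await deps.scheduleDelete(previous, OLD_INDEX_COOLDOWN_MS);

  await deps.publish("search.reindex.completed.v1", { tenantId, index });
  return index;
}
```

Replaying from slightly before `t` deliberately overlaps with the snapshot; the stale-version check in `IndexDocumentUseCase` turns those duplicate events into no-ops.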
2.8 RebuildEmbeddings (command, admin)
Same as ReindexTenant, but only regenerates embeddings; the lexical index is left untouched.
3. Ports
| Port | Responsibility | Main methods |
|---|---|---|
| `LexicalIndexPort` | OpenSearch CRUD + search | `upsert`, `delete`, `search`, `suggest`, `createIndex`, `aliasSwap` |
| `VectorIndexPort` | pgvector via ai-gateway | `upsert`, `delete`, `knn`, `bulkUpsert` |
| `EmbeddingPort` | Generate embeddings | `embed(input, tenantId)`, `embedBatch(inputs)` |
| `EventPublisherPort` | NATS publish | `publish(envelope)` |
| `RecommendationCachePort` | Redis | `get`, `set`, `invalidate` |
| `ClockPort` | Time abstraction for tests | `now()` |
| `TenantConfigPort` | Per-tenant policies | `getIndexPolicy(tenantId)` |
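Ports like `ClockPort` and `EmbeddingPort` exist mainly so tests can swap in deterministic doubles. A sketch of such doubles follows; the interface shapes mirror how `IndexDocumentUseCase` calls the ports, but the exact signatures (`now(): Date`, the `embedBatch` result type, string inputs) and the class names `FixedClock` and `FakeEmbedding` are assumptions.

```ts
// Assumed shapes, mirrored from how IndexDocumentUseCase calls these ports.
interface EmbeddingResult {
  vector: number[];
  modelId: string;
}

interface EmbeddingPort {
  embed(input: string, tenantId: string): Promise<EmbeddingResult>;
  embedBatch(inputs: string[]): Promise<EmbeddingResult[]>;
}

interface ClockPort {
  now(): Date;
}

// Test double: time only moves when the test calls advance().
class FixedClock implements ClockPort {
  private current: number;
  constructor(start: Date) {
    this.current = start.getTime();
  }
  now(): Date {
    return new Date(this.current);
  }
  advance(ms: number): void {
    this.current += ms;
  }
}

// Test double: deterministic vectors derived from the input text, plus a call
// counter so tests can assert that unchanged content is not re-embedded.
class FakeEmbedding implements EmbeddingPort {
  calls = 0;
  async embed(input: string, _tenantId: string): Promise<EmbeddingResult> {
    this.calls++;
    return { vector: [input.length, input.charCodeAt(0) || 0], modelId: "fake-embed-v1" };
  }
  async embedBatch(inputs: string[]): Promise<EmbeddingResult[]> {
    return Promise.all(inputs.map((input) => this.embed(input, "")));
  }
}
```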
4. Projector Pattern
Every consumed event flows through a projector that:
- Validates the event envelope (signature, schema, tenant).
- Translates the event payload → `SearchableDocument` candidate.
- Invokes `IndexDocumentUseCase`.
- Records the event in the `inbox` table (idempotency).
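```ts
class CatalogProjector implements EventConsumer {
  // Collaborators injected at bootstrap (type names here are illustrative).
  constructor(
    private inbox: Inbox,
    private indexDocument: IndexDocumentUseCase,
    private deleteDocument: DeleteDocumentUseCase,
  ) {}

  async handle(env: EventEnvelope): Promise<void> {
    if (await this.inbox.seen(env.eventId)) return; // idempotent: already processed

    switch (env.eventType) {
      case 'catalog.course_version.published': {
        // Braces scope `doc` to this case clause
        const doc = mapCoursePublishedToDoc(env.payload, env.tenantId);
        await this.indexDocument.execute({ doc });
        break;
      }
      case 'catalog.course.deleted':
        await this.deleteDocument.execute({ id: env.payload.id, tenantId: env.tenantId });
        break;
      default:
        return; // uninterested: not recorded in the inbox
    }

    // Recorded only after the use case succeeds, so a failure leads to redelivery
    await this.inbox.record(env.eventId, env.tenantId);
  }
}
```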
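The inbox check can be exercised in isolation with an in-memory stand-in for the `inbox` table. `Envelope`, `MemoryInbox` and `handleOnce` are hypothetical names for this sketch; the order (check, apply, record) follows `CatalogProjector.handle`.

```ts
// Hypothetical envelope and in-memory inbox standing in for the inbox table.
interface Envelope {
  eventId: string;
  tenantId: string;
  eventType: string;
}

class MemoryInbox {
  private readonly ids = new Set<string>();
  async seen(eventId: string): Promise<boolean> {
    return this.ids.has(eventId);
  }
  async record(eventId: string, _tenantId: string): Promise<void> {
    this.ids.add(eventId);
  }
}

// Check, apply, then record. If apply throws, nothing is recorded and
// redelivery retries the event.
async function handleOnce(
  inbox: MemoryInbox,
  env: Envelope,
  apply: (env: Envelope) => Promise<void>,
): Promise<boolean> {
  if (await inbox.seen(env.eventId)) return false; // duplicate delivery
  await apply(env);
  await inbox.record(env.eventId, env.tenantId);
  return true;
}
```

If the process dies between `apply` and `record`, the event is delivered again and re-applied; for index writes, the version check in `IndexDocumentUseCase` makes that second application a no-op.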
5. Authorization Filter
Every search and recommend query derives an index filter from the caller's access view:
```ts
function buildAuthFilter(actor: Actor, tenantId: TenantId): IndexFilter {
  return {
    and: [
      { term: { tenantId } },
      {
        or: [
          { term: { visibility: 'public' } },
          { term: { visibility: 'marketplace' } },
          { and: [{ term: { visibility: 'org' } }, { term: { tenantId: actor.tenantId } }] },
          { and: [{ term: { visibility: 'private' } }, { terms: { audiences: actor.roles } }] },
        ],
      },
      { bool: { must_not: { exists: { field: 'deletedAt' } } } },
    ],
  };
}
```
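For unit tests it can help to have an in-memory predicate with the same boolean structure as `buildAuthFilter`. `isVisible`, `VisibleDoc` and `ActorView` are hypothetical; the predicate only mirrors the filter's logic and is not how the OpenSearch adapter evaluates it.

```ts
type Visibility = "public" | "marketplace" | "org" | "private";

// Hypothetical minimal shapes for this sketch.
interface VisibleDoc {
  tenantId: string;
  visibility: Visibility;
  audiences?: string[];
  deletedAt?: string;
}

interface ActorView {
  tenantId: string;
  roles: string[];
}

function isVisible(doc: VisibleDoc, actor: ActorView, tenantId: string): boolean {
  // Outer AND: queried tenant only, and never soft-deleted documents.
  if (doc.tenantId !== tenantId || doc.deletedAt !== undefined) return false;
  if (doc.visibility === "public" || doc.visibility === "marketplace") return true;
  // 'org': doc.tenantId already equals tenantId, so this reduces to actor membership.
  if (doc.visibility === "org") return actor.tenantId === tenantId;
  // 'private': like a `terms` query, any overlap between audiences and roles matches.
  return (doc.audiences ?? []).some((aud) => actor.roles.indexOf(aud) >= 0);
}
```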
6. Hybrid Ranking Algorithm
- Candidate generation: top 200 lexical (BM25) + top 200 semantic (kNN) results.
- Merge: RRF with `k = 60`: `score(d) = Σ_i 1 / (k + rank_i(d))`, where `rank_i(d)` is the 1-based rank of document `d` in candidate list `i`, summed over the lists that contain `d`.
- Feature extraction: title match, recency, quality, enrollment count, user cohort affinity, locale match.
- L2R: LightGBM LambdaMART model hosted via ai-gateway; returns a calibrated score.
- Diversification: MMR with λ = 0.7 across taxonomy.
- Cutoff: top `page.size`.
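The merge and diversification steps can be sketched as below. The RRF function follows the formula above with `k = 60`. The MMR function is the standard greedy form; since the similarity measure "across taxonomy" is not defined here, it assumes a 0/1 similarity (same taxonomy node or not), and `relevance` stands in for the L2R score. All names are hypothetical.

```ts
// Reciprocal rank fusion: score(d) = Σ_i 1 / (k + rank_i(d)), 1-based ranks,
// summed over every candidate list that contains d.
function rrfScores(rankings: string[][], k = 60): Map<string, number> {
  const scores = new Map<string, number>();
  for (const list of rankings) {
    list.forEach((id, idx) => {
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + idx + 1));
    });
  }
  return scores;
}

function rrfMerge(rankings: string[][], k = 60): string[] {
  const scores = rrfScores(rankings, k);
  return Array.from(scores.keys()).sort((a, b) => (scores.get(b) ?? 0) - (scores.get(a) ?? 0));
}

interface Candidate {
  id: string;
  relevance: number; // e.g. the calibrated L2R score
  taxonomy: string;  // taxonomy node used for diversification
}

// Greedy MMR: repeatedly pick argmax lambda*relevance - (1 - lambda)*maxSim,
// where maxSim is the highest similarity to anything already picked.
// Assumption: similarity is 1 for the same taxonomy node, else 0.
function mmr(candidates: Candidate[], size: number, lambda = 0.7): Candidate[] {
  const pool = candidates.slice();
  const picked: Candidate[] = [];
  while (picked.length < size && pool.length > 0) {
    let bestIdx = 0;
    let bestScore = -Infinity;
    pool.forEach((c, i) => {
      const maxSim = picked.some((p) => p.taxonomy === c.taxonomy) ? 1 : 0;
      const score = lambda * c.relevance - (1 - lambda) * maxSim;
      if (score > bestScore) {
        bestScore = score;
        bestIdx = i;
      }
    });
    picked.push(pool.splice(bestIdx, 1)[0]);
  }
  return picked;
}
```

RRF uses only ranks, so BM25 scores and vector similarities, which live on unrelated scales, never need to be normalized against each other before fusion.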
7. Caching Strategy
| Cache | Key | TTL | Invalidation |
|---|---|---|---|
| Query result | `hash(tenantId, actorGroup, query)` | 30s | TTL only |
| Suggest | `hash(tenantId, prefix, type)` | 30s | TTL only |
| Recommendations | `(tenantId, userId)` | 1h | On `progress.completion.recorded.v1` |
| IndexPolicy | `tenantId` | 5m | On `tenant.policy.updated.v1` |
| Embedding model metadata | `modelId` | 1h | On `ai.embedding.model.rotated.v1` |
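A sketch of building the query-result key. The hash function is not specified in this document, so 32-bit FNV-1a is used here purely as a dependency-free stand-in; the `search:q:` key prefix and the function names are hypothetical.

```ts
// 32-bit FNV-1a over UTF-16 code units. Stand-in for the unspecified hash();
// a 32-bit digest can collide, so a production key would likely use a longer one.
function fnv1a32(s: string): number {
  let h = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193); // FNV prime, multiplied modulo 2^32
  }
  return h >>> 0; // reinterpret as unsigned
}

// Key for hash(tenantId, actorGroup, query). tenantId is also kept in plain text,
// so two tenants never share an entry even on a hash collision.
// Assumes `query` is already normalized (stable key order) before hashing.
function queryResultKey(tenantId: string, actorGroup: string, query: unknown): string {
  const digest = fnv1a32(JSON.stringify([tenantId, actorGroup, query])).toString(16);
  return `search:q:${tenantId}:${digest}`;
}
```

Keying on `actorGroup` rather than the individual user lets callers with the same access view share cached results while still keeping the authorization filter's outcome part of the key.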
8. Error Model
All commands use a `Result<T, DomainError>` envelope. Domain errors:
| Code | Meaning | HTTP mapping |
|---|---|---|
| `SEARCH_TENANT_UNKNOWN` | Unknown tenant | 404 |
| `SEARCH_QUERY_TOO_LONG` | `q` longer than 512 characters | 400 |
| `SEARCH_INDEX_UNAVAILABLE` | OpenSearch outage | 503 (degraded) |
| `SEARCH_VECTOR_UNAVAILABLE` | pgvector outage | 200 with `degraded=true` metadata |
| `SEARCH_RATE_LIMITED` | Per-tenant quota exceeded | 429 |
| `SEARCH_DOCUMENT_VERSION_CONFLICT` | Stale projector write | Absorbed silently |
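A sketch of the `Result` envelope and a table-driven HTTP mapping. The `Result` shape, `HttpOutcome`, `HTTP_MAPPING` and `toHttp` are assumptions for illustration; `null` is used for the silently absorbed case, which never reaches an HTTP response.

```ts
type SearchErrorCode =
  | "SEARCH_TENANT_UNKNOWN"
  | "SEARCH_QUERY_TOO_LONG"
  | "SEARCH_INDEX_UNAVAILABLE"
  | "SEARCH_VECTOR_UNAVAILABLE"
  | "SEARCH_RATE_LIMITED"
  | "SEARCH_DOCUMENT_VERSION_CONFLICT";

interface DomainError {
  code: SearchErrorCode;
  message: string;
}

// Assumed shape of the Result envelope.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

interface HttpOutcome {
  status: number;
  degraded?: boolean; // response carries degraded=true metadata
}

// One entry per row of the table above; null = absorbed, no HTTP response.
const HTTP_MAPPING: Record<SearchErrorCode, HttpOutcome | null> = {
  SEARCH_TENANT_UNKNOWN: { status: 404 },
  SEARCH_QUERY_TOO_LONG: { status: 400 },
  SEARCH_INDEX_UNAVAILABLE: { status: 503 },
  SEARCH_VECTOR_UNAVAILABLE: { status: 200, degraded: true },
  SEARCH_RATE_LIMITED: { status: 429 },
  SEARCH_DOCUMENT_VERSION_CONFLICT: null,
};

function toHttp<T>(r: Result<T, DomainError>): HttpOutcome | null {
  return r.ok ? { status: 200 } : HTTP_MAPPING[r.error.code];
}
```

Typing the mapping as `Record<SearchErrorCode, ...>` makes the compiler reject any new error code that is added without an HTTP decision.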