
Application Logic

:::info Source
Sourced from services/search-service/APPLICATION_LOGIC.md in the documentation repo.
:::

Companion: DOMAIN_MODEL.md · API_CONTRACTS.md · EVENT_SCHEMAS.md

This document describes the use cases, command/query handlers, ports, adapters, and domain workflows of search-service.

1. Hexagonal Layout

services/search-service/
├── src/
│   ├── domain/                 # Pure domain model (no I/O)
│   │   ├── document.ts
│   │   ├── recommendation.ts
│   │   ├── index-policy.ts
│   │   └── ranking.ts
│   ├── application/
│   │   ├── use-cases/
│   │   │   ├── index-document.ts
│   │   │   ├── bulk-index.ts
│   │   │   ├── delete-document.ts
│   │   │   ├── search-query.ts
│   │   │   ├── suggest.ts
│   │   │   ├── recommend-for-user.ts
│   │   │   ├── reindex-tenant.ts
│   │   │   └── rebuild-embeddings.ts
│   │   └── projectors/
│   │       ├── catalog-projector.ts
│   │       ├── authoring-projector.ts
│   │       ├── marketplace-projector.ts
│   │       ├── certification-projector.ts
│   │       └── tenant-projector.ts
│   ├── ports/                  # Outbound interface contracts
│   │   ├── lexical-index.port.ts
│   │   ├── vector-index.port.ts
│   │   ├── embedding.port.ts
│   │   ├── event-publisher.port.ts
│   │   ├── recommendation-cache.port.ts
│   │   └── clock.port.ts
│   ├── adapters/
│   │   ├── opensearch/
│   │   ├── pgvector/           # via ai-gateway HTTP client
│   │   ├── nats/
│   │   ├── redis-cache/
│   │   └── ai-gateway-client/
│   ├── interfaces/
│   │   ├── http/               # Fastify routes
│   │   ├── consumers/          # NATS consumers
│   │   └── admin-cli/          # reindex / rebuild commands
│   └── infra/
│       ├── config.ts
│       ├── telemetry.ts
│       └── bootstrap.ts
└── test/

2. Use Cases

2.1 IndexDocument (command)

Trigger: projector receives a domain event. Input: SearchableDocument candidate. Output: void.

class IndexDocumentUseCase {
  constructor(
    private readonly lex: LexicalIndexPort,
    private readonly vec: VectorIndexPort,
    private readonly embed: EmbeddingPort,
    private readonly clock: ClockPort,
    private readonly publisher: EventPublisherPort,
  ) {}

  async execute(cmd: IndexDocumentCommand): Promise<void> {
    // 1. Validate invariants D1..D8
    validateDocument(cmd.doc);

    // 2. Skip stale writes (aggregateVersion <= currently indexed version).
    //    Compare against null/undefined explicitly so an indexed version of 0 is not treated as "missing".
    const current = await this.lex.getVersion(cmd.doc.tenantId, cmd.doc.id);
    if (current != null && current >= cmd.doc.source.aggregateVersion) return;

    // 3. Generate an embedding for content-relevant types when the body changed
    if (needsEmbedding(cmd.doc)) {
      const hash = contentHash(cmd.doc);
      if (hash !== cmd.doc.embeddingHash) {
        const { vector, modelId } = await this.embed.embed(
          buildEmbeddingInput(cmd.doc),
          cmd.doc.tenantId,
        );
        cmd.doc.embedding = vector;
        cmd.doc.embeddingModelId = modelId;
        cmd.doc.embeddingHash = hash;
      }
    }

    // 4. Dual-write: lexical + vector (vector only when an embedding exists)
    await Promise.all([
      this.lex.upsert(cmd.doc),
      cmd.doc.embedding ? this.vec.upsert(cmd.doc) : Promise.resolve(),
    ]);

    // 5. Emit internal event
    await this.publisher.publish({
      eventType: 'search.document.indexed',
      tenantId: cmd.doc.tenantId,
      payload: { id: cmd.doc.id, type: cmd.doc.type, indexedAt: this.clock.now() },
    });
  }
}

Failure handling: on any failure the projector leaves the message unacknowledged and NATS redelivers it. After maxDeliver=5 attempts the message goes to the DLQ. On partial success (lexical write ok, vector write failed), a needsEmbeddingRebuild=true flag is recorded on the doc so that a reconciler repairs it asynchronously.
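Promise.all rejects on the first failure and does not say which write succeeded, so detecting the partial-success case needs per-write outcomes. The following is a minimal sketch of one way to do that with Promise.allSettled. The `dualWrite` helper and `DualWriteOutcome` type are hypothetical names; only the needsEmbeddingRebuild flag comes from the text above, and treating a vector failure as non-fatal is one possible reading of it.

```typescript
// Hypothetical helper: everything except the needsEmbeddingRebuild flag is illustrative.
interface DualWriteOutcome {
  needsEmbeddingRebuild: boolean;
}

async function dualWrite(
  writeLexical: () => Promise<void>,
  writeVector: (() => Promise<void>) | null, // null when the doc has no embedding
): Promise<DualWriteOutcome> {
  const [lex, vec] = await Promise.allSettled([
    writeLexical(),
    writeVector ? writeVector() : Promise.resolve(),
  ]);

  // A lexical failure is a hard failure: rethrow so the message stays unacknowledged.
  if (lex.status === 'rejected') throw lex.reason;

  // Lexical ok, vector failed: flag the doc for the reconciler instead of failing.
  return { needsEmbeddingRebuild: vec.status === 'rejected' };
}
```

The caller would persist the returned flag on the document before acknowledging the message.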

2.2 BulkIndex (command)

Used for backfills. Accepts a stream of docs and processes it in batches of 500 with parallelism=4. Embedding calls are coalesced via embed.embedBatch(inputs[]).
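The batching and bounded parallelism described above can be sketched as follows. This is an illustration under assumptions, not the service's code: `inBatches` and `bulkIndex` are hypothetical names, and `worker` stands in for whatever indexes one batch (including the coalesced embedBatch call).

```typescript
// Split an async stream into fixed-size batches; the last batch may be smaller.
async function* inBatches<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}

// Run `worker` over the batches with at most `parallelism` batches in flight.
async function bulkIndex<T>(
  docs: AsyncIterable<T>,
  worker: (batch: T[]) => Promise<void>,
  batchSize = 500,
  parallelism = 4,
): Promise<void> {
  const batches = inBatches(docs, batchSize);
  // Each lane pulls the next batch when it becomes free. The shared async
  // generator queues concurrent next() calls, so no batch is handed out twice.
  const lanes = Array.from({ length: parallelism }, async () => {
    for await (const batch of batches) {
      await worker(batch);
    }
  });
  await Promise.all(lanes);
}
```

Pulling from one shared generator keeps memory bounded: at most `parallelism` batches exist at a time, regardless of stream length.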

2.3 DeleteDocument (command)

Soft delete → sets deletedAt, clears body to free index space, retains id and tenantId for tombstone lookups. Hard delete after 30d by background sweeper.
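A minimal sketch of the tombstone transition and the sweeper's eligibility check, assuming deletedAt is stored as an ISO-8601 string. The `IndexedDoc` shape and both function names are hypothetical.

```typescript
// Hypothetical document shape; only id, tenantId, body and deletedAt come from the text.
interface IndexedDoc {
  id: string;
  tenantId: string;
  body?: string;
  deletedAt?: string; // ISO-8601 timestamp (assumed representation)
}

const HARD_DELETE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; // 30d

// Soft delete: stamp deletedAt and clear the body, keep id and tenantId.
function toTombstone(doc: IndexedDoc, now: Date): IndexedDoc {
  return { ...doc, body: undefined, deletedAt: now.toISOString() };
}

// The background sweeper hard-deletes tombstones older than 30 days.
function isDueForHardDelete(doc: IndexedDoc, now: Date): boolean {
  if (doc.deletedAt === undefined) return false;
  return now.getTime() - Date.parse(doc.deletedAt) >= HARD_DELETE_AFTER_MS;
}
```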

2.4 SearchQuery (query)

Input: SearchQueryInput.

interface SearchQueryInput {
  tenantId: TenantId;
  actor: Actor;
  q?: string;
  types?: DocumentType[];
  filters?: Filter[];
  facets?: string[];
  semantic?: 'off' | 'hybrid' | 'semantic-only';
  hybridAlpha?: number; // [0..1]
  page?: { size: number; cursor?: string };
  sort?: SortSpec[];
  locale?: BCP47;
}

Execution flow:

  1. Authorization filter built from actor roles → appended to every subquery (visibility IN ..., tenantId = ...).
  2. If semantic = off → lexical OpenSearch query.
  3. If semantic = semantic-only → embed query text → pgvector kNN.
  4. If semantic = hybrid (default for authenticated queries) → parallel lexical + semantic; merge via RRF (reciprocal rank fusion) then re-rank via L2R model.
  5. Apply facet aggregations.
  6. Hydrate snippets via OpenSearch highlight API.
  7. Return page + next cursor.
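Steps 2 to 4 amount to choosing which retrieval legs to run. The sketch below captures that decision as a pure function. `planRetrieval` and `RetrievalPlan` are hypothetical names, and the default for unauthenticated callers ('off') is an assumption: the flow above only states the default for authenticated queries.

```typescript
type SemanticMode = 'off' | 'hybrid' | 'semantic-only';

interface RetrievalPlan {
  lexical: boolean;  // run the OpenSearch query
  semantic: boolean; // embed the query text and run pgvector kNN
  fuse: boolean;     // merge both result lists via RRF, then re-rank
}

function planRetrieval(mode: SemanticMode | undefined, authenticated: boolean): RetrievalPlan {
  // Assumption: anonymous callers fall back to 'off' when no mode is given.
  const effective: SemanticMode = mode ?? (authenticated ? 'hybrid' : 'off');
  switch (effective) {
    case 'off':
      return { lexical: true, semantic: false, fuse: false };
    case 'semantic-only':
      return { lexical: false, semantic: true, fuse: false };
    case 'hybrid':
      return { lexical: true, semantic: true, fuse: true };
  }
}
```

Keeping the decision separate from the I/O makes the branch logic testable without OpenSearch or pgvector.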

2.5 Suggest (query)

Completion suggester: OpenSearch completion field fed from title + top-K tags. Results filtered by tenant + visibility. Cached in Redis for 30s by (tenantId, prefix, type).

2.6 RecommendForUser (query)

User profile embedding = mean of last 20 completed course embeddings + user skills vector, cached per user.
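A sketch of the profile computation under two stated assumptions: the skills vector is averaged in as one more element alongside the course embeddings (the text could also be read as adding it to the mean), and the input array is ordered oldest to newest. `meanVector` and `buildUserProfile` are hypothetical names.

```typescript
// Element-wise mean of equally sized vectors.
function meanVector(vectors: number[][]): number[] {
  if (vectors.length === 0) throw new Error('need at least one vector');
  const dim = vectors[0].length;
  const sum = new Array<number>(dim).fill(0);
  for (const v of vectors) {
    if (v.length !== dim) throw new Error('dimension mismatch');
    for (let i = 0; i < dim; i++) sum[i] += v[i];
  }
  return sum.map((x) => x / vectors.length);
}

function buildUserProfile(
  completedCourseEmbeddings: number[][], // assumed ordered oldest to newest
  skills: number[],
  window = 20,
): number[] {
  const recent = completedCourseEmbeddings.slice(-window); // last 20 completions
  // Assumption: the skills vector counts as one more element of the mean.
  return meanVector([...recent, skills]);
}
```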

2.7 ReindexTenant (command, admin)

Steps:

  1. Create new physical index tenant_X_YYYY-MM-DD.
  2. Call each source service's /snapshot endpoint (paginated).
  3. Apply projector logic to every snapshot row.
  4. Replay NATS from t-5m to catch up.
  5. Atomically swap alias.
  6. Delete old physical index after 1h cooldown.
  7. Publish search.reindex.completed.v1.
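Steps 1 and 5 can be illustrated with two small helpers. OpenSearch applies all actions in a single `POST /_aliases` request atomically, which is what makes the swap safe for readers. The helper names are hypothetical, and deriving the date suffix from UTC is an assumption.

```typescript
// Step 1: physical index name in the tenant_X_YYYY-MM-DD form (UTC date assumed).
function physicalIndexName(tenantId: string, date: Date): string {
  return `tenant_${tenantId}_${date.toISOString().slice(0, 10)}`;
}

// Step 5: request body for POST /_aliases. Both actions are applied atomically,
// so the alias never points at zero or two indices.
function aliasSwapBody(alias: string, oldIndex: string, newIndex: string) {
  return {
    actions: [
      { remove: { index: oldIndex, alias } },
      { add: { index: newIndex, alias } },
    ],
  };
}
```

OpenSearch index names must be lowercase, so a tenant identifier containing uppercase characters would need normalizing before it is used here.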

2.8 RebuildEmbeddings (command, admin)

Same as reindex but only re-embeds; lexical index untouched.

3. Ports

| Port | Responsibility | Main methods |
| --- | --- | --- |
| LexicalIndexPort | OpenSearch CRUD + search | upsert, delete, search, suggest, createIndex, aliasSwap |
| VectorIndexPort | pgvector via ai-gateway | upsert, delete, knn, bulkUpsert |
| EmbeddingPort | Generate embeddings | embed(input, tenantId), embedBatch(inputs) |
| EventPublisherPort | NATS publish | publish(envelope) |
| RecommendationCachePort | Redis | get, set, invalidate |
| ClockPort | Time abstraction for tests | now() |
| TenantConfigPort | Per-tenant policies | getIndexPolicy(tenantId) |

4. Projector Pattern

Every consumed event flows through a projector that:

  1. Validates the event envelope (signature, schema, tenant).
  2. Translates event payload → SearchableDocument candidate.
  3. Invokes IndexDocumentUseCase.
  4. Records the event in the inbox table (idempotency).

class CatalogProjector implements EventConsumer {
  async handle(env: EventEnvelope): Promise<void> {
    if (await this.inbox.seen(env.eventId)) return; // idempotent

    switch (env.eventType) {
      // Braces give each case its own scope for the const declaration.
      case 'catalog.course_version.published': {
        const doc = mapCoursePublishedToDoc(env.payload, env.tenantId);
        await this.indexDocument.execute({ doc });
        break;
      }
      case 'catalog.course.deleted': {
        await this.deleteDocument.execute({ id: env.payload.id, tenantId: env.tenantId });
        break;
      }
      default:
        return; // uninterested
    }

    // Recorded only after the use case succeeded, so a failure leads to redelivery.
    await this.inbox.record(env.eventId, env.tenantId);
  }
}
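The inbox check can be exercised in isolation with an in-memory stand-in. This sketch assumes the `seen` / `record` signatures used by the projector; `InMemoryInbox` and `handleOnce` are hypothetical and would be backed by the inbox table in the real service.

```typescript
interface Inbox {
  seen(eventId: string): Promise<boolean>;
  record(eventId: string, tenantId: string): Promise<void>;
}

// Test double: keeps seen event ids in memory instead of a table.
class InMemoryInbox implements Inbox {
  private readonly ids = new Set<string>();

  async seen(eventId: string): Promise<boolean> {
    return this.ids.has(eventId);
  }

  async record(eventId: string, _tenantId: string): Promise<void> {
    this.ids.add(eventId);
  }
}

// Applies the side effect at most once per recorded eventId.
// Returns false when the event was skipped as a duplicate.
async function handleOnce(
  inbox: Inbox,
  env: { eventId: string; tenantId: string },
  apply: () => Promise<void>,
): Promise<boolean> {
  if (await inbox.seen(env.eventId)) return false;
  await apply();
  await inbox.record(env.eventId, env.tenantId);
  return true;
}
```

Because the record is written after the side effect, a crash between the two causes a redelivery that runs `apply` again; the stale-version check in IndexDocument is what absorbs that repeat.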

5. Authorization Filter

Every search/recommend query acquires the caller's access view and converts it to an index filter:

function buildAuthFilter(actor: Actor, tenantId: TenantId): IndexFilter {
  return {
    and: [
      { term: { tenantId } },
      {
        or: [
          { term: { visibility: 'public' } },
          { term: { visibility: 'marketplace' } },
          { and: [{ term: { visibility: 'org' } }, { term: { tenantId: actor.tenantId } }] },
          { and: [{ term: { visibility: 'private' } }, { terms: { audiences: actor.roles } }] },
        ],
      },
      { bool: { must_not: { exists: { field: 'deletedAt' } } } },
    ],
  };
}
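The filter above mixes generic `and` / `or` nodes with leaves that are already OpenSearch query DSL. A lexical adapter could lower it to a bool query roughly as sketched here; `toOpenSearch` is a hypothetical name and the IndexFilter shape is assumed from the example, not taken from the port definition.

```typescript
type Json = Record<string, unknown>;

// Lowers and/or nodes to OpenSearch bool queries; leaves pass through unchanged.
function toOpenSearch(filter: Json): Json {
  if (Array.isArray(filter.and)) {
    // All clauses must match; `filter` context skips scoring and is cacheable.
    return { bool: { filter: (filter.and as Json[]).map(toOpenSearch) } };
  }
  if (Array.isArray(filter.or)) {
    // At least one clause must match.
    return {
      bool: {
        should: (filter.or as Json[]).map(toOpenSearch),
        minimum_should_match: 1,
      },
    };
  }
  return filter; // term / terms / bool leaves are already OpenSearch DSL
}
```

The explicit `minimum_should_match: 1` matters: inside a filter context a bare `should` list would otherwise depend on defaults to enforce the disjunction.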

6. Hybrid Ranking Algorithm

  1. Candidate generation: top 200 lexical BM25 + top 200 semantic kNN.
  2. Merge: RRF with k = 60: score = Σ 1 / (k + rank_i).
  3. Feature extraction: title match, recency, quality, enrollment count, user cohort affinity, locale match.
  4. L2R: LightGBM LambdaMART model hosted via ai-gateway; returns calibrated score.
  5. Diversification: MMR λ=0.7 across taxonomy.
  6. Cutoff: top page.size.
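Steps 2 and 5 are small enough to sketch directly. The RRF function follows the formula in step 2 with 1-based ranks. The MMR function is more speculative: the source does not define the similarity measure, so this sketch assumes similarity is 1 when a candidate shares a taxonomy node with an already selected item and 0 otherwise, and that scores lie in [0, 1]. All names are hypothetical.

```typescript
// Step 2: reciprocal rank fusion, score = sum over lists of 1 / (k + rank).
function reciprocalRankFusion(rankings: string[][], k = 60): string[] {
  const scores = new Map<string, number>();
  for (const ranking of rankings) {
    ranking.forEach((id, index) => {
      const rank = index + 1; // ranks are 1-based
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + rank));
    });
  }
  return Array.from(scores.entries())
    .sort((a, b) => b[1] - a[1])
    .map(([id]) => id);
}

interface Candidate {
  id: string;
  score: number;    // relevance from the L2R model, assumed in [0, 1]
  taxonomy: string; // taxonomy node used for diversification
}

// Step 5: greedy maximal marginal relevance.
function mmr(candidates: Candidate[], size: number, lambda = 0.7): Candidate[] {
  const remaining = [...candidates];
  const selected: Candidate[] = [];
  while (selected.length < size && remaining.length > 0) {
    let bestIndex = 0;
    let bestValue = -Infinity;
    remaining.forEach((c, i) => {
      // Assumed similarity: 1 if the taxonomy node is already represented, else 0.
      const maxSim = selected.some((s) => s.taxonomy === c.taxonomy) ? 1 : 0;
      const value = lambda * c.score - (1 - lambda) * maxSim;
      if (value > bestValue) {
        bestValue = value;
        bestIndex = i;
      }
    });
    selected.push(remaining.splice(bestIndex, 1)[0]);
  }
  return selected;
}
```

With λ=0.7 a candidate from an already represented taxonomy node loses 0.3 of marginal value, so a slightly weaker item from a new node can overtake it.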

7. Caching Strategy

| Cache | Key | TTL | Invalidation |
| --- | --- | --- | --- |
| Query result | hash(tenantId, actorGroup, query) | 30s | always TTL-based |
| Suggest | hash(tenantId, prefix, type) | 30s | TTL |
| Recommendations | (tenantId, userId) | 1h | on progress.completion.recorded.v1 |
| IndexPolicy | tenantId | 5m | on tenant.policy.updated.v1 |
| Embedding model metadata | modelId | 1h | on ai.embedding.model.rotated.v1 |
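As an illustration of the query-result key, the sketch below keeps tenantId and actorGroup in clear text and hashes only the query part, so a hash collision can never cross a tenant or actor-group boundary. The hash function is not specified in the source; the FNV-1a style hash over UTF-16 code units used here is a dependency-free stand-in, and a production key would more likely use a wider hash such as SHA-256. The key prefix and function names are hypothetical.

```typescript
// 32-bit FNV-1a style hash over UTF-16 code units, rendered as 8 hex chars.
function fnv1a(input: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return (hash >>> 0).toString(16).padStart(8, '0');
}

const QUERY_RESULT_TTL_SECONDS = 30; // from the table above

// `normalizedQuery` is assumed to be a canonical serialization of the query
// (stable field order, trimmed text) so equivalent queries share one entry.
function queryCacheKey(tenantId: string, actorGroup: string, normalizedQuery: string): string {
  return `search:q:${tenantId}:${actorGroup}:${fnv1a(normalizedQuery)}`;
}
```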

8. Error Model

All commands use a Result<T, DomainError> envelope. Domain errors:

| Code | Meaning | HTTP mapping |
| --- | --- | --- |
| SEARCH_TENANT_UNKNOWN | Unknown tenant | 404 |
| SEARCH_QUERY_TOO_LONG | q > 512 chars | 400 |
| SEARCH_INDEX_UNAVAILABLE | OpenSearch outage | 503 (degraded) |
| SEARCH_VECTOR_UNAVAILABLE | pgvector outage | 200 with degraded=true metadata |
| SEARCH_RATE_LIMITED | per-tenant quota exceeded | 429 |
| SEARCH_DOCUMENT_VERSION_CONFLICT | stale projector write | absorbed silently |
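A sketch of how the envelope and the table above could fit together. The `{ ok, value | error }` shape of Result is an assumption, since the source only names the type, and `httpStatusFor` and `validateQuery` are hypothetical helpers.

```typescript
// Assumed shape of the Result envelope.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

type DomainErrorCode =
  | 'SEARCH_TENANT_UNKNOWN'
  | 'SEARCH_QUERY_TOO_LONG'
  | 'SEARCH_INDEX_UNAVAILABLE'
  | 'SEARCH_VECTOR_UNAVAILABLE'
  | 'SEARCH_RATE_LIMITED'
  | 'SEARCH_DOCUMENT_VERSION_CONFLICT';

interface DomainError {
  code: DomainErrorCode;
  message: string;
}

// Mirrors the table; null means the error never reaches an HTTP response.
function httpStatusFor(code: DomainErrorCode): number | null {
  switch (code) {
    case 'SEARCH_TENANT_UNKNOWN': return 404;
    case 'SEARCH_QUERY_TOO_LONG': return 400;
    case 'SEARCH_INDEX_UNAVAILABLE': return 503;
    case 'SEARCH_VECTOR_UNAVAILABLE': return 200; // body carries degraded=true
    case 'SEARCH_RATE_LIMITED': return 429;
    case 'SEARCH_DOCUMENT_VERSION_CONFLICT': return null; // absorbed silently
  }
}

const MAX_QUERY_LENGTH = 512;

// Example of a check that returns a domain error instead of throwing.
function validateQuery(q: string): Result<string, DomainError> {
  if (q.length > MAX_QUERY_LENGTH) {
    return {
      ok: false,
      error: { code: 'SEARCH_QUERY_TOO_LONG', message: `q exceeds ${MAX_QUERY_LENGTH} chars` },
    };
  }
  return { ok: true, value: q };
}
```

Writing the mapping as an exhaustive switch means the compiler flags any new error code that lacks an HTTP decision.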

9. Request → Response Trace