search-aggregation-service — Application Logic

Companion: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · DATA_MODEL · SECURITY_MODEL

The application layer hosts commands (projection writes), queries (search/detail/suggest/facets), event consumers (one per upstream topic), and policies (allow-list, ranking, region pinning). It coordinates aggregates and ports; no domain rules live here.

1. Module map

src/application/
├── ports/
├── commands/ # projection writes (idempotent, vector-clock guarded)
├── queries/ # consumer search reads
├── consumers/ # one file per upstream event
└── policies/ # allow-list, ranking, region pinning

Every command and query has a single handler, a single zod-validated input DTO, and a single output DTO. Errors bubble as typed domain errors and map to HTTP / event responses in presentation.

2. Ports (consumed)

| Port | Adapter(s) | Used by |
| --- | --- | --- |
| HotelIndexRepository | HotelIndexRepositoryPg | upsert/delete/suppress index entries; queries for hotel detail |
| RateSnapshotRepository | RateSnapshotRepositoryPg | upsert rate snapshots; lookup cheapest in window |
| AvailabilityHintRepository | AvailabilityHintRepositoryPg | upsert per-date counts |
| BoostRuleRepository | BoostRuleRepositoryPg | manage boost rules |
| ClickEventRepository | ClickEventRepositoryPg | record clicks; popularity recompute job |
| SearchEnginePort | OpenSearchEngineAdapter, PostgresFallbackEngineAdapter (degraded) | execute query, autocomplete, facets, write to OpenSearch |
| SearchCachePort | SearchCacheRedis | hot result cache |
| EventPublisher | EventPublisherPubSub (transactional outbox) | publish melmastoon.search.* events |
| EventArchiveReader | EventArchiveBigQueryAdapter | replay events for full reindex |
| FxSnapshotPort | FxSnapshotCacheRedisAdapter | currency conversion at read time |
| ProvinceCenterPort | ProvinceCenterCatalogAdapter (seeded JSON) | city → lat/lng/timezone |
| AnalyticsSinkPort | AnalyticsSinkPubSubAdapter | publish click + sampled query events |
| Clock | SystemClock, FixedClock | timestamps |
| IdGenerator | UlidIdGenerator, SeededIdGenerator (tests) | srh_…, clk_…, q_…, brt_…, ibd_… |

3. Commands (projection writes)

| Command | Aggregate(s) | Triggered by | Outbox events |
| --- | --- | --- | --- |
| UpsertHotelIndexEntry | HotelIndexEntry | property.published.v1, .updated.v1, .amenity_set.updated.v1, .photo.added.v1, .photo.removed.v1, .room_type.updated.v1 | search.projection.updated.v1 |
| SuppressHotelIndexEntry | HotelIndexEntry | property.unpublished.v1, tenant.region_changed.v1 (out of region) | search.projection.updated.v1 |
| DeleteHotelIndexEntry | HotelIndexEntry | property.deleted.v1 | search.projection.updated.v1 |
| PurgeTenantFromIndex | HotelIndexEntry, RateSnapshot, AvailabilityHint, BoostRule | tenant.deleted.v1 | search.projection.updated.v1 (per affected property) |
| UpsertRateSnapshot | RateSnapshot | pricing.rate_plan.updated.v1, pricing.rate_plan.published.v1 | search.projection.updated.v1 (light-weight delta on entry's priceFrom*) |
| UpsertFxSnapshot | (cache only) | pricing.fx_snapshot.updated.v1 | none (cache update + cache-invalidation broadcast) |
| UpsertAvailabilityHint | AvailabilityHint | inventory.allocation.confirmed.v1, .released.v1, inventory.block.created.v1, .released.v1 | search.projection.updated.v1 |
| RecordClick | ClickEvent | API POST /search/clicks | search.click.recorded.v1 |
| RecomputePopularity | HotelIndexEntry | scheduled (every 15 min) | search.projection.updated.v1 |
| CreateBoostRule | BoostRule | API POST /search/boost-rules | search.boost_rule.created.v1 |
| ActivateBoostRule | BoostRule, HotelIndexEntry | API POST /search/boost-rules/{id}:activate | search.boost_rule.activated.v1, search.projection.updated.v1 |
| StartIndexRebuild | IndexBuild | API POST /search/index:rebuild (admin) | search.index.rebuilt.v1 (on completion), search.index.health_alert.v1 (on failure) |

3.1 Command pseudocode pattern

Every projection-write command follows the same skeleton: vector-clock check → write Postgres canonical + outbox row (same transaction) → write OpenSearch (idempotent ID) → invalidate cache → return new state.

export class UpsertHotelIndexEntryHandler
  implements CommandHandler<UpsertHotelIndexEntryInput, HotelIndexEntry> {
  constructor(
    private readonly entries: HotelIndexRepository,
    private readonly engine: SearchEnginePort,
    private readonly publisher: EventPublisher,
    private readonly cache: SearchCachePort,
    private readonly allowList: ProjectionAllowListPolicy,
    private readonly ranking: RankingPolicy,
    private readonly regionPinning: RegionPinningPolicy,
    private readonly clock: Clock,
    private readonly ids: IdGenerator,
    // Metrics sink for the stale-drop counter; the type name is an assumption.
    private readonly metrics: ProjectionMetrics,
  ) {}

  async handle(input: UpsertHotelIndexEntryInput): Promise<HotelIndexEntry> {
    // 1. Allow-list filter: strips any field not flagged cross_tenant_searchable
    const safeInput = this.allowList.filter(input);

    // 2. Load existing
    const existing = await this.entries.findByPropertyId(safeInput.propertyId);

    // 3. Vector-clock guard (last-write-wins)
    if (existing && !VectorClock.isNewer(safeInput.vectorClock, existing.vectorClock)) {
      // stale; counted in metrics; do not throw
      this.metrics.staleProjectionDrop.inc({ source: safeInput.source });
      return existing;
    }

    // 4. Build / merge aggregate
    const next = HotelIndexEntry.upsert({
      ...existing?.toSnapshot(),
      ...safeInput,
      lastUpsertedAt: this.clock.nowISO(),
      schemaVersion: 1,
    });

    // 5. Region pinning evaluation
    if (!this.regionPinning.isVisible(next)) next.suppress('region_pinned');

    // 6. Recompute composite ranking inputs
    this.ranking.recompose(next);

    // 7. Persist canonical (Postgres) + outbox in same TX
    await this.entries.runInTx(async (tx) => {
      await tx.upsert(next);
      await tx.outbox.append({
        eventType: 'melmastoon.search.projection.updated.v1',
        aggregateId: next.id,
        tenantId: next.tenantId, // preserved for cascade purges
        data: { propertyId: next.propertyId, status: next.status, version: next.lastUpsertedAt },
      });
    });

    // 8. Mirror to OpenSearch (idempotent doc ID = propertyId)
    await this.engine.indexDoc({
      index: 'melmastoon-search-current',
      id: next.propertyId,
      body: next.toOpenSearchDoc(),
      refresh: 'wait_for', // ensure search-after consistency
    });

    // 9. Targeted cache invalidation
    await this.cache.invalidateForProperty(next.propertyId);

    return next;
  }
}
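
Step 1 of the handler relies on ProjectionAllowListPolicy, whose internals are not shown here. The following is a minimal sketch of what such a filter could look like, assuming the allow-list is a static set of field names. The function name `filterToAllowList`, the `FilterResult` shape, and every field name in the set are illustrative assumptions, not the service's real list.

```typescript
// Hypothetical allow-list. The real set of cross_tenant_searchable fields
// is defined outside this document; these names are placeholders.
const CROSS_TENANT_SEARCHABLE: ReadonlySet<string> = new Set([
  "propertyId", "tenantId", "name", "city", "starRating",
  "amenities", "vectorClock", "source",
]);

export interface FilterResult {
  safe: Record<string, unknown>; // fields that may enter the projection
  dropped: string[];             // field names that were stripped
}

// Copies only allow-listed keys; everything else is reported in `dropped`
// so a caller can log or alert on unexpected upstream fields.
export function filterToAllowList(
  input: Record<string, unknown>,
  allowed: ReadonlySet<string> = CROSS_TENANT_SEARCHABLE,
): FilterResult {
  const safe: Record<string, unknown> = {};
  const dropped: string[] = [];
  for (const [key, value] of Object.entries(input)) {
    if (allowed.has(key)) safe[key] = value;
    else dropped.push(key);
  }
  return { safe, dropped };
}
```

Returning the dropped keys alongside the safe object is a design choice of this sketch; it makes a stripped field observable rather than silent.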

3.2 Idempotency

  • Inbox dedup by upstream eventId. Re-delivery is a no-op.
  • OpenSearch doc ID = propertyId. Re-indexing the same document with the same vectorClock is a no-op write: lastUpsertedAt serves as a doc-level version token, analogous to if_seq_no in OpenSearch's optimistic-concurrency pattern.
  • Outbox dedup at the publisher: (aggregateId, eventType, lastUpsertedAt) is the publisher's idempotency key.
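
The publisher-side dedup in the last bullet can be sketched as follows. This is an in-memory illustration only: a real publisher would need a durable key store, and the names `OutboxRow`, `idempotencyKey`, and `DedupingPublisher` are assumptions made for this example.

```typescript
interface OutboxRow {
  aggregateId: string;
  eventType: string;
  lastUpsertedAt: string; // ISO timestamp, used as the version token
}

// JSON-array encoding keeps the key unambiguous even if a component
// happens to contain a separator character.
export function idempotencyKey(row: OutboxRow): string {
  return JSON.stringify([row.aggregateId, row.eventType, row.lastUpsertedAt]);
}

export class DedupingPublisher {
  // In-memory stand-in for a durable idempotency-key store (assumption).
  private readonly seen = new Set<string>();

  constructor(private readonly send: (row: OutboxRow) => void) {}

  /** Returns true if the row was sent, false if it was a duplicate. */
  publish(row: OutboxRow): boolean {
    const key = idempotencyKey(row);
    if (this.seen.has(key)) return false;
    this.send(row);
    this.seen.add(key); // recorded only after a successful send
    return true;
  }
}
```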

3.3 PurgeTenantFromIndex

Triggered by tenant.deleted.v1. Cascade behavior:

  1. Mark all HotelIndexEntry rows where tenantId == event.tenantId as status='suppressed'.
  2. Issue OpenSearch delete_by_query: { term: { tenant_id: <id> } } against the live alias.
  3. Soft-delete RateSnapshot, AvailabilityHint, BoostRule, ClickEvent rows for the tenant (retention 30 d for audit; hard-purge job runs after).
  4. Emit search.projection.updated.v1 per affected property and one search.index.health_alert.v1 summary.
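
Step 2 of the cascade can be sketched as a small request builder. In the OpenSearch delete-by-query API the term clause is wrapped in a top-level `query` object. The helper name `buildTenantPurgeRequest` and the `DeleteByQueryRequest` shape are assumptions for illustration; the alias name is supplied by the caller.

```typescript
interface DeleteByQueryRequest {
  method: "POST";
  path: string;
  body: { query: { term: Record<string, string> } };
}

// Builds the delete_by_query call for one tenant against the given alias.
// Rejects a blank tenant ID so a malformed event cannot widen the delete.
export function buildTenantPurgeRequest(alias: string, tenantId: string): DeleteByQueryRequest {
  if (tenantId.trim() === "") throw new Error("tenantId must be non-empty");
  return {
    method: "POST",
    path: `/${encodeURIComponent(alias)}/_delete_by_query`,
    body: { query: { term: { tenant_id: tenantId } } },
  };
}
```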

4. Queries (read side)

| Query | Returns | Cached | Notes |
| --- | --- | --- | --- |
| ExecuteSearch | SearchResultDto | yes (srh:q:<sha256> 60 s) | The main meta-search; cross-tenant by design |
| GetHotelDetail | HotelDetailDto | yes (srh:detail:<propertyId>:<currency>:<dateRange> 300 s) | Aggregates HotelIndexEntry + per-date RateSnapshot + AvailabilityHint |
| Suggest | SuggestDto | yes (srh:sug:<sha256> 60 s) | Edge n-grams; per-locale |
| Facets | FacetsDto | yes (srh:facets:<sha256> 60 s) | OpenSearch aggregations |
| IndexHealth | IndexHealthDto | no | Doc count + alias + freshness |

4.1 ExecuteSearch flow

client → SearchController.execute(input)
  → ExecuteSearchHandler
    → 1. Validate + canonicalize (zod)
    → 2. Apply RegionPinningPolicy → narrows visible region
    → 3. Cache lookup `srh:q:<hash(canonical)>` → return on hit
    → 4. SearchEnginePort.query(buildOpenSearchDsl(canonical))
        ↳ OpenSearch DSL composed by RankingPolicy:
          function_score { query, functions:[ price, distance, popularity, freshness, boost ] }
        ↳ post_filter: amenities (cheap), starRating
        ↳ aggs: amenity_facets, price_band_buckets
    → 5. For each hit, attach RateSnapshot.convert(targetCurrency, fxSnapshot)
        ↳ FX age stamp included; flag `fxStale` if > 1h
    → 6. AvailabilityHint join (Postgres) for the dates window
        ↳ properties with no hint OR hint shows 0 across all nights are dropped
    → 7. Persist `SearchQuery` (sampled 1% anon, 100% authed)
    → 8. Cache write (60 s TTL)
    → 9. Emit `search.query.executed.v1` (sampled) via AnalyticsSinkPort
    → 10. Return `SearchResultDto`
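
Steps 5 and 6 of the flow above are pure post-processing and can be sketched in isolation. The `FxSnapshot` shape, the rate convention (units of each currency per one unit of the base currency), and the function names are assumptions for illustration; only the one-hour staleness threshold and the drop rule come from the flow itself.

```typescript
interface FxSnapshot {
  base: string;
  rates: Record<string, number>; // units per 1 unit of `base` (assumed convention)
  fetchedAtMs: number;           // epoch milliseconds
}

const FX_STALE_AFTER_MS = 60 * 60 * 1000; // "> 1h" from step 5

// Converts via the base currency: from -> base -> to.
export function convertPrice(amount: number, from: string, to: string, fx: FxSnapshot): number {
  const fromRate: number | undefined = from === fx.base ? 1 : fx.rates[from];
  const toRate: number | undefined = to === fx.base ? 1 : fx.rates[to];
  if (fromRate === undefined || toRate === undefined) {
    throw new Error(`no FX rate for ${from} -> ${to}`);
  }
  return (amount / fromRate) * toRate;
}

// True when the snapshot is strictly older than one hour.
export function isFxStale(fx: FxSnapshot, nowMs: number): boolean {
  return nowMs - fx.fetchedAtMs > FX_STALE_AFTER_MS;
}

// Step 6: a property is kept only if a hint exists and at least one
// night in the window shows a positive count.
export function hasAvailability(nightlyCounts: number[] | undefined): boolean {
  return nightlyCounts !== undefined && nightlyCounts.some((n) => n > 0);
}
```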

4.2 Fallback path (OpenSearch unavailable)

If SearchEnginePort reports unhealthy (≥ 3 consecutive 5xx responses within 10 s), the circuit opens and PostgresFallbackEngineAdapter is used:

  • Filters: WHERE clauses on Postgres hotel_index_entries.
  • Ranking: deterministic (price ASC, popularity_28d DESC, distance ASC if geo provided). No fuzzy text matching.
  • Response carries degradationLevel: 'pg_fallback' so the UI can warn the user.
  • search.index.health_alert.v1 is emitted on circuit-open and on circuit-close.
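
The open condition described above (at least 3 consecutive 5xx responses within 10 s) can be sketched as a small breaker. The text does not say how the circuit closes, so this sketch leaves that to an explicit `close()` call, for example from a health probe. The class name, method names, and the `"none"` degradation value are assumptions.

```typescript
export class ConsecutiveFailureBreaker {
  private recentFailures: number[] = []; // timestamps of the current failure run
  private open = false;

  constructor(
    private readonly threshold: number = 3,
    private readonly windowMs: number = 10000,
  ) {}

  isOpen(): boolean {
    return this.open;
  }

  /** A non-5xx response ends the run of consecutive failures. */
  recordSuccess(): void {
    this.recentFailures = [];
  }

  /** Returns true only when this failure is the one that opens the circuit. */
  recordFailure(nowMs: number): boolean {
    this.recentFailures.push(nowMs);
    // Keep only failures that fall inside the window ending at nowMs.
    this.recentFailures = this.recentFailures.filter((t) => nowMs - t <= this.windowMs);
    if (!this.open && this.recentFailures.length >= this.threshold) {
      this.open = true;
      return true;
    }
    return false;
  }

  /** Closing policy is not specified in the text; the caller decides. */
  close(): void {
    this.open = false;
    this.recentFailures = [];
  }
}

// Value carried in the response so the UI can warn the user.
export function degradationLevel(breaker: ConsecutiveFailureBreaker): "none" | "pg_fallback" {
  return breaker.isOpen() ? "pg_fallback" : "none";
}
```

The boolean returned by `recordFailure` marks the open transition, which is the point where a caller would emit search.index.health_alert.v1.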

4.3 Cursor pagination

  • Cursors are opaque base64 of { "search_after": [...], "queryHash": "<hash>", "expiresAt": <epoch> }.
  • Cursor TTL: 5 min. Expired cursor → MELMASTOON.SEARCH.PAGE_OUT_OF_RANGE.
  • Cursor is bound to the original canonical query hash; mutated filters issue a new cursor.
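
The cursor rules above can be sketched as an encode/resolve pair. Several details are assumptions: `expiresAt` is taken to be epoch seconds, sort values are assumed to be ASCII-safe (numbers and IDs) so that `btoa` can encode them, and a query-hash mismatch is modelled as "no usable cursor" so the caller starts from the first page and issues a new cursor. The function and class names are illustrative.

```typescript
interface CursorPayload {
  search_after: Array<string | number>;
  queryHash: string;
  expiresAt: number; // epoch seconds (assumed unit)
}

const CURSOR_TTL_SECONDS = 5 * 60;

export class PageOutOfRangeError extends Error {
  readonly code = "MELMASTOON.SEARCH.PAGE_OUT_OF_RANGE";
}

// Opaque cursor = base64 of the JSON payload.
export function encodeCursor(
  searchAfter: Array<string | number>,
  queryHash: string,
  nowSec: number,
): string {
  const payload: CursorPayload = {
    search_after: searchAfter,
    queryHash,
    expiresAt: nowSec + CURSOR_TTL_SECONDS,
  };
  return btoa(JSON.stringify(payload));
}

// Returns the search_after values, or undefined when the cursor belongs
// to a different canonical query. Throws when the cursor has expired.
export function resolveCursor(
  cursor: string,
  currentQueryHash: string,
  nowSec: number,
): Array<string | number> | undefined {
  const payload = JSON.parse(atob(cursor)) as CursorPayload;
  if (nowSec > payload.expiresAt) throw new PageOutOfRangeError("cursor expired");
  if (payload.queryHash !== currentQueryHash) return undefined;
  return payload.search_after;
}
```

A plain base64 payload is readable and forgeable by clients; whether the real cursor is signed or encrypted is not stated in this section.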

5. Event consumers

One file per upstream event under src/application/consumers/. All consumers share:

  1. Inbox dedup by eventId (SELECT … FOR UPDATE NOWAIT then insert).
  2. JSON-Schema validation of the envelope against the upstream service's published schema.
  3. Vector-clock check using the version field of the upstream aggregate.
  4. Map → command handler → emit projection event.
  5. ACK on success, NACK with backoff on retryable errors, DLQ after 5 attempts.

| Consumer | Topic | Side effect |
| --- | --- | --- |
| PropertyPublishedConsumer | melmastoon.property.published.v1 | UpsertHotelIndexEntry |
| PropertyUpdatedConsumer | melmastoon.property.updated.v1 | UpsertHotelIndexEntry (delta only) |
| PropertyUnpublishedConsumer | melmastoon.property.unpublished.v1 | SuppressHotelIndexEntry |
| PropertyDeletedConsumer | melmastoon.property.deleted.v1 | DeleteHotelIndexEntry |
| PropertyAmenitySetUpdatedConsumer | melmastoon.property.amenity_set.updated.v1 | UpsertHotelIndexEntry (amenities only) |
| PropertyPhotoAddedConsumer | melmastoon.property.photo.added.v1 | UpsertHotelIndexEntry (hero photo refresh) |
| RoomTypeUpdatedConsumer | melmastoon.property.room_type.updated.v1 | UpsertHotelIndexEntry (capacity, occupancy hint) |
| RatePlanUpdatedConsumer | melmastoon.pricing.rate_plan.updated.v1 | UpsertRateSnapshot per affected date |
| FxSnapshotUpdatedConsumer | melmastoon.pricing.fx_snapshot.updated.v1 | UpsertFxSnapshot cache; broadcast cache invalidation |
| InventoryAllocationConfirmedConsumer | melmastoon.inventory.allocation.confirmed.v1 | UpsertAvailabilityHint (decrement) |
| InventoryAllocationReleasedConsumer | melmastoon.inventory.allocation.released.v1 | UpsertAvailabilityHint (increment) |
| InventoryBlockCreatedConsumer | melmastoon.inventory.block.created.v1 | UpsertAvailabilityHint (decrement by block size) |
| InventoryBlockReleasedConsumer | melmastoon.inventory.block.released.v1 | UpsertAvailabilityHint (increment) |
| TenantDeletedConsumer | melmastoon.tenant.deleted.v1 | PurgeTenantFromIndex |
| TenantRegionChangedConsumer | melmastoon.tenant.region_changed.v1 | per-property re-evaluation of RegionPinningPolicy |
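
The shared consumer pipeline listed at the start of this section can be sketched as one decision function. It is synchronous and in-memory for brevity, whereas real consumers would be async and backed by the inbox table. Two behaviours are assumptions because the text does not specify them: an envelope that fails validation goes straight to the DLQ, and a non-retryable handler error also goes to the DLQ. All names here are illustrative.

```typescript
type Outcome = "ack" | "nack" | "dlq";

interface Delivery {
  eventId: string;
  attempt: number; // 1-based delivery attempt
  payload: unknown;
}

interface ConsumerDeps {
  inbox: Set<string>;                      // stand-in for the inbox table
  validate: (payload: unknown) => boolean; // stand-in for JSON-Schema validation
  handle: (payload: unknown) => void;      // vector-clock check + command dispatch
}

const MAX_ATTEMPTS = 5;

export function processDelivery(delivery: Delivery, deps: ConsumerDeps): Outcome {
  // 1. Inbox dedup: a re-delivered event is acknowledged without side effects.
  if (deps.inbox.has(delivery.eventId)) return "ack";

  // 2. Envelope validation (assumption: invalid envelopes are not retried).
  if (!deps.validate(delivery.payload)) return "dlq";

  try {
    // 3 + 4. Clock check and command dispatch live inside the handler.
    deps.handle(delivery.payload);
    deps.inbox.add(delivery.eventId);
    return "ack";
  } catch (err) {
    // 5. Retryable errors are NACKed until the attempt budget is spent.
    const retryable =
      typeof err === "object" && err !== null &&
      (err as { retryable?: boolean }).retryable === true;
    if (!retryable || delivery.attempt >= MAX_ATTEMPTS) return "dlq";
    return "nack";
  }
}
```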

5.1 Out-of-order handling

Upstream events may arrive out of order under network partitions or replay. The pattern across all consumers:

  • Per upstream service, an aggregateVersion (e.g., event.version) is the canonical clock for that service.
  • The projection's vectorClock keeps the maximum-seen value for each upstream service.
  • Stale events (lower vector clock) are dropped silently and counted in search_projection_drop_total{source}.
  • Ties (same vectorClock): the later occurredAt wins; if occurredAt is also equal, server arrival order wins (last write).
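
The rules in this list can be sketched as a pure apply-or-drop decision. The sketch follows the bullets literally and is not taken from the service's VectorClock class; `ProjectionState`, `lastOccurredAt`, `decide`, and `advance` are hypothetical names, and tracking the last occurredAt per source is an assumption needed to evaluate the tie-break.

```typescript
interface IncomingEvent {
  source: string;     // upstream service name
  version: number;    // that service's aggregateVersion
  occurredAt: string; // ISO-8601 timestamp
}

interface ProjectionState {
  vectorClock: Record<string, number>;    // max version seen per source
  lastOccurredAt: Record<string, string>; // occurredAt of the last applied event per source
}

export function decide(event: IncomingEvent, state: ProjectionState): "apply" | "drop" {
  const seen: number | undefined = state.vectorClock[event.source];
  if (seen === undefined || event.version > seen) return "apply";
  if (event.version < seen) return "drop"; // stale: dropped and counted

  // Tie on version: later occurredAt wins; equal timestamps fall through
  // to arrival order, so the event that arrived last is applied.
  const previous: string | undefined = state.lastOccurredAt[event.source];
  if (previous === undefined) return "apply";
  return Date.parse(event.occurredAt) >= Date.parse(previous) ? "apply" : "drop";
}

// Returns the state after applying an event; the clock never moves backwards.
export function advance(event: IncomingEvent, state: ProjectionState): ProjectionState {
  const seen = state.vectorClock[event.source] ?? 0;
  return {
    vectorClock: { ...state.vectorClock, [event.source]: Math.max(seen, event.version) },
    lastOccurredAt: { ...state.lastOccurredAt, [event.source]: event.occurredAt },
  };
}
```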

6. Scheduled jobs

| Job | Cadence | Purpose |
| --- | --- | --- |
| recompute-popularity | every 15 min | Slide popularity score window using ClickEventRepository aggregates |
| expire-boost-rules | every 5 min | Move active → expired for rules past expiresAt |
| refresh-fx-snapshots | every 5 min | Refresh FX cache from pricing-service if last fx_snapshot.updated.v1 is stale |
| nightly-projection-auditor | 02:00 region-local | Walk Postgres canonical projection, assert no forbidden fields, no unpublished refs; alert on any finding |
| nightly-anonymize-search-queries | 03:00 region-local | Drop text and userBucket for SearchQuery rows older than 30 d |
| weekly-old-index-prune | Sunday 04:00 | Delete OpenSearch indices behind the alias older than 7 d |

7. Sagas / cross-service coordination

This service does not orchestrate sagas. It is purely a conformist consumer of upstream events. All cross-service writes happen in the upstream services; this service projects only.

The closest thing to coordination is the IndexBuild flow (§4.10 in DOMAIN_MODEL), which is internally orchestrated:

StartIndexRebuild
  → snapshot current alias
  → create new index with current template
  → EventArchiveReader.streamSince(0) → indexBatch (1 000 docs/batch)
  → catching_up: re-consume live deltas published since snapshot start
  → swap alias atomically (`POST /_aliases { actions: [{remove: …}, {add: …}] }`)
  → keep old index 7 d for rollback
  → emit `search.index.rebuilt.v1`

Failure path: any failure transitions IndexBuild → failed and emits search.index.health_alert.v1 with details; the alias is not swapped, so production traffic stays on the previous index.
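
The last steps of the rebuild, including the failure path, can be sketched as a function that decides which event to emit and whether an alias swap request is produced at all. The `_aliases` body with paired remove/add actions is the standard OpenSearch shape; the function names and the `BuildOutcome` type are assumptions for illustration.

```typescript
interface AliasTarget { index: string; alias: string }
interface AliasAction { remove?: AliasTarget; add?: AliasTarget }
interface AliasSwapBody { actions: AliasAction[] }

// Body for POST /_aliases. Both actions travel in one request, so readers
// never observe the alias pointing at zero or two indices.
export function buildAliasSwap(alias: string, oldIndex: string, newIndex: string): AliasSwapBody {
  if (oldIndex === newIndex) throw new Error("old and new index must differ");
  return {
    actions: [
      { remove: { index: oldIndex, alias } },
      { add: { index: newIndex, alias } },
    ],
  };
}

interface BuildOutcome { ok: boolean; reason?: string }

interface RebuildCompletion {
  event: string;
  aliasRequest: AliasSwapBody | null; // null means the alias is left untouched
  details?: string;
}

export function completeRebuild(
  outcome: BuildOutcome,
  alias: string,
  oldIndex: string,
  newIndex: string,
): RebuildCompletion {
  if (!outcome.ok) {
    // Failure path: no swap, traffic stays on the previous index.
    return { event: "search.index.health_alert.v1", aliasRequest: null, details: outcome.reason };
  }
  return { event: "search.index.rebuilt.v1", aliasRequest: buildAliasSwap(alias, oldIndex, newIndex) };
}
```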

8. Error mapping (application → presentation)

| Domain error | HTTP code (consumer surface) | Notes |
| --- | --- | --- |
| ForbiddenFieldInProjectionError | 500 (internal) | Should never surface to consumers; logs P0 (projector bug) |
| StaleProjectionEventError | n/a | Internal-only; counted, not surfaced |
| InvalidGeoBoundingBoxError | 422 MELMASTOON.SEARCH.GEO_OUT_OF_BOUNDS | |
| QueryBudgetExceededError | 422 MELMASTOON.SEARCH.QUERY_INVALID | |
| BoostRuleScopeViolationError | 403 MELMASTOON.SEARCH.BOOST_RULE_SCOPE_VIOLATION | Operator API only |
| IndexBuildAlreadyRunningError | 409 MELMASTOON.SEARCH.INDEX_REBUILD_IN_PROGRESS | Admin API |

MELMASTOON.SEARCH.INDEX_UNAVAILABLE (503) is emitted by the controller layer when the circuit is open and Postgres fallback is itself unhealthy (rare).
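
The mapping in this section can be sketched as a lookup keyed by error name. Keying by name rather than by class is a choice made for this example, and the fallback of a bare 500 for unknown errors is an assumption. The names `HttpError`, `ERROR_MAP`, and `toHttp` are illustrative.

```typescript
interface HttpError {
  status: number;
  code?: string;
}

// null marks errors that are internal-only and never reach a response.
const ERROR_MAP: Record<string, HttpError | null> = {
  ForbiddenFieldInProjectionError: { status: 500 },
  StaleProjectionEventError: null,
  InvalidGeoBoundingBoxError: { status: 422, code: "MELMASTOON.SEARCH.GEO_OUT_OF_BOUNDS" },
  QueryBudgetExceededError: { status: 422, code: "MELMASTOON.SEARCH.QUERY_INVALID" },
  BoostRuleScopeViolationError: { status: 403, code: "MELMASTOON.SEARCH.BOOST_RULE_SCOPE_VIOLATION" },
  IndexBuildAlreadyRunningError: { status: 409, code: "MELMASTOON.SEARCH.INDEX_REBUILD_IN_PROGRESS" },
};

// Unknown error names fall back to a bare 500 (assumption).
export function toHttp(errorName: string): HttpError | null {
  if (Object.prototype.hasOwnProperty.call(ERROR_MAP, errorName)) {
    return ERROR_MAP[errorName];
  }
  return { status: 500 };
}
```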