search-aggregation-service — Application Logic
Companion: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · DATA_MODEL · SECURITY_MODEL
The application layer hosts commands (projection writes), queries (search/detail/suggest/facets), event consumers (one per upstream topic), and policies (allow-list, ranking, region pinning). It coordinates aggregates and ports; no domain rules live here.
1. Module map
src/application/
├── ports/
├── commands/ # projection writes (idempotent, vector-clock guarded)
├── queries/ # consumer search reads
├── consumers/ # one file per upstream event
└── policies/ # allow-list, ranking, region pinning
Every command and query has a single handler, a single zod-validated input DTO, and a single output DTO. Errors bubble as typed domain errors and map to HTTP / event responses in presentation.
2. Ports (consumed)
| Port | Adapter(s) | Used by |
|---|---|---|
| HotelIndexRepository | HotelIndexRepositoryPg | upsert/delete/suppress index entries; queries for hotel detail |
| RateSnapshotRepository | RateSnapshotRepositoryPg | upsert rate snapshots; lookup cheapest in window |
| AvailabilityHintRepository | AvailabilityHintRepositoryPg | upsert per-date counts |
| BoostRuleRepository | BoostRuleRepositoryPg | manage boost rules |
| ClickEventRepository | ClickEventRepositoryPg | record clicks; popularity recompute job |
| SearchEnginePort | OpenSearchEngineAdapter, PostgresFallbackEngineAdapter (degraded) | execute query, autocomplete, facets, write to OpenSearch |
| SearchCachePort | SearchCacheRedis | hot result cache |
| EventPublisher | EventPublisherPubSub (transactional outbox) | publish melmastoon.search.* events |
| EventArchiveReader | EventArchiveBigQueryAdapter | replay events for full reindex |
| FxSnapshotPort | FxSnapshotCacheRedisAdapter | currency conversion at read time |
| ProvinceCenterPort | ProvinceCenterCatalogAdapter (seeded JSON) | city → lat/lng/timezone |
| AnalyticsSinkPort | AnalyticsSinkPubSubAdapter | publish click + sampled query events |
| Clock | SystemClock, FixedClock | timestamps |
| IdGenerator | UlidIdGenerator, SeededIdGenerator (tests) | srh_…, clk_…, q_…, brt_…, ibd_… |
3. Commands (projection writes)
| Command | Aggregate(s) | Triggered by | Outbox events |
|---|---|---|---|
| UpsertHotelIndexEntry | HotelIndexEntry | property.published.v1, .updated.v1, .amenity_set.updated.v1, .photo.added.v1, .photo.removed.v1, .room_type.updated.v1 | search.projection.updated.v1 |
| SuppressHotelIndexEntry | HotelIndexEntry | property.unpublished.v1, tenant.region_changed.v1 (out of region) | search.projection.updated.v1 |
| DeleteHotelIndexEntry | HotelIndexEntry | property.deleted.v1 | search.projection.updated.v1 |
| PurgeTenantFromIndex | HotelIndexEntry, RateSnapshot, AvailabilityHint, BoostRule | tenant.deleted.v1 | search.projection.updated.v1 (per affected property) |
| UpsertRateSnapshot | RateSnapshot | pricing.rate_plan.updated.v1, pricing.rate_plan.published.v1 | search.projection.updated.v1 (light-weight delta on entry's priceFrom*) |
| UpsertFxSnapshot | (cache only) | pricing.fx_snapshot.updated.v1 | none (cache update + cache-invalidation broadcast) |
| UpsertAvailabilityHint | AvailabilityHint | inventory.allocation.confirmed.v1, .released.v1, inventory.block.created.v1, .released.v1 | search.projection.updated.v1 |
| RecordClick | ClickEvent | API POST /search/clicks | search.click.recorded.v1 |
| RecomputePopularity | HotelIndexEntry | scheduled (every 15 min) | search.projection.updated.v1 |
| CreateBoostRule | BoostRule | API POST /search/boost-rules | search.boost_rule.created.v1 |
| ActivateBoostRule | BoostRule, HotelIndexEntry | API POST /search/boost-rules/{id}:activate | search.boost_rule.activated.v1, search.projection.updated.v1 |
| StartIndexRebuild | IndexBuild | API POST /search/index:rebuild (admin) | search.index.rebuilt.v1 (on completion), search.index.health_alert.v1 (on failure) |
3.1 Command pseudocode pattern
Every projection-write command follows the same skeleton: vector-clock check → write the canonical Postgres row and the outbox row in one transaction → mirror to OpenSearch (idempotent doc ID) → invalidate cache → return new state.
export class UpsertHotelIndexEntryHandler
  implements CommandHandler<UpsertHotelIndexEntryInput, HotelIndexEntry> {
  constructor(
    private readonly entries: HotelIndexRepository,
    private readonly engine: SearchEnginePort,
    private readonly publisher: EventPublisher,
    private readonly cache: SearchCachePort,
    private readonly allowList: ProjectionAllowListPolicy,
    private readonly ranking: RankingPolicy,
    private readonly regionPinning: RegionPinningPolicy,
    private readonly clock: Clock,
    private readonly ids: IdGenerator,
    // Assumed dependency: handle() uses this.metrics, so it must be injected.
    // The type name ProjectionMetrics is hypothetical.
    private readonly metrics: ProjectionMetrics,
  ) {}

  async handle(input: UpsertHotelIndexEntryInput): Promise<HotelIndexEntry> {
    // 1. Allow-list filter: strips any field not flagged cross_tenant_searchable
    const safeInput = this.allowList.filter(input);

    // 2. Load existing
    const existing = await this.entries.findByPropertyId(safeInput.propertyId);

    // 3. Vector-clock guard (last-write-wins)
    if (existing && !VectorClock.isNewer(safeInput.vectorClock, existing.vectorClock)) {
      // Stale event: counted in metrics, not thrown
      this.metrics.staleProjectionDrop.inc({ source: safeInput.source });
      return existing;
    }

    // 4. Build / merge aggregate
    const next = HotelIndexEntry.upsert({
      ...existing?.toSnapshot(),
      ...safeInput,
      lastUpsertedAt: this.clock.nowISO(),
      schemaVersion: 1,
    });

    // 5. Region pinning evaluation
    if (!this.regionPinning.isVisible(next)) next.suppress('region_pinned');

    // 6. Recompute composite ranking inputs
    this.ranking.recompose(next);

    // 7. Persist canonical (Postgres) + outbox in same TX
    await this.entries.runInTx(async (tx) => {
      await tx.upsert(next);
      await tx.outbox.append({
        eventType: 'melmastoon.search.projection.updated.v1',
        aggregateId: next.id,
        tenantId: next.tenantId, // preserved for cascade purges
        data: { propertyId: next.propertyId, status: next.status, version: next.lastUpsertedAt },
      });
    });

    // 8. Mirror to OpenSearch (idempotent doc ID = propertyId)
    await this.engine.indexDoc({
      index: 'melmastoon-search-current',
      id: next.propertyId,
      body: next.toOpenSearchDoc(),
      refresh: 'wait_for', // ensure search-after consistency
    });

    // 9. Targeted cache invalidation
    await this.cache.invalidateForProperty(next.propertyId);

    return next;
  }
}
3.2 Idempotency
- Inbox dedup by upstream `eventId`. Re-delivery is a no-op.
- OpenSearch doc ID = `propertyId`. Re-indexing the same document with the same `vectorClock` is a no-op write (we compare `lastUpsertedAt` as an `if_seq_no` analogue, using the optimistic-concurrency pattern with a doc-level `version` token).
- Outbox dedup at the publisher: `(aggregateId, eventType, lastUpsertedAt)` is the publisher's idempotency key.
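The first and third rules can be sketched as follows. This is a minimal in-memory illustration, not the service's implementation: `InMemoryInbox`, `outboxKey`, and `dedupeOutbox` are hypothetical names, and a `Set` stands in for the Postgres inbox table and the publisher's dedup store.

```typescript
// Hypothetical in-memory stand-in for the Postgres inbox table.
class InMemoryInbox {
  private readonly seen = new Set<string>();

  // Returns true the first time an eventId is seen, false on re-delivery.
  claim(eventId: string): boolean {
    if (this.seen.has(eventId)) return false;
    this.seen.add(eventId);
    return true;
  }
}

interface OutboxRow {
  aggregateId: string;
  eventType: string;
  lastUpsertedAt: string; // ISO timestamp
}

// Publisher idempotency key: (aggregateId, eventType, lastUpsertedAt).
function outboxKey(row: OutboxRow): string {
  return [row.aggregateId, row.eventType, row.lastUpsertedAt].join("|");
}

// Returns only rows whose key has not been published yet and records their keys.
function dedupeOutbox(rows: OutboxRow[], published: Set<string>): OutboxRow[] {
  const fresh: OutboxRow[] = [];
  for (const row of rows) {
    const key = outboxKey(row);
    if (published.has(key)) continue;
    published.add(key);
    fresh.push(row);
  }
  return fresh;
}
```

A consumer would call `claim(eventId)` before dispatching to a command handler and skip the event when it returns `false`.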
3.3 PurgeTenantFromIndex
Triggered by tenant.deleted.v1. Cascade behavior:
- Mark all `HotelIndexEntry` rows with `tenantId == event.tenantId` as `status='suppressed'`.
- Issue OpenSearch `delete_by_query: { term: { tenant_id: <id> } }` against the live alias.
- Soft-delete `RateSnapshot`, `AvailabilityHint`, `BoostRule`, `ClickEvent` rows for the tenant (retention 30 d for audit; hard-purge job runs after).
- Emit `search.projection.updated.v1` per affected property and one `search.index.health_alert.v1` summary.
4. Queries (read side)
| Query | Returns | Cached | Notes |
|---|---|---|---|
| ExecuteSearch | SearchResultDto | yes (`srh:q:<sha256>`, 60 s) | The main meta-search; cross-tenant by design |
| GetHotelDetail | HotelDetailDto | yes (`srh:detail:<propertyId>:<currency>:<dateRange>`, 300 s) | Aggregates HotelIndexEntry + per-date RateSnapshot + AvailabilityHint |
| Suggest | SuggestDto | yes (`srh:sug:<sha256>`, 60 s) | Edge n-grams; per-locale |
| Facets | FacetsDto | yes (`srh:facets:<sha256>`, 60 s) | OpenSearch aggregations |
| IndexHealth | IndexHealthDto | no | Doc count + alias + freshness |
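One way the `srh:q:<sha256>` key could be derived is sketched below. It assumes the hash is taken over a key-sorted JSON serialisation of the canonical query, which the source does not specify; `canonicalJson` and `searchCacheKey` are hypothetical helpers, and the query fields in the usage line are made up.

```typescript
import { createHash } from "node:crypto";

type Canonical = string | number | boolean | null | Canonical[] | { [k: string]: Canonical };

// Serialises a value with object keys sorted, so logically equal queries
// yield the same string regardless of property order.
function canonicalJson(value: Canonical): string {
  if (value === null || typeof value !== "object") return JSON.stringify(value);
  if (Array.isArray(value)) return `[${value.map(canonicalJson).join(",")}]`;
  const entries = Object.entries(value).sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
  return `{${entries.map(([k, v]) => `${JSON.stringify(k)}:${canonicalJson(v)}`).join(",")}}`;
}

// Builds the cache key for ExecuteSearch: "srh:q:" + hex SHA-256 of the canonical form.
function searchCacheKey(query: { [k: string]: Canonical }): string {
  const digest = createHash("sha256").update(canonicalJson(query)).digest("hex");
  return `srh:q:${digest}`;
}
```

With this approach `searchCacheKey({ city: "Kabul", guests: 2 })` and `searchCacheKey({ guests: 2, city: "Kabul" })` resolve to the same cache entry.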
4.1 ExecuteSearch flow
client → SearchController.execute(input)
→ ExecuteSearchHandler
→ 1. Validate + canonicalize (zod)
→ 2. Apply RegionPinningPolicy → narrows visible region
→ 3. Cache lookup `srh:q:<hash(canonical)>` → return on hit
→ 4. SearchEnginePort.query(buildOpenSearchDsl(canonical))
↳ OpenSearch DSL composed by RankingPolicy:
function_score { query, functions:[ price, distance, popularity, freshness, boost ] }
↳ post_filter: amenities (cheap), starRating
↳ aggs: amenity_facets, price_band_buckets
→ 5. For each hit, attach RateSnapshot.convert(targetCurrency, fxSnapshot)
↳ FX age stamp included; flag `fxStale` if > 1h
→ 6. AvailabilityHint join (Postgres) for the dates window
↳ properties with no hint OR hint shows 0 across all nights are dropped
→ 7. Persist `SearchQuery` (sampled 1% anon, 100% authed)
→ 8. Cache write (60 s TTL)
→ 9. Emit `search.query.executed.v1` (sampled) via AnalyticsSinkPort
→ 10. Return `SearchResultDto`
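Steps 5 and 6 of the flow can be illustrated with two small pure functions. This is a sketch under assumptions: the `FxSnapshot` shape, the meaning of `rate` (target units per source unit), and the two-decimal rounding are hypothetical; only the one-hour staleness threshold and the drop rule come from the flow above.

```typescript
const FX_STALE_AFTER_MS = 60 * 60 * 1000; // step 5: flag fxStale if older than 1 h

interface FxSnapshot {
  rate: number;     // assumed: target-currency units per one source-currency unit
  capturedAt: Date; // when the snapshot was taken
}

interface ConvertedPrice {
  amount: number;
  fxStale: boolean;
}

// Step 5: convert a price and stamp it with the staleness flag.
function convertPrice(amount: number, fx: FxSnapshot, now: Date): ConvertedPrice {
  return {
    amount: Math.round(amount * fx.rate * 100) / 100, // assumed two-decimal rounding
    fxStale: now.getTime() - fx.capturedAt.getTime() > FX_STALE_AFTER_MS,
  };
}

// Step 6: a hit survives only if a hint exists and at least one night is non-zero.
function hasAvailability(countsPerNight: number[] | undefined): boolean {
  return countsPerNight !== undefined && countsPerNight.some((count) => count > 0);
}
```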
4.2 Fallback path (OpenSearch unavailable)
If SearchEnginePort reports unhealthy (≥ 3 consecutive 5xx within 10 s) the circuit opens and PostgresFallbackEngineAdapter is used:
- Filters: WHERE clauses on Postgres `hotel_index_entries`.
- Ranking: deterministic `(price ASC, popularity_28d DESC, distance ASC if geo provided)`. No fuzzy text matching.
- Response carries `degradationLevel: 'pg_fallback'` so the UI can warn the user.
- `search.index.health_alert.v1` is emitted on circuit-open and on circuit-close.
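The open condition (at least 3 consecutive 5xx responses within 10 s) could be tracked as in the sketch below. `ConsecutiveFailureBreaker` is a hypothetical class, and the `close()` method assumes that some external health probe decides when to close the circuit; the source does not describe how closing is triggered.

```typescript
type EngineChoice = "opensearch" | "pg_fallback";

class ConsecutiveFailureBreaker {
  private failureTimes: number[] = [];
  private open = false;

  constructor(
    private readonly threshold: number = 3,
    private readonly windowMs: number = 10000,
  ) {}

  // Any successful call breaks the streak of consecutive failures.
  recordSuccess(): void {
    this.failureTimes = [];
  }

  // Records a 5xx at nowMs; returns true if this call opened the circuit.
  recordFailure(nowMs: number): boolean {
    this.failureTimes.push(nowMs);
    if (this.failureTimes.length > this.threshold) this.failureTimes.shift();
    const streakFull = this.failureTimes.length === this.threshold;
    const withinWindow = nowMs - this.failureTimes[0] <= this.windowMs;
    if (!this.open && streakFull && withinWindow) {
      this.open = true;
      return true;
    }
    return false;
  }

  // Assumption: called by a health probe once OpenSearch is healthy again.
  close(): void {
    this.open = false;
    this.failureTimes = [];
  }

  // Which engine adapter the query handler should route to.
  choose(): EngineChoice {
    return this.open ? "pg_fallback" : "opensearch";
  }
}
```

The `true` return value of `recordFailure` and the call to `close()` are the two natural points at which to emit `search.index.health_alert.v1`.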
4.3 Cursor pagination
- Cursors are opaque base64 of `{ "search_after": [...], "queryHash": "<hash>", "expiresAt": <epoch> }`.
- Cursor TTL: 5 min. Expired cursor → `MELMASTOON.SEARCH.PAGE_OUT_OF_RANGE`.
- Cursor is bound to the original canonical query hash; mutated filters issue a new cursor.
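A minimal sketch of encoding and validating such a cursor follows. Several details are assumptions rather than documented behaviour: `expiresAt` is treated as epoch milliseconds, a hash mismatch returns `null` so the caller restarts from the first page with a fresh cursor, and `PageOutOfRangeError`, `encodeCursor`, and `decodeCursor` are hypothetical names.

```typescript
interface CursorPayload {
  search_after: unknown[];
  queryHash: string;
  expiresAt: number; // assumed unit: epoch milliseconds
}

class PageOutOfRangeError extends Error {
  readonly code = "MELMASTOON.SEARCH.PAGE_OUT_OF_RANGE";
}

const CURSOR_TTL_MS = 5 * 60 * 1000; // cursor TTL: 5 min

function encodeCursor(searchAfter: unknown[], queryHash: string, nowMs: number): string {
  const payload: CursorPayload = {
    search_after: searchAfter,
    queryHash,
    expiresAt: nowMs + CURSOR_TTL_MS,
  };
  return Buffer.from(JSON.stringify(payload), "utf8").toString("base64");
}

// Returns the search_after values, or null when the cursor belongs to a
// different canonical query (assumed handling: caller starts a new result set).
function decodeCursor(cursor: string, queryHash: string, nowMs: number): unknown[] | null {
  const payload = JSON.parse(Buffer.from(cursor, "base64").toString("utf8")) as CursorPayload;
  if (nowMs > payload.expiresAt) throw new PageOutOfRangeError("cursor expired");
  if (payload.queryHash !== queryHash) return null;
  return payload.search_after;
}
```

Because the payload is only base64-encoded, a production cursor would likely also need integrity protection such as a signature; that is outside what the source describes.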
5. Event consumers
One file per upstream event under src/application/consumers/. All consumers share:
- Inbox dedup by `eventId` (`SELECT … FOR UPDATE NOWAIT` then insert).
- JSON-Schema validation of the envelope against the upstream service's published schema.
- Vector-clock check using the `version` field of the upstream aggregate.
- Map → command handler → emit projection event.
- ACK on success, NACK with backoff on retryable errors, DLQ after 5 attempts.
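The last rule can be expressed as a pure decision function, sketched here under assumptions: attempts are counted from 1, non-retryable errors go straight to the DLQ, errors mark themselves with a `retryable` flag, and the backoff is exponential from 1 s capped at 60 s. None of these details are stated in the source, and all names are hypothetical.

```typescript
type DeliveryOutcome = "ack" | "nack" | "dlq";

const MAX_ATTEMPTS = 5; // DLQ after 5 attempts

// Assumed convention: retryable errors carry `retryable: true`.
function isRetryable(err: unknown): boolean {
  return typeof err === "object" && err !== null && (err as { retryable?: unknown }).retryable === true;
}

// attempt is 1-based; err is undefined when the handler succeeded.
function decideOutcome(attempt: number, err: unknown): DeliveryOutcome {
  if (err === undefined) return "ack";
  if (isRetryable(err) && attempt < MAX_ATTEMPTS) return "nack";
  return "dlq";
}

// Assumed backoff schedule: 1 s, 2 s, 4 s, ... capped at 60 s.
function backoffMs(attempt: number): number {
  return Math.min(1000 * 2 ** (attempt - 1), 60000);
}
```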
| Consumer | Topic | Side effect |
|---|---|---|
| PropertyPublishedConsumer | melmastoon.property.published.v1 | UpsertHotelIndexEntry |
| PropertyUpdatedConsumer | melmastoon.property.updated.v1 | UpsertHotelIndexEntry (delta only) |
| PropertyUnpublishedConsumer | melmastoon.property.unpublished.v1 | SuppressHotelIndexEntry |
| PropertyDeletedConsumer | melmastoon.property.deleted.v1 | DeleteHotelIndexEntry |
| PropertyAmenitySetUpdatedConsumer | melmastoon.property.amenity_set.updated.v1 | UpsertHotelIndexEntry (amenities only) |
| PropertyPhotoAddedConsumer | melmastoon.property.photo.added.v1 | UpsertHotelIndexEntry (hero photo refresh) |
| RoomTypeUpdatedConsumer | melmastoon.property.room_type.updated.v1 | UpsertHotelIndexEntry (capacity, occupancy hint) |
| RatePlanUpdatedConsumer | melmastoon.pricing.rate_plan.updated.v1 | UpsertRateSnapshot per affected date |
| FxSnapshotUpdatedConsumer | melmastoon.pricing.fx_snapshot.updated.v1 | UpsertFxSnapshot cache; broadcast cache invalidation |
| InventoryAllocationConfirmedConsumer | melmastoon.inventory.allocation.confirmed.v1 | UpsertAvailabilityHint (decrement) |
| InventoryAllocationReleasedConsumer | melmastoon.inventory.allocation.released.v1 | UpsertAvailabilityHint (increment) |
| InventoryBlockCreatedConsumer | melmastoon.inventory.block.created.v1 | UpsertAvailabilityHint (decrement by block size) |
| InventoryBlockReleasedConsumer | melmastoon.inventory.block.released.v1 | UpsertAvailabilityHint (increment) |
| TenantDeletedConsumer | melmastoon.tenant.deleted.v1 | PurgeTenantFromIndex |
| TenantRegionChangedConsumer | melmastoon.tenant.region_changed.v1 | per-property re-evaluation of RegionPinningPolicy |
5.1 Out-of-order handling
Upstream events may arrive out of order under network partitions or replay. The pattern across all consumers:
- Per upstream service, an `aggregateVersion` (e.g., `event.version`) is the canonical clock for that service.
- The projection's `vectorClock` keeps the maximum-seen value for each upstream service.
- Stale events (lower vector clock) are dropped silently and counted in `search_projection_drop_total{source}`.
- Tie (same `vectorClock`): later `occurredAt` wins; truly-equal: server arrival order wins (last write).
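The rules above can be sketched as a per-source comparison. This is an illustration under assumptions, not the service's `VectorClock` implementation: the clock is modelled as a `Map` from upstream service name to the last applied version and `occurredAt`, and a second `Map` stands in for the `search_projection_drop_total{source}` counter. All type and function names are hypothetical.

```typescript
interface UpstreamEvent {
  source: string;     // upstream service name
  version: number;    // aggregateVersion of the upstream aggregate
  occurredAt: string; // ISO timestamp
}

interface SourceMark {
  version: number;
  occurredAt: string;
}

type ProjectionClock = Map<string, SourceMark>;

// True when the event should overwrite the projection.
function shouldApply(clock: ProjectionClock, event: UpstreamEvent): boolean {
  const seen = clock.get(event.source);
  if (seen === undefined) return true; // first event from this source
  if (event.version !== seen.version) return event.version > seen.version;
  // Tie on version: later occurredAt wins; fully equal means the later arrival wins.
  return Date.parse(event.occurredAt) >= Date.parse(seen.occurredAt);
}

// Applies or drops the event, counting drops per source.
function applyEvent(clock: ProjectionClock, event: UpstreamEvent, drops: Map<string, number>): boolean {
  if (!shouldApply(clock, event)) {
    drops.set(event.source, (drops.get(event.source) ?? 0) + 1);
    return false;
  }
  clock.set(event.source, { version: event.version, occurredAt: event.occurredAt });
  return true;
}
```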
6. Scheduled jobs
| Job | Cadence | Purpose |
|---|---|---|
| recompute-popularity | every 15 min | Slide popularity score window using ClickEventRepository aggregates |
| expire-boost-rules | every 5 min | Move active → expired for rules past expiresAt |
| refresh-fx-snapshots | every 5 min | Refresh FX cache from pricing-service if last fx_snapshot.updated.v1 is stale |
| nightly-projection-auditor | 02:00 region-local | Walk Postgres canonical projection, assert no forbidden fields, no unpublished refs; alert on any finding |
| nightly-anonymize-search-queries | 03:00 region-local | Drop text and userBucket for SearchQuery rows older than 30 d |
| weekly-old-index-prune | Sunday 04:00 | Delete OpenSearch indices behind the alias older than 7 d |
7. Sagas / cross-service coordination
This service does not orchestrate sagas. It is purely a conformist consumer of upstream events. All cross-service writes happen in the upstream services; this service projects only.
The closest thing to coordination is the IndexBuild flow (§4.10 in DOMAIN_MODEL), which is internally orchestrated:
StartIndexRebuild
→ snapshot current alias
→ create new index with current template
→ EventArchiveReader.streamSince(0) → indexBatch (1 000 docs/batch)
→ catching_up: re-consume live deltas published since snapshot start
→ swap alias atomically (`POST /_aliases { actions: [{remove: …}, {add: …}] }`)
→ keep old index 7 d for rollback
→ emit `search.index.rebuilt.v1`
Failure path: any failure transitions IndexBuild → failed and emits search.index.health_alert.v1 with details; the alias is not swapped, so production traffic stays on the previous index.
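Two mechanical pieces of the rebuild flow, batching and the alias swap body, can be sketched as below. The `remove`/`add` action shape follows the OpenSearch `_aliases` API; `buildAliasSwap`, `toBatches`, and the index names in the usage line are hypothetical.

```typescript
interface AliasTarget {
  index: string;
  alias: string;
}

type AliasAction = { remove: AliasTarget } | { add: AliasTarget };

// Request body for POST /_aliases: both actions are applied atomically,
// so readers never observe the alias pointing at no index.
function buildAliasSwap(alias: string, oldIndex: string, newIndex: string): { actions: AliasAction[] } {
  return {
    actions: [
      { remove: { index: oldIndex, alias } },
      { add: { index: newIndex, alias } },
    ],
  };
}

// Splits replayed documents into bulk-index batches (1 000 docs per batch by default).
function toBatches<T>(docs: T[], size: number = 1000): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < docs.length; i += size) {
    batches.push(docs.slice(i, i + size));
  }
  return batches;
}
```

For example, `buildAliasSwap("melmastoon-search-current", "search-v1", "search-v2")` produces the body for the swap step; the old index stays in place for rollback until the weekly prune job removes it.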
8. Error mapping (application → presentation)
| Domain error | HTTP code (consumer surface) | Notes |
|---|---|---|
| ForbiddenFieldInProjectionError | 500 (internal) | Should never surface to consumers; logs P0 (projector bug) |
| StaleProjectionEventError | n/a | Internal-only; counted, not surfaced |
| InvalidGeoBoundingBoxError | 422 MELMASTOON.SEARCH.GEO_OUT_OF_BOUNDS | |
| QueryBudgetExceededError | 422 MELMASTOON.SEARCH.QUERY_INVALID | |
| BoostRuleScopeViolationError | 403 MELMASTOON.SEARCH.BOOST_RULE_SCOPE_VIOLATION | Operator API only |
| IndexBuildAlreadyRunningError | 409 MELMASTOON.SEARCH.INDEX_REBUILD_IN_PROGRESS | Admin API |
MELMASTOON.SEARCH.INDEX_UNAVAILABLE (503) is emitted by the controller layer when the circuit is open and Postgres fallback is itself unhealthy (rare).
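The mapping could be held in a lookup table in the presentation layer, as in this sketch. It assumes errors are identified by their `name` and that any unmapped error falls back to a bare 500; both are assumptions, and `HttpErrorBody` and `toHttpError` are hypothetical names. `StaleProjectionEventError` is left out because it is never surfaced.

```typescript
interface HttpErrorBody {
  status: number;
  code?: string;
}

// Domain error name → HTTP status and public error code, as in the table above.
const ERROR_MAP = new Map<string, HttpErrorBody>([
  ["ForbiddenFieldInProjectionError", { status: 500 }],
  ["InvalidGeoBoundingBoxError", { status: 422, code: "MELMASTOON.SEARCH.GEO_OUT_OF_BOUNDS" }],
  ["QueryBudgetExceededError", { status: 422, code: "MELMASTOON.SEARCH.QUERY_INVALID" }],
  ["BoostRuleScopeViolationError", { status: 403, code: "MELMASTOON.SEARCH.BOOST_RULE_SCOPE_VIOLATION" }],
  ["IndexBuildAlreadyRunningError", { status: 409, code: "MELMASTOON.SEARCH.INDEX_REBUILD_IN_PROGRESS" }],
]);

// Assumed fallback: anything unmapped becomes a 500 with no public code.
function toHttpError(err: { name: string }): HttpErrorBody {
  return ERROR_MAP.get(err.name) ?? { status: 500 };
}
```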