tenant-service — APPLICATION_LOGIC

Companion: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · DATA_MODEL

This is the application layer (use cases + ports + sagas + event consumers). It coordinates the domain model with the outside world. NestJS controllers in the presentation layer are thin and delegate every command and query to a handler defined here.


1. Architecture Layers

┌──────────────────────────────────────────────────────────────────────┐
│ presentation/ NestJS controllers, OpenAPI, DTO ⇄ command/query │
│ src/api REST + sync + admin BFF surfaces │
├──────────────────────────────────────────────────────────────────────┤
│ application/ ▼ Command + Query handlers, sagas, event consumers │
│ src/usecases orchestrates ports; never knows about the DB │
├──────────────────────────────────────────────────────────────────────┤
│ domain/ Pure TypeScript: aggregates, VOs, domain services │
│ @melmastoon/tenant-domain no framework imports │
├──────────────────────────────────────────────────────────────────────┤
│ infrastructure/ Adapters: PgRepositories, RedisCache, PubsubBus, │
│ src/infra IamHttpClient, BillingHttpClient, OutboxPoller, │
│ CryptoService, ClockAdapter │
└──────────────────────────────────────────────────────────────────────┘

CQRS-light: writes go through command handlers (one transaction = one aggregate + outbox); reads go through dedicated query handlers that may read from the cache or a read replica.


2. Ports (interfaces in the application layer)

2.1 Repositories

export interface TenantRepository {
findById(id: TenantId): Promise<Tenant | null>;
findBySlug(slug: string): Promise<Tenant | null>;
save(t: Tenant): Promise<void>; // upsert; outbox in same tx
}

export interface TenantConfigRepository {
findByTenantId(t: TenantId): Promise<TenantConfig | null>;
save(c: TenantConfig): Promise<void>;
}

export interface OrganizationUnitRepository {
findById(id: OrganizationUnitId): Promise<OrganizationUnit | null>;
listForTenant(t: TenantId): Promise<OrganizationUnit[]>;
childrenOf(parent: OrganizationUnitId): Promise<OrganizationUnit[]>;
save(o: OrganizationUnit): Promise<void>;
movePathSubtree(o: OrganizationUnit, newParent: OrganizationUnit | null): Promise<void>;
}

export interface MembershipRepository {
findById(id: MembershipId): Promise<Membership | null>;
findFor(t: TenantId, u: UserId): Promise<Membership | null>;
listForTenant(t: TenantId, opts: PageOpts): Promise<Page<Membership>>;
listForUser(u: UserId): Promise<Membership[]>;
countOwners(t: TenantId): Promise<number>;
save(m: Membership): Promise<void>;
}

export interface RoleRepository {
findById(id: RoleId): Promise<Role | null>;
findByCode(t: TenantId, code: string): Promise<Role | null>;
listForTenant(t: TenantId): Promise<Role[]>;
save(r: Role): Promise<void>;
}

export interface RoleAssignmentRepository {
listForMembership(m: MembershipId): Promise<RoleAssignment[]>;
save(ra: RoleAssignment): Promise<void>;
remove(id: RoleAssignmentId): Promise<void>;
}

export interface InvitationRepository {
findById(id: InvitationId): Promise<Invitation | null>;
findPendingByEmail(t: TenantId, email: string): Promise<Invitation | null>;
listExpiringBy(now: Date): Promise<Invitation[]>;
save(i: Invitation): Promise<void>;
}

export interface FeatureFlagOverrideRepository {
list(t: TenantId): Promise<FeatureFlagOverride[]>;
upsert(f: FeatureFlagOverride): Promise<void>;
}

export interface BillingContactRepository {
get(t: TenantId): Promise<BillingContact | null>;
save(b: BillingContact): Promise<void>;
}

2.2 Outbound infrastructure

export interface EventPublisher { // backed by transactional outbox
enqueue(events: IntegrationEvent[]): Promise<void>; // called inside the same tx as repo.save
}

export interface IdentityClient { // talks to iam-service
findUserByEmail(email: string): Promise<{ userId: UserId } | null>;
preRegister(email: string, displayName: string): Promise<{ userId: UserId }>;
revokeUserSessions(t: TenantId, u: UserId): Promise<void>;
}

export interface BillingClient { // talks to billing-service
getSubscriptionState(t: TenantId): Promise<{ active: boolean; gracePeriodEndsAt?: Date }>;
}

export interface PolicyEngine { // ABAC PDP, embedded
permittedActions(opts: { membership: Membership; assignments: RoleAssignment[]; roles: Role[]; }): Permission[];
check(req: AuthzRequest): AuthzDecision;
}

export interface NotificationClient { // event-driven; this is a thin enqueue
sendInviteEmail(invitation: Invitation, plaintextToken: string, locale: string): Promise<void>;
}

export interface AIClient { /* see AI_INTEGRATION */ }

export interface IdempotencyStore {
get(key: IdempotencyKey, route: string): Promise<StoredResponse | null>;
put(key: IdempotencyKey, route: string, response: StoredResponse, ttlSec: number): Promise<void>;
}

export interface Clock { now(): Date; }
export interface CryptoService { randomToken(): string; sha256Hex(s: string): string; }

2.3 Caches

export interface TenantConfigCache {
get(t: TenantId): Promise<TenantConfigSnapshot | null>;
put(t: TenantId, snapshot: TenantConfigSnapshot, ttlSec: number): Promise<void>;
invalidate(t: TenantId): Promise<void>;
}

export interface MembershipCache {
resolveForUser(u: UserId): Promise<MembershipSnapshot[] | null>;
invalidateForUser(u: UserId): Promise<void>;
invalidateForTenant(t: TenantId): Promise<void>;
}

3. Command Use Cases

Each handler is a small class implementing Handler<Cmd, Result>. The wrapper @TransactionScope opens a Postgres transaction, the @Tenancy decorator pins the RLS context (SET app.tenant_id), and the @Idempotent decorator wraps the body in the idempotency store.

3.1 ProvisionTenant

class ProvisionTenant implements Handler<ProvisionTenantCmd, { tenantId: TenantId }> {
/* required role: platform.super_admin */
async exec(cmd: ProvisionTenantCmd): Promise<{ tenantId: TenantId }> {
if (await this.tenants.findBySlug(cmd.slug)) throw new TenantSlugTakenError();

const tenant = Tenant.provision({ slug: cmd.slug, legalName: cmd.legalName, country: cmd.country, residencyRegion: cmd.region });
const config = TenantConfig.defaultsFor(tenant); // DOMAIN_MODEL §4
const root = OrganizationUnit.createChainRoot(tenant.id, cmd.legalName);
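const seedRoles = [...this.systemRoleSeeds(tenant.id)]; // 9 system roles
/* Assumption: the seeds include 'tenant.owner' and Role exposes `code`. Reusing the seeded instance keeps the owner assignment pointed at a role that is actually saved below. */
const ownerRole = seedRoles.find(r => r.code === 'tenant.owner') ?? throwNotFound();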
const ownerMembership = Membership.preRegister(tenant.id, cmd.ownerUserId, /* scope */ []);
const ownerAssignment = RoleAssignment.create(tenant.id, ownerMembership.id, ownerRole.id, /* scope */ [], cmd.actor);

await this.tx.run(async () => {
await this.tenants.save(tenant);
await this.configs.save(config);
await this.orgUnits.save(root);
for (const r of seedRoles) await this.roles.save(r);
await this.memberships.save(ownerMembership);
await this.assignments.save(ownerAssignment);
await this.publisher.enqueue([
IntegrationEvent.from(new TenantCreated(tenant)),
IntegrationEvent.from(new OrganizationUnitCreated(root)),
IntegrationEvent.from(new MembershipCreated(ownerMembership)),
]);
});

return { tenantId: tenant.id };
}
}

3.2 UpdateTenantConfig

class UpdateTenantConfig implements Handler<UpdateTenantConfigCmd, { etag: string }> {
/* required role: tenant.owner | tenant.gm */
async exec(cmd: UpdateTenantConfigCmd) {
const cfg = await this.configs.findByTenantId(cmd.tenantId);
if (!cfg) throw new NotFoundError();
cfg.update(cmd.patch, cmd.actor, cmd.expectVersion); // throws OptimisticConcurrencyError on stale
await this.tx.run(async () => {
await this.configs.save(cfg);
await this.publisher.enqueue([IntegrationEvent.from(new TenantConfigUpdated(cfg))]);
});
await this.cache.invalidate(cmd.tenantId);
return { etag: `"v${cfg.version}"` };
}
}
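The version check and the ETag round trip used by UpdateTenantConfig can be illustrated in isolation. The following is a minimal sketch, not the real aggregate: `VersionedConfig`, `toEtag`, and `parseIfMatch` are hypothetical names, and the actor argument of the real `update` is omitted.

```typescript
// Hypothetical stand-ins; the real TenantConfig aggregate lives in DOMAIN_MODEL.
class OptimisticConcurrencyError extends Error {
  constructor(readonly expected: number, readonly actual: number) {
    super(`stale version: expected v${expected}, aggregate is at v${actual}`);
    this.name = "OptimisticConcurrencyError";
  }
}

// Formats a version in the ETag shape returned by the handler: "v<n>".
function toEtag(version: number): string {
  return `"v${version}"`;
}

// Parses an If-Match value such as "v7" (quotes included); null when malformed.
function parseIfMatch(header: string): number | null {
  const m = /^"v(\d+)"$/.exec(header.trim());
  return m ? Number(m[1]) : null;
}

class VersionedConfig {
  constructor(public version: number, public settings: Record<string, unknown>) {}

  // Applies the patch only if the caller saw the current version, then bumps it.
  update(patch: Record<string, unknown>, expectVersion: number): void {
    if (expectVersion !== this.version) {
      throw new OptimisticConcurrencyError(expectVersion, this.version);
    }
    this.settings = { ...this.settings, ...patch };
    this.version += 1;
  }
}
```

A controller would presumably map a null result from `parseIfMatch` to a 4xx response before the handler runs, and map `OptimisticConcurrencyError` to 412 as described in §7.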

3.3 SuspendTenant / ReactivateTenant

Both require platform.super_admin (or are triggered automatically by the billing consumer; see §5).

class SuspendTenant implements Handler<SuspendTenantCmd, void> {
async exec(cmd: SuspendTenantCmd) {
const tenant = await this.tenants.findById(cmd.tenantId) ?? throwNotFound();
const events = tenant.suspend(cmd.reason, cmd.by);
await this.tx.run(async () => {
await this.tenants.save(tenant);
await this.publisher.enqueue(events.map(IntegrationEvent.from));
});
await this.cache.invalidate(cmd.tenantId);
}
}

ReactivateTenant is symmetric.

3.4 DeleteTenant (cascading saga)

class CloseTenantSaga {
/* required role: platform.super_admin; plus 24h two-person approval */
async start(cmd: CloseTenantCmd): Promise<{ sagaId: string }> {
const tenant = await this.tenants.findById(cmd.tenantId) ?? throwNotFound();
const events = tenant.close(cmd.reason);
const sagaId = await this.sagaStore.start('CloseTenant', { tenantId: tenant.id });
await this.tx.run(async () => {
await this.tenants.save(tenant);
await this.publisher.enqueue([
IntegrationEvent.from(new TenantClosed(tenant), { sagaId }),
]);
});
return { sagaId };
}

async onAck(serviceName: string, evt: TenantDeletedAckEvent) {
await this.sagaStore.markAck(evt.sagaId, serviceName);
if (await this.sagaStore.allAcksReceived(evt.sagaId)) {
await this.publisher.enqueue([
IntegrationEvent.create('melmastoon.tenant.deletion_completed.v1', { tenantId: evt.tenantId, sagaId: evt.sagaId }),
]);
}
}

async onAckTimeout(sagaId: string) {
/* alert + retry; never silently complete */
}
}

Downstream services (reservation, billing, payment-gateway, lock-integration, housekeeping, notification, theme-config, property, iam) subscribe to melmastoon.tenant.deleted.v1, perform their cascade, and emit melmastoon.tenant.deletion_acked.v1 with their service name. The saga collects acks and only flips the tenant status to archived (via a final compaction job) once every required ack has been received within the configured window (default 7 days). Missing acks trigger PagerDuty.
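The ack-collection rule described above (complete only when every required service has acked, and surface missing acks after the window) can be sketched with an in-memory store. This is an illustration under assumptions: `InMemorySagaStore`, its method signatures, and the `overdue` helper are hypothetical, and the real saga store is presumably persisted.

```typescript
// In-memory stand-in for the saga store; names and signatures are hypothetical.
interface SagaState {
  required: Set<string>; // service names that must ack
  acked: Set<string>;
  deadline: Date;
  completed: boolean;
}

class InMemorySagaStore {
  private sagas = new Map<string, SagaState>();

  start(sagaId: string, requiredServices: string[], startedAt: Date, windowDays = 7): void {
    const deadline = new Date(startedAt.getTime() + windowDays * 24 * 60 * 60 * 1000);
    this.sagas.set(sagaId, {
      required: new Set(requiredServices),
      acked: new Set<string>(),
      deadline,
      completed: false,
    });
  }

  // Records an ack. Duplicates and unknown services are ignored.
  // Returns true exactly once: when the last required ack arrives.
  markAck(sagaId: string, service: string): boolean {
    const s = this.get(sagaId);
    if (s.completed || !s.required.has(service)) return false;
    s.acked.add(service);
    if (s.acked.size < s.required.size) return false;
    s.completed = true;
    return true;
  }

  // Services still missing once the window has passed; empty before the deadline.
  overdue(sagaId: string, now: Date): string[] {
    const s = this.get(sagaId);
    if (s.completed || now.getTime() <= s.deadline.getTime()) return [];
    return Array.from(s.required).filter((svc) => !s.acked.has(svc));
  }

  private get(sagaId: string): SagaState {
    const s = this.sagas.get(sagaId);
    if (!s) throw new Error(`unknown saga ${sagaId}`);
    return s;
  }
}
```

Returning true only on the final ack gives the caller a single point at which to emit the completion event, even when acks are redelivered. Timing out never marks the saga complete, in line with "never silently complete".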

3.5 MoveProperty (chain restructuring saga)

class MovePropertySaga {
async start(cmd: MovePropertyCmd) {
const unit = await this.orgUnits.findById(cmd.unitId) ?? throwNotFound();
const newParent = cmd.newParentId ? await this.orgUnits.findById(cmd.newParentId) : null;
/* domain assertions: kind, depth, no cycles */
const sagaId = await this.sagaStore.start('MoveProperty', { unitId: unit.id });
await this.publisher.enqueue([IntegrationEvent.create('melmastoon.tenant.organization_unit.move_started.v1', { sagaId, unitId: unit.id })]);
/* downstream services pause writes for that propertyId */
/* this handler awaits 'move_ready' acks then performs move + emits 'organization_unit.moved.v1' */
}
}
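The "depth, no cycles" assertions mentioned in the handler can be made concrete. The sketch below assumes a flat map of units with parent pointers and a hypothetical `maxDepth` limit; `UnitNode`, `assertMoveAllowed`, and the other names are illustrative and not taken from the domain model. It also assumes the existing tree is already acyclic.

```typescript
// Hypothetical minimal shape; the real OrganizationUnit carries more fields.
interface UnitNode { id: string; parentId: string | null; }
type Tree = Map<string, UnitNode>;

// Ids from `start` up to the root, `start` included.
function ancestorsInclusive(tree: Tree, start: string): string[] {
  const chain: string[] = [];
  let cur: string | null = start;
  while (cur !== null) {
    chain.push(cur);
    const node: UnitNode | undefined = tree.get(cur);
    if (!node) throw new Error(`unknown unit ${cur}`);
    cur = node.parentId;
  }
  return chain;
}

// Number of levels in the subtree rooted at rootId (a leaf has height 1).
function subtreeHeight(tree: Tree, rootId: string): number {
  let tallestChild = 0;
  tree.forEach((u) => {
    if (u.parentId === rootId) tallestChild = Math.max(tallestChild, subtreeHeight(tree, u.id));
  });
  return 1 + tallestChild;
}

// Throws if moving unitId under newParentId would create a cycle or exceed maxDepth.
function assertMoveAllowed(tree: Tree, unitId: string, newParentId: string | null, maxDepth: number): void {
  const parentChain = newParentId === null ? [] : ancestorsInclusive(tree, newParentId);
  if (parentChain.indexOf(unitId) !== -1) {
    throw new Error(`cycle: ${newParentId} is ${unitId} itself or one of its descendants`);
  }
  const resultingDepth = parentChain.length + subtreeHeight(tree, unitId);
  if (resultingDepth > maxDepth) {
    throw new Error(`depth ${resultingDepth} exceeds limit ${maxDepth}`);
  }
}
```

The cycle test walks upward from the proposed parent, so its cost is bounded by the tree depth rather than the subtree size.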

3.6 InviteStaff

class InviteStaff implements Handler<InviteStaffCmd, { invitationId: InvitationId }> {
async exec(cmd: InviteStaffCmd) {
/* RBAC: actor must hold all proposed roles (RoleEscalationGuard) */
await this.escalationGuard.assertAllowed(cmd.actor, cmd.tenantId, cmd.rolesProposed);

const existing = await this.invitations.findPendingByEmail(cmd.tenantId, cmd.email);
if (existing) existing.revoke(cmd.actor); // I-4

const rawToken = this.crypto.randomToken(); // 32 bytes base64url
const tokenHash = this.crypto.sha256Hex(rawToken);
const invitation = Invitation.create({
tenantId: cmd.tenantId, email: cmd.email.toLowerCase(),
tokenHash, rolesProposed: cmd.rolesProposed,
propertyScope: cmd.propertyScope, invitedBy: cmd.actor, ttlDays: 14,
});

await this.tx.run(async () => {
if (existing) await this.invitations.save(existing);
await this.invitations.save(invitation);
await this.publisher.enqueue([IntegrationEvent.from(new InvitationSent(invitation))]);
});

/* notification-service renders the email. Only the SHA-256 hash is stored here; the raw token leaves this handler solely through the notification client call below. */
await this.notifications.sendInviteEmail(invitation, rawToken, cmd.locale);

/* AI invite-abuse classifier (advisory; non-blocking) */
this.ai.classifyInvite(invitation).catch(() => undefined);

return { invitationId: invitation.id };
}
}
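The two CryptoService operations used above (a 32-byte base64url token and a SHA-256 hex digest) map directly onto Node's built-in crypto module. The sketch below is one possible adapter, not the service's actual implementation; `tokenMatches` is a hypothetical helper showing how a presented token could be checked against the stored hash.

```typescript
import { createHash, randomBytes, timingSafeEqual } from "node:crypto";

// 32 random bytes, base64url-encoded: 43 characters, no padding.
function randomToken(): string {
  return randomBytes(32).toString("base64url");
}

// Lowercase hex SHA-256 digest of a UTF-8 string.
function sha256Hex(s: string): string {
  return createHash("sha256").update(s, "utf8").digest("hex");
}

// Hypothetical helper: hashes the presented token and compares it with the
// stored hash in constant time. The length check guards timingSafeEqual,
// which throws when the buffers differ in length.
function tokenMatches(presentedToken: string, storedHashHex: string): boolean {
  const presented = Buffer.from(sha256Hex(presentedToken), "hex");
  const stored = Buffer.from(storedHashHex, "hex");
  return presented.length === stored.length && timingSafeEqual(presented, stored);
}
```

Because only the hash is persisted, a leaked invitations table does not reveal usable tokens; the raw value exists only in the outgoing email.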

3.7 AcceptInvitation

class AcceptInvitation implements Handler<AcceptInvitationCmd, { membershipId: MembershipId }> {
async exec(cmd: AcceptInvitationCmd) {
const inv = await this.invitations.findById(cmd.invitationId) ?? throwNotFound();
inv.accept(cmd.rawToken, cmd.actorUserId, this.clock.now());

let user = await this.identity.findUserByEmail(inv.email);
if (!user) {
/* invite landed before the user exists in iam-service: pre-register + create pending membership */
user = await this.identity.preRegister(inv.email, /* displayName */ inv.email.split('@')[0]);
}

const membership = Membership.activateFromInvite({
tenantId: inv.tenantId, userId: user.userId,
propertyScope: inv['propertyScope'], invitedBy: inv.invitedBy, invitedAt: inv.invitedAt,
});

const assignments = inv['rolesProposed'].map(rid => RoleAssignment.create(inv.tenantId, membership.id, rid, [], cmd.actorUserId));

await this.tx.run(async () => {
await this.invitations.save(inv);
await this.memberships.save(membership);
for (const ra of assignments) await this.assignments.save(ra);
await this.publisher.enqueue([
IntegrationEvent.from(new InvitationAccepted(inv)),
IntegrationEvent.from(new MembershipCreated(membership)),
...assignments.map(ra => IntegrationEvent.from(new MembershipRoleChanged(membership, ra))),
]);
});

await this.membershipCache.invalidateForUser(user.userId);
return { membershipId: membership.id };
}
}

3.8 AssignRole and RemoveMembership

class AssignRole implements Handler<AssignRoleCmd, void> {
async exec(cmd: AssignRoleCmd) {
await this.escalationGuard.assertAllowed(cmd.actor, cmd.tenantId, [cmd.roleId]);
const ra = RoleAssignment.create(cmd.tenantId, cmd.membershipId, cmd.roleId, cmd.propertyScope, cmd.actor);
await this.tx.run(async () => {
await this.assignments.save(ra);
await this.publisher.enqueue([IntegrationEvent.create('melmastoon.tenant.membership.role_changed.v1', /* … */)]);
});
await this.membershipCache.invalidateForTenant(cmd.tenantId);
}
}

class RemoveMembership implements Handler<RemoveMembershipCmd, void> {
async exec(cmd: RemoveMembershipCmd) {
const m = await this.memberships.findById(cmd.membershipId) ?? throwNotFound();
/* OwnerProtectionService: count active owners; reject if removing the last */
await this.ownerGuard.assertNotLastOwner(m);
const events = m.remove(cmd.actor);
await this.tx.run(async () => {
await this.memberships.save(m);
await this.publisher.enqueue(events.map(IntegrationEvent.from));
});
await this.identity.revokeUserSessions(m.tenantId, m.userId); // best-effort
await this.membershipCache.invalidateForUser(m.userId);
}
}

3.9 CreateOrganizationUnit, ToggleFeatureFlag, RequestGuestErasure

  • CreateOrganizationUnit: validates parent kind + depth, computes ltree path, persists, emits organization_unit.created.v1.
  • ToggleFeatureFlag: looks up the platform flag registry, persists override, emits feature_flag.toggled.v1, invalidates cache.
  • RequestGuestErasure: emits melmastoon.tenant.guest.erasure_requested.v1 with guestId, tenantId, reason. This is a fan-out trigger — tenant-service itself holds no guest data; reservation-service, billing-service, notification-service each implement the actual erasure and ack.
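For CreateOrganizationUnit, the ltree path computation might look like the sketch below. It assumes that each unit contributes one label derived from its id and that labels are restricted to letters, digits, and underscores (a conservative subset of what ltree accepts); `toLabel`, `childPath`, `depthOf`, and `rebasePath` are hypothetical names.

```typescript
// Converts an id (for example a UUID) into a conservative ltree label.
function toLabel(id: string): string {
  const label = id.replace(/[^A-Za-z0-9_]/g, "_");
  if (label.length === 0) throw new Error("empty ltree label");
  return label;
}

// Path of a new unit: the parent's path plus the unit's own label.
function childPath(parentPath: string | null, id: string): string {
  const label = toLabel(id);
  return parentPath ? `${parentPath}.${label}` : label;
}

// Depth is the number of labels in the path (a root has depth 1).
function depthOf(path: string): number {
  return path.split(".").length;
}

// Rewrites one path of a moved subtree by swapping the old prefix for the new one.
function rebasePath(path: string, oldPrefix: string, newPrefix: string): string {
  if (path !== oldPrefix && !path.startsWith(oldPrefix + ".")) {
    throw new Error("path is outside the moved subtree");
  }
  return newPrefix + path.slice(oldPrefix.length);
}
```

`rebasePath` mirrors what a subtree move has to do for every descendant row; in Postgres the same rewrite would normally be a single UPDATE over the subtree rather than a per-row call.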

4. Query Use Cases

| Query | Source | Cache | Notes |
|---|---|---|---|
| GetTenantById | repo | yes (60 s) | Platform-scoped read |
| GetTenantConfig | repo | yes (60 s) | Read-through cache |
| ListMembershipsForTenant | repo (RLS) | no | Cursor pagination |
| ListPropertiesForUser | join Membership × RoleAssignment × OrganizationUnit | yes (300 s, per user) | Used by bff-backoffice-service to populate the property switcher |
| GetOrgTree | repo (ltree) | yes (300 s) | Returns nested array |
| ListInvitations | repo | no | Filterable by status |
| CheckAuthorization | composes membership + assignments + roles + ABAC | yes (10 s; key includes membership version) | Returns { allowed, denyReason, obligations } |
| ListFeatureFlagOverrides | repo | yes (60 s) | Per tenant |
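The cached rows in the table above share a read-through pattern with a TTL, and CheckAuthorization additionally folds the membership version into its key. A minimal sketch follows, assuming an in-process map instead of Redis and a synchronous loader for brevity (the real ports return Promises); `TtlCache`, `readThrough`, and `authzCacheKey` are hypothetical names.

```typescript
interface Clock { now(): Date; }

// In-process TTL cache; a stand-in for the Redis-backed adapters.
class TtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();
  constructor(private clock: Clock) {}

  get(key: string): V | null {
    const e = this.entries.get(key);
    if (!e) return null;
    if (e.expiresAt <= this.clock.now().getTime()) {
      this.entries.delete(key); // lazily evict expired entries
      return null;
    }
    return e.value;
  }

  put(key: string, value: V, ttlSec: number): void {
    this.entries.set(key, { value, expiresAt: this.clock.now().getTime() + ttlSec * 1000 });
  }

  invalidate(key: string): void {
    this.entries.delete(key);
  }
}

// Read-through: serve from cache, otherwise load from the source and store the result.
function readThrough<V>(cache: TtlCache<V>, key: string, ttlSec: number, load: () => V): V {
  const hit = cache.get(key);
  if (hit !== null) return hit;
  const fresh = load();
  cache.put(key, fresh, ttlSec);
  return fresh;
}

// Including the membership version means a changed membership misses the cache
// without any explicit invalidation.
function authzCacheKey(membershipId: string, membershipVersion: number, action: string): string {
  return `authz:${membershipId}:v${membershipVersion}:${action}`;
}
```

With a versioned key, stale authorization decisions are never served after a role change; the short 10 s TTL then only bounds how long superseded entries linger in memory.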

ListPropertiesForUser deserves a sketch, since bff-backoffice-service depends on it to populate the property switcher:

class ListPropertiesForUser implements Handler<{ userId: UserId }, PropertyForUser[]> {
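async exec({ userId }: { userId: UserId }): Promise<PropertyForUser[]> {
const memberships = await this.memberships.listForUser(userId);
const out: PropertyForUser[] = [];
for (const m of memberships.filter(x => x['status'] === 'active')) {
/* loaded for the Membership × RoleAssignment join; this sketch does not yet use it to narrow scope */
const assignments = await this.assignments.listForMembership(m.id);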
const orgs = await this.orgUnits.listForTenant(m.tenantId);
const propertyUnits = orgs.filter(o => o['kind'] === 'property');
const scope = m['propertyScope'].length > 0 ? m['propertyScope'] : propertyUnits.map(p => p.id);
for (const u of propertyUnits.filter(p => scope.includes(p.id))) {
out.push({ tenantId: m.tenantId, organizationUnitId: u.id, propertyId: u['propertyId']!, name: u['name'] });
}
}
return out;
}
}

5. Event Consumers

| Subject | Handler | Action |
|---|---|---|
| melmastoon.iam.user.registered.v1 | OnUserRegistered | For every pending invitation matching email, attempt acceptance via the same AcceptInvitation use case (auto-link); membership materialized |
| melmastoon.iam.user.deleted.v1 | OnUserDeleted | Sweep all memberships for userId; flip to removed; emit membership.removed.v1 |
| melmastoon.billing.subscription.cancelled.v1 | OnSubscriptionCancelled | After grace period (read from event payload), invoke SuspendTenant with by='billing' |
| melmastoon.billing.subscription.reactivated.v1 | OnSubscriptionReactivated | Invoke ReactivateTenant with by='billing' |
| melmastoon.tenant.deletion_acked.v1 | OnDeletionAcked | Update saga state |

Every consumer is guarded by an inbox table keyed on (eventId, consumerName): the first delivery commits the inbox row and the side effect in one transaction; redeliveries find the existing row and are no-ops.
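The inbox rule can be sketched without a database. In the version below a Set stands in for the inbox table and its unique key, and "one transaction" is approximated by recording the key only after the side effect succeeds; `InMemoryInbox` and `handleOnce` are hypothetical names.

```typescript
// In-memory stand-in for the inbox table. In Postgres the same effect would come
// from inserting the (event_id, consumer_name) row and running the side effect
// inside one transaction, treating a unique-key conflict as "already handled".
class InMemoryInbox {
  private seen = new Set<string>();

  // Runs sideEffect at most once per (eventId, consumerName).
  // Returns true when the side effect ran, false for a duplicate delivery.
  handleOnce(eventId: string, consumerName: string, sideEffect: () => void): boolean {
    const key = JSON.stringify([eventId, consumerName]);
    if (this.seen.has(key)) return false;
    sideEffect();       // if this throws, nothing is recorded and redelivery retries
    this.seen.add(key); // "commit": record the delivery only after success
    return true;
  }
}
```

Keying on the consumer name as well as the event id lets several consumers process the same event independently while each still deduplicates its own deliveries.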


6. Background Jobs

| Job | Schedule | Action |
|---|---|---|
| ExpireInvitations | every 15 min | Invitation.expireIfDue for all rows where expiresAt < now() and status = pending; emits invitation.expired.v1 |
| OutboxPoller | continuous | Reads outbox table batched by tenant_id, publishes to Pub/Sub with orderingKey = tenant_id; marks dispatched |
| OrphanedMembershipSweep | daily | Cross-checks memberships against iam-service for users that vanished without an event |
| SagaTimeoutWatcher | every 5 min | Alerts on any saga step that has not progressed within its SLA |
| RoleCatalogReconciler | weekly | Compares each tenant's seeded system roles to the canonical permission registry; opens drift report (does not auto-mutate) |
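The OutboxPoller row compresses a few ordering rules that are easier to see in code. The sketch below is one possible single poll cycle over in-memory rows, with a synchronous `publish` callback standing in for the Pub/Sub client; `OutboxRow`, `OutboundMessage`, `groupByTenant`, and `dispatchOnce` are hypothetical names, and stopping a tenant's batch at the first failure is an assumption about how ordering is preserved.

```typescript
interface OutboxRow {
  id: number;               // monotonically increasing insert order
  tenantId: string;
  subject: string;
  payload: unknown;
  dispatchedAt: Date | null;
}

interface OutboundMessage { orderingKey: string; subject: string; payload: unknown; }

// Groups undispatched rows by tenant, keeping id order within each tenant.
function groupByTenant(rows: OutboxRow[]): Map<string, OutboxRow[]> {
  const groups = new Map<string, OutboxRow[]>();
  const pending = rows.filter((r) => r.dispatchedAt === null).sort((a, b) => a.id - b.id);
  pending.forEach((r) => {
    const group = groups.get(r.tenantId) ?? [];
    group.push(r);
    groups.set(r.tenantId, group);
  });
  return groups;
}

// One poll cycle. Each tenant's rows are published in order with
// orderingKey = tenantId; a tenant's batch stops at its first failure so a
// later event never overtakes an earlier one. Returns the number dispatched.
function dispatchOnce(rows: OutboxRow[], publish: (m: OutboundMessage) => boolean, now: Date): number {
  let sent = 0;
  groupByTenant(rows).forEach((batch, tenantId) => {
    for (const row of batch) {
      const ok = publish({ orderingKey: tenantId, subject: row.subject, payload: row.payload });
      if (!ok) break;
      row.dispatchedAt = now; // mark dispatched only after a successful publish
      sent += 1;
    }
  });
  return sent;
}
```

A failure for one tenant does not block the others, which is the practical benefit of batching by tenant_id. Marking rows only after a successful publish gives at-least-once delivery, which is why consumers need the inbox described in §5.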

7. Cross-Cutting Concerns

  • Idempotency. Idempotency-Key header captured at the controller; IdempotencyStore uses Memorystore with TTL 24 h. Replay returns the original response; mismatched body returns 409 MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED.
  • Optimistic concurrency. All aggregates carry a version. PATCH endpoints require If-Match: "v<n>". On mismatch the handler throws OptimisticConcurrencyError → 412.
  • Audit trail. Every command appends a row to audit_events (in the same transaction as the domain mutation) with (tenantId, actorUserId, action, before, after, requestId, traceId). Long-term storage is in BigQuery via the audit sink.
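  • Tenant context. A NestJS interceptor sets app.tenant_id on the Postgres session for every request; RLS does the rest. (Plain SET cannot take a bind parameter, so the $1 form implies a call such as SELECT set_config('app.tenant_id', $1, is_local).) Read endpoints that span tenants (admin only) require platform.super_admin and explicitly call SET LOCAL row_security = off. That setting does not bypass policies on its own: it makes a query fail instead of being silently filtered, so these reads must run under a role that is exempt from RLS, such as one with BYPASSRLS.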
  • Rate limiting. Per-tenant invitation rate limit (50/hour by default); per-IP limit on accept-invitation endpoint to mitigate token brute-force; configured per environment.
  • Observability. All handlers emit OTel spans tenant.<usecase> with attributes tenant.id, actor.user_id, request.id. See OBSERVABILITY.
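The idempotency behaviour in the first bullet (replay returns the original response, a different body under the same key is rejected, entries expire after 24 h) can be sketched as follows. This is an in-memory illustration, not the Memorystore adapter: `InMemoryIdempotencyStore`, `execute`, and the `fingerprint` parameter (for example a hash of the request body) are hypothetical, and the handler is synchronous for brevity.

```typescript
interface StoredResponse { status: number; body: unknown; }
interface Entry { fingerprint: string; response: StoredResponse; expiresAt: number; }

class IdempotencyConflict extends Error {
  readonly status = 409;
  readonly code = "MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED";
  constructor() {
    super("idempotency key reused with a different request body");
    this.name = "IdempotencyConflict";
  }
}

class InMemoryIdempotencyStore {
  private entries = new Map<string, Entry>();
  constructor(private nowMs: () => number) {}

  // Runs `run` at most once per (route, key) within the TTL.
  // Same fingerprint: replay the stored response. Different fingerprint: conflict.
  execute(key: string, route: string, fingerprint: string, ttlSec: number, run: () => StoredResponse): StoredResponse {
    const entryKey = JSON.stringify([route, key]);
    const hit = this.entries.get(entryKey);
    if (hit && hit.expiresAt > this.nowMs()) {
      if (hit.fingerprint !== fingerprint) throw new IdempotencyConflict();
      return hit.response;
    }
    const response = run();
    this.entries.set(entryKey, { fingerprint, response, expiresAt: this.nowMs() + ttlSec * 1000 });
    return response;
  }
}
```

Scoping the entry by route as well as key matches the `get(key, route)` shape of the IdempotencyStore port, so the same client-generated key can be reused safely on different endpoints.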