tenant-service — APPLICATION_LOGIC
Companion: DOMAIN_MODEL · API_CONTRACTS · EVENT_SCHEMAS · DATA_MODEL
This is the application layer (use cases + ports + sagas + event consumers). It coordinates the domain model with the outside world. NestJS controllers in the presentation layer are thin and delegate every command and query to a handler defined here.
1. Architecture Layers
┌──────────────────────────────────────────────────────────────────────┐
│ presentation/ NestJS controllers, OpenAPI, DTO ⇄ command/query │
│ src/api REST + sync + admin BFF surfaces │
├──────────────────────────────────────────────────────────────────────┤
│ application/ ▼ Command + Query handlers, sagas, event consumers │
│ src/usecases orchestrates ports; never knows about the DB │
├──────────────────────────────────────────────────────────────────────┤
│ domain/ Pure TypeScript: aggregates, VOs, domain services │
│ @melmastoon/tenant-domain no framework imports │
├──────────────────────────────────────────────────────────────────────┤
│ infrastructure/ Adapters: PgRepositories, RedisCache, PubsubBus, │
│ src/infra IamHttpClient, BillingHttpClient, OutboxPoller, │
│ CryptoService, ClockAdapter │
└──────────────────────────────────────────────────────────────────────┘
CQRS-light: writes go through command handlers (one transaction = one aggregate + outbox), reads go through dedicated query handlers that may read from cache or read replica.
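The "one transaction = one aggregate + outbox" rule can be illustrated with a small in-memory sketch. Everything here (InMemoryUnitOfWork, the row shapes, suspendTenant, the example subject name) is a hypothetical stand-in for the Postgres-backed adapters, not the service's actual code. It only shows the aggregate write and the outbox row committing or rolling back together.

```typescript
// Hypothetical in-memory stand-in for a Postgres transaction spanning one aggregate + outbox.
type TenantRow = { id: string; status: "active" | "suspended" };
type OutboxRow = { subject: string; tenantId: string };
type Tx = { tenants: Map<string, TenantRow>; outbox: OutboxRow[] };

class InMemoryUnitOfWork {
  private tenants = new Map<string, TenantRow>();
  private outbox: OutboxRow[] = [];

  // Runs fn against staged copies; the copies replace the committed state only if fn resolves.
  async run(fn: (tx: Tx) => Promise<void>): Promise<void> {
    const staged: Tx = { tenants: new Map(this.tenants), outbox: this.outbox.slice() };
    await fn(staged); // a throw here leaves the committed state untouched
    this.tenants = staged.tenants;
    this.outbox = staged.outbox;
  }

  snapshot(): { tenants: TenantRow[]; outbox: OutboxRow[] } {
    return { tenants: Array.from(this.tenants.values()), outbox: this.outbox.slice() };
  }
}

// Command-side write: the aggregate row and its integration event are staged together.
async function suspendTenant(uow: InMemoryUnitOfWork, id: string, failBeforeCommit = false): Promise<void> {
  await uow.run(async (tx) => {
    tx.tenants.set(id, { id, status: "suspended" });
    tx.outbox.push({ subject: "example.tenant.suspended", tenantId: id }); // illustrative subject
    if (failBeforeCommit) throw new Error("simulated failure inside the transaction");
  });
}
```

A failure anywhere inside run discards both the staged tenant row and the staged outbox row, so no event can be published for a write that never committed.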
2. Ports (interfaces in the application layer)
2.1 Repositories
export interface TenantRepository {
findById(id: TenantId): Promise<Tenant | null>;
findBySlug(slug: string): Promise<Tenant | null>;
save(t: Tenant): Promise<void>; // upsert; outbox in same tx
}
export interface TenantConfigRepository {
findByTenantId(t: TenantId): Promise<TenantConfig | null>;
save(c: TenantConfig): Promise<void>;
}
export interface OrganizationUnitRepository {
findById(id: OrganizationUnitId): Promise<OrganizationUnit | null>;
listForTenant(t: TenantId): Promise<OrganizationUnit[]>;
childrenOf(parent: OrganizationUnitId): Promise<OrganizationUnit[]>;
save(o: OrganizationUnit): Promise<void>;
movePathSubtree(o: OrganizationUnit, newParent: OrganizationUnit | null): Promise<void>;
}
export interface MembershipRepository {
findById(id: MembershipId): Promise<Membership | null>;
findFor(t: TenantId, u: UserId): Promise<Membership | null>;
listForTenant(t: TenantId, opts: PageOpts): Promise<Page<Membership>>;
listForUser(u: UserId): Promise<Membership[]>;
countOwners(t: TenantId): Promise<number>;
save(m: Membership): Promise<void>;
}
export interface RoleRepository {
findById(id: RoleId): Promise<Role | null>;
findByCode(t: TenantId, code: string): Promise<Role | null>;
listForTenant(t: TenantId): Promise<Role[]>;
save(r: Role): Promise<void>;
}
export interface RoleAssignmentRepository {
listForMembership(m: MembershipId): Promise<RoleAssignment[]>;
save(ra: RoleAssignment): Promise<void>;
remove(id: RoleAssignmentId): Promise<void>;
}
export interface InvitationRepository {
findById(id: InvitationId): Promise<Invitation | null>;
findPendingByEmail(t: TenantId, email: string): Promise<Invitation | null>;
listExpiringBy(now: Date): Promise<Invitation[]>;
save(i: Invitation): Promise<void>;
}
export interface FeatureFlagOverrideRepository {
list(t: TenantId): Promise<FeatureFlagOverride[]>;
upsert(f: FeatureFlagOverride): Promise<void>;
}
export interface BillingContactRepository {
get(t: TenantId): Promise<BillingContact | null>;
save(b: BillingContact): Promise<void>;
}
2.2 Outbound infrastructure
export interface EventPublisher { // backed by transactional outbox
enqueue(events: IntegrationEvent[]): Promise<void>; // called inside the same tx as repo.save
}
export interface IdentityClient { // talks to iam-service
findUserByEmail(email: string): Promise<{ userId: UserId } | null>;
preRegister(email: string, displayName: string): Promise<{ userId: UserId }>;
revokeUserSessions(t: TenantId, u: UserId): Promise<void>;
}
export interface BillingClient { // talks to billing-service
getSubscriptionState(t: TenantId): Promise<{ active: boolean; gracePeriodEndsAt?: Date }>;
}
export interface PolicyEngine { // ABAC PDP, embedded
permittedActions(opts: { membership: Membership; assignments: RoleAssignment[]; roles: Role[]; }): Permission[];
check(req: AuthzRequest): AuthzDecision;
}
export interface NotificationClient { // event-driven; this is a thin enqueue
sendInviteEmail(invitation: Invitation, plaintextToken: string, locale: string): Promise<void>;
}
export interface AIClient { /* see AI_INTEGRATION */ }
export interface IdempotencyStore {
get(key: IdempotencyKey, route: string): Promise<StoredResponse | null>;
put(key: IdempotencyKey, route: string, response: StoredResponse, ttlSec: number): Promise<void>;
}
export interface Clock { now(): Date; }
export interface CryptoService { randomToken(): string; sha256Hex(s: string): string; }
2.3 Caches
export interface TenantConfigCache {
get(t: TenantId): Promise<TenantConfigSnapshot | null>;
put(t: TenantId, snapshot: TenantConfigSnapshot, ttlSec: number): Promise<void>;
invalidate(t: TenantId): Promise<void>;
}
export interface MembershipCache {
resolveForUser(u: UserId): Promise<MembershipSnapshot[] | null>;
invalidateForUser(u: UserId): Promise<void>;
invalidateForTenant(t: TenantId): Promise<void>;
}
3. Command Use Cases
Each handler is a small class implementing Handler<Cmd, Result>. The @TransactionScope wrapper opens a Postgres transaction, the @Tenancy decorator pins the RLS context (SET app.tenant_id), and the @Idempotent decorator consults the idempotency store before running the body and records the response afterwards.
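The idempotency behaviour can be sketched as a plain function rather than a decorator. This is a minimal sketch under stated assumptions: the StoredResponse shape, the InMemoryIdempotencyStore, IdempotencyKeyReusedError and the runIdempotent helper are all hypothetical, and the serialized request body stands in for whatever fingerprint the real store keeps.

```typescript
// Hypothetical shapes: the real StoredResponse and IdempotencyKey types are not shown in this document.
type StoredResponse = { fingerprint: string; status: number; body: unknown };

class InMemoryIdempotencyStore {
  private readonly rows = new Map<string, { value: StoredResponse; expiresAtMs: number }>();

  async get(key: string, route: string): Promise<StoredResponse | null> {
    const row = this.rows.get(`${route}|${key}`);
    return row && row.expiresAtMs > Date.now() ? row.value : null;
  }

  async put(key: string, route: string, response: StoredResponse, ttlSec: number): Promise<void> {
    this.rows.set(`${route}|${key}`, { value: response, expiresAtMs: Date.now() + ttlSec * 1000 });
  }
}

class IdempotencyKeyReusedError extends Error {}

// Replays the stored response for a repeated key; rejects the same key sent with a different body.
async function runIdempotent(
  store: InMemoryIdempotencyStore,
  key: string,
  route: string,
  requestBody: unknown,
  handler: () => Promise<{ status: number; body: unknown }>,
): Promise<{ status: number; body: unknown }> {
  // Simplification: the serialized body doubles as the fingerprint; a real store would likely keep a hash.
  const fingerprint = JSON.stringify(requestBody);
  const stored = await store.get(key, route);
  if (stored) {
    if (stored.fingerprint !== fingerprint) {
      throw new IdempotencyKeyReusedError(`key ${key} reused with a different body`);
    }
    return { status: stored.status, body: stored.body };
  }
  const result = await handler();
  await store.put(key, route, { fingerprint, status: result.status, body: result.body }, 24 * 60 * 60);
  return result;
}
```

In this sketch a handler that throws stores nothing, so a retry with the same key runs the handler again.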
3.1 ProvisionTenant
class ProvisionTenant implements Handler<ProvisionTenantCmd, { tenantId: TenantId }> {
/* required role: platform.super_admin */
async exec(cmd: ProvisionTenantCmd): Promise<{ tenantId: TenantId }> {
if (await this.tenants.findBySlug(cmd.slug)) throw new TenantSlugTakenError();
const tenant = Tenant.provision({ slug: cmd.slug, legalName: cmd.legalName, country: cmd.country, residencyRegion: cmd.region });
const config = TenantConfig.defaultsFor(tenant); // DOMAIN_MODEL §4
const root = OrganizationUnit.createChainRoot(tenant.id, cmd.legalName);
const ownerRole = Role.systemSeed(tenant.id, 'tenant.owner');
const seedRoles = [...this.systemRoleSeeds(tenant.id)]; // 9 system roles
const ownerMembership = Membership.preRegister(tenant.id, cmd.ownerUserId, /* scope */ []);
const ownerAssignment = RoleAssignment.create(tenant.id, ownerMembership.id, ownerRole.id, /* scope */ [], cmd.actor);
await this.tx.run(async () => {
await this.tenants.save(tenant);
await this.configs.save(config);
await this.orgUnits.save(root);
for (const r of seedRoles) await this.roles.save(r);
await this.memberships.save(ownerMembership);
await this.assignments.save(ownerAssignment);
await this.publisher.enqueue([
IntegrationEvent.from(new TenantCreated(tenant)),
IntegrationEvent.from(new OrganizationUnitCreated(root)),
IntegrationEvent.from(new MembershipCreated(ownerMembership)),
]);
});
return { tenantId: tenant.id };
}
}
3.2 UpdateTenantConfig
class UpdateTenantConfig implements Handler<UpdateTenantConfigCmd, { etag: string }> {
/* required role: tenant.owner | tenant.gm */
async exec(cmd: UpdateTenantConfigCmd) {
const cfg = await this.configs.findByTenantId(cmd.tenantId);
if (!cfg) throw new NotFoundError();
cfg.update(cmd.patch, cmd.actor, cmd.expectVersion); // throws OptimisticConcurrencyError on stale
await this.tx.run(async () => {
await this.configs.save(cfg);
await this.publisher.enqueue([IntegrationEvent.from(new TenantConfigUpdated(cfg))]);
});
await this.cache.invalidate(cmd.tenantId);
return { etag: `"v${cfg.version}"` };
}
}
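The version check behind expectVersion and the returned etag can be sketched as follows. VersionedConfig is a hypothetical stand-in for TenantConfig, and the assumption that every successful update bumps the version by one is mine, not something this document states.

```typescript
class OptimisticConcurrencyError extends Error {}

// Parses an If-Match header of the form "v<n>" (quotes included) into the expected version.
function parseIfMatch(header: string | undefined): number {
  const match = /^"v(\d+)"$/.exec(header ?? "");
  if (!match) throw new Error('If-Match must have the form "v<n>"');
  return Number(match[1]);
}

// Formats a version the same way the handler above builds its etag.
function formatEtag(version: number): string {
  return `"v${version}"`;
}

// Hypothetical stand-in for TenantConfig; only the version check is modelled.
class VersionedConfig {
  constructor(public version: number, public values: Record<string, unknown>) {}

  update(patch: Record<string, unknown>, expectVersion: number): void {
    if (expectVersion !== this.version) {
      throw new OptimisticConcurrencyError(`expected v${expectVersion}, current is v${this.version}`);
    }
    this.values = { ...this.values, ...patch };
    this.version += 1; // assumption: each accepted update increments the version
  }
}
```

A client that read version 3 sends If-Match: "v3"; if another writer got there first, the stale update is rejected and the client must re-read before retrying.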
3.3 SuspendTenant / ReactivateTenant
Both require platform.super_admin (or are triggered automatically by the billing consumer; see §5).
class SuspendTenant implements Handler<SuspendTenantCmd, void> {
async exec(cmd: SuspendTenantCmd) {
const tenant = await this.tenants.findById(cmd.tenantId) ?? throwNotFound();
const events = tenant.suspend(cmd.reason, cmd.by);
await this.tx.run(async () => {
await this.tenants.save(tenant);
await this.publisher.enqueue(events.map(IntegrationEvent.from));
});
await this.cache.invalidate(cmd.tenantId);
}
}
ReactivateTenant is symmetric.
3.4 DeleteTenant (cascading saga)
class CloseTenantSaga {
/* required role: platform.super_admin; plus 24h two-person approval */
async start(cmd: CloseTenantCmd): Promise<{ sagaId: string }> {
const tenant = await this.tenants.findById(cmd.tenantId) ?? throwNotFound();
const events = tenant.close(cmd.reason);
const sagaId = await this.sagaStore.start('CloseTenant', { tenantId: tenant.id });
await this.tx.run(async () => {
await this.tenants.save(tenant);
await this.publisher.enqueue([
IntegrationEvent.from(new TenantClosed(tenant), { sagaId }),
]);
});
return { sagaId };
}
async onAck(serviceName: string, evt: TenantDeletedAckEvent) {
await this.sagaStore.markAck(evt.sagaId, serviceName);
if (await this.sagaStore.allAcksReceived(evt.sagaId)) {
await this.publisher.enqueue([
IntegrationEvent.create('melmastoon.tenant.deletion_completed.v1', { tenantId: evt.tenantId, sagaId: evt.sagaId }),
]);
}
}
async onAckTimeout(sagaId: string) {
/* alert + retry; never silently complete */
}
}
Downstream services (reservation, billing, payment-gateway, lock-integration, housekeeping, notification, theme-config, property, iam) subscribe to melmastoon.tenant.deleted.v1, perform their cascade, and emit melmastoon.tenant.deletion_acked.v1 with their service name. The saga collects acks and only flips the tenant status to archived (via a final compaction job) once every required ack has been received within the configured window (default 7 days). Missing acks trigger PagerDuty.
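The ack bookkeeping described above can be sketched with an in-memory store. This is an illustration only: InMemorySagaStore, its synchronous methods, and the missing and isOverdue helpers are assumptions (the real sagaStore port is asynchronous and persistent, and its full interface is not shown here).

```typescript
// Hypothetical in-memory saga store; the real port is asynchronous and persistent.
type SagaState = { required: Set<string>; acked: Set<string>; startedAtMs: number };

class InMemorySagaStore {
  private readonly sagas = new Map<string, SagaState>();

  start(sagaId: string, requiredServices: string[], nowMs: number): void {
    this.sagas.set(sagaId, { required: new Set(requiredServices), acked: new Set<string>(), startedAtMs: nowMs });
  }

  // Recording an ack is idempotent; acks from services that are not required are ignored.
  markAck(sagaId: string, serviceName: string): void {
    const saga = this.mustGet(sagaId);
    if (saga.required.has(serviceName)) saga.acked.add(serviceName);
  }

  allAcksReceived(sagaId: string): boolean {
    return this.missing(sagaId).length === 0;
  }

  // Required services that have not acked yet; this is what an alert would list.
  missing(sagaId: string): string[] {
    const saga = this.mustGet(sagaId);
    return Array.from(saga.required).filter((s) => !saga.acked.has(s));
  }

  // True when the ack window has elapsed and at least one required ack is still outstanding.
  isOverdue(sagaId: string, nowMs: number, windowMs: number): boolean {
    const saga = this.mustGet(sagaId);
    return nowMs - saga.startedAtMs > windowMs && this.missing(sagaId).length > 0;
  }

  private mustGet(sagaId: string): SagaState {
    const saga = this.sagas.get(sagaId);
    if (!saga) throw new Error(`unknown saga ${sagaId}`);
    return saga;
  }
}
```

Because acks are stored in a set, a redelivered ack from the same service cannot push the saga to completion early.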
3.5 MoveProperty (chain restructuring saga)
class MovePropertySaga {
async start(cmd: MovePropertyCmd) {
const unit = await this.orgUnits.findById(cmd.unitId) ?? throwNotFound();
const newParent = cmd.newParentId ? await this.orgUnits.findById(cmd.newParentId) : null;
/* domain assertions: kind, depth, no cycles */
const sagaId = await this.sagaStore.start('MoveProperty', { unitId: unit.id });
await this.publisher.enqueue([IntegrationEvent.create('melmastoon.tenant.organization_unit.move_started.v1', { sagaId, unitId: unit.id })]);
/* downstream services pause writes for that propertyId */
/* this handler awaits 'move_ready' acks then performs move + emits 'organization_unit.moved.v1' */
}
}
3.6 InviteStaff
class InviteStaff implements Handler<InviteStaffCmd, { invitationId: InvitationId }> {
async exec(cmd: InviteStaffCmd) {
/* RBAC: actor must hold all proposed roles (RoleEscalationGuard) */
await this.escalationGuard.assertAllowed(cmd.actor, cmd.tenantId, cmd.rolesProposed);
const existing = await this.invitations.findPendingByEmail(cmd.tenantId, cmd.email);
if (existing) existing.revoke(cmd.actor); // I-4
const rawToken = this.crypto.randomToken(); // 32 bytes base64url
const tokenHash = this.crypto.sha256Hex(rawToken);
const invitation = Invitation.create({
tenantId: cmd.tenantId, email: cmd.email.toLowerCase(),
tokenHash, rolesProposed: cmd.rolesProposed,
propertyScope: cmd.propertyScope, invitedBy: cmd.actor, ttlDays: 14,
});
await this.tx.run(async () => {
if (existing) await this.invitations.save(existing);
await this.invitations.save(invitation);
await this.publisher.enqueue([IntegrationEvent.from(new InvitationSent(invitation))]);
});
/* notification-service consumes the event and renders the email; we never store the raw token */
await this.notifications.sendInviteEmail(invitation, rawToken, cmd.locale);
/* AI invite-abuse classifier (advisory; non-blocking) */
this.ai.classifyInvite(invitation).catch(() => undefined);
return { invitationId: invitation.id };
}
}
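One way the CryptoService port could be implemented for invitation tokens is with Node's built-in crypto module. The function names randomToken and sha256Hex mirror the port; tokenMatches is a hypothetical helper added here to show a constant-time comparison at acceptance time. Whether the real adapter looks like this is not stated in this document.

```typescript
import { createHash, randomBytes, timingSafeEqual } from "node:crypto";

// 32 random bytes encoded as base64url, which yields 43 characters without padding.
function randomToken(): string {
  return randomBytes(32).toString("base64url");
}

// Hex-encoded SHA-256 digest; only this value would be persisted, never the raw token.
function sha256Hex(s: string): string {
  return createHash("sha256").update(s, "utf8").digest("hex");
}

// Hypothetical helper: compares the hash of a presented token with the stored hash in constant time.
function tokenMatches(rawToken: string, storedHashHex: string): boolean {
  const presented = Buffer.from(sha256Hex(rawToken), "hex");
  const stored = Buffer.from(storedHashHex, "hex");
  // timingSafeEqual throws on length mismatch, so the lengths are compared first.
  return presented.length === stored.length && timingSafeEqual(presented, stored);
}
```

Storing only the hash means a leaked invitations table does not by itself reveal usable invitation links.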
3.7 AcceptInvitation
class AcceptInvitation implements Handler<AcceptInvitationCmd, { membershipId: MembershipId }> {
async exec(cmd: AcceptInvitationCmd) {
const inv = await this.invitations.findById(cmd.invitationId) ?? throwNotFound();
inv.accept(cmd.rawToken, cmd.actorUserId, this.clock.now());
let user = await this.identity.findUserByEmail(inv.email);
if (!user) {
/* invite landed before the user exists in iam-service: pre-register + create pending membership */
user = await this.identity.preRegister(inv.email, /* displayName */ inv.email.split('@')[0]);
}
const membership = Membership.activateFromInvite({
tenantId: inv.tenantId, userId: user.userId,
propertyScope: inv['propertyScope'], invitedBy: inv.invitedBy, invitedAt: inv.invitedAt,
});
const assignments = inv['rolesProposed'].map(rid => RoleAssignment.create(inv.tenantId, membership.id, rid, [], cmd.actorUserId));
await this.tx.run(async () => {
await this.invitations.save(inv);
await this.memberships.save(membership);
for (const ra of assignments) await this.assignments.save(ra);
await this.publisher.enqueue([
IntegrationEvent.from(new InvitationAccepted(inv)),
IntegrationEvent.from(new MembershipCreated(membership)),
...assignments.map(ra => IntegrationEvent.from(new MembershipRoleChanged(membership, ra))),
]);
});
await this.membershipCache.invalidateForUser(user.userId);
return { membershipId: membership.id };
}
}
3.8 AssignRole and RemoveMembership
class AssignRole implements Handler<AssignRoleCmd, void> {
async exec(cmd: AssignRoleCmd) {
await this.escalationGuard.assertAllowed(cmd.actor, cmd.tenantId, [cmd.roleId]);
const ra = RoleAssignment.create(cmd.tenantId, cmd.membershipId, cmd.roleId, cmd.propertyScope, cmd.actor);
await this.tx.run(async () => {
await this.assignments.save(ra);
await this.publisher.enqueue([IntegrationEvent.create('melmastoon.tenant.membership.role_changed.v1', /* … */)]);
});
await this.membershipCache.invalidateForTenant(cmd.tenantId);
}
}
class RemoveMembership implements Handler<RemoveMembershipCmd, void> {
async exec(cmd: RemoveMembershipCmd) {
const m = await this.memberships.findById(cmd.membershipId) ?? throwNotFound();
/* OwnerProtectionService: count active owners; reject if removing the last */
await this.ownerGuard.assertNotLastOwner(m);
const events = m.remove(cmd.actor);
await this.tx.run(async () => {
await this.memberships.save(m);
await this.publisher.enqueue(events.map(IntegrationEvent.from));
});
await this.identity.revokeUserSessions(m.tenantId, m.userId); // best-effort
await this.membershipCache.invalidateForUser(m.userId);
}
}
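The last-owner rule used by RemoveMembership can be sketched as a small guard. MemberView, LastOwnerError and the injected countOwners function are hypothetical; the real OwnerProtectionService and Membership aggregate are defined elsewhere and may differ.

```typescript
class LastOwnerError extends Error {}

// Hypothetical minimal view of a membership; the real aggregate carries more state.
type MemberView = { tenantId: string; userId: string; isOwner: boolean };

// Rejects removal when the member is an owner and no other active owner would remain.
async function assertNotLastOwner(
  member: MemberView,
  countOwners: (tenantId: string) => Promise<number>,
): Promise<void> {
  if (!member.isOwner) return; // non-owners can always be removed by this rule
  const owners = await countOwners(member.tenantId);
  if (owners <= 1) {
    throw new LastOwnerError(`tenant ${member.tenantId} would be left without an owner`);
  }
}
```

A count taken outside the removing transaction can be stale if two owners are removed concurrently, so a production version would probably need the check and the write under the same transaction or lock.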
3.9 CreateOrganizationUnit, ToggleFeatureFlag, RequestGuestErasure
- CreateOrganizationUnit: validates parent kind + depth, computes ltree path, persists, emits organization_unit.created.v1.
- ToggleFeatureFlag: looks up the platform flag registry, persists override, emits feature_flag.toggled.v1, invalidates cache.
- RequestGuestErasure: emits melmastoon.tenant.guest.erasure_requested.v1 with guestId, tenantId, reason. This is a fan-out trigger: tenant-service itself holds no guest data; reservation-service, billing-service, notification-service each implement the actual erasure and ack.
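The ltree path computation for CreateOrganizationUnit can be sketched with plain string helpers. The function names and the label normalization rule (letters, digits and underscores only, a conservative subset of what ltree accepts) are assumptions for illustration; the actual path scheme lives in the domain model.

```typescript
// Normalizes an identifier into a conservative ltree label: only letters, digits and underscores.
function toLtreeLabel(id: string): string {
  const label = id.replace(/[^A-Za-z0-9_]/g, "_");
  if (label.length === 0) throw new Error("ltree label must not be empty");
  return label;
}

// A root unit's path is its own label; a child appends its label to the parent's path.
function childPath(parentPath: string | null, label: string): string {
  return parentPath === null ? label : `${parentPath}.${label}`;
}

// Number of labels in the path; a root has depth 1.
function depthOf(path: string): number {
  return path.split(".").length;
}

// True when `path` lies strictly below `ancestor`; useful for rejecting moves that would create a cycle.
function isDescendantOf(path: string, ancestor: string): boolean {
  return path.startsWith(`${ancestor}.`);
}
```

The trailing dot in isDescendantOf matters: without it, a sibling whose label merely shares a prefix (chain.europe versus chain.eu) would be mistaken for a descendant.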
4. Query Use Cases
| Query | Source | Cache | Notes |
|---|---|---|---|
| GetTenantById | repo | yes (60 s) | Platform-scoped read |
| GetTenantConfig | repo | yes (60 s) | Read-through cache |
| ListMembershipsForTenant | repo (RLS) | no | Cursor pagination |
| ListPropertiesForUser | join Membership × RoleAssignment × OrganizationUnit | yes (300 s, per user) | Used by bff-backoffice-service to populate the property switcher |
| GetOrgTree | repo (ltree) | yes (300 s) | Returns nested array |
| ListInvitations | repo | no | Filterable by status |
| CheckAuthorization | composes membership + assignments + roles + ABAC | yes (10 s; key includes membership version) | Returns { allowed, denyReason, obligations } |
| ListFeatureFlagOverrides | repo | yes (60 s) | Per tenant |
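The read-through behaviour listed for GetTenantConfig can be sketched as below. TtlCache and readThrough are hypothetical helpers with an injectable clock so expiry is testable; the real TenantConfigCache port is asynchronous and backed by the cache adapter in the infrastructure layer.

```typescript
// Hypothetical TTL cache with an injectable clock so expiry can be exercised deterministically.
class TtlCache<V> {
  private readonly entries = new Map<string, { value: V; expiresAtMs: number }>();

  constructor(private readonly nowMs: () => number = () => Date.now()) {}

  get(key: string): V | null {
    const entry = this.entries.get(key);
    if (!entry || entry.expiresAtMs <= this.nowMs()) return null;
    return entry.value;
  }

  put(key: string, value: V, ttlSec: number): void {
    this.entries.set(key, { value, expiresAtMs: this.nowMs() + ttlSec * 1000 });
  }

  invalidate(key: string): void {
    this.entries.delete(key);
  }
}

// Read-through: serve from cache when fresh, otherwise load from the repository and populate the cache.
async function readThrough<V>(
  cache: TtlCache<V>,
  key: string,
  ttlSec: number,
  load: () => Promise<V | null>,
): Promise<V | null> {
  const cached = cache.get(key);
  if (cached !== null) return cached;
  const loaded = await load();
  if (loaded !== null) cache.put(key, loaded, ttlSec); // misses are not cached in this sketch
  return loaded;
}
```

Command handlers that change the underlying data call invalidate after commit, so the TTL only bounds staleness for changes the cache was never told about.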
A sketch of ListPropertiesForUser:
class ListPropertiesForUser implements Handler<{ userId: UserId }, PropertyForUser[]> {
async exec({ userId }) {
const memberships = await this.memberships.listForUser(userId);
const out: PropertyForUser[] = [];
for (const m of memberships.filter(x => x['status'] === 'active')) {
const assignments = await this.assignments.listForMembership(m.id);
const orgs = await this.orgUnits.listForTenant(m.tenantId);
const propertyUnits = orgs.filter(o => o['kind'] === 'property');
const scope = m['propertyScope'].length > 0 ? m['propertyScope'] : propertyUnits.map(p => p.id);
for (const u of propertyUnits.filter(p => scope.includes(p.id))) {
out.push({ tenantId: m.tenantId, organizationUnitId: u.id, propertyId: u['propertyId']!, name: u['name'] });
}
}
return out;
}
}
5. Event Consumers
| Subject | Handler | Action |
|---|---|---|
| melmastoon.iam.user.registered.v1 | OnUserRegistered | For every pending invitation matching email, attempt acceptance via the same AcceptInvitation use case (auto-link); membership materialized |
| melmastoon.iam.user.deleted.v1 | OnUserDeleted | Sweep all memberships for userId; flip to removed; emit membership.removed.v1 |
| melmastoon.billing.subscription.cancelled.v1 | OnSubscriptionCancelled | After grace period (read from event payload), invoke SuspendTenant with by='billing' |
| melmastoon.billing.subscription.reactivated.v1 | OnSubscriptionReactivated | Invoke ReactivateTenant with by='billing' |
| melmastoon.tenant.deletion_acked.v1 | OnDeletionAcked | Update saga state |
All consumers are wrapped by an inbox table keyed on (eventId, consumerName); first delivery commits the inbox row + the side-effect in one transaction; subsequent deliveries are no-ops.
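The inbox de-duplication can be sketched in memory. InMemoryInbox and handleOnce are hypothetical names; in the service the inbox row and the side effect share one Postgres transaction, which this sketch approximates by recording the key only after the side effect succeeds.

```typescript
// Hypothetical in-memory inbox; in the service the inbox row and the side effect share one transaction.
class InMemoryInbox {
  private readonly processed = new Set<string>();

  // Returns true if the side effect ran, false if this (eventId, consumerName) pair was already handled.
  async handleOnce(
    eventId: string,
    consumerName: string,
    sideEffect: () => Promise<void>,
  ): Promise<boolean> {
    const key = `${consumerName}|${eventId}`;
    if (this.processed.has(key)) return false; // duplicate delivery: no-op
    await sideEffect(); // a throw leaves no inbox row, so a redelivery retries the work
    this.processed.add(key);
    return true;
  }
}
```

Keying on the consumer name as well as the event id lets two different consumers each process the same event once.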
6. Background Jobs
| Job | Schedule | Action |
|---|---|---|
| ExpireInvitations | every 15 min | Invitation.expireIfDue for all rows where expiresAt < now() and status = pending; emits invitation.expired.v1 |
| OutboxPoller | continuous | Reads outbox table batched by tenant_id, publishes to Pub/Sub with orderingKey = tenant_id; marks dispatched |
| OrphanedMembershipSweep | daily | Cross-checks memberships against iam-service for users that vanished without an event |
| SagaTimeoutWatcher | every 5 min | Alerts on any saga step that has not progressed within its SLA |
| RoleCatalogReconciler | weekly | Compares each tenant's seeded system roles to the canonical permission registry; opens drift report (does not auto-mutate) |
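The OutboxPoller's per-tenant batching can be sketched as follows. The row shape, the Publish callback and the stop-on-first-failure policy per tenant are assumptions for illustration; the callback stands in for the real Pub/Sub client, whose API is deliberately not reproduced here.

```typescript
type OutboxRow = { id: number; tenantId: string; subject: string; payload: unknown; dispatchedAt: Date | null };
type Publish = (msg: { orderingKey: string; subject: string; data: string }) => Promise<void>;

// Groups undispatched rows by tenant, preserving id order within each tenant.
function groupByTenant(rows: OutboxRow[]): Map<string, OutboxRow[]> {
  const groups = new Map<string, OutboxRow[]>();
  const pending = rows.filter((r) => r.dispatchedAt === null).sort((a, b) => a.id - b.id);
  for (const row of pending) {
    const group = groups.get(row.tenantId) ?? [];
    group.push(row);
    groups.set(row.tenantId, group);
  }
  return groups;
}

// Publishes each tenant's rows sequentially; on failure, stops that tenant so later rows cannot overtake.
async function dispatchBatch(rows: OutboxRow[], publish: Publish, now: Date): Promise<number> {
  let dispatched = 0;
  const groups = Array.from(groupByTenant(rows).entries());
  for (const [tenantId, tenantRows] of groups) {
    for (const row of tenantRows) {
      try {
        await publish({ orderingKey: tenantId, subject: row.subject, data: JSON.stringify(row.payload) });
      } catch (err) {
        break; // leave this row and its successors for the next poll
      }
      row.dispatchedAt = now; // marked only after the publish call resolved
      dispatched += 1;
    }
  }
  return dispatched;
}
```

Marking a row dispatched only after a successful publish gives at-least-once delivery, which is why consumers need their own de-duplication.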
7. Cross-Cutting Concerns
- Idempotency. Idempotency-Key header captured at the controller; IdempotencyStore uses Memorystore with TTL 24 h. Replay returns the original response; mismatched body returns 409 MELMASTOON.SYNC.IDEMPOTENCY_KEY_REUSED.
- Optimistic concurrency. All aggregates carry a version. PATCH endpoints require If-Match: "v<n>". On mismatch the handler throws OptimisticConcurrencyError → 412.
- Audit trail. Every command appends a row to audit_events (in the same transaction as the domain mutation) with (tenantId, actorUserId, action, before, after, requestId, traceId). Long-term storage is in BigQuery via the audit sink.
- Tenant context. A NestJS interceptor sets app.tenant_id = $1 on the Postgres session for every request; RLS does the rest. Read endpoints that span tenants (admin only) require platform.super_admin and explicitly call SET LOCAL row_security = off. Note that in PostgreSQL this setting bypasses policies only for superusers and roles with BYPASSRLS; for any other role it makes policy-filtered queries fail instead, so these reads must run under such a role.
- Rate limiting. Per-tenant invitation rate limit (50/hour by default); per-IP limit on accept-invitation endpoint to mitigate token brute-force; configured per environment.
- Observability. All handlers emit OTel spans tenant.<usecase> with attributes tenant.id, actor.user_id, request.id. See OBSERVABILITY.
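The per-tenant invitation limit could be enforced with a fixed-window counter, sketched below. The windowing algorithm and the FixedWindowLimiter class are assumptions; this document states only the default limit (50/hour), not how it is enforced, and a shared store would be needed once more than one service instance is running.

```typescript
// Fixed-window counter keyed by tenant. The algorithm is an assumption; the source only states the limit.
class FixedWindowLimiter {
  private readonly windows = new Map<string, { windowStartMs: number; count: number }>();

  constructor(private readonly limit: number, private readonly windowMs: number) {
    if (limit < 1 || windowMs <= 0) throw new Error("limit and windowMs must be positive");
  }

  // Returns true and counts the attempt if the key is under its limit for the current window.
  tryAcquire(key: string, nowMs: number): boolean {
    const current = this.windows.get(key);
    if (!current || nowMs - current.windowStartMs >= this.windowMs) {
      this.windows.set(key, { windowStartMs: nowMs, count: 1 }); // first attempt opens a new window
      return true;
    }
    if (current.count >= this.limit) return false;
    current.count += 1;
    return true;
  }
}

// Default from the text above: 50 invitations per tenant per hour.
const invitationLimiter = new FixedWindowLimiter(50, 60 * 60 * 1000);
```

A fixed window allows short bursts across a window boundary; a sliding window or token bucket would smooth that at the cost of more state.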