Events
:::info Source
Sourced from services/identity-service/EVENT_SCHEMAS.md in the documentation repo.
:::
Companion: 04 Event-Driven Architecture · DOMAIN_MODEL · APPLICATION_LOGIC
All events wrap payloads in the platform EventEnvelope (frozen at F01; see 04 §4). Publishing uses the transactional outbox pattern; consumption uses the inbox deduplication pattern.
1. Subject Naming
All identity events follow: identity.{aggregate}.{event}.v{N}.
- Stream:
IDENTITY - Retention class: regulated (180d hot in JetStream, 7y cold archive)
- Partition key:
aggregateId(e.g.,userId,sessionId,deviceId,apiKeyId)
2. Events Published
2.1 identity.user.registered.v1
Emitted when a new user is created via any registration path.
Subject: identity.user.registered.v1
Partition key: userId
Payload schema:
interface UserRegisteredV1 {
userId: UserId;
primaryEmail: Email;
emailVerified: boolean; // true if registered via SSO with verified claim
homeTenantId?: TenantId;
status: 'pending_verification' | 'active';
registrationSource: 'self' | 'sso_jit' | 'invite' | 'bulk_import';
createdAt: ISODate;
}
JSON Schema:
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "schemas://identity/user/registered/v1",
"type": "object",
"additionalProperties": false,
"required": ["userId", "primaryEmail", "emailVerified", "status", "registrationSource", "createdAt"],
"properties": {
"userId": { "type": "string", "pattern": "^usr_[0-9A-HJKMNP-TV-Z]{26}$" },
"primaryEmail": { "type": "string", "format": "email", "maxLength": 254 },
"emailVerified": { "type": "boolean" },
"homeTenantId": { "type": "string", "pattern": "^ten_[0-9A-HJKMNP-TV-Z]{26}$" },
"status": { "enum": ["pending_verification", "active"] },
"registrationSource": { "enum": ["self", "sso_jit", "invite", "bulk_import"] },
"createdAt": { "type": "string", "format": "date-time" }
}
}
Example:
{
"eventId": "01HN2K...",
"eventType": "identity.user.registered",
"eventVersion": 1,
"schemaUri": "schemas://identity/user/registered/v1#sha256-...",
"source": { "service": "identity-service", "instance": "identity-7f8d", "commit": "abc123" },
"occurredAt": "2026-04-15T10:00:00Z",
"ingestedAt": "2026-04-15T10:00:00.050Z",
"correlationId": "01HN2K...",
"tenantId": "ten_01HN...",
"actor": { "type": "user", "id": "usr_01HN..." },
"partitionKey": "usr_01HN...",
"retentionClass": "regulated",
"dataResidency": "us",
"payload": {
"userId": "usr_01HN...",
"primaryEmail": "user@example.com",
"emailVerified": false,
"homeTenantId": "ten_01HN...",
"status": "pending_verification",
"registrationSource": "self",
"createdAt": "2026-04-15T10:00:00Z"
}
}
Consumers:
tenant-service— creates membership records if invite-sourcednotification-service— sends welcome/verification emailanalytics-service— tracks registration funnel
2.2 identity.user.email_verified.v1
Emitted when a user confirms their email.
Payload:
interface UserEmailVerifiedV1 {
userId: UserId;
primaryEmail: Email;
verifiedAt: ISODate;
}
Consumers:
tenant-service— activates pending membershipsnotification-service— welcome email (post-verification flow)
2.3 identity.user.logged_in.v1
Emitted on every successful authentication (password, SSO, refresh-after-MFA).
Payload:
interface UserLoggedInV1 {
userId: UserId;
sessionId: SessionId;
tenantId: TenantId;
deviceId?: DeviceId;
amr: string[]; // ['pwd'], ['pwd','totp'], ['webauthn'], ['sso'], etc.
ip: string; // IPv4 or IPv6
ua: string; // truncated to 512 chars
at: ISODate;
riskScore?: number; // 0-100, from adaptive MFA classifier
}
JSON Schema:
{
"$id": "schemas://identity/user/logged_in/v1",
"type": "object",
"additionalProperties": false,
"required": ["userId", "sessionId", "tenantId", "amr", "ip", "ua", "at"],
"properties": {
"userId": { "type": "string", "pattern": "^usr_[0-9A-HJKMNP-TV-Z]{26}$" },
"sessionId": { "type": "string", "pattern": "^ses_[0-9A-HJKMNP-TV-Z]{26}$" },
"tenantId": { "type": "string", "pattern": "^ten_[0-9A-HJKMNP-TV-Z]{26}$" },
"deviceId": { "type": "string", "pattern": "^dev_[0-9A-HJKMNP-TV-Z]{26}$" },
"amr": {
"type": "array",
"items": { "enum": ["pwd", "totp", "webauthn", "sms", "recovery_codes", "sso", "magic_link"] },
"minItems": 1
},
"ip": { "type": "string", "maxLength": 45 },
"ua": { "type": "string", "maxLength": 512 },
"at": { "type": "string", "format": "date-time" },
"riskScore": { "type": "number", "minimum": 0, "maximum": 100 }
}
}
Consumers:
analytics-service— login metrics, cohortsnotification-service— "new device" notificationssecurity-monitoring— anomaly detection
2.3a identity.user.mfa_enrolled.v1 (US-5)
Emitted when a user completes MFA enrollment (passkey / WebAuthn).
Payload:
interface UserMfaEnrolledV1 {
userId: UserId;
tenantId: TenantId;
factorId: string;
kind: 'webauthn' | 'totp' | 'recovery_codes';
at: ISODate;
}
Consumers: analytics-service (coverage metrics), notification-service (optional enrollment notice), tenant-service (policy compliance roll-ups).
2.3b identity.user.webauthn_registration_canceled.v1 (US-5)
Emitted when the user abandons WebAuthn enrollment before registration-verify (session deleted; no mfa_factors row created).
Payload:
interface WebAuthnRegistrationCanceledV1 {
userId: UserId;
tenantId: TenantId;
sessionId: string;
reason: string; // e.g. `user_canceled`, `user_closed_dialog`
at: ISODate;
}
Consumers: analytics-service (enrollment funnel / cancellation reasons).
2.4 identity.user.locked.v1
Emitted when an account is locked (failed attempts or admin action).
Payload:
interface UserLockedV1 {
userId: UserId;
reason: 'failed_attempts' | 'admin_action' | 'security_incident' | 'compliance_hold';
lockedUntil?: ISODate; // absent = indefinite
lockedBy?: UserId; // admin who locked (if applicable)
failedAttempts?: number;
at: ISODate;
}
Consumers:
notification-service— email user about lockoutanalytics-service— security metricsaudit-service— append to audit log
2.5 identity.session.revoked.v1
Emitted when a session is revoked.
Payload:
interface SessionRevokedV1 {
sessionId: SessionId;
userId: UserId;
tenantId: TenantId;
deviceId?: DeviceId;
reason: 'logout' | 'rotation_reuse' | 'admin_wipe' | 'password_reset' | 'security_incident' | 'mfa_changed' | 'device_revoked' | 'gdpr_erasure';
at: ISODate;
}
Consumers:
sync-service— invalidate device-bound sync cursoranalytics-service— session metricsnotification-service— security alert onrotation_reuseorsecurity_incident
2.6 identity.device.bound_for_offline.v1
Emitted when a device receives an offline binding certificate.
Payload:
interface DeviceBoundV1 {
userId: UserId;
tenantId: TenantId;
deviceId: DeviceId;
fingerprint: string; // SHA-256 of public key + UA
publicKeyFingerprint: string; // SHA-256 of public key alone
certificateKid: string; // key ID of identity-service CA used to sign
certExpiresAt: ISODate;
at: ISODate;
}
Consumers:
content-service— derive per-device bundle encryption keysync-service— register device for offline delta syncaudit-service— append binding event to audit log
2.7 identity.api_key.issued.v1
Payload:
interface APIKeyIssuedV1 {
apiKeyId: APIKeyId;
tenantId: TenantId;
ownerUserId?: UserId;
name: string;
prefix: string; // public prefix for identification
scopes: string[];
expiresAt?: ISODate;
createdAt: ISODate;
createdBy: UserId;
}
2.8 identity.api_key.revoked.v1
Payload:
interface APIKeyRevokedV1 {
apiKeyId: APIKeyId;
tenantId: TenantId;
reason: 'user_revoked' | 'expired' | 'security_incident' | 'tenant_closed';
revokedBy?: UserId;
at: ISODate;
}
2.9 identity.password.reset_requested.v1
Payload:
interface PasswordResetRequestedV1 {
userId: UserId;
resetTokenHash: string; // hash of token; notification-service has the token via secure channel
ip: string;
ua: string;
requestedAt: ISODate;
expiresAt: ISODate;
}
Consumers:
notification-service— send reset email with token
Security note: The raw reset token is not in the event payload. It is passed to notification-service via a secure side channel (encrypted payload) or reconstructed via a lookup on resetTokenHash.
3. Events Consumed
3.1 tenant.org.user_invited.v1
Producer: tenant-service
Handler: OnUserInvitedPolicy
Payload (from tenant-service):
interface UserInvitedV1 {
tenantId: TenantId;
email: Email;
invitedBy: UserId;
roleIds: RoleId[];
orgUnitIds: OrgUnitId[];
expiresAt: ISODate;
}
Handler logic:
- Check inbox for duplicate
eventId. - Look up user by email.
- If user exists: no-op (tenant-service will create membership separately).
- If no user: create shadow
Userwithstatus: pending_verification, no credentials (SSO or invite-accept flow will add them). - Write to inbox within same transaction.
- Commit.
3.2 gdpr.subject_request.received.v1
Producer: platform (cross-cutting)
Handler: OnGDPRRequestPolicy
Payload:
interface GDPRSubjectRequestV1 {
tenantId: TenantId;
subjectUserId: UserId;
kind: 'export' | 'erasure' | 'rectification' | 'restriction' | 'objection';
requestId: string;
at: ISODate;
}
Handler logic for erasure:
- Check inbox.
- Execute
HandleGDPRErasurecommand (see APPLICATION_LOGIC §2.15). - Emit
gdpr.subject_request.acknowledged.v1withservice: 'identity-service'.
Handler logic for export:
- Export all identity data for subject (users, credentials metadata excluding hash, sessions metadata, devices, MFA factors metadata, API keys metadata, external identities).
- Upload to secure export staging.
- Emit
gdpr.subject_request.acknowledged.v1.
4. Event Envelope (Applied)
Every published event uses the envelope (see 04 §4):
{
eventId: ULID,
eventType: "identity.user.registered" | "identity.session.revoked" | ...,
eventVersion: 1,
schemaUri: "schemas://identity/{aggregate}/{event}/v1#sha256-...",
source: { service: "identity-service", instance: "identity-{pod}", commit: "{git-sha}" },
occurredAt: ISODate,
ingestedAt: ISODate,
correlationId: ULID, // from W3C traceparent
causationId?: ULID,
tenantId: TenantId,
actor: { type: "user" | "system" | "api_key" | "service_account", id: string },
partitionKey: string, // userId / sessionId / etc.
retentionClass: "regulated",
dataResidency: "us" | "eu" | "me" | "ap",
outbox: { dbWriteTs: ISODate, outboxId: ULID },
payload: { ... }
}
5. Outbox Implementation
5.1 Table DDL
CREATE TABLE identity.outbox (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
occurred_at timestamptz NOT NULL DEFAULT now(),
tenant_id uuid NOT NULL,
topic text NOT NULL,
envelope jsonb NOT NULL,
published_at timestamptz,
attempts int NOT NULL DEFAULT 0,
last_error text,
partition_key text NOT NULL
);
CREATE INDEX outbox_unpublished_idx ON identity.outbox (occurred_at) WHERE published_at IS NULL;
CREATE INDEX outbox_tenant_idx ON identity.outbox (tenant_id);
5.2 Producer Pattern
Every state change that must emit an event:
// Pseudocode
await db.transaction(async (tx) => {
await userRepo.save(user, tx);
await outboxRepo.insert({
topic: 'identity.user.registered.v1',
envelope: buildEnvelope(payload),
partition_key: user.id,
}, tx);
});
// Single commit ensures atomicity
5.3 OutboxRelay
Background worker:
- Poll unpublished rows ordered by
occurred_at. - Publish to NATS JetStream on
identity.{aggregate}.{event}.v{N}. - On success:
UPDATE outbox SET published_at = now() WHERE id = ?. - On failure: increment
attempts, recordlast_error, backoff2^attemptsseconds (max 10 attempts, then DLQ).
Lock strategy: SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100 ensures safe concurrent relays.
6. Inbox Implementation
6.1 Table DDL
CREATE TABLE identity.inbox (
event_id text PRIMARY KEY,
consumer text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now(),
result text NOT NULL
);
CREATE INDEX inbox_processed_idx ON identity.inbox (processed_at);
6.2 Consumer Pattern
// Pseudocode
await db.transaction(async (tx) => {
const existing = await inboxRepo.find(event.eventId, 'identity-service', tx);
if (existing) return; // duplicate - silent no-op
await handleEvent(event, tx);
await inboxRepo.insert({
event_id: event.eventId,
consumer: 'identity-service',
result: 'success',
}, tx);
});
7. Idempotency Guarantees
- Producer: Outbox
iduniqueness ensures each domain write produces exactly one outbox row. DuplicateNATS publishis idempotent because consumers deduplicate via inbox. - Consumer: Inbox
event_id PRIMARY KEYensures duplicate deliveries are silent no-ops. - Cross-boundary: Every write endpoint (REST) requires
Idempotency-Keyheader; identity-service stores(userId, route, key) -> responseSnapshotin Redis for 24h.
8. Schema Evolution
| Change | Allowed in v1? | Path |
|---|---|---|
| Add optional field | yes | PR + schema registry bump |
| Add required field | no | new v2 + dual-publish window (≥ 1 release cycle) |
| Remove field | no | new v2 |
| Rename field | no | new v2 |
| Widen enum | yes | PR + schema registry bump |
| Narrow enum | no | new v2 |
Schema registry at git@ghasi:/event-schemas/identity/. CI validates every published envelope at produce time and at consume time.
9. DLQ Handling
- Stream:
IDENTITY.dlq - Alert: PagerDuty on non-zero DLQ depth.
- Triage:
- Schema-invalid payload → fix producer or accept as poison with operator sign-off.
- Consumer bug → fix, replay via
POST /admin/events/replay. - Transient external failure → drain naturally.
10. Replay
- Any durable consumer can be reset to
seq=XorstartTime=T. - Identity-service does not maintain read-models internally, so replay is primarily for downstream recovery.
- Monthly game-day: wipe a non-prod consumer and measure recovery time.
11. Observability
| Metric | Meaning | SLO |
|---|---|---|
identity_outbox_lag_seconds | Oldest unpublished row age | p95 < 5s |
identity_outbox_depth | Unpublished rows | < 1000 |
identity_inbox_processed_rate | Events consumed per second | — |
identity_dlq_depth | DLQ backlog | 0 (alert on any) |
identity_event_publish_rate | Events published per second | — |
Traces: every publish span carries eventType, correlationId, tenantId, userId.