
Events

:::info Source
Sourced from services/identity-service/EVENT_SCHEMAS.md in the documentation repo.
:::

Companion: 04 Event-Driven Architecture · DOMAIN_MODEL · APPLICATION_LOGIC

All events wrap payloads in the platform EventEnvelope (frozen at F01; see 04 §4). Publishing uses the transactional outbox pattern; consumption uses the inbox deduplication pattern.

1. Subject Naming

All identity events follow: identity.{aggregate}.{event}.v{N}.

  • Stream: IDENTITY
  • Retention class: regulated (180d hot in JetStream, 7y cold archive)
  • Partition key: aggregateId (e.g., userId, sessionId, deviceId, apiKeyId)
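The naming rule can be sketched as a pair of helpers that build and parse subjects. This is only an illustration: the function names and the lowercase snake_case token pattern are assumptions inferred from the subjects listed in this document, not part of the platform contract.

```typescript
// Hypothetical helpers; only the identity.{aggregate}.{event}.v{N} shape comes from the spec.
// Assumption: tokens are lowercase snake_case, as in every subject listed in this document.
const SUBJECT_PATTERN = /^identity\.([a-z][a-z0-9_]*)\.([a-z][a-z0-9_]*)\.v([1-9][0-9]*)$/;

interface ParsedSubject {
  aggregate: string;
  event: string;
  version: number;
}

// Builds a subject and rejects tokens that would not match the naming rule.
function buildSubject(aggregate: string, event: string, version: number): string {
  const subject = `identity.${aggregate}.${event}.v${version}`;
  if (!SUBJECT_PATTERN.test(subject)) {
    throw new Error(`invalid subject: ${subject}`);
  }
  return subject;
}

// Returns null for subjects that do not belong to the identity namespace.
function parseSubject(subject: string): ParsedSubject | null {
  const match = SUBJECT_PATTERN.exec(subject);
  if (match === null) return null;
  return { aggregate: match[1], event: match[2], version: Number(match[3]) };
}
```

For example, `buildSubject("user", "registered", 1)` yields `identity.user.registered.v1`, while `parseSubject("tenant.org.user_invited.v1")` returns `null` because that subject belongs to another service.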

2. Events Published

2.1 identity.user.registered.v1

Emitted when a new user is created via any registration path.

Subject: identity.user.registered.v1
Partition key: userId

Payload schema:

interface UserRegisteredV1 {
  userId: UserId;
  primaryEmail: Email;
  emailVerified: boolean; // true if registered via SSO with verified claim
  homeTenantId?: TenantId;
  status: 'pending_verification' | 'active';
  registrationSource: 'self' | 'sso_jit' | 'invite' | 'bulk_import';
  createdAt: ISODate;
}

JSON Schema:

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "schemas://identity/user/registered/v1",
  "type": "object",
  "additionalProperties": false,
  "required": ["userId", "primaryEmail", "emailVerified", "status", "registrationSource", "createdAt"],
  "properties": {
    "userId": { "type": "string", "pattern": "^usr_[0-9A-HJKMNP-TV-Z]{26}$" },
    "primaryEmail": { "type": "string", "format": "email", "maxLength": 254 },
    "emailVerified": { "type": "boolean" },
    "homeTenantId": { "type": "string", "pattern": "^ten_[0-9A-HJKMNP-TV-Z]{26}$" },
    "status": { "enum": ["pending_verification", "active"] },
    "registrationSource": { "enum": ["self", "sso_jit", "invite", "bulk_import"] },
    "createdAt": { "type": "string", "format": "date-time" }
  }
}
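To make the schema's constraints concrete, here is a minimal hand-written check that mirrors them. It is a sketch and not the registry's validator: the function name is hypothetical, and the `email` and `date-time` format checks are deliberately simplified stand-ins for what a full JSON Schema validator would enforce.

```typescript
// Patterns and enums copied from the JSON Schema above.
const USER_ID = /^usr_[0-9A-HJKMNP-TV-Z]{26}$/;
const TENANT_ID = /^ten_[0-9A-HJKMNP-TV-Z]{26}$/;
const STATUSES = ["pending_verification", "active"];
const SOURCES = ["self", "sso_jit", "invite", "bulk_import"];
const ALLOWED_KEYS = [
  "userId", "primaryEmail", "emailVerified", "homeTenantId",
  "status", "registrationSource", "createdAt",
];

// Hypothetical helper: returns a list of problems, empty when the payload conforms.
function validateUserRegisteredV1(payload: Record<string, unknown>): string[] {
  const errors: string[] = [];
  // additionalProperties: false
  for (const key of Object.keys(payload)) {
    if (!ALLOWED_KEYS.includes(key)) errors.push(`unexpected property: ${key}`);
  }
  const { userId, primaryEmail, emailVerified, homeTenantId, status, registrationSource, createdAt } = payload;
  if (typeof userId !== "string" || !USER_ID.test(userId)) errors.push("userId");
  // Simplified email check (assumption): length limit plus presence of "@".
  if (typeof primaryEmail !== "string" || primaryEmail.length > 254 || !primaryEmail.includes("@")) {
    errors.push("primaryEmail");
  }
  if (typeof emailVerified !== "boolean") errors.push("emailVerified");
  // homeTenantId is the only optional property.
  if (homeTenantId !== undefined && (typeof homeTenantId !== "string" || !TENANT_ID.test(homeTenantId))) {
    errors.push("homeTenantId");
  }
  if (typeof status !== "string" || !STATUSES.includes(status)) errors.push("status");
  if (typeof registrationSource !== "string" || !SOURCES.includes(registrationSource)) {
    errors.push("registrationSource");
  }
  // Simplified date-time check (assumption): anything Date.parse accepts.
  if (typeof createdAt !== "string" || Number.isNaN(Date.parse(createdAt))) errors.push("createdAt");
  return errors;
}
```

A payload with a malformed `userId` and an extra property would produce two entries in the returned list, one for each violated constraint.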

Example:

{
  "eventId": "01HN2K...",
  "eventType": "identity.user.registered",
  "eventVersion": 1,
  "schemaUri": "schemas://identity/user/registered/v1#sha256-...",
  "source": { "service": "identity-service", "instance": "identity-7f8d", "commit": "abc123" },
  "occurredAt": "2026-04-15T10:00:00Z",
  "ingestedAt": "2026-04-15T10:00:00.050Z",
  "correlationId": "01HN2K...",
  "tenantId": "ten_01HN...",
  "actor": { "type": "user", "id": "usr_01HN..." },
  "partitionKey": "usr_01HN...",
  "retentionClass": "regulated",
  "dataResidency": "us",
  "payload": {
    "userId": "usr_01HN...",
    "primaryEmail": "user@example.com",
    "emailVerified": false,
    "homeTenantId": "ten_01HN...",
    "status": "pending_verification",
    "registrationSource": "self",
    "createdAt": "2026-04-15T10:00:00Z"
  }
}

Consumers:

  • tenant-service — creates membership records if invite-sourced
  • notification-service — sends welcome/verification email
  • analytics-service — tracks registration funnel

2.2 identity.user.email_verified.v1

Emitted when a user confirms their email.

Payload:

interface UserEmailVerifiedV1 {
  userId: UserId;
  primaryEmail: Email;
  verifiedAt: ISODate;
}

Consumers:

  • tenant-service — activates pending memberships
  • notification-service — welcome email (post-verification flow)

2.3 identity.user.logged_in.v1

Emitted on every successful authentication (password, SSO, refresh-after-MFA).

Payload:

interface UserLoggedInV1 {
  userId: UserId;
  sessionId: SessionId;
  tenantId: TenantId;
  deviceId?: DeviceId;
  amr: string[]; // ['pwd'], ['pwd','totp'], ['webauthn'], ['sso'], etc.
  ip: string; // IPv4 or IPv6
  ua: string; // truncated to 512 chars
  at: ISODate;
  riskScore?: number; // 0-100, from adaptive MFA classifier
}

JSON Schema:

{
  "$id": "schemas://identity/user/logged_in/v1",
  "type": "object",
  "additionalProperties": false,
  "required": ["userId", "sessionId", "tenantId", "amr", "ip", "ua", "at"],
  "properties": {
    "userId": { "type": "string", "pattern": "^usr_[0-9A-HJKMNP-TV-Z]{26}$" },
    "sessionId": { "type": "string", "pattern": "^ses_[0-9A-HJKMNP-TV-Z]{26}$" },
    "tenantId": { "type": "string", "pattern": "^ten_[0-9A-HJKMNP-TV-Z]{26}$" },
    "deviceId": { "type": "string", "pattern": "^dev_[0-9A-HJKMNP-TV-Z]{26}$" },
    "amr": {
      "type": "array",
      "items": { "enum": ["pwd", "totp", "webauthn", "sms", "recovery_codes", "sso", "magic_link"] },
      "minItems": 1
    },
    "ip": { "type": "string", "maxLength": 45 },
    "ua": { "type": "string", "maxLength": 512 },
    "at": { "type": "string", "format": "date-time" },
    "riskScore": { "type": "number", "minimum": 0, "maximum": 100 }
  }
}

Consumers:

  • analytics-service — login metrics, cohorts
  • notification-service — "new device" notifications
  • security-monitoring — anomaly detection

2.3a identity.user.mfa_enrolled.v1 (US-5)

Emitted when a user completes MFA enrollment (for example, a passkey / WebAuthn credential; the kind field identifies the factor type).

Payload:

interface UserMfaEnrolledV1 {
  userId: UserId;
  tenantId: TenantId;
  factorId: string;
  kind: 'webauthn' | 'totp' | 'recovery_codes';
  at: ISODate;
}

Consumers: analytics-service (coverage metrics), notification-service (optional enrollment notice), tenant-service (policy compliance roll-ups).

2.3b identity.user.webauthn_registration_canceled.v1 (US-5)

Emitted when the user abandons WebAuthn enrollment before registration-verify (session deleted; no mfa_factors row created).

Payload:

interface WebAuthnRegistrationCanceledV1 {
  userId: UserId;
  tenantId: TenantId;
  sessionId: string;
  reason: string; // e.g. `user_canceled`, `user_closed_dialog`
  at: ISODate;
}

Consumers: analytics-service (enrollment funnel / cancellation reasons).

2.4 identity.user.locked.v1

Emitted when an account is locked (failed attempts or admin action).

Payload:

interface UserLockedV1 {
  userId: UserId;
  reason: 'failed_attempts' | 'admin_action' | 'security_incident' | 'compliance_hold';
  lockedUntil?: ISODate; // absent = indefinite
  lockedBy?: UserId; // admin who locked (if applicable)
  failedAttempts?: number;
  at: ISODate;
}

Consumers:

  • notification-service — email user about lockout
  • analytics-service — security metrics
  • audit-service — append to audit log

2.5 identity.session.revoked.v1

Emitted when a session is revoked.

Payload:

interface SessionRevokedV1 {
  sessionId: SessionId;
  userId: UserId;
  tenantId: TenantId;
  deviceId?: DeviceId;
  reason: 'logout' | 'rotation_reuse' | 'admin_wipe' | 'password_reset' | 'security_incident' | 'mfa_changed' | 'device_revoked' | 'gdpr_erasure';
  at: ISODate;
}

Consumers:

  • sync-service — invalidate device-bound sync cursor
  • analytics-service — session metrics
  • notification-service — security alert on rotation_reuse or security_incident

2.6 identity.device.bound_for_offline.v1

Emitted when a device receives an offline binding certificate.

Payload:

interface DeviceBoundV1 {
  userId: UserId;
  tenantId: TenantId;
  deviceId: DeviceId;
  fingerprint: string; // SHA-256 of public key + UA
  publicKeyFingerprint: string; // SHA-256 of public key alone
  certificateKid: string; // key ID of identity-service CA used to sign
  certExpiresAt: ISODate;
  at: ISODate;
}

Consumers:

  • content-service — derive per-device bundle encryption key
  • sync-service — register device for offline delta sync
  • audit-service — append binding event to audit log

2.7 identity.api_key.issued.v1

Payload:

interface APIKeyIssuedV1 {
  apiKeyId: APIKeyId;
  tenantId: TenantId;
  ownerUserId?: UserId;
  name: string;
  prefix: string; // public prefix for identification
  scopes: string[];
  expiresAt?: ISODate;
  createdAt: ISODate;
  createdBy: UserId;
}

2.8 identity.api_key.revoked.v1

Payload:

interface APIKeyRevokedV1 {
  apiKeyId: APIKeyId;
  tenantId: TenantId;
  reason: 'user_revoked' | 'expired' | 'security_incident' | 'tenant_closed';
  revokedBy?: UserId;
  at: ISODate;
}

2.9 identity.password.reset_requested.v1

Payload:

interface PasswordResetRequestedV1 {
  userId: UserId;
  resetTokenHash: string; // hash of token; notification-service has the token via secure channel
  ip: string;
  ua: string;
  requestedAt: ISODate;
  expiresAt: ISODate;
}

Consumers:

  • notification-service — send reset email with token

Security note: The raw reset token is never included in the event payload. It reaches notification-service either through a secure side channel (encrypted payload) or through a lookup keyed by resetTokenHash.
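The relationship between the raw token and resetTokenHash can be sketched as follows. The spec only says "hash of token"; the choice of SHA-256, the 32-byte token length, the hex encoding, and both function names are assumptions made for illustration.

```typescript
import { createHash, randomBytes } from "node:crypto";

// Assumption: 32 random bytes, hex-encoded. The spec does not state the token format.
function generateResetToken(): string {
  return randomBytes(32).toString("hex");
}

// Assumption: SHA-256, hex-encoded. The spec only says "hash of token".
function hashResetToken(rawToken: string): string {
  return createHash("sha256").update(rawToken, "utf8").digest("hex");
}

// The raw token travels out of band; only the hash goes into the event payload.
const rawToken = generateResetToken();
const resetTokenHash = hashResetToken(rawToken);
```

Because the hash is one-way, a consumer that sees only the event cannot recover the token, which is the property the security note relies on.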

3. Events Consumed

3.1 tenant.org.user_invited.v1

Producer: tenant-service
Handler: OnUserInvitedPolicy

Payload (from tenant-service):

interface UserInvitedV1 {
  tenantId: TenantId;
  email: Email;
  invitedBy: UserId;
  roleIds: RoleId[];
  orgUnitIds: OrgUnitId[];
  expiresAt: ISODate;
}

Handler logic:

  1. Check inbox for duplicate eventId.
  2. Look up user by email.
  3. If user exists: no-op (tenant-service will create membership separately).
  4. If no user: create shadow User with status: pending_verification, no credentials (SSO or invite-accept flow will add them).
  5. Write to inbox within same transaction.
  6. Commit.
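The six steps above can be sketched against an in-memory store. This is an illustration only: the store class, the result labels, and the reduced event shape are hypothetical, and a single synchronous function call stands in for the database transaction that the real handler would use.

```typescript
type UserStatus = "pending_verification" | "active";

interface ShadowUser {
  email: string;
  status: UserStatus;
  hasCredentials: boolean;
}

// Reduced event shape for the sketch; the real event carries the full envelope.
interface InvitedEvent {
  eventId: string;
  payload: { tenantId: string; email: string };
}

// Hypothetical stand-in for the identity database (users table plus inbox table).
class InMemoryIdentityStore {
  readonly inbox = new Set<string>();
  readonly usersByEmail = new Map<string, ShadowUser>();
}

type HandlerResult = "duplicate" | "user_exists" | "shadow_user_created";

function onUserInvited(store: InMemoryIdentityStore, event: InvitedEvent): HandlerResult {
  // Step 1: a known eventId means this delivery is a duplicate.
  if (store.inbox.has(event.eventId)) return "duplicate";

  // Steps 2-4: look up by email; create a credential-less shadow user only if none exists.
  let result: HandlerResult;
  if (store.usersByEmail.has(event.payload.email)) {
    result = "user_exists";
  } else {
    store.usersByEmail.set(event.payload.email, {
      email: event.payload.email,
      status: "pending_verification",
      hasCredentials: false,
    });
    result = "shadow_user_created";
  }

  // Steps 5-6: record the event in the inbox together with the state change.
  store.inbox.add(event.eventId);
  return result;
}
```

Redelivering the same event returns `"duplicate"` without touching the user map, and a second invite for the same email under a new eventId returns `"user_exists"`.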

3.2 gdpr.subject_request.received.v1

Producer: platform (cross-cutting)
Handler: OnGDPRRequestPolicy

Payload:

interface GDPRSubjectRequestV1 {
  tenantId: TenantId;
  subjectUserId: UserId;
  kind: 'export' | 'erasure' | 'rectification' | 'restriction' | 'objection';
  requestId: string;
  at: ISODate;
}

Handler logic for erasure:

  1. Check inbox.
  2. Execute HandleGDPRErasure command (see APPLICATION_LOGIC §2.15).
  3. Emit gdpr.subject_request.acknowledged.v1 with service: 'identity-service'.

Handler logic for export:

  1. Export all identity data for subject (users, credentials metadata excluding hash, sessions metadata, devices, MFA factors metadata, API keys metadata, external identities).
  2. Upload to secure export staging.
  3. Emit gdpr.subject_request.acknowledged.v1.

4. Event Envelope (Applied)

Every published event uses the envelope (see 04 §4):

{
  eventId: ULID,
  eventType: "identity.user.registered" | "identity.session.revoked" | ...,
  eventVersion: 1,
  schemaUri: "schemas://identity/{aggregate}/{event}/v1#sha256-...",
  source: { service: "identity-service", instance: "identity-{pod}", commit: "{git-sha}" },
  occurredAt: ISODate,
  ingestedAt: ISODate,
  correlationId: ULID, // from W3C traceparent
  causationId?: ULID,
  tenantId: TenantId,
  actor: { type: "user" | "system" | "api_key" | "service_account", id: string },
  partitionKey: string, // userId / sessionId / etc.
  retentionClass: "regulated",
  dataResidency: "us" | "eu" | "me" | "ap",
  outbox: { dbWriteTs: ISODate, outboxId: ULID },
  payload: { ... }
}

5. Outbox Implementation

5.1 Table DDL

CREATE TABLE identity.outbox (
  id            uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  occurred_at   timestamptz NOT NULL DEFAULT now(),
  tenant_id     uuid NOT NULL,
  topic         text NOT NULL,
  envelope      jsonb NOT NULL,
  published_at  timestamptz,
  attempts      int NOT NULL DEFAULT 0,
  last_error    text,
  partition_key text NOT NULL
);
CREATE INDEX outbox_unpublished_idx ON identity.outbox (occurred_at) WHERE published_at IS NULL;
CREATE INDEX outbox_tenant_idx ON identity.outbox (tenant_id);

5.2 Producer Pattern

Every state change that must emit an event:

// Pseudocode
await db.transaction(async (tx) => {
  await userRepo.save(user, tx);
  await outboxRepo.insert({
    topic: 'identity.user.registered.v1',
    envelope: buildEnvelope(payload),
    partition_key: user.id,
  }, tx);
});
// Single commit ensures atomicity

5.3 OutboxRelay

Background worker:

  1. Poll unpublished rows ordered by occurred_at.
  2. Publish to NATS JetStream on identity.{aggregate}.{event}.v{N}.
  3. On success: UPDATE outbox SET published_at = now() WHERE id = ?.
  4. On failure: increment attempts, record last_error, backoff 2^attempts seconds (max 10 attempts, then DLQ).

Lock strategy: SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100 ensures safe concurrent relays.
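The retry rule in step 4 can be sketched as a pure function. The names are hypothetical, and one reading of the rule is assumed here: the row is dead-lettered once the incremented attempt count reaches 10, and otherwise the relay waits 2^attempts seconds before the next try.

```typescript
const MAX_ATTEMPTS = 10; // from step 4: "max 10 attempts, then DLQ"

type RelayDecision =
  | { kind: "retry"; attempts: number; delaySeconds: number }
  | { kind: "dead_letter"; attempts: number };

// Hypothetical helper: decides what the relay does after a failed publish.
// previousAttempts is the value of outbox.attempts before this failure.
function onPublishFailure(previousAttempts: number): RelayDecision {
  const attempts = previousAttempts + 1; // step 4: increment attempts
  if (attempts >= MAX_ATTEMPTS) {
    // Assumption: the tenth failed attempt moves the row to the DLQ.
    return { kind: "dead_letter", attempts };
  }
  return { kind: "retry", attempts, delaySeconds: 2 ** attempts };
}
```

Under this reading, the first failure waits 2 seconds, the third waits 8 seconds, and the longest wait before dead-lettering is 512 seconds after the ninth failure.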

6. Inbox Implementation

6.1 Table DDL

CREATE TABLE identity.inbox (
  event_id     text PRIMARY KEY,
  consumer     text NOT NULL,
  processed_at timestamptz NOT NULL DEFAULT now(),
  result       text NOT NULL
);
CREATE INDEX inbox_processed_idx ON identity.inbox (processed_at);

6.2 Consumer Pattern

// Pseudocode
await db.transaction(async (tx) => {
  const existing = await inboxRepo.find(event.eventId, 'identity-service', tx);
  if (existing) return; // duplicate - silent no-op

  await handleEvent(event, tx);

  await inboxRepo.insert({
    event_id: event.eventId,
    consumer: 'identity-service',
    result: 'success',
  }, tx);
});

7. Idempotency Guarantees

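  • Producer: The domain write and its outbox row (identified by a unique id) commit in a single transaction (§5.2), so each committed state change produces exactly one outbox row. The relay may publish a row to NATS more than once, for example if it fails between publishing and setting published_at; this is safe because consumers deduplicate via the inbox.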
  • Consumer: Inbox event_id PRIMARY KEY ensures duplicate deliveries are silent no-ops.
  • Cross-boundary: Every write endpoint (REST) requires Idempotency-Key header; identity-service stores (userId, route, key) -> responseSnapshot in Redis for 24h.
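The cross-boundary guarantee can be sketched with an in-memory map standing in for Redis. The class, the function names, and the snapshot shape are hypothetical; only the (userId, route, key) composite key and the 24h retention come from the text above.

```typescript
const TTL_MS = 24 * 60 * 60 * 1000; // 24h retention, as stated above

interface ResponseSnapshot {
  status: number;
  body: string;
}

interface Entry {
  snapshot: ResponseSnapshot;
  expiresAt: number; // epoch milliseconds
}

// Hypothetical in-memory stand-in for the Redis-backed store.
class IdempotencyStore {
  private readonly entries = new Map<string, Entry>();

  // JSON-encoding the tuple avoids collisions between, e.g., ("a:b", "c") and ("a", "b:c").
  private static cacheKey(userId: string, route: string, key: string): string {
    return JSON.stringify([userId, route, key]);
  }

  get(userId: string, route: string, key: string, now: number): ResponseSnapshot | undefined {
    const k = IdempotencyStore.cacheKey(userId, route, key);
    const entry = this.entries.get(k);
    if (entry === undefined) return undefined;
    if (entry.expiresAt <= now) {
      this.entries.delete(k); // expired: behave as if the key was never seen
      return undefined;
    }
    return entry.snapshot;
  }

  put(userId: string, route: string, key: string, snapshot: ResponseSnapshot, now: number): void {
    const k = IdempotencyStore.cacheKey(userId, route, key);
    this.entries.set(k, { snapshot, expiresAt: now + TTL_MS });
  }
}

// Runs execute() at most once per (userId, route, key) within the retention window.
function handleOnce(
  store: IdempotencyStore, userId: string, route: string, key: string,
  now: number, execute: () => ResponseSnapshot,
): ResponseSnapshot {
  const cached = store.get(userId, route, key, now);
  if (cached !== undefined) return cached;
  const fresh = execute();
  store.put(userId, route, key, fresh, now);
  return fresh;
}
```

A retried request with the same Idempotency-Key receives the stored snapshot instead of re-running the write. This sketch ignores concurrent in-flight requests with the same key, which a production store would need to handle.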

8. Schema Evolution

| Change | Allowed in v1? | Path |
| --- | --- | --- |
| Add optional field | yes | PR + schema registry bump |
| Add required field | no | new v2 + dual-publish window (≥ 1 release cycle) |
| Remove field | no | new v2 |
| Rename field | no | new v2 |
| Widen enum | yes | PR + schema registry bump |
| Narrow enum | no | new v2 |

Schema registry at git@ghasi:/event-schemas/identity/. CI validates every published envelope at produce time and at consume time.
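The evolution rules can be sketched as a check that compares two simplified schema descriptions and lists the changes that would require a v2. The types and function name are hypothetical and cover only the cases in the table; a rename shows up as a removal, and making an existing optional field required is treated like adding a required field.

```typescript
interface FieldSpec {
  enum?: string[];
}

// Simplified stand-in for a JSON Schema: just the parts the table talks about.
interface PayloadSchema {
  required: string[];
  properties: Record<string, FieldSpec>;
}

// Returns the changes that are not allowed within v1; an empty list means compatible.
function breakingChanges(oldSchema: PayloadSchema, newSchema: PayloadSchema): string[] {
  const problems: string[] = [];

  for (const name of Object.keys(oldSchema.properties)) {
    if (!(name in newSchema.properties)) {
      // Covers both "remove field" and "rename field".
      problems.push(`removed or renamed field: ${name}`);
      continue;
    }
    const oldEnum = oldSchema.properties[name].enum;
    const newEnum = newSchema.properties[name].enum;
    if (oldEnum !== undefined && newEnum !== undefined) {
      for (const value of oldEnum) {
        if (!newEnum.includes(value)) {
          problems.push(`narrowed enum ${name}: dropped ${value}`);
        }
      }
    }
  }

  for (const name of newSchema.required) {
    if (!oldSchema.required.includes(name)) {
      problems.push(`new required field: ${name}`);
    }
  }

  return problems;
}
```

Adding an optional property or appending an enum value yields an empty list, which corresponds to the "yes" rows; any other outcome points to a new v2 with a dual-publish window.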

9. DLQ Handling

  • Stream: IDENTITY.dlq
  • Alert: PagerDuty on non-zero DLQ depth.
  • Triage:
    • Schema-invalid payload → fix producer or accept as poison with operator sign-off.
    • Consumer bug → fix, replay via POST /admin/events/replay.
    • Transient external failure → drain naturally.

10. Replay

  • Any durable consumer can be reset to seq=X or startTime=T.
  • Identity-service does not maintain read-models internally, so replay is primarily for downstream recovery.
  • Monthly game-day: wipe a non-prod consumer and measure recovery time.

11. Observability

| Metric | Meaning | SLO |
| --- | --- | --- |
| identity_outbox_lag_seconds | Oldest unpublished row age | p95 < 5s |
| identity_outbox_depth | Unpublished rows | < 1000 |
| identity_inbox_processed_rate | Events consumed per second | |
| identity_dlq_depth | DLQ backlog | 0 (alert on any) |
| identity_event_publish_rate | Events published per second | |

Traces: every publish span carries eventType, correlationId, tenantId, userId.
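As an illustration of how the two outbox gauges relate to the outbox table, the sketch below derives both from a list of rows. The row shape and function name are hypothetical, and how the real service samples or exports these metrics is not specified here.

```typescript
// Reduced view of an outbox row: only the columns the gauges depend on.
interface OutboxRow {
  occurredAt: Date;
  publishedAt: Date | null;
}

interface OutboxGauges {
  depth: number;      // identity_outbox_depth: unpublished rows
  lagSeconds: number; // identity_outbox_lag_seconds: age of the oldest unpublished row
}

function outboxGauges(rows: OutboxRow[], now: Date): OutboxGauges {
  const pending = rows.filter((row) => row.publishedAt === null);
  if (pending.length === 0) return { depth: 0, lagSeconds: 0 };
  const oldestMs = Math.min(...pending.map((row) => row.occurredAt.getTime()));
  return { depth: pending.length, lagSeconds: (now.getTime() - oldestMs) / 1000 };
}
```

The p95 in the SLO would then be computed over successive samples of `lagSeconds`, not from a single reading.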