Operator Management Service — Application Logic
Status: populated Owner: Platform Engineering Last updated: 2026-04-18 Companion: DOMAIN_MODEL · API_CONTRACTS
1. Use Cases
1.1 CreateOperatorUseCase
Triggered by POST /v1/admin/operators.
1. Zod-validate payload → CreateOperatorCommand
2. DuplicateOperatorGuard.check(host, port, systemId)
→ throws DuplicateOperatorError (409) if found
3. Generate operatorId (UUIDv4)
4. Begin PG transaction:
a. INSERT ops.operators (no password columns)
b. COMMIT
5. OperatorCredentialsService.write(operatorId, { systemId, password })
→ Vault KV PUT secret/ops/operators/{operatorId}/credentials
→ On failure: rollback via compensating DELETE ops.operators + re-throw 503
6. Redis SET ops:health:{operatorId} { status: "UNKNOWN" } EX 60
7. Publish operator.config.created.v1 (NATS)
8. Return 201 operator DTO (no password field)
1.2 UpdateOperatorUseCase
Triggered by PATCH /v1/admin/operators/:id.
1. Fetch operator from PG; 404 if not found or deleted_at IS NOT NULL
2. Zod-validate patch fields
3. If (host, port, systemId) changed: DuplicateOperatorGuard.check()
4. If password field present: OperatorCredentialsService.rotate(operatorId, newPassword)
5. PG UPDATE ops.operators SET ... WHERE operator_id = $1
6. Build changedFields diff for event
7. Publish operator.config.updated.v1 (NATS)
8. Return 200 updated DTO
1.3 SoftDeleteOperatorUseCase
Triggered by DELETE /v1/admin/operators/:id.
1. Fetch operator; 404 if already deleted
2. PG UPDATE SET deleted_at = now(), status = 'INACTIVE'
3. Publish operator.config.deleted.v1
4. Return 204
1.4 GetOperatorCredentialsUseCase
Triggered by internal GET /v1/internal/operators/:id/credentials.
1. Verify mTLS (middleware)
2. Fetch operator metadata from PG (host, port, bindType, tpsLimit)
3. OperatorCredentialsService.read(operatorId) → Vault GET
4. Return combined DTO { ...metadata, systemId, password }
1.5 IngestHealthEventUseCase
Triggered by incoming NATS event from smpp-connector.
1. Consume operator.health.inbound.v1 (internal subject; smpp-connector publishes)
2. Fetch current healthState from Redis ops:health:{operatorId}
3. HealthStateReducer.reduce(currentState, event) → newState
4. If state changed:
a. INSERT ops.operator_health_log (operatorId, previousState, newState, errorRate, at)
b. Redis SET ops:health:{operatorId} {status: newState} EX 60
c. Publish operator.health.v1 (NATS — consumed by routing-engine)
5. ACK
1.6 CreateRoutingRuleUseCase
1. Validate prefix + operator exists
2. RoutingRuleConflictChecker.check(prefix, existingRules)
3. INSERT ops.routing_rules
4. INSERT ops.destination_prefixes (normalized prefix decomposition for range scan)
5. Publish operator.config.updated.v1 { changedFields: ['routingRules'] }
2. Ports
| Port | Adapter |
|---|---|
OperatorRepository | Prisma / PostgreSQL (schema ops) |
CredentialsStore | Vault HTTP adapter (node-vault) |
HealthCache | Redis adapter (ioredis) |
EventPublisher | NATS JetStream adapter |
HealthEventSubscriber | NATS JetStream consumer (internal) |
3. Concurrency
- Duplicate check + INSERT are inside a serializable PG transaction;
(host, port, system_id)unique index prevents races. - Vault write after PG commit is compensated on failure (delete inserted row).
- Redis health SET is unconditional — last writer wins; for health state this is acceptable (smpp-connector publishes at most once per 10 s per operator).
4. Observability Hooks
- OTel span per use-case with attributes
ops.operator_id,ops.action. - Pino structured logs:
operatorId,action,durationMs,vaultPath. - Prometheus counters:
ops_operator_create_total{result},ops_health_transition_total{from, to},ops_vault_read_duration_seconds,ops_nats_publish_total{event}.