Skip to main content

Operator Management Service — Application Logic

Status: populated Owner: Platform Engineering Last updated: 2026-04-18 Companion: DOMAIN_MODEL · API_CONTRACTS

1. Use Cases

1.1 CreateOperatorUseCase

Triggered by POST /v1/admin/operators.

1. Zod-validate payload → CreateOperatorCommand
2. DuplicateOperatorGuard.check(host, port, systemId)
→ throws DuplicateOperatorError (409) if found
3. Generate operatorId (UUIDv4)
4. Begin PG transaction:
a. INSERT ops.operators (no password columns)
b. COMMIT
5. OperatorCredentialsService.write(operatorId, { systemId, password })
→ Vault KV PUT secret/ops/operators/{operatorId}/credentials
→ On failure: rollback via compensating DELETE ops.operators + re-throw 503
6. Redis SET ops:health:{operatorId} { status: "UNKNOWN" } EX 60
7. Publish operator.config.created.v1 (NATS)
8. Return 201 operator DTO (no password field)

1.2 UpdateOperatorUseCase

Triggered by PATCH /v1/admin/operators/:id.

1. Fetch operator from PG; 404 if not found or deleted_at IS NOT NULL
2. Zod-validate patch fields
3. If (host, port, systemId) changed: DuplicateOperatorGuard.check()
4. If password field present: OperatorCredentialsService.rotate(operatorId, newPassword)
5. PG UPDATE ops.operators SET ... WHERE operator_id = $1
6. Build changedFields diff for event
7. Publish operator.config.updated.v1 (NATS)
8. Return 200 updated DTO

1.3 SoftDeleteOperatorUseCase

Triggered by DELETE /v1/admin/operators/:id.

1. Fetch operator; 404 if already deleted
2. PG UPDATE SET deleted_at = now(), status = 'INACTIVE'
3. Publish operator.config.deleted.v1
4. Return 204

1.4 GetOperatorCredentialsUseCase

Triggered by internal GET /v1/internal/operators/:id/credentials.

1. Verify mTLS (middleware)
2. Fetch operator metadata from PG (host, port, bindType, tpsLimit)
3. OperatorCredentialsService.read(operatorId) → Vault GET
4. Return combined DTO { ...metadata, systemId, password }

1.5 IngestHealthEventUseCase

Triggered by incoming NATS event from smpp-connector.

1. Consume operator.health.inbound.v1 (internal subject; smpp-connector publishes)
2. Fetch current healthState from Redis ops:health:{operatorId}
3. HealthStateReducer.reduce(currentState, event) → newState
4. If state changed:
a. INSERT ops.operator_health_log (operatorId, previousState, newState, errorRate, at)
b. Redis SET ops:health:{operatorId} {status: newState} EX 60
c. Publish operator.health.v1 (NATS — consumed by routing-engine)
5. ACK

1.6 CreateRoutingRuleUseCase

1. Validate prefix + operator exists
2. RoutingRuleConflictChecker.check(prefix, existingRules)
3. INSERT ops.routing_rules
4. INSERT ops.destination_prefixes (normalized prefix decomposition for range scan)
5. Publish operator.config.updated.v1 { changedFields: ['routingRules'] }

2. Ports

PortAdapter
OperatorRepositoryPrisma / PostgreSQL (schema ops)
CredentialsStoreVault HTTP adapter (node-vault)
HealthCacheRedis adapter (ioredis)
EventPublisherNATS JetStream adapter
HealthEventSubscriberNATS JetStream consumer (internal)

3. Concurrency

  • Duplicate check + INSERT are inside a serializable PG transaction; (host, port, system_id) unique index prevents races.
  • Vault write after PG commit is compensated on failure (delete inserted row).
  • Redis health SET is unconditional — last writer wins; for health state this is acceptable (smpp-connector publishes at most once per 10 s per operator).

4. Observability Hooks

  • OTel span per use-case with attributes ops.operator_id, ops.action.
  • Pino structured logs: operatorId, action, durationMs, vaultPath.
  • Prometheus counters: ops_operator_create_total{result}, ops_health_transition_total{from, to}, ops_vault_read_duration_seconds, ops_nats_publish_total{event}.