Fraud Intelligence Service — Testing Strategy
Version: 1.0
Status: Draft
Owner: Trust and Safety + Quality Engineering
Last Updated: 2026-04-21
Companion: APPLICATION_LOGIC · AI_INTEGRATION · FAILURE_MODES
1. Test Pyramid
┌────────────────────────────┐
│ E2E (5–8 flows) │ Playwright + k6 smoke
├────────────────────────────┤
│ Model Evaluation Suite │ AUC/F1/precision/recall
│ (frozen + adversarial) │ fairness audit per cohort
├────────────────────────────┤
│ Contract (Pact + Schema) │ gRPC, REST, NATS event
├────────────────────────────┤
│ Integration (80+) │ Real PG + ClickHouse + Redis + NATS + mock Triton
├────────────────────────────┤
│ Unit (300+) │ Feature transformers, score normaliser, scorers, pattern matchers
└────────────────────────────┘
Coverage targets:
- Domain layer: ≥ 90% line, ≥ 85% branch
- Application layer: ≥ 85% line, ≥ 80% branch
- Infrastructure / adapters: ≥ 70% line
- Overall: ≥ 85% line, ≥ 80% branch (aggregated)
- Model evaluation: AUC, precision, recall, fairness, adversarial recall — per AI_INTEGRATION §3 acceptance metrics
2. Unit Tests
2.1 Feature transformers
Framework: pytest (Python workers) + Jest (NestJS API)
# tests/unit/features/test_ait_transformer.py
import pytest


class TestAitFeatureTransformer:
    def test_synthetic_ait_campaign_signature(self):
        # Synthetic 5-min window with the classic AIT pattern:
        # high volume, low DLR success, near-identical bodies, young tenant.
        df = build_synthetic_window(
            submit_count=42100, dlr_success_rate=0.18,
            unique_dst_msisdns=39822, repeated_body_ratio=0.97,
            tenant_age_days=7,
        )
        x = AitFeatureTransformer().transform(df)
        assert x.shape == (1, 12)
        assert x[0, FEATURES.index('dlr_success_rate')] == pytest.approx(0.18)

    def test_imputation_default_for_unique_sender_ids_when_null(self):
        df = build_synthetic_window(unique_sender_ids=None)
        x = AitFeatureTransformer().transform(df)
        # Imputation policy: a null unique_sender_ids is imputed as 1.
        assert x[0, FEATURES.index('unique_sender_ids')] == 1

    def test_feature_set_hash_deterministic(self):
        h1 = AitFeatureTransformer().feature_set_hash
        h2 = AitFeatureTransformer().feature_set_hash
        assert h1 == h2

    def test_feature_set_hash_changes_on_schema_change(self, monkeypatch):
        h1 = AitFeatureTransformer().feature_set_hash
        # monkeypatch restores FEATURE_NAMES after the test, so the schema
        # change cannot leak into other tests.
        monkeypatch.setattr(
            AitFeatureTransformer, 'FEATURE_NAMES',
            [*AitFeatureTransformer.FEATURE_NAMES, 'extra'],
        )
        h2 = AitFeatureTransformer().feature_set_hash
        assert h1 != h2
2.2 Score normaliser
import pytest


class TestFraudScoreNormaliser:
    @pytest.mark.parametrize('raw,expected_tier', [
        (0.05, 'SAFE'), (0.20, 'WATCH'), (0.50, 'RISKY'), (0.85, 'HIGH_RISK'),
    ])
    def test_tier_boundaries(self, raw, expected_tier):
        assert tier_for_score(raw) == expected_tier

    def test_no_signals_returns_probation(self):
        # A tenant with no history gets the neutral default, not a computed tier.
        result = compute_tenant_score(tenant_id='tnt_new', detections=[], signals=[])
        assert result.tier == 'PROBATION' and result.score == 0.5
2.3 Detection emitters and pattern matchers
describe('AitDetectionEmitter', () => {
  it('emits HIGH event for score >= 0.85', async () => {
    const emitted = await sut.emit({ score: 0.94, ... });
    expect(emitted.subject).toBe('fraud.detected.ait.v1');
  });

  it('opens case for 0.6 <= score < 0.85', async () => {
    await sut.emit({ score: 0.72, ... });
    expect(caseRepo.created).toHaveBeenCalledWith({ status: 'PENDING_REVIEW', ... });
  });

  it('logs only for score < 0.6', async () => {
    await sut.emit({ score: 0.45, ... });
    expect(natsPublish).not.toHaveBeenCalled();
    expect(caseRepo.created).not.toHaveBeenCalled();
  });

  it('suppresses detection when subject is on allowlist', async () => {
    allowlist.contains.mockReturnValue(true);
    const emitted = await sut.emit({ subjectId: 'tnt_bank', ... });
    expect(emitted.enforcementStatus).toBe('SUPPRESSED');
  });
});
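The routing rules these tests exercise can be sketched in a few lines of Python. This is an illustration only: the 0.85 and 0.6 thresholds and the SUPPRESSED status come from the tests above, while the function name, the action labels, and the ACTIVE status are hypothetical and not taken from the service code.

```python
from dataclasses import dataclass

HIGH_THRESHOLD = 0.85  # from the tests above: score >= 0.85 emits an event
CASE_THRESHOLD = 0.60  # from the tests above: 0.6 <= score < 0.85 opens a case


@dataclass(frozen=True)
class Routing:
    action: str              # hypothetical labels: EMIT_EVENT, OPEN_CASE, LOG_ONLY
    enforcement_status: str  # SUPPRESSED is from the tests; ACTIVE is assumed


def route_detection(score: float, allowlisted: bool = False) -> Routing:
    """Map a detection score to the action the emitter tests expect."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score out of range: {score}")
    if score >= HIGH_THRESHOLD:
        action = "EMIT_EVENT"
    elif score >= CASE_THRESHOLD:
        action = "OPEN_CASE"
    else:
        action = "LOG_ONLY"
    # Allowlisted subjects keep their score but enforcement is suppressed.
    status = "SUPPRESSED" if allowlisted else "ACTIVE"
    return Routing(action=action, enforcement_status=status)
```

Keeping the thresholds as named constants makes the boundary cases (exactly 0.85 and exactly 0.6) easy to pin down in a parametrised test.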
2.4 Case workflow & separation of duties
describe('CaseDecisionService', () => {
  it('rejects decide call when openedBy === decidedBy', async () => {
    const c = await caseRepo.create({ openedBy: 'user_alice', ... });
    await expect(sut.decide(c.caseId, 'user_alice', { decision: 'CONFIRM_FRAUD', reason: '…' }))
      .rejects.toThrow('FRAUD_SEPARATION_OF_DUTIES_VIOLATED');
  });

  it('rejects decide when reason < 20 chars', async () => {
    // Opened by alice and decided by bob, so only the reason length can fail.
    const c = await caseRepo.create({ openedBy: 'user_alice', ... });
    await expect(sut.decide(c.caseId, 'user_bob', { decision: 'DISMISS', reason: 'too short' }))
      .rejects.toThrow('FRAUD_DECISION_REASON_TOO_SHORT');
  });

  it('marks case STALE after 30 days without decision', async () => {
    const c = await caseRepo.create({ openedAt: daysAgo(31), status: 'PENDING_REVIEW' });
    await staleScanner.run();
    expect(await caseRepo.findById(c.caseId)).toMatchObject({ status: 'STALE' });
  });
});
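As a rough sketch of the rules under test, the checks could look like the following in Python. The error codes, the 20-character minimum, and the 30-day staleness period come from the tests above; the function names, the exception class, and the choice of a strict "more than 30 days" comparison are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

MIN_REASON_CHARS = 20               # from the tests above
STALE_AFTER = timedelta(days=30)    # from the tests above


class DecisionError(Exception):
    """Hypothetical exception carrying one of the documented error codes."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


def validate_decision(opened_by: str, decided_by: str, reason: str) -> None:
    """Raise DecisionError if the decision violates a workflow rule."""
    if opened_by == decided_by:
        raise DecisionError("FRAUD_SEPARATION_OF_DUTIES_VIOLATED")
    if len(reason) < MIN_REASON_CHARS:
        raise DecisionError("FRAUD_DECISION_REASON_TOO_SHORT")


def is_stale(opened_at: datetime, now: datetime, status: str) -> bool:
    """Only undecided cases go stale; strict '>' is an assumption."""
    return status == "PENDING_REVIEW" and (now - opened_at) > STALE_AFTER
```

Checking separation of duties before the reason length means a self-decision is always reported as such, whatever the reason text says.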
2.5 OTP-grinding streaming aggregator
describe('OtpGrindingAggregator', () => {
  it('emits detection on 11th OTP within 60s to same MSISDN', async () => {
    for (let i = 0; i < 10; i++) await sut.observe(otpEvent('+93701123456'));
    expect(natsPublish).not.toHaveBeenCalled();
    await sut.observe(otpEvent('+93701123456'));
    expect(natsPublish).toHaveBeenCalledWith(
      'fraud.detected.otp_grinding.v1',
      expect.objectContaining({ otpCountInWindow: 11 })
    );
  });

  it('window slides correctly: old events drop out', async () => {
    // 70 s old, so outside the 60 s window. The options-object form of the
    // otpEvent helper is assumed here.
    await sut.observe(otpEvent('+93701123456', { tsAgo: 70_000 }));
    for (let i = 0; i < 10; i++) await sut.observe(otpEvent('+93701123456'));
    expect(natsPublish).not.toHaveBeenCalled();
  });
});
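The sliding-window behaviour these tests describe can be illustrated with a small Python sketch. The 60 s window and the threshold of 11 come from the tests; the class name and method signature are hypothetical, and the sketch assumes events for one MSISDN arrive in timestamp order.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional

WINDOW_MS = 60_000  # from the tests above: 60 s window
THRESHOLD = 11      # from the tests above: detection on the 11th OTP


class OtpWindowCounter:
    """Per-MSISDN sliding-window counter (illustrative, in-memory only)."""

    def __init__(self, window_ms: int = WINDOW_MS, threshold: int = THRESHOLD) -> None:
        self.window_ms = window_ms
        self.threshold = threshold
        self._events: Dict[str, Deque[int]] = defaultdict(deque)

    def observe(self, msisdn: str, ts_ms: int) -> Optional[int]:
        """Record one OTP; return the in-window count once it reaches the threshold."""
        q = self._events[msisdn]
        q.append(ts_ms)
        # Evict events that are a full window or more older than the newest one.
        while q and q[0] <= ts_ms - self.window_ms:
            q.popleft()
        return len(q) if len(q) >= self.threshold else None
```

This sketch keeps returning a count for every event past the threshold; whether the real aggregator deduplicates repeated detections for the same window is not stated in this document.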
2.6 Property-based tests (hypothesis / fast-check)
from hypothesis import given, strategies as st


@given(st.lists(st.builds(SignalArb), min_size=10, max_size=10000))
def test_score_is_deterministic(signals):
    # Same inputs must always produce the same score and tier.
    s1 = compute_tenant_score(tenant_id='t', detections=[], signals=signals)
    s2 = compute_tenant_score(tenant_id='t', detections=[], signals=signals)
    assert s1.score == s2.score and s1.tier == s2.tier


@given(st.builds(FeatureVectorArb))
def test_xgboost_score_in_unit_interval(fv):
    score = model.predict(fv)
    assert 0.0 <= score <= 1.0
3. Integration Tests
Framework: pytest + testcontainers (PostgreSQL 15, ClickHouse 23, Redis 7, NATS JetStream 2.10) + mock Triton (HTTP fixture).
3.1 gRPC Score handler integration
import grpc
import pytest


class TestScoreGrpcIntegration:
    def test_returns_probation_for_unknown_tenant(self, grpc_client):
        resp = grpc_client.Score(ScoreRequest(scope=TENANT, id='tnt_unknown'))
        assert resp.tier == 'PROBATION' and resp.score == 0.5

    def test_l1_cache_hit_after_first_call(self, grpc_client, redis):
        grpc_client.Score(ScoreRequest(scope=TENANT, id='tnt_known'))
        assert redis.get('fraud:score:TENANT:tnt_known') is not None

    def test_permission_denied_for_non_allowlisted_spiffe(self, grpc_client_unauth):
        with pytest.raises(grpc.RpcError) as e:
            grpc_client_unauth.Score(ScoreRequest(scope=TENANT, id='tnt_x'))
        # Checked outside the `with` block, where the raised error is available.
        assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED

    def test_fail_closed_with_default_when_pg_unavailable(self, grpc_client, pg_killer):
        pg_killer.stop()
        # Served from a stale cache entry, or falls through to the default.
        resp = grpc_client.Score(ScoreRequest(scope=TENANT, id='tnt_x'))
        assert resp.tier in ('PROBATION', 'WATCH', 'SAFE')  # never throws
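The fall-through behaviour asserted above (serve a cached value if there is one, otherwise return the PROBATION default instead of raising) might be sketched as follows. The default of PROBATION with score 0.5 and the cache key format come from the tests; the function name, the dict-based cache, and the use of ConnectionError to represent a store outage are assumptions.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass(frozen=True)
class Score:
    tier: str
    score: float


DEFAULT_SCORE = Score(tier="PROBATION", score=0.5)  # default from the tests above


def resolve_score(
    key: str,
    cache: Dict[str, Score],
    load_from_store: Callable[[str], Optional[Score]],
) -> Score:
    """Return a score without ever raising: cache, then store, then default."""
    cached = cache.get(key)
    if cached is not None:
        return cached
    try:
        loaded = load_from_store(key)
    except ConnectionError:
        # Store unavailable (assumed error type): fall through to the default.
        return DEFAULT_SCORE
    if loaded is None:
        # Unknown subject: default tier, deliberately not cached in this sketch.
        return DEFAULT_SCORE
    cache[key] = loaded
    return loaded
```

A caller would pass something like `fraud:score:TENANT:tnt_known` as the key; the real service presumably adds TTLs and a Redis client, which are left out here.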
3.2 Pipeline integration (AIT)
from datetime import datetime


class TestAitPipelineIntegration:
    def test_synthetic_ait_window_emits_high_confidence(self, pipeline, ch_seed, nats_capture):
        ch_seed.insert_synthetic_ait_window(tenant='tnt_x', signals=10000, dlr_success=0.15)
        pipeline.run_window(window_start=datetime(2026, 4, 21, 10, 0))
        emitted = nats_capture.events(subject='fraud.detected.ait.v1')
        assert len(emitted) == 1
        assert emitted[0]['score'] >= 0.85
        # Every model-driven detection must carry its explanation.
        assert emitted[0]['aiProvenance']['shapTop3'] is not None
3.3 Outbox + NATS publish integration
def test_outbox_relay_publishes_to_nats(pg, nats_capture, relay):
    pg.execute("INSERT INTO fraud.outbox (event_id, subject, payload) VALUES (...)")
    relay.poll_once()
    assert nats_capture.message_count('fraud.detected.ait.v1') == 1
    # eid is the event_id of the row inserted above.
    assert pg.fetchval("SELECT published_at FROM fraud.outbox WHERE event_id = $1", eid) is not None
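For readers unfamiliar with the outbox pattern, the relay step the test drives can be sketched with the standard-library sqlite3 module standing in for PostgreSQL. The column names follow the test above; the unqualified table name `outbox`, the `make_outbox` helper, and the `publish` callback are assumptions for illustration, not the service's actual relay.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Callable


def make_outbox() -> sqlite3.Connection:
    """Create an in-memory stand-in for the fraud.outbox table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE outbox ("
        "event_id TEXT PRIMARY KEY, subject TEXT NOT NULL, "
        "payload TEXT NOT NULL, published_at TEXT)"
    )
    return conn


def poll_once(conn: sqlite3.Connection, publish: Callable[[str, str], None]) -> int:
    """Publish every unpublished row, stamp it, and return how many were sent."""
    rows = conn.execute(
        "SELECT event_id, subject, payload FROM outbox "
        "WHERE published_at IS NULL ORDER BY rowid"
    ).fetchall()
    sent = 0
    for event_id, subject, payload in rows:
        # If publish raises, the row stays unstamped and is retried next poll.
        publish(subject, payload)
        conn.execute(
            "UPDATE outbox SET published_at = ? WHERE event_id = ?",
            (datetime.now(timezone.utc).isoformat(), event_id),
        )
        conn.commit()
        sent += 1
    return sent
```

Because the row is stamped only after a successful publish, delivery is at-least-once: a crash between publish and commit republishes the event, so consumers need to deduplicate on `event_id`.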
3.4 MISP feed import + signature verification
def test_import_rejects_invalid_signature(mtls_client, vault_pubkey, nats_capture):
    body = build_misp_event(...)
    bad_sig = sign_with_wrong_key(body)
    resp = mtls_client.post('/v1/internal/fraud/feed/import', json={'Event': body, 'Signature': bad_sig})
    assert resp.status_code == 422 and resp.json()['error']['code'] == 'FRAUD_FEED_SIGNATURE_INVALID'
    # A rejected import must also raise an alert event.
    assert nats_capture.message_count('fraud.alert.feed.signature.invalid.v1') == 1


def test_import_idempotent_on_source_uuid(mtls_client):
    body = build_misp_event(uuid='5f3e9a18-...')
    r1 = mtls_client.post('/v1/internal/fraud/feed/import', json=body)
    r2 = mtls_client.post('/v1/internal/fraud/feed/import', json=body)
    # Re-importing the same UUID updates the existing indicator rather than adding one.
    assert r1.json()['added'] == 1 and r2.json()['added'] == 0 and r2.json()['updated'] == 1
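The idempotency contract in the second test (first import adds, repeat import updates) amounts to an upsert keyed on the source UUID. A minimal Python sketch, assuming an in-memory dict as the indicator store and a hypothetical `import_events` function:

```python
from typing import Dict, List


def import_events(store: Dict[str, dict], events: List[dict]) -> Dict[str, int]:
    """Upsert events by their 'uuid' and report added/updated counts."""
    added = 0
    updated = 0
    for event in events:
        if event["uuid"] in store:
            updated += 1
        else:
            added += 1
        # Last write wins; the store never holds two rows for one UUID.
        store[event["uuid"]] = event
    return {"added": added, "updated": updated}
```

Signature verification is deliberately left out of this sketch; in the flow tested above it would run before any event reaches the upsert.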
3.5 Model promote / rollback integration
import time


class TestModelLifecycle:
    def test_promote_blocked_under_24h_shadow(self, api):
        v = api.register_version(model_id, ...)
        api.shadow(v)  # shadow starts now, so the 24 h minimum is not met
        r = api.promote(v)
        assert r.status_code == 412 and r.json()['error']['code'] == 'FRAUD_SHADOW_EVAL_INSUFFICIENT'

    def test_rollback_restores_previous_active_within_60s(self, api, time_machine):
        v_old = api.active_version()
        v_new = api.register_and_promote(...)
        t0 = time.time()
        api.rollback(v_new)
        assert time.time() - t0 < 60
        assert api.active_version() == v_old
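The promotion gate checked by the first test can be expressed as a single time comparison. In this sketch the 24-hour minimum and the error code come from the test; the function name, the return shape, and the decision to allow promotion at exactly 24 hours are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

MIN_SHADOW = timedelta(hours=24)  # from the test above


def can_promote(
    shadow_started_at: Optional[datetime], now: datetime
) -> Tuple[bool, Optional[str]]:
    """Return (allowed, error_code) for a promotion attempt."""
    if shadow_started_at is None or now - shadow_started_at < MIN_SHADOW:
        return False, "FRAUD_SHADOW_EVAL_INSUFFICIENT"
    return True, None
```

Passing `now` in explicitly keeps the gate easy to test with a fast-forwarded clock, which is how E2E-03 describes handling the 24-hour wait.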
4. Model Evaluation Tests
Run on every model promotion attempt and nightly against the active model.
4.1 Frozen-test-corpus evaluation
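from sklearn.metrics import precision_score, recall_score, roc_auc_score


def test_ait_xgboost_passes_acceptance_thresholds():
    model = load_model('ml_ait_xgboost', version='active')
    X, y = load_frozen_corpus('fraud-test-corpus-2026q2')
    proba = model.predict_proba(X)[:, 1]
    assert roc_auc_score(y, proba) >= 0.92
    # Operating point: the same 0.85 threshold the emitter uses for HIGH events.
    pred = (proba >= 0.85).astype(int)
    assert precision_score(y, pred) >= 0.92
    assert recall_score(y, pred) >= 0.85
    # FPR ≤ 0.5%: share of true negatives that are predicted positive.
    assert pred[y == 0].mean() <= 0.005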
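To make the three threshold metrics concrete, the following dependency-free Python sketch computes precision, recall, and false-positive rate from confusion counts at the 0.85 operating point. The function name and the zero-denominator convention (return 0.0) are choices made for this illustration; the evaluation suite itself uses scikit-learn as shown above.

```python
from typing import Dict, Sequence


def rates_at_threshold(
    y_true: Sequence[int], proba: Sequence[float], threshold: float = 0.85
) -> Dict[str, float]:
    """Precision, recall and FPR for binary labels at a score threshold."""
    tp = fp = tn = fn = 0
    for label, p in zip(y_true, proba):
        predicted = p >= threshold
        if predicted and label == 1:
            tp += 1
        elif predicted and label == 0:
            fp += 1
        elif not predicted and label == 1:
            fn += 1
        else:
            tn += 1

    def ratio(num: int, den: int) -> float:
        return num / den if den else 0.0  # convention for empty denominators

    return {
        "precision": ratio(tp, tp + fp),  # of flagged windows, how many are fraud
        "recall": ratio(tp, tp + fn),     # of fraud windows, how many are flagged
        "fpr": ratio(fp, fp + tn),        # of clean windows, how many are flagged
    }
```

With three fraud and four clean examples where two fraud and one clean score above 0.85, this yields precision 2/3, recall 2/3, and FPR 1/4.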
4.2 Adversarial corpus
from sklearn.metrics import recall_score


def test_ait_xgboost_recall_on_adversarial_corpus():
    """Paraphrased OTP-pumping, slow-burn AIT, MSISDN-block sweeps with timing jitter."""
    model = load_model('ml_ait_xgboost', version='active')
    X, y = load_corpus('adversarial-corpus-v3')
    pred = (model.predict_proba(X)[:, 1] >= 0.85).astype(int)
    assert recall_score(y, pred) >= 0.80
4.3 Per-cohort fairness
from sklearn.metrics import roc_auc_score


def test_ait_xgboost_fairness_across_tenant_cohorts():
    model = load_model('ml_ait_xgboost', version='active')
    cohorts = ['bank', 'gov', 'sme', 'marketing']
    aucs = {}
    for c in cohorts:
        X, y = load_cohort_subset('fraud-test-corpus-2026q2', cohort=c)
        aucs[c] = roc_auc_score(y, model.predict_proba(X)[:, 1])
    # Fairness gap: best-served cohort AUC minus worst-served cohort AUC.
    delta = max(aucs.values()) - min(aucs.values())
    assert delta <= 0.10, f"Fairness Δ={delta:.3f} exceeds 0.10 across {aucs}"
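The fairness check reduces to two steps: an AUC per cohort, then the spread between the best and worst cohort. The sketch below shows both without scikit-learn, using the pairwise (Mann-Whitney) definition of AUC. The function names are hypothetical, and the quadratic pairwise loop is only suitable for small illustrative inputs.

```python
from typing import Dict, Sequence


def pairwise_auc(y_true: Sequence[int], scores: Sequence[float]) -> float:
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for s, label in zip(scores, y_true) if label == 1]
    neg = [s for s, label in zip(scores, y_true) if label == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


def fairness_delta(aucs: Dict[str, float]) -> float:
    """Gap between the best-served and worst-served cohort."""
    return max(aucs.values()) - min(aucs.values())
```

A cohort with no positive or no negative examples has no defined AUC, which is why the sketch raises instead of returning a placeholder that would distort the delta.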
4.4 Calibration
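from sklearn.metrics import brier_score_loss


def test_ait_xgboost_brier_score():
    model = load_model('ml_ait_xgboost', version='active')
    X, y = load_frozen_corpus('fraud-test-corpus-2026q2')
    proba = model.predict_proba(X)[:, 1]
    # Lower is better; 0.0 means perfectly calibrated, confident predictions.
    assert brier_score_loss(y, proba) <= 0.10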
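For reference, the Brier score is the mean squared difference between the predicted probability and the 0/1 outcome. A dependency-free sketch (the function name is chosen for this illustration):

```python
from typing import Sequence


def brier_score(y_true: Sequence[int], proba: Sequence[float]) -> float:
    """Mean squared error between predicted probabilities and binary outcomes."""
    if len(y_true) != len(proba) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum((p - label) ** 2 for label, p in zip(y_true, proba)) / len(y_true)
```

A model that always predicts 0.5 scores 0.25 regardless of the labels, so the 0.10 bound above requires the model to be meaningfully better than an uninformative predictor.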
5. Contract Tests
5.1 Pact (consumer-driven)
compliance-engine publishes its expected Score request/response shape; fraud-intel-service provider verifies.
// In compliance-engine repo (consumer)
provider('FraudIntelService')
  .uponReceiving('a Score request for a known tenant')
  .withRequest({ scope: 'TENANT', id: 'tnt_known' })
  .willRespondWith({ score: 0.21, tier: 'WATCH', modelId: like('ml_ait_xgboost'), ... });
5.2 NATS event schema conformance
Every produced fraud.* event is validated against its JSON Schema in CI:
import { validate } from '@ghasi/event-schemas';

test('fraud.detected.ait.v1 conforms to schema', () => {
  const ev = produce.aitDetection({ score: 0.94, ... });
  expect(validate('fraud.detected.ait.v1', ev)).toBe(true);
});
5.3 OpenAPI contract
Generated OpenAPI 3.1 doc snapshot-tested in CI; breaking-change detection via oasdiff.
6. Load Tests
Framework: k6 (REST) + ghz (gRPC) + custom Python (NATS).
6.1 Score gRPC at 1000 RPS
# ghz takes the target host:port as a positional argument.
ghz --proto src/proto/fraud.proto \
    --call ghasi.sms.fraud.v1.FraudIntelService.Score \
    --data-file ./test/load/score_requests.json \
    --concurrency 100 --rps 1000 --duration 600s \
    fraud-intel-grpc:50054
Pass criteria:
- P95 ≤ 50 ms, P99 ≤ 100 ms
- Error rate < 0.05%
- Cache hit ratio > 95% steady-state
- No memory leak over 10-min run
- Pod CPU < 80% averaged
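The latency and error-rate criteria can be checked mechanically from a run's raw samples. The sketch below uses the nearest-rank percentile definition, which is an assumption: ghz and other tools may interpolate differently, so values near the limits can differ slightly. The function names are hypothetical; the limits are the ones listed above.

```python
import math
from typing import Sequence

P95_LIMIT_MS = 50.0       # from the pass criteria above
P99_LIMIT_MS = 100.0      # from the pass criteria above
MAX_ERROR_RATE = 0.0005   # 0.05%, from the pass criteria above


def percentile(samples: Sequence[float], p: int) -> float:
    """Nearest-rank percentile: the smallest sample with at least p% at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


def passes_latency_gate(latencies_ms: Sequence[float], errors: int, total: int) -> bool:
    """True when P95, P99 and error rate are all within the documented limits."""
    return (
        percentile(latencies_ms, 95) <= P95_LIMIT_MS
        and percentile(latencies_ms, 99) <= P99_LIMIT_MS
        and errors / total < MAX_ERROR_RATE
    )
```

Cache hit ratio, memory growth, and pod CPU come from service metrics, not from the load generator, so they are outside what this check covers.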
6.2 NATS consumer at 10K eps ingestion
A synthetic publisher pumps 10K events per second (eps) into firewall.audit.v1; assert:
- ClickHouse ingestion lag P95 ≤ 30 s
- DLQ rate < 0.1%
- Outbox-relay latency P95 < 100 ms
6.3 Pipeline burst
Insert 1M synthetic signals into fraud_features.events; trigger AIT pipeline; assert:
- Pipeline completes in < 90 s for 1000 active tenants × 5 MNOs
7. E2E Scenarios
Framework: Playwright (UI), bash + grpcurl + nats CLI (CLI E2E).
E2E-01: Detection → Enforcement (AIT)
- Inject synthetic AIT campaign via fixture publisher.
- Wait ≤ 5 min for pipeline run.
- Assert fraud.detected.ait.v1 emitted.
- Assert sms-firewall-service consumes and updates firewall.peer_quarantine.
- Assert next compliance evaluation for same tenant uses updated tier.
E2E-02: Case → HITL Decision → Action Dispatch
- Inject medium-confidence AIT signals.
- Assert case opened with status=PENDING_REVIEW.
- Login as tns-fraud-analyst in admin-dashboard.
- Decide CONFIRM_FRAUD with executeAction=true.
- Assert action dispatched to firewall via NATS.
- Assert audit-log row.
E2E-03: Model Shadow → Promote → Rollback
- Register new model version via REST.
- Shadow for 24h (test fixture fast-forwards).
- Promote; assert active swap.
- Verify hot-reload in < 30s on workers.
- Rollback; assert previous active restored.
E2E-04: MISP Feed Export Roundtrip
- Confirm a fraud case.
- Wait for next scheduled export (test override 5-min cadence).
- Assert MinIO upload + signature.
- Assert SFTP mirror to test-mock regulator endpoint.
- Verify signature on downloaded file.
E2E-05: MISP Feed Import (signed)
- Push signed MISP body via internal mTLS endpoint.
- Assert indicators added.
- Trigger ingestion of an event with matching MSISDN.
- Assert events.imported_indicator_msisdn = TRUE.
- Assert next AIT pipeline accounts for this feature.
E2E-06: OTP-Grinding Real-Time Detection
- Publish 11 OTP-class messages to same MSISDN within 30 s.
- Assert fraud.detected.otp_grinding.v1 emitted within 5 s of 11th message.
- Assert compliance-engine consumes and applies throttle.
E2E-07: Fail-Soft on Triton Outage
- Kill Triton pods.
- Trigger AIT pipeline.
- Assert pipeline does NOT fail; instead emits via rule-based FraudPattern matcher.
- Assert fraud.alert.model.unavailable.v1 fired.
E2E-08: Score gRPC Fail-Closed-with-Default
- Stop fraud-intel-service pods.
- Make compliance-engine evaluate a message.
- Assert compliance-engine logs the PROBATION fall-through and proceeds with the default tier (it does not reject the message).
8. Security Tests
- mTLS bypass attempt: Unauthenticated gRPC call → assert UNAVAILABLE / connection rejected at TLS layer.
- SPIFFE allowlist bypass: Authenticated cert with non-allowlisted SPIFFE → assert PERMISSION_DENIED.
- Role escalation: tns-fraud-analyst calls POST /v1/admin/fraud/models/{id}/promote → assert 403.
- Cross-tenant scope leakage: A query for subjectId of tenant B by tenant A's analyst → assert audit-log entry; no PII leaked in error message.
- Audit-log immutability: Attempt UPDATE/DELETE on fraud.audit_log → assert no rows modified.
- Allowlist two-person rule: Attempt to add allowlist entry where addedBy === approvedBy → assert 403.
- Model artifact tamper: Upload artifact with mismatched SHA-256 → assert load refused, alert fired.
- MISP signature: 50 randomly-corrupted-signature import attempts → 100% rejected.
- ReDoS in OTP-pattern regex: Insert pathological regex → assert re2/hyperscan engine catches; pattern length cap rejects.
9. CI Pipeline
# .github/workflows/fraud-intel-ci.yml (excerpt)
jobs:
  unit:
    steps:
      - run: pnpm test:unit        # NestJS
      - run: pytest tests/unit     # Python workers
      - run: pnpm test:cov && python -m coverage report --fail-under=85
  integration:
    services: [postgres, clickhouse, redis, nats]
    steps:
      - run: pnpm test:integration
      - run: pytest tests/integration
  contract:
    steps:
      - run: pnpm pact:verify
      # Quoted so the shell does not expand the glob before schema_check sees it.
      - run: python -m schema_check 'fraud.detected.*'
  model-eval:
    steps:
      - run: pytest tests/model_eval --benchmark
  load:
    if: github.ref == 'refs/heads/main'
    steps:
      - run: ghz --insecure ... ./scripts/score-load.sh
  e2e:
    needs: [unit, integration, contract]
    steps:
      - run: ./scripts/e2e-up.sh && pnpm test:e2e
CI runtime budget: < 25 minutes for unit + integration + contract; < 90 minutes including model-eval + e2e.