Fraud Intelligence Service — Testing Strategy

Version: 1.0
Status: Draft
Owner: Trust and Safety + Quality Engineering
Last Updated: 2026-04-21
Companion: APPLICATION_LOGIC · AI_INTEGRATION · FAILURE_MODES


1. Test Pyramid

┌────────────────────────────┐
│ E2E (5–8 flows) │ Playwright + k6 smoke
├────────────────────────────┤
│ Model Evaluation Suite │ AUC/F1/precision/recall
│ (frozen + adversarial) │ fairness audit per cohort
├────────────────────────────┤
│ Contract (Pact + Schema) │ gRPC, REST, NATS event
├────────────────────────────┤
│ Integration (80+) │ Real PG + ClickHouse + Redis + NATS + mock Triton
├────────────────────────────┤
│ Unit (300+) │ Feature transformers, score normaliser, scorers, pattern matchers
└────────────────────────────┘

Coverage targets:

  • Domain layer: ≥ 90% line, ≥ 85% branch
  • Application layer: ≥ 85% line, ≥ 80% branch
  • Infrastructure / adapters: ≥ 70% line
  • Overall: ≥ 85% line, ≥ 80% branch (aggregated)
  • Model evaluation: AUC, precision, recall, fairness, adversarial recall — per AI_INTEGRATION §3 acceptance metrics

2. Unit Tests

2.1 Feature transformers

Framework: pytest (Python workers) + Jest (NestJS API)

# tests/unit/features/test_ait_transformer.py
import pytest


class TestAitFeatureTransformer:
    def test_synthetic_ait_campaign_signature(self):
        # Synthetic 5-min window with the classic AIT pattern
        df = build_synthetic_window(
            submit_count=42100, dlr_success_rate=0.18,
            unique_dst_msisdns=39822, repeated_body_ratio=0.97,
            tenant_age_days=7,
        )
        x = AitFeatureTransformer().transform(df)
        assert x.shape == (1, 12)
        assert x[0, FEATURES.index('dlr_success_rate')] == pytest.approx(0.18)

    def test_imputation_default_for_unique_sender_ids_when_null(self):
        df = build_synthetic_window(unique_sender_ids=None)
        x = AitFeatureTransformer().transform(df)
        assert x[0, FEATURES.index('unique_sender_ids')] == 1  # imputation policy

    def test_feature_set_hash_deterministic(self):
        h1 = AitFeatureTransformer().feature_set_hash
        h2 = AitFeatureTransformer().feature_set_hash
        assert h1 == h2

    def test_feature_set_hash_changes_on_schema_change(self, monkeypatch):
        h1 = AitFeatureTransformer().feature_set_hash
        # monkeypatch restores FEATURE_NAMES after the test, so the schema
        # change cannot leak into other tests.
        monkeypatch.setattr(
            AitFeatureTransformer, 'FEATURE_NAMES',
            [*AitFeatureTransformer.FEATURE_NAMES, 'extra'],
        )
        h2 = AitFeatureTransformer().feature_set_hash
        assert h1 != h2
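
The two hash tests above only hold if the hash depends on nothing but the transformer's schema. A minimal sketch of one way to get that property, assuming the hash is a SHA-256 over the ordered feature names; the free function `feature_set_hash` and the JSON encoding are illustrative assumptions, not the service's actual implementation:

```python
import hashlib
import json


def feature_set_hash(feature_names):
    """Return a stable hex digest for an ordered list of feature names."""
    # Compact JSON gives one canonical byte encoding for a given list.
    canonical = json.dumps(list(feature_names), separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


base = ["dlr_success_rate", "unique_sender_ids"]  # illustrative subset
assert feature_set_hash(base) == feature_set_hash(list(base))
assert feature_set_hash(base) != feature_set_hash([*base, "extra"])
```

Because the list is hashed in order, reordering features also changes the hash, which is usually what a model that consumes a positional feature vector needs.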

2.2 Score normaliser

class TestFraudScoreNormaliser:
    @pytest.mark.parametrize('raw,expected_tier', [
        (0.05, 'SAFE'), (0.20, 'WATCH'), (0.50, 'RISKY'), (0.85, 'HIGH_RISK'),
    ])
    def test_tier_boundaries(self, raw, expected_tier):
        assert tier_for_score(raw) == expected_tier

    def test_no_signals_returns_probation(self):
        result = compute_tenant_score(tenant_id='tnt_new', detections=[], signals=[])
        assert result.tier == 'PROBATION'
        assert result.score == 0.5

2.3 Detection emitters and pattern matchers

describe('AitDetectionEmitter', () => {
  it('emits HIGH event for score >= 0.85', async () => {
    const emitted = await sut.emit({ score: 0.94, ... });
    expect(emitted.subject).toBe('fraud.detected.ait.v1');
  });

  it('opens case for 0.6 <= score < 0.85', async () => {
    await sut.emit({ score: 0.72, ... });
    // objectContaining checks the status without pinning the other case fields
    expect(caseRepo.create).toHaveBeenCalledWith(
      expect.objectContaining({ status: 'PENDING_REVIEW' })
    );
  });

  it('logs only for score < 0.6', async () => {
    await sut.emit({ score: 0.45, ... });
    expect(natsPublish).not.toHaveBeenCalled();
    expect(caseRepo.create).not.toHaveBeenCalled();
  });

  it('suppresses detection when subject is on allowlist', async () => {
    allowlist.contains.mockReturnValue(true);
    const emitted = await sut.emit({ subjectId: 'tnt_bank', ... });
    expect(emitted.enforcementStatus).toBe('SUPPRESSED');
  });
});
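
The routing that these emitter tests exercise can be summarised in a few lines. The sketch below is in Python for consistency with the worker code; the `Action` enum and `route_detection` are hypothetical names, and only the two thresholds come from the tests above. In the service itself an allowlisted detection is still returned with `enforcementStatus` set to `SUPPRESSED`; the sketch models that as its own action.

```python
from enum import Enum


class Action(Enum):
    EMIT_EVENT = "emit_event"
    OPEN_CASE = "open_case"
    LOG_ONLY = "log_only"
    SUPPRESSED = "suppressed"


HIGH_THRESHOLD = 0.85  # score >= 0.85 emits fraud.detected.ait.v1
CASE_THRESHOLD = 0.60  # 0.6 <= score < 0.85 opens a PENDING_REVIEW case


def route_detection(score, on_allowlist=False):
    """Map a model score to the action the emitter tests expect."""
    if on_allowlist:
        return Action.SUPPRESSED  # allowlist is checked before any threshold
    if score >= HIGH_THRESHOLD:
        return Action.EMIT_EVENT
    if score >= CASE_THRESHOLD:
        return Action.OPEN_CASE
    return Action.LOG_ONLY
```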

2.4 Case workflow & separation of duties

describe('CaseDecisionService', () => {
  it('rejects decide call when openedBy === decidedBy', async () => {
    const c = await caseRepo.create({ openedBy: 'user_alice', ... });
    await expect(sut.decide(c.caseId, 'user_alice', { decision: 'CONFIRM_FRAUD', reason: '…' }))
      .rejects.toThrow('FRAUD_SEPARATION_OF_DUTIES_VIOLATED');
  });

  it('rejects decide when reason < 20 chars', async () => {
    // Each test creates its own case; `c` from the previous test is out of scope here.
    const c = await caseRepo.create({ openedBy: 'user_alice', ... });
    await expect(sut.decide(c.caseId, 'user_bob', { decision: 'DISMISS', reason: 'too short' }))
      .rejects.toThrow('FRAUD_DECISION_REASON_TOO_SHORT');
  });

  it('marks case STALE after 30 days without decision', async () => {
    const c = await caseRepo.create({ openedAt: daysAgo(31), status: 'PENDING_REVIEW' });
    await staleScanner.run();
    expect(await caseRepo.findById(c.caseId)).toMatchObject({ status: 'STALE' });
  });
});
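
The three rules tested above (separation of duties, minimum reason length, staleness) are simple enough to state directly. A minimal sketch, assuming hypothetical helpers `validate_decision` and `is_stale`; only the error codes, the 20-character minimum and the 30-day limit come from the tests:

```python
from datetime import datetime, timedelta, timezone

MIN_REASON_LENGTH = 20            # shorter reasons are rejected
STALE_AFTER = timedelta(days=30)  # undecided cases older than this go STALE


class DecisionError(Exception):
    """Carries one of the error codes asserted in the tests."""


def validate_decision(opened_by, decided_by, reason):
    if opened_by == decided_by:
        raise DecisionError("FRAUD_SEPARATION_OF_DUTIES_VIOLATED")
    if len(reason) < MIN_REASON_LENGTH:
        raise DecisionError("FRAUD_DECISION_REASON_TOO_SHORT")


def is_stale(opened_at, now):
    return now - opened_at > STALE_AFTER


now = datetime(2026, 4, 21, tzinfo=timezone.utc)
assert is_stale(now - timedelta(days=31), now)
assert not is_stale(now - timedelta(days=29), now)
```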

2.5 OTP-grinding streaming aggregator

describe('OtpGrindingAggregator', () => {
  it('emits detection on 11th OTP within 60s to same MSISDN', async () => {
    for (let i = 0; i < 10; i++) await sut.observe(otpEvent('+93701123456'));
    expect(natsPublish).not.toHaveBeenCalled();
    await sut.observe(otpEvent('+93701123456'));
    expect(natsPublish).toHaveBeenCalledWith(
      'fraud.detected.otp_grinding.v1',
      expect.objectContaining({ otpCountInWindow: 11 })
    );
  });

  it('window slides correctly: old events drop out', async () => {
    // Options object instead of a named argument, which is not valid JS/TS.
    await sut.observe(otpEvent('+93701123456', { tsAgo: 70_000 })); // outside window
    for (let i = 0; i < 10; i++) await sut.observe(otpEvent('+93701123456'));
    expect(natsPublish).not.toHaveBeenCalled();
  });
});
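
The behaviour under test is a per-MSISDN sliding-window count. The sketch below shows one common way to implement it, assuming events arrive in timestamp order; the class name `SlidingOtpCounter` is hypothetical, while the 60 s window and the "fires on the 11th OTP" rule come from the tests above.

```python
from collections import defaultdict, deque

WINDOW_MS = 60_000  # 60 s window
THRESHOLD = 10      # detection fires when the window holds more than 10 OTPs


class SlidingOtpCounter:
    def __init__(self):
        self._events = defaultdict(deque)  # msisdn -> timestamps in ms

    def observe(self, msisdn, ts_ms):
        """Record one OTP; return True if the window count exceeds THRESHOLD."""
        window = self._events[msisdn]
        window.append(ts_ms)
        # Drop timestamps that have slid out of the 60 s window.
        while window and window[0] <= ts_ms - WINDOW_MS:
            window.popleft()
        return len(window) > THRESHOLD
```

A deque keeps eviction O(1) per event. Out-of-order events would need a different structure, such as a sorted container or time buckets.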

2.6 Property-based tests (hypothesis / fast-check)

from hypothesis import given, strategies as st


@given(st.lists(st.builds(SignalArb), min_size=10, max_size=10000))
def test_score_is_idempotent(signals):
    # Same inputs must always produce the same score and tier.
    s1 = compute_tenant_score(tenant_id='t', detections=[], signals=signals)
    s2 = compute_tenant_score(tenant_id='t', detections=[], signals=signals)
    assert s1.score == s2.score and s1.tier == s2.tier


@given(st.builds(FeatureVectorArb))
def test_xgboost_score_in_unit_interval(fv):
    score = model.predict(fv)
    assert 0.0 <= score <= 1.0

3. Integration Tests

Framework: pytest + testcontainers (PostgreSQL 15, ClickHouse 23, Redis 7, NATS JetStream 2.10) + mock Triton (HTTP fixture).

3.1 gRPC Score handler integration

import grpc
import pytest


class TestScoreGrpcIntegration:
    def test_returns_probation_for_unknown_tenant(self, grpc_client):
        resp = grpc_client.Score(ScoreRequest(scope=TENANT, id='tnt_unknown'))
        assert resp.tier == 'PROBATION' and resp.score == 0.5

    def test_l1_cache_hit_after_first_call(self, grpc_client, redis):
        grpc_client.Score(ScoreRequest(scope=TENANT, id='tnt_known'))
        assert redis.get('fraud:score:TENANT:tnt_known') is not None

    def test_permission_denied_for_non_allowlisted_spiffe(self, grpc_client_unauth):
        with pytest.raises(grpc.RpcError) as e:
            grpc_client_unauth.Score(ScoreRequest(scope=TENANT, id='tnt_x'))
        # Inspect the error only after the with-block has captured it.
        assert e.value.code() == grpc.StatusCode.PERMISSION_DENIED

    def test_fail_closed_with_default_when_pg_unavailable(self, grpc_client, pg_killer):
        pg_killer.stop()
        # Stale cache or fall through
        resp = grpc_client.Score(ScoreRequest(scope=TENANT, id='tnt_x'))
        assert resp.tier in ('PROBATION', 'WATCH', 'SAFE')  # never throws
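
The handler behaviour these tests pin down is a lookup chain: cache first, then the store, then the PROBATION default. A minimal sketch under stated assumptions: `Score`, `lookup_score` and the use of `ConnectionError` for a PostgreSQL outage are illustrative, and only the default tier and score come from the tests above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Score:
    tier: str
    score: float


DEFAULT = Score(tier="PROBATION", score=0.5)  # default asserted in the tests


def lookup_score(key, cache, load_from_store):
    """Return the cached score, else the stored score, else the default."""
    cached = cache.get(key)
    if cached is not None:
        return cached
    try:
        stored = load_from_store(key)
    except ConnectionError:
        return DEFAULT  # store unavailable: answer with the default, never raise
    if stored is None:
        return DEFAULT  # unknown subject
    cache[key] = stored
    return stored
```

The default is deliberately not cached, so a recovered store is consulted again on the next call.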

3.2 Pipeline integration (AIT)

class TestAitPipelineIntegration:
    def test_synthetic_ait_window_emits_high_confidence(self, pipeline, ch_seed, nats_capture):
        ch_seed.insert_synthetic_ait_window(tenant='tnt_x', signals=10000, dlr_success=0.15)
        pipeline.run_window(window_start=datetime(2026, 4, 21, 10, 0))
        emitted = nats_capture.events(subject='fraud.detected.ait.v1')
        assert len(emitted) == 1
        assert emitted[0]['score'] >= 0.85
        assert emitted[0]['aiProvenance']['shapTop3'] is not None

3.3 Outbox + NATS publish integration

def test_outbox_relay_publishes_to_nats(pg, nats_capture, relay):
    pg.execute("INSERT INTO fraud.outbox (event_id, subject, payload) VALUES (...)")
    relay.poll_once()
    assert nats_capture.message_count('fraud.detected.ait.v1') == 1
    # eid is the event_id of the row inserted above
    assert pg.fetchval("SELECT published_at FROM fraud.outbox WHERE event_id = $1", eid) is not None
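
For readers unfamiliar with the outbox pattern, the relay's `poll_once` can be pictured as "select unpublished rows, publish each, stamp `published_at`". The sketch below uses an in-memory SQLite table as a stand-in for `fraud.outbox` in PostgreSQL and a plain callback as a stand-in for the NATS publisher; the table layout is an assumption inferred from the columns named in the test.

```python
import sqlite3


def poll_once(conn, publish):
    """Publish unpublished outbox rows, then stamp published_at on each."""
    rows = conn.execute(
        "SELECT event_id, subject, payload FROM outbox "
        "WHERE published_at IS NULL ORDER BY rowid"
    ).fetchall()
    for event_id, subject, payload in rows:
        publish(subject, payload)  # at-least-once: may repeat after a crash
        conn.execute(
            "UPDATE outbox SET published_at = CURRENT_TIMESTAMP WHERE event_id = ?",
            (event_id,),
        )
        conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (event_id TEXT PRIMARY KEY, subject TEXT, "
    "payload TEXT, published_at TEXT)"
)
conn.execute(
    "INSERT INTO outbox (event_id, subject, payload) VALUES (?, ?, ?)",
    ("evt-1", "fraud.detected.ait.v1", "{}"),
)
sent = []
assert poll_once(conn, lambda subject, payload: sent.append(subject)) == 1
assert poll_once(conn, lambda subject, payload: sent.append(subject)) == 0
```

Publishing before stamping means a crash between the two steps causes a duplicate rather than a lost event, so consumers should deduplicate on `event_id`.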

3.4 MISP feed import + signature verification

def test_import_rejects_invalid_signature(mtls_client, vault_pubkey, nats_capture):
    body = build_misp_event(...)
    bad_sig = sign_with_wrong_key(body)
    resp = mtls_client.post('/v1/internal/fraud/feed/import', json={'Event': body, 'Signature': bad_sig})
    assert resp.status_code == 422
    assert resp.json()['error']['code'] == 'FRAUD_FEED_SIGNATURE_INVALID'
    assert nats_capture.message_count('fraud.alert.feed.signature.invalid.v1') == 1


def test_import_idempotent_on_source_uuid(mtls_client):
    body = build_misp_event(uuid='5f3e9a18-...')
    r1 = mtls_client.post('/v1/internal/fraud/feed/import', json=body)
    r2 = mtls_client.post('/v1/internal/fraud/feed/import', json=body)
    assert r1.json()['added'] == 1
    assert r2.json()['added'] == 0 and r2.json()['updated'] == 1
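
The idempotency test expects a re-import of the same source UUID to count as an update, not a second add. A minimal sketch of that upsert, with a dict standing in for the indicator store; `import_event` and the `"uuid"` key are illustrative assumptions:

```python
def import_event(store, event):
    """Upsert a feed event keyed by its source UUID and report what changed."""
    uuid = event["uuid"]
    already_known = uuid in store
    store[uuid] = event
    if already_known:
        return {"added": 0, "updated": 1}
    return {"added": 1, "updated": 0}


store = {}
event = {"uuid": "example-uuid", "indicators": []}  # placeholder values
assert import_event(store, event) == {"added": 1, "updated": 0}
assert import_event(store, event) == {"added": 0, "updated": 1}
assert len(store) == 1
```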

3.5 Model promote / rollback integration

class TestModelLifecycle:
    def test_promote_blocked_under_24h_shadow(self, api):
        v = api.register_version(model_id, ...)
        api.shadow(v)  # immediately
        r = api.promote(v)
        assert r.status_code == 412
        assert r.json()['error']['code'] == 'FRAUD_SHADOW_EVAL_INSUFFICIENT'

    def test_rollback_restores_previous_active_within_60s(self, api, time_machine):
        v_old = api.active_version()
        v_new = api.register_and_promote(...)
        t0 = time.time()
        api.rollback(v_new)
        assert time.time() - t0 < 60
        assert api.active_version() == v_old
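
The lifecycle rules asserted above (no promotion before 24 h of shadow evaluation; rollback restores the previous active version) can be modelled as a small state machine. The sketch below is an in-memory illustration; `ModelRegistry` and its methods are hypothetical and unrelated to the real REST API, and only the 24 h minimum and the error code come from the tests.

```python
from datetime import datetime, timedelta, timezone

MIN_SHADOW = timedelta(hours=24)  # minimum shadow period before promotion


class ModelRegistry:
    def __init__(self, active):
        self.active = active
        self.previous = None
        self.shadow_started = {}  # version -> time shadowing began

    def shadow(self, version, now):
        self.shadow_started[version] = now

    def promote(self, version, now):
        started = self.shadow_started.get(version)
        if started is None or now - started < MIN_SHADOW:
            raise RuntimeError("FRAUD_SHADOW_EVAL_INSUFFICIENT")
        self.previous, self.active = self.active, version

    def rollback(self):
        if self.previous is None:
            raise RuntimeError("no previous active version")
        self.active, self.previous = self.previous, None
```

Passing `now` explicitly keeps the gate testable without sleeping, which is the same idea as the fast-forward fixture used in the E2E scenario.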

4. Model Evaluation Tests

Run on every model promotion attempt and nightly against the active model.

4.1 Frozen-test-corpus evaluation

from sklearn.metrics import precision_score, recall_score, roc_auc_score


def test_ait_xgboost_passes_acceptance_thresholds():
    model = load_model('ml_ait_xgboost', version='active')
    X, y = load_frozen_corpus('fraud-test-corpus-2026q2')
    proba = model.predict_proba(X)[:, 1]
    assert roc_auc_score(y, proba) >= 0.92
    pred = (proba >= 0.85).astype(int)  # 0.85 is the HIGH-confidence decision threshold
    assert precision_score(y, pred) >= 0.92
    assert recall_score(y, pred) >= 0.85
    assert pred[y == 0].mean() <= 0.005  # FPR ≤ 0.5%: share of true negatives flagged
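
To make the thresholded metrics concrete, the sketch below computes precision, recall and false-positive rate from raw confusion counts in plain Python. It is an illustration of the definitions, not a replacement for the scikit-learn calls in the test; `threshold_metrics` is a hypothetical helper.

```python
def threshold_metrics(y_true, proba, threshold=0.85):
    """Compute precision, recall and false-positive rate at a score threshold."""
    pred = [1 if p >= threshold else 0 for p in proba]
    tp = sum(1 for y, p in zip(y_true, pred) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(y_true, pred) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(y_true, pred) if y == 1 and p == 0)
    tn = sum(1 for y, p in zip(y_true, pred) if y == 0 and p == 0)
    return {
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
        "fpr": fp / (fp + tn) if fp + tn else 0.0,
    }


# One of each outcome: tp=1, fn=1, fp=1, tn=1
m = threshold_metrics([1, 1, 0, 0], [0.9, 0.5, 0.9, 0.1])
assert m == {"precision": 0.5, "recall": 0.5, "fpr": 0.5}
```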

4.2 Adversarial corpus

from sklearn.metrics import recall_score


def test_ait_xgboost_recall_on_adversarial_corpus():
    """Paraphrased OTP-pumping, slow-burn AIT, MSISDN-block sweeps with timing jitter."""
    model = load_model('ml_ait_xgboost', version='active')
    X, y = load_corpus('adversarial-corpus-v3')
    pred = (model.predict_proba(X)[:, 1] >= 0.85).astype(int)
    assert recall_score(y, pred) >= 0.80

4.3 Per-cohort fairness

from sklearn.metrics import roc_auc_score


def test_ait_xgboost_fairness_across_tenant_cohorts():
    model = load_model('ml_ait_xgboost', version='active')
    cohorts = ['bank', 'gov', 'sme', 'marketing']
    aucs = {}
    for c in cohorts:
        X, y = load_cohort_subset('fraud-test-corpus-2026q2', cohort=c)
        aucs[c] = roc_auc_score(y, model.predict_proba(X)[:, 1])
    # Fairness is measured as the spread between the best and worst cohort AUC.
    delta = max(aucs.values()) - min(aucs.values())
    assert delta <= 0.10, f"Fairness Δ={delta:.3f} exceeds 0.10 across {aucs}"
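
The fairness check reduces to "compute AUC per cohort, then compare the extremes". The sketch below spells that out in plain Python, using the pairwise definition of AUC (the probability that a positive outranks a negative, ties counting half). It is quadratic in corpus size and meant only to illustrate the metric; `roc_auc` and `fairness_delta` are hypothetical helpers.

```python
def roc_auc(y_true, scores):
    """AUC as the probability that a positive outranks a negative."""
    pos = [s for y, s in zip(y_true, scores) if y == 1]
    neg = [s for y, s in zip(y_true, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative")
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


def fairness_delta(cohort_data):
    """cohort_data maps cohort name -> (y_true, scores)."""
    aucs = {c: roc_auc(y, s) for c, (y, s) in cohort_data.items()}
    return max(aucs.values()) - min(aucs.values()), aucs


delta, aucs = fairness_delta({
    "bank": ([1, 0], [0.9, 0.1]),                   # perfectly ranked
    "sme": ([1, 0, 1, 0], [0.9, 0.8, 0.3, 0.1]),    # one inversion out of four pairs
})
assert aucs == {"bank": 1.0, "sme": 0.75}
assert delta == 0.25
```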

4.4 Calibration

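from sklearn.metrics import brier_score_loss


def test_ait_xgboost_brier_score():
    model = load_model('ml_ait_xgboost', version='active')
    X, y = load_frozen_corpus('fraud-test-corpus-2026q2')
    proba = model.predict_proba(X)[:, 1]
    # Lower is better; 0.25 is what a constant prediction of 0.5 would score.
    assert brier_score_loss(y, proba) <= 0.10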
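
The Brier score is the mean squared difference between the predicted probability and the 0/1 outcome, so the 0.10 ceiling can be read against a known baseline. A plain-Python sketch of the definition (`brier_score` is a hypothetical helper mirroring what `brier_score_loss` computes for binary labels):

```python
import math


def brier_score(y_true, proba):
    """Mean squared error between predicted probabilities and 0/1 labels."""
    if not y_true or len(y_true) != len(proba):
        raise ValueError("y_true and proba must be non-empty and the same length")
    return sum((p - y) ** 2 for y, p in zip(y_true, proba)) / len(y_true)


# Confident and correct predictions score close to 0.
assert math.isclose(brier_score([1, 0], [0.8, 0.2]), 0.04)
# Always predicting 0.5 scores exactly 0.25, whatever the labels are.
assert brier_score([1, 0, 1, 0], [0.5, 0.5, 0.5, 0.5]) == 0.25
```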

5. Contract Tests

5.1 Pact (consumer-driven)

compliance-engine (the consumer) publishes the Score request/response shape it expects; fraud-intel-service (the provider) verifies its implementation against that pact.

// In compliance-engine repo (consumer)
provider('FraudIntelService')
  .uponReceiving('a Score request for a known tenant')
  .withRequest({ scope: 'TENANT', id: 'tnt_known' })
  .willRespondWith({ score: 0.21, tier: 'WATCH', modelId: like('ml_ait_xgboost'), ... });

5.2 NATS event schema conformance

Every produced fraud.* event is validated against its JSON Schema in CI:

import { validate } from '@ghasi/event-schemas';

test('fraud.detected.ait.v1 conforms to schema', () => {
  const ev = produce.aitDetection({ score: 0.94, ... });
  expect(validate('fraud.detected.ait.v1', ev)).toBe(true);
});

5.3 OpenAPI contract

The generated OpenAPI 3.1 document is snapshot-tested in CI, and oasdiff is used to detect breaking changes against the previous snapshot.


6. Load Tests

Framework: k6 (REST) + ghz (gRPC) + custom Python (NATS).

6.1 Score gRPC at 1000 RPS

# ghz takes the target host:port as its final positional argument
ghz --proto src/proto/fraud.proto \
    --call ghasi.sms.fraud.v1.FraudIntelService.Score \
    --data-file ./test/load/score_requests.json \
    --concurrency 100 --rps 1000 --duration 600s \
    fraud-intel-grpc:50054

Pass criteria:

  • P95 ≤ 50 ms, P99 ≤ 100 ms
  • Error rate < 0.05%
  • Cache hit ratio > 95% steady-state
  • No memory leak over 10-min run
  • Pod CPU < 80% averaged
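
The latency and error-rate criteria above can be checked mechanically from raw samples. A minimal sketch, assuming latencies are collected in milliseconds and using the nearest-rank percentile definition (load tools may interpolate differently); `percentile` and `passes_latency_slo` are hypothetical helpers, and the limits are the ones listed above.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: smallest sample with pct% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def passes_latency_slo(latencies_ms, errors, total):
    """P95 <= 50 ms, P99 <= 100 ms, error rate < 0.05%."""
    return (
        percentile(latencies_ms, 95) <= 50
        and percentile(latencies_ms, 99) <= 100
        and errors / total < 0.0005
    )


# 98 fast calls, one at 60 ms, one at 120 ms: P95 = 10 ms, P99 = 60 ms.
assert passes_latency_slo([10] * 98 + [60, 120], errors=0, total=100)
# Six slow calls push P95 to 60 ms, which violates the 50 ms limit.
assert not passes_latency_slo([10] * 94 + [60] * 6, errors=0, total=100)
```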

6.2 NATS consumer at 10K eps ingestion

A synthetic publisher sends 10K events per second (eps) to firewall.audit.v1; assert:

  • ClickHouse ingestion lag P95 ≤ 30 s
  • DLQ rate < 0.1%
  • Outbox-relay latency P95 < 100 ms

6.3 Pipeline burst

Insert 1M synthetic signals into fraud_features.events; trigger AIT pipeline; assert:

  • Pipeline completes in < 90 s for 1000 active tenants × 5 MNOs

7. E2E Scenarios

Framework: Playwright (UI), bash + grpcurl + nats CLI (CLI E2E).

E2E-01: Detection → Enforcement (AIT)

  1. Inject synthetic AIT campaign via fixture publisher.
  2. Wait ≤ 5 min for pipeline run.
  3. Assert fraud.detected.ait.v1 emitted.
  4. Assert sms-firewall-service consumes and updates firewall.peer_quarantine.
  5. Assert next compliance evaluation for same tenant uses updated tier.

E2E-02: Case → HITL Decision → Action Dispatch

  1. Inject medium-confidence AIT signals.
  2. Assert case opened with status=PENDING_REVIEW.
  3. Login as tns-fraud-analyst in admin-dashboard.
  4. Decide CONFIRM_FRAUD with executeAction=true.
  5. Assert action dispatched to firewall via NATS.
  6. Assert audit-log row.

E2E-03: Model Shadow → Promote → Rollback

  1. Register new model version via REST.
  2. Shadow for 24h (test fixture fast-forwards).
  3. Promote; assert active swap.
  4. Verify hot-reload in < 30s on workers.
  5. Rollback; assert previous active restored.

E2E-04: MISP Feed Export Roundtrip

  1. Confirm a fraud case.
  2. Wait for next scheduled export (test override 5-min cadence).
  3. Assert MinIO upload + signature.
  4. Assert SFTP mirror to test-mock regulator endpoint.
  5. Verify signature on downloaded file.

E2E-05: MISP Feed Import (signed)

  1. Push signed MISP body via internal mTLS endpoint.
  2. Assert indicators added.
  3. Trigger ingestion of an event with matching MSISDN.
  4. Assert events.imported_indicator_msisdn = TRUE.
  5. Assert next AIT pipeline accounts for this feature.

E2E-06: OTP-Grinding Real-Time Detection

  1. Publish 11 OTP-class messages to same MSISDN within 30 s.
  2. Assert fraud.detected.otp_grinding.v1 emitted within 5 s of 11th message.
  3. Assert compliance-engine consumes and applies throttle.

E2E-07: Fail-Soft on Triton Outage

  1. Kill Triton pods.
  2. Trigger AIT pipeline.
  3. Assert pipeline does NOT fail; instead emits via rule-based FraudPattern matcher.
  4. Assert fraud.alert.model.unavailable.v1 fired.

E2E-08: Score gRPC Fail-Closed-with-Default

  1. Stop fraud-intel-service pods.
  2. Make compliance-engine evaluate a message.
  3. Assert compliance-engine logs PROBATION fall-through and proceeds (does not fail-close).

8. Security Tests

  • mTLS bypass attempt: Unauthenticated gRPC call → assert UNAVAILABLE / connection rejected at TLS layer.
  • SPIFFE allowlist bypass: Authenticated cert with non-allowlisted SPIFFE → assert PERMISSION_DENIED.
  • Role escalation: tns-fraud-analyst calls POST /v1/admin/fraud/models/{id}/promote → assert 403.
  • Cross-tenant scope leakage: A query for subjectId of tenant B by tenant A's analyst → assert audit-log entry; no PII leaked in error message.
  • Audit-log immutability: Attempt UPDATE/DELETE on fraud.audit_log → assert no rows modified.
  • Allowlist two-person rule: Attempt to add allowlist entry where addedBy === approvedBy → assert 403.
  • Model artifact tamper: Upload artifact with mismatched SHA-256 → assert load refused, alert fired.
  • MISP signature: 50 randomly-corrupted-signature import attempts → 100% rejected.
  • ReDoS in OTP-pattern regex: Submit a pathological regex → assert the re2/hyperscan engine evaluates it without catastrophic backtracking, and that patterns over the length cap are rejected.
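
As an illustration of the model-artifact tamper check in the list above, the sketch below recomputes the SHA-256 of an artifact and compares it with the registered digest. `artifact_matches` is a hypothetical helper; how the service stores the expected digest and raises the alert is not shown here.

```python
import hashlib
import hmac


def artifact_matches(data: bytes, expected_sha256: str) -> bool:
    """Return True only if the artifact bytes hash to the registered digest."""
    actual = hashlib.sha256(data).hexdigest()
    # compare_digest avoids leaking the mismatch position through timing.
    return hmac.compare_digest(actual, expected_sha256.lower())


artifact = b"model-bytes"  # placeholder content
registered = hashlib.sha256(artifact).hexdigest()
assert artifact_matches(artifact, registered)
assert not artifact_matches(artifact + b"tampered", registered)
```

A loader built on this would refuse to load the model and fire the alert whenever the function returns False.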

9. CI Pipeline

# .github/workflows/fraud-intel-ci.yml (excerpt)
jobs:
  unit:
    steps:
      - run: pnpm test:unit          # NestJS
      - run: pytest tests/unit       # Python workers
      - run: pnpm test:cov && python -m coverage report --fail-under=85
  integration:
    services: [postgres, clickhouse, redis, nats]
    steps:
      - run: pnpm test:integration
      - run: pytest tests/integration
  contract:
    steps:
      - run: pnpm pact:verify
      # Quoted so the shell passes the pattern through instead of expanding it
      - run: python -m schema_check 'fraud.detected.*'
  model-eval:
    steps:
      - run: pytest tests/model_eval --benchmark
  load:
    if: github.ref == 'refs/heads/main'
    steps:
      - run: ghz --insecure ... ./scripts/score-load.sh
  e2e:
    needs: [unit, integration, contract]
    steps:
      - run: ./scripts/e2e-up.sh && pnpm test:e2e

CI runtime budget: < 25 minutes for unit + integration + contract; < 90 minutes including model-eval + e2e.