Data Model

:::info Source
Sourced from services/sync-service/DATA_MODEL.md in the documentation repo.
:::

1. Database

PostgreSQL 16. All tables live in the sync schema.

2. Tables

2.1 registrations

CREATE TABLE sync.registrations (
    service TEXT NOT NULL,
    entity_type TEXT NOT NULL,
    conflict_policy TEXT NOT NULL CHECK (conflict_policy IN ('append_only','crdt_yjs','lww','server_authoritative')),
    delta_projector TEXT NOT NULL,
    push_handler TEXT NOT NULL,
    version_field TEXT NOT NULL,
    schema_ref TEXT NOT NULL,
    PRIMARY KEY (service, entity_type)
);

2.2 cursors

CREATE TABLE sync.cursors (
    tenant_id UUID NOT NULL,
    user_id ULID NOT NULL,
    device_id ULID NOT NULL,
    scope TEXT NOT NULL,
    lamport BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (tenant_id, user_id, device_id, scope)
);
CREATE INDEX cursors_device ON sync.cursors (device_id);

2.3 mutations

CREATE TABLE sync.mutations (
    client_mutation_id TEXT PRIMARY KEY,
    tenant_id UUID NOT NULL,
    user_id ULID NOT NULL,
    device_id ULID NOT NULL,
    service TEXT NOT NULL,
    entity_type TEXT NOT NULL,
    entity_id TEXT NOT NULL,
    base_version INT,
    vector_clock JSONB NOT NULL,
    op TEXT NOT NULL CHECK (op IN ('create','update','delete','crdt_update')),
    payload JSONB NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL,
    received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    attempts INT NOT NULL DEFAULT 0,
    state TEXT NOT NULL CHECK (state IN ('queued','inflight','applied','conflicted','rejected')),
    last_error JSONB
);
CREATE INDEX mutations_device ON sync.mutations (tenant_id, user_id, device_id, state);
CREATE INDEX mutations_pending ON sync.mutations (state, received_at) WHERE state = 'queued';

2.4 deltas (materialized server deltas for pull)

CREATE TABLE sync.deltas (
    id BIGSERIAL,
    scope TEXT NOT NULL,
    tenant_id UUID NOT NULL,
    lamport BIGINT NOT NULL,
    entity_type TEXT NOT NULL,
    entity_id TEXT NOT NULL,
    op TEXT NOT NULL CHECK (op IN ('upsert','delete')),
    payload JSONB,
    projected_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (scope, lamport)
);
CREATE INDEX deltas_scope_lamport ON sync.deltas (scope, lamport);

2.5 conflicts

CREATE TABLE sync.conflicts (
    id ULID PRIMARY KEY,
    tenant_id UUID NOT NULL,
    user_id ULID NOT NULL,
    entity_type TEXT NOT NULL,
    entity_id TEXT NOT NULL,
    base_version INT NOT NULL,
    server_version INT NOT NULL,
    client_payload JSONB NOT NULL,
    server_payload JSONB NOT NULL,
    resolution TEXT NOT NULL DEFAULT 'pending' CHECK (resolution IN ('pending','kept_server','kept_client','merged')),
    resolved_by ULID,
    resolved_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX conflicts_pending ON sync.conflicts (tenant_id, user_id, resolution) WHERE resolution = 'pending';

2.6 device_health

CREATE TABLE sync.device_health (
    device_id ULID PRIMARY KEY,
    tenant_id UUID NOT NULL,
    user_id ULID NOT NULL,
    last_push_at TIMESTAMPTZ,
    last_pull_at TIMESTAMPTZ,
    mutation_backlog INT NOT NULL DEFAULT 0,
    conflict_count INT NOT NULL DEFAULT 0,
    sync_frequency_seconds INT, -- average interval between syncs
    status TEXT NOT NULL DEFAULT 'healthy' CHECK (status IN ('healthy','stale','disconnected'))
);
CREATE INDEX device_health_stale ON sync.device_health (status) WHERE status != 'healthy';

2.7 outbox, inbox

Standard outbox and inbox tables.

3. RLS

Row-level security (RLS) is enforced on every table that has a tenant_id column.
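
As an illustration of tenant isolation on one table, a minimal sketch follows. It assumes the application sets a session variable named app.tenant_id on each transaction; that variable name and the policy name are hypothetical and do not come from the source.

```sql
-- Sketch only: app.tenant_id and cursors_tenant_isolation are assumed names.
ALTER TABLE sync.cursors ENABLE ROW LEVEL SECURITY;
-- FORCE applies the policy to the table owner as well.
ALTER TABLE sync.cursors FORCE ROW LEVEL SECURITY;

CREATE POLICY cursors_tenant_isolation ON sync.cursors
    USING (tenant_id = current_setting('app.tenant_id')::uuid)
    WITH CHECK (tenant_id = current_setting('app.tenant_id')::uuid);

-- Usage: scope the setting to one transaction so it cannot leak
-- across pooled connections.
BEGIN;
SET LOCAL app.tenant_id = '00000000-0000-0000-0000-000000000001';
SELECT device_id, scope, lamport FROM sync.cursors;
COMMIT;
```

The same pattern would repeat for each table with a tenant_id column. registrations has no tenant_id, so it would stay outside RLS under this reading.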

4. Indexing

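  • Cursors: the primary key (tenant_id, user_id, device_id, scope) covers cursor lookups.
  • Deltas: (scope, lamport) for pull range queries. The primary key already provides this index, so deltas_scope_lamport duplicates it.
  • Mutations: (tenant_id, user_id, device_id, state) for per-device push status.
  • Conflicts: partial index on (tenant_id, user_id, resolution) WHERE resolution = 'pending', for the conflict resolution UI.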
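
The conflict resolution UI lookup might look like the query below. This is a sketch, not the service's actual query: $1 and $2 are bind parameters for the tenant and user, and the column list is an assumption about what the UI needs.

```sql
-- The WHERE clause repeats resolution = 'pending' so the planner can
-- match the partial index conflicts_pending.
SELECT id, entity_type, entity_id, base_version, server_version, created_at
FROM sync.conflicts
WHERE tenant_id = $1
  AND user_id = $2
  AND resolution = 'pending'
ORDER BY created_at;
```

A query that omits the resolution = 'pending' predicate cannot use the partial index, because the index holds only pending rows.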

5. Partitioning

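  • deltas is partitioned monthly by projected_at. PostgreSQL requires the partition key in every primary key or unique constraint on a partitioned table, so the key of the partitioned table must also include projected_at.
  • Partitions older than 30 days are archived and then dropped. Devices whose cursors point into a dropped partition must do a full resync.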
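
The monthly rotation can be sketched with declarative partitioning. The statements below assume sync.deltas was created with PARTITION BY RANGE (projected_at); the partition names and dates are hypothetical, and the archive step is left as a comment because the source does not say how archiving is done.

```sql
-- Create next month's partition ahead of time.
-- The upper bound is exclusive.
CREATE TABLE IF NOT EXISTS sync.deltas_2025_02
    PARTITION OF sync.deltas
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Retire a partition whose rows are all older than 30 days.
-- Detaching first turns it into a standalone table for the archive step.
ALTER TABLE sync.deltas DETACH PARTITION sync.deltas_2024_12;
-- (archive sync.deltas_2024_12 here, for example with pg_dump)
DROP TABLE sync.deltas_2024_12;
```

Dropping a whole partition avoids the row-by-row DELETE and the vacuum work that a single large table would need.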

6. Retention

  • deltas: 30 days hot. Clients must sync within 30 days or do a full resync.
  • mutations: 30 days. Applied and rejected rows are cleaned up nightly.
  • conflicts: indefinite (audit trail).
  • cursors: indefinite.
  • device_health: indefinite.
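
The nightly mutations cleanup could be a single statement like the one below. This is a sketch that assumes age is measured from received_at; the source does not say which timestamp the 30 days are counted from.

```sql
-- Remove finished mutations past the retention window.
-- Queued, inflight, and conflicted rows are left alone.
DELETE FROM sync.mutations
WHERE state IN ('applied', 'rejected')
  AND received_at < now() - interval '30 days';
```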

7. Performance

  • Pull: range query on (scope, lamport). The index returns rows already sorted by lamport, so no separate sort step is needed.
  • Push: each mutation is routed to its owning service. Mutations within a batch are processed in parallel.
  • Delta projection: events are consumed from NATS and written to the deltas table, one write per event.
  • Conflict detection: the mutation's base_version is compared with the current server version at push time.
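
The pull and push paths above can be sketched as three statements. All of them are illustrations under assumptions: the page size, the batch size, the use of SKIP LOCKED to claim work, and the way the current server version reaches the query as a bind parameter are not stated in the source.

```sql
-- Pull: everything in a scope after the device's cursor.
-- $1 = scope, $2 = the cursor's lamport value.
SELECT lamport, entity_type, entity_id, op, payload
FROM sync.deltas
WHERE scope = $1
  AND lamport > $2
ORDER BY lamport
LIMIT 500;

-- Push: claim a batch of queued mutations without blocking other workers.
-- The inner query can use the partial index mutations_pending.
UPDATE sync.mutations
SET state = 'inflight',
    attempts = attempts + 1
WHERE client_mutation_id IN (
    SELECT client_mutation_id
    FROM sync.mutations
    WHERE state = 'queued'
    ORDER BY received_at
    LIMIT 100
    FOR UPDATE SKIP LOCKED
)
RETURNING client_mutation_id, service, entity_type, entity_id, op, payload;

-- Conflict detection: $1 = client_mutation_id, $2 = current server version
-- obtained from the owning service. IS DISTINCT FROM treats a NULL
-- base_version as a mismatch instead of yielding NULL.
UPDATE sync.mutations
SET state = 'conflicted'
WHERE client_mutation_id = $1
  AND base_version IS DISTINCT FROM $2;
```

The client would then advance its cursor to the highest lamport returned by the pull query and repeat until fewer rows than the page size come back.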

8. Migration

  • Column changes on cursors and deltas are additive only.
  • New conflict policies are added by extending the conflict_policy CHECK constraint on registrations.
  • Changes to deltas partitioning require careful coordination because they affect the pull window.
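
The first two kinds of migration might look like the sketch below. The new column and the new policy value are hypothetical. The constraint name assumes PostgreSQL's default naming for a column-level check (table_column_check), which should be confirmed against the live schema before running anything like this.

```sql
-- Additive column: nullable, so existing rows and writers are unaffected.
ALTER TABLE sync.cursors
    ADD COLUMN IF NOT EXISTS client_version TEXT;  -- hypothetical column

-- Extend the allowed conflict policies by replacing the CHECK constraint.
BEGIN;
ALTER TABLE sync.registrations
    DROP CONSTRAINT registrations_conflict_policy_check;
ALTER TABLE sync.registrations
    ADD CONSTRAINT registrations_conflict_policy_check
    CHECK (conflict_policy IN (
        'append_only', 'crdt_yjs', 'lww', 'server_authoritative',
        'field_merge'  -- hypothetical new policy
    )) NOT VALID;
COMMIT;

-- Validate separately; this scans the table under a weaker lock.
ALTER TABLE sync.registrations
    VALIDATE CONSTRAINT registrations_conflict_policy_check;
```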