Data Sync Patterns
When adding TypeGraph as a graph overlay to an existing application, you need to keep your graph data in sync with your source of truth. This guide covers practical patterns for syncing external data into TypeGraph using the bulk operations API.
Overview
Section titled “Overview”Most applications adding TypeGraph will have existing data in relational tables, external APIs, or document stores. Rather than migrating this data, TypeGraph works alongside it as an overlay that provides:
- Graph traversals across your existing entities
- Semantic search via vector embeddings
- Relationship discovery and inference
The key challenge is keeping the graph in sync with your source data. We cover three approaches:
| Approach | Best For | Complexity |
|---|---|---|
| On-demand sync | Low-volume, real-time needs | Low |
| Batch sync | Bulk imports, periodic refresh | Medium |
| Event-driven sync | High-volume, near-real-time | Higher |
Bulk Operations API
Section titled “Bulk Operations API”TypeGraph provides bulk operations for efficient sync workflows:
// Create or update a single nodeawait store.nodes.Document.upsertById(id, props);
// Create many nodes at onceawait store.nodes.Document.bulkCreate(items);
// Insert many nodes without returning results (dedicated fast path)await store.nodes.Document.bulkInsert(items);
// Create or update many nodes at onceawait store.nodes.Document.bulkUpsertById(items);
// Delete many nodes at onceawait store.nodes.Document.bulkDelete(ids);upsertById
Section titled “upsertById”Creates a node if it doesn’t exist, or updates it if it does. This includes “un-deleting” soft-deleted nodes:
// First call creates the nodeconst doc1 = await store.nodes.Document.upsertById("doc_123", { title: "Original Title", content: "...",});
// Second call updates the existing nodeconst doc2 = await store.nodes.Document.upsertById("doc_123", { title: "Updated Title", content: "...",});
// doc1.id === doc2.id - same node, updated in placebulkCreate
Section titled “bulkCreate”Efficiently creates multiple nodes in a single operation. Uses a single multi-row INSERT with RETURNING when the backend supports it:
const documents = await store.nodes.Document.bulkCreate([ { props: { title: "Doc 1", content: "..." } }, { props: { title: "Doc 2", content: "..." } }, { props: { title: "Doc 3", content: "..." }, id: "custom_id" },]);If you only need write side effects and not created payloads, use bulkInsert.
bulkInsert
Section titled “bulkInsert”Inserts multiple nodes without returning results. This is the dedicated fast path for bulk ingestion — automatically wrapped in a transaction:
await store.nodes.Document.bulkInsert([ { props: { title: "Doc 1", content: "..." } }, { props: { title: "Doc 2", content: "..." } }, { props: { title: "Doc 3", content: "..." }, id: "custom_id" },]);Prefer bulkInsert over bulkCreate when you don’t need results.
bulkUpsertById
Section titled “bulkUpsertById”Creates or updates multiple nodes. Ideal for sync workflows where you don’t know which records already exist:
// Sync a batch of external recordsconst externalRecords = await fetchExternalData();
const synced = await store.nodes.Document.bulkUpsertById( externalRecords.map((record) => ({ id: record.id, // Use external ID as graph node ID props: { title: record.title, content: record.body, source: { table: "documents", id: record.id }, }, })));bulkDelete
Section titled “bulkDelete”Deletes multiple nodes by ID. Silently ignores IDs that don’t exist:
// Remove nodes that no longer exist in sourceconst deletedIds = await findDeletedRecords();await store.nodes.Document.bulkDelete(deletedIds);getOrCreate APIs
Section titled “getOrCreate APIs”Use get-or-create methods when your dedupe key is not a direct ID:
// Match by a named uniqueness constraintconst byEmail = await store.nodes.User.getOrCreateByConstraint( "user_email", { email: "alice@example.com", name: "Alice" }, { ifExists: "update" });// byEmail.action: "created" | "found" | "updated" | "resurrected"
// Match edges by endpoints (+ optional matchOn fields)const membership = await store.edges.memberOf.getOrCreateByEndpoints( user, org, { role: "admin", source: "sync" }, { matchOn: ["role"], ifExists: "update" });// membership.action: "created" | "found" | "updated" | "resurrected"Edge Bulk Operations
Section titled “Edge Bulk Operations”Edges also support bulk operations:
// Create many edges at once (returns created edges)const edges = await store.edges.relatedTo.bulkCreate([ { from: doc1, to: doc2, props: { confidence: 0.9 } }, { from: doc1, to: doc3, props: { confidence: 0.7 } }, { from: doc2, to: doc3, props: { confidence: 0.8 } },]);
// Insert many edges without returning results (fast path)await store.edges.relatedTo.bulkInsert([ { from: doc1, to: doc2, props: { confidence: 0.9 } }, { from: doc1, to: doc3, props: { confidence: 0.7 } }, { from: doc2, to: doc3, props: { confidence: 0.8 } },]);
// Delete many edges at onceawait store.edges.relatedTo.bulkDelete(edgeIds);On-Demand Sync
Section titled “On-Demand Sync”Sync individual records when they’re accessed or modified. Best for low-volume scenarios where you want real-time consistency.
import { type Store } from "@nicia-ai/typegraph";import { db, documents } from "./drizzle-schema";
interface AppDocument { id: string; title: string; content: string; updatedAt: Date;}
async function syncDocument(store: Store, doc: AppDocument) { // Generate embedding for semantic search const embedding = await generateEmbedding(doc.content);
// Upsert ensures we create or update as needed return store.nodes.Document.upsertById(doc.id, { title: doc.title, content: doc.content, embedding, source: { table: "documents", id: doc.id }, });}
// Sync on read - ensure graph is current before queryingasync function getRelatedDocuments(documentId: string) { // First, ensure the source document is synced const appDoc = await db.select().from(documents).where(eq(documents.id, documentId)).get(); if (!appDoc) throw new Error("Document not found");
await syncDocument(store, appDoc);
// Now query the graph for relationships return store .query() .from("Document", "d") .whereNode("d", (d) => d.id.eq(documentId)) .traverse("relatedTo", "r") .to("Document", "related") .select((ctx) => ({ id: ctx.related.id, title: ctx.related.title, confidence: ctx.r.confidence, })) .execute();}
// Sync on write - update graph when source changesasync function updateDocument(documentId: string, updates: Partial<AppDocument>) { // Update source of truth first const [updated] = await db .update(documents) .set({ ...updates, updatedAt: new Date() }) .where(eq(documents.id, documentId)) .returning();
// Then sync to graph await syncDocument(store, updated);
return updated;}Batch Sync
Section titled “Batch Sync”Process records in batches for bulk imports or periodic refresh. Best for large datasets or when you need to backfill data.
Basic Batch Sync
Section titled “Basic Batch Sync”interface SyncOptions { batchSize?: number; onProgress?: (processed: number, total: number) => void;}
async function syncAllDocuments(store: Store, options: SyncOptions = {}) { const { batchSize = 100, onProgress } = options;
// Get total count for progress reporting const [{ count }] = await db.select({ count: sql<number>`count(*)` }).from(documents);
let processed = 0; let offset = 0;
while (offset < count) { // Fetch a batch from source const batch = await db.select().from(documents).limit(batchSize).offset(offset);
if (batch.length === 0) break;
// Generate embeddings in parallel (respecting API rate limits) const embeddings = await batchGenerateEmbeddings(batch.map((d) => d.content));
// Bulk upsert the batch await store.nodes.Document.bulkUpsertById( batch.map((doc, i) => ({ id: doc.id, props: { title: doc.title, content: doc.content, embedding: embeddings[i], source: { table: "documents", id: doc.id }, }, })) );
processed += batch.length; offset += batchSize; onProgress?.(processed, count); }
return { processed, total: count };}
// Usageawait syncAllDocuments(store, { batchSize: 50, onProgress: (processed, total) => { console.log(`Synced ${processed}/${total} documents`); },});Incremental Sync
Section titled “Incremental Sync”Only sync records that have changed since the last sync:
interface SyncState { lastSyncAt: Date;}
async function incrementalSync(store: Store, state: SyncState): Promise<SyncState> { const since = state.lastSyncAt; const now = new Date();
// Fetch only changed records const changed = await db .select() .from(documents) .where(gt(documents.updatedAt, since)) .orderBy(documents.updatedAt);
if (changed.length > 0) { const embeddings = await batchGenerateEmbeddings(changed.map((d) => d.content));
await store.nodes.Document.bulkUpsertById( changed.map((doc, i) => ({ id: doc.id, props: { title: doc.title, content: doc.content, embedding: embeddings[i], source: { table: "documents", id: doc.id }, }, })) );
console.log(`Synced ${changed.length} changed documents`); }
// Handle deletions (if your source tracks them) const deleted = await db .select({ id: documents.id }) .from(documents) .where(and(gt(documents.deletedAt, since), isNotNull(documents.deletedAt)));
if (deleted.length > 0) { await store.nodes.Document.bulkDelete(deleted.map((d) => d.id)); console.log(`Removed ${deleted.length} deleted documents`); }
return { lastSyncAt: now };}Scheduled Sync Job
Section titled “Scheduled Sync Job”Run incremental sync on a schedule:
import { CronJob } from "cron";
// Store sync state (in production, persist this to a database)let syncState: SyncState = { lastSyncAt: new Date(0) };
// Run every 5 minutesconst syncJob = new CronJob("*/5 * * * *", async () => { try { syncState = await incrementalSync(store, syncState); } catch (error) { console.error("Sync failed:", error); // Alert, retry, etc. }});
syncJob.start();Event-Driven Sync
Section titled “Event-Driven Sync”React to changes in your source data via events, webhooks, or database triggers. Best for high-volume scenarios requiring near-real-time sync.
Message Queue Pattern
Section titled “Message Queue Pattern”import { Queue, Worker } from "bullmq";
// Define sync job typesinterface SyncJob { type: "upsert" | "delete"; entityType: "Document" | "User"; entityId: string;}
// Producer: Enqueue sync jobs when source data changesconst syncQueue = new Queue<SyncJob>("sync");
async function onDocumentCreated(doc: AppDocument) { await syncQueue.add("sync", { type: "upsert", entityType: "Document", entityId: doc.id, });}
async function onDocumentUpdated(doc: AppDocument) { await syncQueue.add("sync", { type: "upsert", entityType: "Document", entityId: doc.id, });}
async function onDocumentDeleted(docId: string) { await syncQueue.add("sync", { type: "delete", entityType: "Document", entityId: docId, });}
// Consumer: Process sync jobsconst syncWorker = new Worker<SyncJob>( "sync", async (job) => { const { type, entityType, entityId } = job.data;
if (type === "delete") { await store.nodes[entityType].delete(entityId); return; }
// Fetch current state from source const record = await fetchEntity(entityType, entityId); if (!record) { // Record was deleted between enqueue and processing await store.nodes[entityType].delete(entityId); return; }
// Generate embedding if needed const embedding = await generateEmbedding(record.content);
// Upsert to graph await store.nodes[entityType].upsertById(entityId, { ...record, embedding, source: { table: entityType.toLowerCase() + "s", id: entityId }, }); }, { concurrency: 10, connection: redis, });Webhook Handler
Section titled “Webhook Handler”Process webhooks from external systems:
import { Hono } from "hono";
const app = new Hono();
app.post("/webhooks/documents", async (c) => { const event = await c.req.json<{ type: "created" | "updated" | "deleted"; data: AppDocument; }>();
switch (event.type) { case "created": case "updated": { const embedding = await generateEmbedding(event.data.content); await store.nodes.Document.upsertById(event.data.id, { title: event.data.title, content: event.data.content, embedding, source: { table: "documents", id: event.data.id }, }); break; }
case "deleted": { await store.nodes.Document.delete(event.data.id); break; } }
return c.json({ ok: true });});Database Triggers (PostgreSQL)
Section titled “Database Triggers (PostgreSQL)”Use LISTEN/NOTIFY for real-time sync from PostgreSQL:
import { Client } from "pg";
// Set up listenerconst listener = new Client({ connectionString: process.env.DATABASE_URL });await listener.connect();await listener.query("LISTEN document_changes");
listener.on("notification", async (msg) => { if (msg.channel !== "document_changes") return;
const payload = JSON.parse(msg.payload!); const { operation, id } = payload;
if (operation === "DELETE") { await store.nodes.Document.delete(id); return; }
// Fetch and sync the changed document const doc = await db.select().from(documents).where(eq(documents.id, id)).get();
if (doc) { const embedding = await generateEmbedding(doc.content); await store.nodes.Document.upsertById(id, { title: doc.title, content: doc.content, embedding, source: { table: "documents", id }, }); }});Corresponding PostgreSQL trigger:
CREATE OR REPLACE FUNCTION notify_document_changes()RETURNS TRIGGER AS $$BEGIN PERFORM pg_notify( 'document_changes', json_build_object( 'operation', TG_OP, 'id', COALESCE(NEW.id, OLD.id) )::text ); RETURN COALESCE(NEW, OLD);END;$$ LANGUAGE plpgsql;
CREATE TRIGGER document_changes_triggerAFTER INSERT OR UPDATE OR DELETE ON documentsFOR EACH ROW EXECUTE FUNCTION notify_document_changes();Syncing Relationships
Section titled “Syncing Relationships”When syncing data that includes relationships, sync nodes first, then edges:
interface ExternalUser { id: string; name: string; email: string; managerId?: string;}
async function syncUsers(users: ExternalUser[]) { // Step 1: Sync all user nodes first await store.nodes.User.bulkUpsertById( users.map((u) => ({ id: u.id, props: { name: u.name, email: u.email, source: { table: "users", id: u.id }, }, })) );
// Step 2: Sync manager relationships // First, remove all existing manages edges (clean slate approach) const existingEdges = await store.edges.manages.find(); if (existingEdges.length > 0) { await store.edges.manages.bulkDelete(existingEdges.map((e) => e.id)); }
// Then create edges for users with managers const usersWithManagers = users.filter((u) => u.managerId);
await store.edges.manages.bulkInsert( usersWithManagers.map((u) => ({ from: { kind: "User" as const, id: u.managerId! }, to: { kind: "User" as const, id: u.id }, })), );}Handling Sync Failures
Section titled “Handling Sync Failures”Retry with Exponential Backoff
Section titled “Retry with Exponential Backoff”async function syncWithRetry<T>( fn: () => Promise<T>, options: { maxRetries?: number; baseDelay?: number } = {}): Promise<T> { const { maxRetries = 3, baseDelay = 1000 } = options;
let lastError: Error | undefined;
for (let attempt = 0; attempt <= maxRetries; attempt++) { try { return await fn(); } catch (error) { lastError = error as Error;
if (attempt < maxRetries) { const delay = baseDelay * Math.pow(2, attempt); console.warn(`Sync attempt ${attempt + 1} failed, retrying in ${delay}ms`); await new Promise((resolve) => setTimeout(resolve, delay)); } } }
throw lastError;}
// Usageawait syncWithRetry(() => store.nodes.Document.bulkUpsertById(items));Dead Letter Queue
Section titled “Dead Letter Queue”Track failed syncs for manual intervention:
interface FailedSync { entityType: string; entityId: string; error: string; failedAt: Date; attempts: number;}
const failedSyncs: FailedSync[] = [];
async function syncWithDLQ(entityType: string, entityId: string, syncFn: () => Promise<void>) { try { await syncWithRetry(syncFn); } catch (error) { failedSyncs.push({ entityType, entityId, error: (error as Error).message, failedAt: new Date(), attempts: 3, });
console.error(`Sync failed after retries: ${entityType}:${entityId}`); }}
// Periodically retry or alert on failed syncsasync function processFailedSyncs() { for (const failed of failedSyncs) { console.log(`Failed sync: ${failed.entityType}:${failed.entityId} - ${failed.error}`); // Retry, alert, or log for manual intervention }}Best Practices
Section titled “Best Practices”Use Consistent IDs
Section titled “Use Consistent IDs”Map external IDs to graph node IDs consistently:
// Good: Use external ID directly when it's unique and stableawait store.nodes.Document.upsertById(externalDoc.id, { ... });
// Good: Namespace if IDs might collide across sourcesawait store.nodes.Document.upsertById(`notion:${notionPage.id}`, { ... });await store.nodes.Document.upsertById(`gdrive:${driveFile.id}`, { ... });Track Sync Metadata
Section titled “Track Sync Metadata”Store sync information for debugging and auditing:
const Document = defineNode("Document", { schema: z.object({ title: z.string(), content: z.string(), embedding: embedding(1536).optional(), source: externalRef("documents"), // Sync metadata lastSyncedAt: z.string().datetime().optional(), syncVersion: z.number().optional(), }),});
await store.nodes.Document.upsertById(doc.id, { ...props, lastSyncedAt: new Date().toISOString(), syncVersion: (existingNode?.syncVersion ?? 0) + 1,});Validate Before Sync
Section titled “Validate Before Sync”Validate external data before syncing to avoid corrupting your graph:
const ExternalDocumentSchema = z.object({ id: z.string().min(1), title: z.string().min(1), content: z.string(),});
async function syncDocument(rawDoc: unknown) { const result = ExternalDocumentSchema.safeParse(rawDoc);
if (!result.success) { console.error("Invalid document data:", result.error); return; }
await store.nodes.Document.upsertById(result.data.id, { title: result.data.title, content: result.data.content, });}Monitor Sync Health
Section titled “Monitor Sync Health”Track sync metrics for observability:
const syncMetrics = { successful: 0, failed: 0, lastSyncDuration: 0, lastSyncAt: null as Date | null,};
async function monitoredSync(fn: () => Promise<void>) { const start = Date.now();
try { await fn(); syncMetrics.successful++; } catch (error) { syncMetrics.failed++; throw error; } finally { syncMetrics.lastSyncDuration = Date.now() - start; syncMetrics.lastSyncAt = new Date(); }}
// Expose metrics endpointapp.get("/metrics/sync", (c) => c.json(syncMetrics));Next Steps
Section titled “Next Steps”- Integration Patterns - Database setup and deployment patterns
- Semantic Search - Add vector embeddings during sync
- Query Builder - Query your synced graph data