Examples: Complete application examples demonstrating TypeGraph patterns # Audit Trail > Complete change tracking with user attribution and diff generation This example shows how to build a comprehensive audit system that: - **Tracks all changes** using TypeGraph's temporal model - **Attributes changes** to users and sessions - **Generates diffs** between versions - **Supports compliance queries** (who changed what, when) - **Exports audit logs** for external systems ## How TypeGraph Enables Auditing TypeGraph's temporal model provides built-in auditing capabilities: 1. **Every update creates a new version** - Old data is preserved with `valid_to` timestamp 2. **Temporal queries** - Query any point in time with `asOf` or get full history with `includeEnded` 3. **Metadata fields** - `createdAt`, `updatedAt`, `version` are tracked automatically This example extends the built-in capabilities with: - User attribution (who made the change) - Change descriptions (why the change was made) - Structured diffs (what exactly changed) ## Schema Definition ```typescript import { z } from "zod"; import { defineNode, defineEdge, defineGraph } from "@nicia-ai/typegraph"; // Audited entity (example: Settings) const Setting = defineNode("Setting", { schema: z.object({ key: z.string(), value: z.string(), category: z.string(), description: z.string().optional(), }), }); // Users making changes const User = defineNode("User", { schema: z.object({ email: z.string().email(), name: z.string(), role: z.enum(["admin", "editor", "viewer"]), }), }); // Explicit audit log entries (for cross-cutting concerns) const AuditEntry = defineNode("AuditEntry", { schema: z.object({ entityType: z.string(), entityId: z.string(), action: z.enum(["create", "update", "delete", "restore"]), timestamp: z.string().datetime(), changes: z.record(z.object({ before: z.unknown().optional(), after: z.unknown().optional(), })).optional(), reason: z.string().optional(), ipAddress: z.string().optional(), userAgent: z.string().optional(), }), }); // Sessions for grouping changes const Session = defineNode("Session", { schema: z.object({ startedAt: z.string().datetime(), endedAt: z.string().datetime().optional(), ipAddress: z.string().optional(), userAgent: z.string().optional(), }), }); // Edges const performedBy = defineEdge("performedBy"); const inSession = defineEdge("inSession"); const hasSession = defineEdge("hasSession"); const graph = defineGraph({ id: "audit_trail", nodes: { Setting: { type: Setting }, User: { type: User }, AuditEntry: { type: AuditEntry }, Session: { type: Session }, }, edges: { performedBy: { type: performedBy, from: [AuditEntry], to: [User] }, inSession: { type: inSession, from: [AuditEntry], to: [Session] }, hasSession: { type: hasSession, from: [User], to: [Session] }, }, }); ``` ## Audit Context Create a context object to track the current user and session: ```typescript interface AuditContext { userId: string; sessionId?: string; ipAddress?: string; userAgent?: string; reason?: string; } // Thread-local storage for audit context (Node.js) import { AsyncLocalStorage } from "async_hooks"; const auditContext = new AsyncLocalStorage(); function withAuditContext(context: AuditContext, fn: () => Promise): Promise { return auditContext.run(context, fn); } function getAuditContext(): AuditContext | undefined { return auditContext.getStore(); } ``` ## Audited Operations ### Create with Audit ```typescript async function createSetting( key: string, value: string, category: string ): Promise> { const ctx = getAuditContext(); if (!ctx) throw new Error("Audit context required"); return store.transaction(async (tx) => { // Create the setting const setting = await tx.nodes.Setting.create({ key, value, category, }); // Create audit entry await createAuditEntry(tx, { entityType: "Setting", entityId: setting.id, action: "create", changes: { key: { after: key }, value: { after: value }, category: { after: category }, }, }); return setting; }); } ``` ### Update with Audit ```typescript async function updateSetting( id: string, updates: Partial<{ value: string; description: string }> ): Promise> { const ctx = getAuditContext(); if (!ctx) throw new Error("Audit context required"); return store.transaction(async (tx) => { // Get current state const current = await tx.nodes.Setting.getById(id); if (!current) throw new Error(`Setting not found: ${id}`); // Calculate changes const changes: Record = {}; for (const [key, newValue] of Object.entries(updates)) { const oldValue = current[key as keyof typeof current]; if (oldValue !== newValue) { changes[key] = { before: oldValue, after: newValue }; } } // Skip if no actual changes if (Object.keys(changes).length === 0) { return current; } // Update the setting const updated = await tx.nodes.Setting.update(id, updates); // Create audit entry await createAuditEntry(tx, { entityType: "Setting", entityId: id, action: "update", changes, }); return updated; }); } ``` ### Delete with Audit ```typescript async function deleteSetting(id: string): Promise { const ctx = getAuditContext(); if (!ctx) throw new Error("Audit context required"); await store.transaction(async (tx) => { // Get current state for audit const current = await tx.nodes.Setting.getById(id); if (!current) throw new Error(`Setting not found: ${id}`); // Delete (soft delete) await tx.nodes.Setting.delete(id); // Create audit entry await createAuditEntry(tx, { entityType: "Setting", entityId: id, action: "delete", changes: { key: { before: current.key }, value: { before: current.value }, category: { before: current.category }, }, }); }); } ``` ### Create Audit Entry ```typescript interface AuditEntryInput { entityType: string; entityId: string; action: "create" | "update" | "delete" | "restore"; changes?: Record; } async function createAuditEntry( tx: Transaction, input: AuditEntryInput ): Promise> { const ctx = getAuditContext()!; const entry = await tx.nodes.AuditEntry.create({ entityType: input.entityType, entityId: input.entityId, action: input.action, timestamp: new Date().toISOString(), changes: input.changes, reason: ctx.reason, ipAddress: ctx.ipAddress, userAgent: ctx.userAgent, }); // Link to user const user = await tx.nodes.User.getById(ctx.userId); if (!user) throw new Error(`User not found: ${ctx.userId}`); await tx.edges.performedBy.create(entry, user, {}); // Link to session if present if (ctx.sessionId) { const session = await tx.nodes.Session.getById(ctx.sessionId); if (!session) throw new Error(`Session not found: ${ctx.sessionId}`); await tx.edges.inSession.create(entry, session, {}); } return entry; } ``` ## Querying Audit History ### Get Entity History ```typescript interface HistoryEntry { version: number; timestamp: string; action: string; changes?: Record; user: { name: string; email: string }; reason?: string; } async function getEntityHistory( entityType: string, entityId: string ): Promise { return store .query() .from("AuditEntry", "a") .whereNode("a", (a) => a.entityType.eq(entityType).and(a.entityId.eq(entityId)) ) .traverse("performedBy", "e") .to("User", "u") .orderBy((ctx) => ctx.a.timestamp, "desc") .select((ctx) => ({ version: ctx.a.version, timestamp: ctx.a.timestamp, action: ctx.a.action, changes: ctx.a.changes, user: { name: ctx.u.name, email: ctx.u.email, }, reason: ctx.a.reason, })) .execute(); } ``` ### Get User Activity ```typescript interface UserActivity { timestamp: string; entityType: string; entityId: string; action: string; } async function getUserActivity( userId: string, options: { since?: Date; limit?: number } = {} ): Promise { const { since, limit = 100 } = options; let query = store .query() .from("User", "u") .whereNode("u", (u) => u.id.eq(userId)) .traverse("performedBy", "e", { direction: "in" }) .to("AuditEntry", "a"); if (since) { query = query.whereNode("a", (a) => a.timestamp.gte(since.toISOString())); } return query .orderBy((ctx) => ctx.a.timestamp, "desc") .limit(limit) .select((ctx) => ({ timestamp: ctx.a.timestamp, entityType: ctx.a.entityType, entityId: ctx.a.entityId, action: ctx.a.action, })) .execute(); } ``` ### Changes in Time Range ```typescript interface ChangeReport { entityType: string; entityId: string; changeCount: number; users: string[]; lastChange: string; } async function getChangesInRange( startDate: Date, endDate: Date ): Promise { const entries = await store .query() .from("AuditEntry", "a") .whereNode("a", (a) => a.timestamp.gte(startDate.toISOString()).and( a.timestamp.lte(endDate.toISOString()) ) ) .traverse("performedBy", "e") .to("User", "u") .select((ctx) => ({ entityType: ctx.a.entityType, entityId: ctx.a.entityId, timestamp: ctx.a.timestamp, userName: ctx.u.name, })) .execute(); // Group by entity const grouped = new Map(); for (const entry of entries) { const key = `${entry.entityType}:${entry.entityId}`; const existing = grouped.get(key); if (existing) { existing.changeCount++; if (!existing.users.includes(entry.userName)) { existing.users.push(entry.userName); } if (entry.timestamp > existing.lastChange) { existing.lastChange = entry.timestamp; } } else { grouped.set(key, { entityType: entry.entityType, entityId: entry.entityId, changeCount: 1, users: [entry.userName], lastChange: entry.timestamp, }); } } return Array.from(grouped.values()); } ``` ## Using TypeGraph's Built-in Temporal Features ### View Entity at Point in Time ```typescript async function getSettingAsOf( id: string, timestamp: Date ): Promise { return store .query() .from("Setting", "s") .temporal("asOf", timestamp.toISOString()) .whereNode("s", (s) => s.id.eq(id)) .select((ctx) => ctx.s) .first(); } ``` ### Get All Versions ```typescript interface SettingVersion { props: SettingProps; validFrom: string; validTo: string | undefined; version: number; } async function getSettingVersions(id: string): Promise { return store .query() .from("Setting", "s") .temporal("includeEnded") .whereNode("s", (s) => s.id.eq(id)) .orderBy((ctx) => ctx.s.validFrom, "desc") .select((ctx) => ({ props: ctx.s, validFrom: ctx.s.validFrom, validTo: ctx.s.validTo, version: ctx.s.version, })) .execute(); } ``` ### Compare Versions ```typescript interface VersionDiff { field: string; before: unknown; after: unknown; } async function compareVersions( id: string, version1: number, version2: number ): Promise { const versions = await store .query() .from("Setting", "s") .temporal("includeEnded") .whereNode("s", (s) => s.id.eq(id).and(s.version.in([version1, version2]))) .orderBy((ctx) => ctx.s.version, "asc") .select((ctx) => ctx.s) .execute(); if (versions.length !== 2) { throw new Error("Versions not found"); } const [before, after] = versions; const diffs: VersionDiff[] = []; const allKeys = new Set([...Object.keys(before), ...Object.keys(after)]); for (const key of allKeys) { const beforeVal = before[key as keyof typeof before]; const afterVal = after[key as keyof typeof after]; if (JSON.stringify(beforeVal) !== JSON.stringify(afterVal)) { diffs.push({ field: key, before: beforeVal, after: afterVal }); } } return diffs; } ``` ## Session Management ### Start Session ```typescript async function startSession( userId: string, metadata: { ipAddress?: string; userAgent?: string } ): Promise> { return store.transaction(async (tx) => { const session = await tx.nodes.Session.create({ startedAt: new Date().toISOString(), ipAddress: metadata.ipAddress, userAgent: metadata.userAgent, }); const user = await tx.nodes.User.getById(userId); if (!user) throw new Error(`User not found: ${userId}`); await tx.edges.hasSession.create(user, session, {}); return session; }); } ``` ### End Session ```typescript async function endSession(sessionId: string): Promise { await store.nodes.Session.update(sessionId, { endedAt: new Date().toISOString(), }); } ``` ### Get Session Activity ```typescript async function getSessionActivity( sessionId: string ): Promise> { return store .query() .from("Session", "s") .whereNode("s", (s) => s.id.eq(sessionId)) .traverse("inSession", "e", { direction: "in" }) .to("AuditEntry", "a") .orderBy((ctx) => ctx.a.timestamp, "asc") .select((ctx) => ({ timestamp: ctx.a.timestamp, action: ctx.a.action, entityType: ctx.a.entityType, })) .execute(); } ``` ## Compliance Queries ### Who Changed This? ```typescript async function whoChanged( entityType: string, entityId: string, field: string ): Promise> { const entries = await store .query() .from("AuditEntry", "a") .whereNode("a", (a) => a.entityType.eq(entityType).and(a.entityId.eq(entityId)) ) .traverse("performedBy", "e") .to("User", "u") .orderBy((ctx) => ctx.a.timestamp, "desc") .select((ctx) => ({ changes: ctx.a.changes, user: ctx.u.name, timestamp: ctx.a.timestamp, })) .execute(); return entries .filter((e) => e.changes && field in e.changes) .map((e) => ({ user: e.user, timestamp: e.timestamp, before: e.changes![field].before, after: e.changes![field].after, })); } ``` ### When Was This Value Set? ```typescript async function whenWasValueSet( entityType: string, entityId: string, field: string, value: unknown ): Promise<{ timestamp: string; user: string } | undefined> { const entries = await store .query() .from("AuditEntry", "a") .whereNode("a", (a) => a.entityType.eq(entityType).and(a.entityId.eq(entityId)) ) .traverse("performedBy", "e") .to("User", "u") .orderBy((ctx) => ctx.a.timestamp, "asc") .select((ctx) => ({ changes: ctx.a.changes, user: ctx.u.name, timestamp: ctx.a.timestamp, })) .execute(); const entry = entries.find( (e) => e.changes && field in e.changes && e.changes[field].after === value ); return entry ? { timestamp: entry.timestamp, user: entry.user } : undefined; } ``` ## Export Audit Logs ### Stream to External System ```typescript async function* exportAuditLogs( since: Date, batchSize = 1000 ): AsyncGenerator { const stream = store .query() .from("AuditEntry", "a") .whereNode("a", (a) => a.timestamp.gte(since.toISOString())) .traverse("performedBy", "e") .to("User", "u") .orderBy((ctx) => ctx.a.timestamp, "asc") .select((ctx) => ({ ...ctx.a, performedBy: ctx.u.email, })) .stream({ batchSize }); let batch: AuditEntryProps[] = []; for await (const entry of stream) { batch.push(entry); if (batch.length >= batchSize) { yield batch; batch = []; } } if (batch.length > 0) { yield batch; } } // Usage async function syncToExternalAuditSystem(since: Date): Promise { for await (const batch of exportAuditLogs(since)) { await externalAuditApi.ingestBatch(batch); } } ``` ## Next Steps - [Document Management](/examples/document-management) - CMS with semantic search - [Product Catalog](/examples/product-catalog) - Categories, variants, inventory - [Workflow Engine](/examples/workflow-engine) - State machines with approvals # Document Management System > A complete CMS example with semantic search, versioning, and access control This example builds a document management system with: - **Document hierarchy** (folders, documents, sections) - **Semantic search** with vector embeddings - **Version history** using temporal queries - **Access control** with permission inheritance - **Related documents** discovery ## Schema Definition ```typescript import { z } from "zod"; import { defineNode, defineEdge, defineGraph, embedding, subClassOf, partOf, hasPart, } from "@nicia-ai/typegraph"; // Base content type (abstract) const Content = defineNode("Content", { schema: z.object({ title: z.string(), createdBy: z.string(), status: z.enum(["draft", "published", "archived"]).default("draft"), }), }); // Folder extends Content const Folder = defineNode("Folder", { schema: z.object({ title: z.string(), createdBy: z.string(), status: z.enum(["draft", "published", "archived"]).default("draft"), path: z.string(), // /engineering/specs }), }); // Document extends Content const Document = defineNode("Document", { schema: z.object({ title: z.string(), content: z.string(), createdBy: z.string(), status: z.enum(["draft", "published", "archived"]).default("draft"), contentType: z.enum(["markdown", "html", "plaintext"]).default("markdown"), embedding: embedding(1536).optional(), }), }); // Users and permissions const User = defineNode("User", { schema: z.object({ email: z.string().email(), name: z.string(), role: z.enum(["admin", "editor", "viewer"]).default("viewer"), }), }); const Permission = defineNode("Permission", { schema: z.object({ level: z.enum(["read", "write", "admin"]), }), }); // Edges const contains = defineEdge("contains"); const relatedTo = defineEdge("relatedTo", { schema: z.object({ type: z.enum(["references", "supersedes", "related"]), confidence: z.number().min(0).max(1).optional(), }), }); const hasPermission = defineEdge("hasPermission"); const createdBy = defineEdge("createdBy"); // Graph definition const graph = defineGraph({ id: "document_management", nodes: { Content: { type: Content }, Folder: { type: Folder }, Document: { type: Document }, User: { type: User }, Permission: { type: Permission }, }, edges: { contains: { type: contains, from: [Folder], to: [Folder, Document] }, relatedTo: { type: relatedTo, from: [Document], to: [Document] }, hasPermission: { type: hasPermission, from: [User], to: [Content] }, createdBy: { type: createdBy, from: [Content], to: [User] }, }, ontology: [ // Type hierarchy subClassOf(Folder, Content), subClassOf(Document, Content), // Compositional relationships partOf(Document, Folder), hasPart(Folder, Document), ], }); ``` ## Database Setup ```typescript import Database from "better-sqlite3"; import { drizzle } from "drizzle-orm/better-sqlite3"; import * as sqliteVec from "sqlite-vec"; import { createSqliteBackend, generateSqliteMigrationSQL } from "@nicia-ai/typegraph/sqlite"; import { createStore } from "@nicia-ai/typegraph"; // Initialize database with vector extension const sqlite = new Database("documents.db"); sqliteVec.load(sqlite); sqlite.exec(generateSqliteMigrationSQL()); const db = drizzle(sqlite); const backend = createSqliteBackend(db); const store = createStore(graph, backend); ``` ## Core Operations ### Creating Folder Structure ```typescript async function createFolderPath(path: string, userId: string): Promise> { const parts = path.split("/").filter(Boolean); let currentPath = ""; let parentFolder: Node | undefined; for (const part of parts) { currentPath += `/${part}`; // Check if folder exists let folder = await store .query() .from("Folder", "f") .whereNode("f", (f) => f.path.eq(currentPath)) .select((ctx) => ctx.f) .first(); if (!folder) { folder = await store.nodes.Folder.create({ title: part, path: currentPath, createdBy: userId, status: "published", }); if (parentFolder) { await store.edges.contains.create(parentFolder, folder, {}); } } parentFolder = folder; } return parentFolder!; } ``` ### Creating Documents with Embeddings ```typescript import OpenAI from "openai"; const openai = new OpenAI(); async function generateEmbedding(text: string): Promise { const response = await openai.embeddings.create({ model: "text-embedding-ada-002", input: text, }); return response.data[0].embedding; } async function createDocument( folderId: string, title: string, content: string, userId: string ): Promise> { const embedding = await generateEmbedding(`${title}\n\n${content}`); const document = await store.nodes.Document.create({ title, content, createdBy: userId, status: "draft", contentType: "markdown", embedding, }); // Link to folder const folder = await store.nodes.Folder.getById(folderId); if (!folder) throw new Error(`Folder not found: ${folderId}`); await store.edges.contains.create(folder, document, {}); // Link to creator const user = await store.nodes.User.getById(userId); if (!user) throw new Error(`User not found: ${userId}`); await store.edges.createdBy.create(document, user, {}); return document; } ``` ### Updating Documents (Versioned) ```typescript async function updateDocument( documentId: string, updates: { title?: string; content?: string; status?: "draft" | "published" | "archived" } ): Promise> { const current = await store.nodes.Document.getById(documentId); if (!current) throw new Error(`Document not found: ${documentId}`); // If content changed, regenerate embedding let embedding = current.embedding; if (updates.content && updates.content !== current.content) { const text = `${updates.title ?? current.title}\n\n${updates.content}`; embedding = await generateEmbedding(text); } // Update creates a new version automatically return store.nodes.Document.update(documentId, { ...updates, embedding, }); } ``` ## Searching Documents ### Semantic Search ```typescript async function searchDocuments( query: string, options: { folderId?: string; status?: "draft" | "published" | "archived"; limit?: number; minScore?: number; } = {} ): Promise> { const { folderId, status = "published", limit = 10, minScore = 0.7 } = options; const queryEmbedding = await generateEmbedding(query); let queryBuilder = store .query() .from("Document", "d") .whereNode("d", (d) => { let pred = d.embedding.similarTo(queryEmbedding, limit, { metric: "cosine", minScore, }); if (status) { pred = pred.and(d.status.eq(status)); } return pred; }); // If folderId specified, filter to folder descendants if (folderId) { queryBuilder = store .query() .from("Folder", "f") .whereNode("f", (f) => f.id.eq(folderId)) .traverse("contains", "e") .recursive() .to("Document", "d") .whereNode("d", (d) => d.embedding .similarTo(queryEmbedding, limit, { metric: "cosine", minScore }) .and(d.status.eq(status)) ); } return queryBuilder .select((ctx) => ({ document: ctx.d, score: ctx.d.embedding.similarity(queryEmbedding), })) .execute(); } ``` ### Find Related Documents ```typescript async function findRelatedDocuments( documentId: string, limit = 5 ): Promise> { // First, get explicit relationships const explicit = await store .query() .from("Document", "d") .whereNode("d", (d) => d.id.eq(documentId)) .traverse("relatedTo", "e") .to("Document", "related") .select((ctx) => ({ document: ctx.related, relationship: ctx.e.type, })) .execute(); // Then, find semantically similar documents const source = await store.nodes.Document.getById(documentId); if (!source) throw new Error(`Document not found: ${documentId}`); if (!source.embedding) { return explicit; } const similar = await store .query() .from("Document", "d") .whereNode("d", (d) => d.embedding .similarTo(source.embedding!, limit * 2, { metric: "cosine", minScore: 0.8 }) .and(d.id.neq(documentId)) ) .select((ctx) => ({ document: ctx.d, relationship: "similar" as const, })) .limit(limit) .execute(); return [...explicit, ...similar].slice(0, limit); } ``` ## Version History ### Get Document History ```typescript interface DocumentVersion { title: string; content: string; status: string; validFrom: string; validTo: string | undefined; version: number; } async function getDocumentHistory(documentId: string): Promise { return store .query() .from("Document", "d") .temporal("includeEnded") .whereNode("d", (d) => d.id.eq(documentId)) .orderBy((ctx) => ctx.d.validFrom, "desc") .select((ctx) => ({ title: ctx.d.title, content: ctx.d.content, status: ctx.d.status, validFrom: ctx.d.validFrom, validTo: ctx.d.validTo, version: ctx.d.version, })) .execute(); } ``` ### View Document at Point in Time ```typescript async function getDocumentAsOf( documentId: string, timestamp: Date ): Promise { return store .query() .from("Document", "d") .temporal("asOf", timestamp.toISOString()) .whereNode("d", (d) => d.id.eq(documentId)) .select((ctx) => ctx.d) .first(); } ``` ### Compare Versions ```typescript async function compareVersions( documentId: string, version1: number, version2: number ): Promise<{ before: DocumentProps; after: DocumentProps } | undefined> { const versions = await store .query() .from("Document", "d") .temporal("includeEnded") .whereNode("d", (d) => d.id.eq(documentId).and(d.version.in([version1, version2])) ) .orderBy((ctx) => ctx.d.version, "asc") .select((ctx) => ctx.d) .execute(); if (versions.length !== 2) return undefined; return { before: versions[0], after: versions[1] }; } ``` ## Access Control ### Check Read Permission ```typescript async function canRead(userId: string, contentId: string): Promise { // Check direct permission const directPermission = await store .query() .from("User", "u") .whereNode("u", (u) => u.id.eq(userId)) .traverse("hasPermission", "p") .to("Content", "c", { includeSubClasses: true }) .whereNode("c", (c) => c.id.eq(contentId)) .first(); if (directPermission) return true; // Check inherited permission (from parent folders) const content = await store.nodes.Content.getById(contentId); if (!content) return false; // Walk up the folder tree checking permissions const parentFolders = await store .query() .from("Folder", "f") .traverse("contains", "e") .recursive() .to("Content", "c", { includeSubClasses: true }) .whereNode("c", (c) => c.id.eq(contentId)) .select((ctx) => ctx.f.id) .execute(); for (const folderId of parentFolders) { const folderPermission = await store .query() .from("User", "u") .whereNode("u", (u) => u.id.eq(userId)) .traverse("hasPermission", "p") .to("Folder", "f") .whereNode("f", (f) => f.id.eq(folderId)) .first(); if (folderPermission) return true; } return false; } ``` ### Grant Permission ```typescript async function grantPermission( userId: string, contentId: string, level: "read" | "write" | "admin" ): Promise { const user = await store.nodes.User.getById(userId); if (!user) throw new Error(`User not found: ${userId}`); const content = await store.nodes.Content.getById(contentId); if (!content) throw new Error(`Content not found: ${contentId}`); // Create permission node const permission = await store.nodes.Permission.create({ level }); // Link user to content via permission await store.edges.hasPermission.create(user, content, {}); } ``` ## Folder Navigation ### Get Folder Contents ```typescript interface FolderContents { folders: Array<{ id: string; title: string; path: string }>; documents: Array<{ id: string; title: string; status: string }>; } async function getFolderContents(folderId: string): Promise { const folders = await store .query() .from("Folder", "parent") .whereNode("parent", (f) => f.id.eq(folderId)) .traverse("contains", "e") .to("Folder", "child") .select((ctx) => ({ id: ctx.child.id, title: ctx.child.title, path: ctx.child.path, })) .execute(); const documents = await store .query() .from("Folder", "parent") .whereNode("parent", (f) => f.id.eq(folderId)) .traverse("contains", "e") .to("Document", "doc") .select((ctx) => ({ id: ctx.doc.id, title: ctx.doc.title, status: ctx.doc.status, })) .execute(); return { folders, documents }; } ``` ### Get Breadcrumb Path ```typescript async function getBreadcrumb( contentId: string ): Promise> { return store .query() .from("Content", "c", { includeSubClasses: true }) .whereNode("c", (c) => c.id.eq(contentId)) .traverse("contains", "e", { direction: "in" }) .recursive() .to("Folder", "ancestor") .select((ctx) => ({ id: ctx.ancestor.id, title: ctx.ancestor.title, path: ctx.ancestor.path, })) .execute(); } ``` ## Bulk Operations ### Move Document to Folder ```typescript async function moveDocument(documentId: string, targetFolderId: string): Promise { await store.transaction(async (tx) => { // Remove from current folder const currentEdge = await tx .query() .from("Folder", "f") .traverse("contains", "e") .to("Document", "d") .whereNode("d", (d) => d.id.eq(documentId)) .select((ctx) => ctx.e.id) .first(); if (currentEdge) { await tx.edges.contains.delete(currentEdge); } // Add to new folder const document = await tx.nodes.Document.getById(documentId); if (!document) throw new Error(`Document not found: ${documentId}`); const targetFolder = await tx.nodes.Folder.getById(targetFolderId); if (!targetFolder) throw new Error(`Folder not found: ${targetFolderId}`); await tx.edges.contains.create(targetFolder, document, {}); }); } ``` ### Bulk Archive ```typescript async function archiveFolder(folderId: string): Promise { // Get all documents in folder and subfolders const documents = await store .query() .from("Folder", "f") .whereNode("f", (f) => f.id.eq(folderId)) .traverse("contains", "e") .recursive() .to("Document", "d") .select((ctx) => ctx.d.id) .execute(); // Archive each document await store.transaction(async (tx) => { for (const docId of documents) { await tx.nodes.Document.update(docId, { status: "archived" }); } }); return documents.length; } ``` ## Next Steps - [Product Catalog](/examples/product-catalog) - Categories, variants, inventory - [Workflow Engine](/examples/workflow-engine) - State machines with approvals - [Audit Trail](/examples/audit-trail) - Complete change tracking # Knowledge Graph for RAG > Enhance retrieval with entity linking, relationship traversal, and multi-hop context This example demonstrates how **graph structure enhances RAG** beyond vector similarity. While [Semantic Search](/semantic-search) covers embedding basics, this guide focuses on what graphs uniquely provide: entity disambiguation, relationship traversal, and structured context that flat retrieval cannot offer. ## What Graphs Add to RAG | Flat RAG | Graph RAG | |----------|-----------| | Returns similar chunks | Traverses to related entities and facts | | Treats "Apple" the same everywhere | Disambiguates Apple Inc. vs. apple fruit | | Context is unstructured text | Context includes structured relationships | | Single-hop retrieval | Multi-hop reasoning across connections | **Example**: For "What companies has Elon Musk founded?", flat RAG returns chunks mentioning him. Graph RAG traverses from the "Elon Musk" entity through "founded" edges to return structured company data—regardless of whether those facts appear in the same chunk. ## Schema ```typescript import { z } from "zod"; import { defineNode, defineEdge, defineGraph, embedding, inverseOf } from "@nicia-ai/typegraph"; // Source documents const Document = defineNode("Document", { schema: z.object({ title: z.string(), source: z.string(), }), }); // Text chunks with embeddings const Chunk = defineNode("Chunk", { schema: z.object({ text: z.string(), embedding: embedding(1536), position: z.number().int(), }), }); // Extracted entities const Entity = defineNode("Entity", { schema: z.object({ name: z.string(), type: z.enum(["person", "organization", "location", "concept", "product", "event"]), description: z.string().optional(), embedding: embedding(1536).optional(), }), }); // Edges const containsChunk = defineEdge("containsChunk"); const nextChunk = defineEdge("nextChunk"); const prevChunk = defineEdge("prevChunk"); const mentions = defineEdge("mentions", { schema: z.object({ confidence: z.number().min(0).max(1).optional(), }), }); const relatesTo = defineEdge("relatesTo", { schema: z.object({ relationship: z.string(), // "founded", "works_at", "located_in" }), }); export const graph = defineGraph({ id: "rag_graph", nodes: { Document: { type: Document }, Chunk: { type: Chunk }, Entity: { type: Entity, unique: [ { name: "entity_name_type", fields: ["name", "type"], scope: "kind", collation: "caseInsensitive", }, ], }, }, edges: { containsChunk: { type: containsChunk, from: [Document], to: [Chunk] }, nextChunk: { type: nextChunk, from: [Chunk], to: [Chunk] }, prevChunk: { type: prevChunk, from: [Chunk], to: [Chunk] }, mentions: { type: mentions, from: [Chunk], to: [Entity] }, relatesTo: { type: relatesTo, from: [Entity], to: [Entity] }, }, ontology: [inverseOf(nextChunk, prevChunk)], }); ``` ## Embedding Setup Using [Vercel AI SDK](https://ai-sdk.dev/docs/ai-sdk-core/embeddings): ```typescript import { embed, embedMany } from "ai"; import { openai } from "@ai-sdk/openai"; const embeddingModel = openai.embeddingModel("text-embedding-3-small"); async function generateEmbedding(text: string): Promise { const { embedding } = await embed({ model: embeddingModel, value: text }); return embedding; } async function generateEmbeddings(texts: string[]): Promise { const { embeddings } = await embedMany({ model: embeddingModel, values: texts }); return embeddings; } ``` ## Ingestion with Entity Linking The key graph RAG capability: linking chunks to disambiguated entities. ```typescript import type { Node } from "@nicia-ai/typegraph"; interface ChunkData { text: string; entities: Array<{ name: string; type: "person" | "organization" | "location" | "concept" | "product" | "event"; }>; } async function ingestDocument( title: string, source: string, chunks: ChunkData[] ): Promise { await store.transaction(async (tx) => { const doc = await tx.nodes.Document.create({ title, source }); // Batch embed all chunks const chunkEmbeddings = await generateEmbeddings(chunks.map((c) => c.text)); let prevChunk: Node | undefined; for (const [i, chunkData] of chunks.entries()) { const chunk = await tx.nodes.Chunk.create({ text: chunkData.text, embedding: chunkEmbeddings[i], position: i, }); await tx.edges.containsChunk.create(doc, chunk, {}); if (prevChunk) { await tx.edges.nextChunk.create(prevChunk, chunk, {}); } // Link to entities (dedupe by unique constraint) for (const entityData of chunkData.entities) { const entityResult = await tx.nodes.Entity.getOrCreateByConstraint( "entity_name_type", { name: entityData.name, type: entityData.type, } ); // Compute expensive derived fields only for newly created entities if (entityResult.action === "created") { await tx.nodes.Entity.update(entityResult.node.id, { embedding: await generateEmbedding(entityData.name), }); } await tx.edges.mentions.getOrCreateByEndpoints( chunk, entityResult.node, {}, { ifExists: "return" } ); } prevChunk = chunk; } }); } ``` ## Graph-Specific Query Patterns These patterns demonstrate capabilities that require graph structure—they cannot be replicated with flat vector search. ### Entity-Based Retrieval Find all chunks mentioning a specific entity, regardless of how it's phrased: ```typescript async function findChunksByEntity(entityName: string) { return store .query() .from("Entity", "e") .whereNode("e", (e) => e.name.eq(entityName)) .traverse("mentions", "m", { direction: "in" }) .to("Chunk", "c") .select((ctx) => ctx.c.text) .execute(); } ``` ### Multi-Hop Entity Traversal Find entities connected through relationships: ```typescript async function findRelatedEntities(entityName: string, maxHops = 2) { const rows = await store .query() .from("Entity", "e") .whereNode("e", (e) => e.name.eq(entityName)) .traverse("relatesTo", "r") .recursive({ maxHops, depth: "depth" }) .to("Entity", "related") .select((ctx) => ({ from: ctx.e.name, to: ctx.related.name, toId: ctx.related.id, depth: ctx.depth, })) .execute(); // distinct paths can reach the same target; dedupe by target const seen = new Set(); return rows .filter((row) => { if (seen.has(row.toId)) return false; seen.add(row.toId); return true; }) .map((row) => ({ from: row.from, to: row.to, depth: row.depth, })); } ``` ### Context Window Expansion Get surrounding chunks for a match: ```typescript async function getChunkWithContext(chunkId: string, windowSize = 1) { const [before, after] = await Promise.all([ store .query() .from("Chunk", "c") .whereNode("c", (c) => c.id.eq(chunkId)) .traverse("prevChunk", "e") .recursive({ maxHops: windowSize }) .to("Chunk", "prev") .orderBy("prev", "position", "desc") .select((ctx) => ctx.prev.text) .execute(), store .query() .from("Chunk", "c") .whereNode("c", (c) => c.id.eq(chunkId)) .traverse("nextChunk", "e") .recursive({ maxHops: windowSize }) .to("Chunk", "next") .orderBy("next", "position", "asc") .select((ctx) => ctx.next.text) .execute(), ]); const chunk = await store.nodes.Chunk.getById(chunkId); return { before: before.toReversed(), chunk: chunk?.text ?? "", after, }; } ``` ## Hybrid Retrieval: Vector + Graph Combine vector similarity with graph traversal in a single query using the `from` option: ```typescript async function hybridRetrieval(query: string, limit = 10) { const queryEmbedding = await generateEmbedding(query); // Single query: vector search + fan-out to entities AND document const results = await store .query() .from("Chunk", "c") .whereNode("c", (c) => c.embedding.similarTo(queryEmbedding, limit, { metric: "cosine", minScore: 0.7 }) ) .traverse("mentions", "m") .to("Entity", "e") .traverse("containsChunk", "d_edge", { direction: "in", from: "c" }) // Fan-out from chunk .to("Document", "d") .select((ctx) => ({ chunkId: ctx.c.id, text: ctx.c.text, score: ctx.c.embedding.similarity(queryEmbedding), source: ctx.d.title, entityName: ctx.e.name, entityType: ctx.e.type, })) .execute(); // Group by chunk (one row per chunk-entity pair) const byChunk = new Map }>(); for (const row of results) { const existing = byChunk.get(row.chunkId); if (existing) { existing.entities.push({ name: row.entityName, type: row.entityType }); } else { byChunk.set(row.chunkId, { ...row, entities: [{ name: row.entityName, type: row.entityType }], }); } } return [...byChunk.values()]; } ``` The `from` option enables **fan-out patterns** where you traverse multiple relationships from the same node. Without `from`, traversals chain sequentially (A→B→C). With `from`, you can branch: traverse from chunk to entities, AND from chunk to document. ## Building Structured Context Format graph-enriched context for an LLM: ```typescript async function buildGraphContext(query: string, extractedEntities: string[]) { const queryEmbedding = await generateEmbedding(query); // Get relevant chunks with sources const chunks = await store .query() .from("Chunk", "c") .whereNode("c", (c) => c.embedding.similarTo(queryEmbedding, 5, { metric: "cosine", minScore: 0.7 }) ) .traverse("containsChunk", "e", { direction: "in" }) .to("Document", "d") .select((ctx) => ({ text: ctx.c.text, source: ctx.d.title })) .execute(); // Get entity relationships from graph const entityFacts = await Promise.all( extractedEntities.map(async (name) => { const relations = await store .query() .from("Entity", "e") .whereNode("e", (e) => e.name.eq(name)) .traverse("relatesTo", "r") .to("Entity", "target") .select((ctx) => ctx.target.name) .execute(); return relations.length > 0 ? { name, relatedTo: relations } : undefined; }) ); return { chunks, entityFacts: entityFacts.filter(Boolean) }; } function formatForPrompt(context: Awaited>): string { let prompt = "## Relevant Passages\n\n"; for (const chunk of context.chunks) { prompt += `**${chunk.source}**: ${chunk.text}\n\n`; } if (context.entityFacts.length > 0) { prompt += "## Entity Relationships\n\n"; for (const entity of context.entityFacts) { if (entity) { prompt += `**${entity.name}** → ${entity.relatedTo.join(", ")}\n`; } } } return prompt; } ``` ## When to Use Graph RAG **Use graph RAG when:** - Queries require connecting facts across documents ("Who founded X and what else did they start?") - Entity disambiguation matters (distinguishing "Apple" the company from "apple" the fruit) - Relationship traversal provides value ("Find all companies in the same industry as X") - You need structured facts alongside unstructured text **Flat vector RAG may suffice when:** - Simple "find similar content" queries - No entity relationships to exploit - Single-document question answering ## Next Steps - [Semantic Search](/semantic-search) — Vector embedding fundamentals - [Traversals](/queries/traverse) — Graph traversal patterns - [Document Management](/examples/document-management) — Versioning and access control # Multi-Tenant SaaS > Complete multi-tenancy patterns with isolation, data partitioning, and tenant management This example shows how to build a multi-tenant SaaS application with: - **Three isolation strategies** (shared tables, schema per tenant, database per tenant) - **Tenant-aware queries** that automatically filter data - **Tenant provisioning** and lifecycle management - **Cross-tenant analytics** for platform operators - **Tenant migration** between isolation levels ## Choosing an Isolation Strategy | Strategy | Isolation | Complexity | Cost | Best For | |----------|-----------|------------|------|----------| | Shared tables | Low | Low | Lowest | Many small tenants, B2C SaaS | | Schema per tenant | Medium | Medium | Low | SMB customers, PostgreSQL only | | Database per tenant | High | High | Highest | Enterprise, compliance requirements | ## Strategy 1: Shared Tables with Row-Level Isolation All tenants share the same database tables, filtered by `tenantId`. ### Schema Definition ```typescript import { z } from "zod"; import { defineNode, defineEdge, defineGraph } from "@nicia-ai/typegraph"; // Tenant metadata const Tenant = defineNode("Tenant", { schema: z.object({ slug: z.string(), name: z.string(), plan: z.enum(["free", "starter", "pro", "enterprise"]), status: z.enum(["active", "suspended", "cancelled"]).default("active"), createdAt: z.string().datetime(), settings: z.record(z.unknown()).optional(), }), }); // All entities include tenantId const Project = defineNode("Project", { schema: z.object({ tenantId: z.string(), name: z.string(), description: z.string().optional(), status: z.enum(["active", "archived"]).default("active"), }), }); const Task = defineNode("Task", { schema: z.object({ tenantId: z.string(), title: z.string(), status: z.enum(["todo", "in_progress", "done"]).default("todo"), priority: z.enum(["low", "medium", "high"]).default("medium"), }), }); const User = defineNode("User", { schema: z.object({ tenantId: z.string(), email: z.string().email(), name: z.string(), role: z.enum(["owner", "admin", "member", "guest"]).default("member"), }), }); // Edges const hasProject = defineEdge("hasProject"); const hasTask = defineEdge("hasTask"); const assignedTo = defineEdge("assignedTo"); const memberOf = defineEdge("memberOf"); const graph = defineGraph({ id: "multi_tenant", nodes: { Tenant: { type: Tenant }, Project: { type: Project }, Task: { type: Task }, User: { type: User }, }, edges: { hasProject: { type: hasProject, from: [Tenant], to: [Project] }, hasTask: { type: hasTask, from: [Project], to: [Task] }, assignedTo: { type: assignedTo, from: [Task], to: [User] }, memberOf: { type: memberOf, from: [User], to: [Tenant] }, }, }); ``` ### Tenant-Scoped Store Create a wrapper that automatically filters by tenant: ```typescript interface TenantContext { tenantId: string; userId: string; role: "owner" | "admin" | "member" | "guest"; } function createTenantStore(store: Store, ctx: TenantContext) { const projects = { async list(options: { status?: string } = {}) { let query = store .query() .from("Project", "p") .whereNode("p", (p) => p.tenantId.eq(ctx.tenantId)); if (options.status) { query = query.whereNode("p", (p) => p.status.eq(options.status)); } return query.select((q) => q.p).execute(); }, async create(data: { name: string; description?: string }) { const project = await store.nodes.Project.create({ ...data, tenantId: ctx.tenantId, }); const tenant = await store.nodes.Tenant.getById(ctx.tenantId); if (!tenant) throw new Error(`Tenant not found: ${ctx.tenantId}`); await store.edges.hasProject.create(tenant, project, {}); return project; }, async get(projectId: string) { const project = await store.nodes.Project.getById(projectId); if (!project || project.tenantId !== ctx.tenantId) { throw new Error("Not found"); } return project; }, async update(projectId: string, updates: Partial) { await projects.get(projectId); // Verify access return store.nodes.Project.update(projectId, updates); }, async delete(projectId: string) { await projects.get(projectId); // Verify access await store.nodes.Project.delete(projectId); }, }; const tasks = { async list(projectId: string) { await projects.get(projectId); // Verify access return store .query() .from("Project", "p") .whereNode("p", (p) => p.id.eq(projectId)) .traverse("hasTask", "e") .to("Task", "t") .select((q) => q.t) .execute(); }, async create(projectId: string, data: { title: string; priority?: string }) { const project = await projects.get(projectId); // Verify access const task = await store.nodes.Task.create({ ...data, tenantId: ctx.tenantId, }); await store.edges.hasTask.create(project, task, {}); return task; }, }; const users = { async list() { return store .query() .from("User", "u") .whereNode("u", (u) => u.tenantId.eq(ctx.tenantId)) .select((q) => q.u) .execute(); }, async invite(email: string, name: string, role: string) { if (ctx.role !== "owner" && ctx.role !== "admin") { throw new Error("Insufficient permissions"); } const user = await store.nodes.User.create({ tenantId: ctx.tenantId, email, name, role, }); const tenant = await store.nodes.Tenant.getById(ctx.tenantId); if (!tenant) throw new Error(`Tenant not found: ${ctx.tenantId}`); await store.edges.memberOf.create(user, tenant, {}); return user; }, }; return { projects, tasks, users }; } // Usage in API handler async function handleRequest(req: Request) { const session = await getSession(req); const tenantStore = createTenantStore(store, { tenantId: session.tenantId, userId: session.userId, role: session.role, }); // All queries are automatically tenant-scoped const projects = await tenantStore.projects.list(); } ``` ### Tenant Provisioning ```typescript async function provisionTenant( slug: string, name: string, ownerEmail: string, ownerName: string, plan: "free" | "starter" | "pro" | "enterprise" = "free" ): Promise<{ tenant: Node; owner: Node }> { return store.transaction(async (tx) => { // Check slug uniqueness const existing = await tx .query() .from("Tenant", "t") .whereNode("t", (t) => t.slug.eq(slug)) .first(); if (existing) { throw new Error("Tenant slug already exists"); } // Create tenant const tenant = await tx.nodes.Tenant.create({ slug, name, plan, status: "active", createdAt: new Date().toISOString(), }); // Create owner user const owner = await tx.nodes.User.create({ tenantId: tenant.id, email: ownerEmail, name: ownerName, role: "owner", }); await tx.edges.memberOf.create(owner, tenant, {}); return { tenant, owner }; }); } ``` ## Strategy 2: Schema Per Tenant (PostgreSQL) Each tenant gets their own PostgreSQL schema within the same database. ### Setup ```typescript import { Pool } from "pg"; import { drizzle } from "drizzle-orm/node-postgres"; import { sql } from "drizzle-orm"; import { createPostgresBackend, generatePostgresMigrationSQL } from "@nicia-ai/typegraph/postgres"; const pool = new Pool({ connectionString: process.env.DATABASE_URL }); async function createTenantSchema(tenantId: string): Promise { const schemaName = `tenant_${tenantId}`; // Create schema await pool.query(`CREATE SCHEMA IF NOT EXISTS ${schemaName}`); // Run TypeGraph migrations in the tenant schema await pool.query(`SET search_path TO ${schemaName}`); await pool.query(generatePostgresMigrationSQL()); await pool.query(`SET search_path TO public`); } async function getTenantStore(tenantId: string): Promise { const schemaName = `tenant_${tenantId}`; // Create connection with schema const client = await pool.connect(); await client.query(`SET search_path TO ${schemaName}`); const db = drizzle(client); const backend = createPostgresBackend(db); return createStore(graph, backend); } ``` ### Tenant Store Cache ```typescript class TenantStoreManager { private stores = new Map(); private maxCached = 100; async getStore(tenantId: string): Promise { const cached = this.stores.get(tenantId); if (cached) { cached.lastUsed = new Date(); return cached.store; } // Evict oldest if at capacity if (this.stores.size >= this.maxCached) { this.evictOldest(); } const store = await getTenantStore(tenantId); this.stores.set(tenantId, { store, lastUsed: new Date() }); return store; } private evictOldest(): void { let oldest: { id: string; date: Date } | undefined; for (const [id, { lastUsed }] of this.stores) { if (!oldest || lastUsed < oldest.date) { oldest = { id, date: lastUsed }; } } if (oldest) { this.stores.delete(oldest.id); } } } const tenantManager = new TenantStoreManager(); ``` ### Provisioning with Schema ```typescript async function provisionTenantWithSchema( slug: string, name: string, ownerEmail: string ): Promise<{ tenantId: string }> { const tenantId = generateUUID(); // Create schema and tables await createTenantSchema(tenantId); // Get tenant-specific store const tenantStore = await tenantManager.getStore(tenantId); // Create initial data await tenantStore.nodes.User.create({ email: ownerEmail, name: name, role: "owner", }); // Store tenant metadata in public schema const publicDb = drizzle(pool); await publicDb.insert(tenants).values({ id: tenantId, slug, name, createdAt: new Date(), }); return { tenantId }; } ``` ## Strategy 3: Database Per Tenant Each tenant gets their own database for maximum isolation. ### Tenant Database Manager ```typescript interface TenantConfig { id: string; slug: string; databaseUrl: string; status: "active" | "suspended"; } class TenantDatabaseManager { private connections = new Map(); private maxConnections = 50; async getStore(tenantId: string): Promise { const cached = this.connections.get(tenantId); if (cached) return cached.store; // Get tenant config from central registry const config = await this.getTenantConfig(tenantId); if (config.status !== "active") { throw new Error("Tenant is not active"); } // Evict if at capacity if (this.connections.size >= this.maxConnections) { await this.evictLeastUsed(); } // Create new connection const pool = new Pool({ connectionString: config.databaseUrl, max: 5 }); const db = drizzle(pool); const backend = createPostgresBackend(db); const store = createStore(graph, backend); this.connections.set(tenantId, { pool, store }); return store; } async closeConnection(tenantId: string): Promise { const conn = this.connections.get(tenantId); if (conn) { await conn.pool.end(); this.connections.delete(tenantId); } } private async getTenantConfig(tenantId: string): Promise { // Fetch from central tenant registry const result = await centralDb .select() .from(tenantConfigs) .where(eq(tenantConfigs.id, tenantId)) .get(); if (!result) throw new Error("Tenant not found"); return result; } private async evictLeastUsed(): Promise { // Simple LRU eviction const first = this.connections.keys().next().value; if (first) { await this.closeConnection(first); } } } const dbManager = new TenantDatabaseManager(); ``` ### Provisioning New Database ```typescript async function provisionTenantDatabase( slug: string, name: string, ownerEmail: string ): Promise<{ tenantId: string; databaseUrl: string }> { const tenantId = generateUUID(); const dbName = `tenant_${tenantId.replace(/-/g, "_")}`; // Create database (using admin connection) const adminPool = new Pool({ connectionString: process.env.ADMIN_DATABASE_URL }); await adminPool.query(`CREATE DATABASE ${dbName}`); await adminPool.end(); // Build connection URL const baseUrl = new URL(process.env.DATABASE_BASE_URL!); baseUrl.pathname = `/${dbName}`; const databaseUrl = baseUrl.toString(); // Initialize TypeGraph tables const tenantPool = new Pool({ connectionString: databaseUrl }); await tenantPool.query(generatePostgresMigrationSQL()); // Create initial data const db = drizzle(tenantPool); const backend = createPostgresBackend(db); const store = createStore(graph, backend); await store.nodes.User.create({ email: ownerEmail, name: name, role: "owner", }); await tenantPool.end(); // Register in central tenant registry await centralDb.insert(tenantConfigs).values({ id: tenantId, slug, name, databaseUrl, status: "active", createdAt: new Date(), }); return { tenantId, databaseUrl }; } ``` ## Cross-Tenant Operations For platform administrators who need to query across tenants. ### Aggregated Metrics (Shared Tables) ```typescript import { count, field } from "@nicia-ai/typegraph"; async function getTenantMetrics(): Promise< Array<{ tenantId: string; projectCount: number; taskCount: number; userCount: number }> > { // Projects by tenant const projectCounts = await store .query() .from("Project", "p") .groupBy("p", "tenantId") .aggregate({ tenantId: field("p", "tenantId"), projectCount: count("p"), }) .execute(); // Tasks by tenant const taskCounts = await store .query() .from("Task", "t") .groupBy("t", "tenantId") .aggregate({ tenantId: field("t", "tenantId"), taskCount: count("t"), }) .execute(); // Users by tenant const userCounts = await store .query() .from("User", "u") .groupBy("u", "tenantId") .aggregate({ tenantId: field("u", "tenantId"), userCount: count("u"), }) .execute(); // Merge results const metrics = new Map(); for (const p of projectCounts) { metrics.set(p.tenantId, { projectCount: p.projectCount, taskCount: 0, userCount: 0 }); } for (const t of taskCounts) { const existing = metrics.get(t.tenantId) || { projectCount: 0, taskCount: 0, userCount: 0 }; existing.taskCount = t.taskCount; metrics.set(t.tenantId, existing); } for (const u of userCounts) { const existing = metrics.get(u.tenantId) || { projectCount: 0, taskCount: 0, userCount: 0 }; existing.userCount = u.userCount; metrics.set(u.tenantId, existing); } return Array.from(metrics.entries()).map(([tenantId, counts]) => ({ tenantId, ...counts, })); } ``` ### Cross-Tenant Search (Database Per Tenant) ```typescript async function searchAcrossTenants( query: string, tenantIds: string[] ): Promise> { const results = await Promise.all( tenantIds.map(async (tenantId) => { try { const store = await dbManager.getStore(tenantId); const projects = await store .query() .from("Project", "p") .whereNode("p", (p) => p.name.contains(query)) .select((ctx) => ctx.p) .limit(10) .execute(); return { tenantId, results: projects }; } catch (error) { console.error(`Failed to search tenant ${tenantId}:`, error); return { tenantId, results: [] }; } }) ); return results; } ``` ## Tenant Lifecycle ### Suspend Tenant ```typescript async function suspendTenant(tenantId: string, reason: string): Promise { const current = await store.nodes.Tenant.getById(tenantId); if (!current) throw new Error(`Tenant not found: ${tenantId}`); await store.nodes.Tenant.update(tenantId, { status: "suspended", settings: { ...(current.settings || {}), suspendedAt: new Date().toISOString(), suspendReason: reason, }, }); } ``` ### Delete Tenant (Shared Tables) ```typescript async function deleteTenant(tenantId: string): Promise { await store.transaction(async (tx) => { // Delete all tasks const tasks = await tx .query() .from("Task", "t") .whereNode("t", (t) => t.tenantId.eq(tenantId)) .select((ctx) => ctx.t.id) .execute(); for (const taskId of tasks) { await tx.nodes.Task.delete(taskId); } // Delete all projects const projects = await tx .query() .from("Project", "p") .whereNode("p", (p) => p.tenantId.eq(tenantId)) .select((ctx) => ctx.p.id) .execute(); for (const projectId of projects) { await tx.nodes.Project.delete(projectId); } // Delete all users const users = await tx .query() .from("User", "u") .whereNode("u", (u) => u.tenantId.eq(tenantId)) .select((ctx) => ctx.u.id) .execute(); for (const userId of users) { await tx.nodes.User.delete(userId); } // Delete tenant await tx.nodes.Tenant.delete(tenantId); }); } ``` ### Delete Tenant (Database Per Tenant) ```typescript async function deleteTenantDatabase(tenantId: string): Promise { // Close active connection await dbManager.closeConnection(tenantId); // Get database name const config = await getTenantConfig(tenantId); const dbUrl = new URL(config.databaseUrl); const dbName = dbUrl.pathname.slice(1); // Drop database const adminPool = new Pool({ connectionString: process.env.ADMIN_DATABASE_URL }); await adminPool.query(`DROP DATABASE IF EXISTS ${dbName}`); await adminPool.end(); // Remove from registry await centralDb.delete(tenantConfigs).where(eq(tenantConfigs.id, tenantId)); } ``` ## Tenant Migration Move tenant between isolation strategies: ```typescript async function migrateTenantToSeparateDatabase(tenantId: string): Promise { // 1. Create new database const { databaseUrl } = await provisionTenantDatabase( `migrated_${tenantId}`, "Migrated Tenant", "placeholder@example.com" ); // 2. Get tenant data from shared tables const sharedStore = store; const projects = await sharedStore .query() .from("Project", "p") .whereNode("p", (p) => p.tenantId.eq(tenantId)) .select((ctx) => ctx.p) .execute(); const tasks = await sharedStore .query() .from("Task", "t") .whereNode("t", (t) => t.tenantId.eq(tenantId)) .select((ctx) => ctx.t) .execute(); const users = await sharedStore .query() .from("User", "u") .whereNode("u", (u) => u.tenantId.eq(tenantId)) .select((ctx) => ctx.u) .execute(); // 3. Insert into new database const newStore = await dbManager.getStore(tenantId); await newStore.transaction(async (tx) => { for (const project of projects) { await tx.nodes.Project.create(project); } for (const task of tasks) { await tx.nodes.Task.create(task); } for (const user of users) { await tx.nodes.User.create(user); } }); // 4. Delete from shared tables await deleteTenant(tenantId); return databaseUrl; } ``` ## Next Steps - [Document Management](/examples/document-management) - CMS with semantic search - [Product Catalog](/examples/product-catalog) - Categories, variants, inventory - [Integration Patterns](/integration) - More deployment strategies # Product Catalog > E-commerce catalog with categories, variants, and inventory tracking This example builds a product catalog system with: - **Category hierarchy** with inheritance - **Product variants** (size, color, etc.) - **Inventory tracking** across warehouses - **Product relationships** (bundles, accessories, alternatives) - **Price history** using temporal queries ## Schema Definition ```typescript import { z } from "zod"; import { defineNode, defineEdge, defineGraph, embedding, } from "@nicia-ai/typegraph"; // Category hierarchy const Category = defineNode("Category", { schema: z.object({ name: z.string(), slug: z.string(), description: z.string().optional(), imageUrl: z.string().url().optional(), displayOrder: z.number().default(0), isActive: z.boolean().default(true), }), }); // Products const Product = defineNode("Product", { schema: z.object({ sku: z.string(), name: z.string(), description: z.string(), basePrice: z.number().positive(), currency: z.string().default("USD"), status: z.enum(["draft", "active", "discontinued"]).default("draft"), embedding: embedding(1536).optional(), }), }); // Product variants (specific size/color combinations) const Variant = defineNode("Variant", { schema: z.object({ sku: z.string(), name: z.string(), // "Large / Blue" priceModifier: z.number().default(0), // Added to base price attributes: z.record(z.string()), // { size: "L", color: "blue" } isDefault: z.boolean().default(false), }), }); // Inventory const Warehouse = defineNode("Warehouse", { schema: z.object({ code: z.string(), name: z.string(), location: z.string(), isActive: z.boolean().default(true), }), }); const Inventory = defineNode("Inventory", { schema: z.object({ quantity: z.number().int().min(0), reservedQuantity: z.number().int().min(0).default(0), reorderPoint: z.number().int().min(0).default(10), lastCountedAt: z.string().datetime().optional(), }), }); // Edges const parentCategory = defineEdge("parentCategory"); const inCategory = defineEdge("inCategory", { schema: z.object({ isPrimary: z.boolean().default(false) }), }); const hasVariant = defineEdge("hasVariant"); const inventoryFor = defineEdge("inventoryFor"); const atWarehouse = defineEdge("atWarehouse"); const relatedProduct = defineEdge("relatedProduct", { schema: z.object({ type: z.enum(["accessory", "alternative", "bundled", "upsell"]), sortOrder: z.number().default(0), }), }); // Graph const graph = defineGraph({ id: "product_catalog", nodes: { Category: { type: Category }, Product: { type: Product }, Variant: { type: Variant }, Warehouse: { type: Warehouse }, Inventory: { type: Inventory }, }, edges: { parentCategory: { type: parentCategory, from: [Category], to: [Category] }, inCategory: { type: inCategory, from: [Product], to: [Category] }, hasVariant: { type: hasVariant, from: [Product], to: [Variant] }, inventoryFor: { type: inventoryFor, from: [Inventory], to: [Variant] }, atWarehouse: { type: atWarehouse, from: [Inventory], to: [Warehouse] }, relatedProduct: { type: relatedProduct, from: [Product], to: [Product] }, }, ontology: [ // Category hierarchy is modeled via the parentCategory edge, not ontology. // Use ontology for type-level constraints, e.g.: // disjointWith(Product, Category), ], }); ``` ## Category Management ### Create Category Tree ```typescript async function createCategory( name: string, slug: string, parentSlug?: string ): Promise> { const category = await store.nodes.Category.create({ name, slug, isActive: true, }); if (parentSlug) { const parent = await store .query() .from("Category", "c") .whereNode("c", (c) => c.slug.eq(parentSlug)) .select((ctx) => ctx.c) .first(); if (parent) { await store.edges.parentCategory.create(category, parent, {}); } } return category; } // Build initial category structure await createCategory("Electronics", "electronics"); await createCategory("Phones", "phones", "electronics"); await createCategory("Accessories", "accessories", "electronics"); await createCategory("Cases", "cases", "accessories"); await createCategory("Chargers", "chargers", "accessories"); ``` ### Get Category with Ancestors ```typescript interface CategoryWithPath { id: string; name: string; slug: string; path: Array<{ name: string; slug: string }>; } async function getCategoryWithPath(slug: string): Promise { const category = await store .query() .from("Category", "c") .whereNode("c", (c) => c.slug.eq(slug)) .select((ctx) => ({ id: ctx.c.id, name: ctx.c.name, slug: ctx.c.slug, })) .first(); if (!category) return undefined; const ancestors = await store .query() .from("Category", "c") .whereNode("c", (c) => c.slug.eq(slug)) .traverse("parentCategory", "e") .recursive() .to("Category", "ancestor") .select((ctx) => ({ name: ctx.ancestor.name, slug: ctx.ancestor.slug, })) .execute(); return { ...category, path: ancestors.reverse(), // Root first }; } ``` ### Get Subcategories ```typescript async function getSubcategories( parentSlug: string, includeNested = false ): Promise> { let query = store .query() .from("Category", "parent") .whereNode("parent", (c) => c.slug.eq(parentSlug)) .traverse("parentCategory", "e", { direction: "in" }); if (includeNested) { query = query.recursive({ depth: "depth" }); } return query .to("Category", "child") .whereNode("child", (c) => c.isActive.eq(true)) .select((ctx) => ({ id: ctx.child.id, name: ctx.child.name, slug: ctx.child.slug, depth: ctx.depth ?? 1, })) .orderBy((ctx) => ctx.child.displayOrder, "asc") .execute(); } ``` ## Product Management ### Create Product with Variants ```typescript interface ProductInput { sku: string; name: string; description: string; basePrice: number; categorySlug: string; variants: Array<{ sku: string; name: string; priceModifier?: number; attributes: Record; isDefault?: boolean; }>; } async function createProduct(input: ProductInput): Promise> { return store.transaction(async (tx) => { // Generate embedding for semantic search const embedding = await generateEmbedding(`${input.name} ${input.description}`); // Create product const product = await tx.nodes.Product.create({ sku: input.sku, name: input.name, description: input.description, basePrice: input.basePrice, status: "draft", embedding, }); // Link to category const category = await tx .query() .from("Category", "c") .whereNode("c", (c) => c.slug.eq(input.categorySlug)) .select((ctx) => ctx.c) .first(); if (category) { await tx.edges.inCategory.create(product, category, { isPrimary: true }); } // Create variants for (const v of input.variants) { const variant = await tx.nodes.Variant.create({ sku: v.sku, name: v.name, priceModifier: v.priceModifier ?? 0, attributes: v.attributes, isDefault: v.isDefault ?? false, }); await tx.edges.hasVariant.create(product, variant, {}); } return product; }); } ``` ### Get Product Details ```typescript interface ProductDetails { id: string; sku: string; name: string; description: string; basePrice: number; status: string; categories: Array<{ name: string; slug: string; isPrimary: boolean }>; variants: Array<{ id: string; sku: string; name: string; price: number; attributes: Record; inventory: number; }>; related: Array<{ id: string; name: string; type: string }>; } async function getProductDetails(sku: string): Promise { const product = await store .query() .from("Product", "p") .whereNode("p", (p) => p.sku.eq(sku)) .select((ctx) => ctx.p) .first(); if (!product) return undefined; // Get categories const categories = await store .query() .from("Product", "p") .whereNode("p", (p) => p.id.eq(product.id)) .traverse("inCategory", "e") .to("Category", "c") .select((ctx) => ({ name: ctx.c.name, slug: ctx.c.slug, isPrimary: ctx.e.isPrimary, })) .execute(); // Get variants with inventory const variants = await store .query() .from("Product", "p") .whereNode("p", (p) => p.id.eq(product.id)) .traverse("hasVariant", "e") .to("Variant", "v") .optionalTraverse("inventoryFor", "inv", { direction: "in" }) .to("Inventory", "i") .select((ctx) => ({ id: ctx.v.id, sku: ctx.v.sku, name: ctx.v.name, priceModifier: ctx.v.priceModifier, attributes: ctx.v.attributes, quantity: ctx.i?.quantity ?? 0, reservedQuantity: ctx.i?.reservedQuantity ?? 0, })) .execute(); // Get related products const related = await store .query() .from("Product", "p") .whereNode("p", (p) => p.id.eq(product.id)) .traverse("relatedProduct", "e") .to("Product", "r") .select((ctx) => ({ id: ctx.r.id, name: ctx.r.name, type: ctx.e.type, })) .orderBy((ctx) => ctx.e.sortOrder, "asc") .execute(); return { id: product.id, sku: product.sku, name: product.name, description: product.description, basePrice: product.basePrice, status: product.status, categories, variants: variants.map((v) => ({ ...v, price: product.basePrice + v.priceModifier, inventory: v.quantity - v.reservedQuantity, })), related, }; } ``` ## Inventory Management ### Update Inventory ```typescript async function updateInventory( variantSku: string, warehouseCode: string, quantity: number ): Promise { const variant = await store .query() .from("Variant", "v") .whereNode("v", (v) => v.sku.eq(variantSku)) .select((ctx) => ctx.v) .first(); const warehouse = await store .query() .from("Warehouse", "w") .whereNode("w", (w) => w.code.eq(warehouseCode)) .select((ctx) => ctx.w) .first(); if (!variant || !warehouse) { throw new Error("Variant or warehouse not found"); } // Find existing inventory record const existingInventory = await store .query() .from("Inventory", "i") .traverse("inventoryFor", "e1") .to("Variant", "v") .whereNode("v", (v) => v.id.eq(variant.id)) .traverse("atWarehouse", "e2", { direction: "in" }) .to("Warehouse", "w") .whereNode("w", (w) => w.id.eq(warehouse.id)) .select((ctx) => ctx.i) .first(); if (existingInventory) { await store.nodes.Inventory.update(existingInventory.id, { quantity, lastCountedAt: new Date().toISOString(), }); } else { const inventory = await store.nodes.Inventory.create({ quantity, reservedQuantity: 0, lastCountedAt: new Date().toISOString(), }); await store.edges.inventoryFor.create(inventory, variant, {}); await store.edges.atWarehouse.create(inventory, warehouse, {}); } } ``` ### Reserve Inventory ```typescript async function reserveInventory( variantSku: string, quantity: number ): Promise<{ success: boolean; warehouseCode?: string }> { const inventories = await store .query() .from("Variant", "v") .whereNode("v", (v) => v.sku.eq(variantSku)) .traverse("inventoryFor", "e", { direction: "in" }) .to("Inventory", "i") .traverse("atWarehouse", "e2") .to("Warehouse", "w") .whereNode("w", (w) => w.isActive.eq(true)) .select((ctx) => ({ inventoryId: ctx.i.id, warehouseCode: ctx.w.code, available: ctx.i.quantity - ctx.i.reservedQuantity, reservedQuantity: ctx.i.reservedQuantity, })) .execute(); // Find warehouse with enough inventory const available = inventories.find((i) => i.available >= quantity); if (!available) { return { success: false }; } await store.nodes.Inventory.update(available.inventoryId, { reservedQuantity: available.reservedQuantity + quantity, }); return { success: true, warehouseCode: available.warehouseCode }; } ``` ### Low Stock Report ```typescript import { field, sum, havingLt } from "@nicia-ai/typegraph"; interface LowStockItem { productName: string; variantSku: string; variantName: string; totalQuantity: number; reorderPoint: number; } async function getLowStockItems(): Promise { return store .query() .from("Product", "p") .traverse("hasVariant", "e1") .to("Variant", "v") .traverse("inventoryFor", "e2", { direction: "in" }) .to("Inventory", "i") .groupByNode("v") .having(havingLt(sum("i", "quantity"), field("i", "reorderPoint"))) .aggregate({ productName: field("p", "name"), variantSku: field("v", "sku"), variantName: field("v", "name"), totalQuantity: sum("i", "quantity"), reorderPoint: field("i", "reorderPoint"), }) .execute(); } ``` ## Search and Discovery ### Semantic Product Search ```typescript async function searchProducts( query: string, options: { categorySlug?: string; minPrice?: number; maxPrice?: number; limit?: number; } = {} ): Promise> { const { categorySlug, minPrice, maxPrice, limit = 20 } = options; const queryEmbedding = await generateEmbedding(query); let queryBuilder = store .query() .from("Product", "p") .whereNode("p", (p) => { let pred = p.embedding .similarTo(queryEmbedding, limit, { metric: "cosine", minScore: 0.6 }) .and(p.status.eq("active")); if (minPrice !== undefined) { pred = pred.and(p.basePrice.gte(minPrice)); } if (maxPrice !== undefined) { pred = pred.and(p.basePrice.lte(maxPrice)); } return pred; }); // Filter by category if specified if (categorySlug) { // Get category and all subcategories const categoryIds = await store .query() .from("Category", "c") .whereNode("c", (c) => c.slug.eq(categorySlug)) .traverse("parentCategory", "e", { direction: "in" }) .recursive() .to("Category", "sub") .select((ctx) => ctx.sub.id) .execute(); queryBuilder = queryBuilder .traverse("inCategory", "e") .to("Category", "c") .whereNode("c", (c) => c.id.in([...categoryIds, categorySlug])); } return queryBuilder .select((ctx) => ({ product: ctx.p, score: ctx.p.embedding.similarity(queryEmbedding), })) .execute(); } ``` ### Get Products in Category ```typescript async function getProductsInCategory( categorySlug: string, options: { includeSubcategories?: boolean; page?: number; pageSize?: number; sortBy?: "name" | "price" | "newest"; } = {} ): Promise<{ products: ProductProps[]; total: number }> { const { includeSubcategories = true, page = 1, pageSize = 20, sortBy = "name" } = options; // Build category ID list let categoryIds: string[] = []; const rootCategory = await store .query() .from("Category", "c") .whereNode("c", (c) => c.slug.eq(categorySlug)) .select((ctx) => ctx.c.id) .first(); if (!rootCategory) return { products: [], total: 0 }; categoryIds.push(rootCategory); if (includeSubcategories) { const subIds = await store .query() .from("Category", "c") .whereNode("c", (c) => c.slug.eq(categorySlug)) .traverse("parentCategory", "e", { direction: "in" }) .recursive() .to("Category", "sub") .select((ctx) => ctx.sub.id) .execute(); categoryIds = [...categoryIds, ...subIds]; } const query = store .query() .from("Product", "p") .whereNode("p", (p) => p.status.eq("active")) .traverse("inCategory", "e") .to("Category", "c") .whereNode("c", (c) => c.id.in(categoryIds)) .select((ctx) => ctx.p); // Apply sorting const sortedQuery = sortBy === "price" ? query.orderBy((ctx) => ctx.p.basePrice, "asc") : sortBy === "newest" ? query.orderBy((ctx) => ctx.p.createdAt, "desc") : query.orderBy((ctx) => ctx.p.name, "asc"); const products = await sortedQuery .limit(pageSize) .offset((page - 1) * pageSize) .execute(); const total = await store .query() .from("Product", "p") .whereNode("p", (p) => p.status.eq("active")) .traverse("inCategory", "e") .to("Category", "c") .whereNode("c", (c) => c.id.in(categoryIds)) .count(); return { products, total }; } ``` ## Price History ### Track Price Changes TypeGraph's temporal model automatically tracks all changes: ```typescript async function getPriceHistory( sku: string ): Promise> { return store .query() .from("Product", "p") .temporal("includeEnded") .whereNode("p", (p) => p.sku.eq(sku)) .orderBy((ctx) => ctx.p.validFrom, "desc") .select((ctx) => ({ price: ctx.p.basePrice, validFrom: ctx.p.validFrom, validTo: ctx.p.validTo, })) .execute(); } ``` ### Price at Point in Time ```typescript async function getPriceAsOf(sku: string, date: Date): Promise { const product = await store .query() .from("Product", "p") .temporal("asOf", date.toISOString()) .whereNode("p", (p) => p.sku.eq(sku)) .select((ctx) => ctx.p.basePrice) .first(); return product; } ``` ## Next Steps - [Document Management](/examples/document-management) - CMS with semantic search - [Workflow Engine](/examples/workflow-engine) - State machines with approvals - [Audit Trail](/examples/audit-trail) - Complete change tracking # Workflow Engine > State machines with approvals, assignments, and escalations This example builds a workflow engine with: - **State machine definitions** as graph schemas - **Approval chains** with multiple approvers - **Task assignment** and delegation - **Escalation rules** based on time - **Audit trail** of all state changes ## Schema Definition ```typescript import { z } from "zod"; import { defineNode, defineEdge, defineGraph, implies } from "@nicia-ai/typegraph"; // Workflow definition (template) const WorkflowDefinition = defineNode("WorkflowDefinition", { schema: z.object({ name: z.string(), description: z.string().optional(), version: z.number().int().positive(), isActive: z.boolean().default(true), }), }); // States within a workflow const State = defineNode("State", { schema: z.object({ name: z.string(), type: z.enum(["initial", "intermediate", "terminal", "approval"]), config: z.record(z.unknown()).optional(), // State-specific config }), }); // Transitions between states const Transition = defineNode("Transition", { schema: z.object({ name: z.string(), condition: z.string().optional(), // Expression to evaluate requiredRole: z.string().optional(), }), }); // Workflow instances const WorkflowInstance = defineNode("WorkflowInstance", { schema: z.object({ referenceId: z.string(), // ID of the entity being processed referenceType: z.string(), // Type of entity (e.g., "PurchaseOrder") status: z.enum(["active", "completed", "cancelled", "failed"]).default("active"), data: z.record(z.unknown()).optional(), // Instance-specific data createdAt: z.string().datetime(), completedAt: z.string().datetime().optional(), }), }); // Tasks assigned to users const Task = defineNode("Task", { schema: z.object({ title: z.string(), description: z.string().optional(), type: z.enum(["action", "approval", "review", "notification"]), status: z.enum(["pending", "in_progress", "completed", "rejected", "escalated"]).default("pending"), dueDate: z.string().datetime().optional(), priority: z.enum(["low", "medium", "high", "urgent"]).default("medium"), result: z.record(z.unknown()).optional(), completedAt: z.string().datetime().optional(), }), }); // Users const User = defineNode("User", { schema: z.object({ email: z.string().email(), name: z.string(), role: z.string(), department: z.string().optional(), }), }); // Comments on tasks const Comment = defineNode("Comment", { schema: z.object({ content: z.string(), createdAt: z.string().datetime(), }), }); // Edges const hasState = defineEdge("hasState"); const hasTransition = defineEdge("hasTransition"); const fromState = defineEdge("fromState"); const toState = defineEdge("toState"); const usesDefinition = defineEdge("usesDefinition"); const currentState = defineEdge("currentState"); const hasTask = defineEdge("hasTask"); const assignedTo = defineEdge("assignedTo"); const createdBy = defineEdge("createdBy"); const hasComment = defineEdge("hasComment"); const reportsTo = defineEdge("reportsTo"); // For escalation chain // Graph const graph = defineGraph({ id: "workflow_engine", nodes: { WorkflowDefinition: { type: WorkflowDefinition }, State: { type: State }, Transition: { type: Transition }, WorkflowInstance: { type: WorkflowInstance }, Task: { type: Task }, User: { type: User }, Comment: { type: Comment }, }, edges: { hasState: { type: hasState, from: [WorkflowDefinition], to: [State] }, hasTransition: { type: hasTransition, from: [WorkflowDefinition], to: [Transition] }, fromState: { type: fromState, from: [Transition], to: [State] }, toState: { type: toState, from: [Transition], to: [State] }, usesDefinition: { type: usesDefinition, from: [WorkflowInstance], to: [WorkflowDefinition] }, currentState: { type: currentState, from: [WorkflowInstance], to: [State] }, hasTask: { type: hasTask, from: [WorkflowInstance], to: [Task] }, assignedTo: { type: assignedTo, from: [Task], to: [User] }, createdBy: { type: createdBy, from: [Task, Comment, WorkflowInstance], to: [User] }, hasComment: { type: hasComment, from: [Task], to: [Comment] }, reportsTo: { type: reportsTo, from: [User], to: [User] }, }, ontology: [ // Escalation implies assignment implies(reportsTo, assignedTo), ], }); ``` ## Workflow Definition ### Create Approval Workflow ```typescript async function createApprovalWorkflow(): Promise> { return store.transaction(async (tx) => { // Create workflow definition const workflow = await tx.nodes.WorkflowDefinition.create({ name: "Purchase Order Approval", description: "Multi-level approval for purchase orders", version: 1, isActive: true, }); // Create states const states = { draft: await tx.nodes.State.create({ name: "Draft", type: "initial", }), pendingManagerApproval: await tx.nodes.State.create({ name: "Pending Manager Approval", type: "approval", config: { approverRole: "manager", timeout: "48h" }, }), pendingFinanceApproval: await tx.nodes.State.create({ name: "Pending Finance Approval", type: "approval", config: { approverRole: "finance", timeout: "24h" }, }), approved: await tx.nodes.State.create({ name: "Approved", type: "terminal", }), rejected: await tx.nodes.State.create({ name: "Rejected", type: "terminal", }), }; // Link states to workflow for (const state of Object.values(states)) { await tx.edges.hasState.create(workflow, state, {}); } // Create transitions const transitions = [ { from: states.draft, to: states.pendingManagerApproval, name: "Submit", requiredRole: "requester", }, { from: states.pendingManagerApproval, to: states.pendingFinanceApproval, name: "Approve", requiredRole: "manager", condition: "amount > 1000", }, { from: states.pendingManagerApproval, to: states.approved, name: "Approve", requiredRole: "manager", condition: "amount <= 1000", }, { from: states.pendingManagerApproval, to: states.rejected, name: "Reject", requiredRole: "manager", }, { from: states.pendingFinanceApproval, to: states.approved, name: "Approve", requiredRole: "finance", }, { from: states.pendingFinanceApproval, to: states.rejected, name: "Reject", requiredRole: "finance", }, ]; for (const t of transitions) { const transition = await tx.nodes.Transition.create({ name: t.name, requiredRole: t.requiredRole, condition: t.condition, }); await tx.edges.hasTransition.create(workflow, transition, {}); await tx.edges.fromState.create(transition, t.from, {}); await tx.edges.toState.create(transition, t.to, {}); } return workflow; }); } ``` ## Workflow Instances ### Start Workflow ```typescript interface StartWorkflowInput { workflowName: string; referenceId: string; referenceType: string; data?: Record; createdByUserId: string; } async function startWorkflow(input: StartWorkflowInput): Promise> { return store.transaction(async (tx) => { // Find workflow definition const workflow = await tx .query() .from("WorkflowDefinition", "w") .whereNode("w", (w) => w.name.eq(input.workflowName).and(w.isActive.eq(true))) .select((ctx) => ctx.w) .first(); if (!workflow) { throw new Error(`Workflow '${input.workflowName}' not found`); } // Find initial state const initialState = await tx .query() .from("WorkflowDefinition", "w") .whereNode("w", (w) => w.id.eq(workflow.id)) .traverse("hasState", "e") .to("State", "s") .whereNode("s", (s) => s.type.eq("initial")) .select((ctx) => ctx.s) .first(); if (!initialState) { throw new Error("Workflow has no initial state"); } // Create instance const instance = await tx.nodes.WorkflowInstance.create({ referenceId: input.referenceId, referenceType: input.referenceType, status: "active", data: input.data, createdAt: new Date().toISOString(), }); // Link to definition and state await tx.edges.usesDefinition.create(instance, workflow, {}); await tx.edges.currentState.create(instance, initialState, {}); // Link to creator const creator = await tx.nodes.User.getById(input.createdByUserId); if (!creator) throw new Error(`User not found: ${input.createdByUserId}`); await tx.edges.createdBy.create(instance, creator, {}); return instance; }); } ``` ### Get Available Transitions ```typescript interface AvailableTransition { id: string; name: string; targetState: string; requiredRole?: string; condition?: string; } async function getAvailableTransitions( instanceId: string, userId: string ): Promise { // Get user's role const user = await store.nodes.User.getById(userId); if (!user) throw new Error(`User not found: ${userId}`); const userRole = user.role; // Get current state const currentState = await store .query() .from("WorkflowInstance", "i") .whereNode("i", (i) => i.id.eq(instanceId)) .traverse("currentState", "e") .to("State", "s") .select((ctx) => ctx.s) .first(); if (!currentState) { throw new Error("Instance has no current state"); } // Get transitions from current state const transitions = await store .query() .from("State", "s") .whereNode("s", (s) => s.id.eq(currentState.id)) .traverse("fromState", "e1", { direction: "in" }) .to("Transition", "t") .traverse("toState", "e2") .to("State", "target") .select((ctx) => ({ id: ctx.t.id, name: ctx.t.name, targetState: ctx.target.name, requiredRole: ctx.t.requiredRole, condition: ctx.t.condition, })) .execute(); // Filter by role return transitions.filter( (t) => !t.requiredRole || t.requiredRole === userRole || userRole === "admin" ); } ``` ### Execute Transition ```typescript async function executeTransition( instanceId: string, transitionId: string, userId: string, result?: Record ): Promise { await store.transaction(async (tx) => { const instance = await tx.nodes.WorkflowInstance.getById(instanceId); if (!instance) throw new Error(`WorkflowInstance not found: ${instanceId}`); if (instance.status !== "active") { throw new Error("Workflow is not active"); } // Verify transition is valid const available = await getAvailableTransitions(instanceId, userId); const transition = available.find((t) => t.id === transitionId); if (!transition) { throw new Error("Transition not available"); } // Get target state const targetState = await tx .query() .from("Transition", "t") .whereNode("t", (t) => t.id.eq(transitionId)) .traverse("toState", "e") .to("State", "s") .select((ctx) => ctx.s) .first(); // Remove current state edge const currentStateEdge = await tx .query() .from("WorkflowInstance", "i") .whereNode("i", (i) => i.id.eq(instanceId)) .traverse("currentState", "e") .to("State", "s") .select((ctx) => ctx.e.id) .first(); if (currentStateEdge) { await tx.edges.currentState.delete(currentStateEdge); } // Add new state edge await tx.edges.currentState.create(instance, targetState!, {}); // Update instance data const updatedData = { ...instance.data, lastTransition: transition.name, ...result }; const updates: Partial = { data: updatedData }; // Check if terminal state if (targetState!.type === "terminal") { updates.status = "completed"; updates.completedAt = new Date().toISOString(); } await tx.nodes.WorkflowInstance.update(instanceId, updates); // Complete any pending tasks const pendingTasks = await tx .query() .from("WorkflowInstance", "i") .whereNode("i", (i) => i.id.eq(instanceId)) .traverse("hasTask", "e") .to("Task", "t") .whereNode("t", (t) => t.status.in(["pending", "in_progress"])) .select((ctx) => ctx.t.id) .execute(); for (const taskId of pendingTasks) { await tx.nodes.Task.update(taskId, { status: "completed", completedAt: new Date().toISOString(), }); } // Create tasks for new state if needed if (targetState!.type === "approval") { await createApprovalTask(tx, instanceId, targetState!, userId); } }); } ``` ## Task Management ### Create Approval Task ```typescript async function createApprovalTask( tx: Transaction, instanceId: string, state: Node, requesterId: string ): Promise { const config = state.config as { approverRole: string; timeout: string } | undefined; if (!config) return; // Find approver (first user with matching role, or requester's manager) let approver = await tx .query() .from("User", "u") .whereNode("u", (u) => u.role.eq(config.approverRole)) .select((ctx) => ctx.u) .first(); // If no direct match, find in reporting chain if (!approver) { approver = await tx .query() .from("User", "requester") .whereNode("requester", (u) => u.id.eq(requesterId)) .traverse("reportsTo", "e") .recursive() .to("User", "manager") .whereNode("manager", (u) => u.role.eq(config.approverRole)) .select((ctx) => ctx.manager) .first(); } if (!approver) { throw new Error(`No approver found with role '${config.approverRole}'`); } // Calculate due date const dueDate = calculateDueDate(config.timeout); // Create task const task = await tx.nodes.Task.create({ title: `Approval Required: ${state.name}`, description: `Please review and approve or reject.`, type: "approval", status: "pending", priority: "medium", dueDate: dueDate.toISOString(), }); // Link task to instance and approver const instance = await tx.nodes.WorkflowInstance.getById(instanceId); if (!instance) throw new Error(`WorkflowInstance not found: ${instanceId}`); await tx.edges.hasTask.create(instance, task, {}); await tx.edges.assignedTo.create(task, approver, {}); } function calculateDueDate(timeout: string): Date { const now = new Date(); const match = timeout.match(/^(\d+)(h|d)$/); if (!match) return new Date(now.getTime() + 24 * 60 * 60 * 1000); // Default 24h const value = parseInt(match[1], 10); const unit = match[2]; if (unit === "h") { return new Date(now.getTime() + value * 60 * 60 * 1000); } else { return new Date(now.getTime() + value * 24 * 60 * 60 * 1000); } } ``` ### Get User's Tasks ```typescript interface TaskWithContext { id: string; title: string; type: string; status: string; priority: string; dueDate?: string; workflowName: string; referenceId: string; referenceType: string; } async function getUserTasks( userId: string, status?: "pending" | "in_progress" ): Promise { let query = store .query() .from("User", "u") .whereNode("u", (u) => u.id.eq(userId)) .traverse("assignedTo", "e", { direction: "in" }) .to("Task", "t"); if (status) { query = query.whereNode("t", (t) => t.status.eq(status)); } else { query = query.whereNode("t", (t) => t.status.in(["pending", "in_progress"])); } return query .traverse("hasTask", "e2", { direction: "in" }) .to("WorkflowInstance", "i") .traverse("usesDefinition", "e3") .to("WorkflowDefinition", "w") .select((ctx) => ({ id: ctx.t.id, title: ctx.t.title, type: ctx.t.type, status: ctx.t.status, priority: ctx.t.priority, dueDate: ctx.t.dueDate, workflowName: ctx.w.name, referenceId: ctx.i.referenceId, referenceType: ctx.i.referenceType, })) .orderBy((ctx) => ctx.t.dueDate, "asc") .execute(); } ``` ### Complete Task ```typescript async function completeTask( taskId: string, userId: string, decision: "approve" | "reject", comment?: string ): Promise { await store.transaction(async (tx) => { const task = await tx.nodes.Task.getById(taskId); if (!task) throw new Error(`Task not found: ${taskId}`); // Verify user is assigned const assignee = await tx .query() .from("Task", "t") .whereNode("t", (t) => t.id.eq(taskId)) .traverse("assignedTo", "e") .to("User", "u") .select((ctx) => ctx.u.id) .first(); if (assignee !== userId) { throw new Error("User is not assigned to this task"); } // Update task await tx.nodes.Task.update(taskId, { status: decision === "approve" ? "completed" : "rejected", completedAt: new Date().toISOString(), result: { decision }, }); // Add comment if provided if (comment) { const commentNode = await tx.nodes.Comment.create({ content: comment, createdAt: new Date().toISOString(), }); await tx.edges.hasComment.create(task, commentNode, {}); const user = await tx.nodes.User.getById(userId); if (!user) throw new Error(`User not found: ${userId}`); await tx.edges.createdBy.create(commentNode, user, {}); } // Get workflow instance const instance = await tx .query() .from("Task", "t") .whereNode("t", (t) => t.id.eq(taskId)) .traverse("hasTask", "e", { direction: "in" }) .to("WorkflowInstance", "i") .select((ctx) => ctx.i) .first(); // Find and execute the appropriate transition const transitions = await getAvailableTransitions(instance!.id, userId); const transition = transitions.find((t) => decision === "approve" ? t.name === "Approve" : t.name === "Reject" ); if (transition) { await executeTransition(instance!.id, transition.id, userId, { decision }); } }); } ``` ## Escalation ### Check Overdue Tasks ```typescript async function getOverdueTasks(): Promise> { const now = new Date().toISOString(); return store .query() .from("Task", "t") .whereNode("t", (t) => t.status .in(["pending", "in_progress"]) .and(t.dueDate.isNotNull()) .and(t.dueDate.lt(now)) ) .traverse("assignedTo", "e") .to("User", "u") .select((ctx) => ({ task: ctx.t, assignee: ctx.u, })) .execute(); } ``` ### Escalate Task ```typescript async function escalateTask(taskId: string): Promise { await store.transaction(async (tx) => { // Get current assignee const currentAssignment = await tx .query() .from("Task", "t") .whereNode("t", (t) => t.id.eq(taskId)) .traverse("assignedTo", "e") .to("User", "u") .select((ctx) => ({ edgeId: ctx.e.id, user: ctx.u })) .first(); if (!currentAssignment) { throw new Error("Task has no assignee"); } // Find manager in reporting chain const manager = await tx .query() .from("User", "u") .whereNode("u", (u) => u.id.eq(currentAssignment.user.id)) .traverse("reportsTo", "e") .to("User", "manager") .select((ctx) => ctx.manager) .first(); if (!manager) { throw new Error("No manager found for escalation"); } // Update task await tx.nodes.Task.update(taskId, { status: "escalated", priority: "urgent", }); // Reassign to manager await tx.edges.assignedTo.delete(currentAssignment.edgeId); const task = await tx.nodes.Task.getById(taskId); if (!task) throw new Error(`Task not found: ${taskId}`); await tx.edges.assignedTo.create(task, manager, {}); // Add escalation comment const comment = await tx.nodes.Comment.create({ content: `Task escalated from ${currentAssignment.user.name} due to timeout`, createdAt: new Date().toISOString(), }); await tx.edges.hasComment.create(task, comment, {}); }); } ``` ### Run Escalation Job ```typescript async function runEscalationJob(): Promise<{ escalated: number }> { const overdueTasks = await getOverdueTasks(); let escalated = 0; for (const { task } of overdueTasks) { try { await escalateTask(task.id); escalated++; } catch (error) { console.error(`Failed to escalate task ${task.id}:`, error); } } return { escalated }; } ``` ## Workflow History ### Get Instance Timeline ```typescript interface TimelineEvent { timestamp: string; type: "state_change" | "task_created" | "task_completed" | "comment"; description: string; actor?: string; } async function getInstanceTimeline(instanceId: string): Promise { const events: TimelineEvent[] = []; // Get state change history using temporal queries const stateHistory = await store .query() .from("WorkflowInstance", "i") .temporal("includeEnded") .whereNode("i", (i) => i.id.eq(instanceId)) .traverse("currentState", "e") .to("State", "s") .orderBy((ctx) => ctx.e.validFrom, "asc") .select((ctx) => ({ stateName: ctx.s.name, timestamp: ctx.e.validFrom, })) .execute(); for (const state of stateHistory) { events.push({ timestamp: state.timestamp, type: "state_change", description: `Entered state: ${state.stateName}`, }); } // Get task events const tasks = await store .query() .from("WorkflowInstance", "i") .whereNode("i", (i) => i.id.eq(instanceId)) .traverse("hasTask", "e") .to("Task", "t") .optionalTraverse("assignedTo", "a") .to("User", "u") .select((ctx) => ({ title: ctx.t.title, status: ctx.t.status, createdAt: ctx.t.createdAt, completedAt: ctx.t.completedAt, assignee: ctx.u?.name, })) .execute(); for (const task of tasks) { events.push({ timestamp: task.createdAt.toISOString(), type: "task_created", description: `Task created: ${task.title}`, actor: task.assignee, }); if (task.completedAt) { events.push({ timestamp: task.completedAt, type: "task_completed", description: `Task ${task.status}: ${task.title}`, actor: task.assignee, }); } } // Sort by timestamp return events.sort((a, b) => a.timestamp.localeCompare(b.timestamp)); } ``` ## Next Steps - [Document Management](/examples/document-management) - CMS with semantic search - [Product Catalog](/examples/product-catalog) - Categories, variants, inventory - [Audit Trail](/examples/audit-trail) - Complete change tracking