Examples: Complete application examples demonstrating TypeGraph patterns
# Audit Trail
> Complete change tracking with user attribution and diff generation
This example shows how to build a comprehensive audit system that:
- **Tracks all changes** using TypeGraph's temporal model
- **Attributes changes** to users and sessions
- **Generates diffs** between versions
- **Supports compliance queries** (who changed what, when)
- **Exports audit logs** for external systems
## How TypeGraph Enables Auditing
TypeGraph's temporal model provides built-in auditing capabilities:
1. **Every update creates a new version** - Old data is preserved with `valid_to` timestamp
2. **Temporal queries** - Query any point in time with `asOf` or get full history with `includeEnded`
3. **Metadata fields** - `createdAt`, `updatedAt`, `version` are tracked automatically
This example extends the built-in capabilities with:
- User attribution (who made the change)
- Change descriptions (why the change was made)
- Structured diffs (what exactly changed)
## Schema Definition
```typescript
import { z } from "zod";
import { defineNode, defineEdge, defineGraph } from "@nicia-ai/typegraph";
// Audited entity (example: Settings)
const Setting = defineNode("Setting", {
schema: z.object({
key: z.string(),
value: z.string(),
category: z.string(),
description: z.string().optional(),
}),
});
// Users making changes
const User = defineNode("User", {
schema: z.object({
email: z.string().email(),
name: z.string(),
role: z.enum(["admin", "editor", "viewer"]),
}),
});
// Explicit audit log entries (for cross-cutting concerns)
const AuditEntry = defineNode("AuditEntry", {
schema: z.object({
entityType: z.string(),
entityId: z.string(),
action: z.enum(["create", "update", "delete", "restore"]),
timestamp: z.string().datetime(),
changes: z.record(z.object({
before: z.unknown().optional(),
after: z.unknown().optional(),
})).optional(),
reason: z.string().optional(),
ipAddress: z.string().optional(),
userAgent: z.string().optional(),
}),
});
// Sessions for grouping changes
const Session = defineNode("Session", {
schema: z.object({
startedAt: z.string().datetime(),
endedAt: z.string().datetime().optional(),
ipAddress: z.string().optional(),
userAgent: z.string().optional(),
}),
});
// Edges
const performedBy = defineEdge("performedBy");
const inSession = defineEdge("inSession");
const hasSession = defineEdge("hasSession");
const graph = defineGraph({
id: "audit_trail",
nodes: {
Setting: { type: Setting },
User: { type: User },
AuditEntry: { type: AuditEntry },
Session: { type: Session },
},
edges: {
performedBy: { type: performedBy, from: [AuditEntry], to: [User] },
inSession: { type: inSession, from: [AuditEntry], to: [Session] },
hasSession: { type: hasSession, from: [User], to: [Session] },
},
});
```
## Audit Context
Create a context object to track the current user and session:
```typescript
interface AuditContext {
userId: string;
sessionId?: string;
ipAddress?: string;
userAgent?: string;
reason?: string;
}
// Thread-local storage for audit context (Node.js)
import { AsyncLocalStorage } from "async_hooks";
const auditContext = new AsyncLocalStorage();
function withAuditContext(context: AuditContext, fn: () => Promise): Promise {
return auditContext.run(context, fn);
}
function getAuditContext(): AuditContext | undefined {
return auditContext.getStore();
}
```
## Audited Operations
### Create with Audit
```typescript
async function createSetting(
key: string,
value: string,
category: string
): Promise> {
const ctx = getAuditContext();
if (!ctx) throw new Error("Audit context required");
return store.transaction(async (tx) => {
// Create the setting
const setting = await tx.nodes.Setting.create({
key,
value,
category,
});
// Create audit entry
await createAuditEntry(tx, {
entityType: "Setting",
entityId: setting.id,
action: "create",
changes: {
key: { after: key },
value: { after: value },
category: { after: category },
},
});
return setting;
});
}
```
### Update with Audit
```typescript
async function updateSetting(
id: string,
updates: Partial<{ value: string; description: string }>
): Promise> {
const ctx = getAuditContext();
if (!ctx) throw new Error("Audit context required");
return store.transaction(async (tx) => {
// Get current state
const current = await tx.nodes.Setting.getById(id);
if (!current) throw new Error(`Setting not found: ${id}`);
// Calculate changes
const changes: Record = {};
for (const [key, newValue] of Object.entries(updates)) {
const oldValue = current[key as keyof typeof current];
if (oldValue !== newValue) {
changes[key] = { before: oldValue, after: newValue };
}
}
// Skip if no actual changes
if (Object.keys(changes).length === 0) {
return current;
}
// Update the setting
const updated = await tx.nodes.Setting.update(id, updates);
// Create audit entry
await createAuditEntry(tx, {
entityType: "Setting",
entityId: id,
action: "update",
changes,
});
return updated;
});
}
```
### Delete with Audit
```typescript
async function deleteSetting(id: string): Promise {
const ctx = getAuditContext();
if (!ctx) throw new Error("Audit context required");
await store.transaction(async (tx) => {
// Get current state for audit
const current = await tx.nodes.Setting.getById(id);
if (!current) throw new Error(`Setting not found: ${id}`);
// Delete (soft delete)
await tx.nodes.Setting.delete(id);
// Create audit entry
await createAuditEntry(tx, {
entityType: "Setting",
entityId: id,
action: "delete",
changes: {
key: { before: current.key },
value: { before: current.value },
category: { before: current.category },
},
});
});
}
```
### Create Audit Entry
```typescript
interface AuditEntryInput {
entityType: string;
entityId: string;
action: "create" | "update" | "delete" | "restore";
changes?: Record;
}
async function createAuditEntry(
tx: Transaction,
input: AuditEntryInput
): Promise> {
const ctx = getAuditContext()!;
const entry = await tx.nodes.AuditEntry.create({
entityType: input.entityType,
entityId: input.entityId,
action: input.action,
timestamp: new Date().toISOString(),
changes: input.changes,
reason: ctx.reason,
ipAddress: ctx.ipAddress,
userAgent: ctx.userAgent,
});
// Link to user
const user = await tx.nodes.User.getById(ctx.userId);
if (!user) throw new Error(`User not found: ${ctx.userId}`);
await tx.edges.performedBy.create(entry, user, {});
// Link to session if present
if (ctx.sessionId) {
const session = await tx.nodes.Session.getById(ctx.sessionId);
if (!session) throw new Error(`Session not found: ${ctx.sessionId}`);
await tx.edges.inSession.create(entry, session, {});
}
return entry;
}
```
## Querying Audit History
### Get Entity History
```typescript
interface HistoryEntry {
version: number;
timestamp: string;
action: string;
changes?: Record;
user: { name: string; email: string };
reason?: string;
}
async function getEntityHistory(
entityType: string,
entityId: string
): Promise {
return store
.query()
.from("AuditEntry", "a")
.whereNode("a", (a) =>
a.entityType.eq(entityType).and(a.entityId.eq(entityId))
)
.traverse("performedBy", "e")
.to("User", "u")
.orderBy((ctx) => ctx.a.timestamp, "desc")
.select((ctx) => ({
version: ctx.a.version,
timestamp: ctx.a.timestamp,
action: ctx.a.action,
changes: ctx.a.changes,
user: {
name: ctx.u.name,
email: ctx.u.email,
},
reason: ctx.a.reason,
}))
.execute();
}
```
### Get User Activity
```typescript
interface UserActivity {
timestamp: string;
entityType: string;
entityId: string;
action: string;
}
async function getUserActivity(
userId: string,
options: { since?: Date; limit?: number } = {}
): Promise {
const { since, limit = 100 } = options;
let query = store
.query()
.from("User", "u")
.whereNode("u", (u) => u.id.eq(userId))
.traverse("performedBy", "e", { direction: "in" })
.to("AuditEntry", "a");
if (since) {
query = query.whereNode("a", (a) => a.timestamp.gte(since.toISOString()));
}
return query
.orderBy((ctx) => ctx.a.timestamp, "desc")
.limit(limit)
.select((ctx) => ({
timestamp: ctx.a.timestamp,
entityType: ctx.a.entityType,
entityId: ctx.a.entityId,
action: ctx.a.action,
}))
.execute();
}
```
### Changes in Time Range
```typescript
interface ChangeReport {
entityType: string;
entityId: string;
changeCount: number;
users: string[];
lastChange: string;
}
async function getChangesInRange(
startDate: Date,
endDate: Date
): Promise {
const entries = await store
.query()
.from("AuditEntry", "a")
.whereNode("a", (a) =>
a.timestamp.gte(startDate.toISOString()).and(
a.timestamp.lte(endDate.toISOString())
)
)
.traverse("performedBy", "e")
.to("User", "u")
.select((ctx) => ({
entityType: ctx.a.entityType,
entityId: ctx.a.entityId,
timestamp: ctx.a.timestamp,
userName: ctx.u.name,
}))
.execute();
// Group by entity
const grouped = new Map();
for (const entry of entries) {
const key = `${entry.entityType}:${entry.entityId}`;
const existing = grouped.get(key);
if (existing) {
existing.changeCount++;
if (!existing.users.includes(entry.userName)) {
existing.users.push(entry.userName);
}
if (entry.timestamp > existing.lastChange) {
existing.lastChange = entry.timestamp;
}
} else {
grouped.set(key, {
entityType: entry.entityType,
entityId: entry.entityId,
changeCount: 1,
users: [entry.userName],
lastChange: entry.timestamp,
});
}
}
return Array.from(grouped.values());
}
```
## Using TypeGraph's Built-in Temporal Features
### View Entity at Point in Time
```typescript
async function getSettingAsOf(
id: string,
timestamp: Date
): Promise {
return store
.query()
.from("Setting", "s")
.temporal("asOf", timestamp.toISOString())
.whereNode("s", (s) => s.id.eq(id))
.select((ctx) => ctx.s)
.first();
}
```
### Get All Versions
```typescript
interface SettingVersion {
props: SettingProps;
validFrom: string;
validTo: string | undefined;
version: number;
}
async function getSettingVersions(id: string): Promise {
return store
.query()
.from("Setting", "s")
.temporal("includeEnded")
.whereNode("s", (s) => s.id.eq(id))
.orderBy((ctx) => ctx.s.validFrom, "desc")
.select((ctx) => ({
props: ctx.s,
validFrom: ctx.s.validFrom,
validTo: ctx.s.validTo,
version: ctx.s.version,
}))
.execute();
}
```
### Compare Versions
```typescript
interface VersionDiff {
field: string;
before: unknown;
after: unknown;
}
async function compareVersions(
id: string,
version1: number,
version2: number
): Promise {
const versions = await store
.query()
.from("Setting", "s")
.temporal("includeEnded")
.whereNode("s", (s) => s.id.eq(id).and(s.version.in([version1, version2])))
.orderBy((ctx) => ctx.s.version, "asc")
.select((ctx) => ctx.s)
.execute();
if (versions.length !== 2) {
throw new Error("Versions not found");
}
const [before, after] = versions;
const diffs: VersionDiff[] = [];
const allKeys = new Set([...Object.keys(before), ...Object.keys(after)]);
for (const key of allKeys) {
const beforeVal = before[key as keyof typeof before];
const afterVal = after[key as keyof typeof after];
if (JSON.stringify(beforeVal) !== JSON.stringify(afterVal)) {
diffs.push({ field: key, before: beforeVal, after: afterVal });
}
}
return diffs;
}
```
## Session Management
### Start Session
```typescript
async function startSession(
userId: string,
metadata: { ipAddress?: string; userAgent?: string }
): Promise> {
return store.transaction(async (tx) => {
const session = await tx.nodes.Session.create({
startedAt: new Date().toISOString(),
ipAddress: metadata.ipAddress,
userAgent: metadata.userAgent,
});
const user = await tx.nodes.User.getById(userId);
if (!user) throw new Error(`User not found: ${userId}`);
await tx.edges.hasSession.create(user, session, {});
return session;
});
}
```
### End Session
```typescript
async function endSession(sessionId: string): Promise {
await store.nodes.Session.update(sessionId, {
endedAt: new Date().toISOString(),
});
}
```
### Get Session Activity
```typescript
async function getSessionActivity(
sessionId: string
): Promise> {
return store
.query()
.from("Session", "s")
.whereNode("s", (s) => s.id.eq(sessionId))
.traverse("inSession", "e", { direction: "in" })
.to("AuditEntry", "a")
.orderBy((ctx) => ctx.a.timestamp, "asc")
.select((ctx) => ({
timestamp: ctx.a.timestamp,
action: ctx.a.action,
entityType: ctx.a.entityType,
}))
.execute();
}
```
## Compliance Queries
### Who Changed This?
```typescript
async function whoChanged(
entityType: string,
entityId: string,
field: string
): Promise> {
const entries = await store
.query()
.from("AuditEntry", "a")
.whereNode("a", (a) =>
a.entityType.eq(entityType).and(a.entityId.eq(entityId))
)
.traverse("performedBy", "e")
.to("User", "u")
.orderBy((ctx) => ctx.a.timestamp, "desc")
.select((ctx) => ({
changes: ctx.a.changes,
user: ctx.u.name,
timestamp: ctx.a.timestamp,
}))
.execute();
return entries
.filter((e) => e.changes && field in e.changes)
.map((e) => ({
user: e.user,
timestamp: e.timestamp,
before: e.changes![field].before,
after: e.changes![field].after,
}));
}
```
### When Was This Value Set?
```typescript
async function whenWasValueSet(
entityType: string,
entityId: string,
field: string,
value: unknown
): Promise<{ timestamp: string; user: string } | undefined> {
const entries = await store
.query()
.from("AuditEntry", "a")
.whereNode("a", (a) =>
a.entityType.eq(entityType).and(a.entityId.eq(entityId))
)
.traverse("performedBy", "e")
.to("User", "u")
.orderBy((ctx) => ctx.a.timestamp, "asc")
.select((ctx) => ({
changes: ctx.a.changes,
user: ctx.u.name,
timestamp: ctx.a.timestamp,
}))
.execute();
const entry = entries.find(
(e) => e.changes && field in e.changes && e.changes[field].after === value
);
return entry ? { timestamp: entry.timestamp, user: entry.user } : undefined;
}
```
## Export Audit Logs
### Stream to External System
```typescript
async function* exportAuditLogs(
since: Date,
batchSize = 1000
): AsyncGenerator {
const stream = store
.query()
.from("AuditEntry", "a")
.whereNode("a", (a) => a.timestamp.gte(since.toISOString()))
.traverse("performedBy", "e")
.to("User", "u")
.orderBy((ctx) => ctx.a.timestamp, "asc")
.select((ctx) => ({
...ctx.a,
performedBy: ctx.u.email,
}))
.stream({ batchSize });
let batch: AuditEntryProps[] = [];
for await (const entry of stream) {
batch.push(entry);
if (batch.length >= batchSize) {
yield batch;
batch = [];
}
}
if (batch.length > 0) {
yield batch;
}
}
// Usage
async function syncToExternalAuditSystem(since: Date): Promise {
for await (const batch of exportAuditLogs(since)) {
await externalAuditApi.ingestBatch(batch);
}
}
```
## Next Steps
- [Document Management](/examples/document-management) - CMS with semantic search
- [Product Catalog](/examples/product-catalog) - Categories, variants, inventory
- [Workflow Engine](/examples/workflow-engine) - State machines with approvals
# Document Management System
> A complete CMS example with semantic search, versioning, and access control
This example builds a document management system with:
- **Document hierarchy** (folders, documents, sections)
- **Semantic search** with vector embeddings
- **Version history** using temporal queries
- **Access control** with permission inheritance
- **Related documents** discovery
## Schema Definition
```typescript
import { z } from "zod";
import {
defineNode,
defineEdge,
defineGraph,
embedding,
subClassOf,
partOf,
hasPart,
} from "@nicia-ai/typegraph";
// Base content type (abstract)
const Content = defineNode("Content", {
schema: z.object({
title: z.string(),
createdBy: z.string(),
status: z.enum(["draft", "published", "archived"]).default("draft"),
}),
});
// Folder extends Content
const Folder = defineNode("Folder", {
schema: z.object({
title: z.string(),
createdBy: z.string(),
status: z.enum(["draft", "published", "archived"]).default("draft"),
path: z.string(), // /engineering/specs
}),
});
// Document extends Content
const Document = defineNode("Document", {
schema: z.object({
title: z.string(),
content: z.string(),
createdBy: z.string(),
status: z.enum(["draft", "published", "archived"]).default("draft"),
contentType: z.enum(["markdown", "html", "plaintext"]).default("markdown"),
embedding: embedding(1536).optional(),
}),
});
// Users and permissions
const User = defineNode("User", {
schema: z.object({
email: z.string().email(),
name: z.string(),
role: z.enum(["admin", "editor", "viewer"]).default("viewer"),
}),
});
const Permission = defineNode("Permission", {
schema: z.object({
level: z.enum(["read", "write", "admin"]),
}),
});
// Edges
const contains = defineEdge("contains");
const relatedTo = defineEdge("relatedTo", {
schema: z.object({
type: z.enum(["references", "supersedes", "related"]),
confidence: z.number().min(0).max(1).optional(),
}),
});
const hasPermission = defineEdge("hasPermission");
const createdBy = defineEdge("createdBy");
// Graph definition
const graph = defineGraph({
id: "document_management",
nodes: {
Content: { type: Content },
Folder: { type: Folder },
Document: { type: Document },
User: { type: User },
Permission: { type: Permission },
},
edges: {
contains: { type: contains, from: [Folder], to: [Folder, Document] },
relatedTo: { type: relatedTo, from: [Document], to: [Document] },
hasPermission: { type: hasPermission, from: [User], to: [Content] },
createdBy: { type: createdBy, from: [Content], to: [User] },
},
ontology: [
// Type hierarchy
subClassOf(Folder, Content),
subClassOf(Document, Content),
// Compositional relationships
partOf(Document, Folder),
hasPart(Folder, Document),
],
});
```
## Database Setup
```typescript
import Database from "better-sqlite3";
import { drizzle } from "drizzle-orm/better-sqlite3";
import * as sqliteVec from "sqlite-vec";
import { createSqliteBackend, generateSqliteMigrationSQL } from "@nicia-ai/typegraph/sqlite";
import { createStore } from "@nicia-ai/typegraph";
// Initialize database with vector extension
const sqlite = new Database("documents.db");
sqliteVec.load(sqlite);
sqlite.exec(generateSqliteMigrationSQL());
const db = drizzle(sqlite);
const backend = createSqliteBackend(db);
const store = createStore(graph, backend);
```
## Core Operations
### Creating Folder Structure
```typescript
async function createFolderPath(path: string, userId: string): Promise> {
const parts = path.split("/").filter(Boolean);
let currentPath = "";
let parentFolder: Node | undefined;
for (const part of parts) {
currentPath += `/${part}`;
// Check if folder exists
let folder = await store
.query()
.from("Folder", "f")
.whereNode("f", (f) => f.path.eq(currentPath))
.select((ctx) => ctx.f)
.first();
if (!folder) {
folder = await store.nodes.Folder.create({
title: part,
path: currentPath,
createdBy: userId,
status: "published",
});
if (parentFolder) {
await store.edges.contains.create(parentFolder, folder, {});
}
}
parentFolder = folder;
}
return parentFolder!;
}
```
### Creating Documents with Embeddings
```typescript
import OpenAI from "openai";
const openai = new OpenAI();
async function generateEmbedding(text: string): Promise {
const response = await openai.embeddings.create({
model: "text-embedding-ada-002",
input: text,
});
return response.data[0].embedding;
}
async function createDocument(
folderId: string,
title: string,
content: string,
userId: string
): Promise> {
const embedding = await generateEmbedding(`${title}\n\n${content}`);
const document = await store.nodes.Document.create({
title,
content,
createdBy: userId,
status: "draft",
contentType: "markdown",
embedding,
});
// Link to folder
const folder = await store.nodes.Folder.getById(folderId);
if (!folder) throw new Error(`Folder not found: ${folderId}`);
await store.edges.contains.create(folder, document, {});
// Link to creator
const user = await store.nodes.User.getById(userId);
if (!user) throw new Error(`User not found: ${userId}`);
await store.edges.createdBy.create(document, user, {});
return document;
}
```
### Updating Documents (Versioned)
```typescript
async function updateDocument(
documentId: string,
updates: { title?: string; content?: string; status?: "draft" | "published" | "archived" }
): Promise> {
const current = await store.nodes.Document.getById(documentId);
if (!current) throw new Error(`Document not found: ${documentId}`);
// If content changed, regenerate embedding
let embedding = current.embedding;
if (updates.content && updates.content !== current.content) {
const text = `${updates.title ?? current.title}\n\n${updates.content}`;
embedding = await generateEmbedding(text);
}
// Update creates a new version automatically
return store.nodes.Document.update(documentId, {
...updates,
embedding,
});
}
```
## Searching Documents
### Semantic Search
```typescript
async function searchDocuments(
query: string,
options: {
folderId?: string;
status?: "draft" | "published" | "archived";
limit?: number;
minScore?: number;
} = {}
): Promise> {
const { folderId, status = "published", limit = 10, minScore = 0.7 } = options;
const queryEmbedding = await generateEmbedding(query);
let queryBuilder = store
.query()
.from("Document", "d")
.whereNode("d", (d) => {
let pred = d.embedding.similarTo(queryEmbedding, limit, {
metric: "cosine",
minScore,
});
if (status) {
pred = pred.and(d.status.eq(status));
}
return pred;
});
// If folderId specified, filter to folder descendants
if (folderId) {
queryBuilder = store
.query()
.from("Folder", "f")
.whereNode("f", (f) => f.id.eq(folderId))
.traverse("contains", "e")
.recursive()
.to("Document", "d")
.whereNode("d", (d) =>
d.embedding
.similarTo(queryEmbedding, limit, { metric: "cosine", minScore })
.and(d.status.eq(status))
);
}
return queryBuilder
.select((ctx) => ({
document: ctx.d,
score: ctx.d.embedding.similarity(queryEmbedding),
}))
.execute();
}
```
### Find Related Documents
```typescript
async function findRelatedDocuments(
documentId: string,
limit = 5
): Promise> {
// First, get explicit relationships
const explicit = await store
.query()
.from("Document", "d")
.whereNode("d", (d) => d.id.eq(documentId))
.traverse("relatedTo", "e")
.to("Document", "related")
.select((ctx) => ({
document: ctx.related,
relationship: ctx.e.type,
}))
.execute();
// Then, find semantically similar documents
const source = await store.nodes.Document.getById(documentId);
if (!source) throw new Error(`Document not found: ${documentId}`);
if (!source.embedding) {
return explicit;
}
const similar = await store
.query()
.from("Document", "d")
.whereNode("d", (d) =>
d.embedding
.similarTo(source.embedding!, limit * 2, { metric: "cosine", minScore: 0.8 })
.and(d.id.neq(documentId))
)
.select((ctx) => ({
document: ctx.d,
relationship: "similar" as const,
}))
.limit(limit)
.execute();
return [...explicit, ...similar].slice(0, limit);
}
```
## Version History
### Get Document History
```typescript
interface DocumentVersion {
title: string;
content: string;
status: string;
validFrom: string;
validTo: string | undefined;
version: number;
}
async function getDocumentHistory(documentId: string): Promise {
return store
.query()
.from("Document", "d")
.temporal("includeEnded")
.whereNode("d", (d) => d.id.eq(documentId))
.orderBy((ctx) => ctx.d.validFrom, "desc")
.select((ctx) => ({
title: ctx.d.title,
content: ctx.d.content,
status: ctx.d.status,
validFrom: ctx.d.validFrom,
validTo: ctx.d.validTo,
version: ctx.d.version,
}))
.execute();
}
```
### View Document at Point in Time
```typescript
async function getDocumentAsOf(
documentId: string,
timestamp: Date
): Promise {
return store
.query()
.from("Document", "d")
.temporal("asOf", timestamp.toISOString())
.whereNode("d", (d) => d.id.eq(documentId))
.select((ctx) => ctx.d)
.first();
}
```
### Compare Versions
```typescript
async function compareVersions(
documentId: string,
version1: number,
version2: number
): Promise<{ before: DocumentProps; after: DocumentProps } | undefined> {
const versions = await store
.query()
.from("Document", "d")
.temporal("includeEnded")
.whereNode("d", (d) =>
d.id.eq(documentId).and(d.version.in([version1, version2]))
)
.orderBy((ctx) => ctx.d.version, "asc")
.select((ctx) => ctx.d)
.execute();
if (versions.length !== 2) return undefined;
return { before: versions[0], after: versions[1] };
}
```
## Access Control
### Check Read Permission
```typescript
async function canRead(userId: string, contentId: string): Promise {
// Check direct permission
const directPermission = await store
.query()
.from("User", "u")
.whereNode("u", (u) => u.id.eq(userId))
.traverse("hasPermission", "p")
.to("Content", "c", { includeSubClasses: true })
.whereNode("c", (c) => c.id.eq(contentId))
.first();
if (directPermission) return true;
// Check inherited permission (from parent folders)
const content = await store.nodes.Content.getById(contentId);
if (!content) return false;
// Walk up the folder tree checking permissions
const parentFolders = await store
.query()
.from("Folder", "f")
.traverse("contains", "e")
.recursive()
.to("Content", "c", { includeSubClasses: true })
.whereNode("c", (c) => c.id.eq(contentId))
.select((ctx) => ctx.f.id)
.execute();
for (const folderId of parentFolders) {
const folderPermission = await store
.query()
.from("User", "u")
.whereNode("u", (u) => u.id.eq(userId))
.traverse("hasPermission", "p")
.to("Folder", "f")
.whereNode("f", (f) => f.id.eq(folderId))
.first();
if (folderPermission) return true;
}
return false;
}
```
### Grant Permission
```typescript
async function grantPermission(
userId: string,
contentId: string,
level: "read" | "write" | "admin"
): Promise {
const user = await store.nodes.User.getById(userId);
if (!user) throw new Error(`User not found: ${userId}`);
const content = await store.nodes.Content.getById(contentId);
if (!content) throw new Error(`Content not found: ${contentId}`);
// Create permission node
const permission = await store.nodes.Permission.create({ level });
// Link user to content via permission
await store.edges.hasPermission.create(user, content, {});
}
```
## Folder Navigation
### Get Folder Contents
```typescript
interface FolderContents {
folders: Array<{ id: string; title: string; path: string }>;
documents: Array<{ id: string; title: string; status: string }>;
}
async function getFolderContents(folderId: string): Promise {
const folders = await store
.query()
.from("Folder", "parent")
.whereNode("parent", (f) => f.id.eq(folderId))
.traverse("contains", "e")
.to("Folder", "child")
.select((ctx) => ({
id: ctx.child.id,
title: ctx.child.title,
path: ctx.child.path,
}))
.execute();
const documents = await store
.query()
.from("Folder", "parent")
.whereNode("parent", (f) => f.id.eq(folderId))
.traverse("contains", "e")
.to("Document", "doc")
.select((ctx) => ({
id: ctx.doc.id,
title: ctx.doc.title,
status: ctx.doc.status,
}))
.execute();
return { folders, documents };
}
```
### Get Breadcrumb Path
```typescript
async function getBreadcrumb(
contentId: string
): Promise> {
return store
.query()
.from("Content", "c", { includeSubClasses: true })
.whereNode("c", (c) => c.id.eq(contentId))
.traverse("contains", "e", { direction: "in" })
.recursive()
.to("Folder", "ancestor")
.select((ctx) => ({
id: ctx.ancestor.id,
title: ctx.ancestor.title,
path: ctx.ancestor.path,
}))
.execute();
}
```
## Bulk Operations
### Move Document to Folder
```typescript
async function moveDocument(documentId: string, targetFolderId: string): Promise {
await store.transaction(async (tx) => {
// Remove from current folder
const currentEdge = await tx
.query()
.from("Folder", "f")
.traverse("contains", "e")
.to("Document", "d")
.whereNode("d", (d) => d.id.eq(documentId))
.select((ctx) => ctx.e.id)
.first();
if (currentEdge) {
await tx.edges.contains.delete(currentEdge);
}
// Add to new folder
const document = await tx.nodes.Document.getById(documentId);
if (!document) throw new Error(`Document not found: ${documentId}`);
const targetFolder = await tx.nodes.Folder.getById(targetFolderId);
if (!targetFolder) throw new Error(`Folder not found: ${targetFolderId}`);
await tx.edges.contains.create(targetFolder, document, {});
});
}
```
### Bulk Archive
```typescript
async function archiveFolder(folderId: string): Promise {
// Get all documents in folder and subfolders
const documents = await store
.query()
.from("Folder", "f")
.whereNode("f", (f) => f.id.eq(folderId))
.traverse("contains", "e")
.recursive()
.to("Document", "d")
.select((ctx) => ctx.d.id)
.execute();
// Archive each document
await store.transaction(async (tx) => {
for (const docId of documents) {
await tx.nodes.Document.update(docId, { status: "archived" });
}
});
return documents.length;
}
```
## Next Steps
- [Product Catalog](/examples/product-catalog) - Categories, variants, inventory
- [Workflow Engine](/examples/workflow-engine) - State machines with approvals
- [Audit Trail](/examples/audit-trail) - Complete change tracking
# Knowledge Graph for RAG
> Enhance retrieval with entity linking, relationship traversal, and multi-hop context
This example demonstrates how **graph structure enhances RAG** beyond vector similarity.
While [Semantic Search](/semantic-search) covers embedding basics, this guide focuses
on what graphs uniquely provide: entity disambiguation, relationship traversal,
and structured context that flat retrieval cannot offer.
## What Graphs Add to RAG
| Flat RAG | Graph RAG |
|----------|-----------|
| Returns similar chunks | Traverses to related entities and facts |
| Treats "Apple" the same everywhere | Disambiguates Apple Inc. vs. apple fruit |
| Context is unstructured text | Context includes structured relationships |
| Single-hop retrieval | Multi-hop reasoning across connections |
**Example**: For "What companies has Elon Musk founded?", flat RAG returns chunks
mentioning him. Graph RAG traverses from the "Elon Musk" entity through "founded"
edges to return structured company data—regardless of whether those facts appear
in the same chunk.
## Schema
```typescript
import { z } from "zod";
import { defineNode, defineEdge, defineGraph, embedding, inverseOf } from "@nicia-ai/typegraph";
// Source documents
const Document = defineNode("Document", {
schema: z.object({
title: z.string(),
source: z.string(),
}),
});
// Text chunks with embeddings
const Chunk = defineNode("Chunk", {
schema: z.object({
text: z.string(),
embedding: embedding(1536),
position: z.number().int(),
}),
});
// Extracted entities
const Entity = defineNode("Entity", {
schema: z.object({
name: z.string(),
type: z.enum(["person", "organization", "location", "concept", "product", "event"]),
description: z.string().optional(),
embedding: embedding(1536).optional(),
}),
});
// Edges
const containsChunk = defineEdge("containsChunk");
const nextChunk = defineEdge("nextChunk");
const prevChunk = defineEdge("prevChunk");
const mentions = defineEdge("mentions", {
schema: z.object({
confidence: z.number().min(0).max(1).optional(),
}),
});
const relatesTo = defineEdge("relatesTo", {
schema: z.object({
relationship: z.string(), // "founded", "works_at", "located_in"
}),
});
export const graph = defineGraph({
id: "rag_graph",
nodes: {
Document: { type: Document },
Chunk: { type: Chunk },
Entity: {
type: Entity,
unique: [
{
name: "entity_name_type",
fields: ["name", "type"],
scope: "kind",
collation: "caseInsensitive",
},
],
},
},
edges: {
containsChunk: { type: containsChunk, from: [Document], to: [Chunk] },
nextChunk: { type: nextChunk, from: [Chunk], to: [Chunk] },
prevChunk: { type: prevChunk, from: [Chunk], to: [Chunk] },
mentions: { type: mentions, from: [Chunk], to: [Entity] },
relatesTo: { type: relatesTo, from: [Entity], to: [Entity] },
},
ontology: [inverseOf(nextChunk, prevChunk)],
});
```
## Embedding Setup
Using [Vercel AI SDK](https://ai-sdk.dev/docs/ai-sdk-core/embeddings):
```typescript
import { embed, embedMany } from "ai";
import { openai } from "@ai-sdk/openai";
const embeddingModel = openai.embeddingModel("text-embedding-3-small");
async function generateEmbedding(text: string): Promise {
const { embedding } = await embed({ model: embeddingModel, value: text });
return embedding;
}
async function generateEmbeddings(texts: string[]): Promise {
const { embeddings } = await embedMany({ model: embeddingModel, values: texts });
return embeddings;
}
```
## Ingestion with Entity Linking
The key graph RAG capability: linking chunks to disambiguated entities.
```typescript
import type { Node } from "@nicia-ai/typegraph";
interface ChunkData {
text: string;
entities: Array<{
name: string;
type: "person" | "organization" | "location" | "concept" | "product" | "event";
}>;
}
async function ingestDocument(
title: string,
source: string,
chunks: ChunkData[]
): Promise {
await store.transaction(async (tx) => {
const doc = await tx.nodes.Document.create({ title, source });
// Batch embed all chunks
const chunkEmbeddings = await generateEmbeddings(chunks.map((c) => c.text));
let prevChunk: Node | undefined;
for (const [i, chunkData] of chunks.entries()) {
const chunk = await tx.nodes.Chunk.create({
text: chunkData.text,
embedding: chunkEmbeddings[i],
position: i,
});
await tx.edges.containsChunk.create(doc, chunk, {});
if (prevChunk) {
await tx.edges.nextChunk.create(prevChunk, chunk, {});
}
// Link to entities (dedupe by unique constraint)
for (const entityData of chunkData.entities) {
const entityResult = await tx.nodes.Entity.getOrCreateByConstraint(
"entity_name_type",
{
name: entityData.name,
type: entityData.type,
}
);
// Compute expensive derived fields only for newly created entities
if (entityResult.action === "created") {
await tx.nodes.Entity.update(entityResult.node.id, {
embedding: await generateEmbedding(entityData.name),
});
}
await tx.edges.mentions.getOrCreateByEndpoints(
chunk,
entityResult.node,
{},
{ ifExists: "return" }
);
}
prevChunk = chunk;
}
});
}
```
## Graph-Specific Query Patterns
These patterns demonstrate capabilities that require graph structure—they cannot
be replicated with flat vector search.
### Entity-Based Retrieval
Find all chunks mentioning a specific entity, regardless of how it's phrased:
```typescript
async function findChunksByEntity(entityName: string) {
return store
.query()
.from("Entity", "e")
.whereNode("e", (e) => e.name.eq(entityName))
.traverse("mentions", "m", { direction: "in" })
.to("Chunk", "c")
.select((ctx) => ctx.c.text)
.execute();
}
```
### Multi-Hop Entity Traversal
Find entities connected through relationships:
```typescript
async function findRelatedEntities(entityName: string, maxHops = 2) {
const rows = await store
.query()
.from("Entity", "e")
.whereNode("e", (e) => e.name.eq(entityName))
.traverse("relatesTo", "r")
.recursive({ maxHops, depth: "depth" })
.to("Entity", "related")
.select((ctx) => ({
from: ctx.e.name,
to: ctx.related.name,
toId: ctx.related.id,
depth: ctx.depth,
}))
.execute();
// distinct paths can reach the same target; dedupe by target
const seen = new Set();
return rows
.filter((row) => {
if (seen.has(row.toId)) return false;
seen.add(row.toId);
return true;
})
.map((row) => ({
from: row.from,
to: row.to,
depth: row.depth,
}));
}
```
### Context Window Expansion
Get surrounding chunks for a match:
```typescript
async function getChunkWithContext(chunkId: string, windowSize = 1) {
const [before, after] = await Promise.all([
store
.query()
.from("Chunk", "c")
.whereNode("c", (c) => c.id.eq(chunkId))
.traverse("prevChunk", "e")
.recursive({ maxHops: windowSize })
.to("Chunk", "prev")
.orderBy("prev", "position", "desc")
.select((ctx) => ctx.prev.text)
.execute(),
store
.query()
.from("Chunk", "c")
.whereNode("c", (c) => c.id.eq(chunkId))
.traverse("nextChunk", "e")
.recursive({ maxHops: windowSize })
.to("Chunk", "next")
.orderBy("next", "position", "asc")
.select((ctx) => ctx.next.text)
.execute(),
]);
const chunk = await store.nodes.Chunk.getById(chunkId);
return {
before: before.toReversed(),
chunk: chunk?.text ?? "",
after,
};
}
```
## Hybrid Retrieval: Vector + Graph
Combine vector similarity with graph traversal in a single query using the `from` option:
```typescript
async function hybridRetrieval(query: string, limit = 10) {
const queryEmbedding = await generateEmbedding(query);
// Single query: vector search + fan-out to entities AND document
const results = await store
.query()
.from("Chunk", "c")
.whereNode("c", (c) =>
c.embedding.similarTo(queryEmbedding, limit, { metric: "cosine", minScore: 0.7 })
)
.traverse("mentions", "m")
.to("Entity", "e")
.traverse("containsChunk", "d_edge", { direction: "in", from: "c" }) // Fan-out from chunk
.to("Document", "d")
.select((ctx) => ({
chunkId: ctx.c.id,
text: ctx.c.text,
score: ctx.c.embedding.similarity(queryEmbedding),
source: ctx.d.title,
entityName: ctx.e.name,
entityType: ctx.e.type,
}))
.execute();
// Group by chunk (one row per chunk-entity pair)
const byChunk = new Map }>();
for (const row of results) {
const existing = byChunk.get(row.chunkId);
if (existing) {
existing.entities.push({ name: row.entityName, type: row.entityType });
} else {
byChunk.set(row.chunkId, {
...row,
entities: [{ name: row.entityName, type: row.entityType }],
});
}
}
return [...byChunk.values()];
}
```
The `from` option enables **fan-out patterns** where you traverse multiple relationships
from the same node. Without `from`, traversals chain sequentially (A→B→C). With `from`,
you can branch: traverse from chunk to entities, AND from chunk to document.
## Building Structured Context
Format graph-enriched context for an LLM:
```typescript
async function buildGraphContext(query: string, extractedEntities: string[]) {
const queryEmbedding = await generateEmbedding(query);
// Get relevant chunks with sources
const chunks = await store
.query()
.from("Chunk", "c")
.whereNode("c", (c) =>
c.embedding.similarTo(queryEmbedding, 5, { metric: "cosine", minScore: 0.7 })
)
.traverse("containsChunk", "e", { direction: "in" })
.to("Document", "d")
.select((ctx) => ({ text: ctx.c.text, source: ctx.d.title }))
.execute();
// Get entity relationships from graph
const entityFacts = await Promise.all(
extractedEntities.map(async (name) => {
const relations = await store
.query()
.from("Entity", "e")
.whereNode("e", (e) => e.name.eq(name))
.traverse("relatesTo", "r")
.to("Entity", "target")
.select((ctx) => ctx.target.name)
.execute();
return relations.length > 0 ? { name, relatedTo: relations } : undefined;
})
);
return { chunks, entityFacts: entityFacts.filter(Boolean) };
}
function formatForPrompt(context: Awaited>): string {
let prompt = "## Relevant Passages\n\n";
for (const chunk of context.chunks) {
prompt += `**${chunk.source}**: ${chunk.text}\n\n`;
}
if (context.entityFacts.length > 0) {
prompt += "## Entity Relationships\n\n";
for (const entity of context.entityFacts) {
if (entity) {
prompt += `**${entity.name}** → ${entity.relatedTo.join(", ")}\n`;
}
}
}
return prompt;
}
```
## When to Use Graph RAG
**Use graph RAG when:**
- Queries require connecting facts across documents ("Who founded X and what else did they start?")
- Entity disambiguation matters (distinguishing "Apple" the company from "apple" the fruit)
- Relationship traversal provides value ("Find all companies in the same industry as X")
- You need structured facts alongside unstructured text
**Flat vector RAG may suffice when:**
- Simple "find similar content" queries
- No entity relationships to exploit
- Single-document question answering
## Next Steps
- [Semantic Search](/semantic-search) — Vector embedding fundamentals
- [Traversals](/queries/traverse) — Graph traversal patterns
- [Document Management](/examples/document-management) — Versioning and access control
# Multi-Tenant SaaS
> Complete multi-tenancy patterns with isolation, data partitioning, and tenant management
This example shows how to build a multi-tenant SaaS application with:
- **Three isolation strategies** (shared tables, schema per tenant, database per tenant)
- **Tenant-aware queries** that automatically filter data
- **Tenant provisioning** and lifecycle management
- **Cross-tenant analytics** for platform operators
- **Tenant migration** between isolation levels
## Choosing an Isolation Strategy
| Strategy | Isolation | Complexity | Cost | Best For |
|----------|-----------|------------|------|----------|
| Shared tables | Low | Low | Lowest | Many small tenants, B2C SaaS |
| Schema per tenant | Medium | Medium | Low | SMB customers, PostgreSQL only |
| Database per tenant | High | High | Highest | Enterprise, compliance requirements |
## Strategy 1: Shared Tables with Row-Level Isolation
All tenants share the same database tables, filtered by `tenantId`.
### Schema Definition
```typescript
import { z } from "zod";
import { defineNode, defineEdge, defineGraph } from "@nicia-ai/typegraph";
// Tenant metadata
const Tenant = defineNode("Tenant", {
schema: z.object({
slug: z.string(),
name: z.string(),
plan: z.enum(["free", "starter", "pro", "enterprise"]),
status: z.enum(["active", "suspended", "cancelled"]).default("active"),
createdAt: z.string().datetime(),
settings: z.record(z.unknown()).optional(),
}),
});
// All entities include tenantId
const Project = defineNode("Project", {
schema: z.object({
tenantId: z.string(),
name: z.string(),
description: z.string().optional(),
status: z.enum(["active", "archived"]).default("active"),
}),
});
const Task = defineNode("Task", {
schema: z.object({
tenantId: z.string(),
title: z.string(),
status: z.enum(["todo", "in_progress", "done"]).default("todo"),
priority: z.enum(["low", "medium", "high"]).default("medium"),
}),
});
const User = defineNode("User", {
schema: z.object({
tenantId: z.string(),
email: z.string().email(),
name: z.string(),
role: z.enum(["owner", "admin", "member", "guest"]).default("member"),
}),
});
// Edges
const hasProject = defineEdge("hasProject");
const hasTask = defineEdge("hasTask");
const assignedTo = defineEdge("assignedTo");
const memberOf = defineEdge("memberOf");
const graph = defineGraph({
id: "multi_tenant",
nodes: {
Tenant: { type: Tenant },
Project: { type: Project },
Task: { type: Task },
User: { type: User },
},
edges: {
hasProject: { type: hasProject, from: [Tenant], to: [Project] },
hasTask: { type: hasTask, from: [Project], to: [Task] },
assignedTo: { type: assignedTo, from: [Task], to: [User] },
memberOf: { type: memberOf, from: [User], to: [Tenant] },
},
});
```
### Tenant-Scoped Store
Create a wrapper that automatically filters by tenant:
```typescript
interface TenantContext {
tenantId: string;
userId: string;
role: "owner" | "admin" | "member" | "guest";
}
function createTenantStore(store: Store, ctx: TenantContext) {
const projects = {
async list(options: { status?: string } = {}) {
let query = store
.query()
.from("Project", "p")
.whereNode("p", (p) => p.tenantId.eq(ctx.tenantId));
if (options.status) {
query = query.whereNode("p", (p) => p.status.eq(options.status));
}
return query.select((q) => q.p).execute();
},
async create(data: { name: string; description?: string }) {
const project = await store.nodes.Project.create({
...data,
tenantId: ctx.tenantId,
});
const tenant = await store.nodes.Tenant.getById(ctx.tenantId);
if (!tenant) throw new Error(`Tenant not found: ${ctx.tenantId}`);
await store.edges.hasProject.create(tenant, project, {});
return project;
},
async get(projectId: string) {
const project = await store.nodes.Project.getById(projectId);
if (!project || project.tenantId !== ctx.tenantId) {
throw new Error("Not found");
}
return project;
},
async update(projectId: string, updates: Partial) {
await projects.get(projectId); // Verify access
return store.nodes.Project.update(projectId, updates);
},
async delete(projectId: string) {
await projects.get(projectId); // Verify access
await store.nodes.Project.delete(projectId);
},
};
const tasks = {
async list(projectId: string) {
await projects.get(projectId); // Verify access
return store
.query()
.from("Project", "p")
.whereNode("p", (p) => p.id.eq(projectId))
.traverse("hasTask", "e")
.to("Task", "t")
.select((q) => q.t)
.execute();
},
async create(projectId: string, data: { title: string; priority?: string }) {
const project = await projects.get(projectId); // Verify access
const task = await store.nodes.Task.create({
...data,
tenantId: ctx.tenantId,
});
await store.edges.hasTask.create(project, task, {});
return task;
},
};
const users = {
async list() {
return store
.query()
.from("User", "u")
.whereNode("u", (u) => u.tenantId.eq(ctx.tenantId))
.select((q) => q.u)
.execute();
},
async invite(email: string, name: string, role: string) {
if (ctx.role !== "owner" && ctx.role !== "admin") {
throw new Error("Insufficient permissions");
}
const user = await store.nodes.User.create({
tenantId: ctx.tenantId,
email,
name,
role,
});
const tenant = await store.nodes.Tenant.getById(ctx.tenantId);
if (!tenant) throw new Error(`Tenant not found: ${ctx.tenantId}`);
await store.edges.memberOf.create(user, tenant, {});
return user;
},
};
return { projects, tasks, users };
}
// Usage in API handler
async function handleRequest(req: Request) {
const session = await getSession(req);
const tenantStore = createTenantStore(store, {
tenantId: session.tenantId,
userId: session.userId,
role: session.role,
});
// All queries are automatically tenant-scoped
const projects = await tenantStore.projects.list();
}
```
### Tenant Provisioning
```typescript
async function provisionTenant(
slug: string,
name: string,
ownerEmail: string,
ownerName: string,
plan: "free" | "starter" | "pro" | "enterprise" = "free"
): Promise<{ tenant: Node; owner: Node }> {
return store.transaction(async (tx) => {
// Check slug uniqueness
const existing = await tx
.query()
.from("Tenant", "t")
.whereNode("t", (t) => t.slug.eq(slug))
.first();
if (existing) {
throw new Error("Tenant slug already exists");
}
// Create tenant
const tenant = await tx.nodes.Tenant.create({
slug,
name,
plan,
status: "active",
createdAt: new Date().toISOString(),
});
// Create owner user
const owner = await tx.nodes.User.create({
tenantId: tenant.id,
email: ownerEmail,
name: ownerName,
role: "owner",
});
await tx.edges.memberOf.create(owner, tenant, {});
return { tenant, owner };
});
}
```
## Strategy 2: Schema Per Tenant (PostgreSQL)
Each tenant gets their own PostgreSQL schema within the same database.
### Setup
```typescript
import { Pool } from "pg";
import { drizzle } from "drizzle-orm/node-postgres";
import { sql } from "drizzle-orm";
import { createPostgresBackend, generatePostgresMigrationSQL } from "@nicia-ai/typegraph/postgres";
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
async function createTenantSchema(tenantId: string): Promise {
const schemaName = `tenant_${tenantId}`;
// Create schema
await pool.query(`CREATE SCHEMA IF NOT EXISTS ${schemaName}`);
// Run TypeGraph migrations in the tenant schema
await pool.query(`SET search_path TO ${schemaName}`);
await pool.query(generatePostgresMigrationSQL());
await pool.query(`SET search_path TO public`);
}
async function getTenantStore(tenantId: string): Promise {
const schemaName = `tenant_${tenantId}`;
// Create connection with schema
const client = await pool.connect();
await client.query(`SET search_path TO ${schemaName}`);
const db = drizzle(client);
const backend = createPostgresBackend(db);
return createStore(graph, backend);
}
```
### Tenant Store Cache
```typescript
class TenantStoreManager {
private stores = new Map();
private maxCached = 100;
async getStore(tenantId: string): Promise {
const cached = this.stores.get(tenantId);
if (cached) {
cached.lastUsed = new Date();
return cached.store;
}
// Evict oldest if at capacity
if (this.stores.size >= this.maxCached) {
this.evictOldest();
}
const store = await getTenantStore(tenantId);
this.stores.set(tenantId, { store, lastUsed: new Date() });
return store;
}
private evictOldest(): void {
let oldest: { id: string; date: Date } | undefined;
for (const [id, { lastUsed }] of this.stores) {
if (!oldest || lastUsed < oldest.date) {
oldest = { id, date: lastUsed };
}
}
if (oldest) {
this.stores.delete(oldest.id);
}
}
}
const tenantManager = new TenantStoreManager();
```
### Provisioning with Schema
```typescript
async function provisionTenantWithSchema(
slug: string,
name: string,
ownerEmail: string
): Promise<{ tenantId: string }> {
const tenantId = generateUUID();
// Create schema and tables
await createTenantSchema(tenantId);
// Get tenant-specific store
const tenantStore = await tenantManager.getStore(tenantId);
// Create initial data
await tenantStore.nodes.User.create({
email: ownerEmail,
name: name,
role: "owner",
});
// Store tenant metadata in public schema
const publicDb = drizzle(pool);
await publicDb.insert(tenants).values({
id: tenantId,
slug,
name,
createdAt: new Date(),
});
return { tenantId };
}
```
## Strategy 3: Database Per Tenant
Each tenant gets their own database for maximum isolation.
### Tenant Database Manager
```typescript
interface TenantConfig {
id: string;
slug: string;
databaseUrl: string;
status: "active" | "suspended";
}
class TenantDatabaseManager {
private connections = new Map();
private maxConnections = 50;
async getStore(tenantId: string): Promise {
const cached = this.connections.get(tenantId);
if (cached) return cached.store;
// Get tenant config from central registry
const config = await this.getTenantConfig(tenantId);
if (config.status !== "active") {
throw new Error("Tenant is not active");
}
// Evict if at capacity
if (this.connections.size >= this.maxConnections) {
await this.evictLeastUsed();
}
// Create new connection
const pool = new Pool({ connectionString: config.databaseUrl, max: 5 });
const db = drizzle(pool);
const backend = createPostgresBackend(db);
const store = createStore(graph, backend);
this.connections.set(tenantId, { pool, store });
return store;
}
async closeConnection(tenantId: string): Promise {
const conn = this.connections.get(tenantId);
if (conn) {
await conn.pool.end();
this.connections.delete(tenantId);
}
}
private async getTenantConfig(tenantId: string): Promise {
// Fetch from central tenant registry
const result = await centralDb
.select()
.from(tenantConfigs)
.where(eq(tenantConfigs.id, tenantId))
.get();
if (!result) throw new Error("Tenant not found");
return result;
}
private async evictLeastUsed(): Promise {
// Simple LRU eviction
const first = this.connections.keys().next().value;
if (first) {
await this.closeConnection(first);
}
}
}
const dbManager = new TenantDatabaseManager();
```
### Provisioning New Database
```typescript
async function provisionTenantDatabase(
slug: string,
name: string,
ownerEmail: string
): Promise<{ tenantId: string; databaseUrl: string }> {
const tenantId = generateUUID();
const dbName = `tenant_${tenantId.replace(/-/g, "_")}`;
// Create database (using admin connection)
const adminPool = new Pool({ connectionString: process.env.ADMIN_DATABASE_URL });
await adminPool.query(`CREATE DATABASE ${dbName}`);
await adminPool.end();
// Build connection URL
const baseUrl = new URL(process.env.DATABASE_BASE_URL!);
baseUrl.pathname = `/${dbName}`;
const databaseUrl = baseUrl.toString();
// Initialize TypeGraph tables
const tenantPool = new Pool({ connectionString: databaseUrl });
await tenantPool.query(generatePostgresMigrationSQL());
// Create initial data
const db = drizzle(tenantPool);
const backend = createPostgresBackend(db);
const store = createStore(graph, backend);
await store.nodes.User.create({
email: ownerEmail,
name: name,
role: "owner",
});
await tenantPool.end();
// Register in central tenant registry
await centralDb.insert(tenantConfigs).values({
id: tenantId,
slug,
name,
databaseUrl,
status: "active",
createdAt: new Date(),
});
return { tenantId, databaseUrl };
}
```
## Cross-Tenant Operations
For platform administrators who need to query across tenants.
### Aggregated Metrics (Shared Tables)
```typescript
import { count, field } from "@nicia-ai/typegraph";
async function getTenantMetrics(): Promise<
Array<{ tenantId: string; projectCount: number; taskCount: number; userCount: number }>
> {
// Projects by tenant
const projectCounts = await store
.query()
.from("Project", "p")
.groupBy("p", "tenantId")
.aggregate({
tenantId: field("p", "tenantId"),
projectCount: count("p"),
})
.execute();
// Tasks by tenant
const taskCounts = await store
.query()
.from("Task", "t")
.groupBy("t", "tenantId")
.aggregate({
tenantId: field("t", "tenantId"),
taskCount: count("t"),
})
.execute();
// Users by tenant
const userCounts = await store
.query()
.from("User", "u")
.groupBy("u", "tenantId")
.aggregate({
tenantId: field("u", "tenantId"),
userCount: count("u"),
})
.execute();
// Merge results
const metrics = new Map();
for (const p of projectCounts) {
metrics.set(p.tenantId, { projectCount: p.projectCount, taskCount: 0, userCount: 0 });
}
for (const t of taskCounts) {
const existing = metrics.get(t.tenantId) || { projectCount: 0, taskCount: 0, userCount: 0 };
existing.taskCount = t.taskCount;
metrics.set(t.tenantId, existing);
}
for (const u of userCounts) {
const existing = metrics.get(u.tenantId) || { projectCount: 0, taskCount: 0, userCount: 0 };
existing.userCount = u.userCount;
metrics.set(u.tenantId, existing);
}
return Array.from(metrics.entries()).map(([tenantId, counts]) => ({
tenantId,
...counts,
}));
}
```
### Cross-Tenant Search (Database Per Tenant)
```typescript
async function searchAcrossTenants(
query: string,
tenantIds: string[]
): Promise> {
const results = await Promise.all(
tenantIds.map(async (tenantId) => {
try {
const store = await dbManager.getStore(tenantId);
const projects = await store
.query()
.from("Project", "p")
.whereNode("p", (p) => p.name.contains(query))
.select((ctx) => ctx.p)
.limit(10)
.execute();
return { tenantId, results: projects };
} catch (error) {
console.error(`Failed to search tenant ${tenantId}:`, error);
return { tenantId, results: [] };
}
})
);
return results;
}
```
## Tenant Lifecycle
### Suspend Tenant
```typescript
async function suspendTenant(tenantId: string, reason: string): Promise {
const current = await store.nodes.Tenant.getById(tenantId);
if (!current) throw new Error(`Tenant not found: ${tenantId}`);
await store.nodes.Tenant.update(tenantId, {
status: "suspended",
settings: {
...(current.settings || {}),
suspendedAt: new Date().toISOString(),
suspendReason: reason,
},
});
}
```
### Delete Tenant (Shared Tables)
```typescript
async function deleteTenant(tenantId: string): Promise {
await store.transaction(async (tx) => {
// Delete all tasks
const tasks = await tx
.query()
.from("Task", "t")
.whereNode("t", (t) => t.tenantId.eq(tenantId))
.select((ctx) => ctx.t.id)
.execute();
for (const taskId of tasks) {
await tx.nodes.Task.delete(taskId);
}
// Delete all projects
const projects = await tx
.query()
.from("Project", "p")
.whereNode("p", (p) => p.tenantId.eq(tenantId))
.select((ctx) => ctx.p.id)
.execute();
for (const projectId of projects) {
await tx.nodes.Project.delete(projectId);
}
// Delete all users
const users = await tx
.query()
.from("User", "u")
.whereNode("u", (u) => u.tenantId.eq(tenantId))
.select((ctx) => ctx.u.id)
.execute();
for (const userId of users) {
await tx.nodes.User.delete(userId);
}
// Delete tenant
await tx.nodes.Tenant.delete(tenantId);
});
}
```
### Delete Tenant (Database Per Tenant)
```typescript
async function deleteTenantDatabase(tenantId: string): Promise {
// Close active connection
await dbManager.closeConnection(tenantId);
// Get database name
const config = await getTenantConfig(tenantId);
const dbUrl = new URL(config.databaseUrl);
const dbName = dbUrl.pathname.slice(1);
// Drop database
const adminPool = new Pool({ connectionString: process.env.ADMIN_DATABASE_URL });
await adminPool.query(`DROP DATABASE IF EXISTS ${dbName}`);
await adminPool.end();
// Remove from registry
await centralDb.delete(tenantConfigs).where(eq(tenantConfigs.id, tenantId));
}
```
## Tenant Migration
Move tenant between isolation strategies:
```typescript
async function migrateTenantToSeparateDatabase(tenantId: string): Promise {
// 1. Create new database
const { databaseUrl } = await provisionTenantDatabase(
`migrated_${tenantId}`,
"Migrated Tenant",
"placeholder@example.com"
);
// 2. Get tenant data from shared tables
const sharedStore = store;
const projects = await sharedStore
.query()
.from("Project", "p")
.whereNode("p", (p) => p.tenantId.eq(tenantId))
.select((ctx) => ctx.p)
.execute();
const tasks = await sharedStore
.query()
.from("Task", "t")
.whereNode("t", (t) => t.tenantId.eq(tenantId))
.select((ctx) => ctx.t)
.execute();
const users = await sharedStore
.query()
.from("User", "u")
.whereNode("u", (u) => u.tenantId.eq(tenantId))
.select((ctx) => ctx.u)
.execute();
// 3. Insert into new database
const newStore = await dbManager.getStore(tenantId);
await newStore.transaction(async (tx) => {
for (const project of projects) {
await tx.nodes.Project.create(project);
}
for (const task of tasks) {
await tx.nodes.Task.create(task);
}
for (const user of users) {
await tx.nodes.User.create(user);
}
});
// 4. Delete from shared tables
await deleteTenant(tenantId);
return databaseUrl;
}
```
## Next Steps
- [Document Management](/examples/document-management) - CMS with semantic search
- [Product Catalog](/examples/product-catalog) - Categories, variants, inventory
- [Integration Patterns](/integration) - More deployment strategies
# Product Catalog
> E-commerce catalog with categories, variants, and inventory tracking
This example builds a product catalog system with:
- **Category hierarchy** with inheritance
- **Product variants** (size, color, etc.)
- **Inventory tracking** across warehouses
- **Product relationships** (bundles, accessories, alternatives)
- **Price history** using temporal queries
## Schema Definition
```typescript
import { z } from "zod";
import {
defineNode,
defineEdge,
defineGraph,
embedding,
} from "@nicia-ai/typegraph";
// Category hierarchy
const Category = defineNode("Category", {
schema: z.object({
name: z.string(),
slug: z.string(),
description: z.string().optional(),
imageUrl: z.string().url().optional(),
displayOrder: z.number().default(0),
isActive: z.boolean().default(true),
}),
});
// Products
const Product = defineNode("Product", {
schema: z.object({
sku: z.string(),
name: z.string(),
description: z.string(),
basePrice: z.number().positive(),
currency: z.string().default("USD"),
status: z.enum(["draft", "active", "discontinued"]).default("draft"),
embedding: embedding(1536).optional(),
}),
});
// Product variants (specific size/color combinations)
const Variant = defineNode("Variant", {
schema: z.object({
sku: z.string(),
name: z.string(), // "Large / Blue"
priceModifier: z.number().default(0), // Added to base price
attributes: z.record(z.string()), // { size: "L", color: "blue" }
isDefault: z.boolean().default(false),
}),
});
// Inventory
const Warehouse = defineNode("Warehouse", {
schema: z.object({
code: z.string(),
name: z.string(),
location: z.string(),
isActive: z.boolean().default(true),
}),
});
const Inventory = defineNode("Inventory", {
schema: z.object({
quantity: z.number().int().min(0),
reservedQuantity: z.number().int().min(0).default(0),
reorderPoint: z.number().int().min(0).default(10),
lastCountedAt: z.string().datetime().optional(),
}),
});
// Edges
const parentCategory = defineEdge("parentCategory");
const inCategory = defineEdge("inCategory", {
schema: z.object({ isPrimary: z.boolean().default(false) }),
});
const hasVariant = defineEdge("hasVariant");
const inventoryFor = defineEdge("inventoryFor");
const atWarehouse = defineEdge("atWarehouse");
const relatedProduct = defineEdge("relatedProduct", {
schema: z.object({
type: z.enum(["accessory", "alternative", "bundled", "upsell"]),
sortOrder: z.number().default(0),
}),
});
// Graph
const graph = defineGraph({
id: "product_catalog",
nodes: {
Category: { type: Category },
Product: { type: Product },
Variant: { type: Variant },
Warehouse: { type: Warehouse },
Inventory: { type: Inventory },
},
edges: {
parentCategory: { type: parentCategory, from: [Category], to: [Category] },
inCategory: { type: inCategory, from: [Product], to: [Category] },
hasVariant: { type: hasVariant, from: [Product], to: [Variant] },
inventoryFor: { type: inventoryFor, from: [Inventory], to: [Variant] },
atWarehouse: { type: atWarehouse, from: [Inventory], to: [Warehouse] },
relatedProduct: { type: relatedProduct, from: [Product], to: [Product] },
},
ontology: [
// Category hierarchy is modeled via the parentCategory edge, not ontology.
// Use ontology for type-level constraints, e.g.:
// disjointWith(Product, Category),
],
});
```
## Category Management
### Create Category Tree
```typescript
async function createCategory(
name: string,
slug: string,
parentSlug?: string
): Promise> {
const category = await store.nodes.Category.create({
name,
slug,
isActive: true,
});
if (parentSlug) {
const parent = await store
.query()
.from("Category", "c")
.whereNode("c", (c) => c.slug.eq(parentSlug))
.select((ctx) => ctx.c)
.first();
if (parent) {
await store.edges.parentCategory.create(category, parent, {});
}
}
return category;
}
// Build initial category structure
await createCategory("Electronics", "electronics");
await createCategory("Phones", "phones", "electronics");
await createCategory("Accessories", "accessories", "electronics");
await createCategory("Cases", "cases", "accessories");
await createCategory("Chargers", "chargers", "accessories");
```
### Get Category with Ancestors
```typescript
interface CategoryWithPath {
id: string;
name: string;
slug: string;
path: Array<{ name: string; slug: string }>;
}
async function getCategoryWithPath(slug: string): Promise {
const category = await store
.query()
.from("Category", "c")
.whereNode("c", (c) => c.slug.eq(slug))
.select((ctx) => ({
id: ctx.c.id,
name: ctx.c.name,
slug: ctx.c.slug,
}))
.first();
if (!category) return undefined;
const ancestors = await store
.query()
.from("Category", "c")
.whereNode("c", (c) => c.slug.eq(slug))
.traverse("parentCategory", "e")
.recursive()
.to("Category", "ancestor")
.select((ctx) => ({
name: ctx.ancestor.name,
slug: ctx.ancestor.slug,
}))
.execute();
return {
...category,
path: ancestors.reverse(), // Root first
};
}
```
### Get Subcategories
```typescript
async function getSubcategories(
parentSlug: string,
includeNested = false
): Promise> {
let query = store
.query()
.from("Category", "parent")
.whereNode("parent", (c) => c.slug.eq(parentSlug))
.traverse("parentCategory", "e", { direction: "in" });
if (includeNested) {
query = query.recursive({ depth: "depth" });
}
return query
.to("Category", "child")
.whereNode("child", (c) => c.isActive.eq(true))
.select((ctx) => ({
id: ctx.child.id,
name: ctx.child.name,
slug: ctx.child.slug,
depth: ctx.depth ?? 1,
}))
.orderBy((ctx) => ctx.child.displayOrder, "asc")
.execute();
}
```
## Product Management
### Create Product with Variants
```typescript
interface ProductInput {
sku: string;
name: string;
description: string;
basePrice: number;
categorySlug: string;
variants: Array<{
sku: string;
name: string;
priceModifier?: number;
attributes: Record;
isDefault?: boolean;
}>;
}
async function createProduct(input: ProductInput): Promise> {
return store.transaction(async (tx) => {
// Generate embedding for semantic search
const embedding = await generateEmbedding(`${input.name} ${input.description}`);
// Create product
const product = await tx.nodes.Product.create({
sku: input.sku,
name: input.name,
description: input.description,
basePrice: input.basePrice,
status: "draft",
embedding,
});
// Link to category
const category = await tx
.query()
.from("Category", "c")
.whereNode("c", (c) => c.slug.eq(input.categorySlug))
.select((ctx) => ctx.c)
.first();
if (category) {
await tx.edges.inCategory.create(product, category, { isPrimary: true });
}
// Create variants
for (const v of input.variants) {
const variant = await tx.nodes.Variant.create({
sku: v.sku,
name: v.name,
priceModifier: v.priceModifier ?? 0,
attributes: v.attributes,
isDefault: v.isDefault ?? false,
});
await tx.edges.hasVariant.create(product, variant, {});
}
return product;
});
}
```
### Get Product Details
```typescript
interface ProductDetails {
id: string;
sku: string;
name: string;
description: string;
basePrice: number;
status: string;
categories: Array<{ name: string; slug: string; isPrimary: boolean }>;
variants: Array<{
id: string;
sku: string;
name: string;
price: number;
attributes: Record;
inventory: number;
}>;
related: Array<{ id: string; name: string; type: string }>;
}
async function getProductDetails(sku: string): Promise {
const product = await store
.query()
.from("Product", "p")
.whereNode("p", (p) => p.sku.eq(sku))
.select((ctx) => ctx.p)
.first();
if (!product) return undefined;
// Get categories
const categories = await store
.query()
.from("Product", "p")
.whereNode("p", (p) => p.id.eq(product.id))
.traverse("inCategory", "e")
.to("Category", "c")
.select((ctx) => ({
name: ctx.c.name,
slug: ctx.c.slug,
isPrimary: ctx.e.isPrimary,
}))
.execute();
// Get variants with inventory
const variants = await store
.query()
.from("Product", "p")
.whereNode("p", (p) => p.id.eq(product.id))
.traverse("hasVariant", "e")
.to("Variant", "v")
.optionalTraverse("inventoryFor", "inv", { direction: "in" })
.to("Inventory", "i")
.select((ctx) => ({
id: ctx.v.id,
sku: ctx.v.sku,
name: ctx.v.name,
priceModifier: ctx.v.priceModifier,
attributes: ctx.v.attributes,
quantity: ctx.i?.quantity ?? 0,
reservedQuantity: ctx.i?.reservedQuantity ?? 0,
}))
.execute();
// Get related products
const related = await store
.query()
.from("Product", "p")
.whereNode("p", (p) => p.id.eq(product.id))
.traverse("relatedProduct", "e")
.to("Product", "r")
.select((ctx) => ({
id: ctx.r.id,
name: ctx.r.name,
type: ctx.e.type,
}))
.orderBy((ctx) => ctx.e.sortOrder, "asc")
.execute();
return {
id: product.id,
sku: product.sku,
name: product.name,
description: product.description,
basePrice: product.basePrice,
status: product.status,
categories,
variants: variants.map((v) => ({
...v,
price: product.basePrice + v.priceModifier,
inventory: v.quantity - v.reservedQuantity,
})),
related,
};
}
```
## Inventory Management
### Update Inventory
```typescript
async function updateInventory(
variantSku: string,
warehouseCode: string,
quantity: number
): Promise {
const variant = await store
.query()
.from("Variant", "v")
.whereNode("v", (v) => v.sku.eq(variantSku))
.select((ctx) => ctx.v)
.first();
const warehouse = await store
.query()
.from("Warehouse", "w")
.whereNode("w", (w) => w.code.eq(warehouseCode))
.select((ctx) => ctx.w)
.first();
if (!variant || !warehouse) {
throw new Error("Variant or warehouse not found");
}
// Find existing inventory record
const existingInventory = await store
.query()
.from("Inventory", "i")
.traverse("inventoryFor", "e1")
.to("Variant", "v")
.whereNode("v", (v) => v.id.eq(variant.id))
.traverse("atWarehouse", "e2", { direction: "in" })
.to("Warehouse", "w")
.whereNode("w", (w) => w.id.eq(warehouse.id))
.select((ctx) => ctx.i)
.first();
if (existingInventory) {
await store.nodes.Inventory.update(existingInventory.id, {
quantity,
lastCountedAt: new Date().toISOString(),
});
} else {
const inventory = await store.nodes.Inventory.create({
quantity,
reservedQuantity: 0,
lastCountedAt: new Date().toISOString(),
});
await store.edges.inventoryFor.create(inventory, variant, {});
await store.edges.atWarehouse.create(inventory, warehouse, {});
}
}
```
### Reserve Inventory
```typescript
async function reserveInventory(
variantSku: string,
quantity: number
): Promise<{ success: boolean; warehouseCode?: string }> {
const inventories = await store
.query()
.from("Variant", "v")
.whereNode("v", (v) => v.sku.eq(variantSku))
.traverse("inventoryFor", "e", { direction: "in" })
.to("Inventory", "i")
.traverse("atWarehouse", "e2")
.to("Warehouse", "w")
.whereNode("w", (w) => w.isActive.eq(true))
.select((ctx) => ({
inventoryId: ctx.i.id,
warehouseCode: ctx.w.code,
available: ctx.i.quantity - ctx.i.reservedQuantity,
reservedQuantity: ctx.i.reservedQuantity,
}))
.execute();
// Find warehouse with enough inventory
const available = inventories.find((i) => i.available >= quantity);
if (!available) {
return { success: false };
}
await store.nodes.Inventory.update(available.inventoryId, {
reservedQuantity: available.reservedQuantity + quantity,
});
return { success: true, warehouseCode: available.warehouseCode };
}
```
### Low Stock Report
```typescript
import { field, sum, havingLt } from "@nicia-ai/typegraph";
interface LowStockItem {
productName: string;
variantSku: string;
variantName: string;
totalQuantity: number;
reorderPoint: number;
}
async function getLowStockItems(): Promise {
return store
.query()
.from("Product", "p")
.traverse("hasVariant", "e1")
.to("Variant", "v")
.traverse("inventoryFor", "e2", { direction: "in" })
.to("Inventory", "i")
.groupByNode("v")
.having(havingLt(sum("i", "quantity"), field("i", "reorderPoint")))
.aggregate({
productName: field("p", "name"),
variantSku: field("v", "sku"),
variantName: field("v", "name"),
totalQuantity: sum("i", "quantity"),
reorderPoint: field("i", "reorderPoint"),
})
.execute();
}
```
## Search and Discovery
### Semantic Product Search
```typescript
async function searchProducts(
query: string,
options: {
categorySlug?: string;
minPrice?: number;
maxPrice?: number;
limit?: number;
} = {}
): Promise> {
const { categorySlug, minPrice, maxPrice, limit = 20 } = options;
const queryEmbedding = await generateEmbedding(query);
let queryBuilder = store
.query()
.from("Product", "p")
.whereNode("p", (p) => {
let pred = p.embedding
.similarTo(queryEmbedding, limit, { metric: "cosine", minScore: 0.6 })
.and(p.status.eq("active"));
if (minPrice !== undefined) {
pred = pred.and(p.basePrice.gte(minPrice));
}
if (maxPrice !== undefined) {
pred = pred.and(p.basePrice.lte(maxPrice));
}
return pred;
});
// Filter by category if specified
if (categorySlug) {
// Get category and all subcategories
const categoryIds = await store
.query()
.from("Category", "c")
.whereNode("c", (c) => c.slug.eq(categorySlug))
.traverse("parentCategory", "e", { direction: "in" })
.recursive()
.to("Category", "sub")
.select((ctx) => ctx.sub.id)
.execute();
queryBuilder = queryBuilder
.traverse("inCategory", "e")
.to("Category", "c")
.whereNode("c", (c) => c.id.in([...categoryIds, categorySlug]));
}
return queryBuilder
.select((ctx) => ({
product: ctx.p,
score: ctx.p.embedding.similarity(queryEmbedding),
}))
.execute();
}
```
### Get Products in Category
```typescript
async function getProductsInCategory(
categorySlug: string,
options: {
includeSubcategories?: boolean;
page?: number;
pageSize?: number;
sortBy?: "name" | "price" | "newest";
} = {}
): Promise<{ products: ProductProps[]; total: number }> {
const { includeSubcategories = true, page = 1, pageSize = 20, sortBy = "name" } = options;
// Build category ID list
let categoryIds: string[] = [];
const rootCategory = await store
.query()
.from("Category", "c")
.whereNode("c", (c) => c.slug.eq(categorySlug))
.select((ctx) => ctx.c.id)
.first();
if (!rootCategory) return { products: [], total: 0 };
categoryIds.push(rootCategory);
if (includeSubcategories) {
const subIds = await store
.query()
.from("Category", "c")
.whereNode("c", (c) => c.slug.eq(categorySlug))
.traverse("parentCategory", "e", { direction: "in" })
.recursive()
.to("Category", "sub")
.select((ctx) => ctx.sub.id)
.execute();
categoryIds = [...categoryIds, ...subIds];
}
const query = store
.query()
.from("Product", "p")
.whereNode("p", (p) => p.status.eq("active"))
.traverse("inCategory", "e")
.to("Category", "c")
.whereNode("c", (c) => c.id.in(categoryIds))
.select((ctx) => ctx.p);
// Apply sorting
const sortedQuery =
sortBy === "price"
? query.orderBy((ctx) => ctx.p.basePrice, "asc")
: sortBy === "newest"
? query.orderBy((ctx) => ctx.p.createdAt, "desc")
: query.orderBy((ctx) => ctx.p.name, "asc");
const products = await sortedQuery
.limit(pageSize)
.offset((page - 1) * pageSize)
.execute();
const total = await store
.query()
.from("Product", "p")
.whereNode("p", (p) => p.status.eq("active"))
.traverse("inCategory", "e")
.to("Category", "c")
.whereNode("c", (c) => c.id.in(categoryIds))
.count();
return { products, total };
}
```
## Price History
### Track Price Changes
TypeGraph's temporal model automatically tracks all changes:
```typescript
async function getPriceHistory(
sku: string
): Promise> {
return store
.query()
.from("Product", "p")
.temporal("includeEnded")
.whereNode("p", (p) => p.sku.eq(sku))
.orderBy((ctx) => ctx.p.validFrom, "desc")
.select((ctx) => ({
price: ctx.p.basePrice,
validFrom: ctx.p.validFrom,
validTo: ctx.p.validTo,
}))
.execute();
}
```
### Price at Point in Time
```typescript
async function getPriceAsOf(sku: string, date: Date): Promise {
const product = await store
.query()
.from("Product", "p")
.temporal("asOf", date.toISOString())
.whereNode("p", (p) => p.sku.eq(sku))
.select((ctx) => ctx.p.basePrice)
.first();
return product;
}
```
## Next Steps
- [Document Management](/examples/document-management) - CMS with semantic search
- [Workflow Engine](/examples/workflow-engine) - State machines with approvals
- [Audit Trail](/examples/audit-trail) - Complete change tracking
# Workflow Engine
> State machines with approvals, assignments, and escalations
This example builds a workflow engine with:
- **State machine definitions** as graph schemas
- **Approval chains** with multiple approvers
- **Task assignment** and delegation
- **Escalation rules** based on time
- **Audit trail** of all state changes
## Schema Definition
```typescript
import { z } from "zod";
import { defineNode, defineEdge, defineGraph, implies } from "@nicia-ai/typegraph";
// Workflow definition (template)
const WorkflowDefinition = defineNode("WorkflowDefinition", {
schema: z.object({
name: z.string(),
description: z.string().optional(),
version: z.number().int().positive(),
isActive: z.boolean().default(true),
}),
});
// States within a workflow
const State = defineNode("State", {
schema: z.object({
name: z.string(),
type: z.enum(["initial", "intermediate", "terminal", "approval"]),
config: z.record(z.unknown()).optional(), // State-specific config
}),
});
// Transitions between states
const Transition = defineNode("Transition", {
schema: z.object({
name: z.string(),
condition: z.string().optional(), // Expression to evaluate
requiredRole: z.string().optional(),
}),
});
// Workflow instances
const WorkflowInstance = defineNode("WorkflowInstance", {
schema: z.object({
referenceId: z.string(), // ID of the entity being processed
referenceType: z.string(), // Type of entity (e.g., "PurchaseOrder")
status: z.enum(["active", "completed", "cancelled", "failed"]).default("active"),
data: z.record(z.unknown()).optional(), // Instance-specific data
createdAt: z.string().datetime(),
completedAt: z.string().datetime().optional(),
}),
});
// Tasks assigned to users
const Task = defineNode("Task", {
schema: z.object({
title: z.string(),
description: z.string().optional(),
type: z.enum(["action", "approval", "review", "notification"]),
status: z.enum(["pending", "in_progress", "completed", "rejected", "escalated"]).default("pending"),
dueDate: z.string().datetime().optional(),
priority: z.enum(["low", "medium", "high", "urgent"]).default("medium"),
result: z.record(z.unknown()).optional(),
completedAt: z.string().datetime().optional(),
}),
});
// Users
const User = defineNode("User", {
schema: z.object({
email: z.string().email(),
name: z.string(),
role: z.string(),
department: z.string().optional(),
}),
});
// Comments on tasks
const Comment = defineNode("Comment", {
schema: z.object({
content: z.string(),
createdAt: z.string().datetime(),
}),
});
// Edges
const hasState = defineEdge("hasState");
const hasTransition = defineEdge("hasTransition");
const fromState = defineEdge("fromState");
const toState = defineEdge("toState");
const usesDefinition = defineEdge("usesDefinition");
const currentState = defineEdge("currentState");
const hasTask = defineEdge("hasTask");
const assignedTo = defineEdge("assignedTo");
const createdBy = defineEdge("createdBy");
const hasComment = defineEdge("hasComment");
const reportsTo = defineEdge("reportsTo"); // For escalation chain
// Graph
const graph = defineGraph({
id: "workflow_engine",
nodes: {
WorkflowDefinition: { type: WorkflowDefinition },
State: { type: State },
Transition: { type: Transition },
WorkflowInstance: { type: WorkflowInstance },
Task: { type: Task },
User: { type: User },
Comment: { type: Comment },
},
edges: {
hasState: { type: hasState, from: [WorkflowDefinition], to: [State] },
hasTransition: { type: hasTransition, from: [WorkflowDefinition], to: [Transition] },
fromState: { type: fromState, from: [Transition], to: [State] },
toState: { type: toState, from: [Transition], to: [State] },
usesDefinition: { type: usesDefinition, from: [WorkflowInstance], to: [WorkflowDefinition] },
currentState: { type: currentState, from: [WorkflowInstance], to: [State] },
hasTask: { type: hasTask, from: [WorkflowInstance], to: [Task] },
assignedTo: { type: assignedTo, from: [Task], to: [User] },
createdBy: { type: createdBy, from: [Task, Comment, WorkflowInstance], to: [User] },
hasComment: { type: hasComment, from: [Task], to: [Comment] },
reportsTo: { type: reportsTo, from: [User], to: [User] },
},
ontology: [
// Escalation implies assignment
implies(reportsTo, assignedTo),
],
});
```
## Workflow Definition
### Create Approval Workflow
```typescript
async function createApprovalWorkflow(): Promise> {
return store.transaction(async (tx) => {
// Create workflow definition
const workflow = await tx.nodes.WorkflowDefinition.create({
name: "Purchase Order Approval",
description: "Multi-level approval for purchase orders",
version: 1,
isActive: true,
});
// Create states
const states = {
draft: await tx.nodes.State.create({
name: "Draft",
type: "initial",
}),
pendingManagerApproval: await tx.nodes.State.create({
name: "Pending Manager Approval",
type: "approval",
config: { approverRole: "manager", timeout: "48h" },
}),
pendingFinanceApproval: await tx.nodes.State.create({
name: "Pending Finance Approval",
type: "approval",
config: { approverRole: "finance", timeout: "24h" },
}),
approved: await tx.nodes.State.create({
name: "Approved",
type: "terminal",
}),
rejected: await tx.nodes.State.create({
name: "Rejected",
type: "terminal",
}),
};
// Link states to workflow
for (const state of Object.values(states)) {
await tx.edges.hasState.create(workflow, state, {});
}
// Create transitions
const transitions = [
{
from: states.draft,
to: states.pendingManagerApproval,
name: "Submit",
requiredRole: "requester",
},
{
from: states.pendingManagerApproval,
to: states.pendingFinanceApproval,
name: "Approve",
requiredRole: "manager",
condition: "amount > 1000",
},
{
from: states.pendingManagerApproval,
to: states.approved,
name: "Approve",
requiredRole: "manager",
condition: "amount <= 1000",
},
{
from: states.pendingManagerApproval,
to: states.rejected,
name: "Reject",
requiredRole: "manager",
},
{
from: states.pendingFinanceApproval,
to: states.approved,
name: "Approve",
requiredRole: "finance",
},
{
from: states.pendingFinanceApproval,
to: states.rejected,
name: "Reject",
requiredRole: "finance",
},
];
for (const t of transitions) {
const transition = await tx.nodes.Transition.create({
name: t.name,
requiredRole: t.requiredRole,
condition: t.condition,
});
await tx.edges.hasTransition.create(workflow, transition, {});
await tx.edges.fromState.create(transition, t.from, {});
await tx.edges.toState.create(transition, t.to, {});
}
return workflow;
});
}
```
## Workflow Instances
### Start Workflow
```typescript
interface StartWorkflowInput {
workflowName: string;
referenceId: string;
referenceType: string;
data?: Record;
createdByUserId: string;
}
async function startWorkflow(input: StartWorkflowInput): Promise> {
return store.transaction(async (tx) => {
// Find workflow definition
const workflow = await tx
.query()
.from("WorkflowDefinition", "w")
.whereNode("w", (w) => w.name.eq(input.workflowName).and(w.isActive.eq(true)))
.select((ctx) => ctx.w)
.first();
if (!workflow) {
throw new Error(`Workflow '${input.workflowName}' not found`);
}
// Find initial state
const initialState = await tx
.query()
.from("WorkflowDefinition", "w")
.whereNode("w", (w) => w.id.eq(workflow.id))
.traverse("hasState", "e")
.to("State", "s")
.whereNode("s", (s) => s.type.eq("initial"))
.select((ctx) => ctx.s)
.first();
if (!initialState) {
throw new Error("Workflow has no initial state");
}
// Create instance
const instance = await tx.nodes.WorkflowInstance.create({
referenceId: input.referenceId,
referenceType: input.referenceType,
status: "active",
data: input.data,
createdAt: new Date().toISOString(),
});
// Link to definition and state
await tx.edges.usesDefinition.create(instance, workflow, {});
await tx.edges.currentState.create(instance, initialState, {});
// Link to creator
const creator = await tx.nodes.User.getById(input.createdByUserId);
if (!creator) throw new Error(`User not found: ${input.createdByUserId}`);
await tx.edges.createdBy.create(instance, creator, {});
return instance;
});
}
```
### Get Available Transitions
```typescript
interface AvailableTransition {
id: string;
name: string;
targetState: string;
requiredRole?: string;
condition?: string;
}
async function getAvailableTransitions(
instanceId: string,
userId: string
): Promise {
// Get user's role
const user = await store.nodes.User.getById(userId);
if (!user) throw new Error(`User not found: ${userId}`);
const userRole = user.role;
// Get current state
const currentState = await store
.query()
.from("WorkflowInstance", "i")
.whereNode("i", (i) => i.id.eq(instanceId))
.traverse("currentState", "e")
.to("State", "s")
.select((ctx) => ctx.s)
.first();
if (!currentState) {
throw new Error("Instance has no current state");
}
// Get transitions from current state
const transitions = await store
.query()
.from("State", "s")
.whereNode("s", (s) => s.id.eq(currentState.id))
.traverse("fromState", "e1", { direction: "in" })
.to("Transition", "t")
.traverse("toState", "e2")
.to("State", "target")
.select((ctx) => ({
id: ctx.t.id,
name: ctx.t.name,
targetState: ctx.target.name,
requiredRole: ctx.t.requiredRole,
condition: ctx.t.condition,
}))
.execute();
// Filter by role
return transitions.filter(
(t) => !t.requiredRole || t.requiredRole === userRole || userRole === "admin"
);
}
```
### Execute Transition
```typescript
async function executeTransition(
instanceId: string,
transitionId: string,
userId: string,
result?: Record
): Promise {
await store.transaction(async (tx) => {
const instance = await tx.nodes.WorkflowInstance.getById(instanceId);
if (!instance) throw new Error(`WorkflowInstance not found: ${instanceId}`);
if (instance.status !== "active") {
throw new Error("Workflow is not active");
}
// Verify transition is valid
const available = await getAvailableTransitions(instanceId, userId);
const transition = available.find((t) => t.id === transitionId);
if (!transition) {
throw new Error("Transition not available");
}
// Get target state
const targetState = await tx
.query()
.from("Transition", "t")
.whereNode("t", (t) => t.id.eq(transitionId))
.traverse("toState", "e")
.to("State", "s")
.select((ctx) => ctx.s)
.first();
// Remove current state edge
const currentStateEdge = await tx
.query()
.from("WorkflowInstance", "i")
.whereNode("i", (i) => i.id.eq(instanceId))
.traverse("currentState", "e")
.to("State", "s")
.select((ctx) => ctx.e.id)
.first();
if (currentStateEdge) {
await tx.edges.currentState.delete(currentStateEdge);
}
// Add new state edge
await tx.edges.currentState.create(instance, targetState!, {});
// Update instance data
const updatedData = { ...instance.data, lastTransition: transition.name, ...result };
const updates: Partial = { data: updatedData };
// Check if terminal state
if (targetState!.type === "terminal") {
updates.status = "completed";
updates.completedAt = new Date().toISOString();
}
await tx.nodes.WorkflowInstance.update(instanceId, updates);
// Complete any pending tasks
const pendingTasks = await tx
.query()
.from("WorkflowInstance", "i")
.whereNode("i", (i) => i.id.eq(instanceId))
.traverse("hasTask", "e")
.to("Task", "t")
.whereNode("t", (t) => t.status.in(["pending", "in_progress"]))
.select((ctx) => ctx.t.id)
.execute();
for (const taskId of pendingTasks) {
await tx.nodes.Task.update(taskId, {
status: "completed",
completedAt: new Date().toISOString(),
});
}
// Create tasks for new state if needed
if (targetState!.type === "approval") {
await createApprovalTask(tx, instanceId, targetState!, userId);
}
});
}
```
## Task Management
### Create Approval Task
```typescript
async function createApprovalTask(
tx: Transaction,
instanceId: string,
state: Node,
requesterId: string
): Promise {
const config = state.config as { approverRole: string; timeout: string } | undefined;
if (!config) return;
// Find approver (first user with matching role, or requester's manager)
let approver = await tx
.query()
.from("User", "u")
.whereNode("u", (u) => u.role.eq(config.approverRole))
.select((ctx) => ctx.u)
.first();
// If no direct match, find in reporting chain
if (!approver) {
approver = await tx
.query()
.from("User", "requester")
.whereNode("requester", (u) => u.id.eq(requesterId))
.traverse("reportsTo", "e")
.recursive()
.to("User", "manager")
.whereNode("manager", (u) => u.role.eq(config.approverRole))
.select((ctx) => ctx.manager)
.first();
}
if (!approver) {
throw new Error(`No approver found with role '${config.approverRole}'`);
}
// Calculate due date
const dueDate = calculateDueDate(config.timeout);
// Create task
const task = await tx.nodes.Task.create({
title: `Approval Required: ${state.name}`,
description: `Please review and approve or reject.`,
type: "approval",
status: "pending",
priority: "medium",
dueDate: dueDate.toISOString(),
});
// Link task to instance and approver
const instance = await tx.nodes.WorkflowInstance.getById(instanceId);
if (!instance) throw new Error(`WorkflowInstance not found: ${instanceId}`);
await tx.edges.hasTask.create(instance, task, {});
await tx.edges.assignedTo.create(task, approver, {});
}
function calculateDueDate(timeout: string): Date {
const now = new Date();
const match = timeout.match(/^(\d+)(h|d)$/);
if (!match) return new Date(now.getTime() + 24 * 60 * 60 * 1000); // Default 24h
const value = parseInt(match[1], 10);
const unit = match[2];
if (unit === "h") {
return new Date(now.getTime() + value * 60 * 60 * 1000);
} else {
return new Date(now.getTime() + value * 24 * 60 * 60 * 1000);
}
}
```
### Get User's Tasks
```typescript
interface TaskWithContext {
id: string;
title: string;
type: string;
status: string;
priority: string;
dueDate?: string;
workflowName: string;
referenceId: string;
referenceType: string;
}
async function getUserTasks(
userId: string,
status?: "pending" | "in_progress"
): Promise {
let query = store
.query()
.from("User", "u")
.whereNode("u", (u) => u.id.eq(userId))
.traverse("assignedTo", "e", { direction: "in" })
.to("Task", "t");
if (status) {
query = query.whereNode("t", (t) => t.status.eq(status));
} else {
query = query.whereNode("t", (t) => t.status.in(["pending", "in_progress"]));
}
return query
.traverse("hasTask", "e2", { direction: "in" })
.to("WorkflowInstance", "i")
.traverse("usesDefinition", "e3")
.to("WorkflowDefinition", "w")
.select((ctx) => ({
id: ctx.t.id,
title: ctx.t.title,
type: ctx.t.type,
status: ctx.t.status,
priority: ctx.t.priority,
dueDate: ctx.t.dueDate,
workflowName: ctx.w.name,
referenceId: ctx.i.referenceId,
referenceType: ctx.i.referenceType,
}))
.orderBy((ctx) => ctx.t.dueDate, "asc")
.execute();
}
```
### Complete Task
```typescript
async function completeTask(
taskId: string,
userId: string,
decision: "approve" | "reject",
comment?: string
): Promise {
await store.transaction(async (tx) => {
const task = await tx.nodes.Task.getById(taskId);
if (!task) throw new Error(`Task not found: ${taskId}`);
// Verify user is assigned
const assignee = await tx
.query()
.from("Task", "t")
.whereNode("t", (t) => t.id.eq(taskId))
.traverse("assignedTo", "e")
.to("User", "u")
.select((ctx) => ctx.u.id)
.first();
if (assignee !== userId) {
throw new Error("User is not assigned to this task");
}
// Update task
await tx.nodes.Task.update(taskId, {
status: decision === "approve" ? "completed" : "rejected",
completedAt: new Date().toISOString(),
result: { decision },
});
// Add comment if provided
if (comment) {
const commentNode = await tx.nodes.Comment.create({
content: comment,
createdAt: new Date().toISOString(),
});
await tx.edges.hasComment.create(task, commentNode, {});
const user = await tx.nodes.User.getById(userId);
if (!user) throw new Error(`User not found: ${userId}`);
await tx.edges.createdBy.create(commentNode, user, {});
}
// Get workflow instance
const instance = await tx
.query()
.from("Task", "t")
.whereNode("t", (t) => t.id.eq(taskId))
.traverse("hasTask", "e", { direction: "in" })
.to("WorkflowInstance", "i")
.select((ctx) => ctx.i)
.first();
// Find and execute the appropriate transition
const transitions = await getAvailableTransitions(instance!.id, userId);
const transition = transitions.find((t) =>
decision === "approve" ? t.name === "Approve" : t.name === "Reject"
);
if (transition) {
await executeTransition(instance!.id, transition.id, userId, { decision });
}
});
}
```
## Escalation
### Check Overdue Tasks
```typescript
async function getOverdueTasks(): Promise> {
const now = new Date().toISOString();
return store
.query()
.from("Task", "t")
.whereNode("t", (t) =>
t.status
.in(["pending", "in_progress"])
.and(t.dueDate.isNotNull())
.and(t.dueDate.lt(now))
)
.traverse("assignedTo", "e")
.to("User", "u")
.select((ctx) => ({
task: ctx.t,
assignee: ctx.u,
}))
.execute();
}
```
### Escalate Task
```typescript
async function escalateTask(taskId: string): Promise {
await store.transaction(async (tx) => {
// Get current assignee
const currentAssignment = await tx
.query()
.from("Task", "t")
.whereNode("t", (t) => t.id.eq(taskId))
.traverse("assignedTo", "e")
.to("User", "u")
.select((ctx) => ({ edgeId: ctx.e.id, user: ctx.u }))
.first();
if (!currentAssignment) {
throw new Error("Task has no assignee");
}
// Find manager in reporting chain
const manager = await tx
.query()
.from("User", "u")
.whereNode("u", (u) => u.id.eq(currentAssignment.user.id))
.traverse("reportsTo", "e")
.to("User", "manager")
.select((ctx) => ctx.manager)
.first();
if (!manager) {
throw new Error("No manager found for escalation");
}
// Update task
await tx.nodes.Task.update(taskId, {
status: "escalated",
priority: "urgent",
});
// Reassign to manager
await tx.edges.assignedTo.delete(currentAssignment.edgeId);
const task = await tx.nodes.Task.getById(taskId);
if (!task) throw new Error(`Task not found: ${taskId}`);
await tx.edges.assignedTo.create(task, manager, {});
// Add escalation comment
const comment = await tx.nodes.Comment.create({
content: `Task escalated from ${currentAssignment.user.name} due to timeout`,
createdAt: new Date().toISOString(),
});
await tx.edges.hasComment.create(task, comment, {});
});
}
```
### Run Escalation Job
```typescript
async function runEscalationJob(): Promise<{ escalated: number }> {
const overdueTasks = await getOverdueTasks();
let escalated = 0;
for (const { task } of overdueTasks) {
try {
await escalateTask(task.id);
escalated++;
} catch (error) {
console.error(`Failed to escalate task ${task.id}:`, error);
}
}
return { escalated };
}
```
## Workflow History
### Get Instance Timeline
```typescript
interface TimelineEvent {
timestamp: string;
type: "state_change" | "task_created" | "task_completed" | "comment";
description: string;
actor?: string;
}
async function getInstanceTimeline(instanceId: string): Promise {
const events: TimelineEvent[] = [];
// Get state change history using temporal queries
const stateHistory = await store
.query()
.from("WorkflowInstance", "i")
.temporal("includeEnded")
.whereNode("i", (i) => i.id.eq(instanceId))
.traverse("currentState", "e")
.to("State", "s")
.orderBy((ctx) => ctx.e.validFrom, "asc")
.select((ctx) => ({
stateName: ctx.s.name,
timestamp: ctx.e.validFrom,
}))
.execute();
for (const state of stateHistory) {
events.push({
timestamp: state.timestamp,
type: "state_change",
description: `Entered state: ${state.stateName}`,
});
}
// Get task events
const tasks = await store
.query()
.from("WorkflowInstance", "i")
.whereNode("i", (i) => i.id.eq(instanceId))
.traverse("hasTask", "e")
.to("Task", "t")
.optionalTraverse("assignedTo", "a")
.to("User", "u")
.select((ctx) => ({
title: ctx.t.title,
status: ctx.t.status,
createdAt: ctx.t.createdAt,
completedAt: ctx.t.completedAt,
assignee: ctx.u?.name,
}))
.execute();
for (const task of tasks) {
events.push({
timestamp: task.createdAt.toISOString(),
type: "task_created",
description: `Task created: ${task.title}`,
actor: task.assignee,
});
if (task.completedAt) {
events.push({
timestamp: task.completedAt,
type: "task_completed",
description: `Task ${task.status}: ${task.title}`,
actor: task.assignee,
});
}
}
// Sort by timestamp
return events.sort((a, b) => a.timestamp.localeCompare(b.timestamp));
}
```
## Next Steps
- [Document Management](/examples/document-management) - CMS with semantic search
- [Product Catalog](/examples/product-catalog) - Categories, variants, inventory
- [Audit Trail](/examples/audit-trail) - Complete change tracking