Courier Module
Typed, protocol-aware data movement library for ETL/ELT, CDC, and streaming in Spry.
Introduction
Courier is Spry's typed, protocol-aware library that treats data movement (ETL/ELT, CDC, streaming) as a first-class concept. It provides a unified API for Singer, Airbyte, and Spry-native data flows under a single "Data Movement Protocol" (DataMP).
Purpose
Courier provides essential infrastructure for:
- Unified Protocol Abstraction - Work with Singer, Airbyte, and custom protocols through a single API
- Type-Safe Data Flows - Build taps and targets using Zod schemas with full TypeScript support
- Profile-Agnostic Pipelines - Write code once that works across different wire formats
- Standardized Diagnostics - Emit and consume logs, metrics, and checkpoints consistently
- Stream Management - Handle multiple data streams with typed schemas and metadata
Directory Structure
lib/courier/
├── protocol.ts # Core DataMP types and engine
├── singer.ts # Singer protocol adapters
└── airbyte.ts # Airbyte protocol adapters
Core Concepts
Data Movement Protocol (DataMP)
The unified model for moving data consists of:
- Streams: Named channels of data (e.g., "users", "orders")
- Messages: Typed units of information (RECORD, STATE, SCHEMA, etc.)
- Profiles: Concrete wire formats (Singer, Airbyte, or DataMove native)
Key Insight:
DataMP acts as a "lingua franca" for data movement - your code works with DataMP types, and Courier handles translation to/from specific protocols like Singer or Airbyte.
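For example, the same logical record can be expressed as a DataMP message in your code and as a Singer line on the wire; a minimal sketch (the conversion helpers are covered under singer.ts below):
```ts
// One logical record expressed as a DataMP message...
const dmpRecord = {
  protocol: "data-move-protocol",
  type: "RECORD",
  stream: "users",
  record: { id: "1", name: "Alice" },
} as const;

// ...and the Singer wire line Courier translates it to/from:
// {"type": "RECORD", "stream": "users", "record": {"id": "1", "name": "Alice"}}
```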
Profiles
Different wire format implementations:
- Singer: Classic JSON-based protocol from the Singer ecosystem
- Airbyte: Protocol used by Airbyte connectors
- DataMove Protocol: Spry's internal superset with enhanced control and diagnostics
Taps and Targets
- Tap (Source): Reads data from a source system and emits messages
- Target (Destination): Receives messages and writes to a destination system
- Transform: Optional intermediate processing of messages
Core Modules
protocol.ts - Data Movement Engine
The heart of Courier, implementing the DataMP specification and execution engine.
Building Simple Pipelines
import { z } from "@zod/zod";
import {
dataMoveSingleStreamMap,
dataMoveSingleStreamDef,
dataMovementPipeline,
type DataMoveTap,
type DataMoveTarget,
} from "lib/courier/protocol.ts";
// 1. Define your data schema with Zod
const UserSchema = z.object({
id: z.string(),
name: z.string(),
email: z.string().email(),
createdAt: z.string().datetime(),
});
// 2. Create stream definition
const streamName = "users";
const schemas = dataMoveSingleStreamMap(streamName, UserSchema);
// 3. Implement a Tap (data source)
const myTap: DataMoveTap<typeof schemas> = {
id: "my-database-tap",
streams: {
users: dataMoveSingleStreamDef(streamName, UserSchema),
},
async *read(ctx) {
// Emit SCHEMA message first
yield {
protocol: "data-move-protocol",
type: "SCHEMA",
stream: "users",
schema: UserSchema,
keyProperties: ["id"],
};
// Emit RECORD messages
const users = await fetchUsersFromDatabase();
for (const user of users) {
yield {
protocol: "data-move-protocol",
type: "RECORD",
stream: "users",
record: user,
};
}
// Emit final STATE
yield {
protocol: "data-move-protocol",
type: "STATE",
value: { lastSync: new Date().toISOString() },
};
},
};
// 4. Implement a Target (data destination)
const myTarget: DataMoveTarget<typeof schemas> = {
id: "my-warehouse-target",
handleMessage(msg) {
if (msg.type === "RECORD") {
// Write to data warehouse
console.log(`Writing user: ${msg.record.name}`);
writeToWarehouse(msg.stream, msg.record);
} else if (msg.type === "STATE") {
// Checkpoint progress
saveCheckpoint(msg.value);
}
},
};
// 5. Execute the pipeline
await dataMovementPipeline({
tap: myTap,
target: myTarget,
});
Pipeline Flow:
- Tap emits SCHEMA to define structure
- Tap emits RECORD messages with typed data
- Target receives and processes each message
- STATE messages enable resumable sync
Message Types
DataMP supports multiple message types for different purposes:
| Message Type | Purpose | When to Use |
|---|---|---|
| SCHEMA | Define stream structure | Start of stream, before records |
| RECORD | Individual data records | For each row/document |
| STATE | Checkpoint/bookmark | Periodically, to enable resume |
| TRACE | Diagnostic logging | Debug information |
| ERROR | Error reporting | When operations fail |
| BARRIER | Synchronization point | Batch boundaries |
| METRICS | Performance data | Stats and monitoring |
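A target can branch on these types in its handleMessage; a minimal sketch (the logging is illustrative, and schemas refers to the map defined above):
```ts
import { type DataMoveTarget } from "lib/courier/protocol.ts";

const loggingTarget: DataMoveTarget<typeof schemas> = {
  id: "logging-target",
  handleMessage(msg) {
    switch (msg.type) {
      case "SCHEMA":
        console.log(`Schema received for stream ${msg.stream}`);
        break;
      case "RECORD":
        console.log(`Record on stream ${msg.stream}`);
        break;
      case "STATE":
        console.log("Checkpoint", msg.value);
        break;
      default:
        // TRACE, ERROR, BARRIER, METRICS: diagnostic messages
        console.log(`[${msg.type}]`, msg);
    }
  },
};
```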
Advanced Pipeline Features
import {
dataMovementPipeline,
type DataMoveMessageTransform,
} from "lib/courier/protocol.ts";
// Transform that enriches records
const enrichTransform: DataMoveMessageTransform<typeof schemas> = {
id: "enrichment-transform",
async *transform(msg) {
if (msg.type === "RECORD" && msg.stream === "users") {
// Add computed field
const enriched = {
...msg.record,
fullName: `${msg.record.name} (${msg.record.email})`,
};
yield { ...msg, record: enriched };
} else {
yield msg;
}
},
};
// Pipeline with state management
await dataMovementPipeline({
tap: myTap,
target: myTarget,
transforms: [enrichTransform],
initialState: { lastSync: "2024-01-01T00:00:00Z" },
onState: (state) => {
console.log("Checkpoint:", state);
// Save state to enable resume after failure
saveStateToFile(state);
},
logger: console,
});
State Management:
Checkpointing enables incremental sync - if a pipeline fails, it can resume from the last successful STATE rather than starting over.
Multi-Stream Pipelines
import { type StreamSchemaMap } from "lib/courier/protocol.ts";
// Define multiple streams
const OrderSchema = z.object({
id: z.string(),
userId: z.string(),
amount: z.number(),
status: z.enum(["pending", "completed", "cancelled"]),
});
// Use `satisfies` so per-stream record types are preserved for DataMoveTap<typeof schemas>
const schemas = {
  users: UserSchema,
  orders: OrderSchema,
} satisfies StreamSchemaMap;
const multiStreamTap: DataMoveTap<typeof schemas> = {
id: "multi-stream-tap",
streams: {
users: dataMoveSingleStreamDef("users", UserSchema),
orders: dataMoveSingleStreamDef("orders", OrderSchema),
},
async *read(ctx) {
// Emit users
for (const user of await fetchUsers()) {
yield {
protocol: "data-move-protocol",
type: "RECORD",
stream: "users",
record: user,
};
}
// Emit orders
for (const order of await fetchOrders()) {
yield {
protocol: "data-move-protocol",
type: "RECORD",
stream: "orders",
record: order,
};
}
},
};
singer.ts - Singer Protocol Support
Adapters for working with the Singer ecosystem (taps and targets).
Understanding Singer Wire Format
Singer uses JSONL (JSON Lines) with specific message types:
{"type": "SCHEMA", "stream": "users", "schema": {...}, "key_properties": ["id"]}
{"type": "RECORD", "stream": "users", "record": {"id": "1", "name": "Alice"}}
{"type": "STATE", "value": {"bookmark": "2024-01-01"}}Converting to Singer Format
import {
singerWireMessageSchema,
dataMoveTypedRecordToSingerRecordWireSchema,
dataMoveSingerSchemaWireFromMetaSchema,
} from "lib/courier/singer.ts";
// Convert DataMP RECORD to Singer wire format
const singerRecord = dataMoveTypedRecordToSingerRecordWireSchema(
{
protocol: "data-move-protocol",
type: "RECORD",
stream: "users",
record: { id: "1", name: "Alice", email: "alice@example.com" },
}
);
// Result: { type: "RECORD", stream: "users", record: {...} }
// Generate Singer SCHEMA message from metadata
const singerSchema = dataMoveSingerSchemaWireFromMetaSchema({
stream: "users",
schema: UserSchema,
keyProperties: ["id"],
});
// Result: { type: "SCHEMA", stream: "users", schema: {...}, key_properties: ["id"] }Validating Singer Messages
import { singerWireMessageSchema } from "lib/courier/singer.ts";
const rawMessage = JSON.parse(line);
const validated = singerWireMessageSchema.parse(rawMessage);
// TypeScript now knows the message type
if (validated.type === "RECORD") {
console.log(`Record for stream: ${validated.stream}`);
} else if (validated.type === "STATE") {
console.log(`State update: ${JSON.stringify(validated.value)}`);
}
Singer Ecosystem:
Singer has 300+ pre-built taps (sources) and targets (destinations). Courier's Singer support lets you integrate with this ecosystem while maintaining type safety.
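For example, an existing Singer tap's JSONL output can be consumed and validated before feeding it into a DataMP pipeline. A minimal sketch using Deno's subprocess API; the tap command and its config file are placeholders:
```ts
import { singerWireMessageSchema } from "lib/courier/singer.ts";

// Run a hypothetical Singer tap and capture its JSONL stdout
const output = await new Deno.Command("tap-example", {
  args: ["--config", "config.json"],
  stdout: "piped",
}).output();

for (const line of new TextDecoder().decode(output.stdout).split("\n")) {
  if (!line.trim()) continue;
  // Validate every wire message before further processing
  const msg = singerWireMessageSchema.parse(JSON.parse(line));
  if (msg.type === "RECORD") {
    console.log(`RECORD for stream ${msg.stream}`);
  }
}
```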
airbyte.ts - Airbyte Protocol Support
Adapters for working with Airbyte connectors.
Understanding Airbyte Wire Format
Airbyte uses JSONL with its own message structure:
{"type": "RECORD", "record": {"stream": "users", "data": {...}, "emitted_at": 1234567890}}
{"type": "STATE", "state": {"data": {...}}}
{"type": "LOG", "log": {"level": "INFO", "message": "Processing..."}}Working with Airbyte Messages
import {
airbyteWireMessageSchema,
airbyteRecordMessageSchema,
} from "lib/courier/airbyte.ts";
// Parse Airbyte wire messages
const rawMessage = JSON.parse(line);
const validated = airbyteWireMessageSchema.parse(rawMessage);
if (validated.type === "RECORD") {
const { stream, data, emitted_at } = validated.record;
console.log(`Record from ${stream} at ${emitted_at}`);
processRecord(data);
} else if (validated.type === "STATE") {
saveCheckpoint(validated.state.data);
} else if (validated.type === "LOG") {
console.log(`[${validated.log.level}] ${validated.log.message}`);
}
// Validate just record messages
const recordOnly = airbyteRecordMessageSchema.parse(validated);
Airbyte Catalog:
Airbyte has 300+ certified connectors. Courier's Airbyte support enables type-safe integration with these connectors through the DataMP abstraction.
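For example, a validated Airbyte RECORD can be re-emitted as a DataMP RECORD from inside a tap's read() generator. A manual mapping sketch; airbyte.ts may provide dedicated conversion helpers, so treat the field mapping as illustrative (line holds one JSONL line from the connector, as in the parsing example above):
```ts
import { airbyteWireMessageSchema } from "lib/courier/airbyte.ts";

const airbyteMsg = airbyteWireMessageSchema.parse(JSON.parse(line));
if (airbyteMsg.type === "RECORD") {
  // Map the Airbyte envelope onto the DataMP message shape
  const dmpMsg = {
    protocol: "data-move-protocol",
    type: "RECORD",
    stream: airbyteMsg.record.stream,
    record: airbyteMsg.record.data,
  } as const;
  // yield dmpMsg from a DataMoveTap's read() generator
}
```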
Protocol Interfaces
DataMoveTap Interface
interface DataMoveTap<TSchemas extends StreamSchemaMap, TState = unknown> {
id: string;
streams: Record<keyof TSchemas, DataMoveStreamDef<any>>;
read(ctx: {
state?: TState;
logger?: Logger;
}): AsyncGenerator<DataMoveMessage<TSchemas, TState>>;
}
Key Points:
- id: Unique identifier for the tap
- streams: Schema definitions for all streams
- read(): Async generator that yields messages
- state: Optional initial state for incremental sync
DataMoveTarget Interface
interface DataMoveTarget<TSchemas extends StreamSchemaMap, TState = unknown> {
id: string;
handleMessage(msg: DataMoveMessage<TSchemas, TState>): void | Promise<void>;
}
Key Points:
- id: Unique identifier for the target
- handleMessage(): Process each message (can be sync or async)
DataMoveMessageTransform Interface
interface DataMoveMessageTransform<TSchemas extends StreamSchemaMap, TState = unknown> {
id: string;
transform(
msg: DataMoveMessage<TSchemas, TState>
): AsyncGenerator<DataMoveMessage<TSchemas, TState>>;
}
Key Points:
- Sits between tap and target
- Can filter, enrich, or split messages
- Async generator for one-to-many transformations
Common Patterns
Pattern: Database to Warehouse Sync
import { dataMovementPipeline } from "lib/courier/protocol.ts";
// Source: PostgreSQL database
const postgresTap: DataMoveTap<typeof schemas> = {
id: "postgres-tap",
streams: { users: userStreamDef },
async *read(ctx) {
const lastSync = ctx.state?.lastId || 0;
// Query with WHERE clause for incremental sync
const rows = await db.query(
"SELECT * FROM users WHERE id > $1 ORDER BY id",
[lastSync]
);
for (const row of rows) {
yield {
protocol: "data-move-protocol",
type: "RECORD",
stream: "users",
record: row,
};
}
// Save bookmark
if (rows.length > 0) {
yield {
protocol: "data-move-protocol",
type: "STATE",
value: { lastId: rows[rows.length - 1].id },
};
}
},
};
// Destination: Snowflake warehouse
const snowflakeTarget: DataMoveTarget<typeof schemas> = {
id: "snowflake-target",
handleMessage(msg) {
if (msg.type === "RECORD") {
snowflake.insert("users_table", msg.record);
}
},
};
await dataMovementPipeline({
tap: postgresTap,
target: snowflakeTarget,
initialState: await loadLastState(),
onState: async (state) => {
await saveState(state);
},
});
Pattern: API to Database ETL
// Tap: REST API with pagination
const apiTap: DataMoveTap<typeof schemas> = {
id: "api-tap",
streams: { orders: orderStreamDef },
async *read(ctx) {
let page = ctx.state?.nextPage || 1;
let hasMore = true;
while (hasMore) {
const response = await fetch(
`https://api.example.com/orders?page=${page}`
);
const data = await response.json();
for (const order of data.results) {
yield {
protocol: "data-move-protocol",
type: "RECORD",
stream: "orders",
record: order,
};
}
// Checkpoint after each page
page++;
hasMore = data.hasMore;
yield {
protocol: "data-move-protocol",
type: "STATE",
value: { nextPage: page },
};
}
},
};
// Target: SQLite database
const sqliteTarget: DataMoveTarget<typeof schemas> = {
id: "sqlite-target",
handleMessage(msg) {
if (msg.type === "RECORD") {
db.run(
"INSERT OR REPLACE INTO orders VALUES (?, ?, ?)",
[msg.record.id, msg.record.amount, msg.record.status]
);
}
},
};
Pattern: Stream Filtering Transform
const filterTransform: DataMoveMessageTransform<typeof schemas> = {
id: "active-users-filter",
async *transform(msg) {
if (msg.type === "RECORD" && msg.stream === "users") {
// Only pass through active users
if (msg.record.status === "active") {
yield msg;
}
// Drop inactive users (don't yield)
} else {
// Pass through all non-record messages
yield msg;
}
},
};
await dataMovementPipeline({
tap: usersTap,
target: activeUsersTarget,
transforms: [filterTransform],
});
Pattern: Data Enrichment Transform
const enrichTransform: DataMoveMessageTransform<typeof schemas> = {
id: "user-enrichment",
async *transform(msg) {
if (msg.type === "RECORD" && msg.stream === "users") {
// Look up additional data
const profile = await fetchUserProfile(msg.record.id);
const location = await geocode(msg.record.address);
yield {
...msg,
record: {
...msg.record,
profileComplete: profile.completeness,
latitude: location.lat,
longitude: location.lng,
},
};
} else {
yield msg;
}
},
};
Pattern: Record Splitting Transform
const splitTransform: DataMoveMessageTransform<typeof schemas> = {
id: "order-line-splitter",
async *transform(msg) {
if (msg.type === "RECORD" && msg.stream === "orders") {
// Split order into line items
for (const item of msg.record.lineItems) {
yield {
protocol: "data-move-protocol",
type: "RECORD",
stream: "order_items",
record: {
orderId: msg.record.id,
productId: item.productId,
quantity: item.quantity,
price: item.price,
},
};
}
} else {
yield msg;
}
},
};
Helper Functions
dataMoveSingleStreamMap
Create a schema map for a single stream:
import { dataMoveSingleStreamMap } from "lib/courier/protocol.ts";
const schemas = dataMoveSingleStreamMap("users", UserSchema);
// Equivalent to: { users: UserSchema }
dataMoveSingleStreamDef
Create a complete stream definition:
import { dataMoveSingleStreamDef } from "lib/courier/protocol.ts";
const streamDef = dataMoveSingleStreamDef("users", UserSchema, {
keyProperties: ["id"],
replicationMethod: "INCREMENTAL",
replicationKey: "updated_at",
});
Options:
- keyProperties: Primary key fields
- replicationMethod: "FULL_TABLE" or "INCREMENTAL"
- replicationKey: Field to use for incremental sync
- bookmarkProperties: Fields for cursor-based pagination
Type Safety Benefits
Why Type Safety Matters:
- Compile-Time Validation: TypeScript catches schema mismatches before runtime
- IntelliSense Support: Auto-completion for record fields
- Refactoring Safety: Rename fields across tap and target with confidence
- Documentation: Types serve as always-up-to-date documentation
Example: Type Errors Caught Early
const UserSchema = z.object({
id: z.string(),
name: z.string(),
email: z.string().email(),
});
const tap: DataMoveTap<typeof schemas> = {
id: "my-tap",
streams: { users: userStreamDef },
async *read(ctx) {
yield {
protocol: "data-move-protocol",
type: "RECORD",
stream: "users",
record: {
id: "1",
name: "Alice",
// TS Error: Property 'email' is missing!
},
};
},
};
Diagnostic Messages
TRACE Messages
For debug logging and operational visibility:
yield {
protocol: "data-move-protocol",
type: "TRACE",
nature: "TRACE",
message: "Fetching page 5 of API results",
metadata: { page: 5, totalRecords: 1234 },
};
ERROR Messages
For error reporting without stopping the pipeline:
try {
await processRecord(record);
} catch (err) {
yield {
protocol: "data-move-protocol",
type: "ERROR",
nature: "ERROR",
message: err.message,
metadata: { recordId: record.id, stack: err.stack },
};
}
METRICS Messages
For performance monitoring:
yield {
protocol: "data-move-protocol",
type: "METRICS",
nature: "METRICS",
metrics: {
recordsProcessed: 1000,
durationMs: 5432,
bytesTransferred: 102400,
},
};
BARRIER Messages
For batch synchronization:
// After processing 1000 records
yield {
protocol: "data-move-protocol",
type: "BARRIER",
nature: "BARRIER",
message: "Batch complete",
metadata: { batchSize: 1000 },
};
Best Practices
1. Always Emit STATE Messages
Enable incremental sync and failure recovery:
// No state tracking
async *read(ctx) {
for (const record of allRecords) {
yield { type: "RECORD", stream: "users", record };
}
}
// With state tracking
async *read(ctx) {
const lastId = ctx.state?.lastId || 0;
for (const record of recordsSince(lastId)) {
yield { type: "RECORD", stream: "users", record };
// Checkpoint periodically
if (record.id % 1000 === 0) {
yield { type: "STATE", value: { lastId: record.id } };
}
}
}
2. Use Zod for Schema Validation
Catch data quality issues early:
import { z } from "@zod/zod";
const UserSchema = z.object({
id: z.string().uuid(), // Must be valid UUID
email: z.string().email(), // Must be valid email
age: z.number().int().min(0).max(150), // Reasonable age
createdAt: z.string().datetime(), // ISO datetime
});
// Invalid data throws validation error
const result = UserSchema.safeParse(record);
if (!result.success) {
console.error("Invalid record:", result.error);
}
3. Handle Errors Gracefully
Don't let one bad record stop the entire pipeline:
async *read(ctx) {
for (const raw of rawRecords) {
try {
const validated = RecordSchema.parse(raw);
yield { type: "RECORD", stream: "users", record: validated };
} catch (err) {
// Log error but continue processing
yield {
type: "ERROR",
nature: "ERROR",
message: `Validation failed: ${err.message}`,
metadata: { rawRecord: raw },
};
}
}
}
4. Batch State Updates
Don't checkpoint after every single record:
// Checkpoint every 1000 records or 5 minutes
let recordCount = 0;
let lastCheckpoint = Date.now();
for (const record of records) {
yield { type: "RECORD", stream: "users", record };
recordCount++;
const elapsed = Date.now() - lastCheckpoint;
if (recordCount % 1000 === 0 || elapsed > 300000) {
yield { type: "STATE", value: { lastId: record.id } };
lastCheckpoint = Date.now();
}
}
5. Use Transforms for Reusable Logic
Don't duplicate enrichment/filtering logic across taps:
// Reusable transform
const piiRedactionTransform: DataMoveMessageTransform<any> = {
id: "pii-redaction",
async *transform(msg) {
if (msg.type === "RECORD") {
yield {
...msg,
record: {
...msg.record,
email: redact(msg.record.email),
phone: redact(msg.record.phone),
},
};
} else {
yield msg;
}
},
};
// Use in multiple pipelines
await dataMovementPipeline({
tap: tap1,
target: target1,
transforms: [piiRedactionTransform],
});
await dataMovementPipeline({
tap: tap2,
target: target2,
transforms: [piiRedactionTransform],
});
Performance Considerations
Performance Tips:
- Batch Processing: Group records into batches for bulk inserts
- Async Generators: Use async * for memory-efficient streaming
- State Checkpointing: Balance checkpoint frequency vs. recovery time
- Connection Pooling: Reuse database connections across records
- Parallel Streams: Process independent streams concurrently
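For the last tip, independent streams can run as separate pipelines executed concurrently; a minimal sketch, assuming per-stream taps and targets like those defined earlier:
```ts
import { dataMovementPipeline } from "lib/courier/protocol.ts";

// Each pipeline checkpoints its own state, so the streams fail and
// resume independently of each other.
await Promise.all([
  dataMovementPipeline({ tap: usersTap, target: usersTarget }),
  dataMovementPipeline({ tap: ordersTap, target: ordersTarget }),
]);
```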
Batching Example
// Keep the batch in a closure so the target literal matches the DataMoveTarget interface
const batch: unknown[] = [];
const batchTarget: DataMoveTarget<typeof schemas> = {
  id: "batch-target",
  async handleMessage(msg) {
    if (msg.type === "RECORD") {
      batch.push(msg.record);
      // Bulk insert every 1000 records
      if (batch.length >= 1000) {
        await db.bulkInsert("users", batch);
        batch.length = 0;
      }
    } else if (msg.type === "STATE") {
      // Flush remaining records before checkpointing
      if (batch.length > 0) {
        await db.bulkInsert("users", batch);
        batch.length = 0;
      }
    }
  },
};
Testing
Unit Testing Taps
import { assertEquals } from "@std/assert";
Deno.test("tap emits correct records", async () => {
const tap = myTap;
const messages = [];
for await (const msg of tap.read({})) {
messages.push(msg);
}
assertEquals(messages.length, 3);
assertEquals(messages[0].type, "SCHEMA");
assertEquals(messages[1].type, "RECORD");
assertEquals(messages[1].record.id, "1");
});
Integration Testing Pipelines
Deno.test("pipeline transfers data correctly", async () => {
const records: any[] = [];
const testTarget: DataMoveTarget<typeof schemas> = {
id: "test-target",
handleMessage(msg) {
if (msg.type === "RECORD") {
records.push(msg.record);
}
},
};
await dataMovementPipeline({
tap: myTap,
target: testTarget,
});
assertEquals(records.length, 100);
assertEquals(records[0].id, "1");
});
Common Patterns Summary
Quick Reference:
- Database Sync: Use STATE with incremental queries
- API ETL: Paginate with state checkpoints
- Filtering: Transform that conditionally yields messages
- Enrichment: Transform that augments records
- Splitting: Transform that yields multiple messages per input
- Batching: Target that accumulates records before writing
Integration with Other Spry Modules
With Universal Module
import { shell } from "lib/universal/shell.ts";
import { dataMovementPipeline } from "lib/courier/protocol.ts";
// Run shell command before/after pipeline
const sh = shell({});
await sh.spawnText("pg_dump source_db > backup.sql");
await dataMovementPipeline({ tap, target });
await sh.spawnText("echo 'Sync complete' | mail admin@example.com");With Task System
import { executionPlan, executeDAG } from "lib/universal/task.ts";
const tasks = [
{
id: "sync-users",
deps: [],
async run() {
await dataMovementPipeline({ tap: usersTap, target });
},
},
{
id: "sync-orders",
deps: ["sync-users"],
async run() {
await dataMovementPipeline({ tap: ordersTap, target });
},
},
];
const plan = executionPlan(tasks);
await executeDAG(plan, (task) => task.run());
Summary
Courier provides a unified, type-safe foundation for data movement in Spry. By abstracting protocol differences and providing strong typing, it enables building robust, maintainable data pipelines that work across the Singer and Airbyte ecosystems.
Key Benefits:
- Type Safety: Zod schemas catch errors at compile time and runtime
- Protocol Agnostic: Write once, work with Singer, Airbyte, or native
- Incremental Sync: State management enables efficient updates
- Composable: Transforms create reusable processing steps
- Observable: Rich diagnostics for monitoring and debugging