Courier Module

Typed, protocol-aware data movement library for ETL/ELT, CDC, and streaming in Spry.

Introduction

Courier is Spry's typed, protocol-aware library that treats data movement (ETL/ELT, CDC, streaming) as a first-class concept. It provides a unified API for Singer, Airbyte, and Spry-native data flows under a single "Data Movement Protocol" (DataMP).


Purpose

Courier provides essential infrastructure for:

  • Unified Protocol Abstraction - Work with Singer, Airbyte, and custom protocols through a single API
  • Type-Safe Data Flows - Build taps and targets using Zod schemas with full TypeScript support
  • Profile-Agnostic Pipelines - Write code once that works across different wire formats
  • Standardized Diagnostics - Emit and consume logs, metrics, and checkpoints consistently
  • Stream Management - Handle multiple data streams with typed schemas and metadata

Directory Structure

lib/courier/
├── protocol.ts              # Core DataMP types and engine
├── singer.ts                # Singer protocol adapters
└── airbyte.ts               # Airbyte protocol adapters

Core Concepts

Data Movement Protocol (DataMP)

The unified model for moving data consists of:

  • Streams: Named channels of data (e.g., "users", "orders")
  • Messages: Typed units of information (RECORD, STATE, SCHEMA, etc.)
  • Profiles: Concrete wire formats (Singer, Airbyte, or DataMove native)

Key Insight:

DataMP acts as a "lingua franca" for data movement - your code works with DataMP types, and Courier handles translation to/from specific protocols like Singer or Airbyte.
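
For orientation, a single DataMP RECORD message has the shape used throughout the examples below (a sketch of one message kind, not the full message union):

// One DataMP RECORD message: "protocol" identifies the profile,
// "type" the message kind, and "record" carries the typed payload.
const example = {
  protocol: "data-move-protocol",
  type: "RECORD",
  stream: "users",
  record: { id: "1", name: "Alice" },
};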

Profiles

Different wire format implementations:

  • Singer: Classic JSON-based protocol from the Singer ecosystem
  • Airbyte: Protocol used by Airbyte connectors
  • DataMove Protocol: Spry's internal superset with enhanced control and diagnostics

Taps and Targets

  • Tap (Source): Reads data from a source system and emits messages
  • Target (Destination): Receives messages and writes to a destination system
  • Transform: Optional intermediate processing of messages

Core Modules

protocol.ts - Data Movement Engine

The heart of Courier, implementing the DataMP specification and execution engine.

Building Simple Pipelines

import { z } from "@zod/zod";
import {
  dataMoveSingleStreamMap,
  dataMoveSingleStreamDef,
  dataMovementPipeline,
  type DataMoveTap,
  type DataMoveTarget,
} from "lib/courier/protocol.ts";

// 1. Define your data schema with Zod
const UserSchema = z.object({
  id: z.string(),
  name: z.string(),
  email: z.string().email(),
  createdAt: z.string().datetime(),
});

// 2. Create stream definition
const streamName = "users";
const schemas = dataMoveSingleStreamMap(streamName, UserSchema);

// 3. Implement a Tap (data source)
const myTap: DataMoveTap<typeof schemas> = {
  id: "my-database-tap",
  streams: {
    users: dataMoveSingleStreamDef(streamName, UserSchema),
  },
  async *read(ctx) {
    // Emit SCHEMA message first
    yield {
      protocol: "data-move-protocol",
      type: "SCHEMA",
      stream: "users",
      schema: UserSchema,
      keyProperties: ["id"],
    };

    // Emit RECORD messages
    const users = await fetchUsersFromDatabase();
    for (const user of users) {
      yield {
        protocol: "data-move-protocol",
        type: "RECORD",
        stream: "users",
        record: user,
      };
    }

    // Emit final STATE
    yield {
      protocol: "data-move-protocol",
      type: "STATE",
      value: { lastSync: new Date().toISOString() },
    };
  },
};

// 4. Implement a Target (data destination)
const myTarget: DataMoveTarget<typeof schemas> = {
  id: "my-warehouse-target",
  handleMessage(msg) {
    if (msg.type === "RECORD") {
      // Write to data warehouse
      console.log(`Writing user: ${msg.record.name}`);
      writeToWarehouse(msg.stream, msg.record);
    } else if (msg.type === "STATE") {
      // Checkpoint progress
      saveCheckpoint(msg.value);
    }
  },
};

// 5. Execute the pipeline
await dataMovementPipeline({
  tap: myTap,
  target: myTarget,
});

Pipeline Flow:

  1. Tap emits SCHEMA to define structure
  2. Tap emits RECORD messages with typed data
  3. Target receives and processes each message
  4. STATE messages enable resumable sync

Message Types

DataMP supports multiple message types for different purposes:

Message Type   Purpose                    When to Use
SCHEMA         Define stream structure    Start of stream, before records
RECORD         Individual data records    For each row/document
STATE          Checkpoint/bookmark        Periodically, to enable resume
TRACE          Diagnostic logging         Debug information
ERROR          Error reporting            When operations fail
BARRIER        Synchronization point      Batch boundaries
METRICS        Performance data           Stats and monitoring
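
A target that wants to react to every kind can switch on msg.type; a minimal sketch using the message shapes shown throughout this guide (prepareTable, writeRow, reportMetrics, and flushBatch are hypothetical helpers):

const observantTarget: DataMoveTarget<typeof schemas> = {
  id: "observant-target",
  handleMessage(msg) {
    switch (msg.type) {
      case "SCHEMA":
        prepareTable(msg.stream); // create/verify the destination table
        break;
      case "RECORD":
        writeRow(msg.stream, msg.record);
        break;
      case "STATE":
        saveCheckpoint(msg.value); // persist for resumability
        break;
      case "TRACE":
      case "ERROR":
        console.log(`[${msg.type}] ${msg.message}`);
        break;
      case "METRICS":
        reportMetrics(msg.metrics);
        break;
      case "BARRIER":
        flushBatch(); // safe point to flush buffered writes
        break;
    }
  },
};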

Advanced Pipeline Features

import {
  dataMovementPipeline,
  type DataMoveMessageTransform,
} from "lib/courier/protocol.ts";

// Transform that enriches records
const enrichTransform: DataMoveMessageTransform<typeof schemas> = {
  id: "enrichment-transform",
  async *transform(msg) {
    if (msg.type === "RECORD" && msg.stream === "users") {
      // Add computed field
      const enriched = {
        ...msg.record,
        fullName: `${msg.record.name} (${msg.record.email})`,
      };
      yield { ...msg, record: enriched };
    } else {
      yield msg;
    }
  },
};

// Pipeline with state management
await dataMovementPipeline({
  tap: myTap,
  target: myTarget,
  transforms: [enrichTransform],
  initialState: { lastSync: "2024-01-01T00:00:00Z" },
  onState: (state) => {
    console.log("Checkpoint:", state);
    // Save state to enable resume after failure
    saveStateToFile(state);
  },
  logger: console,
});

State Management: Checkpointing enables incremental sync - if a pipeline fails, it can resume from the last successful STATE rather than starting over.
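
Resuming is then just a matter of persisting each checkpoint and feeding the last one back as initialState on the next run. A minimal sketch, assuming hypothetical JSON-file helpers (saveStateToFile as used above, plus a matching loader):

// Hypothetical helpers that persist the checkpoint between runs.
async function loadStateFromFile(): Promise<unknown | undefined> {
  try {
    return JSON.parse(await Deno.readTextFile(".courier-state.json"));
  } catch {
    return undefined; // first run: no checkpoint yet
  }
}

async function saveStateToFile(state: unknown) {
  await Deno.writeTextFile(".courier-state.json", JSON.stringify(state));
}

// Each run resumes from wherever the previous run checkpointed.
await dataMovementPipeline({
  tap: myTap,
  target: myTarget,
  initialState: await loadStateFromFile(),
  onState: saveStateToFile,
});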

Multi-Stream Pipelines

import { type StreamSchemaMap } from "lib/courier/protocol.ts";

// Define multiple streams
const OrderSchema = z.object({
  id: z.string(),
  userId: z.string(),
  amount: z.number(),
  status: z.enum(["pending", "completed", "cancelled"]),
});

// `satisfies` keeps the literal stream keys so DataMoveTap<typeof schemas>
// stays precise (an explicit `: StreamSchemaMap` annotation would widen it)
const schemas = {
  users: UserSchema,
  orders: OrderSchema,
} satisfies StreamSchemaMap;

const multiStreamTap: DataMoveTap<typeof schemas> = {
  id: "multi-stream-tap",
  streams: {
    users: dataMoveSingleStreamDef("users", UserSchema),
    orders: dataMoveSingleStreamDef("orders", OrderSchema),
  },
  async *read(ctx) {
    // Emit users
    for (const user of await fetchUsers()) {
      yield {
        protocol: "data-move-protocol",
        type: "RECORD",
        stream: "users",
        record: user,
      };
    }

    // Emit orders
    for (const order of await fetchOrders()) {
      yield {
        protocol: "data-move-protocol",
        type: "RECORD",
        stream: "orders",
        record: order,
      };
    }
  },
};

singer.ts - Singer Protocol Support

Adapters for working with the Singer ecosystem (taps and targets).

Understanding Singer Wire Format

Singer uses JSONL (JSON Lines) with specific message types:

{"type": "SCHEMA", "stream": "users", "schema": {...}, "key_properties": ["id"]}
{"type": "RECORD", "stream": "users", "record": {"id": "1", "name": "Alice"}}
{"type": "STATE", "value": {"bookmark": "2024-01-01"}}

Converting to Singer Format

import {
  singerWireMessageSchema,
  dataMoveTypedRecordToSingerRecordWireSchema,
  dataMoveSingerSchemaWireFromMetaSchema,
} from "lib/courier/singer.ts";

// Convert DataMP RECORD to Singer wire format
const singerRecord = dataMoveTypedRecordToSingerRecordWireSchema(
  {
    protocol: "data-move-protocol",
    type: "RECORD",
    stream: "users",
    record: { id: "1", name: "Alice", email: "alice@example.com" },
  }
);
// Result: { type: "RECORD", stream: "users", record: {...} }

// Generate Singer SCHEMA message from metadata
const singerSchema = dataMoveSingerSchemaWireFromMetaSchema({
  stream: "users",
  schema: UserSchema,
  keyProperties: ["id"],
});
// Result: { type: "SCHEMA", stream: "users", schema: {...}, key_properties: ["id"] }
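
On the wire, each converted message is serialized as one JSON line (the JSONL format shown above); for example, writing to stdout for a downstream Singer target:

// Emit Singer JSONL: one message per line, schema before records.
console.log(JSON.stringify(singerSchema));
console.log(JSON.stringify(singerRecord));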

Validating Singer Messages

import { singerWireMessageSchema } from "lib/courier/singer.ts";

const rawMessage = JSON.parse(line);
const validated = singerWireMessageSchema.parse(rawMessage);

// TypeScript now knows the message type
if (validated.type === "RECORD") {
  console.log(`Record for stream: ${validated.stream}`);
} else if (validated.type === "STATE") {
  console.log(`State update: ${JSON.stringify(validated.value)}`);
}

Singer Ecosystem:

Singer has 300+ pre-built taps (sources) and targets (destinations). Courier's Singer support lets you integrate with this ecosystem while maintaining type safety.
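
One way to tap into that ecosystem is to spawn an off-the-shelf Singer tap and validate each stdout line with singerWireMessageSchema. A minimal sketch, assuming Deno and a hypothetical tap-postgres binary on the PATH:

import { TextLineStream } from "@std/streams";
import { singerWireMessageSchema } from "lib/courier/singer.ts";

// Spawn the tap; Singer taps write JSONL messages to stdout.
const child = new Deno.Command("tap-postgres", {
  args: ["--config", "config.json"],
  stdout: "piped",
}).spawn();

const lines = child.stdout
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(new TextLineStream());

for await (const line of lines) {
  if (!line.trim()) continue;
  const msg = singerWireMessageSchema.parse(JSON.parse(line));
  if (msg.type === "RECORD") {
    // Hand the record to a DataMP target, transform, etc.
    console.log(`got ${msg.stream} record`, msg.record);
  }
}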


airbyte.ts - Airbyte Protocol Support

Adapters for working with Airbyte connectors.

Understanding Airbyte Wire Format

Airbyte uses JSONL with its own message structure:

{"type": "RECORD", "record": {"stream": "users", "data": {...}, "emitted_at": 1234567890}}
{"type": "STATE", "state": {"data": {...}}}
{"type": "LOG", "log": {"level": "INFO", "message": "Processing..."}}

Working with Airbyte Messages

import {
  airbyteWireMessageSchema,
  airbyteRecordMessageSchema,
} from "lib/courier/airbyte.ts";

// Parse Airbyte wire messages
const rawMessage = JSON.parse(line);
const validated = airbyteWireMessageSchema.parse(rawMessage);

if (validated.type === "RECORD") {
  const { stream, data, emitted_at } = validated.record;
  console.log(`Record from ${stream} at ${emitted_at}`);
  processRecord(data);
} else if (validated.type === "STATE") {
  saveCheckpoint(validated.state.data);
} else if (validated.type === "LOG") {
  console.log(`[${validated.log.level}] ${validated.log.message}`);
}

// Or, when a line is known to be a RECORD, validate it directly
const recordOnly = airbyteRecordMessageSchema.parse(rawMessage);

Airbyte Catalog:

Airbyte has 300+ certified connectors. Courier's Airbyte support enables type-safe integration with these connectors through the DataMP abstraction.
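
Bridging a validated Airbyte message into DataMP is a small re-shaping step; a sketch using the message shapes from this guide (the manual mapping is illustrative, not Courier's own adapter API):

// Re-shape a validated Airbyte RECORD into the DataMP form used above.
if (validated.type === "RECORD") {
  await myTarget.handleMessage({
    protocol: "data-move-protocol",
    type: "RECORD",
    stream: validated.record.stream,
    record: validated.record.data,
  });
}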


Protocol Interfaces

DataMoveTap Interface

interface DataMoveTap<TSchemas extends StreamSchemaMap, TState = unknown> {
  id: string;
  streams: Record<keyof TSchemas, DataMoveStreamDef<any>>;
  read(ctx: {
    state?: TState;
    logger?: Logger;
  }): AsyncGenerator<DataMoveMessage<TSchemas, TState>>;
}

Key Points:

  • id: Unique identifier for the tap
  • streams: Schema definitions for all streams
  • read(): Async generator that yields messages
  • state: Optional initial state for incremental sync

DataMoveTarget Interface

interface DataMoveTarget<TSchemas extends StreamSchemaMap, TState = unknown> {
  id: string;
  handleMessage(msg: DataMoveMessage<TSchemas, TState>): void | Promise<void>;
}

Key Points:

  • id: Unique identifier for the target
  • handleMessage(): Process each message (can be sync or async)

DataMoveMessageTransform Interface

interface DataMoveMessageTransform<TSchemas extends StreamSchemaMap, TState = unknown> {
  id: string;
  transform(
    msg: DataMoveMessage<TSchemas, TState>
  ): AsyncGenerator<DataMoveMessage<TSchemas, TState>>;
}

Key Points:

  • Sits between tap and target
  • Can filter, enrich, or split messages
  • Async generator for one-to-many transformations

Common Patterns

Pattern: Database to Warehouse Sync

import { dataMovementPipeline } from "lib/courier/protocol.ts";

// Source: PostgreSQL database
const postgresTap: DataMoveTap<typeof schemas> = {
  id: "postgres-tap",
  streams: { users: userStreamDef },
  async *read(ctx) {
    const lastSync = ctx.state?.lastId || 0;
    
    // Query with WHERE clause for incremental sync
    const rows = await db.query(
      "SELECT * FROM users WHERE id > $1 ORDER BY id",
      [lastSync]
    );

    for (const row of rows) {
      yield {
        protocol: "data-move-protocol",
        type: "RECORD",
        stream: "users",
        record: row,
      };
    }

    // Save bookmark
    if (rows.length > 0) {
      yield {
        protocol: "data-move-protocol",
        type: "STATE",
        value: { lastId: rows[rows.length - 1].id },
      };
    }
  },
};

// Destination: Snowflake warehouse
const snowflakeTarget: DataMoveTarget<typeof schemas> = {
  id: "snowflake-target",
  handleMessage(msg) {
    if (msg.type === "RECORD") {
      snowflake.insert("users_table", msg.record);
    }
  },
};

await dataMovementPipeline({
  tap: postgresTap,
  target: snowflakeTarget,
  initialState: await loadLastState(),
  onState: async (state) => {
    await saveState(state);
  },
});

Pattern: API to Database ETL

// Tap: REST API with pagination
const apiTap: DataMoveTap<typeof schemas> = {
  id: "api-tap",
  streams: { orders: orderStreamDef },
  async *read(ctx) {
    let page = ctx.state?.nextPage || 1;
    let hasMore = true;

    while (hasMore) {
      const response = await fetch(
        `https://api.example.com/orders?page=${page}`
      );
      const data = await response.json();

      for (const order of data.results) {
        yield {
          protocol: "data-move-protocol",
          type: "RECORD",
          stream: "orders",
          record: order,
        };
      }

      // Checkpoint after each page
      page++;
      hasMore = data.hasMore;
      
      yield {
        protocol: "data-move-protocol",
        type: "STATE",
        value: { nextPage: page },
      };
    }
  },
};

// Target: SQLite database
const sqliteTarget: DataMoveTarget<typeof schemas> = {
  id: "sqlite-target",
  handleMessage(msg) {
    if (msg.type === "RECORD") {
      db.run(
        "INSERT OR REPLACE INTO orders VALUES (?, ?, ?)",
        [msg.record.id, msg.record.amount, msg.record.status]
      );
    }
  },
};

Pattern: Stream Filtering Transform

const filterTransform: DataMoveMessageTransform<typeof schemas> = {
  id: "active-users-filter",
  async *transform(msg) {
    if (msg.type === "RECORD" && msg.stream === "users") {
      // Only pass through active users
      if (msg.record.status === "active") {
        yield msg;
      }
      // Drop inactive users (don't yield)
    } else {
      // Pass through all non-record messages
      yield msg;
    }
  },
};

await dataMovementPipeline({
  tap: usersTap,
  target: activeUsersTarget,
  transforms: [filterTransform],
});

Pattern: Data Enrichment Transform

const enrichTransform: DataMoveMessageTransform<typeof schemas> = {
  id: "user-enrichment",
  async *transform(msg) {
    if (msg.type === "RECORD" && msg.stream === "users") {
      // Look up additional data
      const profile = await fetchUserProfile(msg.record.id);
      const location = await geocode(msg.record.address);

      yield {
        ...msg,
        record: {
          ...msg.record,
          profileComplete: profile.completeness,
          latitude: location.lat,
          longitude: location.lng,
        },
      };
    } else {
      yield msg;
    }
  },
};

Pattern: Record Splitting Transform

const splitTransform: DataMoveMessageTransform<typeof schemas> = {
  id: "order-line-splitter",
  async *transform(msg) {
    if (msg.type === "RECORD" && msg.stream === "orders") {
      // Split order into line items
      for (const item of msg.record.lineItems) {
        yield {
          protocol: "data-move-protocol",
          type: "RECORD",
          stream: "order_items",
          record: {
            orderId: msg.record.id,
            productId: item.productId,
            quantity: item.quantity,
            price: item.price,
          },
        };
      }
    } else {
      yield msg;
    }
  },
};

Helper Functions

dataMoveSingleStreamMap

Create a schema map for a single stream:

import { dataMoveSingleStreamMap } from "lib/courier/protocol.ts";

const schemas = dataMoveSingleStreamMap("users", UserSchema);
// Equivalent to: { users: UserSchema }

dataMoveSingleStreamDef

Create a complete stream definition:

import { dataMoveSingleStreamDef } from "lib/courier/protocol.ts";

const streamDef = dataMoveSingleStreamDef("users", UserSchema, {
  keyProperties: ["id"],
  replicationMethod: "INCREMENTAL",
  replicationKey: "updated_at",
});

Options:

  • keyProperties: Primary key fields
  • replicationMethod: "FULL_TABLE" or "INCREMENTAL"
  • replicationKey: Field to use for incremental sync
  • bookmarkProperties: Fields for cursor-based pagination

Type Safety Benefits

Why Type Safety Matters:

  • Compile-Time Validation: TypeScript catches schema mismatches before runtime
  • IntelliSense Support: Auto-completion for record fields
  • Refactoring Safety: Rename fields across tap and target with confidence
  • Documentation: Types serve as always-up-to-date documentation

Example: Type Errors Caught Early

const UserSchema = z.object({
  id: z.string(),
  name: z.string(),
  email: z.string().email(),
});

const tap: DataMoveTap<typeof schemas> = {
  id: "my-tap",
  streams: { users: userStreamDef },
  async *read(ctx) {
    yield {
      protocol: "data-move-protocol",
      type: "RECORD",
      stream: "users",
      record: {
        id: "1",
        name: "Alice",
        // TS Error: Property 'email' is missing!
      },
    };
  },
};

Diagnostic Messages

TRACE Messages

For debug logging and operational visibility:

yield {
  protocol: "data-move-protocol",
  type: "TRACE",
  nature: "TRACE",
  message: "Fetching page 5 of API results",
  metadata: { page: 5, totalRecords: 1234 },
};

ERROR Messages

For error reporting without stopping the pipeline:

try {
  await processRecord(record);
} catch (err) {
  yield {
    protocol: "data-move-protocol",
    type: "ERROR",
    nature: "ERROR",
    message: err.message,
    metadata: { recordId: record.id, stack: err.stack },
  };
}

METRICS Messages

For performance monitoring:

yield {
  protocol: "data-move-protocol",
  type: "METRICS",
  nature: "METRICS",
  metrics: {
    recordsProcessed: 1000,
    durationMs: 5432,
    bytesTransferred: 102400,
  },
};

BARRIER Messages

For batch synchronization:

// After processing 1000 records
yield {
  protocol: "data-move-protocol",
  type: "BARRIER",
  nature: "BARRIER",
  message: "Batch complete",
  metadata: { batchSize: 1000 },
};

Best Practices

1. Always Emit STATE Messages

Enable incremental sync and failure recovery:

// Without state tracking: every run re-reads everything
async *read(ctx) {
  for (const record of allRecords) {
    yield { type: "RECORD", stream: "users", record };
  }
}

// With state tracking: each run resumes from the last checkpoint
async *read(ctx) {
  const lastId = ctx.state?.lastId || 0;
  
  for (const record of recordsSince(lastId)) {
    yield { type: "RECORD", stream: "users", record };
    
    // Checkpoint periodically
    if (record.id % 1000 === 0) {
      yield { type: "STATE", value: { lastId: record.id } };
    }
  }
}

2. Use Zod for Schema Validation

Catch data quality issues early:

import { z } from "@zod/zod";

const UserSchema = z.object({
  id: z.string().uuid(),  // Must be valid UUID
  email: z.string().email(),  // Must be valid email
  age: z.number().int().min(0).max(150),  // Reasonable age
  createdAt: z.string().datetime(),  // ISO datetime
});

// Invalid data throws validation error
const result = UserSchema.safeParse(record);
if (!result.success) {
  console.error("Invalid record:", result.error);
}

3. Handle Errors Gracefully

Don't let one bad record stop the entire pipeline:

async *read(ctx) {
  for (const raw of rawRecords) {
    try {
      const validated = RecordSchema.parse(raw);
      yield { type: "RECORD", stream: "users", record: validated };
    } catch (err) {
      // Log error but continue processing
      yield {
        type: "ERROR",
        nature: "ERROR",
        message: `Validation failed: ${err.message}`,
        metadata: { rawRecord: raw },
      };
    }
  }
}

4. Batch State Updates

Don't checkpoint after every single record:

// Checkpoint every 1000 records or 5 minutes
let recordCount = 0;
let lastCheckpoint = Date.now();

for (const record of records) {
  yield { type: "RECORD", stream: "users", record };
  recordCount++;

  const elapsed = Date.now() - lastCheckpoint;
  if (recordCount % 1000 === 0 || elapsed > 300000) {
    yield { type: "STATE", value: { lastId: record.id } };
    lastCheckpoint = Date.now();
  }
}

5. Use Transforms for Reusable Logic

Don't duplicate enrichment/filtering logic across taps:

// Reusable transform
const piiRedactionTransform: DataMoveMessageTransform<any> = {
  id: "pii-redaction",
  async *transform(msg) {
    if (msg.type === "RECORD") {
      yield {
        ...msg,
        record: {
          ...msg.record,
          email: redact(msg.record.email),
          phone: redact(msg.record.phone),
        },
      };
    } else {
      yield msg;
    }
  },
};

// Use in multiple pipelines
await dataMovementPipeline({
  tap: tap1,
  target: target1,
  transforms: [piiRedactionTransform],
});

await dataMovementPipeline({
  tap: tap2,
  target: target2,
  transforms: [piiRedactionTransform],
});

Performance Considerations

Performance Tips:

  • Batch Processing: Group records into batches for bulk inserts
  • Async Generators: Use async * for memory-efficient streaming
  • State Checkpointing: Balance checkpoint frequency vs. recovery time
  • Connection Pooling: Reuse database connections across records
  • Parallel Streams: Process independent streams concurrently (see the sketch after the batching example)

Batching Example

// Keep the buffer outside the target so the object stays a plain
// DataMoveTarget (the interface has no `batch` property).
let batch: unknown[] = [];

const batchTarget: DataMoveTarget<typeof schemas> = {
  id: "batch-target",
  async handleMessage(msg) {
    if (msg.type === "RECORD") {
      batch.push(msg.record);

      // Bulk insert every 1000 records
      if (batch.length >= 1000) {
        await db.bulkInsert("users", batch);
        batch = [];
      }
    } else if (msg.type === "STATE") {
      // Flush remaining records before the checkpoint is persisted
      if (batch.length > 0) {
        await db.bulkInsert("users", batch);
        batch = [];
      }
    }
  },
};
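
Parallel Streams Example

When streams are truly independent, a simple way to process them concurrently is to run one pipeline per stream under Promise.all; a minimal sketch, assuming usersTap/ordersTap and their targets share no state:

// Each pipeline checkpoints independently; one stream's slowness
// or failure does not block its sibling.
await Promise.all([
  dataMovementPipeline({ tap: usersTap, target: usersTarget }),
  dataMovementPipeline({ tap: ordersTap, target: ordersTarget }),
]);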

Testing

Unit Testing Taps

import { assertEquals } from "@std/assert";

Deno.test("tap emits correct records", async () => {
  const tap = myTap;
  const messages = [];
  
  for await (const msg of tap.read({})) {
    messages.push(msg);
  }
  
  assertEquals(messages.length, 3);
  assertEquals(messages[0].type, "SCHEMA");
  assertEquals(messages[1].type, "RECORD");
  assertEquals(messages[1].record.id, "1");
});

Integration Testing Pipelines

Deno.test("pipeline transfers data correctly", async () => {
  const records: any[] = [];
  
  const testTarget: DataMoveTarget<typeof schemas> = {
    id: "test-target",
    handleMessage(msg) {
      if (msg.type === "RECORD") {
        records.push(msg.record);
      }
    },
  };
  
  await dataMovementPipeline({
    tap: myTap,
    target: testTarget,
  });
  
  assertEquals(records.length, 100);
  assertEquals(records[0].id, "1");
});

Common Patterns Summary

Quick Reference:

  • Database Sync: Use STATE with incremental queries
  • API ETL: Paginate with state checkpoints
  • Filtering: Transform that conditionally yields messages
  • Enrichment: Transform that augments records
  • Splitting: Transform that yields multiple messages per input
  • Batching: Target that accumulates records before writing

Integration with Other Spry Modules

With Universal Module

import { shell } from "lib/universal/shell.ts";
import { dataMovementPipeline } from "lib/courier/protocol.ts";

// Run shell command before/after pipeline
const sh = shell({});
await sh.spawnText("pg_dump source_db > backup.sql");

await dataMovementPipeline({ tap, target });

await sh.spawnText("echo 'Sync complete' | mail admin@example.com");

With Task System

import { executionPlan, executeDAG } from "lib/universal/task.ts";

const tasks = [
  {
    id: "sync-users",
    deps: [],
    async run() {
      await dataMovementPipeline({ tap: usersTap, target });
    },
  },
  {
    id: "sync-orders",
    deps: ["sync-users"],
    async run() {
      await dataMovementPipeline({ tap: ordersTap, target });
    },
  },
];

const plan = executionPlan(tasks);
await executeDAG(plan, (task) => task.run());

Summary

Courier provides a unified, type-safe foundation for data movement in Spry. By abstracting protocol differences and providing strong typing, it enables building robust, maintainable data pipelines that work across the Singer and Airbyte ecosystems.

Key Benefits:

  • Type Safety: Zod schemas catch errors at compile time and runtime
  • Protocol Agnostic: Write once, work with Singer, Airbyte, or native
  • Incremental Sync: State management enables efficient updates
  • Composable: Transforms create reusable processing steps
  • Observable: Rich diagnostics for monitoring and debugging
