This guide shows you how to replicate tables between databases using Sequin for change data capture.

By the end, you’ll have a pipeline that streams database changes to your destination database while handling common challenges like race conditions, schema transformations, and recovery from failures.

When to use this guide

This approach works well when you need to:

  • Sync data between systems while applying transformations
  • Consolidate data from multiple Postgres tables or databases to a single destination
  • Programmatically replicate data between development or staging environments

Prerequisites

If you’re self-hosting Sequin, you’ll need:

  1. Sequin installed
  2. A database connected
  3. A destination database ready to receive updates
  4. A sink destination (like SQS, Kafka, Redis, or HTTP)

If you’re using Sequin Cloud, you’ll need:

  1. A Sequin Cloud account
  2. A database connected
  3. A destination database ready to receive updates
  4. A sink destination (like SQS, Kafka, Redis, or HTTP)

If using SQS, be sure to use a FIFO queue.

Architecture overview

Your table replication pipeline will have these components:

  1. Source table(s): The table(s) in Postgres that you want to replicate
  2. Destination sink: The message queue or webhook endpoint that delivers changes to your processing system (e.g. SQS, Kafka, or HTTP endpoint)
  3. Replication processor: An application or service you write that receives changes and upserts them into your destination table (a consumer-loop sketch follows this list)
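
To make the third component concrete, here is a minimal consumer-loop sketch, assuming an SQS FIFO sink and the boto3 client. The queue URL is a placeholder, and process_change stands in for the processor examples later in this guide:

import json
import boto3

# Placeholder queue URL for an SQS FIFO sink; replace with your own.
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/sequin-changes.fifo'
sqs = boto3.client('sqs')

def run(process_change):
    while True:
        resp = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        for message in resp.get('Messages', []):
            change = json.loads(message['Body'])
            # Apply the change to the destination (see the process_change
            # examples later in this guide).
            process_change(change)
            # Delete (ack) only after the change has been written to the destination.
            sqs.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message['ReceiptHandle'],
            )

Note that the processor examples below access fields as attributes (change.record, change.action, change.metadata.table_name); depending on how you parse messages, you may read the same fields as dictionary keys or wrap the payload in a small helper object.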

Create a sink

First, create a sink to the queue, stream, or webhook endpoint that you want to use to process changes:

Step 1: Select the source

Select the table you want to replicate.

Optionally add SQL filters to sync a subset of your source table to the destination.

Step 2: Select the message type

Leave the default “Changes” message type selected.

Step 3: Leave message grouping default

If your sink supports message grouping, leave the default option selected for “Message grouping”.

This will ensure that messages are grouped by primary key, helping eliminate race conditions as you write rows to your destination.

Step 4: Specify backfill

To replicate data currently in your source table, specify backfill.

Backfill messages are change messages where the action is read.

Step 5: Configure sink-specific settings

Configure sink-specific settings and click “Create Sink”.

Process changes

Once your sink is configured, changes from your source table will flow to your message queue or HTTP endpoint. Before implementing your change processor, consider these key requirements for reliable table replication:

Important considerations

  1. Type casting: JSON messages transport data types differently than Postgres, often with less specificity:

    • UUIDs arrive as strings
    • Timestamps/dates arrive as ISO-8601 strings
    • Decimals/numerics arrive as numbers

    You will need to cast these values back to the appropriate Postgres types in your processor; see the casting sketch after this list.

  2. Batch processing: For better performance, batch your database operations:

    • Consider your message queue’s batching capabilities (e.g., SQS batch size)
    • Group similar operations (inserts/updates vs deletes) and perform them in a single database operation
  3. Idempotency: Always use ON CONFLICT clauses to handle duplicate messages safely
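
As a sketch of the first point, here is how the JSON values in a change record might be cast back to richer Python/Postgres types. The field names and values below are illustrative, not taken from a real payload:

import uuid
from datetime import datetime
from decimal import Decimal

# Illustrative raw record as it might arrive in a change message
raw = {
    'id': '6d9c7e0a-6f7b-4d1a-9e6e-0c2b8f3a1d22',
    'price': 19.99,
    'created_at': '2024-01-01T12:00:00Z',
}

cast = {
    'id': uuid.UUID(raw['id']),           # UUID string -> uuid.UUID
    'price': Decimal(str(raw['price'])),  # JSON number -> exact numeric
    'created_at': datetime.fromisoformat(raw['created_at'].replace('Z', '+00:00')),  # ISO-8601 -> datetime
}

Alternatively, you can pass ISO-8601 strings straight through as query parameters and let Postgres cast them to timestamp.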

Example: Basic 1-to-1 replication

First, create a destination table that matches your source schema:

create_table.sql
create table replicated_products (
  product_id uuid primary key,  -- matches source table PK
  name text,
  price decimal,
  created_at timestamp
);

As you’ll see in the next section, you want to ensure your destination table has a primary key or unique constraint that matches your source table to enable idempotent upserts.

Process changes

Here’s an example of how to apply changes to your destination table:

process_change.py
import uuid
from decimal import Decimal

# `db` is assumed to be your database client (for example, a psycopg cursor).
def process_change(change):
    product_id = uuid.UUID(change.record['id'])

    if change.action in ['insert', 'update', 'read']:
        # Cast the JSON values back to the destination column types
        record = {
            'product_id': product_id,
            'name': change.record['name'],
            # JSON numbers arrive as floats; Decimal(str(...)) avoids float artifacts in a numeric column
            'price': Decimal(str(change.record['price'])),
            # ISO-8601 string; Postgres casts it to timestamp on insert
            'created_at': change.record['created_at']
        }

        db.execute("""
            insert into replicated_products (product_id, name, price, created_at)
            values (%(product_id)s, %(name)s, %(price)s, %(created_at)s)
            on conflict (product_id) do update set
                name = excluded.name,
                price = excluded.price,
                created_at = excluded.created_at
        """, record)

    elif change.action == 'delete':
        db.execute("""
            delete from replicated_products
            where product_id = %(product_id)s
        """, {'product_id': product_id})

For better performance, consider batching multiple changes into a single database operation. Batching increases throughput while still maintaining transactional guarantees.
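
As a sketch, assuming psycopg2 and the replicated_products table from this example, you might accumulate insert/update/read changes and flush them in one statement:

from psycopg2.extras import execute_values

# `conn` is assumed to be a psycopg2 connection; `rows` is a list of
# (product_id, name, price, created_at) tuples built from a batch of changes.
def flush_upserts(conn, rows):
    with conn.cursor() as cur:
        execute_values(cur, """
            insert into replicated_products (product_id, name, price, created_at)
            values %s
            on conflict (product_id) do update set
                name = excluded.name,
                price = excluded.price,
                created_at = excluded.created_at
        """, rows)
    conn.commit()

Deletes can be batched the same way, for example with a single delete ... where product_id = any(%(ids)s) statement.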

With this approach, whether you process changes one at a time or in batches, you can replicate changes from your source table to your destination table in a few dozen lines of code.

Example: Fan-in replication

When combining multiple source tables into a single destination table, you’ll need to carefully consider your primary key strategy. Here are two approaches:

Option 1: Composite key with source identifier

This approach works well when all source tables share the same primary key type (e.g., UUID):

create table combined_records (
  source_table text,
  record_id uuid,
  data jsonb,
  primary key (source_table, record_id)
);

Here’s how to process changes with this schema:

def process_change(change):
    source_table = change.metadata.table_name
    record_id = uuid.UUID(change.record['id'])
    
    if change.action in ['insert', 'update', 'read']:
        # Store the full record as JSON, keyed by (source_table, record_id)
        record = {
            'source_table': source_table,
            'record_id': record_id,
            'data': json.dumps(change.record)
        }
        
        db.execute("""
            insert into combined_records (source_table, record_id, data)
            values (%(source_table)s, %(record_id)s, %(data)s)
            on conflict (source_table, record_id) do update set
                data = excluded.data
        """, record)
        
    elif change.action == 'delete':
        db.execute("""
            delete from combined_records 
            where source_table = %(source_table)s 
            and record_id = %(record_id)s
        """, {'source_table': source_table, 'record_id': record_id})

Option 2: Separate primary key columns

For more complex scenarios where source tables have different primary key types or you need to maintain separate unique constraints:

create table combined_records (
  id serial primary key,
  users_id uuid,
  orders_id uuid,
  products_id uuid,
  data jsonb,
  constraint exactly_one_pk check (
    (users_id is not null)::integer +
    (orders_id is not null)::integer +
    (products_id is not null)::integer = 1
  ),
  constraint users_unique unique (users_id),
  constraint orders_unique unique (orders_id),
  constraint products_unique unique (products_id)
);

Here’s how to process changes with this schema:

def process_change(change):
    source_table = change.metadata.table_name
    record_id = uuid.UUID(change.record['id'])
    
    if change.action in ['insert', 'update', 'read']:
        # Map source table to appropriate column
        record = {
            'id': record_id,  # matches the %(id)s placeholder in the SQL below
            'data': json.dumps(change.record)
        }
        
        db.execute(f"""
            insert into combined_records ({source_table}_id, data)
            values (%(id)s, %(data)s)
            on conflict ({source_table}_id) do update set
                data = excluded.data
        """, record)
        
    elif change.action == 'delete':
        db.execute(f"""
            delete from combined_records 
            where {source_table}_id = %(id)s
        """, {'id': record_id})

Verify your pipeline is working

If you specified a backfill, there should be messages in your stream ready for your system to process:

  1. On the sink overview page, click the “Messages” tab. You should see messages flowing to your sink.
  2. Check your destination table to ensure rows are being replicated as expected; a quick row-count spot check is sketched below.
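
For a quick spot check, you can compare row counts between the source and destination. A minimal sketch, assuming the 1-to-1 example above, a hypothetical source table named products, and separate cursors for each database:

# source_db and dest_db are assumed to be cursors connected to the
# source and destination databases, respectively.
source_db.execute('select count(*) from products')
dest_db.execute('select count(*) from replicated_products')
print('source:', source_db.fetchone()[0], 'destination:', dest_db.fetchone()[0])

If you filtered the source table when creating the sink, apply the same filter to the source count.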

Maintenance

Re-syncing your destination

You may need to re-sync your destination in these scenarios:

  1. Schema changes: Updates to source or destination schema
  2. Logic updates: Changes to transformation logic
  3. Data recovery: Recovering from processing errors

You can use a backfill to replay data from your source table through your pipeline.

Next steps

See “Deploy to production” for guidance on copying your local sink configuration to your production environment.