Audit logs are an essential tool for tracking and recording changes in your database. With Sequin, you can create comprehensive audit logs of changes in your Postgres database to:

  • Track compliance and security: Meet regulatory requirements by monitoring sensitive data modifications
  • Debug and recover: Trace change history to investigate issues and recover from unwanted changes
  • Build user features: Create activity feeds, change history views, and undo/redo functionality

Prerequisites

If you’re self-hosting Sequin, you’ll need:

  1. Sequin installed
  2. A database connected
  3. A destination database ready to receive audit logs
  4. A sink destination (like SQS, Kafka, Redis, or HTTP)

If you’re using Sequin Cloud, you’ll need:

  1. A Sequin Cloud account
  2. A database connected
  3. A destination database ready to receive audit logs
  4. A sink destination (like SQS, Kafka, Redis, or HTTP)

If using SQS, be sure to use a FIFO queue.

Architecture overview

Your audit logging pipeline will have these components:

  1. Source table(s): The table(s) in Postgres that you want to audit
  2. Destination sink: The message queue or webhook endpoint that delivers changes to your processing system (e.g. SQS, Kafka, or HTTP endpoint)
  3. Processor: An application or service you write that receives changes and writes to your audit tables

Create a sink

First, create a sink to the queue, stream, or webhook endpoint that you want to use to process changes:

1. Select the source

Select the table(s) you want to audit.

Optionally add SQL filters to audit a subset of your source table.

2. Select the message type

Leave the default “Changes” message type selected.

3. Leave message grouping default

If your sink supports message grouping, leave the default option selected for “Message grouping”.

This will ensure that messages are grouped by primary key, helping eliminate race conditions as you write audit logs.

4. Specify backfill

If you want to snapshot your current rows from your source table into your audit logs, specify a backfill.

Backfill messages are change messages where the action is read.
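
For orientation, here is a rough sketch of what such a message looks like after JSON decoding. Only the fields used by the processor examples later in this guide are shown, and the orders values are illustrative; your sink's payload may carry additional metadata.

# Illustrative only: a backfill ("read") change message after JSON decoding.
# Field names mirror the ones the processors below read; example values are made up.
backfill_message = {
    "id": "a1b2c3d4-0000-0000-0000-000000000000",   # event id
    "action": "read",                               # backfills use the read action
    "record": {                                     # the row as it exists right now
        "id": "9f8e7d6c-0000-0000-0000-000000000000",
        "order_number": 1234,
    },
    "changes": None,                                # no old values for a read
    "metadata": {
        "table_name": "orders",
    },
}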

5. Configure sink-specific settings

Configure sink-specific settings and click “Create Sink”.

Process changes

Once your sink is configured, changes from your source table will flow to your message queue or HTTP endpoint. Before implementing your audit processor, consider these key requirements for reliable audit logging:

Important considerations

  1. Idempotency: Implement idempotent processing to handle edge cases safely
    • Your target table should have a unique constraint on the event_id column
    • Use upsert operations (ON CONFLICT clauses) to handle potential duplicate messages

Duplicates are rare and only occur if your processor successfully writes to the database but fails to acknowledge the message from the queue (SQS/Kafka) or to return a 200 status code (HTTP endpoints). In these cases, the message will be redelivered to ensure at-least-once delivery.

  2. Type handling: Cast JSON to Postgres types

    Sequin sends events to your consumer in JSON. Since JSON’s types are not as rich as Postgres’ types, you’ll need to cast values appropriately when writing to your database.

    Common conversions include:

    • Timestamps/dates: Cast from strings to timestamp or date
    • UUIDs: Cast from strings to uuid
    • Numeric types: Cast to decimal, bigint, etc. based on precision needs (see the casting sketch after this list)
  3. Batch processing: For better performance, batch your database operations.

    Consider your message queue’s batching capabilities (e.g., SQS batch size). A batched upsert sketch appears later in this guide.
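
As a concrete illustration, here is a minimal casting sketch. The amount and inserted_at fields are hypothetical placeholders for columns on your source row; adapt the casts (and field names) to your own schema and driver, or cast in SQL instead (e.g. %(id)s::uuid).

from datetime import datetime
from decimal import Decimal
from uuid import UUID

def coerce_values(record):
    """Cast JSON-decoded values into richer Python types before writing to Postgres.

    The field names below (amount, inserted_at) are placeholders; map your own
    source columns the same way.
    """
    return {
        # JSON string -> uuid
        'id': UUID(record['id']),
        # JSON number (or string) -> exact numeric
        'amount': Decimal(str(record['amount'])),
        # ISO 8601 string -> timestamp; strip a trailing 'Z' for older Pythons
        'inserted_at': datetime.fromisoformat(record['inserted_at'].replace('Z', '+00:00')),
    }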

Example: Basic audit table

First, create an audit table to store your change history:

create_table.sql
create table audit_logs (
  id serial primary key,
  event_id uuid unique not null,
  table_name text not null,
  record_id uuid not null,
  action text not null,
  old_values jsonb,
  new_values jsonb,
  created_at timestamp not null default now(),
  updated_at timestamp
);

-- Optional: Add indexes for common queries
create index on audit_logs(table_name, record_id);
create index on audit_logs(created_at);
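
As a rough sketch of the queries those optional indexes are intended to serve (the orders table name and the UUID literal below are example values):

-- Full change history for a single record, newest first
select action, old_values, new_values, created_at
from audit_logs
where table_name = 'orders'
  and record_id = '00000000-0000-0000-0000-000000000000'
order by created_at desc;

-- Everything recorded in the last day, across all audited tables
select table_name, record_id, action, created_at
from audit_logs
where created_at > now() - interval '1 day'
order by created_at desc;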

Process changes

Map changes to your audit log table and perform an upsert operation. We recommend using a do update clause rather than do nothing to handle duplicates.

Using do update instead of do nothing gives you more flexibility when maintaining your audit logs. If you discover bugs in your processing logic, you can fix them and replay the events - the do update clause will overwrite any incorrect values with the corrected data.

process_change.py
import json
import uuid
from datetime import datetime

# `db` is assumed to be your database client (e.g., a psycopg2 cursor or a similar
# wrapper exposing execute()); wire it up however your application manages connections.

def process_change(change):
    # Map the change message onto an audit_logs row
    record = {
        'event_id': uuid.UUID(change.id),
        'table_name': change.metadata.table_name,
        'record_id': uuid.UUID(change.record['id']),
        'action': change.action,
        'old_values': json.dumps(change.changes) if change.changes else None,
        'new_values': json.dumps(change.record),
        'created_at': datetime.now(),
        'updated_at': datetime.now()
    }

    # Upsert on event_id so redelivered messages update rather than duplicate
    db.execute("""
        insert into audit_logs (
            event_id, table_name, record_id,
            action, old_values, new_values, created_at, updated_at
        )
        values (
            %(event_id)s, %(table_name)s, %(record_id)s,
            %(action)s, %(old_values)s, %(new_values)s,
            %(created_at)s, %(updated_at)s
        )
        on conflict (event_id) do update set
            table_name = excluded.table_name,
            record_id = excluded.record_id,
            action = excluded.action,
            old_values = excluded.old_values,
            new_values = excluded.new_values,
            updated_at = excluded.updated_at
    """, record)

For better performance, consider batching multiple changes into a single database operation. Batching increases throughput while still maintaining transactional guarantees.
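
For example, here is a minimal batched-upsert sketch using psycopg2’s execute_values helper. It assumes records is a list of dicts shaped like the one built in process_change above and that conn is a psycopg2 connection (depending on your driver, UUID values may need an adapter such as psycopg2.extras.register_uuid()).

from psycopg2.extras import execute_values

def process_batch(conn, records):
    # Flatten the per-change dicts into tuples in column order
    rows = [
        (r['event_id'], r['table_name'], r['record_id'], r['action'],
         r['old_values'], r['new_values'], r['created_at'], r['updated_at'])
        for r in records
    ]
    with conn.cursor() as cur:
        # One round trip for the whole batch; still idempotent via the event_id conflict target
        execute_values(cur, """
            insert into audit_logs (
                event_id, table_name, record_id,
                action, old_values, new_values, created_at, updated_at
            )
            values %s
            on conflict (event_id) do update set
                table_name = excluded.table_name,
                record_id = excluded.record_id,
                action = excluded.action,
                old_values = excluded.old_values,
                new_values = excluded.new_values,
                updated_at = excluded.updated_at
        """, rows)
    conn.commit()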

Your audit log table will now be populated with old and new values for each change that occurs in your source table(s).

Example: Activity feed

There may be times when you want to transform the audit data before writing it to your audit table. For example, you might want to transform the audit data into a more readable format to power a user activity feed.

Create an activity feed table to store your transformed audit data:

create table activity_feed (
    id serial primary key,
    event_id uuid unique not null,
    user_id uuid not null,
    action text not null,
    description text not null,
    metadata jsonb,
    created_at timestamp not null default now()
);

create index on activity_feed(user_id, created_at);
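
The (user_id, created_at) index is there to serve feed reads like this sketch (the UUID literal is an example value):

-- Latest activity for one user
select action, description, metadata, created_at
from activity_feed
where user_id = '00000000-0000-0000-0000-000000000000'
order by created_at desc
limit 50;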

Map changes to the activity_feed schema and perform an upsert operation (batched, if your sink delivers messages in batches):

process_change.py
import json
import uuid
from datetime import datetime

def process_change(change):
    # Build a human-readable description for the feed entry
    description = generate_activity_description(
        change.metadata.table_name,
        change.action,
        change.record
    )

    activity = {
        'event_id': uuid.UUID(change.id),
        'user_id': uuid.UUID(change.record['user_id']),
        'action': change.action,
        'description': description,
        'metadata': json.dumps(change.record),
        'created_at': datetime.now()
    }

    # Upsert on event_id so replays and redeliveries stay idempotent
    db.execute("""
        insert into activity_feed (
            event_id, user_id, action,
            description, metadata, created_at
        )
        values (
            %(event_id)s, %(user_id)s, %(action)s,
            %(description)s, %(metadata)s, %(created_at)s
        )
        on conflict (event_id) do update set
            user_id = excluded.user_id,
            action = excluded.action,
            description = excluded.description,
            metadata = excluded.metadata
    """, activity)

def generate_activity_description(table, action, record):
    if table == 'orders':
        if action == 'insert':
            return f"Created order #{record['order_number']}"
        elif action == 'update':
            return f"Updated order #{record['order_number']}"
    # ... etc

Verify your pipeline is working

If you specified a backfill, there should be messages in your sink ready for your system to process:

  1. On the sink overview page, click the “Messages” tab. You should see messages flowing to your sink.
  2. Check your audit tables to ensure changes are being recorded as expected.
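
You can also run a couple of quick sanity checks against the audit table defined earlier:

-- Are events arriving, and how recently?
select count(*) as total_events, max(created_at) as latest_event
from audit_logs;

-- Backfilled rows appear with the read action
select action, count(*)
from audit_logs
group by action
order by count(*) desc;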

Maintenance

Re-syncing your audit logs

You may need to re-sync your audit logs in these scenarios:

  1. Schema changes: Updates to source or audit table schema
  2. Logic updates: Changes to audit transformation logic
  3. Data recovery: Recovering from processing errors

Streaming changes, by default, does not include retention. This means that a change message is evicted from Sequin after it is propagated to the destination sink.

When streaming changes without retention, you can backfill from the source table. The resulting change messages have the action read and only include the values currently in the database; old values and deleted rows are not included.

If you need to re-sync your audit logs with the full history of changes, use change retention.

Next steps

See “Deploy to production” for guidance on copying your local sink configuration to your production environment.