Audit logs are an essential tool for tracking and recording changes in your database. With Sequin, you can create comprehensive audit logs of changes in your Postgres database to:

  • Track compliance and security: Meet regulatory requirements by monitoring sensitive data modifications
  • Debug and recover: Trace change history to investigate issues and recover from unwanted changes
  • Build user features: Create activity feeds, change history views, and undo/redo functionality
  • Capture context: Use transaction annotations to record who made changes and why

Prerequisites

If you’re self-hosting Sequin, you’ll need:

  1. Sequin installed
  2. A database connected
  3. A destination database ready to receive audit logs
  4. A sink destination (like SQS, Kafka, Redis, or HTTP)

If you’re using Sequin Cloud, you’ll need:

  1. A Sequin Cloud account
  2. A database connected
  3. A destination database ready to receive audit logs
  4. A sink destination (like SQS, Kafka, Redis, or HTTP)

If using SQS, be sure to use a FIFO queue.

Architecture overview

Your audit logging pipeline will have these components:

  1. Source table(s): The table(s) in Postgres that you want to audit
  2. Destination sink: The message queue or webhook endpoint that delivers changes to your processing system (e.g. SQS, Kafka, or HTTP endpoint)
  3. Processor: An application or service you write that receives changes and writes to your audit tables
  4. Transaction annotations: Metadata added to your transactions to provide context (who, why, where)

Create a sink

First, create a sink to the queue, stream, or webhook endpoint that you want to use to process changes:

1. Select the source

Select the table(s) you want to audit.

Optionally add SQL filters to audit a subset of your source table.

2. Select the message type

Leave the default “Changes” message type selected.

3. Leave the default message grouping

If your sink supports message grouping, leave the default option selected for “Message grouping”.

This ensures that messages are grouped by primary key, which helps eliminate race conditions as you write audit logs.

4. Specify a backfill

If you want to snapshot the current rows in your source table into your audit logs, specify a backfill.

Backfill messages are change messages whose action is read; they contain only the current values of each row, with no old values.

5. Configure sink-specific settings

Configure sink-specific settings and click “Create Sink”.

Annotate your transactions

To make your audit logs more valuable, use transaction annotations to add context to your changes. For example, you can record who made the change or from where.

Update your application code

Modify your database transaction code to include annotations:

BEGIN;
  -- Set annotations for this transaction
  SELECT pg_logical_emit_message(true, 'sequin:transaction_annotations.set', '{
    "username": "paul.atreides",
    "source": "web_ui",
    "request_id": "req_123",
    "metadata": {
      "reason": "Customer requested update",
      "client_ip": "192.168.1.100"
    }
  }');

  -- Your database operations
  UPDATE users SET email = 'new@example.com' WHERE id = 'user_123';

COMMIT;
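
If your application uses a driver like psycopg2, you can set the same annotation from application code at the start of the transaction. A minimal sketch; conn is an open psycopg2 connection, and current_user, request_id, new_email, and user_id are placeholders from your application:

import json

with conn:                      # psycopg2: wraps the block in a single transaction
    with conn.cursor() as cur:
        # Set annotations first so they apply to the operations that follow
        cur.execute(
            "select pg_logical_emit_message(true, 'sequin:transaction_annotations.set', %s)",
            [json.dumps({'username': current_user, 'source': 'web_ui', 'request_id': request_id})]
        )
        cur.execute('update users set email = %s where id = %s', [new_email, user_id])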

These annotations will be included in the change messages for all operations following the annotation statement in this transaction.
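
For reference, the update above would produce a change message shaped roughly like this (only the fields read by the processing examples below are shown; the exact envelope can vary by sink and Sequin version):

{
  "id": "a1b2c3d4-…",
  "action": "update",
  "record": {
    "id": "user_123",
    "email": "new@example.com"
  },
  "changes": {
    "email": "old@example.com"
  },
  "metadata": {
    "table_name": "users",
    "transaction_annotations": {
      "username": "paul.atreides",
      "source": "web_ui",
      "request_id": "req_123",
      "metadata": {
        "reason": "Customer requested update",
        "client_ip": "192.168.1.100"
      }
    }
  }
}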

Process changes

Once your sink is configured, changes from your source table will flow to your message queue or HTTP endpoint. Before implementing your audit processor, consider these key requirements for reliable audit logging:

Important considerations

  1. Idempotency: Implement idempotent processing to handle edge cases safely
    • Your target table should have a unique constraint on the event_id column
    • Use upsert operations (ON CONFLICT clauses) to handle potential duplicate messages

Duplicates are rare and only occur if your processor successfully writes to the database but fails to acknowledge messages from the queue (SQS/Kafka) or return a 200 status code (HTTP endpoints). In these cases, the message will be redelivered to ensure at-least-once delivery.

  2. Type handling: Cast JSON to Postgres types

    Sequin sends events to your consumer as JSON. Since JSON’s types are not as rich as Postgres’ types, you’ll need to cast values appropriately when writing to your database.

    Common conversions include:

    • Timestamps/dates: Cast from strings to timestamp or date
    • UUIDs: Cast from strings to uuid
    • Numeric types: Cast to decimal, bigint, etc. based on precision needs

    A minimal sketch of doing these casts in SQL appears after this list.
  3. Batch processing: For better performance, batch your database operations.

    Consider your message queue’s batching capabilities (e.g., SQS batch size). A sketch of a batched upsert follows the single-row example below.
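
If you prefer to keep these casts in SQL rather than in application code, you can add explicit ::type casts to the insert itself. A minimal sketch, assuming a psycopg-style driver (db is a cursor) and a hypothetical events table; the processing examples later in this guide do the equivalent casting in Python with uuid.UUID and friends:

import json

# Minimal sketch: cast the JSON string values in SQL with explicit ::type casts.
# Assumes a psycopg-style driver (`db`) and a hypothetical `events` table with
# columns (id uuid, occurred_at timestamptz, payload jsonb).
def write_event(db, change):
    db.execute("""
        insert into events (id, occurred_at, payload)
        values (%(id)s::uuid, %(occurred_at)s::timestamptz, %(payload)s::jsonb)
    """, {
        'id': change.id,                              # arrives as a JSON string
        'occurred_at': change.record['created_at'],   # a timestamp column from the source row, as a string
        'payload': json.dumps(change.record)
    })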

Example: Enhanced audit table with annotations

First, create an audit table to store your change history, including fields for transaction annotations:

create_table.sql
create table audit_logs (
  id serial primary key,
  event_id uuid unique not null,
  table_name text not null,
  record_id uuid not null,
  action text not null,
  old_values jsonb,
  new_values jsonb,
  created_at timestamp not null default now(),
  updated_at timestamp,
  -- Fields for transaction annotations
  username text,
  source text,
  request_id text,
  metadata jsonb
);

-- Optional: Add indexes for common queries
create index on audit_logs(table_name, record_id);
create index on audit_logs(created_at);
create index on audit_logs(username);
create index on audit_logs(request_id);
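
With this schema, a record’s change history is a single indexed query. For example, to list the most recent changes to one record:

-- Change history for a single record, newest first
select created_at, action, old_values, new_values, username, source, request_id
from audit_logs
where table_name = 'users'
  and record_id = $1        -- id of the record you are auditing
order by created_at desc;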

Process changes with annotations

Map changes and their annotations to your audit log table and perform an upsert operation:

process_change.py
import json
import uuid
from datetime import datetime

# `db` is assumed to be an open database cursor (e.g. a psycopg cursor) and
# `change` a parsed change message from your sink.
def process_change(change):
    # Extract transaction annotations (empty if the transaction was not annotated)
    annotations = change.metadata.get('transaction_annotations', {})

    record = {
        'event_id': uuid.UUID(change.id),
        'table_name': change.metadata['table_name'],
        'record_id': uuid.UUID(change.record['id']),
        'action': change.action,
        'old_values': json.dumps(change.changes) if change.changes else None,
        'new_values': json.dumps(change.record),
        'created_at': datetime.now(),
        'updated_at': datetime.now(),
        # Transaction annotation fields
        'username': annotations.get('username'),
        'source': annotations.get('source'),
        'request_id': annotations.get('request_id'),
        'metadata': json.dumps(annotations.get('metadata', {}))
    }

    db.execute("""
        insert into audit_logs (
            event_id, table_name, record_id,
            action, old_values, new_values,
            created_at, updated_at,
            username, source, request_id, metadata
        )
        values (
            %(event_id)s, %(table_name)s, %(record_id)s,
            %(action)s, %(old_values)s, %(new_values)s,
            %(created_at)s, %(updated_at)s,
            %(username)s, %(source)s, %(request_id)s, %(metadata)s
        )
        on conflict (event_id) do update set
            table_name = excluded.table_name,
            record_id = excluded.record_id,
            action = excluded.action,
            old_values = excluded.old_values,
            new_values = excluded.new_values,
            updated_at = excluded.updated_at,
            username = excluded.username,
            source = excluded.source,
            request_id = excluded.request_id,
            metadata = excluded.metadata
    """, record)

For better performance, consider batching multiple changes into a single database operation. Batching increases throughput while still maintaining transactional guarantees.
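
A minimal sketch of a batched version, assuming a DB-API style driver such as psycopg2 and a hypothetical build_audit_record helper that contains the same mapping logic as process_change above (psycopg2 also ships faster batch helpers such as execute_values):

def process_changes(changes):
    # Reuse the change-to-row mapping from process_change above,
    # factored into a (hypothetical) build_audit_record helper.
    records = [build_audit_record(change) for change in changes]

    # One round trip for the whole batch; duplicates from redelivered
    # messages are skipped, which keeps the write idempotent.
    db.executemany("""
        insert into audit_logs (
            event_id, table_name, record_id,
            action, old_values, new_values,
            created_at, updated_at,
            username, source, request_id, metadata
        )
        values (
            %(event_id)s, %(table_name)s, %(record_id)s,
            %(action)s, %(old_values)s, %(new_values)s,
            %(created_at)s, %(updated_at)s,
            %(username)s, %(source)s, %(request_id)s, %(metadata)s
        )
        on conflict (event_id) do nothing
    """, records)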

Your audit log table will now be populated with old and new values for each change, along with rich contextual information about who made the change and why.

Example: Enhanced activity feed with annotations

With transaction annotations, you can create more informative activity feeds that show not just what changed, but who changed it and why:

create table activity_feed (
    id serial primary key,
    event_id uuid unique not null,
    user_id uuid not null,
    action text not null,
    description text not null,
    actor text not null,
    source text not null,
    request_id text,
    reason text,
    metadata jsonb,
    created_at timestamp not null default now()
);

create index on activity_feed(user_id, created_at);
create index on activity_feed(actor, created_at);
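
A user’s feed is then a simple query, for example:

-- Most recent activity for a user
select created_at, actor, action, description, reason
from activity_feed
where user_id = $1
order by created_at desc
limit 50;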

Process the annotated changes:

process_change.py
def process_change(change):
    # Extract transaction annotations
    annotations = change.metadata.get('transaction_annotations', {})

    description = generate_activity_description(
        change.metadata['table_name'],
        change.action,
        change.record,
        annotations
    )

    activity = {
        'event_id': uuid.UUID(change.id),
        'user_id': uuid.UUID(change.record['user_id']),
        'action': change.action,
        'description': description,
        'actor': annotations.get('username', 'system'),
        'source': annotations.get('source', 'unknown'),
        'request_id': annotations.get('request_id'),
        'reason': annotations.get('metadata', {}).get('reason'),
        'metadata': json.dumps(change.record),
        'created_at': datetime.now()
    }

    db.execute("""
        insert into activity_feed (
            event_id, user_id, action,
            description, actor, source,
            request_id, reason, metadata, created_at
        )
        values (
            %(event_id)s, %(user_id)s, %(action)s,
            %(description)s, %(actor)s, %(source)s,
            %(request_id)s, %(reason)s, %(metadata)s, %(created_at)s
        )
        on conflict (event_id) do update set
            user_id = excluded.user_id,
            action = excluded.action,
            description = excluded.description,
            actor = excluded.actor,
            source = excluded.source,
            request_id = excluded.request_id,
            reason = excluded.reason,
            metadata = excluded.metadata
    """, activity)

def generate_activity_description(table, action, record, annotations):
    actor = annotations.get('username', 'System')

    if table == 'orders':
        if action == 'insert':
            return f"{actor} created order #{record['order_number']}"
        elif action == 'update':
            return f"{actor} updated order #{record['order_number']}"
    # ... etc

Compliance reporting

For regulated industries, you can use annotations to simplify compliance reporting:

def process_compliance_event(change):
    annotations = change.metadata.get('transaction_annotations', {})

    # Track data access for GDPR/HIPAA
    compliance_event = {
        'event_id': uuid.UUID(change.id),
        'timestamp': datetime.now(),
        # determine_data_category is your own classification helper
        'data_category': determine_data_category(change.metadata['table_name'], change.record),
        'operation_type': change.action,
        'user_id': annotations.get('username'),
        'purpose': annotations.get('metadata', {}).get('reason'),
        'lawful_basis': annotations.get('metadata', {}).get('lawful_basis'),
        'record_id': change.record.get('id'),
        'system_source': annotations.get('source')
    }

    # Insert into compliance log
    db.execute("""
        insert into compliance_log (
            event_id, timestamp, data_category, operation_type,
            user_id, purpose, lawful_basis, record_id, system_source
        ) values (
            %(event_id)s, %(timestamp)s, %(data_category)s, %(operation_type)s,
            %(user_id)s, %(purpose)s, %(lawful_basis)s, %(record_id)s, %(system_source)s
        )
    """, compliance_event)

Verify your pipeline is working

If you specified a backfill, there should be messages in your sink ready for your system to process:

  1. On the sink overview page, click the “Messages” tab. You should see messages flowing to your sink.
  2. Check your audit tables to ensure changes are being recorded as expected.
  3. Verify that transaction annotations are being correctly captured in your audit logs.

Maintenance

Re-syncing your audit logs

You may need to re-sync your audit logs in these scenarios:

  1. Schema changes: Updates to source or audit table schema
  2. Logic updates: Changes to audit transformation logic
  3. Data recovery: Recovering from processing errors

Streaming changes, by default, does not include retention. This means that a change message is evicted from Sequin after it is propagated to the destination sink.

When streaming changes without retention, you can backfill from the source table. The backfill’s change messages will have the action read and will contain only the values currently in the database; old values and deleted rows are not included.

Note that backfilled records will not include transaction annotations.

If you need the ability to re-sync your audit logs with the full history of changes, you can use change retention.

Next steps

See “Deploy to production” for guidance on copying your local sink configuration to your production environment.