Enrichment allows you to enrich your messages with additional data from your database before they are sent to the sink destination. This is useful for:

  • Adding related data from other tables to your messages
  • Reducing the number of queries your downstream systems need to make
  • Building composite objects from multiple tables, ie. for search indexing

How it works

To configure enrichment on a sink consumer, you must:

  1. Create an enrichment function
  2. Configure the sink consumer to use the enrichment function

When you configure enrichment on a sink consumer, Sequin will:

  1. Batch messages that need to be enriched
  2. Execute your enrichment query against your source database with the primary keys of those messages as parameters (ie. $1)
  3. Determine which enrichment result corresponds to each message (by matching the primary key values) and set it as the message’s metadata.enrichment field
  4. Send the enriched messages through the rest of your sink pipeline, including into transform and routing functions

SQL requirements

Enrichment functions use SQL to query your source database. The SQL query must:

  1. Use parameterization with the $1 syntax
  2. Select all primary key columns from the source table
  3. Select additional fields for enrichment
  4. Return 0 or 1 row per message

Here’s an example that enriches user messages with their account information:

SELECT
  -- select all primary keys of the source table (see below)
  u.id,
  -- enrichment fields
  a.name as account_name,
  a.subscription_status as account_subscription_status
FROM
  -- source table is `users`
  users u
JOIN
  -- enrichment joins to `accounts` table
  accounts a on u.account_id = a.id
WHERE
  -- parameterize the query with the primary key values (see below)
  u.id = ANY($1)

Primary keys

Your enrichment query must select all primary key columns from the source table. These are used to match the enrichment results back to the original messages.

For example, if your table has a primary key of id, your query must select id:

SELECT
  -- you must select all primary keys of the source table
  -- for Sequin to associate the enrichment result with the message
  id,
  -- enrichment fields
  account_name,
  subscription_status
FROM users u
JOIN accounts a ON u.account_id = a.id
WHERE id = ANY($1)

Parameterization

Enrichment functions must use the $1 syntax for parameterization. This allows Sequin to safely batch multiple messages into a single query.

SELECT
  u.id, -- you must select all primary keys of the source table for Sequin to associate the enrichment with the message
  a.name as account_name -- example of a column added for enrichment
FROM
  users u -- this enrichment is triggered on `users` table change messages
JOIN
  accounts a on u.account_id = a.id
WHERE
  -- the `ANY($1)` syntax is required for Sequin to perform batched queries
  u.id = ANY($1)

Composite primary keys

If your table has componsite primary keys, you must use a relatively advanced SQL where clause to select rows with those keys. Consider a table with a primary keys of (id, account_id):

SELECT
  id,
  account_name,
  subscription_status
FROM users u
JOIN accounts a ON u.account_id = a.id

-- WHERE clause for composite primary keys
WHERE (u.account_id, u.id) = ANY(
  SELECT (r.account_id, r.id)
  FROM unnest($1, $2) AS r(account_id, id)
);

Sequin will assign the primary key values to two parameters, $1 and $2. Sequin will assign them in alphabetical order of the column names. In this case, Sequin will assign $1 to a list of all account IDs and $2 to a list of all user IDs for the batched messages.

Results

Your query should return either 0 or 1 row for each message. If your query returns multiple rows for a message, Sequin will raise an error and the entire batch will fail.

The enrichment results are set into the message’s metadata.enrichment field. This is sent in Sequin’s default message format.

Additionally, the metadata.enrichment field is available in other functions in your sink pipeline including filters, transforms, and routing functions.

Limitations

When an enrichment function runs, it queries the current state of your database, not the state at the time the message was created. This means the enriched data may have changed between when the original change occurred and when the enrichment executes.

This can lead to race conditions and therefore enrichment should only be used in situations where this race condition is acceptable. For instance in materialization use cases such as search indexing, this is typically acceptable because you want the search index to have the latest data.

In use cases where the point-in-time data is critical we recommend that you enrich your messages in Postgres with either transaction annotations or event tables.

Testing enrichment

When creating or editing an enrichment function, Sequin will automatically capture up to 10 recent events from your database. You can see how your enrichment affects these events in real-time.

When changes occur in a connected database and you have the enrichment editor open, Sequin will capture events and display them in the editor.

Example use cases

Enrich messages with data from related tables:

SELECT
  orders.id,
  customers.name as customer_name,
  customers.email as customer_email
FROM orders
JOIN customers ON orders.customer_id = customers.id
WHERE orders.id = ANY($1)

Enriching multiple rows

Enrichment functions can only return a single row per message. If you want enrichment data from multiple rows, you can use Postgres aggregation functions.

-- Sequin enrichment function to load top 3 comments for posts
-- This enriches post messages with their most recent comments

SELECT
    p.id,  -- Required: primary key from source table (posts)

    -- Aggregate top 3 comments as JSON array
    COALESCE(
        JSON_AGG(
            jsonb_build_object(
                'id', c.id,
                'author_name', c.author_name,
                'content', c.content,
                'created_at', c.created_at,
                'updated_at', c.updated_at
            )
            ORDER BY c.created_at DESC  -- Most recent comments first
        ) FILTER (WHERE c.id IS NOT NULL),
        '[]'::json
    ) AS top_comments,

    -- Additional useful aggregates
    COUNT(c.id) AS total_comment_count,
    MAX(c.created_at) AS latest_comment_at,

    -- Array of comment author names for easy access
    ARRAY_AGG(DISTINCT c.author_name) FILTER (WHERE c.author_name IS NOT NULL) AS comment_authors

FROM posts.posts p
LEFT JOIN (
    -- Subquery to get only top 3 comments per post
    SELECT
        c1.id,
        c1.post_id,
        c1.author_name,
        c1.content,
        c1.created_at,
        c1.updated_at,
        ROW_NUMBER() OVER (PARTITION BY c1.post_id ORDER BY c1.created_at DESC) as rn
    FROM posts.comments c1
) c ON p.id = c.post_id AND c.rn <= 3  -- Only top 3 comments
WHERE
    p.id = ANY($1)  -- Required: parameterization for batched queries
GROUP BY
    p.id;

Building a search index

Enrich messages with data from your database to build a search index. This example shows how to enrich messages with materials and colors for faceting:

SELECT
    p.id,
    p.name,
    p.description,
    p.price,
    p.category,
    p.created_at,
    p.updated_at,
    -- Aggregate materials as JSON array
    COALESCE(
        JSON_AGG(
            DISTINCT jsonb_build_object(
                'id', m.id,
                'name', m.name,
                'type', m.type
            )
        ) FILTER (WHERE m.id IS NOT NULL),
        '[]'::json
    ) AS materials,
    -- Aggregate colors as JSON array
    COALESCE(
        JSON_AGG(
            DISTINCT jsonb_build_object(
                'id', c.id,
                'name', c.name,
                'hex_code', c.hex_code
            )
        ) FILTER (WHERE c.id IS NOT NULL),
        '[]'::json
    ) AS colors,
    -- Create searchable text fields
    CONCAT_WS(' ',
        p.name,
        p.description,
        STRING_AGG(DISTINCT m.name, ' '),
        STRING_AGG(DISTINCT c.name, ' ')
    ) AS search_text,
    -- Material names as array for faceting
    ARRAY_AGG(DISTINCT m.name) FILTER (WHERE m.name IS NOT NULL) AS material_names,
    -- Color names as array for faceting
    ARRAY_AGG(DISTINCT c.name) FILTER (WHERE c.name IS NOT NULL) AS color_names
FROM products p
LEFT JOIN product_materials pm ON p.id = pm.product_id
LEFT JOIN materials m ON pm.material_id = m.id
LEFT JOIN product_colors pc ON p.id = pc.product_id
LEFT JOIN colors c ON pc.color_id = c.id
GROUP BY p.id, p.name, p.description, p.price, p.category, p.created_at, p.updated_at
ORDER BY p.updated_at DESC;

Formatting data

For formatting data, we recommend using transforms instead of enrichment.

Enrichment functions are designed to query additional data from your database. They make a round trip to your database.

Transform functions are significantly more expressive and performant than enrichment for pure formatting.