Sequin Stream is a durable, scalable, and fault-tolerant message stream that you can use with Sequin in place of additional infrastructure like Kafka or SQS. It uses Postgres tables as the underlying storage layer.

You’ll send your database changes to a Sequin Stream sink, then process messages in the Sequin Stream using the Sequin Stream API.

Sequin Stream provides exactly-once processing guarantees and supports multiple consumers working together as a consumer group to process messages in parallel.

Configuration

When you create a Sequin Stream sink, you can configure the following settings:

  • Visibility timeout

    How long a message will be invisible to other consumers after being received. When the timeout is reached, Sequin will make the message available again if it hasn’t been acknowledged. Choose a value comfortably longer than the time you expect your consumer to take to process a batch of messages, so that slow batches are not redelivered while they are still being processed.

  • Max ack pending

    The maximum number of messages that can be outstanding (i.e. in-flight but not yet acknowledged by your consumer group) at one time. This setting helps prevent issues where messages fail to be acknowledged, either due to errors in processing or bugs that cause too many workers to spin up. When the max ack pending limit is reached, Sequin stops delivering new messages and focuses on re-delivering unacknowledged messages.

  • Max waiting

    The maximum number of consumers that can be waiting to receive messages at one time. Once this limit is reached, subsequent /receive calls are rejected with a 429 error. This setting helps manage your consumer group’s resource utilization and prevents excessive polling of the Sequin Stream API. The right value depends on how many parallel consumers you expect to have in your consumer group.
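One way a consumer might react to the 429 described under Max waiting is to back off before polling again. The sketch below is illustrative only: the helper name `nextPollDelayMs`, the 500 ms base delay, and the 30 second cap are assumptions chosen for the example, not values defined by the Sequin Stream API.

```typescript
// Hypothetical helper: decide how long to wait before the next /receive call.
// Only the 429 status comes from the documentation; the delays are assumptions.
function nextPollDelayMs(
  status: number,
  consecutive429s: number,
  baseMs: number = 500,
  capMs: number = 30000,
): number {
  if (status !== 429) {
    return 0; // this sketch only handles the max-waiting rejection
  }
  // Exponential backoff: base, 2x base, 4x base, ... capped at capMs.
  const exponent = Math.max(0, consecutive429s - 1);
  return Math.min(capMs, baseMs * Math.pow(2, exponent));
}
```

A consumer would reset its count of consecutive 429 responses to zero after any successful /receive call.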

You’ll create a Sequin Stream sink for each database table you want to capture changes from. This means you’ll also have one consumer group for each Sequin Stream sink and, by extension, for each database table.

We’re working to support multiple tables per Sequin Stream sink, but this is not yet available.

Consumer group pattern

The Sequin Stream API allows you to pull rows from your Sequin Streams using consumer groups. Consumer groups for a Sequin Stream are like consumer groups in other streaming systems. They provide a way to safely process data in parallel with exactly-once processing guarantees.

In the consumer group pattern, each message (i.e. change or row from your database) goes to only one consumer in the group. If a consumer fails, other consumers can pick up its messages, ensuring nothing is missed.

When you set up a Sequin Stream sink, Sequin provisions an HTTP endpoint for your consumer group to pull messages from using the Sequin Stream API:

https://api.sequinstream.com/api/http_pull_consumers/{{YOUR_SEQUIN_STREAM_SINK_NAME}}/

Key attributes of the consumer group pattern include:

  • Multiple consumers: You can have many processes or workers (i.e. consumers) pulling from the same endpoint simultaneously.
  • Visibility timeout: After receiving a batch of messages, a consumer has a configurable period to process them before they become available to other consumers.
  • Acknowledgment: Consumers explicitly acknowledge processed messages to ensure exactly-once processing.
  • Automatic retries: If a consumer fails to process or acknowledge a batch, those messages become available for redelivery.
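The attributes above can be illustrated with a toy in-memory model. This is a sketch for building intuition, not Sequin's implementation: the class `ToyConsumerGroup` and its method signatures are hypothetical, and time is passed in explicitly so the behavior is easy to follow.

```typescript
// Toy model of a consumer group with a visibility timeout.
// It only mirrors the behavior described above; it is not Sequin's code.
type Pending = { id: string; invisibleUntil: number; acked: boolean };

class ToyConsumerGroup {
  private messages: Pending[];
  private visibilityTimeoutMs: number;

  constructor(ids: string[], visibilityTimeoutMs: number) {
    this.messages = ids.map((id) => ({ id, invisibleUntil: 0, acked: false }));
    this.visibilityTimeoutMs = visibilityTimeoutMs;
  }

  // Hand out up to batchSize messages that are neither acked nor invisible,
  // and hide them from other consumers until the timeout passes.
  receive(now: number, batchSize: number): string[] {
    const batch = this.messages
      .filter((m) => !m.acked && m.invisibleUntil <= now)
      .slice(0, batchSize);
    for (const m of batch) {
      m.invisibleUntil = now + this.visibilityTimeoutMs;
    }
    return batch.map((m) => m.id);
  }

  // Acked messages are never handed out again.
  ack(ids: string[]): void {
    for (const m of this.messages) {
      if (ids.indexOf(m.id) !== -1) m.acked = true;
    }
  }
}
```

In this model, a message that was received but never acked is handed out again once the timeout has elapsed, which corresponds to the automatic retry behavior in the list above.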

Processing messages

There are three steps in the lifecycle of processing messages with a Sequin Stream:

Receive

Your consumer requests a single message or a batch of available messages from the consumer group by calling the /receive endpoint.

Process

Your consumer processes the received messages.

Ack or Nack

After processing, your consumer sends an acknowledgement (ack) for successfully processed messages or a negative acknowledgement (nack) for messages that couldn’t be processed.

If a message is neither ack’d nor nack’d within the visibility timeout, Sequin automatically makes it available for other consumers in the group to process.
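The three steps can be sketched as a single function. The `StreamClient` interface below is a hypothetical wrapper around your own HTTP calls to /receive, /ack, and /nack; it is an assumption made for this example and not an official client library.

```typescript
// Minimal message shape for this sketch; only ack_id and record are used.
type Message = { ack_id: string; record: Record<string, unknown> };

// Hypothetical client interface wrapping the /receive, /ack and /nack calls.
interface StreamClient {
  receive(batchSize: number): Promise<Message[]>;
  ack(ackIds: string[]): Promise<void>;
  nack(ackIds: string[]): Promise<void>;
}

// One pass through the lifecycle: receive, process, then ack or nack.
async function processOnce(
  client: StreamClient,
  handle: (message: Message) => Promise<void>,
  batchSize: number = 10,
): Promise<{ acked: string[]; nacked: string[] }> {
  const messages = await client.receive(batchSize);
  const acked: string[] = [];
  const nacked: string[] = [];
  for (const message of messages) {
    try {
      await handle(message);
      acked.push(message.ack_id);
    } catch {
      nacked.push(message.ack_id); // processing failed: ask for redelivery
    }
  }
  if (acked.length > 0) await client.ack(acked);
  if (nacked.length > 0) await client.nack(nacked);
  return { acked, nacked };
}
```

A worker would typically call `processOnce` in a loop. If the worker crashes before the ack or nack is sent, the visibility timeout described above covers redelivery.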

Receive messages

The first step is to call the /receive endpoint to get a single message or a batch of messages:

curl -X GET "https://api.sequinstream.com/api/http_pull_consumers/{{YOUR_CONSUMER_NAME}}/receive?batch_size=10" \
  -H "Authorization: Bearer {your-token}"

This will return a batch of messages. Each message contains an ack_id, a record object, and metadata. Change messages additionally include a changes object, an action, and a commit_timestamp:

Change Messages

{
  data: Array<{
    ack_id: string;
    record: Record<string, any>;  // The row data
    changes: Record<string, any> | null;  // Previous values for changed fields
    action: "insert" | "update" | "delete" | "read";
    metadata: {
      table_schema: string;
      table_name: string;
      commit_timestamp: string;  // ISO timestamp
      consumer: {
        id: string;
        name: string;
      };
    };
  }>;
}

Example:

{
  "data": [
    {
      "ack_id": "MTYyeJ7abUjl1pO",
      "record": {
        "id": 1234,
        "customer_id": 789,
        "status": "shipped"
      },
      "changes": {
        "status": "pending"
      },
      "action": "update",
      "metadata": {
        "table_schema": "public",
        "table_name": "orders",
        "commit_timestamp": "2024-03-20T15:30:00Z",
        "consumer": {
          "id": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
          "name": "orders_webhook"
        }
      }
    }
  ]
}
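Because `changes` holds the previous values of the fields that changed, a consumer can reconstruct the row as it looked before an update. The helper below is a sketch: `previousRecord` is a hypothetical name, and returning `null` when `changes` is `null` is an assumption about how to treat messages that carry no previous values.

```typescript
type Row = Record<string, unknown>;

// Rebuild the pre-change row by overlaying the previous values from `changes`
// onto the current `record`. Returns null when no previous values were sent.
function previousRecord(record: Row, changes: Row | null): Row | null {
  if (changes === null) {
    return null;
  }
  return { ...record, ...changes };
}
```

For the example message above, this yields a row with `status` set to `"pending"` while `id` and `customer_id` keep their current values.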

Row Messages

{
  data: Array<{
    ack_id: string;
    record: Record<string, any>;  // The row data
    metadata: {
      table_schema: string;
      table_name: string;
      consumer: {
        id: string;
        name: string;
      };
    };
  }>;
}

Example:

{
  "data": [
    {
      "ack_id": "MTYyeJ7abUjl1pO",
      "record": {
        "id": 1234,
        "customer_id": 789,
        "status": "shipped"
      },
      "metadata": {
        "table_schema": "public",
        "table_name": "orders",
        "consumer": {
          "id": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
          "name": "orders_webhook"
        }
      }
    }
  ]
}
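If one code path has to handle both shapes, a type guard can tell them apart. The sketch below restates the two shapes as named types and assumes that the presence of an `action` field is a reliable way to recognize a change message; the type and function names are hypothetical.

```typescript
type Metadata = {
  table_schema: string;
  table_name: string;
  consumer: { id: string; name: string };
};

type RowMessage = {
  ack_id: string;
  record: Record<string, unknown>;
  metadata: Metadata;
};

type ChangeMessage = RowMessage & {
  changes: Record<string, unknown> | null;
  action: "insert" | "update" | "delete" | "read";
  metadata: Metadata & { commit_timestamp: string };
};

// Assumption: only change messages carry an `action` field.
function isChangeMessage(message: RowMessage | ChangeMessage): message is ChangeMessage {
  return "action" in message;
}

// Produce a short label for logging, using fields specific to each shape.
function summarize(message: RowMessage | ChangeMessage): string {
  const table = `${message.metadata.table_schema}.${message.metadata.table_name}`;
  return isChangeMessage(message) ? `${message.action} on ${table}` : `row from ${table}`;
}
```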

While your consumer is processing this batch of messages, the messages will not be visible to other consumers. The amount of time these messages are not visible (i.e. the visibility timeout) defaults to 30 seconds and is configurable in your sink’s settings.

Acknowledge messages

Once your consumer has finished processing the messages, acknowledge (ack) them. This tells Sequin you’re done processing them and ensures that the consumer group won’t see them again:

curl -X POST https://api.sequinstream.com/api/http_pull_consumers/{{YOUR_CONSUMER_NAME}}/ack \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer {your-token}" \
  -d '{
    "ack_ids": ["MTYyeJ7abUjl1pO", "MTYyeJ0p73hQak"]
  }'

Nack messages

Alternatively, if your consumer is unable to process the messages, it can nack them. This tells Sequin to make the messages available for processing again right away:

curl -X POST https://api.sequinstream.com/api/http_pull_consumers/{{YOUR_CONSUMER_NAME}}/nack \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer {your-token}" \
  -d '{
    "ack_ids": ["MTYyeJ7abUjl1pO", "MTYyeJ0p73hQak"]
  }'

Nack’ing is a good option if you can’t process the messages right away but expect them to be processable shortly. For example, if your consumer is having difficulty connecting to a downstream database, it can nack the messages in the hope that another consumer with a working connection will pick them up.

Instead of nacking, your consumer can also do nothing. After the visibility timeout expires, the messages will be made available for processing again.
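The ack and nack calls differ only in the final path segment, so a consumer can build both from one helper. The sketch below constructs the request without sending it; `buildAckRequest` and the `HttpRequest` type are hypothetical names, and the URL, headers, and body mirror the curl examples above.

```typescript
type HttpRequest = {
  method: "POST";
  url: string;
  headers: Record<string, string>;
  body: string;
};

// Build (but do not send) an /ack or /nack request matching the curl examples.
function buildAckRequest(
  kind: "ack" | "nack",
  consumerName: string,
  token: string,
  ackIds: string[],
): HttpRequest {
  const base = "https://api.sequinstream.com/api/http_pull_consumers";
  return {
    method: "POST",
    url: `${base}/${encodeURIComponent(consumerName)}/${kind}`,
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`,
    },
    body: JSON.stringify({ ack_ids: ackIds }),
  };
}
```

The returned object can be passed to whichever HTTP client your consumer already uses.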

Grouping and ordering

The default message grouping for a Sequin Stream is the source row’s primary key value(s). This means if a message is outstanding for a given row, subsequent changes for that row will not be delivered until the first message is acknowledged.

You can specify other columns to use for message grouping in a sink’s settings.

Changes in a group are delivered in the order that they occurred. So, you don’t need to worry about receiving a new change for a row before an older change.
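The grouping rule can be illustrated with a toy model. This is a sketch of the behavior described above and not Sequin's implementation: `groupKey`, `deliverable`, and the `seq` field used to show ordering are all hypothetical.

```typescript
type Change = { seq: number; record: Record<string, unknown> };

// Derive a group key from the grouping columns (the primary key by default).
function groupKey(record: Record<string, unknown>, columns: string[]): string {
  return columns.map((c) => JSON.stringify(record[c])).join("|");
}

// Walk pending changes in order. A change is deliverable only if no earlier
// change in the same group is outstanding or already selected.
function deliverable(pending: Change[], outstandingKeys: string[], columns: string[]): Change[] {
  const blocked = outstandingKeys.slice();
  const out: Change[] = [];
  for (const change of pending) {
    const key = groupKey(change.record, columns);
    if (blocked.indexOf(key) === -1) {
      out.push(change);
      blocked.push(key); // later changes for this group must wait
    }
  }
  return out;
}
```

With two pending changes for the row with `id` 1 and one for the row with `id` 2, only the first change for each row is deliverable; the second change for `id` 1 waits until the first is acknowledged.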
