POST /sinks
curl -X POST "https://api.sequinstream.com/api/sinks" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "kafka-ids",
    "status": "active",
    "source": {
      "include_schemas": null,
      "exclude_schemas": null,
      "include_tables": null,
      "exclude_tables": null
    },
    "actions": ["insert", "update", "delete"],
    "destination": {
      "type": "kafka",
      "hosts": "localhost:9092",
      "tls": false,
      "topic": "records"
    },
    "database": "dune",
    "filter": "",
    "transform": "id-transform",
    "routing": "",
    "message_grouping": true,
    "batch_size": 100,
    "max_retry_count": null,
    "load_shedding_policy": "pause_on_full",
    "timestamp_format": "iso8601",
    "annotations": {}
  }'
{
  "id": "4ed2a8e5-47a7-4b51-9270-d2f4fdcb94fb",
  "name": "my-kafka-sink",
  "status": "active",
  "database": "my-database",
  "source": {
    "include_schemas": null,
    "exclude_schemas": null,
    "include_tables": null,
    "exclude_tables": null
  },
  "tables": [
    {
      "name": "public.products",
      "group_column_names": ["category"]
    }
  ],
  "actions": ["insert", "update", "delete"],
  "destination": {
    "type": "kafka",
    "hosts": "localhost:9092",
    "tls": false,
    "topic": "records",
    "username": "kafka_user",
    "password": "kafka_password",
    "sasl_mechanism": "PLAIN"
  },
  "filter": "none",
  "transform": "id-transform",
  "routing": "none",
  "message_grouping": true,
  "max_retry_count": null,
  "annotations": {},
  "active_backfills": [],
  "batch_size": 50,
  "load_shedding_policy": "pause_on_full",
  "timestamp_format": "iso8601",
  "health": {
    "name": "Consumer health",
    "status": "healthy",
    "checks": [
      {
        "name": "Sink configuration",
        "status": "healthy"
      },
      ...
    ]
  }
}

Creates a new sink consumer.

Request fields

name
string
required

The name of the sink consumer

status
string
required

The initial status of the sink consumer (active, disabled, paused)

source
object
required

The source configuration for the sink consumer

tables
array

Additional configuration for individual tables.

To configure which tables are consumed, see the source field.

actions
array
required

The database actions to include in the sink (insert, update, delete)

destination
object
required

The destination configuration for the sink consumer. The shape varies by destination type.

database
string
required

The source database for the sink consumer

filter
string

The filter function for the sink consumer

transform
string

The transform function for the sink consumer

routing
string

The routing function for the sink consumer

message_grouping
boolean

Whether message grouping is enabled to preserve ordering. When true (the default), messages are grouped by primary key. When false, grouping is disabled, which allows higher-throughput batching.
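As a rough mental model of grouping by primary key, the sketch below buckets change messages by key so that arrival order is preserved within each bucket. It is not Sequin's implementation, and the message shape (`pk`, `action`) is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def group_by_primary_key(messages: List[dict]) -> Dict[object, List[dict]]:
    """Bucket messages by primary key, keeping arrival order within each bucket."""
    groups: Dict[object, List[dict]] = defaultdict(list)
    for message in messages:
        # "pk" is a hypothetical field name used only for this illustration.
        groups[message["pk"]].append(message)
    return dict(groups)


messages = [
    {"pk": 1, "action": "insert"},
    {"pk": 2, "action": "insert"},
    {"pk": 1, "action": "update"},
]
groups = group_by_primary_key(messages)
```

Here both changes to row 1 land in the same group in the order they arrived, while row 2 forms its own group.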

batch_size
integer

The number of records to batch together. Must be between 1 and 1000.
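A client can check the documented range before sending the request. The helper below is a minimal sketch; `validate_batch_size` is a hypothetical name, and treating both bounds as inclusive is an assumption.

```python
def validate_batch_size(batch_size: int) -> int:
    """Return batch_size unchanged if it is an integer in the range 1-1000."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(batch_size, bool) or not isinstance(batch_size, int):
        raise TypeError("batch_size must be an integer")
    # Assumption: both ends of the documented 1-1000 range are inclusive.
    if not 1 <= batch_size <= 1000:
        raise ValueError("batch_size must be between 1 and 1000")
    return batch_size
```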

max_retry_count
integer

The maximum number of times a message will be retried if delivery fails. Once this limit is reached, the message will be discarded. Defaults to null, meaning messages are retried indefinitely.
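The retry rule described here can be modeled as a small predicate. This is an illustrative sketch only: `should_retry` is a hypothetical helper, and the exact boundary (a message is discarded once it has been retried `max_retry_count` times) is an assumption.

```python
from typing import Optional


def should_retry(retries_so_far: int, max_retry_count: Optional[int]) -> bool:
    """Decide whether a message that just failed delivery is retried again."""
    if max_retry_count is None:
        # null (the default) means the message is retried indefinitely.
        return True
    # Assumption: once the limit is reached, the message is discarded.
    return retries_so_far < max_retry_count
```

With `max_retry_count` set to 3, a message that has already been retried three times is no longer retried; with `null`, it always is.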

load_shedding_policy
string

Determines how Sequin handles overload when sink consumers can’t keep up with incoming messages. Options are:

  • pause_on_full (default) — pauses replication until the buffer has room again
  • discard_on_full — drops messages for overloaded consumers to avoid pausing replication

For more details, see load shedding policy.
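The difference between the two policies can be pictured with a bounded buffer. The class below is a conceptual model, not Sequin's internals; `SinkBuffer`, its `capacity`, and the returned labels are all hypothetical.

```python
from collections import deque


class SinkBuffer:
    """Toy bounded buffer that applies a load shedding policy when full."""

    POLICIES = ("pause_on_full", "discard_on_full")

    def __init__(self, capacity: int, policy: str = "pause_on_full") -> None:
        if policy not in self.POLICIES:
            raise ValueError(f"unknown load_shedding_policy: {policy}")
        self.capacity = capacity
        self.policy = policy
        self.messages = deque()

    def offer(self, message: dict) -> str:
        """Try to buffer a message; return 'accepted', 'paused', or 'discarded'."""
        if len(self.messages) < self.capacity:
            self.messages.append(message)
            return "accepted"
        if self.policy == "pause_on_full":
            # The caller is expected to stop and offer the message again later.
            return "paused"
        # discard_on_full: the message is dropped so upstream keeps moving.
        return "discarded"
```

Under `pause_on_full` nothing is lost but the producer stalls; under `discard_on_full` the producer keeps going at the cost of dropped messages for the overloaded consumer.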

timestamp_format
string

The format of the timestamp in the source data. Possible values include iso8601 and unix_microsecond.
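To make the two values concrete, the sketch below renders the same instant both ways. It assumes `iso8601` means an ISO 8601 string and `unix_microsecond` means an integer count of microseconds since the Unix epoch; the exact string layout Sequin emits is not specified here, and `format_timestamp` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def format_timestamp(ts: datetime, timestamp_format: str):
    """Render a timezone-aware datetime in one of the two documented formats."""
    if timestamp_format == "iso8601":
        return ts.isoformat()
    if timestamp_format == "unix_microsecond":
        # Integer division of timedeltas avoids floating-point rounding.
        return (ts - EPOCH) // timedelta(microseconds=1)
    raise ValueError(f"unsupported timestamp_format: {timestamp_format}")


instant = datetime(2024, 1, 1, tzinfo=timezone.utc)
as_iso = format_timestamp(instant, "iso8601")
as_micros = format_timestamp(instant, "unix_microsecond")
```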

annotations
object

Additional metadata you can attach to this sink consumer. Annotations can be any JSON object.
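Putting the request fields together, the following sketch checks that the fields marked required are present and then builds the same request as the curl example, using only the Python standard library. `build_create_sink_request` is a hypothetical helper, and the request is constructed but not sent.

```python
import json
import urllib.request

API_URL = "https://api.sequinstream.com/api/sinks"  # endpoint from the curl example
REQUIRED_FIELDS = ("name", "status", "source", "actions", "destination", "database")


def build_create_sink_request(token: str, payload: dict) -> urllib.request.Request:
    """Build (without sending) the POST request that creates a sink consumer."""
    missing = [field for field in REQUIRED_FIELDS if field not in payload]
    if missing:
        raise ValueError(f"missing required fields: {', '.join(missing)}")
    return urllib.request.Request(
        API_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


payload = {
    "name": "kafka-ids",
    "status": "active",
    "source": {
        "include_schemas": None,
        "exclude_schemas": None,
        "include_tables": None,
        "exclude_tables": None,
    },
    "actions": ["insert", "update", "delete"],
    "destination": {
        "type": "kafka",
        "hosts": "localhost:9092",
        "tls": False,
        "topic": "records",
    },
    "database": "dune",
    "transform": "id-transform",
    "batch_size": 100,
    "annotations": {},
}
request = build_create_sink_request("YOUR_API_TOKEN", payload)
```

Passing `request` to `urllib.request.urlopen` would send it; the response body can then be decoded with `json.load`.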

Response fields

id
string

The unique identifier of the sink consumer

name
string

The name of the sink consumer

status
string

The current status of the sink consumer (active, disabled, paused)

database
string

The source database for the sink consumer

source
object

The source configuration for the sink consumer. This determines which tables are consumed.

tables
array

Additional configuration for individual tables.

To configure which tables are consumed, see the source field.

actions
array

The database actions to include in the sink (insert, update, delete)

destination
object

The destination configuration for the sink consumer. The shape varies by destination type.

filter
string

The filter function for the sink consumer

transform
string

The transform function for the sink consumer

routing
string

The routing function for the sink consumer

message_grouping
boolean

Whether message grouping is enabled to preserve delivery ordering.

See message grouping and ordering for more details.

max_retry_count
integer

The maximum number of times a message will be retried if delivery fails

annotations
object

User-defined annotations for the sink consumer

active_backfills
array

The IDs of the backfills currently active for the sink consumer

batch_size
integer

The number of records batched together (between 1 and 1000)

load_shedding_policy
string

Determines how Sequin handles overload when sink consumers can’t keep up with incoming messages

timestamp_format
string

The format of the timestamp in the source data

health
object

Health status information for the sink consumer
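After creating a sink, a client may want to inspect the `health` object in the response. The sketch below lists any checks whose status is not `healthy`. It relies only on the `checks`, `name`, and `status` keys shown in the example response; the second check and the `error` status value are made up for illustration.

```python
from typing import List


def unhealthy_checks(sink: dict) -> List[str]:
    """Return the names of health checks whose status is not 'healthy'."""
    checks = sink.get("health", {}).get("checks", [])
    return [check["name"] for check in checks if check.get("status") != "healthy"]


sink = {
    "id": "4ed2a8e5-47a7-4b51-9270-d2f4fdcb94fb",
    "health": {
        "name": "Consumer health",
        "status": "healthy",
        "checks": [
            {"name": "Sink configuration", "status": "healthy"},
            # Hypothetical check and status value, for illustration only.
            {"name": "Example check", "status": "error"},
        ],
    },
}
failing = unhealthy_checks(sink)
```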
