New in v2.1: The CREATE CHANGEFEED statement creates a new changefeed, which provides row-level change subscriptions.
Changefeeds target an allowlist of tables, called the "watched rows." Every change to a watched row is emitted as a record in a configurable format (JSON) to a configurable sink (Kafka).
For more information, see Change Data Capture.
This feature is under active development and only works for a targeted use case. Please file a GitHub issue if you have feedback on the interface.
CREATE CHANGEFEED is an enterprise-only feature. There will be a core version in a future release.
Required privileges
Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as the member.
Synopsis
Parameters
| Parameter | Description | 
|---|---|
| table_name | The name of the table (or tables in a comma-separated list) to create a changefeed for. | 
| sink | The location of the configurable sink. The scheme of the URI indicates the type; currently, only kafka is supported. Query parameters vary per type. Currently, the kafka scheme only has topic_prefix, which adds a prefix to all of the topic names. Sink URI scheme: '[scheme]://[host]:[port][?topic_prefix=[foo]]' For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo. | 
| option/value | For a list of available options and their values, see Options below. | 
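The table_name and sink parameters can be combined as in the following sketch. The table names foo and baz, the prefix bar_, and the host:port placeholder are assumptions chosen for illustration only.

```sql
-- Watch two tables (comma-separated list) and prefix every topic name with "bar_".
-- Per the topic_prefix description above, rows would be emitted under the
-- topics bar_foo and bar_baz instead of foo and baz.
CREATE CHANGEFEED FOR TABLE foo, baz
  INTO 'kafka://host:port?topic_prefix=bar_';
```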
Options
| Option | Value | Description | 
|---|---|---|
| updated | N/A | Include updated timestamps with each row. | 
| resolved | INTERVAL | Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted. Example: resolved='10s' | 
| envelope | key_only/row | Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes. Default: envelope=row | 
| cursor | Timestamp | Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. cursor can be used to start a new changefeed where a previous changefeed ended. Example: CURSOR='1536242855577149065.0000000000' | 
| format | json/'experimental-avro' | Format of the emitted record. Currently, support for Avro is limited and experimental. Default: format=json. | 
| confluent_schema_registry | Schema Registry address | The Schema Registry address is required to use 'experimental-avro'. | 
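As a rough illustration of how the options in this table are passed in the WITH clause, the sketch below shows two separate statements. The table name and host:port are the same placeholders used in the examples on this page, and the '10s' interval is taken from the resolved example above.

```sql
-- Include updated timestamps with each row, and emit resolved
-- timestamps with at least 10 seconds between them.
CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH updated, resolved = '10s';

-- Emit only the key (no value) for each change.
CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH envelope = key_only;
```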
Examples
Create a changefeed
> CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed connected to Kafka, see Change Data Capture.
Create a changefeed with Avro
> CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH format = 'experimental-avro', confluent_schema_registry = '<schema_registry_address>';
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed that emits an Avro record, see Change Data Capture.
Manage a changefeed
Use the following SQL statements to pause, resume, and cancel a changefeed.
Changefeed-specific SQL statements (e.g., CANCEL CHANGEFEED) will be added in the future.
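Each of the statements below takes a job_id. One possible way to look it up, assuming the crdb_internal.jobs table and only the columns that appear in its output elsewhere on this page, is sketched here:

```sql
-- List changefeed jobs with their IDs and high-water timestamps.
-- Column names are taken from the crdb_internal.jobs output shown on this page.
SELECT job_id, job_type, high_water_timestamp, error
  FROM crdb_internal.jobs
 WHERE job_type = 'CHANGEFEED';
```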
Pause a changefeed
> PAUSE JOB job_id;
For more information, see PAUSE JOB.
Resume a paused changefeed
> RESUME JOB job_id;
For more information, see RESUME JOB.
Cancel a changefeed
> CANCEL JOB job_id;
For more information, see CANCEL JOB.
Start a new changefeed where another ended
Find the high-water timestamp for the ended changefeed:
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
        job_id       |  job_type  | ... |      high_water_timestamp      | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
  383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 |       |              1
(1 row)
Use the high_water_timestamp to start the new changefeed:
> CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH cursor = '<high_water_timestamp>';