Kinesis Data Streams
Amazon Kinesis Data Streams enables applications to collect and process large streams of data records in real time. These applications can send the processed records to dashboards, use them to generate alerts, dynamically change pricing and advertising strategies, or send data to a variety of other AWS services.
YugabyteDB has a Debezium connector which you can use to read changes to a table and then write those into Kinesis Data Streams using the AWS SDK for Java and the Kinesis Producer Library.
Connect
To connect your YugabyteDB database to Kinesis:
-
Create a Kinesis data stream from the AWS Management Console. Note the name of the stream you create; the application needs it later.
-
Start a YugabyteDB cluster. Refer to YugabyteDB Prerequisites.
-
Create a CDC stream using the following command, and note the stream ID it generates:
./bin/yb-admin --master_addresses 127.0.0.1:7100 create_change_data_stream ysql.yugabyte
-
Create a table using ysqlsh as follows:
CREATE TABLE users(
  id bigserial PRIMARY KEY,
  created_at timestamp,
  name text,
  email text,
  address text,
  city text,
  state text,
  zip text,
  birth_date text,
  latitude float,
  longitude float,
  password text,
  source text
);
-
Write a Java application that uses the Debezium connector to receive CDC data from YugabyteDB and writes it to Kinesis Data Streams. The following code snippet shows an example implementation:
// Build the Kinesis client
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
String ak = System.getenv("AWS_ACCESS_KEY_ID");
String sk = System.getenv("AWS_SECRET_ACCESS_KEY");
AWSCredentials ac = new BasicAWSCredentials(ak, sk);
AWSCredentialsProvider cp = new AWSStaticCredentialsProvider(ac);
ClientConfiguration cc = new ClientConfiguration();
clientBuilder.setRegion("us-west-2");
clientBuilder.setCredentials(cp);
clientBuilder.setClientConfiguration(cc);
AmazonKinesis kinesisClient = clientBuilder.build();

// Class field: sequence number of the last record written to the Kinesis data stream
private String lastSequenceNumber;

// Configure the Debezium engine properties
Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
props.setProperty("connector.class", "io.debezium.connector.yugabytedb.YugabyteDBConnector");
props.setProperty("database.streamid", <stream_id>); // provide the stream ID generated above
props.setProperty("database.master.addresses", <master_address>); // provide the master address; for a local cluster it is 127.0.0.1:7100
props.setProperty("table.include.list", "public.users");
props.setProperty("database.hostname", <hostname>); // provide the hostname; for a local cluster it is 127.0.0.1
props.setProperty("database.port", "5433");
props.setProperty("database.user", "yugabyte");
props.setProperty("database.password", "yugabyte");
props.setProperty("database.dbname", "yugabyte");
props.setProperty("database.server.name", "dbserver1");
props.setProperty("snapshot.mode", "never");

// Create the Debezium engine; each change event is forwarded to Kinesis
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
    .using(props)
    .notifying(record -> {
        try {
            System.out.println("Topic: " + record.destination());
            putRecordIntoKinesis(record.value());
        } catch (Throwable t) {
            System.out.println("Failed putRecord: " + t);
        }
    }).build();

// Body of putRecordIntoKinesis(String record):
// Use the city name as the partition key
String pk = "NO CITY";
try {
    JSONObject o = (JSONObject) new JSONParser().parse(record);
    JSONObject payload = (JSONObject) o.get("payload");
    JSONObject after = (JSONObject) payload.get("after");
    Map<String, String> city = (Map) after.get("city");
    System.out.println("Found city: " + city.get("value"));
    pk = city.get("value");
} catch (ParseException pe) {
    System.out.println("Failed record: " + record);
}
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName("yugabytedb-to-kinesis"); // name of the data stream created on AWS
putRecordRequest.setData(ByteBuffer.wrap(record.getBytes()));
putRecordRequest.setPartitionKey(pk);
putRecordRequest.setSequenceNumberForOrdering(lastSequenceNumber);
PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
lastSequenceNumber = putRecordResult.getSequenceNumber();

// Now run the engine asynchronously
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
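The partition-key lookup in the snippet walks payload, then after, then city, then value, and only handles a parse failure. A change event without an after image (for example a delete, where Debezium typically sets after to null) would fail at that lookup with a NullPointerException. The following is a minimal sketch of a more defensive lookup, not the connector's own API. It assumes the event JSON has already been parsed into nested maps, which is what a JSON library such as json-simple produces. The class name PartitionKeyExample and the helper partitionKeyFor are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionKeyExample {
    // Fallback key, matching the "NO CITY" default used in the snippet above.
    static final String DEFAULT_KEY = "NO CITY";

    // Hypothetical helper: walks payload -> after -> city -> value and
    // returns DEFAULT_KEY when any level is missing, null, or not a map.
    static String partitionKeyFor(Map<String, Object> event) {
        Object payload = event.get("payload");
        if (!(payload instanceof Map)) return DEFAULT_KEY;
        Object after = ((Map<?, ?>) payload).get("after");
        if (!(after instanceof Map)) return DEFAULT_KEY;
        Object city = ((Map<?, ?>) after).get("city");
        if (!(city instanceof Map)) return DEFAULT_KEY;
        Object value = ((Map<?, ?>) city).get("value");
        // Kinesis partition keys must be non-empty, so an empty city also falls back.
        if (value instanceof String && !((String) value).isEmpty()) {
            return (String) value;
        }
        return DEFAULT_KEY;
    }

    public static void main(String[] args) {
        // An insert-style event carrying a city value.
        Map<String, Object> insertEvent =
            Map.of("payload", Map.of("after", Map.of("city", Map.of("value", "Austin"))));
        System.out.println(partitionKeyFor(insertEvent));

        // A delete-style event: "after" is null, so the fallback key is used.
        Map<String, Object> payload = new HashMap<>();
        payload.put("after", null);
        Map<String, Object> deleteEvent = new HashMap<>();
        deleteEvent.put("payload", payload);
        System.out.println(partitionKeyFor(deleteEvent));
    }
}
```

With a helper like this, the record is still written to Kinesis under the fallback key when no city is available, instead of being dropped by the catch block in the engine callback.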
-
As you insert records into the users table, you can check the records arriving in the Kinesis data stream. For sample INSERT statements, refer to the users.sql script in CDC examples.
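When checking where records arrive, it helps to know how the partition key chosen in the application (the city name) determines the shard. Kinesis hashes the partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value, so all changes for one city land on the same shard. The sketch below reproduces that hash locally. It makes two assumptions: the key is encoded as UTF-8, and the stream has a given number of shards that split the hash range evenly, so the computed index is an approximation near range boundaries. The class ShardHashExample and its methods are hypothetical names, not part of the AWS SDK.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class ShardHashExample {
    // Size of the 128-bit hash key space: 2^128.
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Assumption: shardCount shards divide the hash space into equal ranges.
    // Returns the zero-based index of the range containing the key's hash.
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
            .multiply(BigInteger.valueOf(shardCount))
            .divide(HASH_SPACE)
            .intValueExact();
    }

    public static void main(String[] args) {
        for (String city : new String[] {"Austin", "Boston", "NO CITY"}) {
            System.out.println(city + " -> hash " + hashKey(city).toString(16)
                + ", shard index " + shardIndex(city, 4) + " of 4");
        }
    }
}
```

Because the fallback key "NO CITY" always hashes to the same value, every record without a city goes to a single shard; a stream with many such records may see uneven load.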