cur.create_replication_slot('wal2json_test_slot', output_plugin = 'wal2json'); cur.start_replication(slot_name = 'wal2json_test_slot', options = {'pretty-print' : 1}, decode= True) - code example

Example: cur.create_replication_slot('wal2json_test_slot', output_plugin = 'wal2json'); cur.start_replication(slot_name = 'wal2json_test_slot', options = {'pretty-print' : 1}, decode= True)

import boto3
import json
import random
import calendar
import time
from datetime import datetime
import psycopg2
from psycopg2.extras import LogicalReplicationConnection

# Kinesis stream that receives the change records, and the logical
# replication slot the changes are read from.
my_stream_name = 'Foo'
my_slot_name = 'wal2json_test_slot'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# NOTE(review): host and credentials are hard-coded in the DSN below; they
# look like placeholders -- supply real values from configuration/secrets.
my_connection = psycopg2.connect(
    "dbname='postgres' host='mypgdb.abcdefghijk.us-east-1.rds.amazonaws.com' user='repluser' password='replpass'",
    connection_factory=LogicalReplicationConnection)
cur = my_connection.cursor()

# Every run starts from a freshly created slot. Dropping must tolerate the
# slot not existing yet (first run, or slot removed by hand); any other
# failure is re-raised unchanged.
try:
    cur.drop_replication_slot(my_slot_name)
except psycopg2.ProgrammingError as exc:
    # SQLSTATE 42704 = undefined_object ("replication slot ... does not exist").
    if exc.pgcode != '42704':
        raise
cur.create_replication_slot(my_slot_name, output_plugin='wal2json')

# decode=True delivers payloads as text; 'pretty-print' is passed through
# to the wal2json output plugin as a plugin option.
cur.start_replication(
    slot_name=my_slot_name,
    options={'pretty-print': 1},
    decode=True)

def consume(msg):
    """Forward one replication message to Kinesis, echo it, and acknowledge it.

    Intended as the callback handed to the replication cursor's
    ``consume_stream``.

    Args:
        msg: Replication message object. This function reads its
            ``payload`` (the change data), ``data_start`` (the message's
            WAL position) and ``cursor`` (the cursor that delivered it).

    Raises:
        Whatever ``put_record`` raises; in that case the message is NOT
        acknowledged.
    """
    # NOTE(review): json.dumps is applied to the payload as received. If the
    # payload is already JSON text, the record holds a JSON-encoded string
    # rather than a JSON object. Kept as-is so existing stream consumers
    # are unaffected.
    kinesis_client.put_record(
        StreamName=my_stream_name,
        Data=json.dumps(msg.payload),
        PartitionKey="default",
    )
    print(msg.payload)
    # Report the message as processed only after Kinesis accepted it, so the
    # server can release WAL up to this position instead of retaining it
    # for the slot indefinitely.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

# Hand every incoming replication message to consume(). The call only
# returns when the stream stops or an exception (including Ctrl-C) is raised.
try:
    cur.consume_stream(consume)
finally:
    # Always release the replication cursor and connection; the exception,
    # if any, still propagates to the caller/interpreter.
    # NOTE(review): closing the connection does not drop the slot -- the
    # server keeps retaining WAL for it until it is dropped.
    cur.close()
    my_connection.close()

Tags: