The API of this plugin is considered unstable and subject to change without notice!
Home-made Postgres CDC for Streamz.
Detecting changes in databases is a hard problem. Most solutions require extensive infrastructure and heavy tweaking of the source database. If you're looking for Postgres CDC, you're probably going to end up reading about logical replication and Debezium. These are great solutions (and scalable too), but a bit of overkill for smaller projects.
If you need CDC on a couple-million-rows table and don't have a dedicated data engineering team, you probably won't like CDC tutorials starting with "first, let's install ZooKeeper". You can do almost the same with a glorified while True: loop and some select queries.
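To make the "glorified while True: loop" idea concrete, here is a minimal sketch of such a polling loop. It is not this plugin's implementation: `poll_changes`, `fetch_since` and the in-memory `fake_table` are hypothetical names, and the "version" number stands in for whatever change marker the select queries would return.

```python
import time
from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Tuple

Row = Dict[str, Any]


def poll_changes(
    fetch_since: Callable[[int], Iterable[Tuple[int, Row]]],
    polling_interval: float = 10.0,
    max_iterations: Optional[int] = None,
) -> Iterator[Row]:
    """Yield rows whose version is newer than anything seen so far.

    fetch_since(watermark) is a hypothetical callable that returns
    (version, row) pairs with version > watermark, e.g. by running a SELECT.
    """
    watermark = 0
    iteration = 0
    # With max_iterations=None this is the "glorified while True: loop".
    while max_iterations is None or iteration < max_iterations:
        for version, row in fetch_since(watermark):
            watermark = max(watermark, version)  # remember the newest version seen
            yield row
        iteration += 1
        time.sleep(polling_interval)


# In-memory stand-in for a table: primary key -> (version, row).
fake_table = {
    1: (100, {"id": 1, "name": "a"}),
    2: (101, {"id": 2, "name": "b"}),
}


def fetch_from_fake_table(watermark: int):
    changed = [(v, r) for v, r in fake_table.values() if v > watermark]
    return sorted(changed, key=lambda pair: pair[0])


changes = poll_changes(fetch_from_fake_table, polling_interval=0, max_iterations=2)
first_pass = [next(changes), next(changes)]  # both rows are new on the first poll
fake_table[1] = (102, {"id": 1, "name": "a2"})  # simulate an UPDATE
second_pass = list(changes)  # only the updated row comes back
```

The `max_iterations` argument exists only so the sketch can terminate; a real source would loop until stopped.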
pip install git+https://github.com/roveo/streamz_postgres.git
There are some requirements for the source table.
- It needs to have an integer primary key: that's what we're going to detect updates of.
- It shouldn't be too big. The source is going to generate queries that are essentially sequential scans of the table, since you can't create indexes on the xmin system column. You can overcome this with triggers and a dedicated integer column on the table, but it's not yet supported by this plugin. I'll probably add this functionality and write a tutorial on how to do this sometime in the future.
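To show why these queries end up scanning the whole table, here is a rough sketch of what an xmin-based polling query could look like. This is an assumption for illustration, not necessarily the query this plugin issues: `build_changes_query`, `quote_ident` and the `_xmin` alias are hypothetical, and the sketch ignores transaction ID wraparound. The `%s` placeholder is meant to be filled by the database driver with the highest xmin seen so far.

```python
def quote_ident(name: str) -> str:
    """Quote a SQL identifier: wrap in double quotes, double any inside."""
    return '"' + name.replace('"', '""') + '"'


def build_changes_query(table: str, pk: str = "id") -> str:
    """Build a SELECT returning rows written after a given transaction ID.

    xmin is cast through text to bigint so it can be compared to a number.
    The filter is on a system column, which cannot be indexed, so Postgres
    has to read every row to evaluate it.
    """
    return (
        f"SELECT *, xmin::text::bigint AS _xmin "
        f"FROM {quote_ident(table)} "
        f"WHERE xmin::text::bigint > %s "
        f"ORDER BY _xmin, {quote_ident(pk)}"
    )


query = build_changes_query("my_table", pk="id")
print(query)
```

Running the resulting query through `EXPLAIN` on your own table is a quick way to check how expensive each poll would be.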
from streamz import Stream

src = Stream.from_postgres_cdc(
    "my_table",
    pk="id",  # this is the default
    polling_interval=10,  # seconds
    connection_params=dict(
        host="localhost",
        dbname="postgres",
        user="user",
        password="password"
    )
)
L = src.sink_to_list()
src.start()
That's it. At each iteration, rows updated since the previous one will end up in L. Each row is a psycopg2 DictRow.
- As I said, each query will generate a sequential scan. Be mindful of performance. You probably want to use a read replica as a source.
- The source won't emit anything when a row is deleted. Most modern systems implement soft-delete, so this shouldn't be a problem, but if yours doesn't, you're out of luck.
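Soft-delete works here because marking a row as deleted is just another UPDATE, so the row shows up like any other change. A small sketch of how a consumer might tell the two apart, assuming a hypothetical nullable `deleted_at` column (your schema may use a different name or a boolean flag); plain dicts stand in for the emitted rows:

```python
from typing import Any, Mapping, Tuple


def classify_change(
    row: Mapping[str, Any], deleted_column: str = "deleted_at"
) -> Tuple[str, Mapping[str, Any]]:
    """Label a changed row as a soft delete or as an insert/update.

    deleted_column is an assumed soft-delete marker: NULL (None) means the
    row is live, anything else means it was soft-deleted.
    """
    if row.get(deleted_column) is not None:
        return ("delete", row)
    return ("upsert", row)


live = classify_change({"id": 1, "name": "a", "deleted_at": None})
gone = classify_change({"id": 2, "name": "b", "deleted_at": "2024-01-01"})
```

A function like this could be applied to every row downstream of the source, for example in a map step.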