Skip to content

Commit

Permalink
DBZ-1052 Emit tx BEGIN/END messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane authored and gunnarmorling committed Jan 30, 2020
1 parent 01126bf commit 870ecfa
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 16 deletions.
2 changes: 2 additions & 0 deletions proto/pg_logicaldec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ enum Op {
INSERT = 0;
UPDATE = 1;
DELETE = 2;
BEGIN = 3;
COMMIT = 4;
}

message Point {
Expand Down
86 changes: 76 additions & 10 deletions src/decoderbufs.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,6 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
MemoryContextDelete(data->context);
}

/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
}

/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
}

/* print tuple datums (only used for debug-mode) */
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
size_t n) {
Expand Down Expand Up @@ -491,6 +481,82 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
}
}

/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {

DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering begin callback");


/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);

rmsg.op = DECODERBUFS__OP__BEGIN;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
rmsg.has_commit_time = true;

/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);

/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}

/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {

DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering commit callback");


/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);

rmsg.op = DECODERBUFS__OP__COMMIT;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
rmsg.has_commit_time = true;

/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);

/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}

/* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) {
Expand Down
14 changes: 9 additions & 5 deletions src/proto/pg_logicaldec.pb-c.c
Original file line number Diff line number Diff line change
Expand Up @@ -565,17 +565,21 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
(ProtobufCMessageInit) decoderbufs__row_message__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[5] =
{
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
};
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
{0, 0},{0, 3}
{0, 0},{0, 5}
};
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[5] =
{
{ "BEGIN", 3 },
{ "COMMIT", 4 },
{ "DELETE", 2 },
{ "INSERT", 0 },
{ "UPDATE", 1 },
Expand All @@ -587,9 +591,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
"Op",
"Decoderbufs__Op",
"decoderbufs",
3,
5,
decoderbufs__op__enum_values_by_number,
3,
5,
decoderbufs__op__enum_values_by_name,
1,
decoderbufs__op__value_ranges,
Expand Down
4 changes: 3 additions & 1 deletion src/proto/pg_logicaldec.pb-c.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 870ecfa

Please sign in to comment.