Override table naming pattern #201

Closed
kholisrag opened this issue Nov 13, 2024 · 21 comments

@kholisrag

kholisrag commented Nov 13, 2024

As mentioned in #195:

bigquery table name ends up like this: {topic.prefix}{source_database_name}{table_name}. dbz could be good value for it.

Is it possible to override the table naming pattern? I only want to use {table_name} in the table name. Is that possible, @ismailsimsek?

More info from the log:

2024-11-13 08:42:45,942 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default', error = 'io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default': io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
	at io.debezium.server.bigquery.StreamDataWriter.appendSync(StreamDataWriter.java:89)
	at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.uploadDestination(StreamBigqueryChangeConsumer.java:154)
	at io.debezium.server.bigquery.AbstractChangeConsumer.handleBatch(AbstractChangeConsumer.java:126)
	at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
	at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:735)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
	at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
	at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
	at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.appendWithUniqueId(SchemaAwareStreamWriter.java:250)
	at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:140)
	at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:65)
	at io.debezium.server.bigquery.StreamDataWriter.appendSync(StreamDataWriter.java:83)
	... 10 more

2024-11-13 08:42:46,146 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-11-13 08:42:46,149 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2024-11-13 08:42:46,151 INFO  [com.goo.clo.big.sto.v1.ConnectionWorkerPool] (main) During closing of writeStream for projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default with writer id 1d374d44-f9a0-4b05-beae-c74f339e2b51, we decided to close 0 connections, pool size after removal $s
com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1300)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1061)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:811)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
	Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
		at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
		at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
		at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:677)
		at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:647)
		at io.debezium.server.bigquery.StreamDataWriter.close(StreamDataWriter.java:97)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.closeStreams(StreamBigqueryChangeConsumer.java:99)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.doDestroy(Unknown Source)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
		at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:242)
		at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
		at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
		at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
		at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
		at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:156)
		at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:112)
		at io.quarkus.runtime.StartupContext.runAllAndClear(StartupContext.java:79)
		at io.quarkus.runtime.StartupContext.close(StartupContext.java:70)
		at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
		at io.quarkus.runtime.Application.stop(Application.java:208)
		at io.quarkus.runtime.Application.stop(Application.java:155)
		at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:235)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
		at io.debezium.server.Main.main(Main.java:15)
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
	at io.grpc.Status.asRuntimeException(Status.java:532)
	... 14 more
2024-11-13 08:42:46,825 WARN  [io.deb.ser.big.StreamBigqueryChangeConsumer] (main) Exception while closing bigquery stream, destination:postgresql_postgres-staging_bifrost_audit.public.dbtable: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1300)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1061)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:811)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
	Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
		at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
		at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
		at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:677)
		at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:647)
		at io.debezium.server.bigquery.StreamDataWriter.close(StreamDataWriter.java:97)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.closeStreams(StreamBigqueryChangeConsumer.java:99)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.doDestroy(Unknown Source)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
		at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:242)
		at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
		at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
		at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
		at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
		at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:156)
		at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:112)
		at io.quarkus.runtime.StartupContext.runAllAndClear(StartupContext.java:79)
		at io.quarkus.runtime.StartupContext.close(StartupContext.java:70)
		at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
		at io.quarkus.runtime.Application.stop(Application.java:208)
		at io.quarkus.runtime.Application.stop(Application.java:155)
		at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:235)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
		at io.debezium.server.Main.main(Main.java:15)
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_dbname_public/tables/topic_prefix_schema_dbtable/_default

The config:

# The common configuration for the Debezium Server BigQuery
debezium.format.value=json
debezium.format.key=json
debezium.format.schemas.enable=true

quarkus.log.level=INFO
quarkus.log.console.json=false
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms,ts_ms
debezium.transforms.unwrap.add.headers=db
debezium.transforms.unwrap.delete.tombstone.handling.mode=rewrite

# The BigQuery related configuration for the Debezium Server BigQuery

debezium.source.offset.storage=io.debezium.server.bigquery.offset.BigqueryOffsetBackingStore
debezium.source.offset.storage.bigquery.table-name=dsbq_offset
debezium.source.offset.flush.interval.ms=10000

debezium.source.database.history=io.debezium.server.bigquery.history.BigquerySchemaHistory
debezium.source.database.history.bigquery.table-name=dsbq_history

debezium.sink.type=bigquerystream
debezium.sink.bigquerystream.project=gcp-project-id
debezium.sink.bigquerystream.location=asia-southeast1
debezium.sink.bigquerystream.dataset=stage_postgresql_dbname_public
debezium.sink.bigquerystream.ignore-unknown-fields=false
debezium.sink.bigquerystream.credentials-file=/app/secrets/sa.json
debezium.sink.bigquerystream.create-if-needed=true
debezium.sink.bigquerystream.partition-field=__ts_ms
debezium.sink.bigquerystream.clustering-field=__source_ts_ms
debezium.sink.bigquerystream.partition-type=DAY

debezium.sink.batch.batch-size-wait=NoBatchSizeWait
debezium.sink.bigquerystream.upsert=true
debezium.sink.bigquerystream.upsert-keep-deletes=false
debezium.sink.bigquerystream.upsert-dedup-column=__source_ts_ms
debezium.sink.bigquerystream.upsert-op-column=__op

# The PostgreSQL related configuration for the Debezium Server BigQuery

debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres-staging
debezium.source.database.port=5432
debezium.source.database.user=dbz_bq_user
debezium.source.database.password=xxxxx
debezium.source.database.dbname=dbname
debezium.source.table.include.list=public.audit_log,public.schema_migrations

debezium.source.plugin.name=pgoutput
debezium.source.publication.name=db_publication
debezium.source.publication.autocreate.mode=disabled
debezium.source.topic.prefix=postgresql_postgres-staging_dbname
debezium.source.slot.name=slot_dbname
debezium.source.slot.drop.on.stop=false
debezium.source.snapshot.mode=never

Thanks in advance

@ismailsimsek
Member

@kholisrag is this happening on the first run, when the table is created?

There is currently a known BigQuery issue: when the connector creates a table for the first time and immediately tries to write a stream into it, it fails with the exception above, because BigQuery needs some time to create the default stream for the new table.

However, when the tables already exist, this error should not happen.
More info:
googleapis/java-bigquery#2368
googleapis/google-cloud-go#975 (comment)
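
In the meantime, a client-side workaround would be to retry the first append until BigQuery finishes provisioning the table's _default stream. A minimal, hypothetical sketch (the append callback and the retry bounds are assumptions for illustration, not the connector's actual code):

// Hypothetical retry wrapper: re-attempt the append a few times so a freshly
// created table has time to get its _default write stream.
static void appendWithRetry(Runnable append) throws InterruptedException {
    int attempts = 0;
    while (true) {
        try {
            append.run(); // e.g. a StreamDataWriter.appendSync(...)-style call
            return;
        } catch (RuntimeException e) { // e.g. NOT_FOUND / INVALID_ARGUMENT right after table creation
            if (++attempts >= 5) {
                throw e; // give up after roughly 20 seconds of backoff
            }
            Thread.sleep(attempts * 2000L); // linear backoff: 2s, 4s, 6s, 8s
        }
    }
}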

@kholisrag
Author

kholisrag commented Nov 13, 2024

@kholisrag is this happening on the first run, when the table is created?

Yes, it is.

Anyway, I already have tables like the ones below:

[screenshot: existing audit_log and schema_migrations tables in BigQuery]

Is it possible to use the existing tables (audit_log / schema_migrations as above), since the old data is in there? @ismailsimsek

@ismailsimsek
Member

Yes, it's possible to use an existing table: when the table exists, the consumer will use it, and if not, it will try to create it.
It will not remove the data.

@kholisrag
Author

kholisrag commented Nov 13, 2024

Yes, it's possible to use an existing table: when the table exists, the consumer will use it, and if not, it will try to create it.
It will not remove the data.

I mean to change this pattern (#195):

bigquery table name ends up like this: {topic.prefix}{source_database_name}{table_name}. dbz could be good value for it.

I only need {table_name}. Is that possible, @ismailsimsek?

Update:

[screenshot: current BigQuery table names created with the prefixed pattern]

Or could you point me to the piece of code that applies the pattern?

@ismailsimsek
Member

This is currently not possible, but it could be added as a new feature.
This is where the table name is defined:

public TableId getTableId(String destination) {
    final String tableName = destination
        .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse(""))
        .replace(".", "_");
    return TableId.of(gcpProject.get(), bqDataset.get(), tableName);
}
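
To make the mapping concrete, here is a small standalone sketch of what that method does to a destination string (the destination value and the regexp are assumptions based on the topic.prefix in the config above, not connector output):

public class TableNameDemo {
    public static void main(String[] args) {
        // Debezium destinations look like {topic.prefix}.{schema}.{table}
        String destination = "postgresql_postgres-staging_dbname.public.audit_log";

        // With no destination-regexp configured, only the dots become underscores:
        System.out.println(destination.replace(".", "_"));
        // -> postgresql_postgres-staging_dbname_public_audit_log

        // With a destination-regexp that strips the prefix, only the table name remains:
        System.out.println(destination
            .replaceAll("^postgresql_postgres-staging_dbname\\.public\\.", "")
            .replace(".", "_"));
        // -> audit_log
    }
}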

@kholisrag
Author

Thank you, @ismailsimsek!

Checking the code, is it possible to use debezium.sink.batch.destination-regexp and debezium.sink.batch.destination-regexp-replace?

@ismailsimsek
Member

Right 👍 that could actually be used.

@kholisrag
Author

Can you give me an example? I didn't find one anywhere in the test code either.

@kholisrag
Author

kholisrag commented Nov 13, 2024

Something like this? @ismailsimsek

debezium.sink.batch.destination-regexp=.*_(.*)_(.*)
debezium.sink.batch.destination-regexp-replace=$2

Update:

Neither the config above nor the one below is working:

debezium.sink.batch.destination-regexp=.*\..*\.(.*)
debezium.sink.batch.destination-regexp-replace=$1

Error:

2024-11-13 10:28:38,805 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Provided table is null or empty', error = 'java.lang.IllegalArgumentException: Provided table is null or empty': java.lang.IllegalArgumentException: Provided table is null or empty
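
One plausible cause of the empty table name (an assumption, not verified against the connector): getTableId applies plain String.replaceAll, and if the properties file is parsed with java.util.Properties semantics, a single backslash is consumed as an escape, so \. reaches the regexp engine as a bare . wildcard. A minimal sketch to check candidate patterns locally (the destination value is assumed from the topic.prefix above):

public class DestinationRegexpCheck {
    public static void main(String[] args) {
        // Assumed destination: {topic.prefix}.{schema}.{table}
        String destination = "postgresql_postgres-staging_dbname.public.audit_log";

        // What the connector may actually see if "\." lost its backslash during
        // properties parsing: every "." is a wildcard, and the final capture
        // group can legally be empty.
        System.out.println("[" + destination.replaceAll(".*..*.(.*)", "$1") + "]");
        // -> [] : an empty table name, matching the "Provided table is null or empty" error

        // With the escapes intact, the last dot-separated segment survives:
        System.out.println(destination.replaceAll(".*\\..*\\.(.*)", "$1"));
        // -> audit_log
    }
}

If that is what is happening, doubling the backslashes in the properties file (\\.) should let the intended pattern reach the regexp engine.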

@ismailsimsek
Member

ismailsimsek commented Nov 13, 2024

Something like the following. You could use this tool to develop the regexp: https://regex101.com/

# use a regexp that matches the prefix
debezium.sink.batch.destination-regexp=^prefix\.
# use an empty string as the replacement
debezium.sink.batch.destination-regexp-replace=
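
Applied to the config earlier in this thread, that would look something like the following (an illustrative guess built from the topic.prefix and schema above; verify the prefix against your own destinations):

# assumed: topic.prefix=postgresql_postgres-staging_dbname, schema=public
debezium.sink.batch.destination-regexp=^postgresql_postgres-staging_dbname\.public\.
debezium.sink.batch.destination-regexp-replace=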

@kholisrag
Author

kholisrag commented Nov 13, 2024

Yeah, I'm using https://regex101.com/ too, here: https://regex101.com/r/gvzYpZ/2
I already tried it, but it's still not working.

Update:

Using the config below:

debezium.sink.batch.destination-regexp=^dsbq\.public\.
debezium.sink.batch.destination-regexp-replace=

the table name is now replaced, but it still seems not to work. Here is the log:

2024-11-13 12:05:36,856 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default', error = 'io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default': io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
	at io.debezium.server.bigquery.StreamDataWriter.appendSync(StreamDataWriter.java:89)
	at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.uploadDestination(StreamBigqueryChangeConsumer.java:154)
	at io.debezium.server.bigquery.AbstractChangeConsumer.handleBatch(AbstractChangeConsumer.java:126)
	at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
	at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:735)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
	at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
	at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
	at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.appendWithUniqueId(SchemaAwareStreamWriter.java:250)
	at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:140)
	at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:65)
	at io.debezium.server.bigquery.StreamDataWriter.appendSync(StreamDataWriter.java:83)
	... 10 more

2024-11-13 12:05:37,057 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-11-13 12:05:37,064 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2024-11-13 12:05:37,067 INFO  [com.goo.clo.big.sto.v1.ConnectionWorkerPool] (main) During closing of writeStream for projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default with writer id fa54a2bc-5081-40d1-8500-7a3173d99bad, we decided to close 0 connections, pool size after removal $s
com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1300)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1061)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:811)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
	Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
		at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
		at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
		at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:677)
		at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:647)
		at io.debezium.server.bigquery.StreamDataWriter.close(StreamDataWriter.java:97)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.closeStreams(StreamBigqueryChangeConsumer.java:99)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.doDestroy(Unknown Source)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
		at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:242)
		at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
		at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
		at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
		at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
		at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:156)
		at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:112)
		at io.quarkus.runtime.StartupContext.runAllAndClear(StartupContext.java:79)
		at io.quarkus.runtime.StartupContext.close(StartupContext.java:70)
		at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
		at io.quarkus.runtime.Application.stop(Application.java:208)
		at io.quarkus.runtime.Application.stop(Application.java:155)
		at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:235)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
		at io.debezium.server.Main.main(Main.java:15)
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
	at io.grpc.Status.asRuntimeException(Status.java:532)
	... 14 more
2024-11-13 12:05:37,459 WARN  [io.deb.ser.big.StreamBigqueryChangeConsumer] (main) Exception while closing bigquery stream, destination:dsbq.public.audit_log: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1300)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1061)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:811)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
	Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
		at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
		at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
		at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:677)
		at com.google.cloud.bigquery.storage.v1.BigQueryWriteClient.finalizeWriteStream(BigQueryWriteClient.java:647)
		at io.debezium.server.bigquery.StreamDataWriter.close(StreamDataWriter.java:97)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer.closeStreams(StreamBigqueryChangeConsumer.java:99)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.doDestroy(Unknown Source)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
		at io.debezium.server.bigquery.StreamBigqueryChangeConsumer_Bean.destroy(Unknown Source)
		at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:242)
		at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
		at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
		at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
		at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
		at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:156)
		at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:112)
		at io.quarkus.runtime.StartupContext.runAllAndClear(StartupContext.java:79)
		at io.quarkus.runtime.StartupContext.close(StartupContext.java:70)
		at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
		at io.quarkus.runtime.Application.stop(Application.java:208)
		at io.quarkus.runtime.Application.stop(Application.java:155)
		at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:235)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
		at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
		at io.debezium.server.Main.main(Main.java:15)
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Requested entity was not found. Entity: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
	at io.grpc.Status.asRuntimeException(Status.java:532)
	... 14 more

@kholisrag
Author

kholisrag commented Nov 13, 2024

Hmm, when I try to use bigquerybatch mode, I get the following error:

{
  "error": {
    "code": 400,
    "message": "Provided Schema does not match Table gcp-project-id:stage_postgresql_bifrost_audit_public.audit_log. Field audit_time has changed type from TIMESTAMP to INTEGER",
    "errors": [
      {
        "message": "Provided Schema does not match Table gcp-project-id:stage_postgresql_bifrost_audit_public.audit_log. Field audit_time has changed type from TIMESTAMP to INTEGER",
        "domain": "global",
        "reason": "invalid"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}

@ismailsimsek
Member

@kholisrag it seems like you are switching between the stream and batch consumers, and they use different schemas when creating the table. If you delete the current table and let the consumer recreate it, this error should be gone. Put simply, the audit_time field type is mismatched.
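
For reference, the one-off cleanup could be scripted with the BigQuery client library; a minimal sketch (destructive, and the project/dataset/table names are simply taken from the error message above):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;

public class DropMismatchedTable {
    public static void main(String[] args) {
        // Uses application-default credentials; drops the mismatched table so
        // the consumer can recreate it with the schema it expects.
        BigQuery bq = BigQueryOptions.getDefaultInstance().getService();
        TableId table = TableId.of("gcp-project-id", "stage_postgresql_bifrost_audit_public", "audit_log");
        boolean deleted = bq.delete(table);
        System.out.println(deleted ? "table dropped" : "table not found");
    }
}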

@ismailsimsek
Member

But as you can see, the table name is now correctly set, without the prefix:

laku6-145607:stage_postgresql_bifrost_audit_public.audit_log

@kholisrag
Author

Okay, it seems like a schema mismatch. Checking the original table in the database VM, it's a timestamp; I'm not sure why it's converted to an integer by debezium-server.

[screenshot: source table schema showing audit_time as a timestamp column]

@ismailsimsek
Member

ismailsimsek commented Nov 13, 2024

@kholisrag BigQuery does some level of automatic data type recognition (converting long to timestamp, etc.), and this differs for time/timestamp types between the batch and streaming consumers. I believe this is where it's coming from.

I believe when you use just one consumer, this should not happen.

@kholisrag
Author

kholisrag commented Nov 13, 2024

For context, we want to migrate from Datastream to self-managed using this tool, without changing the existing tables; we only want to keep updating them. But it seems like this has become a problem...

I believe when you use just one consumer, this should not happen.

Hmm, I tried bigquerybatch and bigquerystream at different times. It seems to have become a problem because I'm migrating from Datastream.

Anyway, thank you!

@ismailsimsek
Member

@kholisrag is there a specific reason for switching from Datastream? I believe the data type mapping could easily be improved (especially for bigquerystream).

The following config could be used to fix the type mapping (this part is still in beta):

config.put("debezium.source.converters", "bqdatetime");
config.put("debezium.source.bqdatetime.type", "io.debezium.server.converters.TemporalToISOStringConverter");
//

The data type mapping code section is below; type conversions and corrections could be handled here:

public String valueAsJsonLine(Schema schema) throws JsonProcessingException {
    if (value == null) {
        return null;
    }
    // process JSON fields
    if (schema != null) {
        for (Field f : schema.getFields()) {
            if (f.getType() == LegacySQLTypeName.JSON && value.has(f.getName())) {
                ((ObjectNode) value).replace(f.getName(), mapper.readTree(value.get(f.getName()).asText("{}")));
            }
            // process DATE values
            if (f.getType() == LegacySQLTypeName.DATE && value.has(f.getName()) && !value.get(f.getName()).isNull()) {
                ((ObjectNode) value).put(f.getName(), LocalDate.ofEpochDay(value.get(f.getName()).longValue()).toString());
            }
        }
    }
    // Process DEBEZIUM TS_MS values
    TS_MS_FIELDS.forEach(tsf -> {
        if (value.has(tsf)) {
            ((ObjectNode) value).put(tsf, Instant.ofEpochMilli(value.get(tsf).longValue()).toString());
        }
    });
    // Process DEBEZIUM BOOLEAN values
    BOOLEAN_FIELDS.forEach(bf -> {
        if (value.has(bf)) {
            ((ObjectNode) value).put(bf, Boolean.valueOf(value.get(bf).asText()));
        }
    });
    return mapper.writeValueAsString(value);
}

/**
 * See https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
 *
 * @return
 */
public JSONObject valueAsJsonObject(boolean upsert, boolean upsertKeepDeletes) {
    Map<String, Object> jsonMap = mapper.convertValue(value, new TypeReference<>() {
    });
    // SET UPSERT meta field `_CHANGE_TYPE`
    if (upsert) {
        // if its deleted row and upsertKeepDeletes = false, deleted records are deleted from target table
        if (!upsertKeepDeletes && jsonMap.get("__op").equals("d")) {
            jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "DELETE");
        } else {
            // if it's not deleted row or upsertKeepDeletes = true then add deleted record to target table
            jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "UPSERT");
        }
    }
    TS_MS_FIELDS.forEach(tsf -> {
        if (jsonMap.containsKey(tsf)) {
            // Convert millisecond to microseconds
            jsonMap.replace(tsf, ((Long) jsonMap.get(tsf) * 1000L));
        }
    });
    BOOLEAN_FIELDS.forEach(bf -> {
        if (jsonMap.containsKey(bf)) {
            jsonMap.replace(bf, Boolean.valueOf((String) jsonMap.get(bf)));
        }
    });
    return new JSONObject(jsonMap);
}
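
As an illustration of the kind of correction that could live in the schema loop of valueAsJsonLine above (a hypothetical addition, not existing connector code), a TIMESTAMP branch could rewrite epoch values as ISO strings:

// Hypothetical: if the connector delivers epoch-microsecond longs for a
// TIMESTAMP column (like audit_time above), rewrite them as ISO-8601 strings.
// Requires java.time.Instant and java.time.temporal.ChronoUnit.
if (f.getType() == LegacySQLTypeName.TIMESTAMP && value.has(f.getName()) && !value.get(f.getName()).isNull()) {
    ((ObjectNode) value).put(f.getName(),
        Instant.EPOCH.plus(value.get(f.getName()).longValue(), ChronoUnit.MICROS).toString());
}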

Additionally, I suggest checking the Debezium config options that change data type values (e.g. time.precision.mode).

It's also related to googleapis/java-bigquerystorage#1764
and googleapis/java-bigquerystorage#1765

@kholisrag kholisrag reopened this Nov 13, 2024
@kholisrag
Author

kholisrag commented Nov 13, 2024

Let me test using:

debezium.source.converters=bqdatetime
debezium.source.bqdatetime.type=io.debezium.server.converters.TemporalToISOStringConverter

@kholisrag is there a specific reason for switching from Datastream? I believe the data type mapping could easily be improved (especially for bigquerystream).

Cost issues, @ismailsimsek.

@ismailsimsek
Member

I see. If you don't need real-time data, then bigquerybatch uses the free API; it should not create any cost. Regarding the data type conversion, I will add a sample test and document it soon.
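
For the record, switching consumers is mostly a matter of the sink type plus the matching property namespace; a hypothetical minimal sketch (the bigquerybatch property names are assumed to mirror the bigquerystream ones used earlier, so verify them against the project docs):

debezium.sink.type=bigquerybatch
# assumed to mirror the bigquerystream properties above
debezium.sink.bigquerybatch.project=gcp-project-id
debezium.sink.bigquerybatch.dataset=stage_postgresql_dbname_public
debezium.sink.bigquerybatch.location=asia-southeast1
debezium.sink.bigquerybatch.credentials-file=/app/secrets/sa.json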

@kholisrag
Author

kholisrag commented Nov 13, 2024

Ah, I mean GCP Datastream (https://cloud.google.com/datastream/docs/overview),
not the BigQuery Storage Write streaming API.

So we want to move from GCP Datastream to self-managed, @ismailsimsek; we're exploring the tools.

Btw, I still have the same issue, probably the type-conversion issue; yeah, we need to update the code to cover it.
Below is the config:

debezium.sink.batch.destination-regexp=^dsbq\.public\.
debezium.sink.batch.destination-regexp-replace=
debezium.source.converters=bqdatetime
debezium.source.bqdatetime.type=io.debezium.server.converters.TemporalToISOStringConverter

Log:

INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default', error = 'io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default': io.debezium.DebeziumException: Failed to append data to stream projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
INVALID_ARGUMENT: Append serialization failed for writer: projects/gcp-project-id/datasets/stage_postgresql_bifrost_audit_public/tables/audit_log/_default
