
Since porting to 2.1.0, pipeline now fails when it encounters an empty PCollection for BigQuery sink/output #610

Open · polleyg opened this issue Oct 9, 2017 · 1 comment


polleyg commented Oct 9, 2017

We are porting some pipelines from 1.8.0 to 2.1.0 in our applications.

These pipelines are basic: Read from GCS -> ParDo -> Write to BigQuery. There are multiple side outputs being written to BigQuery (think different months/years, which correspond to sharded tables in BigQuery).
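
For context, here is a minimal sketch of the shape of these pipelines (Beam 2.1.0 Java). Everything in it is a placeholder rather than our production code: the bucket, the table names, and the parsing logic (the real ParDo derives the month from a date column):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ShardedClicksPipeline {
  // One tag per monthly shard; depending on the input, a side output may never be emitted to.
  static final TupleTag<TableRow> OCT_2017 = new TupleTag<TableRow>() {};
  static final TupleTag<TableRow> NOV_2017 = new TupleTag<TableRow>() {};

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read from GCS -> ParDo (multi-output) -> one BigQuery sink per shard.
    PCollectionTuple byMonth = p
        .apply(TextIO.read().from("gs://my-bucket/clicks/*.csv"))
        .apply(ParDo.of(new DoFn<String, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            TableRow row = new TableRow().set("line", c.element()); // placeholder parsing
            if (c.element().contains("2017-11")) {
              c.output(NOV_2017, row); // side output; may receive zero elements
            } else {
              c.output(row);           // main output -> October shard
            }
          }
        }).withOutputTags(OCT_2017, TupleTagList.of(NOV_2017)));

    writeShard(byMonth.get(OCT_2017), "my-project:my_dataset.Clicks_transformed_2017_10");
    writeShard(byMonth.get(NOV_2017), "my-project:my_dataset.Clicks_transformed_2017_11");
    p.run();
  }

  static void writeShard(PCollection<TableRow> rows, String table) {
    rows.apply("Write " + table, BigQueryIO.writeTableRows()
        .to(table)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
  }
}
```

Each shard has its own BigQueryIO sink, so a month with no matching rows in the input hands its sink an empty PCollection<TableRow>.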

In some cases, we may not have any elements for one or more of the BigQuery sinks. On 1.8.0 this never caused an issue. On 2.1.0, however, the pipeline fails whenever it encounters an empty PCollection<TableRow>:

ID: 2017-10-08_18_49_08-4262554366118214565

2017-10-09 (13:13:50)
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: ...
(db49af7a45b5ecaa): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001,
reached max retries: 3, last failed load job:
{
  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_IF_NEEDED",
      "destinationTable" : {
        "datasetId" : "<redacted>",
        "projectId" : "<redacted>",
        "tableId" : "Clicks_transformed_2017_11"
      },
      "sourceFormat" : "NEWLINE_DELIMITED_JSON",
      "sourceUris" : [ "gs://<redacted>/BigQueryWriteTemp/c4b4181de93d47f3b2b6ba5d2cdd2507/43f3689a-6b4d-4410-b566-3d0bcbf7cbb7" ],
      "writeDisposition" : "WRITE_APPEND"
    }
  },
  "etag" : "\"Z8nD8CIuj_TqmPrS_MNV-O9x2rU/CT0v4ZW2CSH0SWrB04bn5-C_ckc\"",
  "id" : "<redacted>:c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2",
  "jobReference" : {
    "jobId" : "c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2",
    "projectId" : "<redacted>"
  },
  "kind" : "bigquery#job",
  "selfLink" : "https://www.googleapis.com/bigquery/v2/projects/<redacted>/jobs/c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2",
  "statistics" : {
    "creationTime" : "1507515172555",
    "endTime" : "1507515172669",
    "startTime" : "1507515172669"
  },
  "status" : {
    "errorResult" : { "message" : "No schema specified on job or table.", "reason" : "invalid" },
    "errors" : [ { "message" : "No schema specified on job or table.", "reason" : "invalid" } ],
    "state" : "DONE"
  },
  "user_email" : "<redacted>[email protected]"
}
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:54)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:37)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:117)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:74)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:113)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001, reached max retries: 3, last failed load job: { ... same load job JSON as above ... }
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:84)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:180)
    ... 21 more
Caused by: java.lang.RuntimeException: Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001, reached max retries: 3, last failed load job: { ... same load job JSON as above ... }
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:179)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:114)

As a test, I hacked one of the GCS (CSV) input files to ensure at least one element ends up in each of the BigQuery sinks. Et voilà, it worked (ID: 2017-10-08_20_25_01-3317103171891646372).
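
A programmatic version of that same test, reusing the placeholder names from the sketch above, would be to flatten a single sentinel row into each shard so no sink ever sees an empty PCollection. This is only a diagnostic hack (the sentinel would have to be filtered out downstream), not a fix:

```java
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollectionList;

// Inside main(), after the multi-output ParDo: guarantee at least one
// element reaches the November sink.
PCollection<TableRow> novNonEmpty = PCollectionList
    .of(byMonth.get(NOV_2017))
    .and(p.apply("NovSentinel",
        Create.of(new TableRow().set("line", "sentinel"))
              .withCoder(TableRowJsonCoder.of())))
    .apply(Flatten.pCollections());
writeShard(novNonEmpty, "my-project:my_dataset.Clicks_transformed_2017_11");
```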

We skipped 1.9.0 and 2.0.0 entirely, so I'm not sure which version introduced this bug.
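
One more observation: the errorResult on the failed load job is "No schema specified on job or table." The empty shard's table doesn't exist yet, and a zero-row load job carries no data to infer a schema from, so explicitly supplying a schema on each write might sidestep the failure. We haven't verified this yet; a sketch, with a placeholder schema swapped into writeShard from the snippet above:

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

static void writeShard(PCollection<TableRow> rows, String table) {
  // Placeholder schema; the real tables have more columns.
  TableSchema schema = new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema().setName("line").setType("STRING")));
  rows.apply("Write " + table, BigQueryIO.writeTableRows()
      .to(table)
      .withSchema(schema) // the load job carries an explicit schema even with zero rows
      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(WriteDisposition.WRITE_APPEND));
}
```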


koichirokamoto commented Nov 20, 2017

Is there any update?
