From 7f9899a9ba764c8ec3954903e01b9151ed386013 Mon Sep 17 00:00:00 2001
From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com>
Date: Thu, 27 Jan 2022 21:57:56 +0000
Subject: [PATCH] docs(samples): Write API tutorial

---
 samples/tutorials/pom.xml                     |  84 ++++++++++++
 .../WriteToDefaultStreamTutorial.java         | 128 ++++++++++++++++++
 2 files changed, 212 insertions(+)
 create mode 100644 samples/tutorials/pom.xml
 create mode 100644 samples/tutorials/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTutorial.java

diff --git a/samples/tutorials/pom.xml b/samples/tutorials/pom.xml
new file mode 100644
index 0000000000..a945790bb4
--- /dev/null
+++ b/samples/tutorials/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.google.cloud</groupId>
+  <artifactId>google-cloud-bigquerystorage</artifactId>
+
+  <parent>
+    <groupId>com.google.cloud.samples</groupId>
+    <artifactId>shared-configuration</artifactId>
+    <version>1.2.0</version>
+  </parent>
+
+  <properties>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-bigquerystorage</artifactId>
+      <version>2.8.2</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-bigquery</artifactId>
+      <version>2.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.10.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>6.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-netty</artifactId>
+      <version>6.0.1</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/main/java</source>
+                <source>../snippets/src/main/java/com/example/bigquerystorage</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>maven</executable>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/samples/tutorials/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTutorial.java b/samples/tutorials/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTutorial.java
new file mode 100644
index 0000000000..8e7c48d706
--- /dev/null
+++ b/samples/tutorials/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTutorial.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.bigquerystorage;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+public class WriteToDefaultStreamTutorial {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 4) {
+      System.out.println("Arguments: project, dataset, table, source_file");
+      return;
+    }
+
+    String projectId = args[0];
+    String datasetName = args[1];
+    String tableName = args[2];
+    String dataFile = args[3];
+    createDestinationTable(projectId, datasetName, tableName);
+    writeToDefaultStream(projectId, datasetName, tableName, dataFile);
+  }
+
+  // createDestinationTable: Creates the destination table for streaming with the desired schema.
+  public static void createDestinationTable(
+      String projectId, String datasetName, String tableName) {
+    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+    // Create a schema that matches the source data.
+    Schema schema =
+        Schema.of(
+            Field.of("commit", StandardSQLTypeName.STRING),
+            Field.newBuilder("parent", StandardSQLTypeName.STRING)
+                .setMode(Field.Mode.REPEATED)
+                .build(),
+            Field.of("author", StandardSQLTypeName.STRING),
+            Field.of("committer", StandardSQLTypeName.STRING),
+            Field.of("time_sec", StandardSQLTypeName.INT64),
+            Field.of("subject", StandardSQLTypeName.STRING),
+            Field.of("message", StandardSQLTypeName.STRING),
+            Field.of("repo_name", StandardSQLTypeName.STRING));
+
+    // Create the table with this schema, unless it already exists.
+    TableId tableId = TableId.of(projectId, datasetName, tableName);
+    Table table = bigquery.getTable(tableId);
+    if (table == null) {
+      TableInfo tableInfo =
+          TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
+      bigquery.create(tableInfo);
+    }
+  }
+
+  // writeToDefaultStream: Writes records from the source file to the destination table.
+  public static void writeToDefaultStream(
+      String projectId, String datasetName, String tableName, String dataFile)
+      throws DescriptorValidationException, InterruptedException, IOException {
+
+    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+
+    // Get the schema of the destination table and convert to the equivalent BigQueryStorage type.
+    // BqToBqStorageSchemaConverter lives in ../snippets and is compiled into this
+    // module by the build-helper-maven-plugin configuration in pom.xml.
+    Table table = bigquery.getTable(datasetName, tableName);
+    Schema schema = table.getDefinition().getSchema();
+    TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);
+
+    // Use the JSON stream writer to send records in JSON format.
+    TableName parentTable = TableName.of(projectId, datasetName, tableName);
+    // Read JSON records from the source file and send them to the Write API.
+    // Both the writer and the reader are closed by try-with-resources.
+    try (JsonStreamWriter writer =
+            JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build();
+        BufferedReader reader = new BufferedReader(new FileReader(dataFile))) {
+      String line = reader.readLine();
+      while (line != null) {
+        // As a best practice, send batches of records, instead of single records at a time.
+        JSONArray jsonArr = new JSONArray();
+        for (int i = 0; i < 100; i++) {
+          JSONObject record = new JSONObject(line);
+          jsonArr.put(record);
+          line = reader.readLine();
+          if (line == null) {
+            break;
+          }
+        } // batch
+        ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
+        AppendRowsResponse response = future.get();
+      }
+      System.out.println("Appended records successfully.");
+    } catch (ExecutionException e) {
+      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
+      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
+      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
+      System.out.println("Failed to append records.\n" + e.toString());
+    }
+  }
+}
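
Note: one way to run the tutorial after applying this patch is via the exec plugin
declared in the pom. This is a sketch; the project, dataset, table, and data file
names below are placeholders, not part of the patch:

    cd samples/tutorials
    mvn compile
    mvn exec:java \
        -Dexec.mainClass="com.example.bigquerystorage.WriteToDefaultStreamTutorial" \
        -Dexec.args="my-project my_dataset my_table commits.json"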
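The source file is expected to hold one JSON object per line, with keys matching
the schema created in createDestinationTable (parent is a REPEATED STRING field,
time_sec is INT64). An illustrative input line, with made-up values:

    {"commit": "c1a2b3", "parent": ["d4e5f6"], "author": "alice", "committer": "alice", "time_sec": 1643328000, "subject": "Fix build", "message": "Fix build\n", "repo_name": "example/repo"}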
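The catch block in writeToDefaultStream only logs append failures. For callers who
want to act on the retry guidance in its comment, a minimal sketch of the status
check follows, using the io.grpc classes pulled in transitively by the client
library; the AppendRetryCheck class and isRetryable method are hypothetical names
for illustration only, not part of the patch:

    import io.grpc.Status;
    import io.grpc.StatusRuntimeException;
    import java.util.concurrent.ExecutionException;

    // Illustrative helper (not part of the patch): classify an append failure
    // using the gRPC status code wrapped inside the ExecutionException.
    public class AppendRetryCheck {
      public static boolean isRetryable(ExecutionException e) {
        Throwable cause = e.getCause();
        if (!(cause instanceof StatusRuntimeException)) {
          return false;
        }
        Status.Code code = ((StatusRuntimeException) cause).getStatus().getCode();
        // INTERNAL, CANCELLED, and ABORTED are the codes the tutorial's
        // comment names as safe to retry.
        return code == Status.Code.INTERNAL
            || code == Status.Code.CANCELLED
            || code == Status.Code.ABORTED;
      }
    }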