This library is compatible with Go 1.11+.
Please refer to CHANGELOG.md
if you encounter breaking changes.
The ability to process incoming data incrementally, in a way that is both duplication-free and cost-effective, is of paramount importance, especially when data is loaded or streamed to BigQuery in real time. When dealing with many tables at once, managing processing state adds yet another aspect that needs to be taken care of. This library was developed to simplify multi-table time-window processing. It can be deployed as a standalone service or as a cloud function.
BigQuery provides range decorators, a mechanism for windowing data ingested within the last 7 days.
Syntax:
SELECT * FROM [PROJECT_ID:DATASET.TABLE@<timeFrom>-<timeTo>]
- References table data added between <timeFrom> and <timeTo>, expressed in milliseconds since the epoch.
- <timeFrom> and <timeTo> must be within the last 7 days.
For example: SELECT * FROM [my-project:mydataset.my_table_10_20181227@1545926437802-1545930015832]
One important factor driving BigQuery table layout design is that range decorators are only supported with legacy SQL; partitioned and clustered tables, which require standard SQL, currently cannot be windowed with this method.
In the absence of partitioning and clustering, the following table layout should provide good flexibility:
- DATASET.TABLE_[DATE_SUFFIX]
- DATASET.TABLE_[PARTITION_SHARD]_[DATE_SUFFIX]
In both scenarios it is possible to use a table template when data is streamed to BigQuery.
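For illustration, a minimal sketch of streaming rows with a date-suffix table template using the Go BigQuery client (cloud.google.com/go/bigquery); the project, dataset, table, and row type are assumptions, and the base template table is assumed to already exist:

package main

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/bigquery"
)

type Event struct {
    ID   string
    When time.Time
}

func main() {
    ctx := context.Background()
    client, err := bigquery.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    inserter := client.Dataset("mydataset").Table("my_table").Inserter()
    // Streaming with a template suffix creates DATASET.TABLE_[DATE_SUFFIX] shards,
    // e.g. mydataset.my_table_20181227, based on the my_table template table.
    inserter.TableTemplateSuffix = "_" + time.Now().UTC().Format("20060102")
    if err := inserter.Put(ctx, []*Event{{ID: "1", When: time.Now()}}); err != nil {
        log.Fatal(err)
    }
}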
This project uses a meta file to store time-windowed table processing state. Example meta file:
{
  "URL": "gs://mybucket/xmeta",
  "DatasetID": "my-project:mydataset",
  "Tables": [
    {
      "ID": "mydataset.my_table_10_20181227",
      "ProjectID": "my-project",
      "Name": "my_table_10_20181227",
      "Dataset": "mydataset",
      "Window": {
        "From": "2018-12-27T16:00:37.802Z",
        "To": "2018-12-27T17:00:15.832Z"
      },
      "LastChangedFlag": "2018-12-27T17:00:57.238680333Z",
      "Changed": true,
      "Expression": "[mydataset.my_table_10_20181227@1545926437802-1545930015832]",
      "AbsoluteExpression": "[my-project:mydataset.my_table_10_20181227@1545926437802-1545930015832]"
    },
    {
      "ID": "mydataset.my_table_10_20181226",
      "ProjectID": "my-project",
      "Name": "my_table_10_20181226",
      "Dataset": "mydataset",
      "Window": {
        "From": "2018-12-26T16:00:37.802Z",
        "To": "2018-12-26T17:00:15.832Z"
      },
      "LastChangedFlag": "2018-12-26T17:00:57.238680333Z",
      "Changed": false
    }
  ],
  "Expression": "[mydataset.my_table_10_20181227@1545926437802-1545930015832]",
  "AbsoluteExpression": "[my-project:mydataset.my_table_10_20181227@1545926437802-1545930015832]"
}
// WindowedTable represents a single table's time-window processing state.
type WindowedTable struct {
    ID                 string
    ProjectID          string
    Name               string
    Dataset            string
    Window             *TimeWindow `description:"recent change range: from, to timestamp"`
    LastChanged        time.Time
    Changed            bool
    Expression         string `description:"represents table ranged decorator expression"`
    AbsoluteExpression string `description:"represents absolute table path ranged decorator expression"`
}
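The TimeWindow type referenced by the Window field is not shown in this section; a minimal definition consistent with the From/To timestamps in the meta file example would be:

type TimeWindow struct {
    From time.Time
    To   time.Time
}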
// Meta represents the meta file content.
type Meta struct {
    URL                string
    DatasetID          string
    Tables             []*WindowedTable
    Expression         string `description:"represents recently changed tables ranged decorator relative expression (without project id)"`
    AbsoluteExpression string `description:"represents recently changed tables ranged decorator absolute expression (with project id)"`
}
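Given the types above, a downloaded copy of the meta file can be decoded with the standard library; a minimal sketch (the helper name and local path are assumptions):

import (
    "encoding/json"
    "io/ioutil"
)

// loadMeta reads and decodes a locally downloaded copy of the meta file.
func loadMeta(path string) (*Meta, error) {
    data, err := ioutil.ReadFile(path)
    if err != nil {
        return nil, err
    }
    meta := &Meta{}
    if err := json.Unmarshal(data, meta); err != nil {
        return nil, err
    }
    return meta, nil
}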
The service accepts both POST and GET HTTP methods.
- POST method request

type Request struct {
    Mode                string   `description:"operation mode: r - take snapshot, w - persist snapshot"`
    MetaURL             string   `description:"meta file location; if a relative path is used the gs:// protocol is added"`
    Location            string   `description:"dataset location"`
    DatasetID           string   `description:"source dataset"`
    MatchingTables      []string `description:"matching table contains expression"`
    PruneThresholdInSec int      `description:"max allowed duration in sec for unchanged windowed tables before removal"`
    LoopbackWindowInSec int      `description:"dataset max loopback window for checking changed tables in the supplied dataset"`
    Expression          bool     `description:"if set, returns only the relative expression (without project id)"`
    AbsoluteExpression  bool     `description:"if set, returns only the absolute expression (with project id)"`
    Method              string   `description:"data insert method: stream, or load by default"`
    StorageRegion       string   `description:"storage region for standard sql"`
}
- GET method request parameters
- mode: Mode
- meta: MetaURL
- dataset: DatasetID
- match: MatchingTables
- location: Location
- prune: PruneThresholdInSec (min 7 days)
- loopback: LoopbackWindowInSec
- expr: Expression
- absExpr: AbsoluteExpression
- method: Method
- storage: StorageRegion, with or without project id
e.g.: http://endpoint/WindowedTable?mode=r&meta=mybucket/xmeta&dataset=db1&expr=true
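The same call can be issued as a POST with a JSON body whose fields follow the Request struct above; a minimal sketch in Go (the endpoint URL is the same placeholder as above, and the application/json content type is an assumption):

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
)

func main() {
    // Field names follow the Request struct above.
    body, err := json.Marshal(map[string]interface{}{
        "Mode":       "r",
        "MetaURL":    "mybucket/xmeta",
        "DatasetID":  "db1",
        "Expression": true,
    })
    if err != nil {
        log.Fatal(err)
    }
    resp, err := http.Post("http://endpoint/WindowedTable", "application/json", bytes.NewReader(body))
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    expr, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(expr)) // ranged decorator expression
}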
Note that changing a table's eviction time counts as a table modification, thus the prune threshold cannot be less than 7 days.
The Mode request attribute controls the table time-window snapshot: r takes a snapshot, w persists it.
Taking a snapshot
- When the meta file does not exist, the service reads all matching table info and creates a temp meta file with a range decorator expression.
- When a temp meta file exists, the service returns the range decorator expression from that file.
- When the meta file exists, the service computes changes between the meta file and recently updated tables, then stores the updated table info and range decorator expression in a temp meta file.
Persisting a snapshot
- The temp meta file is persisted to the meta file.
Multi Read One Write scenario
The following shows an example dataset windowing timeline:
- t0: data is streamed to BigQuery
- t1: Process X reads the dataset snapshot between t0 and t1
- WindowedTable?mode=r&meta=bucket/x/meta.json&dataset=project:dataset&expr=true
- t2: more data is streamed
- t3: Process X completes t0 to t1 processing and flags t0-t1 completed
- WindowedTable?mode=w&meta=bucket/x/meta.json&dataset=project:dataset&expr=true
- t4: more data is streamed
- t5: Process X reads the dataset snapshot between t2 and t4
- WindowedTable?mode=r&meta=bucket/x/meta.json&dataset=project:dataset&expr=true
- t6: more data is streamed
- t7: Process X tries to process the data but something goes wrong, thus no update
- t8: more data is streamed
- t9: Process X again reads the dataset snapshot between t2 and t4
- WindowedTable?mode=r&meta=bucket/x/meta.json&dataset=project:dataset&expr=true
- t10: more data is streamed
- t11: Process X completes t2 to t4 processing and flags t2-t4 completed
- WindowedTable?mode=w&meta=bucket/x/meta.json&dataset=project:dataset&expr=true
snapshot1, err := getHttpBody("http://myEndpoint/WindowedTable?mode=r&meta=myBucket/meta&dataset=myDataset")
if err != nil {
    log.Fatal(err)
}
if hasData := len(snapshot1) > 0; hasData {
    SQL := "SELECT * FROM " + string(snapshot1)
    fmt.Printf("%v\n", SQL)
    //Process query ...
    //Persist snapshot only if there was no processing error
    _, err = getHttpBody("http://myEndpoint/WindowedTable?mode=w&meta=myBucket/meta&dataset=myDataset")
    if err != nil {
        log.Fatal(err)
    }
}
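The getHttpBody helper used above is not shown in this section; a minimal sketch with the standard library (compatible with Go 1.11) could be:

import (
    "io/ioutil"
    "net/http"
)

// getHttpBody performs an HTTP GET and returns the raw response body.
func getHttpBody(URL string) ([]byte, error) {
    response, err := http.Get(URL)
    if err != nil {
        return nil, err
    }
    defer response.Body.Close()
    return ioutil.ReadAll(response.Body)
}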
SQL Provider Class
import com.google.common.base.Strings;
import org.apache.beam.sdk.options.ValueProvider;
import java.io.Serializable;

public class SQLProvider implements ValueProvider<String>, Serializable {

    private final String baseSQL;
    private final String windowedTableURL;
    private final String emptyDatasetSQL;

    public SQLProvider(String baseSQL, String windowedTableURL, String emptyDatasetSQL) {
        this.baseSQL = baseSQL;
        this.emptyDatasetSQL = emptyDatasetSQL;
        this.windowedTableURL = windowedTableURL;
    }

    @Override
    public String get() {
        //Helper.getHttpBody performs an HTTP GET and returns the response body (helper not shown)
        String from = Helper.getHttpBody(windowedTableURL);
        if (Strings.isNullOrEmpty(from)) {
            from = emptyDatasetSQL;
        }
        return baseSQL.replace("$SOURCE", from);
    }

    @Override
    public boolean isAccessible() {
        return true;
    }
}
Pipeline integration
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import java.io.Serializable;

public class Main {

    public static final String EMPTY_QUERY = "SELECT * FROM (SELECT INTEGER(NULL) AS field1, STRING(NULL) AS fieldN) WHERE 1 = 0";
    public static final String WINDOWED_TABLE_URL = "http://myEndpoint/WindowedTable?mode=r&meta=myBucket/meta&dataset=myDataset";
    public static final String SQL = "SELECT * FROM $SOURCE";
    public static final String TABLE = "myTable";

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        ValueProvider<String> sqlProvider = new SQLProvider(SQL, WINDOWED_TABLE_URL, EMPTY_QUERY);
        Pipeline pipeline = Pipeline.create(options);
        PCollection<TableRow> collection = pipeline.apply("read data",
                BigQueryIO.readTableRows().fromQuery(sqlProvider).withTemplateCompatibility().withoutValidation());
        //add more processing collection transforms here ...
        WriteResult eventOutput = collection.apply("write data",
                BigQueryIO.writeTableRows()
                        .to(schema.getTempTable(false))
                        .withSchema(Helper.getTableSchema())
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
        //Persist the windowed table snapshot after writing to the table
        collection.apply(Wait.on(eventOutput.getFailedInserts()))
                //Transforms each inserted row to an Integer of value 1
                .apply(ParDo.of(countRows()))
                .apply(Sum.integersGlobally())
                .apply(ParDo.of(new SnapshotUpdater()));
        pipeline.run();
    }

    public static class SnapshotUpdater extends DoFn<Integer, Void> implements Serializable {
        private final String notificationURL = "http://myEndpoint/WindowedTable?mode=w&meta=myBucket/meta&dataset=myDataset";

        @ProcessElement
        public void processElement(ProcessContext c) {
            Helper.getHttpBody(notificationURL);
        }
    }
}
git clone https://github.com/viant/bqwt.git
cd bqwt/server
go build bqwt.go
./bqwt -port 8080
git clone https://github.com/viant/bqwt.git
cd bqwt
docker build --no-cache -t viant/bqwt:1.0 .
cd docker/
docker-compose up -d
- gcloud auth login
- gcloud config set project PROJECT_ID
- gcloud functions deploy WindowedTable --entry-point Handle --runtime go111 --trigger-http
- Non-partitioned/non-clustered tables only: windowing tables with range decorators is only supported with legacy SQL, thus only non-partitioned, non-clustered tables can run in legacy mode at the moment.
- Substantial data delay with the streaming insert method: when the streaming insert method is used, data first arrives in the streaming buffer, which retains recently inserted rows. While the query engine can read records directly from the streaming buffer, these records are not considered for copy jobs, extract jobs, or range decorators. With this in mind, this API uses StreamingBuffer.OldestEntryTime - 1 as the table time-window upper bound.
In practice it may take a while (up to ~90 minutes) before data is finally extracted from the streaming buffer to a table. Find out more about the streaming lifecycle.
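As an illustration of the upper-bound rule described above, a sketch of reading the streaming buffer from table metadata with the Go BigQuery client; the function name and the fallback to the table's last modified time are assumptions of this sketch, not necessarily what the library does:

import (
    "context"
    "time"

    "cloud.google.com/go/bigquery"
)

// windowUpperBound sketches the upper-bound rule: one millisecond before the
// oldest streaming-buffer entry when a buffer is present, otherwise the
// table's last modified time.
func windowUpperBound(ctx context.Context, client *bigquery.Client, dataset, table string) (time.Time, error) {
    meta, err := client.Dataset(dataset).Table(table).Metadata(ctx)
    if err != nil {
        return time.Time{}, err
    }
    if meta.StreamingBuffer != nil {
        return meta.StreamingBuffer.OldestEntryTime.Add(-time.Millisecond), nil
    }
    return meta.LastModifiedTime, nil
}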
Create a 'test' BigQuery project and a service account with admin permissions. Enable ssh on the test host and create a localhost secret.
Create test BQ service account secrets and save them as ~/.secret/viant-e2e.json
Install the e2e test runner
git clone https://github.com/viant/bqwt.git
cd bqwt/e2e
endly