Copyright 2020 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
You'll learn how to build a propensity model (predicting whether a customer is going to buy) and how to orchestrate an ML pipeline to do so. You can use the code as a reference guide. Amend, replace, or add new AI Pipeline components according to your use case. Please refer to the Notebook, which contains further documentation and detailed instructions.
To see a step-by-step tutorial that walks you through implementing this solution, see Predicting customer propensity to buy by using BigQuery ML and AI Platform.
- Environment Setup
- Set up Cloud AI Platform Pipelines (using the Cloud Console)
- Install KFP client
- Install Python packages for Google Cloud Services
- Kubeflow Pipelines (KFP) Setup
- Prepare Data for the training
- Create/Validate a Google Cloud Storage Bucket/Folder
- Create the input table in BigQuery
- Train the model (a BigQuery ML sketch follows this list)
- Evaluate the model
- Prepare the Model for batch prediction
- Prepare a test dataset (a table)
- Predict the model in BigQuery
- Prepare the Model for online prediction
- Create a new revision (Model revision management)
- Export the BigQuery Model
- Export the Model from BigQuery to Google Cloud Storage
- Export the Training Stats to Google Cloud Storage
- Export the Eval Metrics to Google Cloud Storage
- Deploy to Cloud AI Platform Prediction
- Predict the model in Cloud AI Platform Prediction
- Data Exploration using BigQuery, Pandas, matplotlib
- SDLC methodology adherence (opinionated)
- Variable naming conventions
- Uppercase names for immutable variables
- Lowercase names for mutable variables
- Naming prefixes with rpm_ or RPM_
- Unit Tests
- Cleanup/Reset utility functions
- KFP knowledge share (demonstration)
- Pass input params through function args
- Pass params through pipeline args
- Pass Output from one Component as input of another
- Create an external Shared Volume available to all the Components
- Use built in Operators
- Build a lightweight Component
- Set Component not to cache
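As a hedged illustration of the "Train the model" step listed above: in this solution the model is trained with BigQuery ML, so training is a SQL statement run against the input table. The sketch below assumes a logistic-regression propensity model with placeholder project, dataset, table, and label names; the Notebook defines the actual ones.

```python
# Hypothetical sketch of the BigQuery ML training step -- all names are placeholders.
from google.cloud import bigquery

client = bigquery.Client()

train_sql = """
CREATE OR REPLACE MODEL `my_project.my_dataset.rpm_propensity_model`
OPTIONS(model_type='logistic_reg', input_label_cols=['will_buy']) AS
SELECT * FROM `my_project.my_dataset.rpm_training_table`
"""

client.query(train_sql).result()  # waits for the CREATE MODEL job to finish
```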
Create a local context and use it to unit test a KFP pipeline component locally. Below is an example of locally testing the create_gcs_bucket_folder component:
# Test create_gcs_bucket_folder locally.
import json

local_context = get_local_context()
update_local_context(create_gcs_bucket_folder(
    json.dumps(local_context),
    local_context['RPM_GCP_STORAGE_BUCKET'],
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_DEFAULT_BUCKET_EXT'],
    local_context['RPM_GCP_STORAGE_BUCKET_FOLDER'],
    local_context['RPM_DEFAULT_BUCKET_FOLDER_NAME']
))
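The get_local_context and update_local_context helpers are defined in the Notebook. The sketch below is only an assumption of their shape, treating the local context as a dict of RPM_* variables and merging a component's JSON-serialized context output back into it:

```python
# Hypothetical sketch -- the real helpers are defined in the Notebook.
import json

_local_context = {}  # dict of RPM_* configuration values used for local testing

def get_local_context():
    """Return the local context dict."""
    return _local_context

def update_local_context(comp_outputs):
    """Merge a component's outputs back into the local context.

    Assumes the component's first output is the JSON-serialized context
    (the 'rpm_context' output used by the components in this repo).
    """
    _local_context.update(json.loads(comp_outputs[0]))
    return _local_context
```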
Below is a utility function that purges artifacts created during unit/integration testing; this one deletes a BigQuery table:
# Delete the BigQuery table if it is not needed... !!!BE CAREFUL!!!
def delete_table(table_id):
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # client.delete_table(table_id, not_found_ok=True)  # Ignore a missing table.
    client.delete_table(table_id)  # Make an API request.
    print("Deleted table '{}'.".format(table_id))

# Delete the table in BigQuery.
delete_table(get_local_context()['rpm_table_id'])
You must set values for these parameters; please refer to the instructions in the Notebook for details:
RPM_GCP_PROJECT='<Your GCP Project>'
RPM_GCP_KFP_HOST='<Your KFP pipeline host>'
RPM_GCP_APPLICATION_CREDENTIALS='<Full path, including the file name, to the downloaded JSON key file>'
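These values are typically consumed when authenticating the Google Cloud client libraries and connecting to the KFP host. A minimal sketch, assuming the standard kfp SDK (the exact usage is defined in the Notebook):

```python
import os
import kfp

RPM_GCP_PROJECT = '<Your GCP Project>'
RPM_GCP_KFP_HOST = '<Your KFP pipeline host>'
RPM_GCP_APPLICATION_CREDENTIALS = '<Full path to the downloaded JSON key file>'

# Point the Google Cloud client libraries at the service account key.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = RPM_GCP_APPLICATION_CREDENTIALS

# Connect to the Cloud AI Platform Pipelines (KFP) host.
client = kfp.Client(host=RPM_GCP_KFP_HOST)
```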
The code snippets below demonstrate various KFP syntaxes and show different ways to pass parameters; use whichever works for you.

Pass input params through function args. A lightweight component declares its inputs as function arguments and its outputs as a NamedTuple, for example:
# Create the BigQuery dataset only if it doesn't exist.
from typing import NamedTuple

def create_bq_ds(ctx: str,
                 RPM_GCP_PROJECT: str,
                 RPM_BQ_DATASET_NAME: str,
                 RPM_LOCATION: str
                 ) -> NamedTuple('Outputs', [
                     ('rpm_context', str),
                     ('rpm_bq_ds_name', str),
                 ]):
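The component body is omitted above. In KFP lightweight components, the declared outputs are typically returned as a collections.namedtuple whose fields match the Outputs declaration. Below is a small, runnable sketch of that pattern; it is illustrative only and not the Notebook's actual create_bq_ds implementation:

```python
# Illustrative sketch of returning NamedTuple outputs from a lightweight component.
from typing import NamedTuple

def create_bq_ds_sketch(ctx: str,
                        RPM_BQ_DATASET_NAME: str
                        ) -> NamedTuple('Outputs', [('rpm_context', str),
                                                    ('rpm_bq_ds_name', str)]):
    # ...the real component would create the dataset with the BigQuery client here...
    from collections import namedtuple
    outputs = namedtuple('Outputs', ['rpm_context', 'rpm_bq_ds_name'])
    # Values are returned positionally, matching the declared output names.
    return outputs(ctx, RPM_BQ_DATASET_NAME)

# Local call for testing; in a pipeline this function would be wrapped with func_to_container_op.
print(create_bq_ds_sketch('{}', 'my_dataset'))
```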
Pass params through pipeline args. Pipeline parameters can take their defaults from a variables dictionary, for example:

def bq_googlestr_dataset_to_bq_to_caip_pipeline(
    data_path = all_vars['RPM_PVC_NAME']  # you can pass input variables
):
Pass output from one Kubeflow Pipelines component as input to another. For example, the output 'rpm_table_id' from the load_bq_ds_op component is passed as an input to the create_bq_ml_op component:
create_bq_ml_op = create_kfp_comp(create_bq_ml)(
    load_bq_ds_op.outputs['rpm_context'],
    all_vars['RPM_GCP_PROJECT'],
    all_vars['RPM_MODEL_NAME'],
    all_vars['RPM_DEFAULT_MODEL_NAME'],
    create_bq_ds_op.outputs['rpm_bq_ds_name'],
    load_bq_ds_op.outputs['rpm_table_id']
)
# Create a volume where the dataset will be temporarily stored.
pvc_op = VolumeOp(
    name=all_vars['RPM_PVC_NAME'],
    resource_name=all_vars['RPM_PVC_NAME'],
    size="20Gi",
    modes=dsl.VOLUME_MODE_RWO
)
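The volume created by pvc_op can then be mounted into the pipeline's components so they all share the same temporary storage. A sketch, assuming KFP SDK v1 ContainerOps; the component name and mount path below are placeholders:

```python
# Hypothetical: mount the shared volume into a component produced by create_kfp_comp.
# 'load_data' and the '/mnt/rpm_data' mount path are placeholders for illustration.
load_data_op = create_kfp_comp(load_data)(json.dumps(all_vars))
load_data_op.add_pvolumes({'/mnt/rpm_data': pvc_op.volume})
```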
# Convert functions to container operations.
import kfp.components as comp

def create_kfp_comp(rpm_comp):
    return comp.func_to_container_op(
        func=rpm_comp,
        base_image="google/cloud-sdk:latest")
To set a component not to cache its results, set max_cache_staleness to "P0D":

get_versioned_bqml_model_export_path_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
If you have any questions or feedback, please open a new issue.