Merge branch 'Azure:main' into main
cbtham authored Feb 23, 2022
2 parents 951fb0c + 214c949 commit f0125d0
Showing 272 changed files with 25,517 additions and 156 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
@@ -37,6 +37,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install db-dtypes
python -m pip install -e feast/sdk/python[ci]
- name: Run Feast unit tests
run: FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 feast/sdk/python/tests
203 changes: 55 additions & 148 deletions cluster/samples/feature_store_azure.ipynb
@@ -315,9 +315,19 @@
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": null,
"metadata": {},
"outputs": [],
"outputs": [
{
"ename": "",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[1;31mUnable to start Kernel 'azure_38_test (Python 3.8.12)' due to connection timeout. \n",
"View Jupyter <a href='command:jupyter.viewOutput'>log</a> for further details."
]
}
],
"source": [
"driver_statistics_source_uri = os.path.join(demo_data_location, \"driver_statistics\")\n",
"\n",
@@ -329,7 +339,19 @@
" conv_rate,\n",
" avg_daily_trips\n",
" ],\n",
" max_age=Duration(seconds=86400 * 1),\n",
" # You might want to change this interval to see how your feature get joined, because Feast joins your feature based on this\n",
" '''\n",
" Feast joins the features to the entities based on the following conditions:\n",
"\n",
" 1. Entity primary key(s) value matches.\n",
" 2. Feature event timestamp is the closest match possible to the entity event timestamp,\n",
" but must not be more recent than the entity event timestamp, and the difference must\n",
" not be greater than the maximum age specified in the feature table, unless the maximum age is not specified.\n",
" 3. If more than one feature table rows satisfy condition 1 and 2, feature row with the\n",
" most recent created timestamp will be chosen.\n",
" 4. If none of the above conditions are satisfied, the feature rows will have null values.\n",
" '''\n",
" max_age=Duration(seconds=3600*24*365),\n",
" batch_source=FileSource(\n",
" event_timestamp_column=\"datetime\",\n",
" created_timestamp_column=\"created\",\n",
@@ -342,7 +364,7 @@
},
{
"cell_type": "code",
"execution_count": 10,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -619,6 +641,8 @@
"metadata": {},
"outputs": [],
"source": [
"import glob\n",
"\n",
"def read_parquet(uri):\n",
" parsed_uri = urlparse(uri)\n",
" if parsed_uri.scheme == \"file\":\n",
@@ -634,17 +658,30 @@
" return ds.read().to_pandas()\n",
" elif parsed_uri.scheme == 'abfss':\n",
" credential = ClientSecretCredential(os.getenv('AZURE_TENANT_ID'), os.getenv('AZURE_CLIENT_ID'), os.getenv('AZURE_CLIENT_SECRET'))\n",
" # credential = DefaultAzureCredential()\n",
" datalake = parsed_uri.netloc.split('@')\n",
" \n",
" print(parsed_uri.path)\n",
" service_client = DataLakeServiceClient(account_url=\"https://\" + datalake[1], credential=credential)\n",
" file_system_client = service_client.get_file_system_client(datalake[0])\n",
" file_client = file_system_client.get_file_client(parsed_uri.path)\n",
" data = file_client.download_file(0)\n",
" with io.BytesIO() as b:\n",
" data.readinto(b)\n",
" table = pq.read_table(b)\n",
" print(table)\n",
" return table\n",
"\n",
" directory_client = file_system_client.get_directory_client(parsed_uri.path)\n",
" ## returns the paths to all the files in the target director in ADLS\n",
" adls_paths = [file_path.name.split(\"/\")[-1] for file_path in file_system_client.get_paths(path=parsed_uri.path) if not file_path.is_directory][1:]\n",
" ## need to generate list of local paths to write the files to\n",
" local_paths = [os.path.join(\"/tmp\",file_name) for file_name in adls_paths]\n",
" for idx, file_to_write in enumerate(adls_paths):\n",
" try:\n",
" local_file = open(local_paths[idx],'wb')\n",
" file_client = directory_client.get_file_client(file_to_write)\n",
" download = file_client.download_file()\n",
" downloaded_bytes = download.readall()\n",
" local_file.write(downloaded_bytes)\n",
" local_file.close()\n",
" except Exception as e:\n",
" print(e)\n",
" files = glob.glob(os.path.join(\"/tmp\", \"*.parquet\"))\n",
" ds = ParquetDataset(files)\n",
" return ds.read().to_pandas()\n",
" else:\n",
" raise ValueError(f\"Unsupported URL scheme {uri}\")"
]
@@ -800,7 +837,7 @@
"outputs": [],
"source": [
"# get_output_file_uri will block until the Spark job is completed.\n",
"# output_file_uri = job.get_output_file_uri()"
"output_file_uri = job.get_output_file_uri()"
]
},
{
@@ -809,7 +846,7 @@
"metadata": {},
"outputs": [],
"source": [
"# read_parquet(output_file_uri)"
"read_parquet(output_file_uri)"
]
},
{
@@ -1095,27 +1132,7 @@
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[{'driver_id': 626004},\n",
" {'driver_id': 910500},\n",
" {'driver_id': 432869},\n",
" {'driver_id': 171604},\n",
" {'driver_id': 770377},\n",
" {'driver_id': 258547},\n",
" {'driver_id': 6109},\n",
" {'driver_id': 994804},\n",
" {'driver_id': 857106},\n",
" {'driver_id': 276404}]"
]
},
"execution_count": 37,
"metadata": {},
"output_type": "execute_result"
}
],
"outputs": [],
"source": [
"entities_sample = np.random.choice(entities, 10, replace=False)\n",
"entities_sample = [{\"driver_id\": e} for e in entities_sample]\n",
@@ -1129,125 +1146,15 @@
"outputs": [],
"source": [
"features = client.get_online_features(\n",
" feature_refs=[\"driver_statistics:avg_daily_trips\", \"driver_trips:trips_today\"],\n",
" feature_refs=[\"driver_statistics:avg_daily_trips\"],\n",
" entity_rows=entities_sample).to_dict()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>driver_id</th>\n",
" <th>driver_trips:trips_today</th>\n",
" <th>driver_statistics:avg_daily_trips</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>626004</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>910500</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>432869</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>171604</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>770377</td>\n",
" <td>None</td>\n",
" <td>486.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>258547</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>6109</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>994804</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>857106</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>276404</td>\n",
" <td>None</td>\n",
" <td>251.0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" driver_id driver_trips:trips_today driver_statistics:avg_daily_trips\n",
"0 626004 None NaN\n",
"1 910500 None NaN\n",
"2 432869 None NaN\n",
"3 171604 None NaN\n",
"4 770377 None 486.0\n",
"5 258547 None NaN\n",
"6 6109 None NaN\n",
"7 994804 None NaN\n",
"8 857106 None NaN\n",
"9 276404 None 251.0"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"outputs": [],
"source": [
"pd.DataFrame(features)"
]
@@ -1281,7 +1188,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.11"
"version": "3.8.12"
},
"save_output": true,
"synapse_widget": {
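The comment added to the FeatureTable definition above describes Feast's point-in-time join rules, and the max_age change (from 1 day to 365 days) widens the window that rule 2 allows. Below is a minimal pandas sketch of those rules, not part of this commit, using made-up driver rows to show why a feature row older than max_age comes back as null:

import pandas as pd

max_age = pd.Timedelta(seconds=3600 * 24 * 365)  # mirrors max_age in the FeatureTable above

# Hypothetical entity rows (what get_historical_features is asked about).
entities = pd.DataFrame({
    "driver_id": [1001, 1002],
    "event_timestamp": pd.to_datetime(["2022-02-20", "2022-02-20"]),
})

# Hypothetical feature rows already written to the offline store.
features = pd.DataFrame({
    "driver_id": [1001, 1001, 1002],
    "event_timestamp": pd.to_datetime(["2022-02-18", "2022-02-19", "2019-01-01"]),
    "created": pd.to_datetime(["2022-02-18", "2022-02-19", "2019-01-01"]),
    "avg_daily_trips": [10, 12, 7],
})

def point_in_time_join(entities, features, max_age):
    rows = []
    for _, e in entities.iterrows():
        # Rules 1 and 2: same entity key, feature timestamp not newer than the
        # entity timestamp, and not older than max_age.
        candidates = features[
            (features["driver_id"] == e["driver_id"])
            & (features["event_timestamp"] <= e["event_timestamp"])
            & (e["event_timestamp"] - features["event_timestamp"] <= max_age)
        ]
        if candidates.empty:
            # Rule 4: no qualifying feature row, so the value is null.
            rows.append({**e.to_dict(), "avg_daily_trips": None})
        else:
            # Rules 2 and 3: latest event timestamp wins, ties broken by latest created timestamp.
            best = candidates.sort_values(["event_timestamp", "created"]).iloc[-1]
            rows.append({**e.to_dict(), "avg_daily_trips": best["avg_daily_trips"]})
    return pd.DataFrame(rows)

print(point_in_time_join(entities, features, max_age))
# Driver 1001 gets the 2022-02-19 value (12); driver 1002's only row is older than max_age, so it is null.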
4 changes: 2 additions & 2 deletions cluster/sdk/infra/scripts/k8s-common-functions.sh
@@ -125,7 +125,7 @@ function setup_sparkop_role

cat <<EOF | kubectl apply -f -
kind: Role
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: use-spark-operator
namespace: sparkop
@@ -134,7 +134,7 @@ rules:
resources: ["sparkapplications"]
verbs: ["create", "delete", "deletecollection", "get", "list", "update", "watch", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: use-spark-operator
@@ -148,6 +148,12 @@ def create_spark_batch_job(self, job_name, main_definition_file, class_name = No
executor_cores = EXECUTOR_SIZE[self._executor_size]['Cores']
executor_memory = EXECUTOR_SIZE[self._executor_size]['Memory']

# This is needed to correctly set the Spark properties required by org.apache.hadoop.fs.azure.NativeAzureFileSystem
# Please see: https://github.com/Azure/feast-azure/issues/41
if "FEAST_AZURE_BLOB_ACCOUNT_NAME" in os.environ and "FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY" in os.environ:
blob_configuration = {f'spark.hadoop.fs.azure.account.key.{os.environ["FEAST_AZURE_BLOB_ACCOUNT_NAME"]}.blob.core.windows.net': os.environ["FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY"]}
configuration = blob_configuration if configuration is None else {**configuration, **blob_configuration}

# SDK source code is here: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/synapse/azure-synapse
# Exact code is here: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/synapse/azure-synapse-spark/azure/synapse/spark/operations/_spark_batch_operations.py#L114
# Adding spaces between brackets. This is to workaround this known YARN issue (when running Spark on YARN):
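The block added above builds a spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net property from two environment variables so that org.apache.hadoop.fs.azure.NativeAzureFileSystem can authenticate against blob storage (see Azure/feast-azure#41). A small sketch of how a caller might set those variables before submitting a batch job; the account name and key below are placeholders, not values from this repository:

import os

# Hypothetical storage account name and access key.
os.environ["FEAST_AZURE_BLOB_ACCOUNT_NAME"] = "mystorageaccount"
os.environ["FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY"] = "<storage-account-access-key>"

# The Spark configuration key the launcher will now populate for this account:
account = os.environ["FEAST_AZURE_BLOB_ACCOUNT_NAME"]
print(f"spark.hadoop.fs.azure.account.key.{account}.blob.core.windows.net")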
2 changes: 1 addition & 1 deletion cluster/setup/Feast_backup_restore.md
@@ -7,7 +7,7 @@ This guide lists the steps to back up and restore Feast in your environment.
*PostgreSQL host IP*
It can be found in the Azure portal. Go to the AKS cluster where Feast is deployed and select “Services and ingresses” in the left-side menu. Look for a service called “feast-release-postgresql”; you can type or paste the full name into the filter text box. Record the cluster IP.

![Postgre SQL IP](./images/feast_backup_restore/feastbackuprestore1.PNG)
![Postgre SQL IP](./images/feast_backup_restore/feastbackuprestore1.png)

*PostgreSQL password*
It is the password you provided when configuring the Feast service.
2 changes: 2 additions & 0 deletions cluster/setup/README.md
@@ -7,6 +7,8 @@ Before you proceed, here are prerequisites to install feast in your Azure subscr
- An existing virtual network (VNET) and subnet.
- An Azure Kubernetes Service (AKS) cluster in that VNET.

__PLEASE NOTE: Currently we only support Kubernetes version 1.21.7, which is the default version for AKS.__

You can jump to the [install feast section](#-steps-to-install-feast) if you already have the prerequisite resources.

Here are the commands to create a VNET and an AKS cluster in that VNET. For more details, refer to the documentation [here](https://docs.microsoft.com/en-us/azure/aks/configure-kubenet).
22 changes: 22 additions & 0 deletions cluster/setup/feast-0.9.5-helmchart/.helmignore
@@ -0,0 +1,22 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
4 changes: 4 additions & 0 deletions cluster/setup/feast-0.9.5-helmchart/Chart.yaml
@@ -0,0 +1,4 @@
apiVersion: v1
description: Feature store for machine learning.
name: feast
version: 0.9.5
