From 54576c3fbf570ecca986e34b81306f3a492fb2a7 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:01:38 +0530 Subject: [PATCH 01/11] Create yaml_enrichment.py --- .../apache_beam/yaml/yaml_enrichment.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/yaml_enrichment.py diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment.py b/sdks/python/apache_beam/yaml/yaml_enrichment.py new file mode 100644 index 000000000000..1d73e8c1794e --- /dev/null +++ b/sdks/python/apache_beam/yaml/yaml_enrichment.py @@ -0,0 +1,55 @@ +from typing import Any, Dict +import apache_beam as beam +from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler +from apache_beam.transforms.enrichment import Enrichment +from typing import Optional + +@beam.ptransform.ptransform_fn +def enrichment_transform(pcoll, enrichment_handler: str, handler_config: Dict[str, Any], timeout: Optional[float] = 30): + """ + The Enrichment transform allows you to dynamically enhance elements in a pipeline + by performing key-value lookups against external services like APIs or databases. + + Args: + enrichment_handler: Specifies the source from where data needs to be extracted + into the pipeline for enriching data. It can be a string value in ["BigQuery", + "BigTable", "FeastFeatureStore", "VertexAIFeatureStore"]. + handler_config: Specifies the parameters for the respective enrichment_handler in a dictionary format. 
+ BigQuery: project, table_name, row_restriction_template, fields, column_names, condition_value_fn, query_fn, min_batch_size, max_batch_size + BigTable: project_id, instance_id, table_id row_key, row_filter, app_profile_id, encoding, ow_key_fn, exception_level, include_timestamp + FeastFeatureStore: feature_store_yaml_path, feature_names, feature_service_name, full_feature_names, entity_row_fn, exception_level + VertexAIFeatureStore: project, location, api_endpoint, feature_store_name:, feature_view_name, row_key, exception_level + + Example Usage: + + - type: Enrichment + config: + enrichment_handler: 'BigTable' + handler_config: + project_id: 'apache-beam-testing' + instance_id: 'beam-test' + table_id: 'bigtable-enrichment-test' + row_key: 'product_id' + timeout: 30 + + """ + if enrichment_handler is None: + raise ValueError("Missing 'source' in enrichment spec.") + if handler_config is None: + raise ValueError("Missing 'handler_config' in enrichment spec.") + + handler_map = { + 'BigQuery': BigQueryEnrichmentHandler, + 'BigTable': BigTableEnrichmentHandler, + 'FeastFeatureStore': FeastFeatureStoreEnrichmentHandler, + 'VertexAIFeatureStore': VertexAIFeatureStoreEnrichmentHandler + } + + if enrichment_handler not in handler_map: + raise ValueError(f"Unknown enrichment source: {enrichment_handler}") + + handler = handler_map[enrichment_handler](**handler_config) + return pcoll | Enrichment(source_handler = handler, timeout = timeout) From 02a22bc2901ebec8c82455ed53e224b5a674c4df Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:02:26 +0530 Subject: [PATCH 02/11] Create yaml_enrichment_test.py --- .../apache_beam/yaml/yaml_enrichment_test.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/yaml_enrichment_test.py diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment_test.py b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py new file 
mode 100644 index 000000000000..4042e33c3520 --- /dev/null +++ b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py @@ -0,0 +1,62 @@ +import unittest +import logging +import mock +import apache_beam as beam +from apache_beam.testing.util import assert_that, equal_to +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.transforms import Map +from apache_beam.yaml.yaml_enrichment import enrichment_transform +from apache_beam import Row +from unittest.mock import patch +from apache_beam.yaml.yaml_transform import YamlTransform + +class FakeEnrichmentTransform: + def __init__(self, enrichment_handler, handler_config, timeout = 30): + self._enrichment_handler = enrichment_handler + self._handler_config = handler_config + self._timeout = timeout + + def __call__(self, enrichment_handler, *, handler_config, timeout = 30): + assert enrichment_handler == self._enrichment_handler + assert handler_config == self._handler_config + assert timeout == self._timeout + return beam.Map(lambda x: beam.Row(**x._asdict())) + + +class EnrichmentTransformTest(unittest.TestCase): + + @patch('apache_beam.yaml.yaml_enrichment.enrichment_transform', FakeEnrichmentTransform) + def test_enrichment_with_bigquery(self): + input_data = [ + Row(label = "item1", rank = 0), + Row(label = "item2", rank = 1), + ] + + handler = 'BigQuery' + config = { + "project": "apache-beam-testing", + "table_name": "project.database.table", + "row_restriction_template": "label='item1' or label='item2'", + "fields": ["label"] + } + + with beam.Pipeline() as p: + with mock.patch('apache_beam.yaml.yaml_enrichment.enrichment_transform', + FakeEnrichmentTransform( + enrichment_handler = handler, + handler_config = config)): + input_pcoll = p | 'CreateInput' >> beam.Create(input_data) + result = input_pcoll | YamlTransform( + f''' + type: Enrichment + config: + enrichment_handler: {handler} + handler_config: {config} + ''') + assert_that( + result, + equal_to(input_data)) + +if __name__ == 
'__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() From f9fc865fbe4a06df44c461cabb8c841d279668b7 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:03:41 +0530 Subject: [PATCH 03/11] Create enrichment integration test --- .../apache_beam/yaml/tests/enrichment.yaml | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/enrichment.yaml diff --git a/sdks/python/apache_beam/yaml/tests/enrichment.yaml b/sdks/python/apache_beam/yaml/tests/enrichment.yaml new file mode 100644 index 000000000000..7c107b2e71c1 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/enrichment.yaml @@ -0,0 +1,66 @@ +fixtures: + - name: BQ_TABLE + type: "apache_beam.yaml.integration_tests.temp_bigquery_table" + config: + project: "apache-beam-testing" + - name: TEMP_DIR + type: "apache_beam.yaml.integration_tests.gcs_temp_dir" + config: + bucket: "gs://temp-storage-for-end-to-end-tests/temp-it" + +pipelines: + - pipeline: + type: chain + transforms: + - type: Create + name: Rows + config: + elements: + - {label: '11a', rank: 0} + - {label: '37a', rank: 1} + - {label: '389a', rank: 2} + + - type: WriteToBigQuery + config: + table: "{BQ_TABLE}" + + - pipeline: + type: chain + transforms: + - type: Create + name: Data + config: + elements: + - {label: '11a', name: 'S1'} + - {label: '37a', name: 'S2'} + - {label: '389a', name: 'S3'} + - type: Enrichment + name: Enriched + config: + enrichment_handler: 'BigQuery' + handler_config: + project: apache-beam-testing + table_name: "{BQ_TABLE}" + fields: ['label'] + row_restriction_template: "label = '37a'" + timeout: 30 + + - type: MapToFields + config: + language: python + fields: + label: + callable: 'lambda x: x.label' + output_type: string + rank: + callable: 'lambda x: x.rank' + output_type: integer + name: + callable: 'lambda x: x.name' + output_type: string + + - type: AssertEqual + config: + elements: + 
- {label: '37a', rank: 1, name: 'S2'} + From ba2b10e6b32a339dc2a29b19cb0f79ececde9121 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:07:06 +0530 Subject: [PATCH 04/11] Register enrichment transform in standard_providers.yaml --- sdks/python/apache_beam/yaml/standard_providers.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml index 574179805959..242faaa9a77b 100644 --- a/sdks/python/apache_beam/yaml/standard_providers.yaml +++ b/sdks/python/apache_beam/yaml/standard_providers.yaml @@ -101,3 +101,8 @@ Explode: "beam:schematransform:org.apache.beam:yaml:explode:v1" config: gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar' + +- type: 'python' + config: {} + transforms: + Enrichment: 'apache_beam.yaml.yaml_enrichment.enrichment_transform' From 788e4a20e95efd2ab9ce487f2844b0f800ac3694 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Mon, 26 Aug 2024 16:45:57 +0530 Subject: [PATCH 05/11] minor changes 1. Added links for different handlers and removed code for unreachable conditions 2. Removed patch decorator in test --- sdks/python/apache_beam/yaml/yaml_enrichment.py | 13 ++++--------- .../python/apache_beam/yaml/yaml_enrichment_test.py | 1 - 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment.py b/sdks/python/apache_beam/yaml/yaml_enrichment.py index 1d73e8c1794e..a3d5a4f8c942 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment.py @@ -18,10 +18,10 @@ def enrichment_transform(pcoll, enrichment_handler: str, handler_config: Dict[st into the pipeline for enriching data. It can be a string value in ["BigQuery", "BigTable", "FeastFeatureStore", "VertexAIFeatureStore"]. 
handler_config: Specifies the parameters for the respective enrichment_handler in a dictionary format. - BigQuery: project, table_name, row_restriction_template, fields, column_names, condition_value_fn, query_fn, min_batch_size, max_batch_size - BigTable: project_id, instance_id, table_id row_key, row_filter, app_profile_id, encoding, ow_key_fn, exception_level, include_timestamp - FeastFeatureStore: feature_store_yaml_path, feature_names, feature_service_name, full_feature_names, entity_row_fn, exception_level - VertexAIFeatureStore: project, location, api_endpoint, feature_store_name:, feature_view_name, row_key, exception_level + BigQuery : project, table_name, row_restriction_template, fields, column_names, condition_value_fn, query_fn, min_batch_size, max_batch_size + BigTable : project_id, instance_id, table_id row_key, row_filter, app_profile_id, encoding, ow_key_fn, exception_level, include_timestamp + FeastFeatureStore : feature_store_yaml_path, feature_names, feature_service_name, full_feature_names, entity_row_fn, exception_level + VertexAIFeatureStore : project, location, api_endpoint, feature_store_name:, feature_view_name, row_key, exception_level Example Usage: @@ -36,11 +36,6 @@ def enrichment_transform(pcoll, enrichment_handler: str, handler_config: Dict[st timeout: 30 """ - if enrichment_handler is None: - raise ValueError("Missing 'source' in enrichment spec.") - if handler_config is None: - raise ValueError("Missing 'handler_config' in enrichment spec.") - handler_map = { 'BigQuery': BigQueryEnrichmentHandler, 'BigTable': BigTableEnrichmentHandler, diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment_test.py b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py index 4042e33c3520..35e333fc2f41 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment_test.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py @@ -25,7 +25,6 @@ def __call__(self, enrichment_handler, *, handler_config, timeout = 30): class 
EnrichmentTransformTest(unittest.TestCase): - @patch('apache_beam.yaml.yaml_enrichment.enrichment_transform', FakeEnrichmentTransform) def test_enrichment_with_bigquery(self): input_data = [ Row(label = "item1", rank = 0), From e20166d87c0369ee7cb243bea59d5bc84daa3f22 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Thu, 19 Sep 2024 21:48:32 +0530 Subject: [PATCH 06/11] minor updates --- .../apache_beam/yaml/integration_tests.py | 2 +- .../apache_beam/yaml/tests/enrichment.yaml | 17 ++++ .../apache_beam/yaml/yaml_enrichment.py | 97 ++++++++++++++----- .../apache_beam/yaml/yaml_enrichment_test.py | 94 ++++++++++-------- 4 files changed, 144 insertions(+), 66 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index af1be7b1e8e5..72b3918195da 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -69,7 +69,7 @@ def temp_bigquery_table(project, prefix='yaml_bq_it_'): dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex) bigquery_client.get_or_create_dataset(project, dataset_id) logging.info("Created dataset %s in project %s", dataset_id, project) - yield f'{project}:{dataset_id}.tmp_table' + yield f'{project}.{dataset_id}.tmp_table' request = bigquery.BigqueryDatasetsDeleteRequest( projectId=project, datasetId=dataset_id, deleteContents=True) logging.info("Deleting dataset %s in project %s", dataset_id, project) diff --git a/sdks/python/apache_beam/yaml/tests/enrichment.yaml b/sdks/python/apache_beam/yaml/tests/enrichment.yaml index 7c107b2e71c1..216a18add83f 100644 --- a/sdks/python/apache_beam/yaml/tests/enrichment.yaml +++ b/sdks/python/apache_beam/yaml/tests/enrichment.yaml @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + fixtures: - name: BQ_TABLE type: "apache_beam.yaml.integration_tests.temp_bigquery_table" diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment.py b/sdks/python/apache_beam/yaml/yaml_enrichment.py index a3d5a4f8c942..77428bbd59f5 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + from typing import Any, Dict import apache_beam as beam from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler @@ -7,21 +24,53 @@ from apache_beam.transforms.enrichment import Enrichment from typing import Optional + @beam.ptransform.ptransform_fn -def enrichment_transform(pcoll, enrichment_handler: str, handler_config: Dict[str, Any], timeout: Optional[float] = 30): - """ - The Enrichment transform allows you to dynamically enhance elements in a pipeline - by performing key-value lookups against external services like APIs or databases. +def enrichment_transform( + pcoll, + enrichment_handler: str, + handler_config: Dict[str, Any], + timeout: Optional[float] = 30): + """ + The Enrichment transform allows you to dynamically + enhance elements in a pipeline by performing key-value + lookups against external services like APIs or databases. Args: - enrichment_handler: Specifies the source from where data needs to be extracted - into the pipeline for enriching data. It can be a string value in ["BigQuery", - "BigTable", "FeastFeatureStore", "VertexAIFeatureStore"]. - handler_config: Specifies the parameters for the respective enrichment_handler in a dictionary format. - BigQuery : project, table_name, row_restriction_template, fields, column_names, condition_value_fn, query_fn, min_batch_size, max_batch_size - BigTable : project_id, instance_id, table_id row_key, row_filter, app_profile_id, encoding, ow_key_fn, exception_level, include_timestamp - FeastFeatureStore : feature_store_yaml_path, feature_names, feature_service_name, full_feature_names, entity_row_fn, exception_level - VertexAIFeatureStore : project, location, api_endpoint, feature_store_name:, feature_view_name, row_key, exception_level + enrichment_handler: Specifies the source from + where data needs to be extracted + into the pipeline for enriching data. + It can be a string value in ["BigQuery", + "BigTable", "FeastFeatureStore", + "VertexAIFeatureStore"]. 
+ handler_config: Specifies the parameters for + the respective enrichment_handler in a dictionary format. + BigQuery = ( + "BigQuery: " + "project, table_name, row_restriction_template, " + "fields, column_names, condition_value_fn, " + "query_fn, min_batch_size, max_batch_size" + ) + + BigTable = ( + "BigTable: " + "project_id, instance_id, table_id, " + "row_key, row_filter, app_profile_id, " + "encoding, row_key_fn, exception_level, include_timestamp" + ) + + FeastFeatureStore = ( + "FeastFeatureStore: " + "feature_store_yaml_path, feature_names, " + "feature_service_name, full_feature_names, " + "entity_row_fn, exception_level" + ) + + VertexAIFeatureStore = ( + "VertexAIFeatureStore: " + "project, location, api_endpoint, feature_store_name, " + "feature_view_name, row_key, exception_level" + ) Example Usage: @@ -36,15 +85,15 @@ def enrichment_transform(pcoll, enrichment_handler: str, handler_config: Dict[st timeout: 30 """ - handler_map = { - 'BigQuery': BigQueryEnrichmentHandler, - 'BigTable': BigTableEnrichmentHandler, - 'FeastFeatureStore': FeastFeatureStoreEnrichmentHandler, - 'VertexAIFeatureStore': VertexAIFeatureStoreEnrichmentHandler - } - - if enrichment_handler not in handler_map: - raise ValueError(f"Unknown enrichment source: {enrichment_handler}") - - handler = handler_map[enrichment_handler](**handler_config) - return pcoll | Enrichment(source_handler = handler, timeout = timeout) + handler_map = { + 'BigQuery': BigQueryEnrichmentHandler, + 'BigTable': BigTableEnrichmentHandler, + 'FeastFeatureStore': FeastFeatureStoreEnrichmentHandler, + 'VertexAIFeatureStore': VertexAIFeatureStoreEnrichmentHandler + } + + if enrichment_handler not in handler_map: + raise ValueError(f"Unknown enrichment source: {enrichment_handler}") + + handler = handler_map[enrichment_handler](**handler_config) + return pcoll | Enrichment(source_handler=handler, timeout=timeout) diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment_test.py 
b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py index 35e333fc2f41..9cd28995dfe4 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment_test.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py @@ -1,61 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + import unittest import logging import mock import apache_beam as beam -from apache_beam.testing.util import assert_that, equal_to -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms import Map -from apache_beam.yaml.yaml_enrichment import enrichment_transform +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from apache_beam import Row -from unittest.mock import patch from apache_beam.yaml.yaml_transform import YamlTransform + class FakeEnrichmentTransform: - def __init__(self, enrichment_handler, handler_config, timeout = 30): - self._enrichment_handler = enrichment_handler - self._handler_config = handler_config - self._timeout = timeout + def __init__(self, enrichment_handler, handler_config, timeout=30): + self._enrichment_handler = enrichment_handler + self._handler_config = handler_config + self._timeout = timeout - def __call__(self, enrichment_handler, *, handler_config, timeout = 30): - assert enrichment_handler == self._enrichment_handler - assert handler_config == self._handler_config - assert timeout == self._timeout - return beam.Map(lambda x: beam.Row(**x._asdict())) + def __call__(self, enrichment_handler, *, handler_config, timeout=30): + assert enrichment_handler == self._enrichment_handler + assert handler_config == self._handler_config + assert timeout == self._timeout + return beam.Map(lambda x: beam.Row(**x._asdict())) class EnrichmentTransformTest(unittest.TestCase): + def test_enrichment_with_bigquery(self): + input_data = [ + Row(label="item1", rank=0), + Row(label="item2", rank=1), + ] - def test_enrichment_with_bigquery(self): - input_data = [ - Row(label = "item1", rank = 0), - Row(label = "item2", rank = 1), - ] - - handler = 'BigQuery' - config = { - "project": "apache-beam-testing", - "table_name": "project.database.table", - "row_restriction_template": "label='item1' or label='item2'", - "fields": ["label"] - } - - with beam.Pipeline() as p: - 
with mock.patch('apache_beam.yaml.yaml_enrichment.enrichment_transform', - FakeEnrichmentTransform( - enrichment_handler = handler, - handler_config = config)): - input_pcoll = p | 'CreateInput' >> beam.Create(input_data) - result = input_pcoll | YamlTransform( - f''' + handler = 'BigQuery' + config = { + "project": "apache-beam-testing", + "table_name": "project.database.table", + "row_restriction_template": "label='item1' or label='item2'", + "fields": ["label"] + } + + with beam.Pipeline() as p: + with mock.patch('apache_beam.yaml.yaml_enrichment.enrichment_transform', + FakeEnrichmentTransform(enrichment_handler=handler, + handler_config=config)): + input_pcoll = p | 'CreateInput' >> beam.Create(input_data) + result = input_pcoll | YamlTransform( + f''' type: Enrichment config: enrichment_handler: {handler} handler_config: {config} ''') - assert_that( - result, - equal_to(input_data)) + assert_that(result, equal_to(input_data)) + if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() + logging.getLogger().setLevel(logging.INFO) + unittest.main() From ee6258ef5abfbf110067497975c509033e547a97 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Fri, 20 Sep 2024 00:36:57 +0530 Subject: [PATCH 07/11] fixing lint failures --- sdks/python/apache_beam/yaml/yaml_enrichment.py | 8 +++++--- sdks/python/apache_beam/yaml/yaml_enrichment_test.py | 6 ++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment.py b/sdks/python/apache_beam/yaml/yaml_enrichment.py index 77428bbd59f5..4ec8a5a786d3 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment.py @@ -15,14 +15,16 @@ # limitations under the License. 
# -from typing import Any, Dict +from typing import Any +from typing import Dict +from typing import Optional + import apache_beam as beam +from apache_beam.transforms.enrichment import Enrichment from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler -from apache_beam.transforms.enrichment import Enrichment -from typing import Optional @beam.ptransform.ptransform_fn diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment_test.py b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py index 9cd28995dfe4..e26d6140af23 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment_test.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py @@ -15,13 +15,15 @@ # limitations under the License. 
# -import unittest import logging +import unittest + import mock + import apache_beam as beam +from apache_beam import Row from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam import Row from apache_beam.yaml.yaml_transform import YamlTransform From c647e47c573e884196c4919027ad59e4575af429 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Fri, 20 Sep 2024 07:39:31 +0530 Subject: [PATCH 08/11] disable feast if not installed --- sdks/python/apache_beam/yaml/yaml_enrichment.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment.py b/sdks/python/apache_beam/yaml/yaml_enrichment.py index 4ec8a5a786d3..0fbe57321395 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment.py @@ -20,12 +20,17 @@ from typing import Optional import apache_beam as beam +from apache_beam.yaml import options from apache_beam.transforms.enrichment import Enrichment from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler -from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler +try: + from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler +except ImportError: + FeastFeatureStoreEnrichmentHandler = None + @beam.ptransform.ptransform_fn def enrichment_transform( @@ -87,6 +92,12 @@ def enrichment_transform( timeout: 30 """ + options.YamlOptions.check_enabled(pcoll.pipeline, 'Enrichment') + if (enrichment_handler == 'FeastFeatureStore' and + not FeastFeatureStoreEnrichmentHandler): + raise ValueError( + "FeastFeatureStore handler requires 'feast' package to be installed. 
" + + "Please install using 'pip install feast[gcp]' and try again.") handler_map = { 'BigQuery': BigQueryEnrichmentHandler, 'BigTable': BigTableEnrichmentHandler, From e917933c09005d6d37d4ad9c0116a6688322258b Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Thu, 3 Oct 2024 23:36:34 +0530 Subject: [PATCH 09/11] fix failures --- .../apache_beam/yaml/standard_providers.yaml | 3 ++- .../apache_beam/yaml/tests/enrichment.yaml | 3 ++- .../apache_beam/yaml/yaml_enrichment.py | 21 ++++++++++++++----- sdks/python/apache_beam/yaml/yaml_provider.py | 16 ++++++++++---- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml index 242faaa9a77b..15d5fdc24914 100644 --- a/sdks/python/apache_beam/yaml/standard_providers.yaml +++ b/sdks/python/apache_beam/yaml/standard_providers.yaml @@ -103,6 +103,7 @@ gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar' - type: 'python' - config: {} + config: + requires_gcp: true transforms: Enrichment: 'apache_beam.yaml.yaml_enrichment.enrichment_transform' diff --git a/sdks/python/apache_beam/yaml/tests/enrichment.yaml b/sdks/python/apache_beam/yaml/tests/enrichment.yaml index 216a18add83f..6469c094b8b4 100644 --- a/sdks/python/apache_beam/yaml/tests/enrichment.yaml +++ b/sdks/python/apache_beam/yaml/tests/enrichment.yaml @@ -80,4 +80,5 @@ pipelines: config: elements: - {label: '37a', rank: 1, name: 'S2'} - + options: + yaml_experimental_features: [ 'Enrichment' ] \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment.py b/sdks/python/apache_beam/yaml/yaml_enrichment.py index 0fbe57321395..e2dc72f3dac8 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment.py @@ -21,15 +21,19 @@ import apache_beam as beam from apache_beam.yaml import options -from apache_beam.transforms.enrichment import Enrichment -from 
apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler -from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler -from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler try: + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler + from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler + from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler except ImportError: - FeastFeatureStoreEnrichmentHandler = None + Enrichment = None # type: ignore + BigQueryEnrichmentHandler = None # type: ignore + BigTableEnrichmentHandler = None # type: ignore + VertexAIFeatureStoreEnrichmentHandler = None # type: ignore + FeastFeatureStoreEnrichmentHandler = None # type: ignore @beam.ptransform.ptransform_fn @@ -93,11 +97,18 @@ def enrichment_transform( """ options.YamlOptions.check_enabled(pcoll.pipeline, 'Enrichment') + + if not Enrichment: + raise ValueError( + f"gcp dependencies not installed. Cannot use {enrichment_handler} " + f"handler. Please install using 'pip install apache-beam[gcp]'.") + if (enrichment_handler == 'FeastFeatureStore' and not FeastFeatureStoreEnrichmentHandler): raise ValueError( "FeastFeatureStore handler requires 'feast' package to be installed. 
" + "Please install using 'pip install feast[gcp]' and try again.") + handler_map = { 'BigQuery': BigQueryEnrichmentHandler, 'BigTable': BigTableEnrichmentHandler, diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index c2cba936abce..2d6ed2e5b956 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -231,9 +231,17 @@ def provider_from_spec(cls, spec): result.to_json = lambda: spec return result except Exception as exn: - raise ValueError( - f'Unable to instantiate provider of type {type} ' - f'at line {SafeLineLoader.get_line(spec)}: {exn}') from exn + if isinstance(exn, ModuleNotFoundError) and config.get('requires_gcp', + False): + print( + f"gcp dependencies not installed. Cannot use transforms: " + f"{', '.join(urns.keys())}. Please install using " + f"'pip install apache-beam[gcp]'.") + return InlineProvider({}) + else: + raise ValueError( + f'Unable to instantiate provider of type {type} ' + f'at line {SafeLineLoader.get_line(spec)}: {exn}') from exn else: raise NotImplementedError( f'Unknown provider type: {type} ' @@ -335,7 +343,7 @@ def cache_artifacts(self): @ExternalProvider.register_provider_type('python') -def python(urns, packages=()): +def python(urns, packages=(), requires_gcp=False): if packages: return ExternalPythonProvider(urns, packages) else: From e24ea06263f20d3425673e96e245d0527ec83743 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Fri, 4 Oct 2024 00:31:41 +0530 Subject: [PATCH 10/11] separate block for feast import error --- sdks/python/apache_beam/yaml/yaml_enrichment.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment.py b/sdks/python/apache_beam/yaml/yaml_enrichment.py index e2dc72f3dac8..00f2a5c1b1d1 100644 --- a/sdks/python/apache_beam/yaml/yaml_enrichment.py +++ b/sdks/python/apache_beam/yaml/yaml_enrichment.py @@ -27,12 +27,15 @@ from 
apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler - from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler except ImportError: Enrichment = None # type: ignore BigQueryEnrichmentHandler = None # type: ignore BigTableEnrichmentHandler = None # type: ignore VertexAIFeatureStoreEnrichmentHandler = None # type: ignore + +try: + from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler +except ImportError: FeastFeatureStoreEnrichmentHandler = None # type: ignore From c288f1ad38410b266a8bf6ed5b54dc2d55935de3 Mon Sep 17 00:00:00 2001 From: Reeba Qureshi Date: Fri, 4 Oct 2024 20:01:56 +0530 Subject: [PATCH 11/11] minor changes --- .../apache_beam/yaml/standard_providers.yaml | 3 +-- sdks/python/apache_beam/yaml/yaml_provider.py | 16 ++++------------ 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml index 15d5fdc24914..242faaa9a77b 100644 --- a/sdks/python/apache_beam/yaml/standard_providers.yaml +++ b/sdks/python/apache_beam/yaml/standard_providers.yaml @@ -103,7 +103,6 @@ gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar' - type: 'python' - config: - requires_gcp: true + config: {} transforms: Enrichment: 'apache_beam.yaml.yaml_enrichment.enrichment_transform' diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 2d6ed2e5b956..c2cba936abce 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -231,17 +231,9 @@ def provider_from_spec(cls, spec): result.to_json = lambda: spec 
return result except Exception as exn: - if isinstance(exn, ModuleNotFoundError) and config.get('requires_gcp', - False): - print( - f"gcp dependencies not installed. Cannot use transforms: " - f"{', '.join(urns.keys())}. Please install using " - f"'pip install apache-beam[gcp]'.") - return InlineProvider({}) - else: - raise ValueError( - f'Unable to instantiate provider of type {type} ' - f'at line {SafeLineLoader.get_line(spec)}: {exn}') from exn + raise ValueError( + f'Unable to instantiate provider of type {type} ' + f'at line {SafeLineLoader.get_line(spec)}: {exn}') from exn else: raise NotImplementedError( f'Unknown provider type: {type} ' @@ -343,7 +335,7 @@ def cache_artifacts(self): @ExternalProvider.register_provider_type('python') -def python(urns, packages=(), requires_gcp=False): +def python(urns, packages=()): if packages: return ExternalPythonProvider(urns, packages) else: