Skip to content

Commit

Permalink
Merge pull request #1 from cbtham/cbtham/0.18.1
Browse files Browse the repository at this point in the history
Attempt to support 0.18.1
  • Loading branch information
cbtham authored Feb 23, 2022
2 parents f0125d0 + f587968 commit 644abdf
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 28 deletions.
90 changes: 64 additions & 26 deletions provider/sdk/feast_azure_provider/azure_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@
import pandas
from tqdm import tqdm

from feast import FeatureTable
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
Provider,
RetrievalJob,
_convert_arrow_to_proto,
_get_column_names,
_run_field_mapping,
Expand All @@ -23,60 +22,71 @@
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDataset

DEFAULT_BATCH_SIZE = 10_000

class AzureProvider(Provider):
def __init__(self, config: RepoConfig):
self.repo_config = config
self.offline_store = get_offline_store_from_config(config.offline_store)
self.online_store = get_online_store_from_config(config.online_store)
self.online_store = (
get_online_store_from_config(config.online_store)
if config.online_store
else None
)

def update_infra(
self,
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)
# Call update only if there is an online store
if self.online_store:
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

def teardown_infra(
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
tables: Sequence[FeatureView],
entities: Sequence[Entity],
) -> None:
self.online_store.teardown(self.repo_config, tables, entities)
if self.online_store:
self.online_store.teardown(self.repo_config, tables, entities)

def online_write_batch(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
self.online_store.online_write_batch(config, table, data, progress)
if self.online_store:
self.online_store.online_write_batch(config, table, data, progress)

def online_read(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
result = self.online_store.online_read(config, table, entity_keys)

result = []
if self.online_store:
result = self.online_store.online_read(config, table, entity_keys, requested_features)
return result

def materialize_single_feature_view(
Expand Down Expand Up @@ -117,12 +127,16 @@ def materialize_single_feature_view(
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
)

with tqdm_builder(table.num_rows) as pbar:
for batch in table.to_batches(DEFAULT_BATCH_SIZE):
rows_to_write = _convert_arrow_to_proto(batch, feature_view, join_keys)
self.online_write_batch(
self.repo_config,
feature_view,
rows_to_write,
lambda x: pbar.update(x),
)

def get_historical_features(
self,
Expand All @@ -144,3 +158,27 @@ def get_historical_features(
full_feature_names=full_feature_names,
)
return job

def retrieve_saved_dataset(
self,
config: RepoConfig,
dataset: SavedDataset
) -> RetrievalJob:
feature_name_columns = [
ref.replace(":", "__") if dataset.full_feature_names else ref.split(":")[1]
for ref in dataset.features
]

# ToDo: replace hardcoded value
event_ts_column = "event_timestamp"

return self.offline_store.pull_all_from_table_or_query(
config=config,
data_source=dataset.storage.to_data_source(),
join_key_columns=dataset.join_keys,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_ts_column,
start_date=make_tzaware(dataset.min_event_timestamp), # type: ignore
end_date=make_tzaware(dataset.max_event_timestamp + timedelta(seconds=1)), # type: ignore
)

37 changes: 37 additions & 0 deletions provider/sdk/feast_azure_provider/mssqlserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,43 @@ def pull_latest_from_table_or_query(
on_demand_feature_views=None,
)

def pull_all_from_table_or_query(
self,
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
assert type(data_source).__name__ == "MsSqlServerSource"
assert (
config.offline_store.type
== "feast_azure_provider.mssqlserver.MsSqlServerOfflineStore"
)
from_expression = data_source.get_table_query_string().replace("`", "")

field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)

query = f"""
SELECT {field_string}
FROM (
SELECT {field_string}
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
)
"""
self._make_engine(config.offline_store)

return MsSqlServerRetrievalJob(
query=query,
engine=self._engine,
config=config,
full_feature_names=False,
)


def get_historical_features(
self,
config: RepoConfig,
Expand Down
4 changes: 2 additions & 2 deletions provider/sdk/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
setup(
name="feast-azure-provider",
author="Microsoft",
version="0.2.29",
version="0.2.297",
description="A Feast Azure Provider",
URL="https://github.com/cbtham/feast-azure",
long_description=LONG_DESCRIPTION,
long_description_content_type="text/markdown",
python_requires=">=3.7.0",
packages=find_packages(exclude=("tests",)),
install_requires=[
"feast[redis]==0.17.0",
"feast[redis]==0.18.1",
"azure-storage-blob>=0.37.0",
"azure-identity>=1.6.1",
"SQLAlchemy>=1.4.19",
Expand Down

0 comments on commit 644abdf

Please sign in to comment.