
Commit

Merge branch 'main' into feature/add-accuracy-metric
* main: (39 commits)
  MINOR - Better PII classification for JSON data (open-metadata#17734)
  New Email Templates (OSS) (open-metadata#17606)
  fix pom. (open-metadata#17682)
  GEN-1333 Add TS validation on DQ and Profiler data ingestion (open-metadata#17731)
  make cost analysis as collate only (open-metadata#17719)
  Minor: remove unused dependency (open-metadata#17709)
  test: migrate login config to playwright (open-metadata#17740)
  minor(test): fix ingestion related flaky for aut (open-metadata#17727)
  fix expand all operation on terms page (open-metadata#17733)
  Docs: Updating the Image Reference for Bots (open-metadata#17736)
  fix ui freezing due to images in feed changes (open-metadata#17703)
  add links to menus (open-metadata#17659)
  supported followed data in Following widget using search api (open-metadata#17689)
  minor(ui): align dependency version to fix vulnerabilities (open-metadata#17729)
  Fixes some things on the APICollection (open-metadata#17704)
  DOCS - OSS deployment is flagged as Collate False (open-metadata#17722)
  minor: disable image upload support in description editor (open-metadata#17697)
  fix user spec flaky playwright test (open-metadata#17684)
  fetch domains before any widget is loaded (open-metadata#17695)
  minor(test): migrate persona spec (open-metadata#17701)
  ...
hurongliang committed Sep 6, 2024
2 parents 9470e1e + 8191202 commit f3e8b9c
Showing 467 changed files with 5,163 additions and 3,725 deletions.
@@ -157,7 +157,7 @@ def patch( # pylint: disable=too-many-arguments

except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Error trying to PATCH {get_log_name(source)}: {exc}")
logger.warning(f"Error trying to PATCH {get_log_name(source)}: {exc}")

return None

8 changes: 3 additions & 5 deletions ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -586,11 +586,9 @@ def write_profiler_response(self, record: ProfilerResponse) -> Either[Table]:
table=record.table, column_tags=record.column_tags
)
if not patched:
- self.status.failed(
-     StackTraceError(
-         name=table.fullyQualifiedName.root,
-         error="Error patching tags for table",
-     )
+ self.status.warning(
+     key=table.fullyQualifiedName.root,
+     reason="Error patching tags for table",
)
else:
logger.debug(
@@ -148,6 +148,24 @@ def get_owner_ref(
logger.warning(f"Could not fetch owner data due to {err}")
return None

@staticmethod
def _get_data_models_tags(dataModels: List[DataSource]) -> Set[str]:
"""
Get the tags from the data model in the upstreamDatasources
"""
tags = set()
try:
for data_model in dataModels:
# tags seem to be available for upstreamDatasources only, not for dataModels
for upstream_source in data_model.upstreamDatasources or []:
for tag in upstream_source.tags:
tags.add(tag.name)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching tags from data models: {exc}")

return tags

def yield_tags(
self, dashboard_details: TableauDashboard
) -> Iterable[Either[OMetaTagAndClassification]]:
@@ -160,8 +178,14 @@ def yield_tags(
for elem in container:
tags.update(elem.tags)

_tags = {tag.label for tag in tags}
# retrieve tags from data models
_data_models_tags = self._get_data_models_tags(dashboard_details.dataModels)

_all_tags = _tags.union(_data_models_tags)

yield from get_ometa_tag_and_classification(
- tags=[tag.label for tag in tags],
+ tags=[tag for tag in _all_tags],
classification_name=TABLEAU_TAG_CATEGORY,
tag_description="Tableau Tag",
classification_description="Tags associated with tableau entities",
@@ -201,13 +225,23 @@ def _create_datamodel_request(
self.status.filter(data_model_name, "Data model filtered out.")
return
try:
data_model_tags = data_model.tags or []
data_model_request = CreateDashboardDataModelRequest(
name=EntityName(data_model.id),
displayName=data_model_name,
description=Markdown(data_model.description)
if data_model.description
else None,
service=FullyQualifiedEntityName(self.context.get().dashboard_service),
dataModelType=data_model_type.value,
serviceType=DashboardServiceType.Tableau.value,
columns=self.get_column_info(data_model),
tags=get_tag_labels(
metadata=self.metadata,
tags=[tag.name for tag in data_model_tags],
classification_name=TABLEAU_TAG_CATEGORY,
include_tags=self.source_config.includeTags,
),
sql=self._get_datamodel_sql_query(data_model=data_model),
owners=self.get_owner_ref(dashboard_details=dashboard_details),
)
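The two hunks above route both dashboard-level tags and upstream-datasource tags into the same Tableau classification, and the _create_datamodel_request hunk then attaches the data-model tags through get_tag_labels. A minimal sketch of the union step, using stand-in dataclasses instead of the connector's real TableauDashboard/DataSource models (every name below is illustrative):

from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Tag:
    name: str


@dataclass
class Datasource:
    tags: List[Tag] = field(default_factory=list)
    upstream: List["Datasource"] = field(default_factory=list)


def collect_data_model_tags(data_models: List[Datasource]) -> Set[str]:
    """Mirror of _get_data_models_tags: only the upstream datasources carry tags."""
    tags: Set[str] = set()
    for model in data_models:
        for upstream in model.upstream or []:
            tags.update(tag.name for tag in upstream.tags)
    return tags


# Dashboard-level tags (the tag.label set) unioned with data-model tags,
# mirroring the _all_tags computation in yield_tags.
dashboard_tags = {"finance", "daily"}
model_tags = collect_data_model_tags(
    [Datasource(upstream=[Datasource(tags=[Tag("certified"), Tag("finance")])])]
)
all_tags = dashboard_tags.union(model_tags)
print(sorted(all_tags))  # ['certified', 'daily', 'finance']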
@@ -58,6 +58,14 @@ class TableauTag(BaseModel):
label: str


class TableauDataModelTag(BaseModel):
"""
Aux class for Tag object for Tableau Data Model
"""

name: str


class TableauOwner(TableauBaseModel):
"""
Aux class for Owner object of the tableau_api_lib response
@@ -121,6 +129,8 @@ class UpstreamTable(BaseModel):
class DataSource(BaseModel):
id: str
name: Optional[str] = None
description: Optional[str] = None
tags: Optional[List[TableauDataModelTag]] = []
fields: Optional[List[DatasourceField]] = None
upstreamTables: Optional[List[UpstreamTable]] = None
upstreamDatasources: Optional[List["DataSource"]] = None
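With description and tags added to DataSource, upstream datasources returned by Tableau's metadata API now validate with their tags attached; the GraphQL fragment in the next hunk requests exactly those fields. A trimmed, self-contained sketch of that round-trip, assuming Pydantic v2 (the real classes carry more fields and inherit from the connector's base models):

from typing import List, Optional

from pydantic import BaseModel


class TableauDataModelTag(BaseModel):
    name: str


class DataSource(BaseModel):
    id: str
    name: Optional[str] = None
    description: Optional[str] = None
    tags: Optional[List[TableauDataModelTag]] = []
    upstreamDatasources: Optional[List["DataSource"]] = None


DataSource.model_rebuild()  # resolve the self-reference

# Shaped like a trimmed response to the GraphQL query in the next hunk
payload = {
    "id": "ds-1",
    "name": "Orders",
    "upstreamDatasources": [
        {"id": "ds-0", "name": "Raw Orders", "tags": [{"name": "certified"}]}
    ],
}
source = DataSource.model_validate(payload)
print(source.upstreamDatasources[0].tags[0].name)  # certified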
@@ -26,6 +26,10 @@
upstreamDatasources{{
id
name
description
tags {{
name
}}
fields {{
id
name
@@ -202,7 +202,7 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals
try:
logger.debug("Requesting [dbt_run_results]")
dbt_run_results = client.get(
f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_RUN_RESULTS_FILE_NAME}"
f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_RUN_RESULTS_FILE_NAME}.json"
)
except Exception as exc:
logger.debug(
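The dbt Cloud change above only appends the .json suffix to the run-results artifact path requested from the /accounts/{account_id}/runs/{run_id}/artifacts/ endpoint. A tiny sketch of the resulting path, assuming DBT_RUN_RESULTS_FILE_NAME holds "run_results" (the constant's value is not shown in this excerpt):

DBT_RUN_RESULTS_FILE_NAME = "run_results"  # assumed value; not part of this diff

account_id, run_id = 1017, 4242  # placeholder identifiers
path = f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_RUN_RESULTS_FILE_NAME}.json"
print(path)  # /accounts/1017/runs/4242/artifacts/run_results.json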
@@ -59,6 +59,7 @@
get_columns,
get_etable_owner,
get_foreign_keys,
get_json_fields_and_type,
get_table_comment,
get_table_owner,
get_view_definition,
@@ -138,6 +139,7 @@
Inspector.get_all_table_ddls = get_all_table_ddls
Inspector.get_table_ddl = get_table_ddl
Inspector.get_table_owner = get_etable_owner
Inspector.get_json_fields_and_type = get_json_fields_and_type

PGDialect.get_foreign_keys = get_foreign_keys

@@ -211,3 +211,88 @@
n.oid = c.relnamespace
ORDER BY 1
"""

POSTGRES_GET_JSON_FIELDS = """
WITH RECURSIVE json_hierarchy AS (
SELECT
key AS path,
json_typeof(value) AS type,
value,
json_build_object() AS properties,
key AS title
FROM
{table_name} tbd,
LATERAL json_each({column_name}::json)
),
build_hierarchy AS (
SELECT
path,
type,
title,
CASE
WHEN type = 'object' THEN
json_build_object(
'title', title,
'type', 'object',
'properties', (
SELECT json_object_agg(
key,
json_build_object(
'title', key,
'type', json_typeof(value),
'properties', (
CASE
WHEN json_typeof(value) = 'object' THEN
(
SELECT json_object_agg(
key,
json_build_object(
'title', key,
'type', json_typeof(value),
'properties',
json_build_object()
)
)
FROM json_each(value::json) AS sub_key_value
)
ELSE json_build_object()
END
)
)
)
FROM json_each(value::json) AS key_value
)
)
WHEN type = 'array' THEN
json_build_object(
'title', title,
'type', 'array',
'properties', json_build_object()
)
ELSE
json_build_object(
'title', title,
'type', type
)
END AS hierarchy
FROM
json_hierarchy
),
aggregate_hierarchy AS (
select
json_build_object(
'title','{column_name}',
'type','object',
'properties',
json_object_agg(
path,
hierarchy
)) AS result
FROM
build_hierarchy
)
SELECT
result
FROM
aggregate_hierarchy;
"""
26 changes: 26 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/postgres/utils.py
@@ -13,6 +13,7 @@
"""
Postgres SQLAlchemy util methods
"""
import json
import re
import traceback
from typing import Dict, Optional, Tuple
@@ -23,15 +24,18 @@
from sqlalchemy.engine import reflection
from sqlalchemy.sql import sqltypes

from metadata.generated.schema.entity.data.table import Column
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_COL_IDENTITY,
POSTGRES_FETCH_FK,
POSTGRES_GET_JSON_FIELDS,
POSTGRES_GET_SERVER_VERSION,
POSTGRES_SQL_COLUMNS,
POSTGRES_TABLE_COMMENTS,
POSTGRES_TABLE_OWNERS,
POSTGRES_VIEW_DEFINITIONS,
)
from metadata.parsers.json_schema_parser import parse_json_schema
from metadata.utils.logger import utils_logger
from metadata.utils.sqlalchemy_utils import (
get_table_comment_wrapper,
@@ -186,6 +190,28 @@ def get_table_comment(
)


@reflection.cache
def get_json_fields_and_type(
self, table_name, column_name, schema=None, **kw
): # pylint: disable=unused-argument
try:
query = POSTGRES_GET_JSON_FIELDS.format(
table_name=table_name, column_name=column_name
)
cursor = self.engine.execute(query)
result = cursor.fetchone()
if result:
parsed_column = parse_json_schema(json.dumps(result[0]), Column)
if parsed_column:
return parsed_column[0].children
except Exception as err:
logger.warning(
f"Unable to parse the json fields for {table_name}.{column_name} - {err}"
)
logger.debug(traceback.format_exc())
return None


@reflection.cache
def get_columns( # pylint: disable=too-many-locals
self, connection, table_name, schema=None, **kw
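Because Inspector.get_json_fields_and_type is attached in the dialect setup shown earlier, it can be reached through a plain SQLAlchemy inspector once the Postgres source module has been imported. A rough usage sketch; the connection string, table and column names are placeholders, and the import path of the module that does the patching is assumed rather than shown in this excerpt:

from sqlalchemy import create_engine, inspect

# Importing the connector module applies the Inspector patches shown above
# (module path assumed; the file is not named in this excerpt).
from metadata.ingestion.source.database.postgres import metadata  # noqa: F401

engine = create_engine("postgresql+psycopg2://user:secret@localhost:5432/demo")
inspector = inspect(engine)

# Returns the JSON column's parsed children as Column objects,
# or None if the query or the schema parsing fails.
children = inspector.get_json_fields_and_type("orders", "details", schema="public")
print(children)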
@@ -196,6 +196,26 @@ def _process_complex_col_type(self, parsed_string: dict, column: dict) -> Column
]
return Column(**parsed_string)

@calculate_execution_time()
def process_json_type_column_fields( # pylint: disable=too-many-locals
self, schema_name: str, table_name: str, column_name: str, inspector: Inspector
) -> Optional[List[Column]]:
"""
Parse fields column with json data types
"""
try:
if hasattr(inspector, "get_json_fields_and_type"):
result = inspector.get_json_fields_and_type(
table_name, column_name, schema_name
)
return result

except NotImplementedError:
logger.debug(
"Cannot parse json fields for table column [{schema_name}.{table_name}.{col_name}]: NotImplementedError"
)
return None

@calculate_execution_time()
def get_columns_and_constraints( # pylint: disable=too-many-locals
self, schema_name: str, table_name: str, db_name: str, inspector: Inspector
@@ -271,6 +291,12 @@ def get_columns_and_constraints( # pylint: disable=too-many-locals
precision,
)
col_data_length = 1 if col_data_length is None else col_data_length

if col_type == "JSON":
children = self.process_json_type_column_fields(
schema_name, table_name, column.get("name"), inspector
)

om_column = Column(
name=ColumnName(
root=column["name"]
2 changes: 1 addition & 1 deletion ingestion/src/metadata/parsers/json_schema_parser.py
@@ -81,7 +81,7 @@ def get_json_schema_fields(
displayName=value.get("title"),
dataType=JsonSchemaDataTypes(value.get("type", "unknown")).name,
description=value.get("description"),
- children=get_json_schema_fields(value.get("properties"))
+ children=get_json_schema_fields(value.get("properties"), cls=cls)
if value.get("type") == "object"
else None,
)
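The cls=cls fix above matters for the Postgres JSON ingestion shown earlier: get_json_fields_and_type calls parse_json_schema(..., Column), and without propagating cls the nested levels fell back to the parser's default field model. A small sketch of that call, assuming the openmetadata-ingestion package is installed; the schema dict mirrors the query output illustrated earlier:

import json

from metadata.generated.schema.entity.data.table import Column
from metadata.parsers.json_schema_parser import parse_json_schema

schema = {
    "title": "details",
    "type": "object",
    "properties": {
        "address": {
            "title": "address",
            "type": "object",
            "properties": {"city": {"title": "city", "type": "string"}},
        },
        "age": {"title": "age", "type": "number"},
    },
}

parsed = parse_json_schema(json.dumps(schema), Column)
# With cls propagated, nested levels come back as Column objects as well,
# so they can be attached directly as children of the table's JSON column.
print(parsed[0].children)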
3 changes: 2 additions & 1 deletion ingestion/src/metadata/pii/processor.py
@@ -66,6 +66,7 @@ def __init__(
) # Used to satisfy type checker

self._ner_scanner = None
self.name_scanner = ColumnNameScanner()
self.confidence_threshold = self.source_config.confidence

@property
@@ -128,7 +129,7 @@ def process_column(
return None

# Scan by column name. If no results there, check the sample data, if any
- tag_and_confidence = ColumnNameScanner.scan(column.name.root) or (
+ tag_and_confidence = self.name_scanner.scan(column.name.root) or (
self.ner_scanner.scan([row[idx] for row in table_data.rows])
if table_data
else None
23 changes: 23 additions & 0 deletions ingestion/src/metadata/pii/scanners/base.py
@@ -0,0 +1,23 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Basic Scanner ABC
"""
from abc import ABC, abstractmethod
from typing import Any


class BaseScanner(ABC):
"""Basic scanner abstract class"""

@abstractmethod
def scan(self, data: Any):
"""Scan the given data from a column"""
