diff --git a/mara_pipelines/commands/sql.py b/mara_pipelines/commands/sql.py index e566b6e..cc4fdda 100644 --- a/mara_pipelines/commands/sql.py +++ b/mara_pipelines/commands/sql.py @@ -8,6 +8,7 @@ import mara_db.dbs import mara_db.shell +import mara_db.postgresql from mara_page import _, html from .. import config, shell, pipelines from ..incremental_processing import file_dependencies @@ -43,6 +44,7 @@ def sql_file_path(self) -> pathlib.Path: pipeline_candidate = self while not isinstance(pipeline_candidate, pipelines.Pipeline): pipeline_candidate = pipeline_candidate.parent + assert isinstance(pipeline_candidate, pipelines.Pipeline) return pipeline_candidate.base_path() / self.sql_file_name def shell_command(self): @@ -163,7 +165,7 @@ def target_db_alias(self): return self._target_db_alias or config.default_db_alias() def file_path(self) -> pathlib.Path: - return self.parent.parent.base_path() / self.file_name + return self.parent.parent.base_path() / self.sql_file_name def run(self) -> bool: if self.sql_file_name: @@ -185,7 +187,6 @@ def run(self) -> bool: # (see also above in ExecuteSQL) file_dependencies.delete(self.node_path(), dependency_type) - if not super().run(): return False @@ -262,7 +263,7 @@ def run(self) -> bool: # retrieve the highest current value for the modification comparison (e.g.: the highest timestamp) # We intentionally use the command line here (rather than sqlalchemy) to avoid forcing people python drivers, # which can be hard for example in the case of SQL Server - logger.log(f'get highest modification comparison value', format=logger.Format.ITALICS) + logger.log(f'Get new max modification comparison value...', format=logger.Format.ITALICS) max_value_query = f'SELECT max({self.modification_comparison}) AS maxval FROM {self.source_table}' logger.log(max_value_query, format=logger.Format.VERBATIM) result = shell.run_shell_command(f'echo {shlex.quote(max_value_query)} \\\n | ' @@ -271,15 +272,22 @@ def run(self) -> bool: if not result: return False + if isinstance(result, bool): + # This happens if the query above ran, but returned no data and therefore the load + # query below would also return no data + # We assume that this happens e.g. when there is no data *yet* and let the load succeed + # without actually doing anything + logger.log("Found no data, not starting Copy.", format=logger.Format.VERBATIM) + return True # be flexible with different output formats: remove the column header & remove whitespace & quotes max_modification_value = ''.join(result).replace('maxval', '').strip().strip('"') - logger.log(repr(max_modification_value), format=logger.Format.VERBATIM) + logger.log(f"New max modification comparison value: {max_modification_value!r}", format=logger.Format.VERBATIM) # check whether target table is empty target_table_is_empty = True target_table_empty_query = f'SELECT TRUE FROM {self.target_table} LIMIT 1' - logger.log(f'check if target table is empty', format=logger.Format.ITALICS) + logger.log(f'Check if target table is empty', format=logger.Format.ITALICS) logger.log(target_table_empty_query, format=logger.Format.VERBATIM) with mara_db.postgresql.postgres_cursor_context(self.target_db_alias) as cursor: cursor.execute(f'SELECT TRUE FROM {self.target_table} LIMIT 1') @@ -287,14 +295,14 @@ def run(self) -> bool: logger.log(f"target table{'' if target_table_is_empty else ' not'} empty", format=logger.Format.ITALICS) # get last comparison value - logger.log('get last comparison value', format=logger.Format.ITALICS) + logger.log('Get last comparison value...', format=logger.Format.ITALICS) last_comparison_value = incremental_copy_status.get_last_comparison_value( self.node_path(), self.source_db_alias, self.source_table) - logger.log(repr(last_comparison_value), format=logger.Format.VERBATIM) + logger.log(f"Last max modification comparison value: {last_comparison_value!r}", format=logger.Format.VERBATIM) if target_table_is_empty or not last_comparison_value: # full load - logger.log('full (non incremental) copy', logger.Format.ITALICS) + logger.log('Using full (non incremental) Copy', logger.Format.ITALICS) if not target_table_is_empty: truncate_query = f'TRUNCATE TABLE {self.target_table}' logger.log(truncate_query, format=logger.Format.VERBATIM) @@ -305,7 +313,7 @@ def run(self) -> bool: # If we would crash during load (with some data already in the table), the next run would # not trigger a full load and we would miss data. To prevent that, delete the old # comparison value (we will then set it only on success) - logger.log('deleting old comparison value', logger.Format.ITALICS) + logger.log('Deleting old comparison value', logger.Format.ITALICS) incremental_copy_status.delete(self.node_path(), self.source_db_alias, self.source_table) # overwrite the comparison criteria to get everything @@ -316,7 +324,7 @@ def run(self) -> bool: else: # incremental load. First create the table which will contain the delta - logger.log('incremental copy, create upsert table', logger.Format.ITALICS) + logger.log('Using incremental Copy, create upsert table', logger.Format.ITALICS) create_upsert_table_query = (f'DROP TABLE IF EXISTS {self.target_table}_upsert;\n' + f'CREATE TABLE {self.target_table}_upsert AS SELECT * from {self.target_table} WHERE FALSE')