Skip to content

Commit

Permalink
Fix CopyIncrementally with no data (#54)
Browse files Browse the repository at this point in the history
* Fix imports and ensure the IDE knows about types

* Use more descriptive messages in copy task

* Handle case when the source table is empty

We have that for partitioned tables where the time based query does not find any data for several hours.
  • Loading branch information
jankatins authored Mar 8, 2021
1 parent 66e7dc1 commit a65151f
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions mara_pipelines/commands/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import mara_db.dbs
import mara_db.shell
import mara_db.postgresql
from mara_page import _, html
from .. import config, shell, pipelines
from ..incremental_processing import file_dependencies
Expand Down Expand Up @@ -43,6 +44,7 @@ def sql_file_path(self) -> pathlib.Path:
pipeline_candidate = self
while not isinstance(pipeline_candidate, pipelines.Pipeline):
pipeline_candidate = pipeline_candidate.parent
assert isinstance(pipeline_candidate, pipelines.Pipeline)
return pipeline_candidate.base_path() / self.sql_file_name

def shell_command(self):
Expand Down Expand Up @@ -163,7 +165,7 @@ def target_db_alias(self):
return self._target_db_alias or config.default_db_alias()

def file_path(self) -> pathlib.Path:
return self.parent.parent.base_path() / self.file_name
return self.parent.parent.base_path() / self.sql_file_name

def run(self) -> bool:
if self.sql_file_name:
Expand All @@ -185,7 +187,6 @@ def run(self) -> bool:
# (see also above in ExecuteSQL)
file_dependencies.delete(self.node_path(), dependency_type)


if not super().run():
return False

Expand Down Expand Up @@ -266,7 +267,7 @@ def run(self) -> bool:
# retrieve the highest current value for the modification comparison (e.g.: the highest timestamp)
# We intentionally use the command line here (rather than sqlalchemy) to avoid forcing people python drivers,
# which can be hard for example in the case of SQL Server
logger.log(f'get highest modification comparison value', format=logger.Format.ITALICS)
logger.log(f'Get new max modification comparison value...', format=logger.Format.ITALICS)
max_value_query = f'SELECT max({self.modification_comparison}) AS maxval FROM {self.source_table}'
logger.log(max_value_query, format=logger.Format.VERBATIM)
result = shell.run_shell_command(f'echo {shlex.quote(max_value_query)} \\\n | '
Expand All @@ -275,30 +276,37 @@ def run(self) -> bool:
if not result:
return False

if isinstance(result, bool):
# This happens if the query above ran, but returned no data and therefore the load
# query below would also return no data
# We assume that this happens e.g. when there is no data *yet* and let the load succeed
# without actually doing anything
logger.log("Found no data, not starting Copy.", format=logger.Format.VERBATIM)
return True
# be flexible with different output formats: remove the column header & remove whitespace & quotes
max_modification_value = ''.join(result).replace('maxval', '').strip().strip('"')
logger.log(repr(max_modification_value), format=logger.Format.VERBATIM)
logger.log(f"New max modification comparison value: {max_modification_value!r}", format=logger.Format.VERBATIM)

# check whether target table is empty
target_table_is_empty = True

target_table_empty_query = f'SELECT TRUE FROM {self.target_table} LIMIT 1'
logger.log(f'check if target table is empty', format=logger.Format.ITALICS)
logger.log(f'Check if target table is empty', format=logger.Format.ITALICS)
logger.log(target_table_empty_query, format=logger.Format.VERBATIM)
with mara_db.postgresql.postgres_cursor_context(self.target_db_alias) as cursor:
cursor.execute(f'SELECT TRUE FROM {self.target_table} LIMIT 1')
target_table_is_empty = not cursor.fetchone()
logger.log(f"target table{'' if target_table_is_empty else ' not'} empty", format=logger.Format.ITALICS)

# get last comparison value
logger.log('get last comparison value', format=logger.Format.ITALICS)
logger.log('Get last comparison value...', format=logger.Format.ITALICS)
last_comparison_value = incremental_copy_status.get_last_comparison_value(
self.node_path(), self.source_db_alias, self.source_table)
logger.log(repr(last_comparison_value), format=logger.Format.VERBATIM)
logger.log(f"Last max modification comparison value: {last_comparison_value!r}", format=logger.Format.VERBATIM)

if target_table_is_empty or not last_comparison_value:
# full load
logger.log('full (non incremental) copy', logger.Format.ITALICS)
logger.log('Using full (non incremental) Copy', logger.Format.ITALICS)
if not target_table_is_empty:
truncate_query = f'TRUNCATE TABLE {self.target_table}'
logger.log(truncate_query, format=logger.Format.VERBATIM)
Expand All @@ -309,7 +317,7 @@ def run(self) -> bool:
# If we would crash during load (with some data already in the table), the next run would
# not trigger a full load and we would miss data. To prevent that, delete the old
# comparison value (we will then set it only on success)
logger.log('deleting old comparison value', logger.Format.ITALICS)
logger.log('Deleting old comparison value', logger.Format.ITALICS)
incremental_copy_status.delete(self.node_path(), self.source_db_alias, self.source_table)

# overwrite the comparison criteria to get everything
Expand All @@ -320,7 +328,7 @@ def run(self) -> bool:

else:
# incremental load. First create the table which will contain the delta
logger.log('incremental copy, create upsert table', logger.Format.ITALICS)
logger.log('Using incremental Copy, create upsert table', logger.Format.ITALICS)
create_upsert_table_query = (f'DROP TABLE IF EXISTS {self.target_table}_upsert;\n'
+ f'CREATE TABLE {self.target_table}_upsert AS SELECT * from {self.target_table} WHERE FALSE')

Expand Down

0 comments on commit a65151f

Please sign in to comment.