Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CopyIncrementally with no data #54

Merged
merged 3 commits into from
Mar 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions mara_pipelines/commands/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import mara_db.dbs
import mara_db.shell
import mara_db.postgresql
from mara_page import _, html
from .. import config, shell, pipelines
from ..incremental_processing import file_dependencies
Expand Down Expand Up @@ -43,6 +44,7 @@ def sql_file_path(self) -> pathlib.Path:
pipeline_candidate = self
while not isinstance(pipeline_candidate, pipelines.Pipeline):
pipeline_candidate = pipeline_candidate.parent
assert isinstance(pipeline_candidate, pipelines.Pipeline)
return pipeline_candidate.base_path() / self.sql_file_name

def shell_command(self):
Expand Down Expand Up @@ -163,7 +165,7 @@ def target_db_alias(self):
return self._target_db_alias or config.default_db_alias()

def file_path(self) -> pathlib.Path:
return self.parent.parent.base_path() / self.file_name
return self.parent.parent.base_path() / self.sql_file_name
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have error'd -> not used?


def run(self) -> bool:
if self.sql_file_name:
Expand All @@ -185,7 +187,6 @@ def run(self) -> bool:
# (see also above in ExecuteSQL)
file_dependencies.delete(self.node_path(), dependency_type)


if not super().run():
return False

Expand Down Expand Up @@ -262,7 +263,7 @@ def run(self) -> bool:
# retrieve the highest current value for the modification comparison (e.g.: the highest timestamp)
# We intentionally use the command line here (rather than sqlalchemy) to avoid forcing people python drivers,
# which can be hard for example in the case of SQL Server
logger.log(f'get highest modification comparison value', format=logger.Format.ITALICS)
logger.log(f'Get new max modification comparison value...', format=logger.Format.ITALICS)
max_value_query = f'SELECT max({self.modification_comparison}) AS maxval FROM {self.source_table}'
logger.log(max_value_query, format=logger.Format.VERBATIM)
result = shell.run_shell_command(f'echo {shlex.quote(max_value_query)} \\\n | '
Expand All @@ -271,30 +272,37 @@ def run(self) -> bool:
if not result:
return False

if isinstance(result, bool):
# This happens if the query above ran, but returned no data and therefore the load
# query below would also return no data
# We assume that this happens e.g. when there is no data *yet* and let the load succeed
# without actually doing anything
logger.log("Found no data, not starting Copy.", format=logger.Format.VERBATIM)
return True
# be flexible with different output formats: remove the column header & remove whitespace & quotes
max_modification_value = ''.join(result).replace('maxval', '').strip().strip('"')
logger.log(repr(max_modification_value), format=logger.Format.VERBATIM)
logger.log(f"New max modification comparison value: {max_modification_value!r}", format=logger.Format.VERBATIM)

# check whether target table is empty
target_table_is_empty = True

target_table_empty_query = f'SELECT TRUE FROM {self.target_table} LIMIT 1'
logger.log(f'check if target table is empty', format=logger.Format.ITALICS)
logger.log(f'Check if target table is empty', format=logger.Format.ITALICS)
logger.log(target_table_empty_query, format=logger.Format.VERBATIM)
with mara_db.postgresql.postgres_cursor_context(self.target_db_alias) as cursor:
cursor.execute(f'SELECT TRUE FROM {self.target_table} LIMIT 1')
target_table_is_empty = not cursor.fetchone()
logger.log(f"target table{'' if target_table_is_empty else ' not'} empty", format=logger.Format.ITALICS)

# get last comparison value
logger.log('get last comparison value', format=logger.Format.ITALICS)
logger.log('Get last comparison value...', format=logger.Format.ITALICS)
last_comparison_value = incremental_copy_status.get_last_comparison_value(
self.node_path(), self.source_db_alias, self.source_table)
logger.log(repr(last_comparison_value), format=logger.Format.VERBATIM)
logger.log(f"Last max modification comparison value: {last_comparison_value!r}", format=logger.Format.VERBATIM)

if target_table_is_empty or not last_comparison_value:
# full load
logger.log('full (non incremental) copy', logger.Format.ITALICS)
logger.log('Using full (non incremental) Copy', logger.Format.ITALICS)
if not target_table_is_empty:
truncate_query = f'TRUNCATE TABLE {self.target_table}'
logger.log(truncate_query, format=logger.Format.VERBATIM)
Expand All @@ -305,7 +313,7 @@ def run(self) -> bool:
# If we would crash during load (with some data already in the table), the next run would
# not trigger a full load and we would miss data. To prevent that, delete the old
# comparison value (we will then set it only on success)
logger.log('deleting old comparison value', logger.Format.ITALICS)
logger.log('Deleting old comparison value', logger.Format.ITALICS)
incremental_copy_status.delete(self.node_path(), self.source_db_alias, self.source_table)

# overwrite the comparison criteria to get everything
Expand All @@ -316,7 +324,7 @@ def run(self) -> bool:

else:
# incremental load. First create the table which will contain the delta
logger.log('incremental copy, create upsert table', logger.Format.ITALICS)
logger.log('Using incremental Copy, create upsert table', logger.Format.ITALICS)
create_upsert_table_query = (f'DROP TABLE IF EXISTS {self.target_table}_upsert;\n'
+ f'CREATE TABLE {self.target_table}_upsert AS SELECT * from {self.target_table} WHERE FALSE')

Expand Down