Skip to content

Commit

Permalink
Fix editing a locked db
Browse files Browse the repository at this point in the history
In a locked db, if existing values are updated, the changes
are stored in the cache and can be committed once the db is
no longer locked. Removing items works the same way.
Adding items to a locked db is denied and results in
an error. If the user tries to commit changes while the db
is locked, an error pops up telling that the commit failed.
Once the lock is released, the changes can be committed again.

Re #2201
  • Loading branch information
Henrik Koski committed Aug 17, 2023
1 parent 3b8cc7e commit 9a9bea6
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 18 deletions.
7 changes: 5 additions & 2 deletions spinetoolbox/mvcmodels/compound_table_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def batch_set_data(self, indexes, data):
d = {} # Maps models to (index, value) tuples
rows = []
columns = []
successful = True
for index, value in zip(indexes, data):
if not index.isValid():
continue
Expand All @@ -201,14 +202,16 @@ def batch_set_data(self, indexes, data):
d.setdefault(sub_model, list()).append((sub_index, value))
for model, index_value_tuples in d.items():
indexes, values = zip(*index_value_tuples)
model.batch_set_data(list(indexes), list(values))
if not model.batch_set_data(list(indexes), list(values)):
successful = False
break
# Find square envelope of indexes to emit dataChanged
top = min(rows)
bottom = max(rows)
left = min(columns)
right = max(columns)
self.dataChanged.emit(self.index(top, left), self.index(bottom, right))
return True
return successful

def insertRows(self, row, count, parent=QModelIndex()):
"""Inserts count rows after the given row under the given parent.
Expand Down
8 changes: 6 additions & 2 deletions spinetoolbox/spine_db_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,16 @@ def _do_clone(self):

def redo(self):
    """Re-applies the command by adding the items to the database.

    If the manager reports a failure (e.g. the database is locked,
    so adding new items is denied), the command is marked obsolete
    so it is dropped from the undo stack.
    """
    super().redo()
    successful = self.db_mngr.add_items(
        self.redo_db_map_data,
        self.item_type,
        readd=self._readd,
        cascade=False,
        check=self._check,
        callback=self.handle_redo_complete,
    )
    if not successful:
        self.setObsolete(True)

def undo(self):
super().undo()
Expand Down Expand Up @@ -343,9 +345,11 @@ def _do_clone(self):

def redo(self):
    """Re-applies the command by updating the items in the database.

    If the manager reports a failure, the command is marked obsolete
    so it is dropped from the undo stack.
    """
    super().redo()
    successful = self.db_mngr.update_items(
        self.redo_db_map_data, self.item_type, check=self._check, callback=self.handle_redo_complete
    )
    if not successful:
        self.setObsolete(True)

def undo(self):
super().undo()
Expand Down
6 changes: 5 additions & 1 deletion spinetoolbox/spine_db_editor/widgets/spine_db_editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,9 +832,13 @@ def tear_down(self):
return False
self._purge_change_notifiers()
self._torn_down = True
self.db_mngr.unregister_listener(
failed_db_maps = self.db_mngr.unregister_listener(
self, *self.db_maps, dirty_db_maps=dirty_db_maps, commit_dirty=commit_dirty, commit_msg=commit_msg
)
if failed_db_maps:
msg = f"Fail to commit due to locked database"
self.db_mngr.receive_error_msg({i: [msg] for i in failed_db_maps})
return False
return True

def _prompt_to_commit_changes(self):
Expand Down
38 changes: 29 additions & 9 deletions spinetoolbox/spine_db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def do_create_new_spine_database(url):
class SpineDBManager(QObject):
"""Class to manage DBs within a project."""

error_msg = Signal(dict)
error_msg = Signal(object)
session_refreshed = Signal(set)
session_committed = Signal(set, object)
session_rolled_back = Signal(set)
Expand Down Expand Up @@ -510,23 +510,30 @@ def unregister_listener(self, listener, *db_maps, dirty_db_maps=None, commit_dir
*db_maps (DiffDatabaseMapping)
commit_dirty (bool): True to commit dirty database mapping, False to roll back
commit_msg (str): commit message
Returns:
failed_db_maps (list): All the db maps that failed to commit
"""
failed_db_maps = list()
if dirty_db_maps:
if commit_dirty:
failed_db_maps = self.commit_session(commit_msg, *dirty_db_maps)
else:
self.rollback_session(*dirty_db_maps)
for db_map in db_maps:
if db_map in failed_db_maps:
continue
self.remove_db_map_listener(db_map, listener)
try:
self.undo_stack[db_map].canRedoChanged.disconnect(listener.update_undo_redo_actions)
self.undo_stack[db_map].canUndoChanged.disconnect(listener.update_undo_redo_actions)
self.undo_stack[db_map].cleanChanged.disconnect(listener.update_commit_enabled)
except AttributeError:
pass
if dirty_db_maps:
if commit_dirty:
self.commit_session(commit_msg, *dirty_db_maps)
else:
self.rollback_session(*dirty_db_maps)
for db_map in db_maps:
if not self.db_map_listeners(db_map):
self.close_session(db_map.db_url)
return failed_db_maps

def is_dirty(self, db_map):
"""Returns True if mapping has pending changes.
Expand Down Expand Up @@ -596,13 +603,20 @@ def commit_session(self, commit_msg, *dirty_db_maps, cookie=None):
commit_msg (str): commit message for all database maps
*dirty_db_maps: dirty database maps to commit
cookie (object, optional): a free form identifier which will be forwarded to ``session_committed`` signal
Returns:
failed_db_maps (list): list of the db maps that failed to commit
"""
failed_db_maps = list()
for db_map in dirty_db_maps:
try:
worker = self._get_worker(db_map)
except KeyError:
continue
worker.commit_session(commit_msg, cookie)
success = worker.commit_session(commit_msg, cookie)
if not success:
failed_db_maps.append(db_map)
return failed_db_maps

def notify_session_committed(self, cookie, *db_maps):
"""Notifies manager and listeners when a commit has taken place by a third party.
Expand Down Expand Up @@ -1000,24 +1014,30 @@ def _import_data_cmds(self, db_map, data_for_import, db_map_error_log):
yield AddItemsCommand(self, db_map, to_add, item_type, check=False)

def add_items(self, db_map_data, item_type, readd=False, cascade=True, check=True, callback=None):
    """Adds items to the given databases via their workers.

    Args:
        db_map_data (dict): mapping from database map to list of items to add
        item_type (str): the type of the items to add
        readd (bool): passed through to the worker's ``add_items``
        cascade (bool): passed through to the worker's ``add_items``
        check (bool): whether to check integrity
        callback (function, optional): something to call with the result

    Returns:
        bool: True if the adding of all items succeeded, False otherwise
    """
    successful = True
    for db_map, data in db_map_data.items():
        try:
            worker = self._get_worker(db_map)
        except KeyError:
            # We're closing the kiosk; skip this db_map.
            continue
        cache = self.get_db_map_cache(db_map)
        if not worker.add_items(data, item_type, readd, cascade, check, cache, callback):
            successful = False
    return successful

def update_items(self, db_map_data, item_type, check=True, callback=None):
    """Updates items in the given databases via their workers.

    Args:
        db_map_data (dict): mapping from database map to list of items to update
        item_type (str): the type of the items to update
        check (bool): whether to check integrity
        callback (function, optional): something to call with the result

    Returns:
        bool: True if the updating of all items succeeded, False otherwise
    """
    successful = True
    for db_map, data in db_map_data.items():
        try:
            worker = self._get_worker(db_map)
        except KeyError:
            # We're closing the kiosk; skip this db_map.
            continue
        cache = self.get_db_map_cache(db_map)
        if not worker.update_items(data, item_type, check, cache, callback):
            successful = False
    return successful

def add_alternatives(self, db_map_data):
"""Adds alternatives to db.
Expand Down
37 changes: 33 additions & 4 deletions spinetoolbox/spine_db_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
The SpineDBWorker class
"""
from functools import wraps
from sqlalchemy.exc import DBAPIError
from PySide6.QtCore import QObject, Signal, Slot
from spinedb_api import DatabaseMapping, SpineDBAPIError
from .helpers import busy_effect, separate_metadata_and_item_metadata
Expand Down Expand Up @@ -169,7 +170,12 @@ def _do_advance_query(self, item_type):
return False
offset = self._offsets.setdefault(item_type, 0)
query = self._db_map.query(getattr(self._db_map, sq_name)).limit(_CHUNK_SIZE).offset(offset)
chunk = [x._asdict() for x in query]
chunk = list()
try:
chunk = [x._asdict() for x in query]
except DBAPIError as e:
msg = f"DBAPIError while fetching more '{item_type}' items: {e.orig.args}"
self._db_mngr.receive_error_msg({self._db_map: [msg]})
self._offsets[item_type] += len(chunk)
if not chunk:
self._fetched_item_types.add(item_type)
Expand Down Expand Up @@ -416,6 +422,9 @@ def add_items(self, orig_items, item_type, readd, cascade, check, cache, callbac
check (bool): Whether to check integrity
cache (dict): Cache
callback (None or function): something to call with the result
Returns:
bool: True if adding successful, False otherwise
"""
method_name = {
"object_class": "add_object_classes",
Expand Down Expand Up @@ -477,7 +486,9 @@ def add_items(self, orig_items, item_type, readd, cascade, check, cache, callbac
db_map_data = {self._db_map: data}
if item_type == actual_item_type and callback is not None:
callback(db_map_data)
self._db_mngr.items_added.emit(actual_item_type, db_map_data)
if items:
self._db_mngr.items_added.emit(actual_item_type, db_map_data)
return True

def _rebind_recursively(self, item):
"""Rebinds a cache item and its referrers to fetch parents.
Expand All @@ -502,6 +513,9 @@ def update_items(self, orig_items, item_type, check, cache, callback):
check (bool): Whether or not to check integrity
cache (dict): Cache
callback (None or function): something to call with the result
Returns:
bool: True if update successful, False otherwise
"""
method_name = {
"object_class": "update_object_classes",
Expand All @@ -528,6 +542,7 @@ def update_items(self, orig_items, item_type, check, cache, callback):
items, errors = getattr(self._db_map, method_name)(*orig_items, check=check, return_items=True, cache=cache)
if errors:
self._db_mngr.error_msg.emit({self._db_map: errors})
return False
if self._committing:
if callback is not None:
callback({})
Expand All @@ -538,6 +553,7 @@ def update_items(self, orig_items, item_type, check, cache, callback):
if item_type == actual_item_type and callback is not None:
callback(db_map_data)
self._db_mngr.items_updated.emit(actual_item_type, db_map_data)
return True

@busy_effect
def remove_items(self, item_type, ids, callback, committing_callback):
Expand Down Expand Up @@ -572,11 +588,15 @@ def commit_session(self, commit_msg, cookie=None):
Args:
commit_msg (str): commit message
cookie (Any): a cookie to include in session_committed signal
Returns:
success (bool): True if commit succeeded, False otherwise
"""
# Make sure that the worker thread has a reference to undo stacks even if they get deleted
# in the GUI thread.
undo_stack = self._db_mngr.undo_stack[self._db_map]
self._executor.submit(self._commit_session, commit_msg, undo_stack, cookie).result()
success = self._executor.submit(self._commit_session, commit_msg, undo_stack, cookie).result()
return success

def _commit_session(self, commit_msg, undo_stack, cookie=None):
"""Commits session for given database maps.
Expand All @@ -585,16 +605,25 @@ def _commit_session(self, commit_msg, undo_stack, cookie=None):
commit_msg (str): commit message
undo_stack (AgedUndoStack): undo stack that outlive the DB manager
cookie (Any): a cookie to include in session_committed signal
Return:
bool: True if commit succeeded, False otherwise
"""
self._committing = True
undo_stack.commit()
try:
undo_stack.commit()
except SpineDBAPIError as err:
self._db_mngr.error_msg.emit({self._db_map: [err.msg]})
return False
self._committing = False
try:
self._db_map.commit_session(commit_msg)
self._db_mngr.session_committed.emit({self._db_map}, cookie)
except SpineDBAPIError as err:
self._db_mngr.error_msg.emit({self._db_map: [err.msg]})
return False
undo_stack.setClean()
return True

def rollback_session(self):
"""Initiates rollback session in the worker thread."""
Expand Down

0 comments on commit 9a9bea6

Please sign in to comment.