Skip to content

Commit

Permalink
feat: allow commit transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
fecet committed Dec 14, 2024
1 parent 1510dc5 commit 5c6da44
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 18 deletions.
46 changes: 28 additions & 18 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,7 @@ def _commit(
@staticmethod
def commit(
base_uri: Union[str, Path, LanceDataset],
operation: LanceOperation.BaseOperation,
operation: LanceOperation.BaseOperation | Transaction,
read_version: Optional[int] = None,
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -2206,24 +2206,34 @@ def commit(
f"commit_lock must be a function, got {type(commit_lock)}"
)

if read_version is None and not isinstance(
operation, (LanceOperation.Overwrite, LanceOperation.Restore)
):
raise ValueError(
"read_version is required for all operations except "
"Overwrite and Restore"
if isinstance(operation, Transaction):
new_ds = _Dataset.commit_transaction(
base_uri,
operation,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)
else:
if read_version is None and not isinstance(
operation, (LanceOperation.Overwrite, LanceOperation.Restore)
):
raise ValueError(
"read_version is required for all operations except "
"Overwrite and Restore"
)
new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
read_version,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)

new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
read_version,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)
ds = LanceDataset.__new__(LanceDataset)
ds._storage_options = storage_options
ds._ds = new_ds
Expand Down
64 changes: 64 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,70 @@ impl Dataset {
})
}

#[allow(clippy::too_many_arguments)]
#[staticmethod]
#[pyo3(signature = (
dest,
transaction,
commit_lock = None,
storage_options = None,
enable_v2_manifest_paths = None,
detached = None,
max_retries = None
))]
fn commit_transaction<'py>(
dest: &Bound<'py, PyAny>,
transaction: &Bound<'py, PyAny>,
commit_lock: Option<&Bound<'py, PyAny>>,
storage_options: Option<HashMap<String, String>>,
enable_v2_manifest_paths: Option<bool>,
detached: Option<bool>,
max_retries: Option<u32>,
) -> PyResult<Self> {
let object_store_params = storage_options.as_ref().map(|storage_options| ObjectStoreParams {
storage_options: Some(storage_options.clone()),
..Default::default()
});

let commit_handler = commit_lock.map(|commit_lock| {
Arc::new(PyCommitLock::new(commit_lock.to_object(commit_lock.py()))) as Arc<dyn CommitHandler>
});

let py = dest.py();

let dest = if dest.is_instance_of::<Self>() {
let dataset: Self = dest.extract()?;
WriteDestination::Dataset(dataset.ds.clone())
} else {
WriteDestination::Uri(dest.extract()?)
};

let mut builder = CommitBuilder::new(dest)
.enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(false))
.with_detached(detached.unwrap_or(false))
.with_max_retries(max_retries.unwrap_or(20));

if let Some(store_params) = object_store_params {
builder = builder.with_store_params(store_params);
}

if let Some(commit_handler) = commit_handler {
builder = builder.with_commit_handler(commit_handler);
}

let transaction = extract_transaction(&transaction)?;

let ds = RT
.block_on(Some(py), builder.execute(transaction))?
.map_err(|err| PyIOError::new_err(err.to_string()))?;

let uri = ds.uri().to_string();
Ok(Self {
ds: Arc::new(ds),
uri,
})
}

#[staticmethod]
#[pyo3(signature = (dest, transactions, commit_lock = None, storage_options = None, enable_v2_manifest_paths = None, detached = None, max_retries = None))]
fn commit_batch<'py>(
Expand Down

0 comments on commit 5c6da44

Please sign in to comment.