From 946b0c8b2aad4e886412ff9d69db3e076abb20c1 Mon Sep 17 00:00:00 2001
From: mikelkou
Date: Wed, 23 Oct 2024 18:28:25 +0200
Subject: [PATCH 1/5] Add dask and asv_runner to the asv.conf file and a
 benchmark example with dask

---
 benchmarks/asv.conf.json                      |   3 +
 benchmarks/benchmarks/preprocessing_counts.py |   2 +-
 .../benchmarks/preprocessing_counts_dask.py   | 209 ++++++++++++++++++
 3 files changed, 213 insertions(+), 1 deletion(-)
 create mode 100644 benchmarks/benchmarks/preprocessing_counts_dask.py

diff --git a/benchmarks/asv.conf.json b/benchmarks/asv.conf.json
index 98192b3725..b134476efa 100644
--- a/benchmarks/asv.conf.json
+++ b/benchmarks/asv.conf.json
@@ -86,6 +86,9 @@
         "pooch": [""],
         "scikit-image": [""],
         // "scikit-misc": [""],
+        "scikit-learn": [""],
+        "pip+asv_runner": [""],
+        "dask": [""]
     },
 
     // Combinations of libraries/python versions can be excluded/included
diff --git a/benchmarks/benchmarks/preprocessing_counts.py b/benchmarks/benchmarks/preprocessing_counts.py
index 587b441e1c..aaa56d755b 100644
--- a/benchmarks/benchmarks/preprocessing_counts.py
+++ b/benchmarks/benchmarks/preprocessing_counts.py
@@ -32,7 +32,7 @@ def setup(dataset: Dataset, layer: KeyCount, *_):
 # ASV suite
 
 params: tuple[list[Dataset], list[KeyCount]] = (
-    ["pbmc68k_reduced", "pbmc3k"],
+    ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"],
     ["counts", "counts-off-axis"],
 )
 param_names = ["dataset", "layer"]
diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py
new file mode 100644
index 0000000000..4af1e9eeee
--- /dev/null
+++ b/benchmarks/benchmarks/preprocessing_counts_dask.py
@@ -0,0 +1,209 @@
+"""
+This module will benchmark preprocessing operations in Scanpy that run on counts,
+with both Dask and non-Dask implementations.
+API documentation: https://scanpy.readthedocs.io/en/stable/api/preprocessing.html +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import dask.array as dd +from dask.distributed import Client, LocalCluster +import scanpy as sc + +from ._utils import get_count_dataset + +if TYPE_CHECKING: + from anndata import AnnData + from ._utils import Dataset, KeyCount + +# Setup variables + +adata: AnnData +batch_key: str | None + + +def setup(dataset: Dataset, layer: KeyCount, *_): + """Setup global variables before each benchmark.""" + global adata, batch_key + adata, batch_key = get_count_dataset(dataset, layer=layer) + assert "log1p" not in adata.uns + + +# Dask Setup for Dask-based benchmarks +def setup_dask_cluster(): + """Set up a local Dask cluster for benchmarking.""" + cluster = LocalCluster(n_workers=6, threads_per_worker=2) + client = Client(cluster) + return client + + +# ASV suite + +params: tuple[list[Dataset], list[KeyCount]] = ( + ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + # ["pbmc68k_reduced", "pbmc3k"], + ["counts", "counts-off-axis"], +) +param_names = ["dataset", "layer"] + +# ### Non-Dask Benchmarks ### + +# def time_filter_cells(*_): +# sc.pp.filter_cells(adata, min_genes=100) + + +# def peakmem_filter_cells(*_): +# sc.pp.filter_cells(adata, min_genes=100) + + +# def time_filter_genes(*_): +# sc.pp.filter_genes(adata, min_cells=3) + + +# def peakmem_filter_genes(*_): +# sc.pp.filter_genes(adata, min_cells=3) + + +# def time_scrublet(*_): +# sc.pp.scrublet(adata, batch_key=batch_key) + + +# def peakmem_scrublet(*_): +# sc.pp.scrublet(adata, batch_key=batch_key) + + +### Dask-Based Benchmarks ### + +def time_filter_cells_dask(*_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + adata.X = adata.X.persist() + client.rebalance() + sc.pp.filter_cells(adata, min_genes=100) + finally: + client.close() + + +def peakmem_filter_cells_dask(*_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.filter_cells(adata, min_genes=100) + finally: + client.close() + + +def time_filter_genes_dask(*_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.filter_genes(adata, min_cells=3) + finally: + client.close() + + +def peakmem_filter_genes_dask(*_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.filter_genes(adata, min_cells=3) + finally: + client.close() + + +### Suite for Dask and Non-Dask Operations ### + +class FastSuite: + """Suite for fast preprocessing operations.""" + + params: tuple[list[Dataset], list[KeyCount]] = ( + ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + # ["pbmc3k", "pbmc68k_reduced"],#, "bmmc", "lung93k"], + ["counts", "counts-off-axis"], + ) + param_names = ["dataset", "layer"] + + ### Non-Dask Versions ### + + def time_calculate_qc_metrics(self, *_): + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + + def peakmem_calculate_qc_metrics(self, *_): + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + + def time_normalize_total(self, *_): + sc.pp.normalize_total(adata, target_sum=1e4) + + def peakmem_normalize_total(self, *_): + sc.pp.normalize_total(adata, target_sum=1e4) + + def time_log1p(self, *_): + 
adata.uns.pop("log1p", None) + sc.pp.log1p(adata) + + def peakmem_log1p(self, *_): + adata.uns.pop("log1p", None) + sc.pp.log1p(adata) + + ### Dask Versions ### + + def time_calculate_qc_metrics_dask(self, *_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + finally: + client.close() + + def peakmem_calculate_qc_metrics_dask(self, *_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + finally: + client.close() + + def time_normalize_total_dask(self, *_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.normalize_total(adata, target_sum=1e4) + finally: + client.close() + + def peakmem_normalize_total_dask(self, *_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.normalize_total(adata, target_sum=1e4) + finally: + client.close() + + def time_log1p_dask(self, *_): + client = setup_dask_cluster() + try: + adata.uns.pop("log1p", None) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.log1p(adata) + finally: + client.close() + + def peakmem_log1p_dask(self, *_): + client = setup_dask_cluster() + try: + adata.uns.pop("log1p", None) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.log1p(adata) + finally: + client.close() From d932e593af2c235f8d5a04a660c5429e30c3ec87 Mon Sep 17 00:00:00 2001 From: mikelkou Date: Thu, 24 Oct 2024 17:27:01 +0200 Subject: [PATCH 2/5] peakmem_log1p better w/ Dask --- benchmarks/benchmarks/preprocessing_counts.py | 6 +- .../benchmarks/preprocessing_counts_dask.py | 78 ++++--------------- 2 files changed, 18 insertions(+), 66 deletions(-) diff --git a/benchmarks/benchmarks/preprocessing_counts.py b/benchmarks/benchmarks/preprocessing_counts.py index aaa56d755b..41dc686f5a 100644 --- a/benchmarks/benchmarks/preprocessing_counts.py +++ b/benchmarks/benchmarks/preprocessing_counts.py @@ -32,7 +32,8 @@ def setup(dataset: Dataset, layer: KeyCount, *_): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["lung93k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -78,7 +79,8 @@ class FastSuite: """Suite for fast preprocessing operations.""" params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index 4af1e9eeee..6af1f14865 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -34,7 +34,7 @@ def setup(dataset: Dataset, layer: KeyCount, *_): # Dask Setup for Dask-based benchmarks def setup_dask_cluster(): """Set up a local Dask cluster for benchmarking.""" - cluster = LocalCluster(n_workers=6, threads_per_worker=2) + cluster = LocalCluster(n_workers=4, threads_per_worker=2) client = Client(cluster) 
return client @@ -42,38 +42,12 @@ def setup_dask_cluster(): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - # ["pbmc68k_reduced", "pbmc3k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] -# ### Non-Dask Benchmarks ### - -# def time_filter_cells(*_): -# sc.pp.filter_cells(adata, min_genes=100) - - -# def peakmem_filter_cells(*_): -# sc.pp.filter_cells(adata, min_genes=100) - - -# def time_filter_genes(*_): -# sc.pp.filter_genes(adata, min_cells=3) - - -# def peakmem_filter_genes(*_): -# sc.pp.filter_genes(adata, min_cells=3) - - -# def time_scrublet(*_): -# sc.pp.scrublet(adata, batch_key=batch_key) - - -# def peakmem_scrublet(*_): -# sc.pp.scrublet(adata, batch_key=batch_key) - - ### Dask-Based Benchmarks ### def time_filter_cells_dask(*_): @@ -90,7 +64,7 @@ def time_filter_cells_dask(*_): def peakmem_filter_cells_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) sc.pp.filter_cells(adata, min_genes=100) finally: client.close() @@ -120,54 +94,30 @@ class FastSuite: """Suite for fast preprocessing operations.""" params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - # ["pbmc3k", "pbmc68k_reduced"],#, "bmmc", "lung93k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] - ### Non-Dask Versions ### - - def time_calculate_qc_metrics(self, *_): - sc.pp.calculate_qc_metrics( - adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True - ) - - def peakmem_calculate_qc_metrics(self, *_): - sc.pp.calculate_qc_metrics( - adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True - ) - - def time_normalize_total(self, *_): - sc.pp.normalize_total(adata, target_sum=1e4) - - def peakmem_normalize_total(self, *_): - sc.pp.normalize_total(adata, target_sum=1e4) - - def time_log1p(self, *_): - adata.uns.pop("log1p", None) - sc.pp.log1p(adata) - - def peakmem_log1p(self, *_): - adata.uns.pop("log1p", None) - sc.pp.log1p(adata) - ### Dask Versions ### - def time_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + print(f"Dask Array Shape: {adata.X.shape}") + print(f"Dask Array Type: {type(adata.X)}") sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) finally: client.close() + def peakmem_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) @@ -177,7 +127,7 @@ def peakmem_calculate_qc_metrics_dask(self, *_): def time_normalize_total_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) sc.pp.normalize_total(adata, target_sum=1e4) finally: client.close() @@ 
-203,7 +153,7 @@ def peakmem_log1p_dask(self, *_):
     client = setup_dask_cluster()
     try:
         adata.uns.pop("log1p", None)
-        adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1]))
+        adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 100, adata.X.shape[1] // 100))
         sc.pp.log1p(adata)
     finally:
-        client.close()
+        client.close()
\ No newline at end of file

From 1909f4c3d4f478c4cfcf406899029e184fefdef0 Mon Sep 17 00:00:00 2001
From: mikelkou
Date: Thu, 24 Oct 2024 18:20:02 +0200
Subject: [PATCH 3/5] Draft benchmarks with dask

---
 .../benchmarks/preprocessing_counts_dask.py   | 43 ++++++++++++++++---
 1 file changed, 38 insertions(+), 5 deletions(-)

diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py
index 6af1f14865..34ff869012 100644
--- a/benchmarks/benchmarks/preprocessing_counts_dask.py
+++ b/benchmarks/benchmarks/preprocessing_counts_dask.py
@@ -11,6 +11,8 @@
 import dask.array as dd
 from dask.distributed import Client, LocalCluster
 import scanpy as sc
+import numpy as np
+from scipy import sparse
 
 from ._utils import get_count_dataset
 
@@ -50,17 +52,32 @@ def setup_dask_cluster():
 
 ### Dask-Based Benchmarks ###
 
+# def time_filter_cells_dask(*_):
+#     client = setup_dask_cluster()
+#     try:
+#         adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
+#         adata.X = adata.X.persist()
+#         client.rebalance()
+#         sc.pp.filter_cells(adata, min_genes=100)
+#     finally:
+#         client.close()
+
 def time_filter_cells_dask(*_):
     client = setup_dask_cluster()
     try:
-        adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
-        adata.X = adata.X.persist()
-        client.rebalance()
+        adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
+        adata.X = adata.X.map_blocks(sparse.csr_matrix)  # Convert to sparse chunks
+
+        # If Scanpy requires dense arrays, convert to dense
+        adata.X = adata.X.map_blocks(lambda x: x.toarray(), dtype=adata.X.dtype, meta=np.array([]))
+
+        # Rechunk after conversion
+        adata.X = adata.X.rechunk((adata.X.shape[0] // 50, adata.X.shape[1] // 50))
+
         sc.pp.filter_cells(adata, min_genes=100)
     finally:
         client.close()
 
-
 def peakmem_filter_cells_dask(*_):
     client = setup_dask_cluster()
     try:
@@ -104,15 +121,31 @@ class FastSuite:
 
     def time_calculate_qc_metrics_dask(self, *_):
         client = setup_dask_cluster()
         try:
+            # Use sparse chunks
             adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
-            print(f"Dask Array Shape: {adata.X.shape}")
-            print(f"Dask Array Type: {type(adata.X)}")
+            adata.X = adata.X.map_blocks(sparse.csr_matrix)  # Convert dense to sparse chunks
+
+            # Rechunk if necessary
+            adata.X = adata.X.rechunk((adata.X.shape[0] // 50, adata.X.shape[1] // 50))
+
             sc.pp.calculate_qc_metrics(
                 adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True
             )
         finally:
             client.close()
 
+    # def time_calculate_qc_metrics_dask(self, *_):
+    #     client = setup_dask_cluster()
+    #     try:
+    #         adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
+    #         print(f"Dask Array Shape: {adata.X.shape}")
+    #         print(f"Dask Array Type: {type(adata.X)}")
+    #         sc.pp.calculate_qc_metrics(
+    #             adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True
+    #         )
+    #     finally:
+    #         client.close()
+
     def peakmem_calculate_qc_metrics_dask(self, *_):
         client = setup_dask_cluster()

From 5494efd93d9ef073f2aacadbaf365af8de6713aa Mon Sep 17 00:00:00 2001
From: mikelkou
Date: Thu, 7 Nov 2024 16:13:50 +0100
Subject: [PATCH 4/5] Add a larger synthetic dataset

---
 benchmarks/benchmarks/_utils.py               | 38 ++++++++++
benchmarks/benchmarks/preprocessing_counts.py | 8 +- .../benchmarks/preprocessing_counts_dask.py | 76 ++++--------------- 3 files changed, 56 insertions(+), 66 deletions(-) diff --git a/benchmarks/benchmarks/_utils.py b/benchmarks/benchmarks/_utils.py index 810ace74fd..e6833aeaad 100644 --- a/benchmarks/benchmarks/_utils.py +++ b/benchmarks/benchmarks/_utils.py @@ -117,6 +117,42 @@ def lung93k() -> AnnData: return _lung93k().copy() +@cache +def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData: + """ + Generate a synthetic dataset suitable for Dask testing. + + Parameters: + n_obs: int + Number of observations (rows, typically cells). + n_vars: int + Number of variables (columns, typically genes). + density: float + Fraction of non-zero entries in the sparse matrix. + + Returns: + AnnData + The synthetic dataset. + """ + + X = sparse.random(n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42) + obs = {"obs_names": [f"cell_{i}" for i in range(n_obs)]} + var = {"var_names": [f"gene_{j}" for j in range(n_vars)]} + adata = anndata.AnnData(X=X, obs=obs, var=var) + adata.layers["counts"] = X.copy() + sc.pp.log1p(adata) + adata.var["mt"] = adata.var_names.str.startswith("MT-") + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + + return adata + + +def large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData: + return _large_synthetic_dataset(n_obs, n_vars, density).copy() + + def to_off_axis(x: np.ndarray | sparse.csr_matrix) -> np.ndarray | sparse.csc_matrix: if isinstance(x, sparse.csr_matrix): return x.tocsc() @@ -138,6 +174,8 @@ def _get_dataset_raw(dataset: Dataset) -> tuple[AnnData, str | None]: adata, batch_key = bmmc(400), "sample" case "lung93k": adata, batch_key = lung93k(), "PatientNumber" + case "large_synthetic": + adata, batch_key = large_synthetic_dataset(), None case _: msg = f"Unknown dataset {dataset}" raise AssertionError(msg) diff --git a/benchmarks/benchmarks/preprocessing_counts.py b/benchmarks/benchmarks/preprocessing_counts.py index 41dc686f5a..6521d258f0 100644 --- a/benchmarks/benchmarks/preprocessing_counts.py +++ b/benchmarks/benchmarks/preprocessing_counts.py @@ -32,8 +32,8 @@ def setup(dataset: Dataset, layer: KeyCount, *_): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["lung93k"], - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["pbmc3k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -79,8 +79,8 @@ class FastSuite: """Suite for fast preprocessing operations.""" params: tuple[list[Dataset], list[KeyCount]] = ( - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - ["lung93k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic"], + ["pbmc3k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index 34ff869012..ab91d15993 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -1,13 +1,6 @@ -""" -This module will benchmark preprocessing operations in Scanpy that run on counts, -with both Dask and non-Dask implementations. 
-API documentation: https://scanpy.readthedocs.io/en/stable/api/preprocessing.html -""" - from __future__ import annotations from typing import TYPE_CHECKING - import dask.array as dd from dask.distributed import Client, LocalCluster import scanpy as sc @@ -19,12 +12,10 @@ from anndata import AnnData from ._utils import Dataset, KeyCount -# Setup variables - +# Setup global variables adata: AnnData batch_key: str | None - def setup(dataset: Dataset, layer: KeyCount, *_): """Setup global variables before each benchmark.""" global adata, batch_key @@ -32,7 +23,6 @@ def setup(dataset: Dataset, layer: KeyCount, *_): assert "log1p" not in adata.uns -# Dask Setup for Dask-based benchmarks def setup_dask_cluster(): """Set up a local Dask cluster for benchmarking.""" cluster = LocalCluster(n_workers=4, threads_per_worker=2) @@ -41,42 +31,24 @@ def setup_dask_cluster(): # ASV suite - params: tuple[list[Dataset], list[KeyCount]] = ( - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - ["lung93k"], + ["pbmc3k"], # Extend with larger datasets as needed ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] ### Dask-Based Benchmarks ### -# def time_filter_cells_dask(*_): -# client = setup_dask_cluster() -# try: -# adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) -# adata.X = adata.X.persist() -# client.rebalance() -# sc.pp.filter_cells(adata, min_genes=100) -# finally: -# client.close() - def time_filter_cells_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - adata.X = adata.X.map_blocks(sparse.csr_matrix) # Convert to sparse chunks - - # If Scanpy requires dense arrays, convert to dense - adata.X = adata.X.map_blocks(lambda x: x.toarray(), dtype=adata.X.dtype, meta=np.array([])) - - # Rechunk after conversion - adata.X = adata.X.rechunk((adata.X.shape[0] // 50, adata.X.shape[1] // 50)) - + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + adata.X = adata.X.map_blocks(sparse.csr_matrix) # Ensure sparse chunks sc.pp.filter_cells(adata, min_genes=100) finally: client.close() + def peakmem_filter_cells_dask(*_): client = setup_dask_cluster() try: @@ -89,7 +61,7 @@ def peakmem_filter_cells_dask(*_): def time_filter_genes_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) sc.pp.filter_genes(adata, min_cells=3) finally: client.close() @@ -98,58 +70,38 @@ def time_filter_genes_dask(*_): def peakmem_filter_genes_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) sc.pp.filter_genes(adata, min_cells=3) finally: client.close() -### Suite for Dask and Non-Dask Operations ### +### General Dask and Non-Dask Preprocessing Benchmarks ### class FastSuite: - """Suite for fast preprocessing operations.""" + """Suite for benchmarking preprocessing operations with Dask.""" params: tuple[list[Dataset], list[KeyCount]] = ( - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - ["lung93k"], + ["pbmc3k"], # Extend as needed with larger datasets ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] - ### Dask Versions ### def time_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - 
# Use sparse chunks adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - adata.X = adata.X.map_blocks(sparse.csr_matrix) # Convert dense to sparse chunks - - # Rechunk if necessary - adata.X = adata.X.rechunk((adata.X.shape[0] // 50, adata.X.shape[1] // 50)) - + adata.X = adata.X.map_blocks(sparse.csr_matrix) sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) finally: client.close() - # def time_calculate_qc_metrics_dask(self, *_): - # client = setup_dask_cluster() - # try: - # adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - # print(f"Dask Array Shape: {adata.X.shape}") - # print(f"Dask Array Type: {type(adata.X)}") - # sc.pp.calculate_qc_metrics( - # adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True - # ) - # finally: - # client.close() - - def peakmem_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) @@ -176,7 +128,7 @@ def time_log1p_dask(self, *_): client = setup_dask_cluster() try: adata.uns.pop("log1p", None) - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) sc.pp.log1p(adata) finally: client.close() @@ -188,4 +140,4 @@ def peakmem_log1p_dask(self, *_): adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 100, adata.X.shape[1] // 100)) sc.pp.log1p(adata) finally: - client.close() \ No newline at end of file + client.close() From ca9b8f671ae4403a89d775707498c7422e0384a3 Mon Sep 17 00:00:00 2001 From: mikelkou Date: Thu, 7 Nov 2024 16:15:36 +0100 Subject: [PATCH 5/5] Update preprocessing_counts_dask.py --- benchmarks/benchmarks/preprocessing_counts_dask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index ab91d15993..c28f984d00 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -32,7 +32,7 @@ def setup_dask_cluster(): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k"], # Extend with larger datasets as needed + ["pbmc68k_reduced"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -82,7 +82,7 @@ class FastSuite: """Suite for benchmarking preprocessing operations with Dask.""" params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k"], # Extend as needed with larger datasets + ["pbmc68k_reduced"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"]
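
---
Notes (not part of the patches above):

Every benchmark in this series follows the same pattern: wrap adata.X in a
Dask array with dd.from_array, optionally turn the chunks sparse with
map_blocks(sparse.csr_matrix), then call the sc.pp function under test. A
minimal standalone sketch of that pattern follows. It is a sketch only: it
uses a small synthetic AnnData instead of the benchmark datasets, the chunk
and worker counts are illustrative rather than tuned, and which sc.pp
functions accept a Dask-backed X depends on the installed Scanpy version.

    # Minimal sketch of the conversion pattern used by the benchmarks above.
    import dask.array as dd  # aliased "dd" to match the benchmark modules
    import numpy as np
    import scanpy as sc
    from anndata import AnnData
    from dask.distributed import Client, LocalCluster
    from scipy import sparse

    rng = np.random.default_rng(0)
    adata = AnnData(X=rng.poisson(1.0, (1_000, 200)).astype(np.float32))

    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    try:
        # Chunk along both axes, then store each chunk as sparse CSR,
        # mirroring the from_array + map_blocks calls in the suite.
        n_obs, n_vars = adata.shape
        adata.X = dd.from_array(adata.X, chunks=(n_obs // 10, n_vars // 10))
        adata.X = adata.X.map_blocks(sparse.csr_matrix)
        sc.pp.normalize_total(adata, target_sum=1e4)  # same call the suite times
        sc.pp.log1p(adata)
    finally:
        client.close()
        cluster.close()

One design consequence worth noting: setup_dask_cluster() is called inside
each time_*/peakmem_* function, so LocalCluster startup and teardown are
included in every measurement; moving cluster creation into ASV's setup hook
would exclude it.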
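The large_synthetic_dataset helper from PATCH 4 is already wired into
_get_dataset_raw but commented out of every params list, so enabling it only
means adding "large_synthetic" back to a params tuple. To exercise just this
module, asv's benchmark filter can be used, e.g. asv run --bench
preprocessing_counts_dask. A quick smoke test of the helper at a reduced
size (a sketch; the import path assumes it is run from the benchmarks/
directory, and the 500_000 x 5_000 default would allocate roughly 25 million
nonzeros):

    # Hypothetical smoke test for the PATCH 4 helper, scaled down.
    from benchmarks._utils import large_synthetic_dataset

    adata = large_synthetic_dataset(n_obs=10_000, n_vars=1_000, density=0.01)
    assert "counts" in adata.layers  # raw counts kept as a layer
    assert "log1p" in adata.uns      # sc.pp.log1p ran inside the helper
    print(adata)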