diff --git a/benchmarks/asv.conf.json b/benchmarks/asv.conf.json
index e2e260a7ec..222d69b021 100644
--- a/benchmarks/asv.conf.json
+++ b/benchmarks/asv.conf.json
@@ -45,7 +45,7 @@
     // If missing or the empty string, the tool will be automatically
     // determined by looking for tools on the PATH environment
    // variable.
-    "timeout": 1200, // Timeout for each benchmark in seconds
+    "timeout": 1200, // Timeout for each benchmark in seconds
     "environment_type": "conda",
 
     // timeout in seconds for installing any dependencies in environment
@@ -89,9 +89,9 @@
         "pooch": [""],
         "scikit-image": [""],
         // "scikit-misc": [""],
-        "scikit-learn": [""],
-        "pip+asv_runner": [""],
-        "dask": [""]
+        "scikit-learn": [""],
+        "pip+asv_runner": [""],
+        "dask": [""]
     },
 
     // Combinations of libraries/python versions can be excluded/included
diff --git a/benchmarks/benchmarks/_utils.py b/benchmarks/benchmarks/_utils.py
index 1601f73dfe..d0b7e2547a 100644
--- a/benchmarks/benchmarks/_utils.py
+++ b/benchmarks/benchmarks/_utils.py
@@ -117,15 +117,14 @@ def lung93k() -> AnnData:
     return _lung93k().copy()
 
 
-
 @cache
 def _musmus_11m() -> AnnData:
     # Define the path to the dataset
     path = "/sc/arion/projects/psychAD/mikaela/scanpy/scanpy/benchmarks/data/MusMus_4M_cells_cellxgene.h5ad"
     adata = sc.read_h5ad(path)
-    #assert isinstance(adata.X, sparse.csr_matrix)
+    # assert isinstance(adata.X, sparse.csr_matrix)
     # Add counts layer
-    #adata.layers["counts"] = adata.X.astype(np.int32, copy=True)
+    # adata.layers["counts"] = adata.X.astype(np.int32, copy=True)
     sc.pp.log1p(adata)
     return adata
 
@@ -135,10 +134,12 @@ def musmus_11m() -> AnnData:
 
 
 @cache
-def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData:
+def _large_synthetic_dataset(
+    n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01
+) -> AnnData:
     """
     Generate a synthetic dataset suitable for Dask testing.
-    
+
     Parameters:
         n_obs: int
             Number of observations (rows, typically cells).
@@ -152,7 +153,9 @@ def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density:
             The synthetic dataset.
""" - X = sparse.random(n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42) + X = sparse.random( + n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42 + ) obs = {"obs_names": [f"cell_{i}" for i in range(n_obs)]} var = {"var_names": [f"gene_{j}" for j in range(n_vars)]} adata = anndata.AnnData(X=X, obs=obs, var=var) @@ -166,7 +169,9 @@ def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: return adata -def large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData: +def large_synthetic_dataset( + n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01 +) -> AnnData: return _large_synthetic_dataset(n_obs, n_vars, density).copy() diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index 4f9086f283..cb6d45ea03 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -1,21 +1,24 @@ from __future__ import annotations from typing import TYPE_CHECKING + import dask.array as dd from dask.distributed import Client, LocalCluster + import scanpy as sc -from scipy import sparse from ._utils import get_count_dataset if TYPE_CHECKING: from anndata import AnnData + from ._utils import Dataset, KeyCount # Setup global variables adata: AnnData batch_key: str | None + def setup(dataset: Dataset, layer: KeyCount, *_): """Setup global variables before each benchmark.""" global adata, batch_key @@ -23,41 +26,46 @@ def setup(dataset: Dataset, layer: KeyCount, *_): assert "log1p" not in adata.uns -#def setup_dask_cluster(): +# def setup_dask_cluster(): # """Set up a local Dask cluster for benchmarking.""" # cluster = LocalCluster(n_workers=4, threads_per_worker=2) # client = Client(cluster) # return client + def setup_dask_cluster(): """Set up a local Dask cluster for benchmarking.""" - cluster = LocalCluster(n_workers=5, - threads_per_worker=2, - memory_limit="60GB", - timeout="1200s") + cluster = LocalCluster( + n_workers=5, threads_per_worker=2, memory_limit="60GB", timeout="1200s" + ) client = Client(cluster) return client # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["musmus_11m"], + ["musmus_11m"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] + ### Dask-Based Benchmarks ### def time_filter_cells_dask(*_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks).persist() sc.pp.filter_cells(adata, min_genes=100) assert adata.n_obs > 0 # Ensure cells are retained finally: client.close() -#def time_filter_cells_dask(*_): + +# def time_filter_cells_dask(*_): # client = setup_dask_cluster() # try: # # Compute optimal chunks based on Dask cluster @@ -72,7 +80,10 @@ def time_filter_cells_dask(*_): def peakmem_filter_cells_dask(*_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.filter_cells(adata, min_genes=100) finally: @@ -82,7 +93,10 @@ def peakmem_filter_cells_dask(*_): def time_filter_genes_dask(*_): client = setup_dask_cluster() try: - optimal_chunks = 
-        optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
+        optimal_chunks = (
+            adata.X.shape[0] // (4 * len(client.nthreads())),
+            adata.X.shape[1],
+        )
         adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
         adata.X = adata.X.persist()
         sc.pp.filter_genes(adata, min_cells=3)
     finally:
@@ -93,7 +107,10 @@ def peakmem_filter_genes_dask(*_):
     client = setup_dask_cluster()
     try:
-        optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
+        optimal_chunks = (
+            adata.X.shape[0] // (4 * len(client.nthreads())),
+            adata.X.shape[1],
+        )
         adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
         sc.pp.filter_genes(adata, min_cells=3)
     finally:
@@ -102,11 +119,12 @@ def peakmem_filter_genes_dask(*_):
         client.close()
 
 
 ### General Dask and Non-Dask Preprocessing Benchmarks ###
+
 class FastSuite:
     """Suite for benchmarking preprocessing operations with Dask."""
     params: tuple[list[Dataset], list[KeyCount]] = (
-        ["musmus_11m"],
+        ["musmus_11m"],
         ["counts", "counts-off-axis"],
     )
     param_names = ["dataset", "layer"]
@@ -114,28 +132,43 @@ class FastSuite:
     def time_calculate_qc_metrics_dask(self, *_):
         client = setup_dask_cluster()
         try:
-            optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
+            optimal_chunks = (
+                adata.X.shape[0] // (4 * len(client.nthreads())),
+                adata.X.shape[1],
+            )
             adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
             adata.X = adata.X.persist()
-            sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True)
+            sc.pp.calculate_qc_metrics(
+                adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True
+            )
         finally:
             client.close()
 
     def peakmem_calculate_qc_metrics_dask(self, *_):
         client = setup_dask_cluster()
         try:
-            optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
+            optimal_chunks = (
+                adata.X.shape[0] // (4 * len(client.nthreads())),
+                adata.X.shape[1],
+            )
             adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
-            sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True)
+            sc.pp.calculate_qc_metrics(
+                adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True
+            )
         finally:
             client.close()
 
     def time_normalize_total_dask(self, *_):
         client = setup_dask_cluster()
         try:
-            optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
+            optimal_chunks = (
+                adata.X.shape[0] // (4 * len(client.nthreads())),
+                adata.X.shape[1],
+            )
             adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
-            adata.X = adata.X.map_blocks(lambda x: x / x.sum(axis=1), dtype=float)  # Optimize normalization
+            adata.X = adata.X.map_blocks(
+                lambda x: x / x.sum(axis=1), dtype=float
+            )  # row-normalize per chunk; exact because rows are never split across chunks
             adata.X = adata.X.persist()
         finally:
             client.close()
@@ -143,7 +176,10 @@ def time_normalize_total_dask(self, *_):
     def peakmem_normalize_total_dask(self, *_):
         client = setup_dask_cluster()
         try:
-            optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
+            optimal_chunks = (
+                adata.X.shape[0] // (4 * len(client.nthreads())),
+                adata.X.shape[1],
+            )
             adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
             sc.pp.normalize_total(adata, target_sum=1e4)
         finally:
@@ -153,7 +189,10 @@ def time_log1p_dask(self, *_):
         client = setup_dask_cluster()
         try:
             adata.uns.pop("log1p", None)
-            optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
+            optimal_chunks = (
+                adata.X.shape[0] // (4 * len(client.nthreads())),
+                adata.X.shape[1],
+            )
             adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
             adata.X = adata.X.persist()
             sc.pp.log1p(adata)
@@ -164,7 +203,10 @@ def peakmem_log1p_dask(self, *_):
         client = setup_dask_cluster()
         try:
             adata.uns.pop("log1p", None)
-            optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
+            optimal_chunks = (
+                adata.X.shape[0] // (4 * len(client.nthreads())),
+                adata.X.shape[1],
+            )
             adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
             sc.pp.log1p(adata)
         finally:
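
Every benchmark in this patch repeats the same chunking recipe: split the observation axis into roughly four row-blocks per worker and keep the variable axis whole, so per-cell reductions never cross chunk boundaries. Below is a minimal, self-contained sketch of that recipe; the array shape, worker count, and the dense stand-in matrix are hypothetical, not taken from the benchmark data.

```python
import dask.array as dd  # same alias the benchmarks use
import numpy as np
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # Small dense stand-in for adata.X; the real benchmarks load an AnnData matrix.
    X = np.random.rand(8_000, 200).astype(np.float32)

    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    try:
        # client.nthreads() maps each worker address to its thread count,
        # so its length is the number of workers. Four chunks per worker
        # keeps every worker busy without creating tiny tasks.
        n_workers = len(client.nthreads())
        optimal_chunks = (X.shape[0] // (4 * n_workers), X.shape[1])

        # Columns stay in one chunk, so row-wise operations (per-cell QC,
        # filtering) never have to combine partial results across chunks.
        dX = dd.from_array(X, chunks=optimal_chunks).persist()
        print(dX.chunks)  # ((1000, 1000, ..., 1000), (200,))
    finally:
        client.close()
        cluster.close()
```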
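`time_normalize_total_dask` swaps `sc.pp.normalize_total` for a hand-rolled `map_blocks` division, which is only equivalent because the chunks above span whole rows, so each block can compute exact row sums on its own. A sketch of the same idea on dense NumPy blocks (hypothetical shapes; note that dense blocks need `keepdims=True`, whereas the benchmark's SciPy sparse blocks already get an (n, 1) matrix back from `.sum(axis=1)`):

```python
import dask.array as dd
import numpy as np

X = np.arange(1, 13, dtype=np.float64).reshape(4, 3)
dX = dd.from_array(X, chunks=(2, 3))  # two blocks, each holding whole rows

# Divide every row by its own sum, block by block. Exact only because the
# column axis is unchunked: each block sees complete rows.
normalized = dX.map_blocks(
    lambda b: b / b.sum(axis=1, keepdims=True), dtype=np.float64
)

row_sums = normalized.compute().sum(axis=1)
np.testing.assert_allclose(row_sums, 1.0)  # every cell now sums to 1
```

One caveat worth keeping in mind: dividing a sparse block by a dense column of row sums densifies it, so on very sparse data this path can cost far more memory than `sc.pp.normalize_total`.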
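The Dask suite in this patch parametrizes over `musmus_11m` only, while `_utils.py` also gains the synthetic generator `_large_synthetic_dataset`, which encodes the intended test-data shape. The snippet below mirrors that helper at a toy size to show what it produces; the sizes here are hypothetical.

```python
import anndata
import numpy as np
from scipy import sparse

# Reproducible CSR matrix with ~1% non-zeros, mirroring the helper's
# sparse.random call; CSR keeps row (per-cell) slicing cheap.
X = sparse.random(
    1_000, 200, density=0.01, format="csr", dtype=np.float32, random_state=42
)
obs = {"obs_names": [f"cell_{i}" for i in range(1_000)]}
var = {"var_names": [f"gene_{j}" for j in range(200)]}
adata = anndata.AnnData(X=X, obs=obs, var=var)

print(adata)        # AnnData object with n_obs × n_vars = 1000 × 200
print(adata.X.nnz)  # ≈ 1000 * 200 * 0.01 = 2000 stored values
```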