Commit

[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
pre-commit-ci[bot] committed Feb 5, 2025
1 parent 0482e75 commit 6149b8b
Showing 3 changed files with 80 additions and 33 deletions.
8 changes: 4 additions & 4 deletions benchmarks/asv.conf.json
@@ -45,7 +45,7 @@
// If missing or the empty string, the tool will be automatically
// determined by looking for tools on the PATH environment
// variable.
"timeout": 1200, // Timeout for each benchmark in seconds
"timeout": 1200, // Timeout for each benchmark in seconds
"environment_type": "conda",

// timeout in seconds for installing any dependencies in environment
@@ -89,9 +89,9 @@
"pooch": [""],
"scikit-image": [""],
// "scikit-misc": [""],
"scikit-learn": [""],
"pip+asv_runner": [""],
"dask": [""]
"scikit-learn": [""],
"pip+asv_runner": [""],
"dask": [""],
},

// Combinations of libraries/python versions can be excluded/included
19 changes: 12 additions & 7 deletions benchmarks/benchmarks/_utils.py
@@ -117,15 +117,14 @@ def lung93k() -> AnnData:
return _lung93k().copy()



@cache
def _musmus_11m() -> AnnData:
# Define the path to the dataset
path = "/sc/arion/projects/psychAD/mikaela/scanpy/scanpy/benchmarks/data/MusMus_4M_cells_cellxgene.h5ad"
adata = sc.read_h5ad(path)
#assert isinstance(adata.X, sparse.csr_matrix)
# assert isinstance(adata.X, sparse.csr_matrix)
# Add counts layer
#adata.layers["counts"] = adata.X.astype(np.int32, copy=True)
# adata.layers["counts"] = adata.X.astype(np.int32, copy=True)
sc.pp.log1p(adata)
return adata

@@ -135,10 +134,12 @@ def musmus_11m() -> AnnData:


@cache
def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData:
def _large_synthetic_dataset(
n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01
) -> AnnData:
"""
Generate a synthetic dataset suitable for Dask testing.
Parameters:
n_obs: int
Number of observations (rows, typically cells).
@@ -152,7 +153,9 @@ def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density:
The synthetic dataset.
"""

X = sparse.random(n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42)
X = sparse.random(
n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42
)
obs = {"obs_names": [f"cell_{i}" for i in range(n_obs)]}
var = {"var_names": [f"gene_{j}" for j in range(n_vars)]}
adata = anndata.AnnData(X=X, obs=obs, var=var)
@@ -166,7 +169,9 @@ def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density:
return adata


def large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData:
def large_synthetic_dataset(
n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01
) -> AnnData:
return _large_synthetic_dataset(n_obs, n_vars, density).copy()


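A rough usage sketch, not part of this commit: it builds the same kind of synthetic AnnData as _large_synthetic_dataset above, at toy sizes, and then wraps the matrix in a Dask array the way the benchmarks in preprocessing_counts_dask.py do. The dd.from_array call on a scipy CSR matrix assumes a dask/anndata combination that accepts sparse chunks; densify first if yours does not.

import anndata
import dask.array as dd
import numpy as np
from scipy import sparse

# Same construction as _large_synthetic_dataset(), but with toy sizes.
n_obs, n_vars, density = 10_000, 500, 0.01
X = sparse.random(
    n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42
)
adata = anndata.AnnData(
    X=X,
    obs={"obs_names": [f"cell_{i}" for i in range(n_obs)]},
    var={"var_names": [f"gene_{j}" for j in range(n_vars)]},
)

# Chunk along the observation axis only, keeping all genes in one chunk,
# mirroring the Dask benchmarks below; call X.toarray() first if your dask
# version rejects scipy sparse input.
adata.X = dd.from_array(adata.X, chunks=(n_obs // 4, n_vars))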
86 changes: 64 additions & 22 deletions benchmarks/benchmarks/preprocessing_counts_dask.py
@@ -1,63 +1,71 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import dask.array as dd
from dask.distributed import Client, LocalCluster

import scanpy as sc
from scipy import sparse

from ._utils import get_count_dataset

if TYPE_CHECKING:
from anndata import AnnData

from ._utils import Dataset, KeyCount

# Setup global variables
adata: AnnData
batch_key: str | None


def setup(dataset: Dataset, layer: KeyCount, *_):
"""Setup global variables before each benchmark."""
global adata, batch_key
adata, batch_key = get_count_dataset(dataset, layer=layer)
assert "log1p" not in adata.uns


#def setup_dask_cluster():
# def setup_dask_cluster():
# """Set up a local Dask cluster for benchmarking."""
# cluster = LocalCluster(n_workers=4, threads_per_worker=2)
# client = Client(cluster)
# return client


def setup_dask_cluster():
"""Set up a local Dask cluster for benchmarking."""
cluster = LocalCluster(n_workers=5,
threads_per_worker=2,
memory_limit="60GB",
timeout="1200s")
cluster = LocalCluster(
n_workers=5, threads_per_worker=2, memory_limit="60GB", timeout="1200s"
)
client = Client(cluster)
return client


# ASV suite
params: tuple[list[Dataset], list[KeyCount]] = (
["musmus_11m"],
["musmus_11m"],
["counts", "counts-off-axis"],
)
param_names = ["dataset", "layer"]


### Dask-Based Benchmarks ###
def time_filter_cells_dask(*_):
client = setup_dask_cluster()
try:
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks).persist()
sc.pp.filter_cells(adata, min_genes=100)
assert adata.n_obs > 0 # Ensure cells are retained
finally:
client.close()

#def time_filter_cells_dask(*_):

# def time_filter_cells_dask(*_):
# client = setup_dask_cluster()
# try:
# # Compute optimal chunks based on Dask cluster
@@ -72,7 +80,10 @@ def time_filter_cells_dask(*_):
def peakmem_filter_cells_dask(*_):
client = setup_dask_cluster()
try:
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.filter_cells(adata, min_genes=100)
finally:
@@ -82,7 +93,10 @@ def peakmem_filter_cells_dask(*_):
def time_filter_genes_dask(*_):
client = setup_dask_cluster()
try:
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
adata.X = adata.X.persist()
sc.pp.filter_genes(adata, min_cells=3)
@@ -93,7 +107,10 @@ def time_filter_genes_dask(*_):
def peakmem_filter_genes_dask(*_):
client = setup_dask_cluster()
try:
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.filter_genes(adata, min_cells=3)
finally:
@@ -102,48 +119,67 @@ def peakmem_filter_genes_dask(*_):

### General Dask and Non-Dask Preprocessing Benchmarks ###


class FastSuite:
"""Suite for benchmarking preprocessing operations with Dask."""

params: tuple[list[Dataset], list[KeyCount]] = (
["musmus_11m"],
["musmus_11m"],
["counts", "counts-off-axis"],
)
param_names = ["dataset", "layer"]

def time_calculate_qc_metrics_dask(self, *_):
client = setup_dask_cluster()
try:
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
adata.X = adata.X.persist()
sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True)
sc.pp.calculate_qc_metrics(
adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True
)
finally:
client.close()

def peakmem_calculate_qc_metrics_dask(self, *_):
client = setup_dask_cluster()
try:
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True)
sc.pp.calculate_qc_metrics(
adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True
)
finally:
client.close()

def time_normalize_total_dask(self, *_):
client = setup_dask_cluster()
try:
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
adata.X = adata.X.map_blocks(lambda x: x / x.sum(axis=1), dtype=float) # Optimize normalization
adata.X = adata.X.map_blocks(
lambda x: x / x.sum(axis=1), dtype=float
) # Optimize normalization
adata.X = adata.X.persist()
finally:
client.close()

def peakmem_normalize_total_dask(self, *_):
client = setup_dask_cluster()
try:
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.normalize_total(adata, target_sum=1e4)
finally:
@@ -153,7 +189,10 @@ def time_log1p_dask(self, *_):
client = setup_dask_cluster()
try:
adata.uns.pop("log1p", None)
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
adata.X = adata.X.persist()
sc.pp.log1p(adata)
@@ -164,7 +203,10 @@ def peakmem_log1p_dask(self, *_):
client = setup_dask_cluster()
try:
adata.uns.pop("log1p", None)
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
optimal_chunks = (
adata.X.shape[0] // (4 * len(client.nthreads())),
adata.X.shape[1],
)
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.log1p(adata)
finally:
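The chunk-size expression repeated throughout the benchmarks above aims for roughly four row chunks per Dask worker while keeping every gene in a single chunk. A minimal, self-contained sketch of that heuristic follows; the helper name row_chunks_for is hypothetical and not part of the commit, and the cluster is deliberately much smaller than the 5-worker, 60GB one in setup_dask_cluster().

from dask.distributed import Client, LocalCluster


def row_chunks_for(client: Client, n_obs: int, n_vars: int) -> tuple[int, int]:
    # client.nthreads() maps worker address -> thread count, so len(...) counts
    # workers, not total threads; the benchmarks divide the rows by 4x that.
    n_workers = len(client.nthreads())
    return (max(1, n_obs // (4 * n_workers)), n_vars)


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    try:
        # 500_000 // (4 * 2) == 62_500 rows per chunk, all 5_000 genes together.
        print(row_chunks_for(client, n_obs=500_000, n_vars=5_000))
    finally:
        client.close()
        cluster.close()

Separately, note that time_normalize_total_dask above normalizes via map_blocks, scaling each cell to a total of 1 (the chunks span all genes, so the per-block row sums are full per-cell totals), while peakmem_normalize_total_dask calls sc.pp.normalize_total with target_sum=1e4, so the timed and peak-memory variants do not perform identical computations.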
