Skip to content

Commit

Permalink
[impl]: Factor out logic to compute the number of threads/processes t…
Browse files Browse the repository at this point in the history
…o use
  • Loading branch information
haoyu-zc committed Sep 26, 2024
1 parent 13e59ef commit 9dfea76
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 11 deletions.
49 changes: 38 additions & 11 deletions libs/ccc/coef/impl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Contains functions that implement the Clustermatch Correlation Coefficient (CCC).
"""
from __future__ import annotations

import os
from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
from typing import Iterable, Union
Expand Down Expand Up @@ -515,6 +517,30 @@ def cdist_func(x, y):
return max_ari_list, max_part_idx_list, pvalues


def get_n_workers(n_jobs: int | None) -> int:
    """
    Helper function to get the number of workers for parallel processing.

    Args:
        n_jobs: value specified by the main ccc function. None means "use all
            CPU cores reported by `os.cpu_count()`"; a negative value means
            `os.cpu_count() + n_jobs`; any other value is used as given.

    Returns:
        The number of threads/processes to use (always >= 1).

    Raises:
        ValueError: if the number of CPU cores cannot be determined, or if
            the resulting number of workers is less than 1.
    """
    n_cpu_cores = os.cpu_count()
    # NOTE: this check runs regardless of n_jobs, so an undetectable CPU count
    # is reported even when n_jobs is an explicit positive value.
    if n_cpu_cores is None:
        raise ValueError(
            "Could not determine the number of CPU cores. "
            "Please specify a positive value of n_jobs"
        )

    if n_jobs is None:
        return n_cpu_cores

    # Reuse the value validated above rather than querying the OS a second
    # time (a second call could, in principle, return None).
    n_workers = n_cpu_cores + n_jobs if n_jobs < 0 else n_jobs

    if n_workers < 1:
        raise ValueError(
            "The number of threads/processes to use must be greater than 0. "
            f"Got {n_workers}. Please check the n_jobs argument provided"
        )

    return n_workers


def ccc(
x: NDArray,
y: NDArray = None,
Expand Down Expand Up @@ -544,9 +570,10 @@ def ccc(
n_chunks_threads_ratio: allows to modify how pairwise comparisons are
split across different threads. It's given as the ratio parameter of
function get_chunks.
n_jobs: number of CPU cores to use for parallelization. The value
n_jobs: number of CPU cores/threads to use for parallelization. The value
None will use all available cores (`os.cpu_count()`), and negative
values will use `os.cpu_count() - n_jobs`. Default is 1.
values will use `os.cpu_count() + n_jobs` (exception will be raised
if this expression yields a result less than 1). Default is 1.
pvalue_n_perms: if given, it computes the p-value of the
coefficient using the given number of permutations.
partitioning_executor: Executor type used for partitioning the data. It
Expand Down Expand Up @@ -596,6 +623,7 @@ def ccc(
X_numerical_type = None
if x.ndim == 1 and (y is not None and y.ndim == 1):
# both x and y are 1d arrays
# TODO: remove assertions and raise exceptions
assert x.shape == y.shape, "x and y need to be of the same size"
n_objects = x.shape[0]
n_features = 2
Expand Down Expand Up @@ -639,8 +667,7 @@ def ccc(
raise ValueError("Wrong combination of parameters x and y")

# get number of cores to use
n_jobs = os.cpu_count() if n_jobs is None else n_jobs
default_n_threads = (os.cpu_count() + n_jobs) if n_jobs < 0 else n_jobs
n_workers = get_n_workers(n_jobs)

if internal_n_clusters is not None:
_tmp_list = List()
Expand Down Expand Up @@ -675,11 +702,11 @@ def ccc(
max_parts = np.zeros((n_features_comp, 2), dtype=np.uint64)

with (
ThreadPoolExecutor(max_workers=default_n_threads) as executor,
ProcessPoolExecutor(max_workers=default_n_threads) as pexecutor,
ThreadPoolExecutor(max_workers=n_workers) as executor,
ProcessPoolExecutor(max_workers=n_workers) as pexecutor,
):
map_func = map
if default_n_threads > 1:
if n_workers > 1:
if partitioning_executor == "thread":
map_func = executor.map
elif partitioning_executor == "process":
Expand All @@ -695,7 +722,7 @@ def ccc(
for f_idx in range(n_features)
for c_idx, c in enumerate(range_n_clusters)
],
default_n_threads,
n_workers,
n_chunks_threads_ratio,
)

Expand Down Expand Up @@ -732,7 +759,7 @@ def ccc(
cdist_executor = False
inner_executor = DummyExecutor()

if default_n_threads > 1:
if n_workers > 1:
if n_features_comp == 1:
map_func = map
cdist_executor = executor
Expand All @@ -742,14 +769,14 @@ def ccc(
map_func = pexecutor.map

# iterate over all chunks of object pairs and compute the coefficient
inputs = get_chunks(n_features_comp, default_n_threads, n_chunks_threads_ratio)
inputs = get_chunks(n_features_comp, n_workers, n_chunks_threads_ratio)
inputs = [
(
i,
n_features,
parts,
pvalue_n_perms,
default_n_threads,
n_workers,
n_chunks_threads_ratio,
cdist_executor,
inner_executor,
Expand Down
26 changes: 26 additions & 0 deletions tests/test_coef.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from concurrent.futures import ThreadPoolExecutor
from random import shuffle
from unittest.mock import patch
import time
import os

Expand All @@ -19,6 +20,7 @@
cdist_parts_basic,
cdist_parts_parallel,
get_chunks,
get_n_workers,
)


Expand Down Expand Up @@ -1557,3 +1559,27 @@ def test_cm_with_too_few_objects():
ccc(data, internal_n_clusters=3)

assert "too few objects" in str(e.value)



@pytest.mark.parametrize(
    "n_jobs, cpu_count, expected",
    [
        # n_jobs=None falls back to the (patched) CPU count
        (None, 4, 4),
        # positive n_jobs is used as given
        (2, 4, 2),
        # negative n_jobs is added to the CPU count
        (-1, 4, 3),
        # n_jobs larger than the CPU count is returned unchanged
        (6, 4, 6),
    ],
)
def test_get_n_workers_valid(n_jobs, cpu_count, expected):
    """Check get_n_workers results for valid n_jobs / CPU count combinations."""
    # os.cpu_count is patched so the outcome does not depend on the host
    with patch("os.cpu_count", return_value=cpu_count):
        n_workers = get_n_workers(n_jobs)

    assert n_workers == expected


@pytest.mark.parametrize("n_jobs, cpu_count, error_type, error_message", [
    # n_jobs=0 yields zero workers
    (0, 4, ValueError, "The number of threads/processes to use must be greater than 0. Got 0"),
    # negative n_jobs whose magnitude exceeds the CPU count yields a negative number of workers
    (-5, 4, ValueError, "The number of threads/processes to use must be greater than 0. Got -1"),
    # os.cpu_count() returning None is an error, whatever n_jobs is
    (2, None, ValueError, "Could not determine the number of CPU cores. Please specify a positive value of n_jobs"),
    (None, None, ValueError, "Could not determine the number of CPU cores. Please specify a positive value of n_jobs"),
])
def test_get_n_workers_invalid(n_jobs, cpu_count, error_type, error_message):
    """Check that get_n_workers raises the expected error for invalid inputs."""
    # Imported locally so this test does not depend on a module-level import
    import re

    # os.cpu_count is patched so the outcome does not depend on the host
    with patch("os.cpu_count", return_value=cpu_count):
        # `match` is interpreted as a regular expression; escape the expected
        # text so characters such as "." are compared literally.
        with pytest.raises(error_type, match=re.escape(error_message)):
            get_n_workers(n_jobs)

0 comments on commit 9dfea76

Please sign in to comment.