
Commit

integrate tqdm in joblib and multiprocessing
adumasphi committed Jan 18, 2024
1 parent d4d4291 commit 24f9ac9
Showing 2 changed files with 46 additions and 22 deletions.
64 changes: 44 additions & 20 deletions otwrapy/_otwrapy.py
@@ -347,8 +347,8 @@ def _exec_sample_serial(func, verbosity):
func : Function or callable
A callable python object, usually a function. The function should take
an input vector as argument and return an output vector.
- verbosity : int
-     If value greater than 0, the progress bar is displayed.
+ verbosity : bool
+     If True, the progress bar is displayed.
Returns
-------
@@ -358,7 +358,7 @@ def _exec_sample_serial(func, verbosity):

def _exec_sample(X):
    X = ot.Sample(X)
-   if verbosity > 0 and X.getSize() > 1:
+   if verbosity and X.getSize() > 1:
        Y = ot.Sample(0, func.getOutputDimension())
        for x in tqdm(X):
            Y.add(func(x))
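
For reference, here is a minimal standalone sketch of this serial path, assuming only that openturns and tqdm are installed; tqdm can wrap the ot.Sample directly because a Sample is a sized iterable, and the SymbolicFunction below is just a hypothetical stand-in for a real model:

import openturns as ot
from tqdm import tqdm

# Hypothetical toy model standing in for an expensive simulation.
func = ot.SymbolicFunction(['x1', 'x2'], ['x1 + x2'])
X = ot.Normal(2).getSample(200)

Y = ot.Sample(0, func.getOutputDimension())
for x in tqdm(X):           # the bar knows its total because Sample defines len()
    Y.add(func(x))
print(Y.getSize())          # 200
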
@@ -377,20 +377,43 @@ def _exec_sample_joblib(func, n_cpus, verbosity):
func : Function or callable
A callable python object, usually a function. The function should take
an input vector as argument and return an output vector.
n_cpus : int
Number of CPUs on which to distribute the function calls.
verbosity : bool
If True, the progress bar is displayed.
Returns
-------
_exec_sample : Function or callable
The parallelized function.
"""
+   import joblib
    from joblib import Parallel, delayed
+   import contextlib
+
+   # solution found in: https://stackoverflow.com/questions/24983493/tracking-progress-of-joblib-parallel-execution/58936697#58936697
+   @contextlib.contextmanager
+   def tqdm_joblib(tqdm_object):
+       """Context manager to patch joblib to report into tqdm progress bar given as argument"""
+       class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
+           def __call__(self, *args, **kwargs):
+               tqdm_object.update(n=self.batch_size)
+               return super().__call__(*args, **kwargs)
+
+       old_batch_callback = joblib.parallel.BatchCompletionCallBack
+       joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
+       try:
+           yield tqdm_object
+       finally:
+           joblib.parallel.BatchCompletionCallBack = old_batch_callback
+           tqdm_object.close()

def _exec_sample(X):
-   Y = Parallel(n_jobs=n_cpus, verbose=verbosity)(
-       delayed(func)(x) for x in X)
+   if verbosity:
+       with tqdm_joblib(tqdm(total=ot.Sample(X).getSize())) as progress_bar:
+           Y = Parallel(n_jobs=n_cpus, verbose=0)(delayed(func)(x) for x in X)
+   else:
+       Y = Parallel(n_jobs=n_cpus, verbose=0)(delayed(func)(x) for x in X)
    return ot.Sample(Y)

return _exec_sample
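
As a rough sketch of how this factory could be exercised on its own (real code would normally go through the Parallelizer class; the private import and the plain model function below are only illustrative assumptions):

import openturns as ot
from otwrapy._otwrapy import _exec_sample_joblib   # private helper, shown for illustration only


def model(x):
    # Hypothetical stand-in for a wrapped simulation: one input point in, one output point out.
    return [x[0] + x[1], x[0] * x[1]]


if __name__ == "__main__":
    exec_sample = _exec_sample_joblib(model, n_cpus=4, verbosity=True)
    X = ot.Normal(2).getSample(1000)
    Y = exec_sample(X)      # tqdm bar driven by tqdm_joblib; joblib's own logging stays at verbose=0
    print(Y.getSize())      # 1000
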
Expand All @@ -415,10 +438,9 @@ def _exec_sample_multiprocessing(func, n_cpus):
"""
def _exec_sample(X):
    from multiprocessing import Pool
-   p = Pool(processes=n_cpus)
-   rs = p.map_async(func, X)
-   p.close()
-   return ot.Sample(rs.get())
+   with Pool(processes=n_cpus) as p:
+       rs = list(tqdm(p.imap(func, X), total=ot.Sample(X).getSize()))
+   return ot.Sample(rs)
return _exec_sample
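
The key change here is switching from map_async to imap: imap yields results lazily, so tqdm can advance as each evaluation finishes. A standalone sketch of the same pattern, assuming only the standard library and tqdm:

from multiprocessing import Pool
from tqdm import tqdm


def square(x):
    # Hypothetical stand-in for a costly model evaluation.
    return x * x


if __name__ == "__main__":
    inputs = list(range(200))
    with Pool(processes=4) as p:
        results = list(tqdm(p.imap(square, inputs), total=len(inputs)))
    print(len(results))     # 200
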


@@ -540,26 +562,27 @@ class Parallelizer(ot.OpenTURNSPythonFunction):
backend : string (Optional)
Whether to parallelize using 'ipyparallel', 'joblib', 'pathos', 'multiprocessing' or
- 'serial'. Serial backend means unit evaluation, with a progress bar if verbosity > 0.
+ 'serial'. Serial backend means unit evaluation, with a progress bar if verbosity is True.
n_cpus : int (Optional)
Number of CPUs on which the simulations will be distributed. Needed Only
if using 'joblib', pathos or 'multiprocessing' as backend.
If n_cpus = 1, the behavior is the same as 'serial'.
- verbosity : int (Optional)
-     verbose parameter when using 'joblib', 'dask' or without parallelization. Default is 10.
-     For 'serial', if verbosity > 0, a progress bar is displayed using the tqdm module.
-     When 'dask' is used, 0 means no progress bar, whereas any other value activates the progress bar.
+ verbosity : bool (Optional)
+     Verbose parameter when using 'serial', 'joblib', 'multiprocessing' or 'dask'.
+     Default is True.
+     For 'joblib', 'multiprocessing' and 'serial', a progress bar is displayed using the tqdm module.
+     When 'dask' is used, the progress bar provided by dask is used.
dask_args : dict (Optional)
    Dictionary of parameters when using 'dask'. It must follow this form:
    {'scheduler': IP address or host name,
     'workers': {'IP address or host name': n_cpus},
     'remote_python': {'IP address or host name': path_to_bin_python}}.
The parallelization uses SSHCluster class of dask distributed with 1 thread per worker.
When dask is chosen, the argument n_cpus is not used. The progress bar is enabled if
- verbosity != 0.
+ verbosity is True.
The dask dashboard is enabled at port 8787.
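
For illustration, a dask_args value following this documented form could look like the sketch below; the host names, CPU counts and Python path are hypothetical placeholders:

dask_args = {
    'scheduler': 'node01',
    'workers': {'node02': 4, 'node03': 4},
    'remote_python': {'node02': '/usr/bin/python3', 'node03': '/usr/bin/python3'},
}
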
Examples
@@ -578,7 +601,8 @@ class Parallelizer(ot.OpenTURNSPythonFunction):
`model` is already an :class:`ot.Function`.
"""

- def __init__(self, wrapper, backend='multiprocessing', n_cpus=-1, verbosity=10, dask_args=None):
+ def __init__(self, wrapper, backend='multiprocessing', n_cpus=-1, verbosity=True,
+              dask_args=None):

# -1 cpus means all available cpus - 1 for the scheduler
if n_cpus == -1:
@@ -605,7 +629,7 @@ def __init__(self, wrapper, backend='multiprocessing', n_cpus=-1, verbosity=10,

# This configures how to run samples on the model :
if backend == 'serial' or self.n_cpus == 1:
-     self._exec_sample = _exec_sample_serial(self.wrapper, verbosity)
+     self._exec_sample = _exec_sample_serial(self.wrapper, self.verbosity)

elif (backend == 'ipython') or (backend == 'ipyparallel'):
# Check that ipyparallel is installed
4 changes: 2 additions & 2 deletions setup.py
@@ -29,9 +29,9 @@
'joblib': ["joblib>=0.9.3"],
'ipyparallel': ["ipyparallel>=5.0.1"],
'pathos': ["pathos>=0.2.0"],
- 'dask': ["dask>=2021.01.0", "asyncssh"],
- 'tqdm': ["tqdm>=4.0.0"]
+ 'dask': ["dask>=2021.01.0", "asyncssh"]
},
+ install_requires=["tqdm>=4.0.0"],
author="Felipe Aguirre Martinez",
author_email="[email protected]",
description="General purpose OpenTURNS python wrapper tools",