Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for gpu queue #3642

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nipype/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def get_nipype_gitversion():
"filelock>=3.0.0",
"etelemetry>=0.2.0",
"looseversion!=1.2",
"gputil==1.4.0",
]

TESTS_REQUIRES = [
Expand Down
5 changes: 5 additions & 0 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,11 @@ def update(self, **opts):
"""Update inputs"""
self.inputs.update(**opts)

def is_gpu_node(self):
return (hasattr(self.inputs, 'use_cuda') and self.inputs.use_cuda) or (
hasattr(self.inputs, 'use_gpu') and self.inputs.use_gpu
)


class JoinNode(Node):
"""Wraps interface objects that join inputs into a list.
Expand Down
67 changes: 60 additions & 7 deletions nipype/pipeline/plugins/multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@

- non_daemon: boolean flag to execute as non-daemon processes
- n_procs: maximum number of threads to be executed in parallel
- n_gpu_procs: maximum number of GPU threads to be executed in parallel
- memory_gb: maximum memory (in GB) that can be used at once.
- raise_insufficient: raise error if the requested resources for
a node over the maximum `n_procs` and/or `memory_gb`
Expand Down Expand Up @@ -130,10 +131,23 @@
)
self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)

# GPU found on system
self.n_gpus_visible = MultiProcPlugin.gpu_count()
# proc per GPU set by user
self.n_gpu_procs = self.plugin_args.get('n_gpu_procs', self.n_gpus_visible)

# total no. of processes allowed on all gpus
if self.n_gpu_procs > self.n_gpus_visible:
logger.info(
'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!'
% (self.n_gpu_procs, self.n_gpus_visible)
)

# Instantiate different thread pools for non-daemon processes
logger.debug(
"[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
"[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
self.processors,
self.n_gpu_procs,
self.memory_gb,
self._cwd,
)
Expand Down Expand Up @@ -184,9 +198,12 @@
"""Check if any node exceeds the available resources"""
tasks_mem_gb = []
tasks_num_th = []
tasks_gpu_th = []
for node in graph.nodes():
tasks_mem_gb.append(node.mem_gb)
tasks_num_th.append(node.n_procs)
if node.is_gpu_node():
tasks_gpu_th.append(node.n_procs)

if np.any(np.array(tasks_mem_gb) > self.memory_gb):
logger.warning(
Expand All @@ -203,6 +220,10 @@
)
if self.raise_insufficient:
raise RuntimeError("Insufficient resources available for job")
if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
logger.warning('Nodes demand more GPU than allowed (%d).', self.n_gpu_procs)
if self.raise_insufficient:
raise RuntimeError('Insufficient GPU resources available for job')

def _postrun_check(self):
self.pool.shutdown()
Expand All @@ -213,11 +234,14 @@
"""
free_memory_gb = self.memory_gb
free_processors = self.processors
free_gpu_slots = self.n_gpu_procs
for _, jobid in running_tasks:
free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
free_processors -= min(self.procs[jobid].n_procs, free_processors)
if self.procs[jobid].is_gpu_node():
free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)

Check warning on line 242 in nipype/pipeline/plugins/multiproc.py

View check run for this annotation

Codecov / codecov/patch

nipype/pipeline/plugins/multiproc.py#L242

Added line #L242 was not covered by tests

return free_memory_gb, free_processors
return free_memory_gb, free_processors, free_gpu_slots

def _send_procs_to_workers(self, updatehash=False, graph=None):
"""
Expand All @@ -232,7 +256,9 @@
)

# Check available resources by summing all threads and memory used
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
free_memory_gb, free_processors, free_gpu_slots = self._check_resources(
self.pending_tasks
)

stats = (
len(self.pending_tasks),
Expand All @@ -241,6 +267,8 @@
self.memory_gb,
free_processors,
self.processors,
free_gpu_slots,
self.n_gpu_procs,
)
if self._stats != stats:
tasks_list_msg = ""
Expand All @@ -256,13 +284,15 @@
tasks_list_msg = indent(tasks_list_msg, " " * 21)
logger.info(
"[MultiProc] Running %d tasks, and %d jobs ready. Free "
"memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
"memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slot:%d/%d.%s",
len(self.pending_tasks),
len(jobids),
free_memory_gb,
self.memory_gb,
free_processors,
self.processors,
free_gpu_slots,
self.n_gpu_procs,
tasks_list_msg,
)
self._stats = stats
Expand Down Expand Up @@ -304,28 +334,39 @@
# Check requirements of this job
next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
next_job_th = min(self.procs[jobid].n_procs, self.processors)
next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)

is_gpu_node = self.procs[jobid].is_gpu_node()

# If node does not fit, skip at this moment
if next_job_th > free_processors or next_job_gb > free_memory_gb:
if (
next_job_th > free_processors
or next_job_gb > free_memory_gb
or (is_gpu_node and next_job_gpu_th > free_gpu_slots)
):
logger.debug(
"Cannot allocate job %d (%0.2fGB, %d threads).",
"Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
jobid,
next_job_gb,
next_job_th,
next_job_gpu_th,
)
continue

free_memory_gb -= next_job_gb
free_processors -= next_job_th
if is_gpu_node:
free_gpu_slots -= next_job_gpu_th

Check warning on line 359 in nipype/pipeline/plugins/multiproc.py

View check run for this annotation

Codecov / codecov/patch

nipype/pipeline/plugins/multiproc.py#L359

Added line #L359 was not covered by tests
logger.debug(
"Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
"%0.2fGB, %d threads.",
"%0.2fGB, %d threads, %d GPU slots.",
self.procs[jobid].fullname,
jobid,
next_job_gb,
next_job_th,
free_memory_gb,
free_processors,
free_gpu_slots,
)

# change job status in appropriate queues
Expand All @@ -352,6 +393,8 @@
self._remove_node_dirs()
free_memory_gb += next_job_gb
free_processors += next_job_th
if is_gpu_node:
free_gpu_slots -= next_job_gpu_th

Check warning on line 397 in nipype/pipeline/plugins/multiproc.py

View check run for this annotation

Codecov / codecov/patch

nipype/pipeline/plugins/multiproc.py#L397

Added line #L397 was not covered by tests
# Display stats next loop
self._stats = None

Expand Down Expand Up @@ -379,3 +422,13 @@
key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs),
)
return jobids

@staticmethod
def gpu_count():
n_gpus = 1
try:
import GPUtil

return len(GPUtil.getGPUs())
except ImportError:
return n_gpus

Check warning on line 434 in nipype/pipeline/plugins/multiproc.py

View check run for this annotation

Codecov / codecov/patch

nipype/pipeline/plugins/multiproc.py#L433-L434

Added lines #L433 - L434 were not covered by tests
19 changes: 19 additions & 0 deletions nipype/pipeline/plugins/tests/test_multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir):
class InputSpecSingleNode(nib.TraitedSpec):
input1 = nib.traits.Int(desc="a random int")
input2 = nib.traits.Int(desc="a random int")
use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes")


class OutputSpecSingleNode(nib.TraitedSpec):
Expand Down Expand Up @@ -117,6 +118,24 @@ def test_no_more_threads_than_specified(tmpdir):
pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})


def test_no_more_gpu_threads_than_specified(tmpdir):
tmpdir.chdir()

pipe = pe.Workflow(name="pipe")
n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2)
n1.inputs.use_gpu = True
n1.inputs.input1 = 4
pipe.add_nodes([n1])

max_threads = 2
max_gpu = 1
with pytest.raises(RuntimeError):
pipe.run(
plugin="MultiProc",
plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu},
)


@pytest.mark.skipif(
sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8"
)
Expand Down