Can dynamic map recognize headers in the supplied data frame #1119
Glad you're using dynamic branching. Because of the way dynamic branching maps over data, you do need to reference the columns explicitly with `$`:
```
library(drake)
library(tibble)
sum_data <- function(x, y){
  return(x + y)
}
my_plan <- drake_plan(
  my_grid = tibble(
    x = c(100, 200, 300),
    y = c(10, 20, 30)
  ),
  x = target(
    sum_data(my_grid$x, my_grid$y),
    dynamic = map(my_grid)
  )
)
make(my_plan)
#> target my_grid
#> dynamic x
#> subtarget x_8b81d5e8
#> subtarget x_3e5d4ba6
#> subtarget x_0358c37e
#> aggregate x
readd(x)
#> [1] 110 220 330
```
Created on 2019-12-19 by the reprex package (v0.3.0)
Unlike static branching, dynamic branching is built on `vctrs`. Related: https://books.ropensci.org/drake/dynamic.html#dynamic-targets, #1105, #1087 (comment) |
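The original question asks whether `$` subsetting is the only way. One hedged alternative, not from the thread: split the columns into their own targets and map over both in lockstep, so the command itself needs no `$`. Names like `x_col` and `y_col` are illustrative.

```
library(drake)
library(tibble)

sum_data <- function(x, y) {
  x + y
}

# Sketch: each column becomes its own static target, and dynamic
# map() iterates over both targets element-wise.
plan <- drake_plan(
  my_grid = tibble(x = c(100, 200, 300), y = c(10, 20, 30)),
  x_col = my_grid$x,
  y_col = my_grid$y,
  sums = target(
    sum_data(x_col, y_col),
    dynamic = map(x_col, y_col)
  )
)
```

This trades one extra target per column for a command that reads like the static-branching style.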
Hi Will,
Maybe I missed it, but can dynamic plans run in parallel? I believe I got it to work using clustermq, but I can't seem to with the future backend.
Here is an example:
```
library(drake)
library(future.batchtools)
library(future)
library(tidyverse)
future::plan(batchtools_slurm, template = "/data/sgg2/jenny/projects/proxyMR/slurm_batchtools.tmpl")
sum_data <- function(x, y){
return(x + y)
}
my_plan <- drake_plan(
my_grid = tibble(
x = c(100, 200, 300),
y = c(10, 20, 30)
),
x = target(
sum_data(my_grid$x, my_grid$y),
dynamic = map(my_grid)
)
)
make(my_plan, parallelism = "future", jobs = 20, console_log_file = "test.out")
```
When I look at `test.out` I see no evidence of anything happening in parallel on different nodes, etc. On the other hand, when I run something like `make(my_plan, parallelism = "clustermq", jobs = 5, console_log_file = "test2.out", template = list(cpus = 1, partition = "sgg"))`, I see lines in the log file that indicate parallel computing worked as expected:
```
node13.cluster | 115615 | 2020-01-15 17:37:55.742776 +0100 GMT | | load my_grid
node13.cluster | 122108 | 2020-01-15 17:37:55.748142 +0100 GMT | x_8b81d5e8 | build on an hpc worker
node13.cluster | 115615 | 2020-01-15 17:37:55.794983 +0100 GMT | x_3e5d4ba6 | subtarget x_3e5d4ba6
node13.cluster | 115615 | 2020-01-15 17:37:55.801931 +0100 GMT | x_0358c37e | subtarget x_0358c37e
node13.cluster | 122124 | 2020-01-15 17:37:55.802210 +0100 GMT | x_3e5d4ba6 | build on an hpc worker
node13.cluster | 122119 | 2020-01-15 17:37:55.809365 +0100 GMT | x_0358c37e | build on an hpc worker
```
Any thoughts?
Thanks in advance!
|
I ran your example on SGE, and the job monitoring tells me that drake is indeed launching different jobs for different sub-targets. (To confirm, return `Sys.info()` from `sum_data()`.) But the log file is not displaying enough information. I suspect we need a line like this in `future_build()`: https://github.com/ropensci/drake/blob/7f89b1744b8a51daa93e4b2a77f3cad0e1dc01bb/R/backend_clustermq.R#L209
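Will's suggestion to return `Sys.info()` from `sum_data()` could be sketched like this; the `data.frame` wrapper and field choices are illustrative, not from the thread:

```
sum_data <- function(x, y) {
  # Record the node name and PID of the worker that builds each
  # sub-target, so the aggregated target reveals where the work ran.
  data.frame(
    sum = x + y,
    node = Sys.info()[["nodename"]],
    pid = Sys.getpid()
  )
}
```

After `make()`, `readd(x)` would then show one row per sub-target, and distinct node names would confirm genuine HPC parallelism.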
|
Added an extra log message in 613f775. Those dynamic sub-targets were running in parallel on HPC before (at least they should have been) and now the log file should show evidence. |
Thanks for the quick response. I will give this a try when I'm back at my computer. When I tried this on a larger job and separately viewed the scheduled jobs (with Slurm this is via `squeue`), I only saw evidence of one job running, and it was going much slower than I would expect if it was indeed running in parallel...
|
Maybe I was a bit too hasty in my previous response. Since you were checking `squeue` before and seeing no parallelism, I actually doubt 613f775 fixes it. (However, it does make sure the remote node names and PIDs get printed to the log file, and it might help to post the log file of your minimal example using "future" parallelism.) What does your template file look like? |
Hi Will,
This is my template file:
```
#!/bin/bash
## Via https://github.com/mllg/batchtools/blob/master/inst/templates/
## Job Resource Interface Definition
##
## ntasks [integer(1)]:    Number of required tasks,
##                         Set larger than 1 if you want to further parallelize
##                         with MPI within your job.
## cpus [integer(1)]:      Number of required cpus per task,
##                         Set larger than 1 if you want to further parallelize
##                         with multicore/parallel within each task.
## memory [integer(1)]:    Memory in megabytes for each cpu.
##                         Default is 7900 Mo/core
## partition [string(1)]:  Partition requested.
##                         Default is "sgg".
##
## Default resources can be set in your .batchtools.conf.R by defining the variable
## 'default.resources' as a named list.
<%
# relative paths are not handled well by Slurm
log.file = fs::path_expand(log.file)
#########################
# Set defaults if needed.
if (!"partition" %in% names(resources)) {
resources$partition = "sgg"
}
-%>
#SBATCH --job-name=<%= job.name %>
#SBATCH --output=<%= log.file %>
#SBATCH --error=<%= log.file %>
#SBATCH --ntasks=1
#SBATCH --account=sgg
#SBATCH --partition=<%= resources$partition %>
<%= if (!is.null(resources[["cpus"]])) sprintf(paste0("#SBATCH --cpus-per-task='", resources[["cpus"]], "'")) %>
<%= if (array.jobs) sprintf("#SBATCH --array=1-%i", nrow(jobs)) else "" %>
<%= if (!is.null(resources[["memory"]])) sprintf(paste0("#SBATCH --mem-per-cpu='", resources[["memory"]], "'")) %>
## module add ...
## Run R:
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
```
As I track my jobs with `squeue` I do seem to be getting some parallelization... it's just not very fast, even when I increase the `jobs` parameter. I would hope to have 50 jobs running or queued on the server, but I only see about 15 or fewer.
This example illustrates better:
```
simple <- function(a, b){
  Sys.sleep(60)
  return(a + b)
}
test_plan <- drake_plan(
  x = 100,
  traits = tibble(
    i = 1:50,
  ),
  IV_list = target({
    simple(traits$i, x)},
    dynamic = map(traits)
  ),
)
make(test_plan, parallelism = "future", console_log_file = "test.out", jobs = 50)
```
|
I suspect you are running into a lot of overhead due to batchtools, which brings me right back to HenrikBengtsson/future#204 (comment). I think we are all waiting on a faster alternative to batchtools for transient HPC workers. |
Got it. Then I will eagerly await an alternative in that case! I tried with `clustermq` instead and indeed it was much, much faster. I knew that, given the option, `clustermq` is a better choice than `batchtools`, but I didn't realize it was that much better. So meanwhile I'm going to switch to `clustermq` when possible. However, I'm running into some problems here as well (see below for an example of the warnings/errors). I managed to isolate the problem to a single target:
```
join_ <- drake_plan(
  variant_data = fread(file_in(!!variant_file_full_name), data.table = F), ## note `variant_data` is very large.
)
make(join_, parallelism = "clustermq", console_log_file = "mr_am.out", jobs = 1,
     template = list(cpus = 1, partition = "cluster",
                     log_file = "/data/sgg2/jenny/projects/proxyMR/%a_clustermq.out"))
# I know I don't need 5 jobs for this but this replicates the problem
Error in config$workers$receive_data() :
Authentication provided by worker does not match
In addition: Warning messages:
1: In if (!nzchar(x)) { :
the condition has length > 1 and only the first element will be used
2: In if (!nzchar(x)) { :
the condition has length > 1 and only the first element will be used
3: In if (!nzchar(x)) { :
the condition has length > 1 and only the first element will be used
4: In if (!nzchar(x)) { :
the condition has length > 1 and only the first element will be used
5: In if (!nzchar(x)) { :
the condition has length > 1 and only the first element will be used
```
If I redefine `variant_file_full_name` to a file that is much smaller I
lose the error, but the warnings persist.
This is my current clustermq template:
```
#!/bin/sh
# From https://github.com/mschubert/clustermq/wiki/SLURM
#SBATCH --job-name={{ job_name }} # job name
#SBATCH --partition={{ partition }} # partition
#SBATCH --output={{ log_file | /dev/null }}    # you can add .%a for array index
#SBATCH --error={{ log_file | /dev/null }}     # log file
#SBATCH --workdir=/data/sgg2/jenny/projects/proxyMR
####SBATCH --mem-per-cpu={{ memory | 4096 }}   # memory
#SBATCH --array=1-{{ n_jobs }}                 # job array
#SBATCH --cpus-per-task={{ cpus }}
# module load R                                # Uncomment if R is an environment module.
####ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```
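For context, a clustermq template like this is usually wired up through clustermq's global options before calling `make()`. A hedged sketch; the template path here is an example, not the author's actual file:

```
# Point clustermq at the scheduler and template, then let drake
# hand targets off to transient Slurm workers.
options(
  clustermq.scheduler = "slurm",
  clustermq.template = "/data/sgg2/jenny/projects/proxyMR/slurm_clustermq.tmpl"  # example path
)
library(drake)
# make(plan, parallelism = "clustermq", jobs = 5,
#      template = list(cpus = 1, partition = "cluster"))
```

Values passed in `make()`'s `template` list fill the `{{ placeholder }}` slots in the template file.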
Are you familiar with these outputs? Is it a clustermq problem? I saw the issue mentioned here (mschubert/clustermq#127 (comment)), but that didn't seem to solve the problem. I am also using packrat to manage my library.
|
Just to follow up, I was able to get it working if I set `hpc = FALSE` on the target that loads the very large object. I immediately modify this object with another target to make it much, much smaller, and afterward I no longer need the large target (I have also set `hpc = F` on the target that shrinks the large file, so the large object isn't getting sent to an HPC node). But I still have many jobs I want to run in parallel via HPC (involving only the shrunken data set, or something independent). Can I trust that adding `memory_strategy = "lookahead", garbage_collection = TRUE` is sufficient to make sure this large target will be unloaded and not get sent to the nodes? It seems to be taking a very long time to "send common data".
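The workaround above (load and shrink the large object with `hpc = FALSE`, then run only the small object on HPC) could be sketched like this; `shrink_variants()` and `analyze()` are hypothetical helpers, not from the thread:

```
library(drake)
library(data.table)

plan <- drake_plan(
  # Load the large file on the master process only.
  variant_data = target(
    fread(file_in("variants.txt"), data.table = FALSE),  # example file name
    hpc = FALSE
  ),
  # Shrink it immediately, also on the master, so the big object
  # never has to be shipped to a transient worker.
  variant_small = target(
    shrink_variants(variant_data),  # hypothetical helper
    hpc = FALSE
  ),
  # Downstream targets depend only on variant_small, so only the
  # small object gets sent to the HPC workers.
  results = target(
    analyze(variant_small),  # hypothetical helper
    dynamic = map(variant_small)
  )
)
```

As noted later in the thread, what gets shipped is determined by each target's direct dependencies, not by the memory strategy.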
|
For projects like this, it might be good to see if you can condense these two operations into a single target so you do not need to save a copy of the large dataset.
That's odd. Sending common data should theoretically be something that happens before any targets begin building on the worker. Another reprex in a different issue thread would help if you would like me to take a closer look on SGE. (See lines 73 to 84 of R/backend_clustermq.R at 581d882.) Also, for a target that only directly depends on the small dataset, the large dataset does not get sent (line 141 in 581d882). I verified this last point just now by running the example with a browser in `cmq_deps_list()`. Note the output:
```
target large
target small
target analysis
Called from: cmq_deps_list(target, config)
Browse[1]> vals_static
$small
           x          y
1  1.3030945 -0.6060721
2 -0.3266300 -0.7998393
3  1.2924504  0.2008886
4 -1.5228575  1.0286076
5 -1.1767842 -1.4433261
6 -0.9846573 -1.2302709
```
Memory strategies and garbage collection do not control the data that goes to the workers. One possible option is |
Hi Will,
Everything seems to be working fine; however... it happens whether I perform a standard |
Would you post a local reprex (without HPC) so I can take a look? |
Yes sure, sorry! This gives me the warnings, but I don't think it will help you, as I think it might have something to do with my cache: if I run this in another project, it works just fine.
```
test <- function(df, i){
  return(df[i, ])
}
plan <- drake_plan(
  foo = iris,
  traits = tibble(
    i = 1:5,
  ),
  bar = target(
    test(foo, traits$i),
    dynamic = map(traits)
  ),
  #bah = bind_rows(bar) %>% as_tibble()
)
```
|
Yeah, the warnings do not appear when I run this example. What about this?
```
library(drake)
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#>     filter, lag
#> The following objects are masked from 'package:base':
#>
#>     intersect, setdiff, setequal, union
library(tibble)
test <- function(df, i){
  return(df[i, ])
}
plan <- drake_plan(
  foo = iris,
  traits = tibble(
    i = 1:5,
  ),
  bar = target(
    test(foo, traits$i),
    dynamic = map(traits)
  ),
  bah = bind_rows(bar) %>% as_tibble()
)
make(plan)
#> target foo
#> target traits
#> dynamic bar
#> subtarget bar_982d7b12
#> subtarget bar_2c5f0700
#> subtarget bar_623f5045
#> subtarget bar_4dd68315
#> subtarget bar_1ebcff01
#> aggregate bar
#> target bah
```
Created on 2020-01-20 by the reprex package (v0.3.0) |
OK this isn't very pretty, but here's the results.
|
Thanks, that really helps. Would you call `deps_code()` on each of your functions and see if one of them returns those warnings? The warning seems to come from here: https://github.com/ropensci/drake/blob/05ed62b5f2fc409f41110bef3402153e26ff3d43/R/analyze_code.R#L284
By that point, `x` should have length 1. The traceback proceeds through `cds_import_dependencies()`, which makes me think one of your custom functions could be giving drake some trouble. |
OK I found it! I've isolated it to an `assign` call:
```
test <- function(input){
  assign(paste0(input, "var"), 1)
}
deps_code(test)
```
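A hedged sketch of how one might automate that hunt, calling `deps_code()` on every function in the global environment and surfacing any analysis warnings (untested here; assumes your custom functions live in the global environment):

```
library(drake)

find_warning_functions <- function(envir = globalenv()) {
  # Run drake's static code analysis on each function and report
  # the ones that raise warnings, muffling them afterward.
  for (name in ls(envir = envir)) {
    obj <- get(name, envir = envir)
    if (!is.function(obj)) next
    withCallingHandlers(
      deps_code(obj),
      warning = function(w) {
        message("warning while analyzing ", name, ": ", conditionMessage(w))
        invokeRestart("muffleWarning")
      }
    )
  }
}
```

Running `find_warning_functions()` in the offending project should point straight at functions like the `assign` example above.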
|
Thanks for tracking it down. Should be fixed in 74d4f40. |
Thanks for your help in finding it, I had no idea how to reproduce this one!
|
Just tested, and no warnings now. Anything run with the warning (in the old version) should be OK though? Thanks again!
|
Question
What would you like to know?
Very excited to learn about dynamic branching! I had previously been closing my plan to define a map object and then starting a new plan, so I'm glad that I can now do that within a plan. As someone familiar with the static branching features, is this the best way to use the `map` function within dynamic branching? That is, do we need to call the variable explicitly with `$` (unlike with static branching, which reads the headers directly from the `.data` argument)? Perhaps an example directly comparing the two approaches in the manual would be useful.