
Can dynamic map recognize headers in the supplied data frame #1119

Closed
3 tasks done
jennysjaarda opened this issue Dec 19, 2019 · 21 comments

jennysjaarda commented Dec 19, 2019

Prework

  • Read and abide by drake's code of conduct.
  • Search for duplicates among the existing issues, both open and closed.
  • If you think your question has a quick and definite answer, consider posting to Stack Overflow under the drake-r-package tag. (If you anticipate extended follow-up and discussion, you are already in the right place!)

Question

What would you like to know?

Very excited to learn about dynamic branching! I had previously been closing my plan to define a map object and then starting a new plan, so I'm glad that I can now do that within a plan. As someone who is familiar with the static branching features, is this the best way to use the map() function within dynamic branching? That is, do we need to call the variable explicitly with $, unlike with static branching, which reads the headers directly from the .data argument? Perhaps an example directly comparing the two approaches in the manual would be useful.

sum_data <- function(x, y) {
  return(x + y)
}

my_plan <- drake_plan(
  my_grid = tibble(
    x = c(100,200,300),
    y = c(10,20,30)
  ),
  x = target(sum_data(my_grid$x, my_grid$y), ## does not work with `$`
      dynamic = map(my_grid))
)

make(my_plan)
wlandau commented Dec 19, 2019

Glad you're using dynamic branching. Because of the way dynamic branching maps over data, you do need $ in the plan you propose above.

library(drake)
library(tibble)

sum_data <- function(x, y){
  return(x + y)
}

my_plan <- drake_plan(
  my_grid = tibble(
    x = c(100, 200, 300),
    y = c(10, 20, 30)
  ),
  x = target(
    sum_data(my_grid$x, my_grid$y),
    dynamic = map(my_grid)
  )
)

make(my_plan)
#> target my_grid
#> dynamic x
#> subtarget x_8b81d5e8
#> subtarget x_3e5d4ba6
#> subtarget x_0358c37e
#> aggregate x

readd(x)
#> [1] 110 220 330

Created on 2019-12-19 by the reprex package (v0.3.0)

Unlike static branching, dynamic branching is built on vctrs (even more so in development drake). That means that if you map() over a data frame, each slice is also a data frame. The fact that all dynamic targets are vectors makes the data more stable, and it makes it much easier to aggregate targets and predict how the aggregates are structured.
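Not from the thread, but a minimal sketch of the vctrs slicing behavior described above (assumes the vctrs package is installed): slicing a data frame with vctrs semantics yields a one-row data frame, not an atomic vector, which is why `my_grid$x` is needed to pass an atomic slice into `sum_data()`.

```r
# Minimal sketch: vctrs-style slicing preserves the data frame class,
# so each dynamic slice of a data frame is itself a one-row data frame.
library(vctrs)

my_grid <- data.frame(x = c(100, 200, 300), y = c(10, 20, 30))

slice1 <- vec_slice(my_grid, 1)
is.data.frame(slice1)  # TRUE: the slice keeps the data frame class
nrow(slice1)           # 1

vec_slice(my_grid$x, 1)  # 100: slicing a column gives an atomic value
```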

Related: https://books.ropensci.org/drake/dynamic.html#dynamic-targets, #1105, #1087 (comment)

wlandau closed this as completed Dec 19, 2019
jennysjaarda commented Jan 15, 2020 via email

wlandau commented Jan 15, 2020

I ran your example on SGE, and the job monitoring tells me that drake is indeed launching different jobs for different sub-targets. (To confirm, return Sys.info() from sum_data().) But the log file is not displaying enough information. I suspect we need a line like this in future_build():

config$logger$minor("build on an hpc worker", target = target)

wlandau-lilly added a commit that referenced this issue Jan 15, 2020
wlandau commented Jan 15, 2020

Added an extra log message in 613f775. Those dynamic sub-targets were running in parallel on HPC before (at least they should have been) and now the log file should show evidence.

jennysjaarda commented Jan 15, 2020 via email

wlandau commented Jan 15, 2020

Maybe I was a bit too hasty in my previous response. Since you were checking squeue before and seeing no parallelism, I actually doubt 613f775 fixes it. (However, it does make sure the remote node names and PIDs get printed to the log file, and it might help to post the log file of your minimal example using "future" parallelism.)

What does your template file look like?

jennysjaarda commented Jan 16, 2020 via email

wlandau commented Jan 16, 2020

I suspect you are running into a lot of overhead due to batchtools, which brings me right back to HenrikBengtsson/future#204 (comment). I think we are all waiting on a faster alternative to batchtools for transient HPC workers.

jennysjaarda commented Jan 17, 2020 via email

jennysjaarda commented Jan 17, 2020 via email

wlandau commented Jan 18, 2020

Just to follow up, I was able to get it working if I set hpc = FALSE on the target that loads the very large object. I immediately modify this object with another target to make it much, much smaller, and afterward I no longer need the large target...

For projects like this, it might be good to see if you can condense these two operations into a single target so you do not need to save a copy of the large dataset.
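As a sketch of that suggestion (with hypothetical target names, not from the thread), the load and shrink steps can be fused into one command so the large object is only a local variable and is never cached or shipped to workers:

```r
# Hypothetical sketch: load and shrink inside a single target command.
# Only the small result is stored; the large intermediate stays local.
library(drake)

plan <- drake_plan(
  small = {
    large <- data.frame(x = rnorm(1e6))  # stand-in for the expensive load
    head(large)                          # shrink immediately; this is what gets cached
  }
)
```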

Can I trust that adding memory_strategy = "lookahead", garbage_collection = TRUE is sufficient to make sure that this large target will be unloaded and not get sent to the nodes? It seems to be taking a very long time to "send common data".

That's odd. Sending common data should theoretically be something that happens before any targets begin building on the worker. Another reprex in a different issue thread would help if you would like me to take a closer look on SGE.

cmq_master_iter <- function(config) {
  msg <- config$workers$receive_data()
  cmq_conclude_build(msg = msg, config = config)
  if (!identical(msg$token, "set_common_data_token")) {
    config$logger$minor("sending common data")
    config$workers$send_common_data()
  } else if (!config$queue$empty()) {
    cmq_next_target(config)
  } else {
    config$workers$send_shutdown_worker()
  }
}

Also, for a target that only directly depends on the small dataset, the large dataset does not get sent.

deps <- cmq_deps_list(target, config)

I verified this last point just now by running make(plan, parallelism = "clustermq") under debug(cmq_deps_list) with the following plan.

plan <- drake_plan(
  large = target(
    data.frame(x = rnorm(1e6), y = rnorm(1e6)),
    hpc = FALSE
  ),
  small = target(
    head(large),
    hpc = FALSE
  ),
  analysis = colMeans(small)
)

Note hpc = FALSE on both large and small so the large data stays completely local.

target large
target small
target analysis
Called from: cmq_deps_list(target, config)
Browse[1]> vals_static
$small
           x          y
1  1.3030945 -0.6060721
2 -0.3266300 -0.7998393
3  1.2924504  0.2008886
4 -1.5228575  1.0286076
5 -1.1767842 -1.4433261
6 -0.9846573 -1.2302709

Memory strategies and garbage collection do not control the data that goes to the workers. One possible option is make(plan, parallelism = "clustermq", caching = "worker"). With caching = "worker", the workers deal with the cache directly instead of receiving data over ZeroMQ sockets. However, this is usually slower than the default caching = "master" in my experience, though some users claim it is faster for their use cases.

@jennysjaarda

Hi Will,
Thanks for all the help. I took your advice and combined it into a single step, and that seemed to help a lot. I also used hpc = FALSE on the large target so it doesn't get passed to a worker. The only thing I'm still experiencing is these warnings:

Warning messages:
1: In if (!nzchar(x)) { :
  the condition has length > 1 and only the first element will be used
2: In if (!nzchar(x)) { :
  the condition has length > 1 and only the first element will be used
3: In if (!nzchar(x)) { :
  the condition has length > 1 and only the first element will be used
4: In if (!nzchar(x)) { :
  the condition has length > 1 and only the first element will be used
5: In if (!nzchar(x)) { :
  the condition has length > 1 and only the first element will be used
6: In if (!nzchar(x)) { :
  the condition has length > 1 and only the first element will be used
7: In if (!nzchar(x)) { :
  the condition has length > 1 and only the first element will be used
8: In if (!nzchar(x)) { :
  the condition has length > 1 and only the first element will be used

Everything seems to be working fine, however. It happens whether I perform a standard make(plan) or execute with clustermq.

wlandau commented Jan 20, 2020

Would you post a local reprex (without HPC) so I can take a look?

jennysjaarda commented Jan 20, 2020 via email

wlandau commented Jan 20, 2020

Yeah, the warnings do not appear when I run this example. What about options(warn = 2) and a traceback()?

library(drake)
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(tibble)

test <- function(df, i) {
  return(df[i, ])
}

plan <- drake_plan(
  foo = iris,
  traits = tibble(
    i = 1:5
  ),
  bar = target(
    test(foo, traits$i),
    dynamic = map(traits)
  ),
  bah = bind_rows(bar) %>% as_tibble()
)

make(plan)
#> target foo
#> target traits
#> dynamic bar
#> subtarget bar_982d7b12
#> subtarget bar_2c5f0700
#> subtarget bar_623f5045
#> subtarget bar_4dd68315
#> subtarget bar_1ebcff01
#> aggregate bar
#> target bah

Created on 2020-01-20 by the reprex package (v0.3.0)

@jennysjaarda

OK, this isn't very pretty, but here are the results.

> make(plan3)
Error in if (!nzchar(x)) { :
  (converted from warning) the condition has length > 1 and only the first element will be used
> traceback()
41: doWithOneRestart(return(expr), restart)
40: withOneRestart(expr, restarts[[1L]])
39: withRestarts({
        .Internal(.signalCondition(simpleWarning(msg, call), msg,
            call))
        .Internal(.dfltWarn(msg, call))
    }, muffleWarning = function() NULL)
38: .signalSimpleWarning("the condition has length > 1 and only the first element will be used",
        quote(if (!nzchar(x)) {
            return()
        })) 
37: analyze_global(expr$x, results, locals, allowed_globals)
36: analyze_assign(expr, results, locals, allowed_globals)
35: walk_base(expr, results, locals, allowed_globals, name)
34: walk_call(expr, results, locals, allowed_globals)
33: FUN(X[[i]], ...)
32: lapply(X = expr, FUN = ignore(walk_code), results = results,
        locals = locals, allowed_globals = allowed_globals)
31: walk_recursive(expr, results, locals, allowed_globals)
30: walk_drake(expr, results, locals, allowed_globals, name)
29: walk_call(expr, results, locals, allowed_globals)
28: FUN(X[[i]], ...)
27: lapply(X = expr, FUN = ignore(walk_code), results = results,
        locals = locals, allowed_globals = allowed_globals)
26: walk_recursive(expr[-2], results, locals, allowed_globals)
25: analyze_for(expr, results, locals, allowed_globals)
24: walk_base(expr, results, locals, allowed_globals, name)
23: walk_call(expr, results, locals, allowed_globals)
22: FUN(X[[i]], ...)
21: lapply(X = expr, FUN = ignore(walk_code), results = results,
        locals = locals, allowed_globals = allowed_globals)
20: walk_recursive(expr, results, locals, allowed_globals)
19: walk_drake(expr, results, locals, allowed_globals, name)
18: walk_call(expr, results, locals, allowed_globals)
17: ignore(walk_code)(body(expr), results, locals, allowed_globals)
16: analyze_function(expr, results, locals, allowed_globals)
15: walk_code(expr, results, locals, allowed_globals)
14: analyze_code_impl(expr = expr, exclude = exclude, allowed_globals = allowed_globals)
13: analyze_code(expr = expr, exclude = exclude, allowed_globals = allowed_globals)
12: cds_import_dependencies(expr = imports[[index]], exclude = name,
        allowed_globals = config$ht_imports)
11: FUN(X[[i]], ...)
10: lapply(X = X, FUN = FUN, ...)
9: weak_mclapply(X = keys, FUN = FUN, mc.cores = jobs, ...)
8: lightly_parallelize_atomic(X = X, FUN = FUN, jobs = jobs, ...)
7: lightly_parallelize(X = seq_along(imports), FUN = cdl_analyze_import,
       jobs = config$jobs, imports = imports, names = names, config = config)
6: cds_analyze_imports(config, imports)
5: force(expr)
4: memo_expr(cds_analyze_imports(config, imports), config$cache,
       imports_kernel)
3: create_drake_spec(plan = plan, envir = envir, logger = logger,
       jobs = jobs_preprocess, trigger = trigger, cache = cache)
2: drake_config(plan = plan, targets = targets, envir = envir, seed = seed,
       verbose = verbose, hook = hook, parallelism = parallelism,
       jobs = jobs, jobs_preprocess = jobs_preprocess, packages = packages,
       lib_loc = lib_loc, prework = prework, prepend = prepend,
       command = command, args = args, recipe_command = recipe_command,
       log_progress = log_progress, cache = cache, fetch_cache = fetch_cache,
       timeout = timeout, cpu = cpu, elapsed = elapsed, retries = retries,
       force = force, graph = graph, trigger = trigger, skip_targets = skip_targets,
       skip_imports = skip_imports, skip_safety_checks = skip_safety_checks,
       lazy_load = lazy_load, session_info = session_info, cache_log_file = cache_log_file,
       caching = caching, keep_going = keep_going, session = session,
       pruning_strategy = pruning_strategy, makefile_path = makefile_path,
       console_log_file = console_log_file, ensure_workers = ensure_workers,
       garbage_collection = garbage_collection, template = template,
       sleep = sleep, hasty_build = hasty_build, memory_strategy = memory_strategy,
       layout = layout, spec = spec, lock_envir = lock_envir, history = history,
       recover = recover, recoverable = recoverable, curl_handles = curl_handles,
       max_expand = max_expand, log_build_times = log_build_times)
1: make(plan3)

wlandau commented Jan 20, 2020

Thanks, that really helps.

Would you call deps_code() on each of your functions and see if one of them returns those warnings?

The warning seems to come from here:

if (!nzchar(x)) {

By that point, x should have length 1. The traceback proceeds through cds_import_dependencies(), which makes me think one of your custom functions could be giving drake some trouble.
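Outside of drake, the warning pattern itself is easy to reproduce: nzchar() is vectorized, so an if() condition built from a length > 1 character vector is too long (and since R 4.2, that is an error rather than a warning). This is a minimal sketch of the scalar-safe pattern, not drake's actual fix:

```r
# nzchar() returns one logical per element, which is too long for if().
x <- c("a", "", "b")
nzchar(x)  # TRUE FALSE TRUE

# Scalar-safe conditions collapse the vector to length 1 first:
if (!all(nzchar(x))) {
  message("at least one empty string")
}
if (any(nzchar(x))) {
  message("at least one non-empty string")
}
```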

jennysjaarda commented Jan 21, 2020 via email

wlandau-lilly added a commit that referenced this issue Jan 21, 2020
wlandau commented Jan 21, 2020

Thanks for tracking it down. Should be fixed in 74d4f40.

jennysjaarda commented Jan 21, 2020 via email

jennysjaarda commented Jan 21, 2020 via email
