Chapter 16 Subworkflows: Divide and Conquer
motivation
16.1 Subworkflow Basics
Create a Subworkflow in data-management
directory.
Start with empty file:
Add the following info to src/data-management/Snakefile
:
# subworkflow - data-management
#
# @yourname
#
# --- Importing Configuration Files --- #
configfile: "config.yaml"
# --- Build Rules --- #
## gen_regression_vars: creates variables needed to estimate a regression
rule gen_regression_vars:
input:
script = config["src_data_mgt"] + "gen_reg_vars.R",
data = config["out_data"] + "out/data/mrw_renamed.csv",
params = config["src_data_specs"] + "param_solow.json",
output:
data = config["out_data"] + "mrw_complete.csv"
log:
config["log"] + "data-mgt/gen_reg_vars.Rout"
shell:
"Rscript {input.script} \
--data {input.data} \
--param {input.params} \
--out {output.data} \
>& {log}"
## rename_vars : creates meaningful variable names
rule rename_vars:
input:
script = config["src_data_mgt"] + "rename_variables.R",
data = config["src_data"] + "mrw.dta"
output:
data = config["out_data"] + "mrw_renamed.csv"
log:
config["log"] + "data-mgt/rename_variables.Rout"
shell:
"Rscript {input.script} \
--data {input.data} \
--out {output.data} \
>& {log}"
Can remove these rules from Snakefile in root directory.
Snakefile in root needs to know there’s another Snakefile that creates outputs that rules in it depend on:
# --- Dictionaries --- #
<...>
# --- Sub Workflows --- #
# only need the final outputs here
subworkflow data_mgt:
workdir: config["ROOT"]
snakefile: config["src_data_mgt"] + "Snakefile"
# --- Build Rules --- #
<...>
And we need to tell it when an input in one of our rules is created from a subworkflow.
Do this by, in this case, wrapping the output with data_mgt(output_name)
Then the build rules section of our root directory Snakefile becomes:
<...>
# --- Build Rules --- #
## all : builds all final outputs
rule all:
input:
figs = expand(config["out_figures"] + "{iFigure}.pdf",
iFigure = FIGURES),
models = expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET),
tables = expand(config["out_tables"] + "{iTable}.tex",
iTable = TABLES)
## augment_solow : construct a table of estimates for augmented solow model
rule augment_solow:
input:
script = config["src_tables"] + "tab02_augment_solow.R",
models = expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET),
params:
filepath = config["out_analysis"],
model_expr = "model_aug_solow*.rds"
output:
table = config["out_tables"] + "tab02_augment_solow.tex",
log:
config["log"] + "tables/tab02_augment_solow.Rout"
shell:
"Rscript {input.script} \
--filepath {params.filepath} \
--models {params.model_expr} \
--out {output.table} \
>& {log}"
## textbook_solow : construct a table of regression estimates for textbook solow model
rule textbook_solow:
input:
script = config["src_tables"] + "tab01_textbook_solow.R",
models = expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET),
params:
filepath = config["out_analysis"],
model_expr = "model_solow*.rds"
output:
table = config["out_tables"] + "tab01_textbook_solow.tex"
log:
config["log"] + "tables/tab01_textbook_solow.Rout"
shell:
"Rscript {input.script} \
--filepath {params.filepath} \
--models {params.model_expr} \
--out {output.table} \
>& {log}"
## make_figs : builds all figures
rule make_figs:
input:
expand(config["out_figures"] + "{iFigure}.pdf",
iFigure = FIGURES)
## figures : recipe for constructing a figure (cannot be called)
rule figures:
input:
script = config["src_figures"] + "{iFigure}.R",
data = data_mgt(config["out_data"] + "mrw_complete.csv"),
subset = config["src_data_specs"] + "subset_intermediate.json"
output:
fig = config["out_figures"] + "{iFigure}.pdf"
log:
config["log"]+ "figures/{iFigure}.Rout"
shell:
"Rscript {input.script} \
--data {input.data} \
--subset {input.subset} \
--out {output.fig} \
>& {log}"
## estimate_models : estimates all regressions
rule estimate_models:
input:
expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET)
## ols_models : recipe for estimating a single regression (cannot be called)
rule ols_model:
input:
script = config["src_analysis"] + "estimate_ols_model.R",
data = data_mgt(config["out_data"] + "mrw_complete.csv"),
model = config["src_model_specs"] + "{iModel}.json",
subset = config["src_data_specs"] + "{iSubset}.json"
output:
model_est = config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
log:
config["log"] + "analysis/{iModel}_ols_{iSubset}.Rout"
shell:
"Rscript {input.script} \
--data {input.data} \
--model {input.model} \
--subset {input.subset} \
--out {output.model_est} \
>& {log}"
<...>
Let’s examine what happens with this new Workflow. Start with a clean output directory:
Now do a dry run.
The beginning of the output looks like:
Building DAG of jobs...
Executing subworkflow data_mgt.
Building DAG of jobs...
Job counts:
count jobs
1 gen_regression_vars
1 rename_vars
2
[Mon Feb 11 22:41:42 2019]
rule rename_vars:
input: src/data-management/rename_variables.R, src/data/mrw.dta
output: out/data/mrw_renamed.csv
log: logs/data-mgt/rename_variables.Rout
jobid: 1
[Mon Feb 11 22:41:42 2019]
rule gen_regression_vars:
input: src/data-management/gen_reg_vars.R, out/data/mrw_renamed.csv, src/data-specs/param_solow.json
output: out/data/mrw_complete.csv
log: logs/data-mgt/gen_reg_vars.Rout
jobid: 0
Job counts:
count jobs
1 gen_regression_vars
1 rename_vars
2
Executing main workflow.
Job counts:
count jobs
1 all
1 augment_solow
3 figures
24 ols_model
1 textbook_solow
30
Explain what has happened here.
16.2 Subworkflow dependencies
We want to go further….
So create a subworkflow for analysis too, containing all rules that estimate models. Create an empty Snakefile:
Move analysis rules across to src/analysis/Snakefile
:
# subworkflow - analysis
#
# @yourname
#
# --- Importing Configuration Files --- #
configfile: "config.yaml"
# --- Build Rules --- #
## estimate_models : estimates all regressions
rule estimate_models:
input:
expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET)
## ols_models : recipe for estimating a single regression (cannot be called)
rule ols_model:
input:
script = config["src_analysis"] + "estimate_ols_model.R",
data = data_mgt(config["out_data"] + "mrw_complete.csv"),
model = config["src_model_specs"] + "{iModel}.json",
subset = config["src_data_specs"] + "{iSubset}.json"
output:
model_est = config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
log:
config["log"] + "analysis/{iModel}_ols_{iSubset}.Rout"
shell:
"Rscript {input.script} \
--data {input.data} \
--model {input.model} \
--subset {input.subset} \
--out {output.model_est} \
>& {log}"
Following the logic in 11.1, we then update the snakefile in root by:
- removing the rules we copied over
- adding a subworkflow called
analysis
- ensuring if we use outputs created in
analysis
subworkflow anywhere else, we wrap them inanalysis(output)
Doing (1)-(3) in our main Snakefile, suggests we update the following:
(list)
Let’s do it:
<...>
# --- Sub Workflows --- #
subworkflow data_mgt:
workdir: config["ROOT"]
snakefile: config["src_data_mgt"] + "Snakefile"
subworkflow analysis:
workdir: config["ROOT"]
snakefile: config["src_analysis"] + "Snakefile"
# --- Build Rules --- #
## all : builds all final outputs
rule all:
input:
figs = expand(config["out_figures"] + "{iFigure}.pdf",
iFigure = FIGURES),
models = analysis(expand(config["out_analysis"] +
"{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET)
),
tables = expand(config["out_tables"] + "{iTable}.tex",
iTable = TABLES)
## augment_solow : construct a table of estimates for augmented solow model
rule augment_solow:
input:
script = config["src_tables"] + "tab02_augment_solow.R",
models = analysis(expand(config["out_analysis"] +
"{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET)
),
params:
filepath = config["out_analysis"],
model_expr = "model_aug_solow*.rds"
output:
table = config["out_tables"] + "tab02_augment_solow.tex",
log:
config["log"] + "tables/tab02_augment_solow.Rout"
shell:
"Rscript {input.script} \
--filepath {params.filepath} \
--models {params.model_expr} \
--out {output.table} \
>& {log}"
## textbook_solow : construct a table of regression estimates for textbook solow model
rule textbook_solow:
input:
script = config["src_tables"] + "tab01_textbook_solow.R",
models = analysis(expand(config["out_analysis"] +
"{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET)
),
params:
filepath = config["out_analysis"],
model_expr = "model_solow*.rds"
output:
table = config["out_tables"] + "tab01_textbook_solow.tex"
log:
config["log"] + "tables/tab01_textbook_solow.Rout"
shell:
"Rscript {input.script} \
--filepath {params.filepath} \
--models {params.model_expr} \
--out {output.table} \
>& {log}"
## make_figs : builds all figures
rule make_figs:
input:
expand(config["out_figures"] + "{iFigure}.pdf",
iFigure = FIGURES)
## figures : recipe for constructing a figure (cannot be called)
rule figures:
input:
script = config["src_figures"] + "{iFigure}.R",
data = data_mgt(config["out_data"] + "mrw_complete.csv"),
subset = config["src_data_specs"] + "subset_intermediate.json"
output:
fig = config["out_figures"] + "{iFigure}.pdf"
log:
config["log"]+ "figures/{iFigure}.Rout"
shell:
"Rscript {input.script} \
--data {input.data} \
--subset {input.subset} \
--out {output.fig} \
>& {log}"
<...>
Remark about dictionaries not having to be moved…
Now if we again do a dry run with Snakemake let’s see what happens:
Executing subworkflow analysis.
Building DAG of jobs...
Job counts:
count jobs
24 ols_model
24
<LIST OF ANALYSIS JOBS>
Executing subworkflow data_mgt.
Building DAG of jobs...
Job counts:
count jobs
1 gen_regression_vars
1 rename_vars
2
<LIST OF DATA-MGT JOBS>
Executing main workflow.
Job counts:
count jobs
1 all
1 augment_solow
3 figures
1 textbook_solow
6
<LIST OF MAIN WORKFLOW JOBS>
Snakemake tells us nothing is wrong. However, if we look at the order of execution:
(list)
it wants to run analysis
workflow before data-mgt
.
This should be a problem.
If we try and run snakemake and execute the workflow:
We do get the expected error:
Building DAG of jobs...
Executing subworkflow analysis.
Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1
Rules claiming more threads will be scaled down.
Job counts:
count jobs
24 ols_model
24
Traceback (most recent call last):
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/__init__.py", line 537, in snakemake
report=report)
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/workflow.py", line 653, in execute
success = scheduler.schedule()
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/scheduler.py", line 275, in schedule
run = self.job_selector(needrun)
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/scheduler.py", line 399, in job_selector
c = list(map(self.job_reward, jobs)) # job rewards
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/scheduler.py", line 469, in job_reward
input_size = job.inputsize
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/jobs.py", line 288, in inputsize
self._inputsize = sum(f.size for f in self.input)
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/jobs.py", line 288, in <genexpr>
self._inputsize = sum(f.size for f in self.input)
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 123, in wrapper
return func(self, *args, **kwargs)
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 138, in wrapper
return func(self, *args, **kwargs)
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 286, in size
return self.size_local
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 291, in size_local
self.check_broken_symlink()
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 296, in check_broken_symlink
if not self.exists_local and lstat(self.file):
File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 29, in lstat
follow_symlinks=os.stat not in os.supports_follow_symlinks)
FileNotFoundError: [Errno 2] No such file or directory: '/home/lachlan/teaching/snakemake-econ-r-learner/out/data/mrw_complete.csv'
This says that out/data/mrw_complete.csv
doesn’t exist, therefore the models cannot be estimated.
What’s the solution?
We need to specify that the analysis
subworkflow itself has a subworkflow, data_mgt
,
that needs to run before analysis
.
So we add the subworkflow data_mgt
to the Snakefile in src/analysis
:
# subworkflow - analysis
#
# @yourname
#
# --- Importing Configuration Files --- #
configfile: "config.yaml"
# --- Sub Workflows --- #
subworkflow data_mgt:
workdir: config["ROOT"]
snakefile: config["src_data_mgt"] + "Snakefile"
# --- Build Rules --- #
## estimate_models : estimates all regressions
rule estimate_models:
input:
expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET)
## ols_models : recipe for estimating a single regression (cannot be called)
rule ols_model:
input:
script = config["src_analysis"] + "estimate_ols_model.R",
data = data_mgt(config["out_data"] + "mrw_complete.csv"),
model = config["src_model_specs"] + "{iModel}.json",
subset = config["src_data_specs"] + "{iSubset}.json"
output:
model_est = config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
log:
config["log"] + "analysis/{iModel}_ols_{iSubset}.Rout"
shell:
"Rscript {input.script} \
--data {input.data} \
--model {input.model} \
--subset {input.subset} \
--out {output.model_est} \
>& {log}"
And we try again:
Building DAG of jobs...
Executing subworkflow data_mgt.
Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1
Rules claiming more threads will be scaled down.
Job counts:
count jobs
1 gen_regression_vars
1 rename_vars
2
[Mon Feb 11 23:12:42 2019]
rule rename_vars:
input: src/data-management/rename_variables.R, src/data/mrw.dta
output: out/data/mrw_renamed.csv
log: logs/data-mgt/rename_variables.Rout
jobid: 1
[Mon Feb 11 23:12:42 2019]
Finished job 1.
1 of 2 steps (50%) done
[Mon Feb 11 23:12:42 2019]
rule gen_regression_vars:
input: src/data-management/gen_reg_vars.R, src/data-specs/param_solow.json, out/data/mrw_renamed.csv
output: out/data/mrw_complete.csv
log: logs/data-mgt/gen_reg_vars.Rout
jobid: 0
[Mon Feb 11 23:12:43 2019]
Finished job 0.
2 of 2 steps (100%) done
Complete log: /home/lachlan/teaching/snakemake-econ-r-learner/.snakemake/log/2019-02-11T231242.348577.snakemake.log
Executing subworkflow analysis.
Building DAG of jobs...
Executing subworkflow data_mgt.
Error: Snakefile "/home/lachlan/teaching/snakemake-econ-r-learner/src/analysis/src/data-management/Snakefile" not present.
What has happened this time? its about the path
explain why this fix works:
# subworkflow - analysis
#
# @yourname
#
# --- Importing Configuration Files --- #
configfile: "config.yaml"
# --- Sub Workflows --- #
subworkflow data_mgt:
workdir: config["sub2root"] + config["ROOT"]
snakefile: config["sub2root"]+ config["src_data_mgt"] + "Snakefile"
# --- Build Rules --- #
## estimate_models : estimates all regressions
rule estimate_models:
input:
expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
iModel = MODELS,
iSubset = DATA_SUBSET)
## ols_models : recipe for estimating a single regression (cannot be called)
rule ols_model:
input:
script = config["src_analysis"] + "estimate_ols_model.R",
data = data_mgt(config["out_data"] + "mrw_complete.csv"),
model = config["src_model_specs"] + "{iModel}.json",
subset = config["src_data_specs"] + "{iSubset}.json"
output:
model_est = config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
log:
config["log"] + "analysis/{iModel}_ols_{iSubset}.Rout"
shell:
"Rscript {input.script} \
--data {input.data} \
--model {input.model} \
--subset {input.subset} \
--out {output.model_est} \
>& {log}"
Now if we run snakemake again:
Our build runs from start to finish.
Exercise: More Subworkflows
Make two new subworkflows figs
and tables
to contain the rules to construct
all figures and tables respectively.
The end result should be only the all
rule in the “Main Build Rules” section
of the Snakefile
of the root directory.
Be sure to carefully think about properly adding subworkflows,
and subworkflow dependencies.
To check your new subworkflow system builds, clean the output directory
and be sure after entering snakemake
into the terminal that all outputs
are successfully created.
Guided Exercise: help
rules with subworkflows
With the new subworkflow system in place, examine how the output from
snakemake help
looks like.
To bring our help
rule back into shape, replace the shell command
in the help
rule with:
Explain exactly what this command is doing. (Look for help in ‘usual’ places).
Next, update the inputs of the help
rule so that the dependencies are correct.