Chapter 16 Subworkflows: Divide and Conquer

motivation

16.1 Subworkflow Basics

Create a subworkflow in the data-management directory.

Start with an empty file:

$ touch src/data-management/Snakefile

Add the following to src/data-management/Snakefile:

# subworkflow - data-management
#
# @yourname
#

# --- Importing Configuration Files --- #

configfile: "config.yaml"

# --- Build Rules --- #

## gen_regression_vars: creates variables needed to estimate a regression
rule gen_regression_vars:
    input:
        script = config["src_data_mgt"] + "gen_reg_vars.R",
        data   = config["out_data"] + "mrw_renamed.csv",
        params = config["src_data_specs"] + "param_solow.json",
    output:
        data = config["out_data"] + "mrw_complete.csv"
    log:
        config["log"] + "data-mgt/gen_reg_vars.Rout"
    shell:
        "Rscript {input.script} \
            --data {input.data} \
            --param {input.params} \
            --out {output.data} \
            >& {log}"

## rename_vars        : creates meaningful variable names
rule rename_vars:
    input:
        script = config["src_data_mgt"] + "rename_variables.R",
        data   = config["src_data"] + "mrw.dta"
    output:
        data = config["out_data"] + "mrw_renamed.csv"
    log:
        config["log"] + "data-mgt/rename_variables.Rout"
    shell:
        "Rscript {input.script} \
            --data {input.data} \
            --out {output.data} \
            >& {log}"

We can now remove these rules from the Snakefile in the root directory.
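
Every path in the rules we just moved is a config entry concatenated with a file name. The sketch below mimics that in plain Python. The config values are assumptions inferred from the dry-run output shown later in this chapter; they are not copied from the project's config.yaml.

```python
# Assumed config values (inferred from the dry-run output, not the real config.yaml).
config = {
    "src_data_mgt": "src/data-management/",
    "src_data": "src/data/",
    "out_data": "out/data/",
    "log": "logs/",
}

# Plain string concatenation, just as in the input, output and log entries of the rules.
script = config["src_data_mgt"] + "rename_variables.R"
renamed = config["out_data"] + "mrw_renamed.csv"
log = config["log"] + "data-mgt/rename_variables.Rout"

print(script)
print(renamed)
print(log)
```

Under these assumptions each entry already names its directory and ends in a slash, so the file name on the right-hand side should not repeat the directory.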

The Snakefile in the root directory needs to know that there’s another Snakefile creating outputs that its rules depend on:

# --- Dictionaries --- #

<...>

# --- Sub Workflows --- #
# only need the final outputs here
subworkflow data_mgt:
   workdir: config["ROOT"]
   snakefile:  config["src_data_mgt"] + "Snakefile"

# --- Build Rules --- #
<...>

We also need to tell it when an input to one of our rules is created by a subworkflow. In this case we do so by wrapping the file name in data_mgt(), as in data_mgt(output_name). The build rules section of our root directory Snakefile then becomes:

<...>

# --- Build Rules --- #

## all                : builds all final outputs
rule all:
    input:
        figs   = expand(config["out_figures"] + "{iFigure}.pdf",
                            iFigure = FIGURES),
        models = expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
                            iModel = MODELS,
                            iSubset = DATA_SUBSET),
        tables  = expand(config["out_tables"] + "{iTable}.tex",
                            iTable = TABLES)

## augment_solow      : construct a table of estimates for augmented solow model
rule augment_solow:
    input:
        script = config["src_tables"] + "tab02_augment_solow.R",
        models = expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
                            iModel = MODELS,
                            iSubset = DATA_SUBSET),
    params:
        filepath   = config["out_analysis"],
        model_expr = "model_aug_solow*.rds"
    output:
        table = config["out_tables"] + "tab02_augment_solow.tex",
    log:
        config["log"] + "tables/tab02_augment_solow.Rout"
    shell:
        "Rscript {input.script} \
            --filepath {params.filepath} \
            --models {params.model_expr} \
            --out {output.table} \
            >& {log}"

## textbook_solow     : construct a table of regression estimates for textbook solow model
rule textbook_solow:
    input:
        script = config["src_tables"] + "tab01_textbook_solow.R",
        models = expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
                            iModel = MODELS,
                            iSubset = DATA_SUBSET),
    params:
        filepath   = config["out_analysis"],
        model_expr = "model_solow*.rds"
    output:
        table = config["out_tables"] + "tab01_textbook_solow.tex"
    log:
        config["log"] + "tables/tab01_textbook_solow.Rout"
    shell:
        "Rscript {input.script} \
            --filepath {params.filepath} \
            --models {params.model_expr} \
            --out {output.table} \
            >& {log}"

## make_figs          : builds all figures
rule make_figs:
    input:
        expand(config["out_figures"] + "{iFigure}.pdf",
                iFigure = FIGURES)

## figures            : recipe for constructing a figure (cannot be called)
rule figures:
    input:
        script = config["src_figures"] + "{iFigure}.R",
        data   = data_mgt(config["out_data"] + "mrw_complete.csv"),
        subset = config["src_data_specs"] + "subset_intermediate.json"
    output:
        fig = config["out_figures"] + "{iFigure}.pdf"
    log:
        config["log"]+ "figures/{iFigure}.Rout"
    shell:
        "Rscript {input.script} \
            --data {input.data} \
            --subset {input.subset} \
            --out {output.fig} \
            >& {log}"

## estimate_models    : estimates all regressions
rule estimate_models:
    input:
        expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
                    iModel = MODELS,
                    iSubset = DATA_SUBSET)

## ols_model          : recipe for estimating a single regression (cannot be called)
rule ols_model:
    input:
        script = config["src_analysis"] + "estimate_ols_model.R",
        data   = data_mgt(config["out_data"] + "mrw_complete.csv"),
        model  = config["src_model_specs"] + "{iModel}.json",
        subset = config["src_data_specs"]  + "{iSubset}.json"
    output:
        model_est = config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
    log:
        config["log"] + "analysis/{iModel}_ols_{iSubset}.Rout"
    shell:
        "Rscript {input.script} \
            --data {input.data} \
            --model {input.model} \
            --subset {input.subset} \
            --out {output.model_est} \
            >& {log}"

<...>

Let’s examine what happens with this new workflow. Start with a clean output directory:

$ snakemake clean

Now do a dry run.

$ snakemake --dryrun

The beginning of the output looks like:

Building DAG of jobs...
Executing subworkflow data_mgt.
Building DAG of jobs...
Job counts:
    count   jobs
    1   gen_regression_vars
    1   rename_vars
    2

[Mon Feb 11 22:41:42 2019]
rule rename_vars:
    input: src/data-management/rename_variables.R, src/data/mrw.dta
    output: out/data/mrw_renamed.csv
    log: logs/data-mgt/rename_variables.Rout
    jobid: 1


[Mon Feb 11 22:41:42 2019]
rule gen_regression_vars:
    input: src/data-management/gen_reg_vars.R, out/data/mrw_renamed.csv, src/data-specs/param_solow.json
    output: out/data/mrw_complete.csv
    log: logs/data-mgt/gen_reg_vars.Rout
    jobid: 0

Job counts:
    count   jobs
    1   gen_regression_vars
    1   rename_vars
    2
Executing main workflow.
Job counts:
    count   jobs
    1   all
    1   augment_solow
    3   figures
    24  ols_model
    1   textbook_solow
    30

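What has happened here? Snakemake sees that the main workflow needs out/data/mrw_complete.csv from the data_mgt subworkflow, so it handles that subworkflow first: it builds the subworkflow’s own DAG and lists the two jobs, rename_vars and gen_regression_vars, that create the file. Only then does it move on to the main workflow, whose 30 jobs no longer include the data management rules.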
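
The role of the data_mgt(...) wrapper can be sketched with a toy stand-in. This is not Snakemake's own class, only an illustration of the idea that the wrapper resolves a path against the subworkflow's working directory and remembers which files the main workflow asked for. The class name ToySubworkflow and the root directory /project are hypothetical.

```python
import posixpath


class ToySubworkflow:
    """Toy stand-in for a Snakemake subworkflow (not Snakemake's real class)."""

    def __init__(self, name, workdir):
        self.name = name
        self.workdir = workdir
        self.targets = []  # files the main workflow has requested from us

    def __call__(self, path):
        # Resolve the path against the subworkflow's working directory
        # and record it as something this subworkflow must produce first.
        resolved = posixpath.normpath(posixpath.join(self.workdir, path))
        if resolved not in self.targets:
            self.targets.append(resolved)
        return resolved


# Hypothetical root directory; the real value of config["ROOT"] is not shown.
data_mgt = ToySubworkflow("data_mgt", workdir="/project")

# Two rules (figures and ols_model) wrap the same file.
figures_input = data_mgt("out/data/mrw_complete.csv")
ols_input = data_mgt("out/data/mrw_complete.csv")

print(figures_input)
print(data_mgt.targets)
```

Both rules end up pointing at the same file, and the subworkflow holds a single target that it has to build before the main workflow starts.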

16.2 Subworkflow dependencies

We can take this idea further.

So create a subworkflow for analysis too, containing all rules that estimate models. Create an empty Snakefile:

$ touch src/analysis/Snakefile

Move analysis rules across to src/analysis/Snakefile:

# subworkflow - analysis
#
# @yourname
#

# --- Importing Configuration Files --- #

configfile: "config.yaml"

# --- Build Rules --- #
## estimate_models    : estimates all regressions
rule estimate_models:
    input:
        expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
                    iModel = MODELS,
                    iSubset = DATA_SUBSET)

## ols_model          : recipe for estimating a single regression (cannot be called)
rule ols_model:
    input:
        script = config["src_analysis"] + "estimate_ols_model.R",
        data   = data_mgt(config["out_data"] + "mrw_complete.csv"),
        model  = config["src_model_specs"] + "{iModel}.json",
        subset = config["src_data_specs"]  + "{iSubset}.json"
    output:
        model_est = config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
    log:
        config["log"] + "analysis/{iModel}_ols_{iSubset}.Rout"
    shell:
        "Rscript {input.script} \
            --data {input.data} \
            --model {input.model} \
            --subset {input.subset} \
            --out {output.model_est} \
            >& {log}"

Following the logic in 16.1, we then update the Snakefile in the root directory by:

  1. removing the rules we copied over
  2. adding a subworkflow called analysis
  3. ensuring that any output created in the analysis subworkflow and used elsewhere is wrapped in analysis(output)

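Applying (1)-(3) to our main Snakefile means we:

  1. remove the rules estimate_models and ols_model
  2. add the subworkflow analysis below data_mgt
  3. wrap the models input of the rules all, augment_solow and textbook_solow in analysis()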

Let’s do it:

<...>
# --- Sub Workflows --- #
subworkflow data_mgt:
   workdir: config["ROOT"]
   snakefile:  config["src_data_mgt"] + "Snakefile"

subworkflow analysis:
   workdir: config["ROOT"]
   snakefile:  config["src_analysis"] + "Snakefile"

# --- Build Rules --- #

## all                : builds all final outputs
rule all:
    input:
        figs   = expand(config["out_figures"] + "{iFigure}.pdf",
                            iFigure = FIGURES),
        models = analysis(expand(config["out_analysis"] +
                            "{iModel}_ols_{iSubset}.rds",
                            iModel = MODELS,
                            iSubset = DATA_SUBSET)
                            ),
        tables  = expand(config["out_tables"] + "{iTable}.tex",
                            iTable = TABLES)

## augment_solow      : construct a table of estimates for augmented solow model
rule augment_solow:
    input:
        script = config["src_tables"] + "tab02_augment_solow.R",
        models = analysis(expand(config["out_analysis"] +
                            "{iModel}_ols_{iSubset}.rds",
                            iModel = MODELS,
                            iSubset = DATA_SUBSET)
                            ),
    params:
        filepath   = config["out_analysis"],
        model_expr = "model_aug_solow*.rds"
    output:
        table = config["out_tables"] + "tab02_augment_solow.tex",
    log:
        config["log"] + "tables/tab02_augment_solow.Rout"
    shell:
        "Rscript {input.script} \
            --filepath {params.filepath} \
            --models {params.model_expr} \
            --out {output.table} \
            >& {log}"

## textbook_solow     : construct a table of regression estimates for textbook solow model
rule textbook_solow:
    input:
        script = config["src_tables"] + "tab01_textbook_solow.R",
        models = analysis(expand(config["out_analysis"] +
                            "{iModel}_ols_{iSubset}.rds",
                            iModel = MODELS,
                            iSubset = DATA_SUBSET)
                            ),
    params:
        filepath   = config["out_analysis"],
        model_expr = "model_solow*.rds"
    output:
        table = config["out_tables"] + "tab01_textbook_solow.tex"
    log:
        config["log"] + "tables/tab01_textbook_solow.Rout"
    shell:
        "Rscript {input.script} \
            --filepath {params.filepath} \
            --models {params.model_expr} \
            --out {output.table} \
            >& {log}"

## make_figs          : builds all figures
rule make_figs:
    input:
        expand(config["out_figures"] + "{iFigure}.pdf",
                iFigure = FIGURES)

## figures            : recipe for constructing a figure (cannot be called)
rule figures:
    input:
        script = config["src_figures"] + "{iFigure}.R",
        data   = data_mgt(config["out_data"] + "mrw_complete.csv"),
        subset = config["src_data_specs"] + "subset_intermediate.json"
    output:
        fig = config["out_figures"] + "{iFigure}.pdf"
    log:
        config["log"]+ "figures/{iFigure}.Rout"
    shell:
        "Rscript {input.script} \
            --data {input.data} \
            --subset {input.subset} \
            --out {output.fig} \
            >& {log}"
<...>

Remark about dictionaries not having to be moved…

Now let’s do another dry run with Snakemake and see what happens:

Executing subworkflow analysis.
Building DAG of jobs...
Job counts:
    count   jobs
    24  ols_model
    24

<LIST OF ANALYSIS JOBS>
Executing subworkflow data_mgt.
Building DAG of jobs...
Job counts:
    count   jobs
    1   gen_regression_vars
    1   rename_vars
    2
<LIST OF DATA-MGT JOBS>
Executing main workflow.
Job counts:
    count   jobs
    1   all
    1   augment_solow
    3   figures
    1   textbook_solow
    6
<LIST OF MAIN WORKFLOW JOBS>

Snakemake reports no error. However, look at the order of execution:

  1. subworkflow analysis
  2. subworkflow data_mgt
  3. main workflow

Snakemake wants to run the analysis subworkflow before data_mgt. That is a problem, because the analysis rules need the data that data_mgt creates. If we run snakemake to execute the workflow:

$ snakemake

We get the expected error:

Building DAG of jobs...
Executing subworkflow analysis.
Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1
Rules claiming more threads will be scaled down.
Job counts:
    count   jobs
    24  ols_model
    24
Traceback (most recent call last):
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/__init__.py", line 537, in snakemake
    report=report)
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/workflow.py", line 653, in execute
    success = scheduler.schedule()
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/scheduler.py", line 275, in schedule
    run = self.job_selector(needrun)
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/scheduler.py", line 399, in job_selector
    c = list(map(self.job_reward, jobs))  # job rewards
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/scheduler.py", line 469, in job_reward
    input_size = job.inputsize
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/jobs.py", line 288, in inputsize
    self._inputsize = sum(f.size for f in self.input)
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/jobs.py", line 288, in <genexpr>
    self._inputsize = sum(f.size for f in self.input)
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 123, in wrapper
    return func(self, *args, **kwargs)
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 138, in wrapper
    return func(self, *args, **kwargs)
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 286, in size
    return self.size_local
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 291, in size_local
    self.check_broken_symlink()
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 296, in check_broken_symlink
    if not self.exists_local and lstat(self.file):
  File "/home/lachlan/anaconda3/lib/python3.5/site-packages/snakemake/io.py", line 29, in lstat
    follow_symlinks=os.stat not in os.supports_follow_symlinks)
FileNotFoundError: [Errno 2] No such file or directory: '/home/lachlan/teaching/snakemake-econ-r-learner/out/data/mrw_complete.csv'

This says that out/data/mrw_complete.csv doesn’t exist, therefore the models cannot be estimated.
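
At heart this is an ordering problem: nothing tells the analysis workflow that data_mgt must come first. The sketch below illustrates the point with Python's standard-library graphlib (Python 3.9+). It is only an illustration of dependency ordering, not a description of how Snakemake schedules subworkflows internally, and the dictionary names are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key maps to the set of workflows that must finish before it can start.
undeclared = {
    "main": {"analysis", "data_mgt"},
    "analysis": set(),         # analysis does not say it needs data_mgt
    "data_mgt": set(),
}
declared = {
    "main": {"analysis", "data_mgt"},
    "analysis": {"data_mgt"},  # analysis names data_mgt as a prerequisite
    "data_mgt": set(),
}

# Without the edge, either subworkflow may come first; only "main" is pinned last.
loose_order = list(TopologicalSorter(undeclared).static_order())
# With the edge, there is exactly one valid order.
strict_order = list(TopologicalSorter(declared).static_order())

print(loose_order)
print(strict_order)
```

Once the dependency is stated explicitly, the only valid order is data_mgt, then analysis, then main.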

What’s the solution? We need to specify that the analysis subworkflow itself has a subworkflow, data_mgt, that needs to run before analysis. So we add the subworkflow data_mgt to the Snakefile in src/analysis:

# subworkflow - analysis
#
# @yourname
#

# --- Importing Configuration Files --- #

configfile: "config.yaml"

# --- Sub Workflows --- #
subworkflow data_mgt:
   workdir: config["ROOT"]
   snakefile:  config["src_data_mgt"] + "Snakefile"

# --- Build Rules --- #
## estimate_models    : estimates all regressions
rule estimate_models:
    input:
        expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
                    iModel = MODELS,
                    iSubset = DATA_SUBSET)

## ols_model          : recipe for estimating a single regression (cannot be called)
rule ols_model:
    input:
        script = config["src_analysis"] + "estimate_ols_model.R",
        data   = data_mgt(config["out_data"] + "mrw_complete.csv"),
        model  = config["src_model_specs"] + "{iModel}.json",
        subset = config["src_data_specs"]  + "{iSubset}.json"
    output:
        model_est = config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
    log:
        config["log"] + "analysis/{iModel}_ols_{iSubset}.Rout"
    shell:
        "Rscript {input.script} \
            --data {input.data} \
            --model {input.model} \
            --subset {input.subset} \
            --out {output.model_est} \
            >& {log}"

And we try again:

$ snakemake
Building DAG of jobs...
Executing subworkflow data_mgt.
Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1
Rules claiming more threads will be scaled down.
Job counts:
    count   jobs
    1   gen_regression_vars
    1   rename_vars
    2

[Mon Feb 11 23:12:42 2019]
rule rename_vars:
    input: src/data-management/rename_variables.R, src/data/mrw.dta
    output: out/data/mrw_renamed.csv
    log: logs/data-mgt/rename_variables.Rout
    jobid: 1

[Mon Feb 11 23:12:42 2019]
Finished job 1.
1 of 2 steps (50%) done

[Mon Feb 11 23:12:42 2019]
rule gen_regression_vars:
    input: src/data-management/gen_reg_vars.R, src/data-specs/param_solow.json, out/data/mrw_renamed.csv
    output: out/data/mrw_complete.csv
    log: logs/data-mgt/gen_reg_vars.Rout
    jobid: 0

[Mon Feb 11 23:12:43 2019]
Finished job 0.
2 of 2 steps (100%) done
Complete log: /home/lachlan/teaching/snakemake-econ-r-learner/.snakemake/log/2019-02-11T231242.348577.snakemake.log
Executing subworkflow analysis.
Building DAG of jobs...
Executing subworkflow data_mgt.
Error: Snakefile "/home/lachlan/teaching/snakemake-econ-r-learner/src/analysis/src/data-management/Snakefile" not present.

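What has happened this time? The data_mgt subworkflow ran without problems when called from the root directory. The failure comes when the analysis subworkflow tries to call data_mgt itself: the error shows Snakemake looking for src/analysis/src/data-management/Snakefile. The path config["src_data_mgt"] + "Snakefile" is relative, and inside the analysis subworkflow it is resolved relative to src/analysis/ rather than the root directory.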

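The fix is to prefix both paths with config["sub2root"], a configuration entry that (as its name suggests) holds the relative path from a subworkflow’s directory back to the root directory, so that the paths resolve to the right place again: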

# subworkflow - analysis
#
# @yourname
#

# --- Importing Configuration Files --- #

configfile: "config.yaml"

# --- Sub Workflows --- #
subworkflow data_mgt:
   workdir: config["sub2root"] + config["ROOT"]
   snakefile:  config["sub2root"]+ config["src_data_mgt"] + "Snakefile"

# --- Build Rules --- #
## estimate_models    : estimates all regressions
rule estimate_models:
    input:
        expand(config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
                    iModel = MODELS,
                    iSubset = DATA_SUBSET)

## ols_model          : recipe for estimating a single regression (cannot be called)
rule ols_model:
    input:
        script = config["src_analysis"] + "estimate_ols_model.R",
        data   = data_mgt(config["out_data"] + "mrw_complete.csv"),
        model  = config["src_model_specs"] + "{iModel}.json",
        subset = config["src_data_specs"]  + "{iSubset}.json"
    output:
        model_est = config["out_analysis"] + "{iModel}_ols_{iSubset}.rds",
    log:
        config["log"] + "analysis/{iModel}_ols_{iSubset}.Rout"
    shell:
        "Rscript {input.script} \
            --data {input.data} \
            --model {input.model} \
            --subset {input.subset} \
            --out {output.model_est} \
            >& {log}"

Now if we run snakemake again:

$ snakemake

Our build runs from start to finish.
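
The path arithmetic behind the failure and the fix can be sketched in plain Python. The directory and the value of src_data_mgt are taken from the error message above; the value "../../" for sub2root is an assumption, since the actual entry in config.yaml is not shown.

```python
import posixpath

# Directory of the analysis Snakefile, as it appears in the error message.
analysis_dir = "/home/lachlan/teaching/snakemake-econ-r-learner/src/analysis"
src_data_mgt = "src/data-management/"

# Assumption: sub2root is the relative hop from src/analysis back to the root.
sub2root = "../../"

# Relative path resolved against src/analysis: this is the path Snakemake rejected.
broken = posixpath.normpath(
    posixpath.join(analysis_dir, src_data_mgt + "Snakefile")
)

# Same path with the sub2root prefix: it climbs back to the root first.
fixed = posixpath.normpath(
    posixpath.join(analysis_dir, sub2root + src_data_mgt + "Snakefile")
)

print(broken)
print(fixed)
```

The first path reproduces the one in the error message; the second points at the Snakefile in src/data-management under the root directory.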

Exercise: More Subworkflows

Make two new subworkflows, figs and tables, to contain the rules that construct all figures and tables respectively. The end result should be that only the all rule remains in the “Main Build Rules” section of the Snakefile in the root directory. Think carefully about adding the subworkflows and their dependencies properly.

To check that your new subworkflow system builds, clean the output directory, enter snakemake into the terminal, and confirm that all outputs are successfully created.

Guided Exercise: help rules with subworkflows

With the new subworkflow system in place, examine what the output of snakemake help looks like.

To bring our help rule back into shape, replace the shell command in the help rule with:

find . -type f -name 'Snakefile' | tac | xargs sed -n 's/^##//p' \
            > {output}

Explain exactly what this command is doing. (Look for help in ‘usual’ places).

Next, update the inputs of the help rule so that the dependencies are correct.