123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- """Exact copy of falcon_unzip/tasks/snakemake.py
- TODO: Consolidate.
- """
- from future.utils import viewitems
- from future.utils import itervalues
- from builtins import object
- import json
- import os
- import re
- def find_wildcards(pattern):
- """
- >>> find_wildcards('{foo}/{bar}')
- ['bar', 'foo']
- """
- re_wildcard = re.compile(r'\{(\w+)\}')
- found = [mo.group(1) for mo in re_wildcard.finditer(pattern)]
- return list(sorted(found))
- class SnakemakeRuleWriter(object):
- def legalize(self, rule_name):
- return self.re_bad_char.sub('_', rule_name, count=0)
- def unique_rule_name(self, basename):
- rule_name = basename
- if rule_name in self.rule_names:
- i = 1
- while rule_name in self.rule_names:
- rule_name = basename + str(i)
- i += 1
- self.rule_names.add(rule_name)
- return rule_name
- def write_dynamic_rules(self, rule_name, input_json, inputs, shell_template,
- parameters, wildcard_outputs, output_json):
- """Lots of conventions.
- input_json: should have a key 'mapped_inputs', which is a map of key->filename
- Those filenames will be symlinked here, according to the patterns in wildcard_inputs.
- shell_template: for the parallel task
- output_json: This will contain only key->filename, based on wildcard_outputs.
- inputs: These include non-wildcards too.
- (For now, we assume inputs/outputs is just one per parallel task.)
- """
- # snakemake does not like paths starting with './'; they can lead to mismatches.
- # So we run normpath everywhere.
- input_json = os.path.normpath(input_json)
- output_json = os.path.normpath(output_json)
- # snakemake cannot use already-generated files as dynamic outputs (the wildcard_inputs for the parallel task),
- # so we rename them and plan to symlink.
- wildcard_inputs = dict(inputs)
- nonwildcard_inputs = dict()
- for (key, fn) in list(viewitems(wildcard_inputs)):
- if '{' not in fn:
- del wildcard_inputs[key]
- nonwildcard_inputs[key] = fn
- continue
- dn, bn = os.path.split(wildcard_inputs[key])
- wildcard_inputs[key] = os.path.join(dn + '.symlink', bn)
- rule_name = self.unique_rule_name(rule_name)
- dynamic_output_kvs = ', '.join("%s=dynamic('%s')"%(k, os.path.normpath(v)) for (k, v) in viewitems(wildcard_inputs))
- dynamic_input_kvs = ', '.join("%s=ancient(dynamic('%s'))"%(k, os.path.normpath(v)) for (k, v) in viewitems(wildcard_outputs))
- rule_parameters = {k: v for (k, v) in viewitems(parameters) if not k.startswith('_')}
- params = ','.join('\n %s="%s"'%(k,v) for (k, v) in viewitems(rule_parameters))
- pattern_kv_list = list()
- for (name, wi) in viewitems(wildcard_inputs):
- fn_pattern = wi
- fn_pattern = fn_pattern.replace('{', '{{')
- fn_pattern = fn_pattern.replace('}', '}}')
- pattern_kv_list.append('%s="%s"' %(name, fn_pattern))
- wi_pattern_kvs = ' '.join(pattern_kv_list)
- rule = """
- rule dynamic_%(rule_name)s_split:
- input: %(input_json)r
- output: %(dynamic_output_kvs)s
- shell: 'python3 -m falcon_kit.mains.copy_mapped --special-split={input} %(wi_pattern_kvs)s'
- """%(locals())
- self.write(rule)
- input_wildcards = set() # Not sure yet whether input must match output wildcards.
- for wi_fn in itervalues(wildcard_inputs):
- found = find_wildcards(wi_fn)
- input_wildcards.update(found)
- wildcards = list(sorted(input_wildcards))
- params_plus_wildcards = {k: '{%s}'%k for k in wildcards}
- params_plus_wildcards.update(parameters)
- # The parallel script uses all inputs, not just wildcards.
- all_inputs = dict(wildcard_inputs)
- all_inputs.update(nonwildcard_inputs)
- self.write_script_rule(all_inputs, wildcard_outputs, params_plus_wildcards, shell_template, rule_name=None)
- wo_str_lists_list = ['%s=[str(i) for i in input.%s]' %(name, name) for name in list(wildcard_outputs.keys())]
- wo_pattern_kv_list = ['%s="%s"' %(name, os.path.normpath(patt)) for (name, patt) in viewitems(wildcard_outputs)]
- wo_str_lists_kvs = ',\n '.join(wo_str_lists_list)
- wo_pattern_kvs = ',\n '.join(wo_pattern_kv_list)
- wildcards = list()
- for wi_fn in itervalues(wildcard_outputs):
- found = find_wildcards(wi_fn)
- if wildcards:
- assert wildcards == found, 'snakemake requires all outputs (and inputs?) to have the same wildcards'
- else:
- wildcards = found
- wildcards_comma_sep = ', '.join('"%s"' %k for k in wildcards)
- rule = '''
- rule dynamic_%(rule_name)s_merge:
- input: %(dynamic_input_kvs)s
- output: %(output_json)r
- run:
- snake_merge_multi_dynamic(output[0],
- dict(
- %(wo_str_lists_kvs)s
- ),
- dict(
- %(wo_pattern_kvs)s
- ),
- [%(wildcards_comma_sep)s] # all wildcards
- )
- '''%(locals())
- self.write(rule)
- def write_script_rule(self, inputs, outputs, parameters, shell_template, rule_name):
- assert '_bash_' not in parameters
- first_output_name, first_output_fn = list(outputs.items())[0] # for rundir, since we cannot sub wildcards in shell
- if not rule_name:
- rule_name = os.path.dirname(first_output_fn)
- rule_name = self.unique_rule_name(self.legalize(rule_name))
- wildcard_rundir = os.path.normpath(os.path.dirname(first_output_fn)) # unsubstituted
- # We use snake_string_path b/c normpath drops leading ./, but we do NOT want abspath.
- input_kvs = ', '.join('%s=%s'%(k, snake_string_path(v)) for k,v in
- sorted(viewitems(inputs)))
- output_kvs = ', '.join('%s=%s'%(k, snake_string_path(v)) for k,v in
- sorted(viewitems(outputs)))
- rule_parameters = {k: v for (k, v) in viewitems(parameters) if not k.startswith('_')}
- #rule_parameters['reltopdir'] = os.path.relpath('.', wildcard_rundir) # in case we need this later
- params = ','.join('\n %s="%s"'%(k,v) for (k, v) in viewitems(rule_parameters))
- shell = snake_shell(shell_template, wildcard_rundir)
- # cd $(dirname '{output.%(first_output_name)s}')
- rule = """
- rule static_%(rule_name)s:
- input: %(input_kvs)s
- output: %(output_kvs)s
- params:%(params)s
- shell:
- '''
- outdir=$(dirname {output[0]})
- #mkdir -p ${{outdir}}
- cd ${{outdir}}
- date
- %(shell)s
- date
- '''
- """%(locals())
- self.write(rule)
- def __call__(self, inputs, outputs, parameters, shell_template, rule_name=None):
- self.write_script_rule(inputs, outputs, parameters, shell_template, rule_name)
- def __init__(self, writer):
- self.write = writer.write
- self.rule_names = set() # to ensure uniqueness
- self.re_bad_char = re.compile(r'\W')
- self.write("""
- # THIS IS CURRENTLY BROKEN.
- import json
- import os
- #import snakemake.utils
- def snake_merge_dynamic_dict(reldir, input_fns, pattern, wildcards):
- '''Assume each wildcard appears at most once in the pattern.
- '''
- for k in wildcards:
- pattern = pattern.replace('{%s}' %k, '(?P<%s>\w+)' %k)
- re_dynamic = re.compile(pattern)
- mapped = list()
- for fn in input_fns:
- mo = re_dynamic.search(fn)
- assert mo, '{!r} did not match {!r}'.format(fn, re_dynamic.pattern)
- file_description = dict()
- file_description['wildcards'] = dict(mo.groupdict())
- file_description['fn'] = os.path.relpath(fn, reldir)
- mapped.append(file_description)
- return mapped
- def snake_merge_multi_dynamic(output_fn, dict_of_input_fns, dict_of_patterns, wildcards):
- outdir = os.path.normpath(os.path.dirname(output_fn))
- if not os.path.isdir(outdir):
- os.makedirs(outdir)
- assert list(sorted(dict_of_input_fns.keys())) == list(sorted(dict_of_patterns.keys()))
- all_mapped = dict()
- for i in dict_of_patterns.keys():
- input_fns = dict_of_input_fns[i]
- pattern = dict_of_patterns[i]
- mapped = snake_merge_dynamic_dict(outdir, input_fns, pattern, wildcards)
- all_mapped[i] = mapped
- all_grouped = dict()
- for i, mapped in all_mapped.items():
- #print(i, mapped)
- for file_description in mapped:
- #print(file_description)
- #print(file_description['wildcards'])
- #print(list(sorted(file_description['wildcards'].items())))
- wildkey = ','.join('{}={}'.format(k,v) for k,v in sorted(file_description['wildcards'].items()))
- if wildkey not in all_grouped:
- new_group = dict(
- wildcards=dict(file_description['wildcards']),
- fns=dict(),
- )
- all_grouped[wildkey] = new_group
- group = all_grouped[wildkey]
- wildcards = file_description['wildcards']
- assert wildcards == group['wildcards'], '{!r} should match {!r} by snakemake convention'.format(
- wildcards, group['wildcards'])
- fn = file_description['fn']
- group['fns'][i] = fn
- ser = json.dumps(all_grouped, indent=2, separators=(',', ': ')) + '\\n'
- with open(output_fn, 'w') as out:
- out.write(ser)
- """)
- prefix = """
- shell.prefix('''
- # Add -e vs. in falcon_unzip.
- set -vex
- hostname
- pwd
- ''')
- """
- self.write(prefix)
- class SnakemakeDynamic(object):
- """Not currently used."""
- def __init__(self, path):
- self.path = path
- def snake_string_path(p):
- """normpath drops leading ./
- """
- if isinstance(p, SnakemakeDynamic):
- return "dynamic('{}')".format(
- os.path.normpath(p.path))
- else:
- return "'{}'".format(
- os.path.normpath(p))
- def snake_shell(template, rundir):
- reltopdir = os.path.relpath('.', rundir)
- def makerel(mo):
- return os.path.join(reltopdir, mo.group(0))
- re_inout = re.compile(r'{(?:input|output)')
- return re_inout.sub(makerel, template, count =0)
|