from future.utils import viewitems
from . import bash, functional
from .functional import cfg_tobool
from .io import NativeIO
from .util.system import (make_fofn_abs, make_dirs, cd)
from configparser import ConfigParser
import functools
import json
import logging
import logging.config
import os
import re
import io
import sys
import tempfile
import time
import uuid

logger = logging.getLogger(__name__)


def _prepend_env_paths(content, names):
    """
    E.g.
      names = ['PATH', 'PYTHONPATH']
      content =
        echo hi
      =>
        export PATH=current:path:${PATH}
        export PYTHONPATH=current:path:${PYTHONPATH}
        echo hi
    """
    export_env_vars = ['export %(k)s=%(v)s:${%(k)s}' % dict(
        k=name, v=os.environ.get(name, '')) for name in names]
    return '\n'.join(export_env_vars + [content])
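

# Illustrative sketch (not part of the original module): what
# _prepend_env_paths() produces. 'DEMO_PATH' is a made-up variable name.
def _example_prepend_env_paths():
    os.environ['DEMO_PATH'] = '/opt/demo/bin'
    text = _prepend_env_paths('echo hi', ['DEMO_PATH'])
    assert text == 'export DEMO_PATH=/opt/demo/bin:${DEMO_PATH}\necho hi'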


def update_env_in_script(fn, names):
    """Modify fn in place, using _prepend_env_paths().
    """
    with open(fn) as ifs:
        content = ifs.read()
    content = _prepend_env_paths(content, names)
    with open(fn, 'w') as ofs:
        ofs.write(content)


def use_tmpdir_for_files(basenames, src_dir, link_dir):
    """NOT USED. Kept only for reference. This will be done in pypeFLOW.

    Generate a script to copy db files to a tmpdir (for speed).
    - Choose tmp_dir, based on the src_dir name.
    - rsync basenames into tmp_dir  # after 'flock', per file
    - symlink from link_dir into tmp_dir.
    Return a list of script lines, sans linefeed.
    """
    script = list()
    unique = os.path.abspath(src_dir).replace('/', '_')
    root = tempfile.gettempdir()
    tmp_dir = os.path.join(root, 'falcon', unique)
    script.append('mkdir -p %s' % tmp_dir)
    for basename in basenames:
        src = os.path.join(src_dir, basename)
        dst = os.path.join(tmp_dir, basename)
        rm_cmd = 'rm -f %s' % basename
        # Wait on the lock for up to 10 minutes, in case of very large files.
        rsync_cmd = "flock -w 600 %s.lock -c 'rsync -av %s %s'" % (
            dst, src, dst)
        ln_cmd = 'ln -sf %s %s' % (dst, basename)
        script.extend([rm_cmd, rsync_cmd, ln_cmd])
    return script


def make_job_data(url, script_fn):
    """Choose defaults.
    Run in the same directory as script_fn.
    Base job_name on script_fn.
    """
    wd = os.path.dirname(script_fn)
    job_name = '{0}-{1}-{2}'.format(
        os.path.basename(script_fn),
        url.split("/")[-1],
        str(uuid.uuid4())[:8],
    )
    job_data = {"job_name": job_name,
                "cwd": wd,
                "script_fn": script_fn}
    return job_data


def check_HPCdaligner_option(option):
    msg = ''
    if '-dal' in option:
        msg += 'HPC.daligner option "-dal" has changed to "-B".\n'
    if '-deg' in option:
        msg += 'HPC.daligner option "-deg" has changed to "-D".\n'
    if msg:
        raise Exception(msg)


def clean_falcon_options(fc):
    """Update some values in fc.
    Replace _ with - in a couple of places.
    """
    keys = ('falcon_sense_option', 'overlap_filtering_setting',
            'fc_ovlp_to_graph_option',
            )
    for key in keys:
        update_dash_flags(fc, key)
    for dk in ('pa_HPCdaligner_option', 'ovlp_HPCdaligner_option'):
        if dk in fc:
            check_HPCdaligner_option(fc[dk])


def get_config(config):
    """
    This is only for the call from pbsmrtpipe:
      support.get_config(support.parse_config(fn))
    We have changed parse_config() to return a dict,
    so this is a no-op.
    """
    cfg = dict(config)  # already a dict now
    return cfg


def dict2config(jdict, section):
    config = ConfigParser()
    if not config.has_section(section):
        config.add_section(section)
    for (k, v) in viewitems(jdict):
        config.set(section, k, str(v))
    return config
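

# Illustrative usage sketch (not part of the original module): round-tripping
# a small dict through dict2config(). Section and key names are made up.
def _example_dict2config():
    config = dict2config({'njobs': 8, 'job_type': 'local'}, 'job.defaults')
    assert config.get('job.defaults', 'njobs') == '8'  # values are stringified
    assert config.get('job.defaults', 'job_type') == 'local'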


def parse_config(config_fn):
    """Deprecated.
    Called from pbsmrtpipe, for now.
    """
    return parse_cfg_file(config_fn)


def parse_cfg_file(config_fn):
    """Return the configuration as a dict.
    """
    with open(config_fn) as stream:
        ext = os.path.splitext(config_fn)[1]
        if ext in ('.json', '.js'):
            config = json.loads(stream.read())
        else:
            # Parse sections (case-sensitively) into sub-dicts.
            config = parse_cfg_with_sections(stream)
    update_defaults(config['General'])
    # Copy General section to top, for now.
    #for key, val in config['General'].items():
    #    config[key] = val
    ##cfg.update(config.get('General', {}))
    check_config_sections(config)  # Ensure that the right sections exist.
    update_job_sections(config)
    return config


def process_job_defaults(job_defaults):
    key = 'use_tmpdir'
    use_tmpdir = job_defaults.get(key, '')
    if '/' in use_tmpdir:
        # An explicit path: adopt it globally.
        tempfile.tempdir = use_tmpdir
        os.environ['TMPDIR'] = use_tmpdir
    else:
        if use_tmpdir.lower().startswith('t'):
            use_tmpdir = tempfile.gettempdir()
        else:
            use_tmpdir = False
        job_defaults[key] = use_tmpdir
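

# Illustrative sketch (not part of the original module): how 'use_tmpdir' is
# normalized. The path form ('use_tmpdir=/scratch/tmp') is not exercised here
# because it mutates tempfile.tempdir and $TMPDIR globally.
def _example_process_job_defaults():
    d = {'use_tmpdir': 'true'}
    process_job_defaults(d)  # any word starting with 't' -> system tempdir
    assert d['use_tmpdir'] == tempfile.gettempdir()
    d = {'use_tmpdir': 'false'}
    process_job_defaults(d)  # anything else -> disabled
    assert d['use_tmpdir'] is False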


def update_job_defaults_section(config):
    """For backwards compatibility with settings from the 'General' section.
    """
    General = config['General']
    job_defaults = config['job.defaults']

    if 'njobs' in General:
        logger.warning('"njobs" belongs in the [job.defaults] section.')
    for key in ('pwatcher_type', 'job_type', 'stop_all_jobs_on_failure',
                'use_tmpdir', 'job_name_style'):
        if key in General:
            logger.warning(
                'Please specify "{}" only in the [job.defaults] section, not in [General].'.format(key))
    if 'job_queue' in General:
        logger.warning('Please specify "JOB_QUEUE" only in the [job.defaults] section, not as "job_queue" in [General].')
    if 'sge_option' in General:
        logger.warning('Please specify "JOB_OPTS" in the [job.defaults] section, not as "sge_option" in [General].')

    pwatcher_type = General.get('pwatcher_type', 'fs_based')
    job_type = job_defaults.get('job_type', General.get('job_type', '')).lower()
    job_queue = General.get('job_queue', '')
    sge_option = General.get('sge_option', '')
    if 'pwatcher_type' not in job_defaults:
        job_defaults['pwatcher_type'] = pwatcher_type
    else:
        pwatcher_type = job_defaults['pwatcher_type']
    if 'submit' not in config['job.defaults']:
        if 'blocking' == pwatcher_type:
            if not job_queue or ' ' not in job_queue:
                raise Exception('pwatcher_type=blocking, but "submit" is not in the [job.defaults] section.')
            config['job.defaults']['submit'] = job_queue
            logger.warning('Please set "submit" in the [job.defaults] section. (For now, we will use "job_queue" from [General], which was a hack.)')
        elif 'fs_based' == pwatcher_type or 'network_based' == pwatcher_type:
            if not job_type:
                logger.error('job.defaults.submit is not set; pwatcher_type={}; but job_type is not set. Maybe try "job_type=local" first.'.format(pwatcher_type))
                job_type = 'local'
                job_defaults['job_type'] = job_type
            allowed_job_types = ['sge', 'pbs', 'torque', 'slurm', 'lsf', 'local']
            assert job_type in allowed_job_types, 'job_type={} not in {}'.format(
                job_type, allowed_job_types)
            if job_queue and 'JOB_QUEUE' not in config['job.defaults']:
                job_defaults['JOB_QUEUE'] = job_queue
        else:
            raise Exception('Unknown pwatcher_type={}'.format(pwatcher_type))
    #assert 'submit' in config['job.defaults'], repr(config)
    if sge_option and 'JOB_OPTS' not in config['job.defaults']:
        job_defaults['JOB_OPTS'] = sge_option
    if 'njobs' not in job_defaults:
        config['job.defaults']['njobs'] = int(General.get('default_concurrent_jobs', 8))  # GLOBAL DEFAULT CONCURRENCY
        msg = 'Please supply a default for "njobs" (aka concurrency) in section [job.defaults]. For now, we will use {}.'.format(
            config['job.defaults']['njobs'])
        logger.warning(msg)

    def update_if_missing_in_defaults(key):
        if key not in job_defaults and key in General:
            job_defaults[key] = General[key]
            logger.warning('Found "{}" in the [General] section; it should be in [job.defaults] instead.'.format(key))
    update_if_missing_in_defaults('job_name_style')
    update_if_missing_in_defaults('stop_all_jobs_on_failure')
    update_if_missing_in_defaults('use_tmpdir')

    legacy_names = [
        'pwatcher_type', 'pwatcher_directory',
        'job_type', 'job_queue', 'job_name_style',
        'use_tmpdir',
    ]

    def update_if_missing(name, sub_dict):
        if General.get(name) and name not in sub_dict:
            sub_dict[name] = General[name]
    for name in legacy_names:
        update_if_missing(name, config['job.defaults'])
    process_job_defaults(job_defaults)


def update_job_sections(config):
    """More backwards compatibility with settings from the 'General' section.
    """
    update_job_defaults_section(config)
    General = config['General']

    # Update a few options whose names changed and whose section is non-default.
    def update_step_job_opts(name):
        if General.get('sge_option_' + name) and 'JOB_OPTS' not in config['job.step.' + name]:
            config['job.step.' + name]['JOB_OPTS'] = General['sge_option_' + name]

    def update_step_njobs(name):
        if General.get(name + '_concurrent_jobs') and 'njobs' not in config['job.step.' + name]:
            config['job.step.' + name]['njobs'] = int(General[name + '_concurrent_jobs'])
    for name in ['dust', 'da', 'la', 'pda', 'pla', 'cns', 'fc', 'asm']:
        update_step_job_opts(name)
        update_step_njobs(name)
    # Prefer 'asm' to 'fc'.
    asm = dict(config['job.step.asm'])
    config['job.step.asm'] = config['job.step.fc']
    del config['job.step.fc']
    config['job.step.asm'].update(asm)
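

# Illustrative sketch (not part of the original module): legacy [General]
# options migrating into the job.* sections. All keys and values are invented.
def _example_update_job_sections():
    config = {
        'General': {'sge_option_da': '-pe smp 4', 'da_concurrent_jobs': '16'},
        'job.defaults': {'job_type': 'local', 'njobs': 8},
        'job.step.dust': {}, 'job.step.da': {}, 'job.step.la': {},
        'job.step.pda': {}, 'job.step.pla': {}, 'job.step.cns': {},
        'job.step.fc': {}, 'job.step.asm': {},
    }
    update_job_sections(config)
    assert config['job.step.da']['JOB_OPTS'] == '-pe smp 4'
    assert config['job.step.da']['njobs'] == 16
    assert 'job.step.fc' not in config  # renamed to 'job.step.asm'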


def parse_cfg_with_sections(stream):
    """Return a dict of section-name -> dict of options.

    Experimental idea (not implemented here): ConfigParser sections separated
    by dots could become nested sub-dicts, so that

        [foo.bar]
        baz = 42

    would be equivalent to the JSON

        {"foo": {"bar": {"baz": 42}}}

    For now, dotted section names remain flat keys.
    """
    content = stream.read()
    try:
        # JSON content is accepted as-is.
        return json.loads(content)
    except ValueError:
        pass  # Not JSON; parse as INI below.
    config = ConfigParser(strict=False)
    config.optionxform = str  # Keep option names case-sensitive.
    config.read_file(NativeIO(content))
    result = dict()
    for sec in config.sections():
        result[sec] = dict(config.items(sec))
    return result
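

# Illustrative sketch (not part of the original module): parsing a small INI
# string. Note that dotted section names stay flat, not nested.
def _example_parse_cfg_with_sections():
    ini = '[General]\ngenome_size = 4600000\n\n[job.defaults]\nnjobs = 8\n'
    result = parse_cfg_with_sections(NativeIO(ini))
    assert result['General']['genome_size'] == '4600000'  # values stay strings
    assert result['job.defaults']['njobs'] == '8'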


def check_config_sections(cfg):
    """Verify that only known sections are present, and ensure they all exist.
    """
    allowed_sections = set(['General',
                            'job.step.dust',
                            'job.step.da', 'job.step.pda',
                            'job.step.la', 'job.step.pla',
                            'job.step.cns', 'job.step.fc',
                            'job.step.asm',
                            'job.defaults',
                            ])
    all_sections = set(k for k, v in list(cfg.items()) if isinstance(v, dict))
    unexpected = all_sections - allowed_sections
    if unexpected:
        msg = 'You have {} unexpected cfg sections: {}'.format(
            len(unexpected), unexpected)
        raise Exception(msg)
    # Guarantee that they all exist.
    for sec in allowed_sections:
        if sec not in cfg:
            cfg[sec] = dict()


def update_dash_flags(cfg, key):
    if key not in cfg:
        return
    val = cfg[key]
    cfg[key] = new_val = functional.dash_flags(cfg[key])
    if val != new_val:
        msg = '''\
Option contains flags with "_":
 "{key}={val}". Those should be "-", as in
 "{key}={new_val}". Auto-replaced.'''.format(**locals())
        logger.warning(msg)
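

# Illustrative sketch: assumes functional.dash_flags() rewrites "_" to "-"
# inside option flags, per the warning above; the exact rules live in
# functional.py. The option value here is invented.
def _example_update_dash_flags():
    cfg = {'falcon_sense_option': '--min_idt 0.70 --min_cov 2'}
    update_dash_flags(cfg, 'falcon_sense_option')
    # Under that assumption, cfg['falcon_sense_option'] is now
    # '--min-idt 0.70 --min-cov 2', and a warning was logged.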


TEXT_FILE_BUSY = 'avoid_text_file_busy'


def update_defaults(cfg):
    """cfg is probably the General sub-dict.
    """
    def set_default(key, val):
        if key not in cfg:
            cfg[key] = val
    set_default('input_type', 'raw')
    set_default('overlap_filtering_setting', '--max-diff 1000 --max-cov 1000 --min-cov 2')
    #set_default('pa_daligner_option', '-e.70 -s100 -t16')  # TODO: -t is a dumb default
    #set_default('ovlp_daligner_option', '-e.96 -s1000 -h60 -t32')  # TODO: -t is a dumb default
    set_default('pa_HPCdaligner_option', '-v -D24')
    set_default('ovlp_HPCdaligner_option', '-v -D24 -l500')
    set_default('pa_HPCTANmask_option', '-l500')  # daligner defaults to -l1000
    #set_default('ovlp_HPCTANmask_option', '-l500')
    set_default('pa_REPmask_code', '0,300/0,300/0,300')
    set_default('pa_DBsplit_option', '-x500 -s200 -a')
    set_default('skip_checks', False)
    set_default('pa_DBdust_option', '')  # Gene recommends the defaults. I have tried -w128 -t2.5 -m20
    set_default('pa_fasta_filter_option', 'streamed-internal-median')
    set_default('pa_subsample_coverage', 0)
    set_default('pa_subsample_strategy', 'random')
    set_default('pa_subsample_random_seed', 12345)
    set_default('dazcon', False)
    set_default('pa_dazcon_option', '-j 4 -x -l 500')
    set_default('ovlp_DBdust_option', '')
    set_default('ovlp_DBsplit_option', '-x500 -s200 -a')
    set_default('falcon_sense_option', '--output-multi --min-idt 0.70 --min-cov 2 --max-n-read 1800')
    set_default('falcon_sense_skip_contained', False)
    set_default('falcon_sense_greedy', False)
    set_default('LA4Falcon_preload', '')
    set_default('fc_ovlp_to_graph_option', '')
    set_default('genome_size', 0)
    set_default('seed_coverage', 20)
    set_default('length_cutoff', -1)
    set_default('length_cutoff_pr', 0)
    set_default('bestn', 12)
    set_default('target', 'assembly')
    set_default(TEXT_FILE_BUSY, bash.BUG_avoid_Text_file_busy)

    for bool_key in ('skip_checks', 'dazcon', 'falcon_sense_skip_contained',
                     'falcon_sense_greedy', 'LA4Falcon_preload', TEXT_FILE_BUSY):
        cfg[bool_key] = functional.cfg_tobool(cfg.get(bool_key, False))

    if 'dust' in cfg:
        logger.warning(
            "The 'dust' option is deprecated and ignored. We always run DBdust now. Use ovlp_/pa_DBdust_option to override DBdust default arguments.")

    bash.BUG_avoid_Text_file_busy = cfg[TEXT_FILE_BUSY]

    clean_falcon_options(cfg)

    falcon_sense_option = cfg['falcon_sense_option']
    if 'local_match_count' in falcon_sense_option or 'output_dformat' in falcon_sense_option:
        raise Exception('Please remove obsolete "--local_match_count_*" or "--output_dformat"' +
                        ' from "falcon_sense_option" in your cfg: %s' % repr(falcon_sense_option))

    genome_size = int(cfg['genome_size'])
    length_cutoff = int(cfg['length_cutoff'])
    if length_cutoff < 0 and genome_size < 1:
        raise Exception(
            'Must specify either length_cutoff>0 or genome_size>0')

    pa_subsample_strategy = cfg['pa_subsample_strategy']
    pa_subsample_random_seed = int(cfg['pa_subsample_random_seed'])
    pa_subsample_coverage = int(cfg['pa_subsample_coverage'])
    if pa_subsample_coverage > 0:
        if genome_size < 1:
            raise Exception(
                'Must specify genome_size > 0 for subsampling.')

    # This one depends on length_cutoff_pr for its default.
    fc_ovlp_to_graph_option = cfg['fc_ovlp_to_graph_option']
    if '--min_len' not in fc_ovlp_to_graph_option and '--min-len' not in fc_ovlp_to_graph_option:
        length_cutoff_pr = cfg['length_cutoff_pr']
        fc_ovlp_to_graph_option += ' --min-len {}'.format(length_cutoff_pr)
        cfg['fc_ovlp_to_graph_option'] = fc_ovlp_to_graph_option

    target = cfg['target']
    if target not in ('overlapping', 'pre-assembly', 'assembly'):
        msg = 'The target must be "overlapping", "pre-assembly", or "assembly" in this version. You have an unknown target {!r} in the configuration file.'.format(target)
        raise Exception(msg)

    possible_extra_keys = [
        'sge_option', 'default_concurrent_jobs',
        'pwatcher_type', 'pwatcher_directory',
        'job_type', 'job_queue', 'job_name_style',
        'use_tmpdir',
    ]
    for step in ['dust', 'da', 'la', 'pda', 'pla', 'fc', 'cns', 'asm']:
        possible_extra_keys.append('sge_option_' + step)
        possible_extra_keys.append(step + '_concurrent_jobs')
    extra = [key for key in possible_extra_keys if key in cfg]
    if extra:
        extra.sort()
        msg = 'You have several old-style options. These should be provided in the `[job.defaults]` or `[job.step.*]` sections, and possibly renamed. See https://github.com/PacificBiosciences/FALCON/wiki/Configuration\n {}'.format(extra)
        logger.warning(msg)
    check_unexpected_keys(cfg)
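

# Illustrative sketch (not part of the original module): a minimal General
# sub-dict that update_defaults() accepts. Either genome_size>0 (seed-coverage
# mode) or length_cutoff>0 must be given; everything else is defaulted.
def _example_update_defaults():
    general = {'genome_size': 4600000}
    update_defaults(general)
    assert general['length_cutoff'] == -1  # i.e. derive from seed_coverage
    assert '--min-len 0' in general['fc_ovlp_to_graph_option']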


def check_unexpected_keys(cfg):
    # Warn on unused variables.
    expected = (TEXT_FILE_BUSY,
                'input_fofn',
                'input_type',
                'genome_size',
                'seed_coverage',
                'length_cutoff',
                'length_cutoff_pr',
                'dazcon',
                'pa_dazcon_option',
                'pa_DBdust_option',
                'pa_fasta_filter_option',
                'pa_subsample_coverage',
                'pa_subsample_strategy',
                'pa_subsample_random_seed',
                'pa_DBsplit_option',
                'pa_HPCTANmask_option',
                'pa_HPCREPmask_option',
                'pa_REPmask_code',
                'pa_daligner_option',
                'pa_HPCdaligner_option',
                'ovlp_DBdust_option',
                'ovlp_DBsplit_option',
                #'ovlp_HPCTANmask_option',
                'ovlp_daligner_option',
                'ovlp_HPCdaligner_option',
                'skip_checks',
                'falcon_sense_option',
                'falcon_sense_skip_contained',
                'falcon_sense_greedy',
                'LA4Falcon_preload',
                'LA4Falcon_pre',  # hidden
                'LA4Falcon_post',  # hidden
                'LA4Falcon_dbdir',  # hidden
                'overlap_filtering_setting',
                'fc_ovlp_to_graph_option',
                'bestn',
                'target',
                )
    unused = set(cfg.keys()) - set(expected)
    if unused:
        logger.warning("Unexpected keys in input config: {}".format(unused))


default_logging_config = """
[loggers]
keys=root

[handlers]
keys=stream,file_all

[formatters]
keys=form01,form02

[logger_root]
level=NOTSET
handlers=stream,file_all

[handler_stream]
class=StreamHandler
level=INFO
formatter=form02
args=(sys.stderr,)

[handler_file_all]
class=FileHandler
level=DEBUG
formatter=form01
args=('all.log', 'w')

[formatter_form01]
format=%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s

[formatter_form02]
format=[%(levelname)s]%(message)s
"""


def _setup_logging(logging_config_fn):
    """See https://docs.python.org/2/library/logging.config.html
    """
    logging.Formatter.converter = time.gmtime  # cannot be done in the .ini
    if logging_config_fn:
        if logging_config_fn.endswith('.json'):
            logging.config.dictConfig(
                json.loads(open(logging_config_fn).read()))
            #print(repr(logging.Logger.manager.loggerDict))  # to debug
            return
        logger_fileobj = open(logging_config_fn)
    else:
        logger_fileobj = NativeIO(default_logging_config)
    defaults = {
    }
    logging.config.fileConfig(
        logger_fileobj, defaults=defaults, disable_existing_loggers=False)


def setup_logger(logging_config_fn):
    global logger
    try:
        _setup_logging(logging_config_fn)
        logger = logging.getLogger("fc_run")
        logger.info('Set up logging from file "{}".'.format(logging_config_fn))
    except Exception:
        logging.basicConfig()
        logger = logging.getLogger()
        logger.exception(
            'Failed to set up logging from file "{}". Using basicConfig().'.format(logging_config_fn))
    try:
        import logging_tree
        logger.info(logging_tree.format.build_description())
    except ImportError:
        pass
    return logger


def get_length_cutoff(length_cutoff, fn):
    if length_cutoff < 0:
        length_cutoff = int(open(fn).read().strip())
        logger.info('length_cutoff=%d from %r' % (length_cutoff, fn))
    return length_cutoff  # possibly updated


def logger_record(func):
    """Decorator: log entry, arguments, and elapsed wall-clock time of func.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("====>Begin executing function: {}".format(func.__name__))
        if args:
            logger.info("====>args={}".format(args))
        if kwargs:
            logger.info("====>kwargs={}".format(kwargs))
        t_start = time.time()
        result = func(*args, **kwargs)  # propagate the return value
        t_end = time.time()
        logger.info("====>End executing function: {}, time cost: {} seconds."
                    .format(func.__name__, (t_end - t_start)))
        return result
    return wrapper
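

# Illustrative usage sketch (not part of the original module): timing a call
# with @logger_record. The function and its arguments are made up.
@logger_record
def _example_sleepy_add(a, b):
    time.sleep(0.1)
    return a + b
# _example_sleepy_add(1, 2) returns 3 and logs the begin/end/time-cost lines.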
|