- """Purely functional code.
- """
- from future.utils import viewitems
- from .io import NativeIO as StringIO
- import collections
- import logging
- import re
- LOG = logging.getLogger(__name__)
def _verify_pairs(pairs1, pairs2):
    if pairs1 != pairs2:  # pragma: no cover
        print('pair2dali:', pairs1)
        print('pair2sort:', pairs2)
        print('dali-sort:', set(pairs1) - set(pairs2))
        print('sort-dali:', set(pairs2) - set(pairs1))
        print('pair2dali:', len(pairs1))
        print('pair2sort:', len(pairs2))
        assert pairs1 == pairs2

def skip_LAcheck(bash):
    def lines():
        for line in StringIO(bash):
            if 'LAcheck' in line:
                yield 'set +e\n'
                yield line
                yield 'set -e\n'
            else:
                yield line
    return ''.join(lines())

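# Illustrative sketch (not part of the original module; filenames are
# hypothetical): each LAcheck line is wrapped in 'set +e' / 'set -e', so the
# check still runs but cannot fail the script.
#     >>> skip_LAcheck('daligner A B\nLAcheck -vS raw_reads *.las\n')
#     'daligner A B\nset +e\nLAcheck -vS raw_reads *.las\nset -e\n'
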
def get_daligner_job_descriptions_sans_LAcheck(run_jobs_stream, db_prefix, single=False):
    """Strip LAcheck (somehow) from each bash script.
    (For now, we will run it but not fail on error.)
    """
    descs = get_daligner_job_descriptions(run_jobs_stream, db_prefix, single)
    result = {}
    for (k, v) in viewitems(descs):
        bash = skip_LAcheck(v)
        bash = bash.replace(
            'LAsort', 'python3 -m falcon_kit.mains.LAsort {}'.format(db_prefix))
        bash = bash.replace(
            'LAmerge', 'python3 -m falcon_kit.mains.LAmerge {}'.format(db_prefix))
        result[k] = bash
    return result

def get_daligner_job_descriptions(run_jobs_stream, db_prefix, single=False):
    """Return a dict of job-desc-tuple -> HPCdaligner bash-job.
    Comments are ignored.

    E.g., each item will look like:
      ('.2', '.1', '.2', '.3'): 'daligner ...'

    Rationale
    ---------
    For i/o efficiency, we want to combine daligner calls with LAsort/LAmerge lines. But
    Gene has done this himself too. So now, we only want the daligner calls here.
    Later, we will do the extra LAmerge lines, grouped by A-read.
    """
    re_block_dali = re.compile(r'%s(\.\d+|)' % db_prefix)

    def blocks_dali(line):
        """Return ['.1', '.2', ...]
        Can return [''] if only 1 block.
        """
        return [mo.group(1) for mo in re_block_dali.finditer(line)]
    # X == blocks[0]; A/B/C = blocks[...]

    lines = [line.strip() for line in run_jobs_stream]
    # in case caller passed filename, not stream
    assert any(len(l) > 1 for l in lines), repr('\n'.join(lines))
    lines_dali = [l for l in lines if l.startswith(
        'daligner')]  # could be daligner_p
    result = {}
    for dali in lines_dali:
        id = tuple(blocks_dali(dali))
        early_checks = [
            "LAcheck -v {db_prefix} *.las".format(db_prefix=db_prefix)]
        script = '\n'.join([dali] + early_checks) + '\n'
        result[id] = script
    return result

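# Illustrative sketch (assumed input, not from the original source): a single
# HPC.daligner line with db_prefix 'raw_reads'.
#     >>> stream = ['# comment', 'daligner raw_reads.1 raw_reads.2']
#     >>> get_daligner_job_descriptions(stream, 'raw_reads')
#     {('.1', '.2'): 'daligner raw_reads.1 raw_reads.2\nLAcheck -v raw_reads *.las\n'}
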
re_first_block_las = re.compile(r'^(?:\S+)(?:\s+-\S+)*\s+[^\.]+\.(\d+|)')


def first_block_las(line):
    """
    >>> first_block_las('LAsort -v -a foo.1.foo.1.C0')
    1
    """
    mo = re_first_block_las.search(line)
    try:
        return int(mo.group(1))
    except Exception as e:
        raise Exception('Pattern {!r} does not match line {!r}: {}'.format(
            re_first_block_las.pattern, line, e))

def get_las_filenames(mjob_data, db_prefix):
    """Given result of get_mjob_data(),
    return {int: final_las_filename}
    """
    # This is our best guess.
    # (We might not even need this, since we know the output filename of each merge-task by convention.)
    # Eventually, we need to re-write HPC.daligner.
    result = {}
    re_LAmerge = re.compile(r'^LAmerge\s+(?:\-\S+\s+)(\S+)')
    re_LAcheck = re.compile(r'^LAcheck\s+(?:\-\S+\s+)\S+\s+(\S+)')
    for (p_id, bash_lines) in viewitems(mjob_data):
        if not bash_lines:
            # The daligner+LAsort+LAmerge job produced the final .las
            # for this block. We will symlink it later.
            las_fn = '{}.{}.las'.format(db_prefix, p_id)
            result[p_id] = las_fn
            continue
        # Find the last line which can tell us the final .las name.
        i = len(bash_lines) - 1
        while bash_lines[i].split()[0] not in ('LAmerge', 'LAcheck'):
            i -= 1
        # Now we will raise an exception if there were none. But in theory, there
        # must be at least an LAsort.
        first_word = bash_lines[i].split()[0]
        if first_word == 'LAmerge':
            regex = re_LAmerge
        elif first_word == 'LAcheck':
            regex = re_LAcheck
        else:
            raise Exception('first_word={!r} in line#{} of {!r}'.format(
                first_word, i, bash_lines))
        mo = regex.search(bash_lines[i])
        if not mo:
            raise Exception('Regex {!r} failed on {!r}'.format(
                regex.pattern, bash_lines[i]))
        las_fn = mo.group(1) + '.las'
        result[p_id] = las_fn
    return result

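# Illustrative sketch (hypothetical mjob_data, not from the original source):
# block 1 has no merge lines, so its name follows the db_prefix convention;
# block 2's name is parsed from the output argument of its last LAmerge.
#     >>> get_las_filenames({1: [], 2: ['LAmerge -v raw_reads.2 L1.2.1']}, 'raw_reads')
#     {1: 'raw_reads.1.las', 2: 'raw_reads.2.las'}
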
def get_mjob_data(run_jobs_stream):
    """Given output of HPC.daligner,
    return {int: [bash-lines]}
    """
    f = run_jobs_stream

    # Strip either '&& rm ...' or '; rm ...' ?
    #re_strip_rm = re.compile(r'^(.*) ((\&\&)|;) .*$')

    mjob_data = {}
    for l in f:
        l = l.strip()
        if not l:
            continue  # skip blank lines, which would otherwise break the first-word check
        first_word = l.split()[0]
        if first_word not in ("LAsort", "LAmerge", "rm"):
            continue
        if first_word in ["LAsort"]:
            # We now run this part w/ daligner, but we still need
            # a small script for some book-keeping.
            p_id = first_block_las(l)
            mjob_data.setdefault(p_id, [])
            # mjob_data[p_id].append( " ".join(l) ) # Already done w/ daligner!
            raise Exception('We do not expect to see LAsort at all anymore.')
        elif first_word in ["LAmerge"]:
            p_id = first_block_las(l)
            mjob_data.setdefault(p_id, [])
            # l = re_strip_rm.sub(r'\1', l) # (LAmerge && rm) rm is very safe if we run in /tmp
            mjob_data[p_id].append(l)
            #LOG.info('{}: {}'.format(p_id, l))
        elif first_word in ["rm"]:
            p_id = first_block_las(l)
            mjob_data.setdefault(p_id, [])
            mjob_data[p_id].append(l)
            #LOG.info('rm{}: {}'.format(p_id, l))
    #for key, data in mjob_data.items():
    #    mjob_data[key] = '\n'.join(data)
    return mjob_data

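# Illustrative sketch (hypothetical run_jobs.sh lines, not from the original
# source): LAmerge/rm lines are grouped by the block of their first .las arg.
#     >>> get_mjob_data(['LAmerge -v raw_reads.2 L1.2.1 L1.2.2',
#     ...                'rm L1.2.1.las L1.2.2.las'])
#     {2: ['LAmerge -v raw_reads.2 L1.2.1 L1.2.2', 'rm L1.2.1.las L1.2.2.las']}
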
def yield_args_from_line(bash_line):
    """Given a line of LAmerge, etc.,
    yield output_las_fn, input_las_fn0, input_las_fn1, ...
    """
    for word in bash_line.split():
        if word.startswith('-') or word in ('LAcheck', 'LAmerge', 'LAsort'):
            continue
        yield word


# Pre-compiled with re.MULTILINE, since the flags= arg of re.sub() is not
# available before python 2.7.
_re_sub_daligner = re.compile(r'^daligner\b', re.MULTILINE)


def xform_script_for_preads(script):
    daligner_exe = 'daligner_p'
    return _re_sub_daligner.sub(daligner_exe, script)


def xform_script_for_raw_reads(script):
    return script


def get_script_xformer(pread_aln):
    if pread_aln:
        return xform_script_for_preads
    else:
        return xform_script_for_raw_reads

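# Illustrative sketch (not from the original source): for pread alignment the
# daligner calls are rewritten to daligner_p; raw reads pass through unchanged.
#     >>> get_script_xformer(True)('daligner raw_reads.1 raw_reads.2')
#     'daligner_p raw_reads.1 raw_reads.2'
#     >>> get_script_xformer(False)('daligner raw_reads.1 raw_reads.2')
#     'daligner raw_reads.1 raw_reads.2'
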
class GenomeCoverageError(Exception):
    pass


def calc_cutoff_from_reverse_sorted_readlength_counts(rl_counts, target):
    """Return the first read_len which gives at least 'target' bases.
    """
    total = sum(pair[0] * pair[1] for pair in rl_counts)
    subtotal = 0
    if target > total:
        msg = 'Not enough reads available for desired genome coverage (bases needed={} > actual={})'.format(
            target, total)
        raise GenomeCoverageError(msg)
    cutoff = 0
    for (rl, count) in rl_counts:
        subtotal += rl * count
        if subtotal >= target:
            cutoff = rl
            break
    else:  # pragma: no cover
        msg = 'Impossible target (probably a bug): target={target}, subtotal={subtotal}, total={total}'.format(
            **locals())
        raise Exception(msg)
    return cutoff

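# Worked example (hypothetical numbers): with reads of length 10000 (x2) and
# 5000 (x4), the 30000 target bases are reached only once the 5000-length bin
# is included, so the cutoff is 5000.
#     >>> calc_cutoff_from_reverse_sorted_readlength_counts(
#     ...     [(10000, 2), (5000, 4)], 30000)
#     5000
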
def num2int(num):
    """
    >>> num2int('1,000,000')
    1000000
    """
    return int(num.replace(',', ''))


def get_reverse_sorted_readlength_counts_from_DBstats(DBstats_output):
    """Return pairs of (readlength, count).
        Bin:      Count  % Reads  % Bases     Average
    169,514:          1      0.0      0.0      169514
    ...
    ->
    [(169514, 1), ...]
    """
    rl_counts = list()
    lines = DBstats_output.splitlines()
    re_stat = re.compile(
        r'^\s*(?P<bin>\S+):\s+(?P<count>\S+)\s+\S+\s+\S+\s+\S+\s*$')
    for line in lines:
        match = re_stat.search(line)
        if not match:
            continue
        rl = num2int(match.group('bin'))
        count = num2int(match.group('count'))
        rl_counts.append((rl, count))
    return rl_counts

def calc_cutoff(target, DBstats_output):
    """Calculate the length_cutoff needed for at least 'target' bases.
    DBstats_output: ASCII output of 'DBstats -b1 DB'.
    """
    rl_counts = get_reverse_sorted_readlength_counts_from_DBstats(
        DBstats_output)
    return calc_cutoff_from_reverse_sorted_readlength_counts(rl_counts, target)

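# Illustrative sketch (fabricated two-line DBstats excerpt, same shape as the
# docstring above): both bins parse, and the cutoff lands in the second bin.
#     >>> out = ('   10,000:          2      0.0      0.0       10000\n'
#     ...        '    5,000:          4      0.0      0.0        5000\n')
#     >>> calc_cutoff(30000, out)
#     5000
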
def parse_2columns_of_ints(data):
    r"""Given 2 columns of integers,
    space- and line-delimited,
    yield tuples.
    >>> tuple(parse_2columns_of_ints("1 2\n3 4"))
    ((1, 2), (3, 4))
    """
    for line in data.splitlines():
        line = line.strip()
        if not line:
            continue
        yield tuple(int(x) for x in line.split())


def weighted_average(cols):
    """Given tuples of (weight, value),
    return weighted average.
    >>> weighted_average(((100, 1), (200, 2), (100, 5)))
    2.5
    """
    return sum(w * v for (w, v) in cols) / sum(w for (w, v) in cols)

def parsed_readlengths_from_dbdump_output(output):
    """Given output text from the DBdump command,
    yield all read-lengths.
    """
    re_length = re.compile(r'^L\s+\d+\s+(\d+)\s+(\d+)$')
    for line in output.splitlines():
        mo = re_length.search(line)
        if mo:
            beg, end = mo.group(1, 2)
            beg = int(beg)
            end = int(end)
            yield end - beg

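# Illustrative sketch (hypothetical DBdump excerpt): each 'L well beg end'
# line yields end - beg.
#     >>> list(parsed_readlengths_from_dbdump_output('R 7\nL 7 100 500'))
#     [400]
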
def mapped_readlengths_from_dbdump_output(output):
    """Given output text from the DBdump command,
    return dict of (id => read-length).
    There will be alternating lines like these:
      R #
      L # # #
    https://dazzlerblog.wordpress.com/command-guides/dazz_db-command-guide/
    """
    lengths = dict()
    re_rid = re.compile(r'^R\s+(\d+)$')
    re_length = re.compile(r'^L\s+(\d+)\s+(\d+)\s+(\d+)$')
    for line in output.splitlines():
        mo = re_rid.search(line)
        if mo:
            idx = int(mo.group(1))
            continue
        mo = re_length.search(line)
        if mo:
            well, beg, end = mo.group(1, 2, 3)
            well = int(well)  # parsed but unused; 'idx' from the 'R' line is the key
            beg = int(beg)
            end = int(end)
            lengths[idx] = (end - beg)
            continue
    return lengths

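# Illustrative sketch (hypothetical DBdump excerpt): read-ids come from the
# 'R' lines; lengths from the following 'L' lines.
#     >>> mapped_readlengths_from_dbdump_output('R 7\nL 7 100 500\nR 9\nL 9 0 250')
#     {7: 400, 9: 250}
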
def average_difference(dictA, dictB):
    """Return the average difference of
    values in dictA minus dictB, only
    using keys in dictA.
    If a key is missing from dictB, raise KeyError.
    """
    total_diff = 0.0
    for (k, va) in viewitems(dictA):
        vb = dictB[k]
        total_diff += (va - vb)
    return total_diff / len(dictA)

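# Worked example (hypothetical values): (5-3) and (4-4) average to 1.0;
# extra keys in dictB are ignored.
#     >>> average_difference({'a': 5.0, 'b': 4.0}, {'a': 3.0, 'b': 4.0, 'c': 9.9})
#     1.0
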
def calc_metric_fragmentation(perl_counts_output):
    # """perl -e 'while (<>) { if ( m{>[^/]+/(\d+)\d/} ) { $id{$1}++; } }; while (my ($k, $v) = each %%id) { $counts{$v}++; }; while (my ($k, $v) = each %%counts) { print "$v $k\n"; };' %s""" %(fastas)
    cols = tuple(parse_2columns_of_ints(perl_counts_output))
    avg = weighted_average(cols)
    return avg


def calc_metric_truncation(dbdump_output, length_pairs_output):
    # """perl -e 'while (<>) { if ( m{>[^/]+/0*(\d+)\d/(\d+)_(\d+)} ) { $lengths{$1} += ($3 - $2); } }; while (my ($k, $v) = each %%lengths) { print "$k $v\n"; };' %s""" %(fastas)
    cols = tuple(parse_2columns_of_ints(length_pairs_output))
    pread_lengths = dict((k, v) for (k, v) in cols)
    orig_lengths = mapped_readlengths_from_dbdump_output(dbdump_output)
    avg = -average_difference(pread_lengths, orig_lengths)
    return avg

def choose_cat_fasta(fofn):
    """Given the contents of a fasta FOFN,
    return a command to write the contents of a fasta to stdout,
    keeping the original file.
    Raise Exception on error.
    >>> choose_cat_fasta('abc.gz')
    'zcat '
    >>> choose_cat_fasta('abc.dexta')
    'undexta -vkU -w60 -i < '
    >>> choose_cat_fasta('abc')
    'cat '
    """
    first_line = fofn.splitlines()[0]
    if first_line.endswith('.gz'):
        return 'zcat '
    elif first_line.endswith('.dexta'):
        return 'undexta -vkU -w60 -i < '
    else:
        return 'cat '

re_underscore_flag = re.compile(r'(--[\w-]+)(_)')


def dash_flags(val):
    """
    >>> dash_flags('--foo_bar --one_two_three')
    '--foo-bar --one-two-three'
    >>> dash_flags('')
    ''
    """
    while True:
        # Repeat until settled, as there might be multiple _ in the same flag.
        new_val = re_underscore_flag.sub(r'\1-', val)
        if new_val == val:
            return new_val
        val = new_val

def cfg_tobool(v):
    """
    >>> cfg_tobool('yes')
    True
    >>> cfg_tobool('true')
    True
    >>> cfg_tobool('T')
    True
    >>> cfg_tobool('1')
    True
    >>> cfg_tobool('no')
    False
    >>> cfg_tobool('false')
    False
    >>> cfg_tobool('F')
    False
    >>> cfg_tobool('0')
    False
    >>> cfg_tobool('')
    False
    """
    if v in (True, False, None):
        return v
    if not v:
        return False
    if v.upper()[0] in ('T', 'Y'):
        return True
    if v.upper()[0] in ('F', 'N'):
        return False
    return bool(int(v))

# https://stackoverflow.com/questions/3387691/how-to-perfectly-override-a-dict
# We derive from dict instead of from MutableMapping so that json.dumps() works.
_RaiseKeyError = object()  # singleton for no-default behavior


class LowerDict(dict):  # dicts take a mapping or iterable as their optional first argument
    __slots__ = ()  # no __dict__ - that would be redundant

    def __init__(self):
        # No args allowed, to keep it simple.
        super(LowerDict, self).__init__()

    def __getitem__(self, k):
        return super(LowerDict, self).__getitem__(k.lower())

    def __setitem__(self, k, v):
        return super(LowerDict, self).__setitem__(k.lower(), v)

    def __delitem__(self, k):
        return super(LowerDict, self).__delitem__(k.lower())

    def get(self, k, default=None):
        return super(LowerDict, self).get(k.lower(), default)

    def setdefault(self, k, default=None):
        return super(LowerDict, self).setdefault(k.lower(), default)

    def pop(self, k, v=_RaiseKeyError):
        if v is _RaiseKeyError:
            return super(LowerDict, self).pop(k.lower())
        return super(LowerDict, self).pop(k.lower(), v)

    #def update(self, mapping=(), **kwargs):
    #    super(LowerDict, self).update(self._process_args(mapping, **kwargs))

    def __contains__(self, k):
        return super(LowerDict, self).__contains__(k.lower())

    #def copy(self): # don't delegate w/ super - dict.copy() -> dict :(
    #    return type(self)(self)

    @classmethod
    def fromkeys(cls, keys, v=None):
        return super(LowerDict, cls).fromkeys((k.lower() for k in keys), v)

    def __repr__(self):
        return '{0}({1})'.format(type(self).__name__, super(LowerDict, self).__repr__())

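# Illustrative usage (not from the original source): keys are folded to
# lower-case on every access, so lookups are case-insensitive.
#     >>> d = LowerDict()
#     >>> d['Job_Type'] = 'SGE'
#     >>> d['JOB_TYPE'], 'job_type' in d
#     ('SGE', True)
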
__loop_set = set()


def toLowerDict(cfg):
    """Change key-names to be lower-case, at all levels of dict cfg.
    Then, return the case-insensitive LowerDict, substituted recursively.
    """
    if isinstance(cfg, LowerDict):
        return cfg
    if id(cfg) in __loop_set:
        # Prevent infinite loop.
        raise Exception('Already ran toLowerDict({}) (len(set)=={}):\n {}'.format(
            id(cfg), len(__loop_set), cfg))
    __loop_set.add(id(cfg))
    low = LowerDict()
    for k, v in list(cfg.items()):
        if isinstance(v, dict):
            v = toLowerDict(v)  # RECURSION
        if k in low:
            msg = 'Collision for "{}" in dict:\n{}'.format(k, cfg)
            if v != low[k]:
                raise Exception(msg)
        low[k] = v
    return low

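# Illustrative usage (hypothetical config; note that a second call on the
# same plain-dict object raises, by design):
#     >>> cfg = toLowerDict({'General': {'Job_Type': 'SGE'}})
#     >>> cfg['general']['job_type']
#     'SGE'
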
def parse_REPmask_code(code):
    """
    Return list of 3 (group_size, coverage_limit) pairs.
    group_size==0 indicates "no-op".
    Otherwise, a super-high coverage_limit indicates "do work, but produce an empty mask-track".
    >>> parse_REPmask_code('1,10/2,20/3,300')
    [(1, 10), (2, 20), (3, 300)]
    """
    ec = 0  # arbitrary
    result = [(0, ec), (0, ec), (0, ec)]  # all no-op by default
    try:
        if '/' in code:
            pairs = code.split('/')
        else:
            assert ';' in code, 'code contains neither ";" nor "/": {!r}'.format(code)
            pairs = code.split(';')
        assert len(pairs) <= 3
        for i, p in enumerate(pairs):
            g, c = list(map(int, p.split(',')))
            result[i] = (g, c)
    except Exception:
        LOG.exception('Failed to parse REPmask_code {!r}. Using extreme, to produce empty rep tracks.'.format(code))
    # Validate
    paira = result[0]
    pairb = result[1]
    pairc = result[2]
    if paira[0] != 0 and pairb[0] != 0 and pairc[0] != 0:
        # Check only if all groups are non-zero. Otherwise, the user must know what they're doing.
        if (paira[0] == pairb[0] or
                pairb[0] == pairc[0]):
            raise Exception('Non-zero group sizes must not match in parsed REPmask_code: {!r} from {!r}'.format(result, code))
        if paira[0] > pairb[0] or pairb[0] > pairc[0]:
            raise Exception('Non-zero group sizes must increase monotonically in parsed REPmask_code: {!r} from {!r}'.format(result, code))
    return result

def dazzler_get_nblocks(db_stream):
    """Return #blocks in dazzler-db.
    """
    nblock = 1
    for l in db_stream:
        l = l.strip().split()
        # Guard len(l), since blank or short lines would otherwise raise IndexError.
        if len(l) >= 3 and l[0] == "blocks" and l[1] == "=":
            nblock = int(l[2])
            break
    return nblock

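# Illustrative sketch (abridged, hypothetical .db text; a real dazzler db
# contains a 'blocks = N' line after DBsplit):
#     >>> dazzler_get_nblocks(['raw_reads.db\n', 'blocks = 8\n'])
#     8
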
re_R = re.compile(r'^\+ R (\d+)')


def dazzler_num_reads(dump):
    """Given DBdump, return number of reads. (Proper DBdump call is assumed.)
    >>> dazzler_num_reads('+ R 27')
    27
    >>> dazzler_num_reads('')
    -1
    """
    mo = re_R.search(dump)
    if mo:
        return int(mo.group(1))
    else:
        return -1
|