functional.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. """Purely functional code.
  2. """
  3. from future.utils import viewitems
  4. from .io import NativeIO as StringIO
  5. import collections
  6. import logging
  7. import re
  8. LOG = logging.getLogger(__name__)
  9. def _verify_pairs(pairs1, pairs2):
  10. if pairs1 != pairs2: # pragma: no cover
  11. print('pair2dali:', pairs1)
  12. print('pair2sort:', pairs2)
  13. print('dali-sort:', set(pairs1) - set(pairs2))
  14. print('sort-dali:', set(pairs2) - set(pairs1))
  15. print('pair2dali:', len(pairs1))
  16. print('pair2sort:', len(pairs2))
  17. assert pairs1 == pairs2
  18. def skip_LAcheck(bash):
  19. def lines():
  20. for line in StringIO(bash):
  21. if 'LAcheck' in line:
  22. yield 'set +e\n'
  23. yield line
  24. yield 'set -e\n'
  25. else:
  26. yield line
  27. return ''.join(lines())
  28. def get_daligner_job_descriptions_sans_LAcheck(run_jobs_stream, db_prefix, single=False):
  29. """Strip LAcheck (somehow) from each bash script.
  30. (For now, we will run it but not fail on error.)
  31. """
  32. descs = get_daligner_job_descriptions(run_jobs_stream, db_prefix, single)
  33. result = {}
  34. for (k, v) in viewitems(descs):
  35. bash = skip_LAcheck(v)
  36. bash = bash.replace(
  37. 'LAsort', 'python3 -m falcon_kit.mains.LAsort {}'.format(db_prefix))
  38. bash = bash.replace(
  39. 'LAmerge', 'python3 -m falcon_kit.mains.LAmerge {}'.format(db_prefix))
  40. result[k] = bash
  41. return result
  42. def get_daligner_job_descriptions(run_jobs_stream, db_prefix, single=False):
  43. """Return a dict of job-desc-tuple -> HPCdaligner bash-job.
  44. Comments are ignored.
  45. E.g., each item will look like:
  46. ('.2', '.1', '.2', '.3'): 'daligner
  47. Rationale
  48. ---------
  49. For i/o efficiency, we want to daligner calls with LAsort/LAmerge lines. But
  50. Gene has done this himself too. So now, we only want the daligner calls here.
  51. Later, we will do the extra LAmerge lines, grouped by A-read.
  52. """
  53. re_block_dali = re.compile(r'%s(\.\d+|)' % db_prefix)
  54. def blocks_dali(line):
  55. """Return ['.1', '.2', ...]
  56. Can return [''] if only 1 block.
  57. """
  58. return [mo.group(1) for mo in re_block_dali.finditer(line)]
  59. # X == blocks[0]; A/B/C = blocks[...]
  60. lines = [line.strip() for line in run_jobs_stream]
  61. # in case caller passed filename, not stream
  62. assert any(len(l) > 1 for l in lines), repr('\n'.join(lines))
  63. lines_dali = [l for l in lines if l.startswith(
  64. 'daligner')] # could be daligner_p
  65. result = {}
  66. for dali in lines_dali:
  67. id = tuple(blocks_dali(dali))
  68. early_checks = [
  69. "LAcheck -v {db_prefix} *.las".format(db_prefix=db_prefix)]
  70. script = '\n'.join([dali] + early_checks) + '\n'
  71. result[id] = script
  72. return result
  73. re_first_block_las = re.compile(r'^(?:\S+)(?:\s+-\S+)*\s+[^\.]+\.(\d+|)')
  74. def first_block_las(line):
  75. """
  76. >>> first_block_las('LAsort -v -a foo.1.foo.1.C0')
  77. 1
  78. """
  79. mo = re_first_block_las.search(line)
  80. try:
  81. return int(mo.group(1))
  82. except Exception as e:
  83. raise Exception('Pattern {!r} does not match line {!r}: {}'.format(
  84. re_first_block_las.pattern, line, e))
  85. def get_las_filenames(mjob_data, db_prefix):
  86. """Given result of get_mjob_data(),
  87. return {int: final_las_filename}
  88. """
  89. # This is our best guess.
  90. # (We might not even need this, since we know the output filename of each merge-task by convention.)
  91. # Eventually, we need to re-write HPC.daligner.
  92. result = {}
  93. re_LAmerge = re.compile(r'^LAmerge\s+(?:\-\S+\s+)(\S+)')
  94. re_LAcheck = re.compile(r'^LAcheck\s+(?:\-\S+\s+)\S+\s+(\S+)')
  95. for (p_id, bash_lines) in viewitems(mjob_data):
  96. if not bash_lines:
  97. # The daligner+LAsort+LAmerge job produced the final .las
  98. # for this block. We will symlink it later.
  99. las_fn = '{}.{}.las'.format(db_prefix, p_id)
  100. result[p_id] = las_fn
  101. continue
  102. # Find the last line which can tell us the final .las name.
  103. i = len(bash_lines) - 1
  104. while bash_lines[i].split()[0] not in ('LAmerge', 'LAcheck'):
  105. i -= 1
  106. # Now we will raise an exception if there were none. But in theory, there
  107. # must be at least an LAsort.
  108. first_word = bash_lines[i].split()[0]
  109. if first_word == 'LAmerge':
  110. regex = re_LAmerge
  111. elif first_word == 'LAcheck':
  112. regex = re_LAcheck
  113. else:
  114. raise Exception('first_word={!r} in line#{} of {!r}'.format(
  115. first_word, i, bash_lines))
  116. mo = regex.search(bash_lines[i])
  117. if not mo:
  118. raise Exception('Regex {!r} failed on {!r}'.format(
  119. regex.pattern, bash_lines[i]))
  120. las_fn = mo.group(1) + '.las'
  121. result[p_id] = las_fn
  122. return result
  123. def get_mjob_data(run_jobs_stream):
  124. """Given output of HPC.daligner,
  125. return {int: [bash-lines]}
  126. """
  127. f = run_jobs_stream
  128. # Strip either '&& rm ...' or '; rm ...' ?
  129. #re_strip_rm = re.compile(r'^(.*) ((\&\&)|;) .*$')
  130. mjob_data = {}
  131. for l in f:
  132. l = l.strip()
  133. first_word = l.split()[0]
  134. if first_word not in ("LAsort", "LAmerge", "rm"):
  135. continue
  136. if first_word in ["LAsort"]:
  137. # We now run this part w/ daligner, but we still need
  138. # a small script for some book-keeping.
  139. p_id = first_block_las(l)
  140. mjob_data.setdefault(p_id, [])
  141. # mjob_data[p_id].append( " ".join(l) ) # Already done w/ daligner!
  142. raise Exception('We do not expect to see LAsort at all anymore.')
  143. elif first_word in ["LAmerge"]:
  144. p_id = first_block_las(l)
  145. mjob_data.setdefault(p_id, [])
  146. # l = re_strip_rm.sub(r'\1', l) # (LAmerge && rm) rm is very safe if we run in /tmp
  147. mjob_data[p_id].append(l)
  148. #LOG.info('{}: {}'.format(p_id, l))
  149. elif first_word in ["rm"]:
  150. p_id = first_block_las(l)
  151. mjob_data.setdefault(p_id, [])
  152. mjob_data[p_id].append(l)
  153. #LOG.info('rm{}: {}'.format(p_id, l))
  154. #for key, data in mjob_data.items():
  155. # mjob_data[key] = '\n'.join(data)
  156. return mjob_data
  157. def yield_args_from_line(bash_line):
  158. """Given a line of LAmerge, etc.,
  159. return [output_las_fn, input_las_fn0, input_las_fn1, ...]
  160. """
  161. for word in bash_line.split():
  162. if word.startswith('-') or word in ('LAcheck', 'LAmerge', 'LAsort'):
  163. continue
  164. yield word
  165. _re_sub_daligner = re.compile(r'^daligner\b', re.MULTILINE)
  166. def xform_script_for_preads(script):
  167. daligner_exe = 'daligner_p'
  168. # , flags=re.MULTILINE) # flags in py2.7
  169. return _re_sub_daligner.sub(daligner_exe, script)
  170. def xform_script_for_raw_reads(script):
  171. return script
  172. def get_script_xformer(pread_aln):
  173. if pread_aln:
  174. return xform_script_for_preads
  175. else:
  176. return xform_script_for_raw_reads
  177. class GenomeCoverageError(Exception):
  178. pass
  179. def calc_cutoff_from_reverse_sorted_readlength_counts(rl_counts, target):
  180. """Return first read_len which gives at least 'target' bases.
  181. """
  182. total = sum(pair[0] * pair[1] for pair in rl_counts)
  183. subtotal = 0
  184. if target > total:
  185. msg = 'Not enough reads available for desired genome coverage (bases needed={} > actual={})'.format(
  186. target, total)
  187. raise GenomeCoverageError(msg)
  188. cutoff = 0
  189. for (rl, count) in rl_counts:
  190. subtotal += rl * count
  191. if subtotal >= target:
  192. cutoff = rl
  193. break
  194. else: # pragma: no cover
  195. msg = 'Impossible target (probably a bug): target={target}, subtotal={subtotal}, total={total}'.format(
  196. locals())
  197. raise Exception(msg)
  198. return cutoff
  199. def num2int(num):
  200. """
  201. >>> num2int('1,000,000')
  202. 1000000
  203. """
  204. return int(num.replace(',', ''))
  205. def get_reverse_sorted_readlength_counts_from_DBstats(DBstats_output):
  206. """Return pairs of (readlength, count).
  207. Bin: Count % Reads % Bases Average
  208. 169,514: 1 0.0 0.0 169514
  209. ...
  210. ->
  211. [(169514, 1), ...]
  212. """
  213. rl_counts = list()
  214. lines = DBstats_output.splitlines()
  215. re_stat = re.compile(
  216. r'^\s*(?P<bin>\S+):\s+(?P<count>\S+)\s+\S+\s+\S+\s+\S+\s*$')
  217. for line in lines:
  218. match = re_stat.search(line)
  219. if not match:
  220. continue
  221. rl = num2int(match.group('bin'))
  222. count = num2int(match.group('count'))
  223. rl_counts.append((rl, count))
  224. return rl_counts
  225. def calc_cutoff(target, DBstats_output):
  226. """Calculate the length_cutoff needed for at least 'target' bases.
  227. DBstats_output: ASCII output of 'DBstats -b1 DB',
  228. """
  229. rl_counts = get_reverse_sorted_readlength_counts_from_DBstats(
  230. DBstats_output)
  231. return calc_cutoff_from_reverse_sorted_readlength_counts(rl_counts, target)
  232. def parse_2columns_of_ints(data):
  233. r"""Given 2 columns of integers,
  234. space- and line-delimited,
  235. yield tuples.
  236. >>> tuple(parse_2columns_of_ints("1 2\n3 4"))
  237. ((1, 2), (3, 4))
  238. """
  239. for line in data.splitlines():
  240. line = line.strip()
  241. if not line:
  242. continue
  243. yield tuple(int(x) for x in line.split())
  244. def weighted_average(cols):
  245. """Given tuples of (weight, value),
  246. return weighted average.
  247. >>> weighted_average(((100, 1), (200, 2), (100, 5)))
  248. 2.5
  249. """
  250. return sum(w * v for (w, v) in cols) / sum(w for (w, v) in cols)
  251. def parsed_readlengths_from_dbdump_output(output):
  252. """Given output text from the DBump command,
  253. yield all read-lengths.
  254. """
  255. re_length = re.compile(r'^L\s+\d+\s+(\d+)\s+(\d+)$')
  256. for line in output.splitlines():
  257. mo = re_length.search(line)
  258. if mo:
  259. beg, end = mo.group(1, 2)
  260. beg = int(beg)
  261. end = int(end)
  262. yield end - beg
  263. def mapped_readlengths_from_dbdump_output(output):
  264. """Given output text from the DBump command,
  265. return dict of (id => read-length).
  266. There will be alternate lines like these:
  267. R #
  268. L # # #
  269. https://dazzlerblog.wordpress.com/command-guides/dazz_db-command-guide/
  270. """
  271. lengths = dict()
  272. re_rid = re.compile(r'^R\s+(\d+)$')
  273. re_length = re.compile(r'^L\s+(\d+)\s+(\d+)\s+(\d+)$')
  274. for line in output.splitlines():
  275. mo = re_rid.search(line)
  276. if mo:
  277. idx = int(mo.group(1))
  278. continue
  279. mo = re_length.search(line)
  280. if mo:
  281. well, beg, end = mo.group(1, 2, 3)
  282. well = int(idx)
  283. beg = int(beg)
  284. end = int(end)
  285. lengths[idx] = (end - beg)
  286. continue
  287. return lengths
  288. def average_difference(dictA, dictB):
  289. """Return the average difference of
  290. values in dictA minus dictB, only
  291. using values in dictA.
  292. If a value is missing from dictB, raise Exception.
  293. """
  294. total_diff = 0.0
  295. for (k, va) in viewitems(dictA):
  296. vb = dictB[k]
  297. total_diff += (va - vb)
  298. return total_diff / len(dictA)
  299. def calc_metric_fragmentation(perl_counts_output):
  300. # """perl -e 'while (<>) { if ( m{>[^/]+/(\d+)\d/} ) { $id{$1}++; } }; while (my ($k, $v) = each %%id) { $counts{$v}++; }; while (my ($k, $v) = each %%counts) { print "$v $k\n"; };' %s""" %(fastas)
  301. cols = tuple(parse_2columns_of_ints(perl_counts_output))
  302. avg = weighted_average(cols)
  303. return avg
  304. def calc_metric_truncation(dbdump_output, length_pairs_output):
  305. # """perl -e 'while (<>) { if ( m{>[^/]+/0*(\d+)\d/(\d+)_(\d+)} ) { $lengths{$1} += ($3 - $2); } }; while (my ($k, $v) = each %%lengths) { print "$k $v\n"; };' %s""" %(fastas)
  306. cols = tuple(parse_2columns_of_ints(length_pairs_output))
  307. pread_lengths = dict((k, v) for (k, v) in cols)
  308. orig_lengths = mapped_readlengths_from_dbdump_output(dbdump_output)
  309. avg = -average_difference(pread_lengths, orig_lengths)
  310. return avg
  311. def choose_cat_fasta(fofn):
  312. """Given the contents of a fasta FOFN,
  313. return a command to write the contents of a fasta to stdout,
  314. keeping the original file.
  315. Raise Exception on error.
  316. >>> choose_cat_fasta('abc.gz')
  317. 'zcat '
  318. >>> choose_cat_fasta('abc.dexta')
  319. 'undexta -vkU -w60 -i < '
  320. >>> choose_cat_fasta('abc')
  321. 'cat '
  322. """
  323. first_line = fofn.splitlines()[0]
  324. if first_line.endswith('.gz'):
  325. return 'zcat '
  326. elif first_line.endswith('.dexta'):
  327. return 'undexta -vkU -w60 -i < '
  328. else:
  329. return 'cat '
  330. re_underscore_flag = re.compile(r'(--[\w-]+)(_)')
  331. def dash_flags(val):
  332. """
  333. >>> dash_flags('--foo_bar --one_two_three')
  334. '--foo-bar --one-two-three'
  335. >>> dash_flags('')
  336. ''
  337. """
  338. while True:
  339. # Repeat until settled, as there might be multiple _ in the same flag.
  340. new_val = re_underscore_flag.sub(r'\1-', val)
  341. if new_val == val:
  342. return new_val
  343. val = new_val
  344. def cfg_tobool(v):
  345. """
  346. >>> cfg_tobool('yes')
  347. True
  348. >>> cfg_tobool('true')
  349. True
  350. >>> cfg_tobool('T')
  351. True
  352. >>> cfg_tobool('1')
  353. True
  354. >>> cfg_tobool('no')
  355. False
  356. >>> cfg_tobool('false')
  357. False
  358. >>> cfg_tobool('F')
  359. False
  360. >>> cfg_tobool('0')
  361. False
  362. >>> cfg_tobool('')
  363. False
  364. """
  365. if v in (True, False, None):
  366. return v
  367. if not v:
  368. return False
  369. if v.upper()[0] in ('T', 'Y'):
  370. return True
  371. if v.upper()[0] in ('F', 'N'):
  372. return False
  373. return bool(int(v))
  374. # https://stackoverflow.com/questions/3387691/how-to-perfectly-override-a-dict
  375. # We derived from dict instead of from MutableMapping to json.dumps() works.
  376. _RaiseKeyError = object() # singleton for no-default behavior
  377. class LowerDict(dict): # dicts take a mapping or iterable as their optional first argument
  378. __slots__ = () # no __dict__ - that would be redundant
  379. def __init__(self):
  380. # No args allowed, to keep it simple.
  381. super(LowerDict, self).__init__(self)
  382. def __getitem__(self, k):
  383. return super(LowerDict, self).__getitem__(k.lower())
  384. def __setitem__(self, k, v):
  385. return super(LowerDict, self).__setitem__(k.lower(), v)
  386. def __delitem__(self, k):
  387. return super(LowerDict, self).__delitem__(k.lower())
  388. def get(self, k, default=None):
  389. return super(LowerDict, self).get(k.lower(), default)
  390. def setdefault(self, k, default=None):
  391. return super(LowerDict, self).setdefault(k.lower(), default)
  392. def pop(self, k, v=_RaiseKeyError):
  393. if v is _RaiseKeyError:
  394. return super(LowerDict, self).pop(k.lower())
  395. return super(LowerDict, self).pop(k.lower(), v)
  396. #def update(self, mapping=(), **kwargs):
  397. # super(LowerDict, self).update(self._process_args(mapping, **kwargs))
  398. def __contains__(self, k):
  399. return super(LowerDict, self).__contains__(k.lower())
  400. #def copy(self): # don't delegate w/ super - dict.copy() -> dict :(
  401. # return type(self)(self)
  402. @classmethod
  403. def fromkeys(cls, keys, v=None):
  404. return super(LowerDict, cls).fromkeys((k.lower() for k in keys), v)
  405. def __repr__(self):
  406. return '{0}({1})'.format(type(self).__name__, super(LowerDict, self).__repr__())
  407. __loop_set = set()
  408. def toLowerDict(cfg):
  409. """Change key-names to be lower-case, at all levels of dict cfg.
  410. Then, return the case-insensitive LowerDict, substituted recursively.
  411. """
  412. if isinstance(cfg, LowerDict):
  413. return cfg
  414. if id(cfg) in __loop_set:
  415. # Prevent infinite loop.
  416. raise Exception('Already ran update_lowercase({}) (len(set)=={}):\n {}'.format(
  417. id(cfg), len(__loop_set), cfg))
  418. __loop_set.add(id(cfg))
  419. low = LowerDict()
  420. for k,v in list(cfg.items()):
  421. if isinstance(v, dict):
  422. v = toLowerDict(v) # RECURSION
  423. if k in low:
  424. msg = 'Collision for "{}" in dict:\n{}'.format(k, cfg)
  425. if v != low[k]:
  426. raise Exception(msg)
  427. low[k] = v
  428. return low
  429. def parse_REPmask_code(code):
  430. """
  431. Return list of 3 (group_size, coverage_limit) pairs.
  432. group_size==0 indicates "no-op".
  433. Otherwise, super-high coverage_limit indicates "do work, but produce empty mask-track".
  434. >>> parse_REPmask_code('1,10/2,20/3,300')
  435. [(1, 10), (2, 20), (3, 300)]
  436. """
  437. ec = 0 # arbitrary
  438. result = [(0, ec), (0, ec), (0, ec)] # all no-op by default
  439. try:
  440. if '/' in code:
  441. pairs = code.split('/')
  442. else:
  443. assert ';' in code, 'code contains neither ";" nor "/": {!r}'.format(code)
  444. pairs = code.split(';')
  445. assert len(pairs) <= 3
  446. for i, p in enumerate(pairs):
  447. g, c = list(map(int, p.split(',')))
  448. result[i] = (g, c)
  449. except Exception as exc:
  450. LOG.exception('Failed to parse REPmask_code {!r}. Using extreme, to produce empty rep tracks.'.format(code))
  451. # Validate
  452. paira = result[0]
  453. pairb = result[1]
  454. pairc = result[2]
  455. if (paira[0] != 0 and pairb[0] != 0 and pairc[0] != 0):
  456. # Check only if all groups are non-zero. Otherwise, the user must know what they're doing.
  457. if (paira[0] == pairb[0] or
  458. pairb[0] == pairc[0]):
  459. raise Exception('Non-zero group sizes must not match in parsed REPmask_code: {!r} from {!r}'.format(result, code))
  460. if (paira[0] > pairb[0] or pairb[0] > pairc[0]):
  461. raise Exception('Non-zero group sizes must increase monotonically in parsed REPmask_code: {!r} from {!r}'.format(result, code))
  462. return result
  463. def dazzler_get_nblocks(db_stream):
  464. """Return #blocks in dazzler-db.
  465. """
  466. nblock = 1
  467. new_db = True
  468. for l in db_stream:
  469. l = l.strip().split()
  470. if l[0] == "blocks" and l[1] == "=":
  471. nblock = int(l[2])
  472. new_db = False
  473. break
  474. return nblock
  475. re_R = re.compile(r'^\+ R (\d+)')
  476. def dazzler_num_reads(dump):
  477. """Given DBdump, return number of reads. (Proper DBdump call is assumed.)
  478. >>> dazzler_num_reads('+ R 27')
  479. 27
  480. >>> dazzler_num_reads('')
  481. -1
  482. """
  483. mo = re_R.search(dump)
  484. if mo:
  485. return int(mo.group(1))
  486. else:
  487. return -1