Source code for idpconfgen.libs.libparse

"""
Parsing routines for different data structure.

All functions in this module receive a certain Python native datastructure,
parse the information inside and return/yield the parsed information.
"""
import ast
import subprocess
from functools import partial
from itertools import product, repeat
from operator import setitem
from pathlib import Path as Path_

from numba import njit

from idpconfgen import Path, log
from idpconfgen.core.definitions import (
    aa1set,
    aa1to3,
    dssp_trans_bytes,
    jsonparameters,
    )
from idpconfgen.core.exceptions import DSSPParserError
from idpconfgen.libs.libfunc import chainfs, consume
from idpconfgen.libs.libpdb import PDBIDFactory, atom_resSeq
from idpconfgen.logger import S


# _ascii_lower_set = set(string.ascii_lowercase)
# _ascii_upper_set = set(string.ascii_uppercase)


# USED OKAY
# all should return a string
# used for control flow
type2string = {
    type(Path_()): lambda x: x.read_text(),
    type(Path()): lambda x: x.read_text(),
    bytes: lambda x: x.decode('utf_8'),
    str: lambda x: x,
    }


[docs]def make_list_if_not(item): """Make a list from item.""" if isinstance(item, (str, int, float)): return [item] return item
[docs]def get_diff_between_groups(group1, group2): """Get difference between groups as a set.""" return set(group1).difference(set(group2))
get_diff_between_aa1l = partial(get_diff_between_groups, group2=aa1set)
[docs]def is_valid_fasta(fasta): # str -> bool """ Confirm string is a valid FASTA primary sequence. Does not accept headers, just the protein sequence in a single string. """ is_valid = \ fasta.isupper() \ and not get_diff_between_aa1l(fasta) return is_valid
# USED OKAY
[docs]def group_by(data): """ Group data by indexes. Parameters ---------- data : iterable The data to group by. Returns ------- list : [[type, slice],] Examples -------- >>> group_by('LLLLLSSSSSSEEEEE') [['L', slice(0, 5)], ['S', slice(5, 11)], ['E', slice(11,16)]] """ assert len(data) > 0 datum_hold = prev = data[0] start = 0 groups = [] GA = groups.append for i, datum in enumerate(data): if datum != prev: GA([datum_hold, slice(start, i)]) datum_hold, start = datum, i prev = datum else: GA([datum_hold, slice(start, i + 1)]) assert isinstance(groups[0][0], str) assert isinstance(groups[0][1], slice) return groups
# from https://stackoverflow.com/questions/21142231
[docs]def group_runs(li, tolerance=1): """Group consecutive numbers given a tolerance.""" out = [] last = li[0] for x in li: if x - last > tolerance: yield out out = [] out.append(x) last = x yield out
[docs]def mkdssp_w_split(pdb, cmd, **kwargs): """ Execute `mkdssp` from DSSP. Saves the data splitted accoring to backbone continuity as identified by `mkdssp`. Splits the input PDB into bb continuity segments. https://github.com/cmbi/dssp Parameters ---------- pdb : Path The path to the pdb file. cmd : str The command to execute the external DSSP program. Yields ------ from `split_pdb_by_dssp` """ pdbfile = pdb.resolve() _cmd = [cmd, '-i', str(pdbfile)] result = subprocess.run(_cmd, capture_output=True) # if subpress.run fails, the parser will raise an error, # no need to assert result.returncode yield from split_pdb_by_dssp(pdbfile, result.stdout, **kwargs)
[docs]def split_pdb_by_dssp(pdbfile, dssp_text, minimum=2, reduced=False): """ Split PDB file based on DSSP raw data. Parameters ---------- minimum : int The minimum length allowed for a segment. reduce : bool Whether to reduce the DSSP nomemclature to H/E/L. """ # local scope _ss = jsonparameters.ss _fasta = jsonparameters.fasta _resids = jsonparameters.resids # did not use the pathlib interface read_bytes on purpose. with open(pdbfile, 'rb') as fin: pdb_bytes = fin.readlines() segs = 0 pdbname = str(PDBIDFactory(pdbfile.stem)) # converted because end product is to be saved in json or used as string for dssp, fasta, residues in parse_dssp(dssp_text, reduced=reduced): if len(residues) < minimum: continue fname = f'{pdbname}_seg{segs}' residues_set = set(residues) lines2write = [ line for line in pdb_bytes if line[atom_resSeq].strip() in residues_set ] if all(line.startswith(b'HETATM') for line in lines2write): continue # it is much faster to decode three small strings here # than a whole DSSP file at the beginning __data = { _ss: dssp.decode('utf-8'), _fasta: fasta.decode('utf-8'), _resids: b','.join(residues).decode('utf-8'), } yield fname, __data, b''.join(lines2write) segs += 1
# USED OKAY
[docs]def sample_case(input_string): """ Sample all possible cases combinations from `string`. Examples -------- >>> sample_case('A') {'A', 'a'} >>> sample_case('Aa') {'AA', 'Aa', 'aA', 'aa'} """ possible_cases = set( map( ''.join, product(*zip(input_string.upper(), input_string.lower())), ) ) return possible_cases
# USED OKAY
[docs]def parse_dssp(data, reduced=False): """ Parse DSSP file data. JSON doesn't accept bytes That is why `data` is expected as str. """ DT = dssp_trans_bytes data_ = data.split(b'\n') # RM means removed empty RM1 = (i for i in data_ if i) # exausts generator until for line in RM1: if line.strip().startswith(b'#'): break else: # if the whole generator is exhausted raise DSSPParserError("File exhausted without finding '#'") dssp = [] fasta = [] residues = [] _d_append = dssp.append _f_append = fasta.append _r_append = residues.append bjoin = b''.join for line in RM1: if line[13:14] != b'!': _d_append(line[16:17]) _f_append(line[13:14]) _r_append(line[6:10].strip()) else: dssp_ = bjoin(dssp).translate(DT) if reduced else bjoin(dssp) yield dssp_, bjoin(fasta), residues dssp.clear() fasta.clear() residues.clear() # Attention with this clear! else: dssp_ = bjoin(dssp).translate(DT) if reduced else bjoin(dssp) yield dssp_, bjoin(fasta), residues
[docs]def pop_difference_with_log( dict1, dict2, logmsg='Removing {} from the dictionary.\n', ): """ Pop keys in `dict1` that are not present in `dict2`. Reports pop'ed keys to log INFO. Operates `dict1` in place. Parameters ---------- dict1, dict2 : dict Returns ------- None """ d1k = set(dict1.keys()) d2k = set(dict2.keys()) diff = d1k.difference(d2k) if diff: log.info(S(logmsg, diff)) consume(map(dict1.pop, diff))
[docs]def remap_sequence(seq, target='A', group=('P', 'G')): """ Remap sequence. Parameters ---------- seq : Protein primary sequence in FASTA format. target : str (1-char) The residue to which all other residues will be converted to. group : tuple The list of residues that excape map/conversion. Return ------ str The remaped string. Examples -------- >>> remap_sequence('AGTKLPHNG') 'AGAAAPAAG' """ return ''.join(target if res not in group else res for res in seq)
# njit available # domain specific
[docs]def get_trimer_seq(seq, idx): """Get sequence of trimer.""" pre = seq[idx - 1] if idx > 0 else 'G' curr_res = seq[idx] try: pos = seq[idx + 1] except: # noqa: B001,E722 IndexError in plain python pos = 'G' return curr_res, pre + pos
[docs]def get_mers(seq, size): """ Get X-mers from seq. Example ------- >>> get_mers('MEAIKHD', 3) {'MEA', 'EAI', 'AIK', 'KHD'} """ mers = [] mersa = mers.append for i in range(len(seq) - (size - 1)): mersa(seq[i:i + size]) return set(mers)
# njit available
[docs]def get_seq_chunk(seq, idx, size): """Get a fragment from sequence at start at `idx` with `size`.""" return seq[idx: idx + size]
# njit available
[docs]def get_seq_next_residue(seq, idx, size): """Get the next residue after the fragment.""" return seq[idx + size: idx + size + 1]
# TODO: correct for HIS/HIE/HID/HIP
[docs]def translate_seq_to_3l(input_seq): """ Translate 1-letter sequence to 3-letter sequence. # Currently translates 'H' to 'HIP', to accommodate double protonation. Editing to 'HIS' causes issues with libbuild. """ return [ 'HIP' if _res == 'H' else aa1to3[_res] for _res in input_seq ]
[docs]def fill_list(seq, fill, size): """ Fill list with `fill` to `size`. If seq is not a list, converts it to a list. Returns ------- list The original with fill values. """ return list(seq) + list(repeat(fill, size - len(seq)))
[docs]def convert_int_float_lines_to_dict(lines): r""" Convert string lines composed of putative int and float to dict. Example ------- >>> convert_int_float_lines_to_dict(['1 2']) {1: 2.0} >>> convert_int_float_lines_to_dict(['1 2\n', '3 45.5\n']) {1: 2.0, 3: 45.5} """ pairs = {} operations = [ str.strip, str.split, lambda t: (int(t[0]), float(t[1])), lambda t: setitem(pairs, *t), ] do_with_line = chainfs(*operations) consume(map(do_with_line, lines)) return pairs
[docs]def remove_empty_keys(ddict): """Remove empty keys from dictionary.""" empty_keys = [k for k, v in ddict.items() if not v] consume(map(ddict.pop, empty_keys))
[docs]def values_to_dict(values): """ Generalization of converting parameters to dict. Adapted from: https://github.com/joaomcteixeira/taurenmd/blob/6bf4cf5f01df206e9663bd2552343fe397ae8b8f/src/taurenmd/libs/libcli.py#L94-L138 Parameters ---------- values : string List of values with the format "par1=1 par2='string' par3=[1,2,3] Returns ------- param_dict : dictionary Converted string above to dictionary with `=` denoting linkage E.g. {'par1': 1, 'par2':'string', 'par3': [1,2,3]} """ bool_value = { 'true': True, 'false': False, } param_dict = {} for kv in values: # print(param_dict, kv) try: k, v = kv.split('=') except ValueError: param_dict[kv] = True else: if ',' in v: vs = v.split(',') try: param_dict[k] = tuple(ast.literal_eval(i) for i in vs) except (ValueError, TypeError, SyntaxError): param_dict[k] = tuple(i for i in vs) else: try: param_dict[k] = ast.literal_eval(v) except (ValueError, TypeError): # is string or list param_dict[k] = bool_value.get(v.lower(), v) except (SyntaxError): param_dict[k] = v return param_dict
[docs]def convert_tuples_to_lists(data): """ Recursively processes input data and converts it all to list of lists. Parameter --------- data : list of tuple Return ------ result : list of list """ result = [] for item in data: if isinstance(item, tuple): result.append(convert_tuples_to_lists(item)) else: result.append(item) return result
get_trimer_seq_njit = njit(get_trimer_seq) get_seq_chunk_njit = njit(get_seq_chunk) get_seq_next_residue_njit = njit(get_seq_next_residue)
[docs]def split_into_chunks(string, size=150): """ Split a string into chunks of characters. The last chunk may be longer or shoter. Parameters ---------- string : str String of characters to split size : int Integer value of chunk sizes. Defaults to 200. Returns ------- chunks : list List of strings split into chunks of pre-determined sizes. """ chunks = [] while len(string) > size: chunks.append(string[:size]) string = string[size:] if len(string) > 0: chunks.append(string) return chunks
[docs]def split_by_ranges(seq, ranges): """ Split a string into substrings based on a list of custom ranges. Parameters ---------- seq : str String or sequence of desire to be split. ranges : list of int Integers represent the index of which the split will occur. Each value is not inclusive. Returns ------- chunks : list List of split strings at their desired locations. """ chunks = [] start = 0 for end in ranges: chunks.append(seq[start:end]) start = end if start < len(seq): chunks.append(seq[start:]) return chunks