"""
Store internal protein structure representation.
Classes
-------
Structure
The main API that represents a protein structure in IDPConfGen.
"""
import traceback
import warnings
from collections import defaultdict
from functools import reduce
import numpy as np
from idpconfgen import log
from idpconfgen.core.definitions import aa3to1, blocked_ids, pdb_ligand_codes
from idpconfgen.core.definitions import residue_elements as _allowed_elements
from idpconfgen.core.exceptions import (
EmptyFilterError,
NotBuiltError,
ParserNotFoundError,
PDBFormatError,
)
from idpconfgen.libs import libpdb
from idpconfgen.libs.libcif import CIFParser, is_cif
from idpconfgen.libs.libparse import group_runs, sample_case, type2string
from idpconfgen.libs.libpdb import RE_ENDMDL, RE_MODEL, delete_insertions
from idpconfgen.logger import S
[docs]class Structure:
"""
Hold structural data from PDB/mmCIF files.
Run the ``.build()`` method to read the structure.
Cases for PDB Files:
* If there are several MODELS only the first model is considered.
Parameters
----------
data : str, bytes, Path
Raw structural data from PDB/mmCIF formatted files.
If `data` is a path to a file it *must* be a `pathlib.Path`
object. If string or bytes, it must be the raw content of the
input file.
Examples
--------
Opens a PDB file, selects only chain 'A' and saves selection to a file.
>>> s = Structure(Path('1ABC.pdb'))
>>> s.build()
>>> s.add_filter_chain('A')
>>> s.write_PDB('out.pdb')
Opens a mmCIF file, selects only residues above 50 and saves
selection to a file.
>>> s = Structure(Path('1ABC.cif'))
>>> s.build()
>>> s.add_filter(lambda x: int(x[col_resSeq]) > 50)
>>> s.write_PDB('out.pdb')
>>> with open('1ABC.pdb', 'r') as fin:
>>> lines = fin.read()
>>> s = Structure(lines)
>>> s.build()
"""
__slots__ = [
'_data_array',
'_datastr',
'_filters',
'_structure_parser',
'kwargs',
]
def __init__(self, data, **kwargs):
datastr = get_datastr(data)
self._structure_parser = detect_structure_type(datastr)
self._datastr = datastr
self.kwargs = kwargs
self.clear_filters()
assert isinstance(self.filters, list)
def __len__(self):
return self.data_array.shape[0]
[docs] def build(self):
"""
Read structure raw data in :attr:`rawdata`.
After `.build()`, filters and data can be accessed.
"""
self._data_array = self._structure_parser(self._datastr, **self.kwargs)
del self._datastr
[docs] def clear_filters(self):
"""Clear/Deletes registered filters."""
self._filters = []
@property
def data_array(self):
"""Contain structure data in the form of a Numpy array."""
try:
return self._data_array
except AttributeError as err:
errmsg = (
'Please `.build()` the Structure before attempting to access'
'its data'
)
raise NotBuiltError(errmsg=errmsg) from err
@property
def filters(self):
"""Filter functions registered ordered by registry record."""
return self._filters
@property
def filtered_atoms(self):
"""
Filter data array by the selected filters.
Returns
-------
list
The data in PDB format after filtering.
"""
apply_filter = _APPLY_FILTER
return np.array(list(reduce(
apply_filter,
self.filters,
self.data_array,
)))
@property
def chain_set(self):
"""All chain IDs present in the raw dataset.""" # noqa: D401
return set(self.data_array[:, col_chainID])
@property
def coords(self):
"""
Coordinates of the filtered atoms.
As float.
"""
return self.filtered_atoms[:, cols_coords].astype(np.float32)
@coords.setter
def coords(self, coords):
self.data_array[:, cols_coords] = \
np.round(coords, decimals=3).astype('<U8')
@property
def consecutive_residues(self):
"""Consecutive residue groups from filtered atoms."""
# the structure.consecutive_residues
# this will ignore the iCode
# please use pdb-tool pdb_delinsertions before this step
# PDBs downloaded with IDPConfGen already correct for these
# see libs.libparse.delete_insertions
return group_runs(self.residues)
@property
def fasta(self):
"""
FASTA sequence of the :attr:`filtered_atoms` lines.
HETATM residues with non-canonical codes are represented as X.
"""
c, rs, rn = col_chainID, col_resSeq, col_resName
chains = defaultdict(dict)
for row in self.filtered_atoms:
chains[row[c]].setdefault(row[rs], aa3to1.get(row[rn], 'X'))
return {
chain: ''.join(residues.values())
for chain, residues in chains.items()
}
@property
def filtered_residues(self):
"""Filter residues according to :attr:`filters`."""
FA = self.filtered_atoms
return [int(i) for i in dict.fromkeys(FA[:, col_resSeq])]
@property
def residues(self):
"""
Residues of the structure.
Without filtering, without chain separation.
"""
return [int(i) for i in dict.fromkeys(self.data_array[:, col_resSeq])]
[docs] def pop_last_filter(self):
"""Pop last filter."""
self._filters.pop()
[docs] def add_filter(self, function):
"""Add a function as filter."""
self.filters.append(function)
[docs] def add_filter_record_name(self, record_name):
"""Add filter for record names."""
self.filters.append(
lambda x: x[col_record].startswith(record_name)
)
[docs] def add_filter_chain(self, chain):
"""Add filters for chain."""
self.filters.append(lambda x: x[col_chainID] == chain)
[docs] def add_filter_backbone(self, minimal=False):
"""Add filter to consider only backbone atoms."""
ib = is_backbone
self.filters.append(
lambda x: ib(x[col_name], x[col_element], minimal=minimal)
)
[docs] def get_PDB(self, pdb_filters=None, renumber=True):
"""
Convert Structure to PDB format.
Considers only filtered lines.
Returns
-------
generator
"""
def _(i, f):
return f(i)
fs = self.filtered_atoms
# renumber atoms
if renumber:
try:
fs[:, col_serial] = np.arange(1, fs.shape[0] + 1).astype('<U8')
except IndexError as err:
errmsg = (
'Could not renumber atoms, most likely, because '
'there are no lines in selection.'
)
err2 = EmptyFilterError(errmsg)
raise err2 from err
pdb_filters = pdb_filters or []
lines = list(reduce(_, pdb_filters, structure_to_pdb(fs)))
return lines
[docs] def get_sorted_minimal_backbone_coords(self, filtered=False):
"""
Generate a copy of the backbone coords sorted.
Sorting according N, CA, C.
This method was created because some PDBs may not have the
backbone atoms sorted properly.
Parameters
----------
filtered : bool, optional
Whether consider current filters or raw data.
"""
atoms = self.filtered_atoms if filtered else self.data_array
coords = atoms[:, cols_coords]
N_coords = coords[atoms[:, col_name] == 'N']
CA_coords = coords[atoms[:, col_name] == 'CA']
C_coords = coords[atoms[:, col_name] == 'C']
N_num = N_coords.shape[0]
CA_num = CA_coords.shape[0]
C_num = C_coords.shape[0]
num_backbone_atoms = sum([N_num, CA_num, C_num])
assert num_backbone_atoms / 3 == N_num
minimal_backbone = np.zeros((num_backbone_atoms, 3), dtype=np.float32)
minimal_backbone[0:-2:3] = N_coords
minimal_backbone[1:-1:3] = CA_coords
minimal_backbone[2::3] = C_coords
return minimal_backbone
[docs] def write_PDB(self, filename, **kwargs):
"""Write Structure to PDB file."""
lines = self.get_PDB(**kwargs)
with warnings.catch_warnings():
warnings.filterwarnings('error')
try:
write_PDB(lines, filename)
except UserWarning:
raise EmptyFilterError(filename)
[docs]def parse_pdb_to_array(datastr, which='both'):
"""
Transform PDB data into an array.
Parameters
----------
datastr : str
String representing the PDB format v3 file.
which : str
Which lines to consider ['ATOM', 'HETATM', 'both'].
Defaults to `'both'`, considers both 'ATOM' and 'HETATM'.
Returns
-------
numpy.ndarray of (N, len(`libpdb.atom_slicers`))
Where N are the number of ATOM and/or HETATM lines,
and axis=1 the number of fields in ATOM/HETATM lines according
to the PDB format v3.
"""
# require
assert isinstance(datastr, str), \
f'`datastr` is not str: {type(datastr)} instead'
model_idx = RE_MODEL.search(datastr)
endmdl_idx = RE_ENDMDL.search(datastr)
if bool(model_idx) + bool(endmdl_idx) == 1:
# only one is True
raise PDBFormatError('Found MODEL and not ENDMDL, or vice-versa')
if model_idx and endmdl_idx:
start = model_idx.span()[1]
end = endmdl_idx.span()[0] - 1
lines = datastr[start: end].split('\n')
else:
lines = datastr.split('\n')
record_lines = filter_record_lines(lines, which=which)
data_array = gen_empty_structure_data_array(len(record_lines))
populate_structure_array_from_pdb(record_lines, data_array)
return data_array
[docs]def parse_cif_to_array(datastr, **kwargs):
"""
Parse mmCIF protein data to array.
Array is as given by :func:`gen_empty_structure_data_array`.
"""
cif = CIFParser(datastr, **kwargs)
number_of_atoms = len(cif)
data_array = gen_empty_structure_data_array(number_of_atoms)
for ii in range(number_of_atoms):
data_array[ii, :] = cif.get_line_elements_for_PDB(line=ii)
return data_array
[docs]def gen_empty_structure_data_array(number_of_atoms):
"""
Generate an array data structure to contain structure data.
Parameters
----------
number_of_atoms : int
The number of atoms in the structure.
Determines the size of the axis 0 of the structure array.
Returns
-------
np.ndarray of (N, :attr:`libpdb.atom_slicers), dtype = '<U8'
Where N is the ``number_of_atoms``.
"""
# require
assert isinstance(number_of_atoms, int), \
f'`number_of_atoms` is not int, {type(number_of_atoms)} '
assert number_of_atoms > 0, \
f'or number is less than zero: {number_of_atoms}.'
return np.empty(
(number_of_atoms, len(libpdb.atom_slicers)),
dtype='<U8',
)
[docs]def generate_residue_labels(*residue_labels, fmt=None, delimiter=' - '):
"""
Generate residue labels column.
Concatenate labels in `residue_labels` using
`concatenate_residue_labels`.
Parameters
----------
fmt : str, optional
The string formatter by default we consider backbone atoms
of a protein with less than 1000 residues.
Defaults to `None`, uses '{:<8}', 8 or multiple of 8 according
to length of residue_labels.
"""
if not fmt:
# 11 because 8 + len(' - ')
fmt = '{:<' + str(len(residue_labels) * (8 + len(delimiter))) + '}'
concat = (
concatenate_residue_labels(label_tuple)
for label_tuple in residue_labels
)
return [fmt.format(delimiter.join(clabels)) for clabels in zip(*concat)]
[docs]def generate_backbone_pairs_labels(da):
"""
Generate backbone atom pairs labels.
Used to create columns in report summaries.
Parameters
----------
da : Structure.data_array - like
Returns
-------
Numpy Array of dtype str, shape (N,)
Where N is the number of minimal backbone atoms.
"""
# masks from minimal backbone atoms
N_mask = da[:, col_name] == 'N'
CA_mask = da[:, col_name] == 'CA'
C_mask = da[:, col_name] == 'C'
# prepares labels column format
# number of digit of the highest residue
# 5 is 3 from 3-letter aa code and 2 from 'CA' label length
max_len = len(str(np.sum(N_mask))) + 5
fmt_res = '{:<' + str(max_len) + '}'
# Generate label pairs
N_CA_labels = generate_residue_labels(
da[:, cols_labels][N_mask],
da[:, cols_labels][CA_mask],
fmt=fmt_res,
)
CA_C_labels = generate_residue_labels(
da[:, cols_labels][CA_mask],
da[:, cols_labels][C_mask],
fmt=fmt_res,
)
C_N_labels = generate_residue_labels(
da[:, cols_labels][C_mask][:-1],
da[:, cols_labels][N_mask][1:],
fmt=fmt_res,
)
# prepares the labels array
# max_label_len to use the exact needed memory size
_ = [N_CA_labels, CA_C_labels, C_N_labels]
number_of_bb_atoms = sum(len(i) for i in _)
max_label_len = max(len(i) for i in N_CA_labels)
labels = np.zeros(number_of_bb_atoms, dtype=f'<U{max_label_len}')
labels[0::3] = N_CA_labels
labels[1::3] = CA_C_labels
labels[2::3] = C_N_labels
return labels
[docs]def concatenate_residue_labels(labels):
"""
Concatenate residue labels.
This function is a generator.
Parameters
----------
labels : numpy array of shape (N, M)
Where N is the number of rows, and M the number of columns
with the labels to be concatenated.
"""
empty_join = ''.join
return (empty_join(res_label) for res_label in labels)
[docs]def populate_structure_array_from_pdb(record_lines, data_array):
"""
Populate structure array from PDB lines.
Parameters
----------
record_lines : list-like
The PDB record lines (ATOM or HETATM) to parse.
data_array : np.ndarray
The array to populate.
Returns
-------
None
Populates array in place.
"""
AS = libpdb.atom_slicers
for row, line in enumerate(record_lines):
for column, slicer_item in enumerate(AS):
data_array[row, column] = line[slicer_item].strip()
[docs]def filter_record_lines(lines, which='both'):
"""Filter lines to get record lines only."""
RH = record_line_headings
try:
return [line for line in lines if line.startswith(RH[which])]
except KeyError as err:
err2 = ValueError(f'`which` got an unexpected value \'{which}\'.')
raise err2 from err
[docs]def get_datastr(data):
"""
Get data in string format.
Can parse data from several formats:
* Path, reads file content
* bytes, converst to str
* str, returns the input
Returns
-------
str
That represents the data
"""
t2s = type2string
data_type = type(data)
try:
datastr = t2s[data_type](data)
except KeyError as err:
err2 = NotImplementedError('Struture data not of proper type')
raise err2 from err
assert isinstance(datastr, str)
return datastr
[docs]def detect_structure_type(datastr):
"""
Detect structure data parser.
Uses ``structure_parsers``.
Returns
-------
parser
That which can parse `datastr` to a :py::class:`Structure'.
"""
sp = structure_parsers
for condition, parser in sp:
if condition(datastr):
return parser
raise ParserNotFoundError
[docs]def write_PDB(lines, filename):
"""
Write Structure data format to PDB.
Parameters
----------
lines : list or np.ndarray
Lines contains PDB data as according to `parse_pdb_to_array`.
filename : str or Path
The name of the output PDB file.
"""
# use join here because lines can be a generator
concat_lines = '\n'.join(lines)
if concat_lines:
with open(filename, 'w') as fh:
fh.write(concat_lines)
fh.write('\n')
else:
warnings.warn('Empty lines, nothing to write, ignoring.', UserWarning) # noqa: E501 B028
[docs]def structure_to_pdb(atoms):
"""
Convert table to PDB formatted lines.
Parameters
----------
atoms : np.ndarray, shape (N, 16) or similar data structure
Where N is the number of atoms and 16 the number of cols.
Yields
------
Formatted PDB line according to `libpdb.atom_line_formatter`.
"""
for line in atoms:
values = [func(i) for i, func in zip(line, libpdb.atom_format_funcs)]
values[col_name] = libpdb.format_atom_name(
values[col_name],
values[col_element],
)
yield libpdb.atom_line_formatter.format(*values)
col_record = 0
col_serial = 1
col_name = 2 # atom name
col_altLoc = 3
col_resName = 4
col_chainID = 5
col_resSeq = 6
col_iCode = 7
col_x = 8
col_y = 9
col_z = 10
col_occ = 11
col_temp = 12
col_segid = 13
col_element = 14
col_model = 15
cols_coords_slice = slice(8, 11)
cols_coords = [col_x, col_y, col_z]
cols_labels = [col_resSeq, col_resName, col_name]
# this servers read_pdb_data_to_array mainly
# it is here for performance
record_line_headings = {
'both': ('ATOM', 'HETATM'),
'ATOM': 'ATOM',
'HETATM': 'HETATM',
}
# order matters
structure_parsers = [
(is_cif, parse_cif_to_array),
(libpdb.is_pdb, parse_pdb_to_array),
]
def _APPLY_FILTER(it, func):
return filter(func, it)
[docs]def is_backbone(atom, element, minimal=False):
"""
Whether `atom` is a protein backbone atom or not.
Parameters
----------
atom : str
The atom name.
element : str
The element name.
minimal : bool
If `True` considers only `C` and `N` elements.
`False`, considers also `O`.
"""
e = element.strip()
a = atom.strip()
elements = {
True: ('N', 'C'),
False: ('N', 'C', 'O'),
}
# elements is needed because of atoms in HETATM entries
# for example 'CA' is calcium
return a in ('N', 'CA', 'C', 'O') and e in elements[minimal]
[docs]def save_structure_by_chains(
pdb_data,
pdbname,
altlocs=('A', '', ' ', '1'), # CIF: 6uwi chain D has altloc 1
chains=None,
record_name=('ATOM', 'HETATM'),
renumber=True,
**kwargs,
):
"""
Save PDBs/mmCIF in separated chains (PDB format).
Logic to parse PDBs from RCSB.
"""
# local assignments for speed boost :D
_DR = pdb_ligand_codes # discarded residues
_AE = _allowed_elements
_S = Structure
_BI = blocked_ids
_DI = [delete_insertions]
pdbdata = _S(pdb_data)
pdbdata.build()
chain_set = pdbdata.chain_set
chains = chains or chain_set
add_filter = pdbdata.add_filter
pdbdata.add_filter_record_name(record_name)
add_filter(lambda x: x[col_resName] not in _DR)
add_filter(lambda x: x[col_element] in _AE)
add_filter(lambda x: x[col_altLoc] in altlocs)
for chain in chains:
# writes chains always in upper case because chain IDs given by
# Dunbrack lab are always in upper case letters
chaincode = f'{pdbname}_{chain}'
# this operation can't be performed before because
# until here there is not way to assure if the chain being
# downloaded is actualy in the blocked_ids.
# because at the CLI level the user can add only the PDBID
# to indicate download all chains, while some may be restricted
if chaincode in _BI:
log.info(S(
f'Ignored code {chaincode} because '
'is listed in blocked ids.'
))
continue
# upper and lower case combinations:
possible_cases = sample_case(chain)
# cases that exist in the structure
cases_that_actually_exist = chain_set.intersection(possible_cases)
# this trick places `chain` first in the for loop because
# it has the highest probability to be the one required
cases_that_actually_exist.discard(chain)
probe_cases = [chain] + list(cases_that_actually_exist)
for chain_case in probe_cases:
pdbdata.add_filter_chain(chain_case)
fout = f'{chaincode}.pdb'
try:
pdb_lines = pdbdata.get_PDB(pdb_filters=_DI)
except EmptyFilterError as err:
err2 = \
EmptyFilterError(f'for chain {pdbname}_{chain_case}')
errlog = (
f'{repr(err)}\n'
f'{repr(err2)}\n'
f'{traceback.format_exc()}\n'
'continuing to new chain\n'
)
log.debug(errlog)
continue
else:
if all(line.startswith('HETATM') for line in pdb_lines):
log.debug(
f'Found only HETATM for {chain_case}, '
'continuing with next chain.'
)
continue
yield fout, '\n'.join(pdb_lines)
break
finally:
pdbdata.pop_last_filter()
else:
log.debug(f'Failed to download {chaincode}')