# -*- coding: utf-8 -*-
"""This module provides the the WatFinder toolkit that detects, predicts and analyzes water bridges.
"""
__author__ = 'Karolina Mikulska-Ruminska'
__credits__ = ['Frane Doljanin', 'Karolina Mikulska-Ruminska']
__email__ = ['karolamik@fizyka.umk.pl', 'fdoljanin@pmfst.hr']
import multiprocessing as mp
from numbers import Number
import numpy as np
import os
from itertools import combinations
from collections import deque
from prody import PY3K
if PY3K:
from enum import Enum, auto
else:
from enum import Enum
import enum
from itertools import count
def auto(it=count()):
return next(it)
enum.auto=auto
from copy import copy
from prody import LOGGER, SETTINGS
from prody.atomic import Atom, Atomic, AtomGroup
from prody.atomic.flags import DEFAULTS
from prody.ensemble import Ensemble
from prody.measure import calcAngle, calcDistance
from prody.measure.contacts import findNeighbors
from prody.proteins import writePDB, parsePDB
from prody.utilities import showFigure, showMatrix
__all__ = ['calcWaterBridges', 'calcWaterBridgesTrajectory', 'getWaterBridgesInfoOutput',
'calcWaterBridgesStatistics', 'getWaterBridgeStatInfo',
'calcWaterBridgeMatrix', 'showWaterBridgeMatrix',
'calcBridgingResiduesHistogram', 'calcWaterBridgesDistribution',
'savePDBWaterBridges', 'savePDBWaterBridgesTrajectory',
'saveWaterBridges', 'parseWaterBridges', 'findClusterCenters',
'filterStructuresWithoutWater', 'selectSurroundingsBox',
'findCommonSelectionTraj']
class ResType(Enum):
WATER = auto()
PROTEIN = auto()
ION = auto()
class AtomNode:
def __init__(self, atom, type):
self.atom = atom
self.hydrogens = []
self.type = type
self.bonds = []
self.bondedAtoms = []
self.isVisited = False
class RelationList:
def __init__(self, numNodes):
self.nodes = [None for _ in range(numNodes)]
def resetVisits(self, atomsToReset=[]):
if atomsToReset:
for atom in atomsToReset:
self[atom].isVisited = False
return
for node in self.nodes:
if node:
node.isVisited = False
def addNode(self, atom, type):
self[atom.getIndex()] = AtomNode(atom, type)
return self[atom]
def addHBond(self, bond):
donor, acceptor = bond.donor, bond.acceptor
self[donor].bonds.append(bond)
self[acceptor].bonds.append(bond)
if acceptor.getIndex() not in map(lambda a: a.getIndex(), self[donor].bondedAtoms):
self[donor].bondedAtoms.append(acceptor)
self[acceptor].bondedAtoms.append(donor)
def removeUnbonded(self):
for i, node in enumerate(self.nodes):
if node and not node.bonds:
self[i] = None
def __getitem__(self, key):
if isinstance(key, Atom):
key = key.getIndex()
return self.nodes[key]
def __setitem__(self, key, value):
self.nodes[key] = value
class HBondConstraints:
def __init__(self, donors, acceptors, angleWW=None, anglePAWD=None, anglePDWA=None):
self.donors = donors
self.acceptors = acceptors
self.angleWW = angleWW
self.anglePAWD = anglePAWD
self.anglePDWA = anglePDWA
def addAngles(self, angleWW, anglePAWD, anglePDWA):
self.angleWW = angleWW
self.anglePAWD = anglePAWD
self.anglePDWA = anglePDWA
class HydrogenBond:
def __init__(self, donor, acceptor):
self.donor = donor
self.acceptor = acceptor
@staticmethod
def checkIsHBond(donor, acceptor, constraints):
if donor.type == ResType.ION or acceptor.type == ResType.ION:
return False
if donor.type != ResType.WATER and donor.atom.getName()[0] not in constraints.donors:
return False
if acceptor.type != ResType.WATER and acceptor.atom.getName()[0] not in constraints.acceptors:
return False
anglesForComb = {
(ResType.WATER, ResType.WATER): constraints.angleWW,
(ResType.WATER, ResType.PROTEIN): constraints.anglePAWD,
(ResType.PROTEIN, ResType.WATER): constraints.anglePDWA,
}
angleRange = anglesForComb[(donor.type, acceptor.type)]
if not angleRange:
return True
return HydrogenBond.checkIsHBondAngle(donor, acceptor, angleRange)
@staticmethod
def checkIsHBondAngle(donor, acceptor, angleRange):
for hydrogen in donor.hydrogens:
bondAngle = calcAngle(donor.atom, hydrogen, acceptor.atom)
if angleRange[0] < bondAngle < angleRange[1]:
return True
return False
class CoordinationBond:
def __init__(self, donor, acceptor):
self.donor = donor
self.acceptor = acceptor
@staticmethod
def checkIsCoorBond(donor, acceptor, constraints):
if donor.type != ResType.WATER and donor.atom.getName() not in constraints.donors:
return False
if acceptor.type != ResType.WATER and acceptor.atom.getName() not in constraints.acceptors:
return False
if donor.type != ResType.ION and acceptor.type != ResType.ION:
return False
return True
def calcBridges(relations, hydrophilicList, method, maxDepth, maxNumResidues):
waterBridges = []
for atom in hydrophilicList:
observedNode = relations[atom]
if not observedNode:
continue
newBridges = []
if method == 'chain':
newBridges = getBridgeChain_BFS(
observedNode, relations, maxDepth+1)
relations.resetVisits()
else:
newBridges = getBridgeForResidue_BFS(
observedNode, relations, maxDepth, maxNumResidues)
waterBridges += newBridges
return waterBridges
def getChainBridgeTuple(bridge):
i_1, i_2 = bridge[0].getIndex(), bridge[-1].getIndex()
return (min(i_1, i_2), max(i_1, i_2))
def getUniqueElements(list, key):
unique, uniqueKeys = [], []
for element in list:
elementKey = key(element)
if elementKey in uniqueKeys:
continue
unique.append(element)
uniqueKeys.append(elementKey)
return unique
def getBridgeChain_BFS(observedNode, relationsList, maxDepth):
queue = deque([(observedNode.atom, [])])
waterBridges = []
while queue:
currentAtom, currentPath = queue.popleft()
currentNode = relationsList[currentAtom]
if currentNode.isVisited:
continue
currentNode.isVisited = True
if currentNode.type == ResType.PROTEIN and len(currentPath):
waterBridges.append(currentPath + [currentAtom])
continue
if len(currentPath) == maxDepth:
continue
for atom in currentNode.bondedAtoms:
queue.append(
(atom, currentPath + [currentAtom]))
return waterBridges
def getBridgeForResidue_BFS(observedNode, relationsList, maxDepth, maxNumResidues):
waterBridges = []
for waterAtom in observedNode.bondedAtoms:
bridgeAtoms = []
newWaters = []
queue = deque([(waterAtom, 0)])
while queue:
currentAtom, depth = queue.popleft()
currentNode = relationsList[currentAtom]
if currentNode.isVisited or (maxDepth and depth > maxDepth):
continue
bridgeAtoms.append(currentAtom)
if currentNode.type == ResType.PROTEIN:
continue
if currentNode.type == ResType.WATER:
currentNode.isVisited = True
newWaters.append(currentNode.atom)
for atom in currentNode.bondedAtoms:
queue.append((atom, depth+1))
bridgeAtomIndices = list(
set(map(lambda a: a.getIndex(), bridgeAtoms)))
numProteinResidues = sum(
relationsList[atomIndex].type == ResType.PROTEIN for atomIndex in bridgeAtomIndices)
numResidues = len(bridgeAtomIndices)
if (not maxNumResidues or numResidues <= maxNumResidues) and numProteinResidues >= 2:
waterBridges.append(bridgeAtomIndices)
else:
relationsList.resetVisits(newWaters)
return waterBridges
class AtomicOutput:
def __init__(self, proteins, waters):
self.proteins = proteins
self.waters = waters
def getInfoOutput(waterBridgesAtomic):
output = []
for bridge in waterBridgesAtomic:
bridgeOutput = []
for atom in bridge.proteins:
residueInfo = "{0}{1}".format(atom.getResname(),
atom.getResnum())
atomInfo = "{0}_{1}".format(atom.getName(),
atom.getIndex())
chainInfo = atom.getChid()
bridgeOutput += [residueInfo, atomInfo, chainInfo]
for atom_1, atom_2 in combinations(bridge.proteins, r=2):
bridgeOutput += [calcDistance(atom_1, atom_2)]
bridgeOutput += [len(bridge.waters)]
bridgeOutput += [
list(map(lambda w: "{0}_{1}".format(w.getChid(),
w.getIndex()),
bridge.waters))]
output.append(bridgeOutput)
return output
[docs]
def getWaterBridgesInfoOutput(waterBridgesAtomic):
"""Converts single frame/trajectory atomic output from calcWaterBridges/Trajectory to info output.
:arg waterBridgesAtomic: water bridges from calcWaterBridges/Trajectory
:type waterBridgesAtomic: list
"""
isSingleFrame = isinstance(waterBridgesAtomic[0], AtomicOutput)
if isSingleFrame:
return getInfoOutput(waterBridgesAtomic)
output = []
for frame in waterBridgesAtomic:
currentFrameInfo = getInfoOutput(frame)
output.append(currentFrameInfo)
return output
def getAtomicOutput(waterBridges, relations):
output = []
for bridge in waterBridges:
proteinAtoms, waterAtoms = [], []
for atomIndex in bridge:
if relations[atomIndex].type in [ResType.PROTEIN, ResType.ION]:
proteinAtoms.append(relations[atomIndex].atom)
else:
waterAtoms.append(relations[atomIndex].atom)
output.append(AtomicOutput(proteinAtoms, waterAtoms))
return output
def getElementsRegex(elements):
return '[{0}].*'.format("|".join(elements))
[docs]
def calcWaterBridges(atoms, **kwargs):
"""Compute water bridges for a protein that has water molecules.
:arg atoms: Atomic object from which atoms are considered
:type atoms: :class:`.Atomic`
:arg method: cluster or chain, where chain find shortest water bridging path between two protein atoms
default is 'chain'
:type method: string 'cluster' | 'chain'
:arg distDA: maximal distance between water/protein donor and acceptor
default is 3.5
:type distDA: int, float
:arg distWR: maximal distance between considered water and any residue
default is 4
:type distWR: int, float
:arg anglePDWA: angle range where protein is donor and water is acceptor
default is (100, 200)
:type anglePDWA: (int, int)
:arg anglePAWD: angle range where protein is acceptor and water is donor
default is (100, 140)
:type anglePDWA: (int, int)
:arg angleWW: angle between water donor/acceptor
default is (140, 180)
:type angleWW: (int, int)
:arg maxDepth: maximum number of waters in chain/depth of residues in cluster
default is 2
:type maxDepth: int, None
:arg maxNumRes: maximum number of water+protein residues in cluster
default is None
:type maxNumRes: int, None
:arg donors: which atoms to count as donors
default is ['N', 'O', 'S', 'F']
:type donors: list
:arg acceptors: which atoms to count as acceptors
default is ['N', 'O', 'S', 'F']
:type acceptors: list
:arg output: return information arrays, (protein atoms, water atoms), or just atom indices per bridge
default is 'atomic'
:type output: 'info' | 'atomic' | 'indices'
:arg isInfoLog: should log information
default is True
:type output: bool
:arg selstr: selection string for focusing analysis
default of **None** focuses on everything
:type selstr: str
:arg expand_selection: whether to expand the selection with
:func:`.selectSurroundingsBox`, selecting a box surrounding it.
Default is **False**
:type expand_selection: bool
:arg considered_atoms_sel: selection string for which atoms to consider
Default is **"protein"**
:type considered_atoms_sel: str
"""
method = kwargs.pop('method', 'chain')
distDA = kwargs.pop('distDA', 3.5)
distWR = kwargs.pop('distWR', 4)
anglePDWA = kwargs.pop('anglePDWA', (100, 200))
anglePAWD = kwargs.pop('anglePAWD', (100, 140))
angleWW = kwargs.pop('angleWW', (140, 180))
maxDepth = kwargs.pop('maxDepth', 2)
maxNumResidues = kwargs.pop('maxNumRes', None)
donors = kwargs.pop('donors', ['N', 'O', 'S', 'F'])
acceptors = kwargs.pop('acceptors', ['N', 'O', 'S', 'F'])
outputType = kwargs.pop('output', 'atomic')
isInfoLog = kwargs.pop('isInfoLog', True)
DIST_COVALENT_H = 1.4
prefix = kwargs.pop('prefix', '')
considered_atoms_sel = kwargs.pop('considered_atoms_sel', "protein")
indices = None
if method not in ['chain', 'cluster']:
raise TypeError('Method should be chain or cluster.')
if outputType not in ['info', 'atomic', 'indices']:
raise TypeError('Output can be info, atomic or indices.')
selstr = kwargs.pop('selstr', None)
if selstr is not None:
selection = atoms.select(selstr).copy()
expand_selection = kwargs.pop('expand_selection', False)
if expand_selection:
atoms = selectSurroundingsBox(atoms, selection, **kwargs).copy()
else:
atoms = selection.copy()
water = atoms.select('water')
if water is None:
raise ValueError('atoms has no water so cannot be analysed with WatFinder')
relations = RelationList(len(atoms))
tooFarAtoms = atoms.select(
'water and not within {0} of protein'.format(distWR))
if tooFarAtoms is None:
consideredAtoms = atoms
else:
consideredAtoms = ~tooFarAtoms
waterHydrogens = consideredAtoms.select('water and hydrogen') or []
waterOxygens = consideredAtoms.select('water and oxygen')
waterHydroOxyPairs = findNeighbors(
waterOxygens, DIST_COVALENT_H, waterHydrogens) if waterHydrogens else []
for oxygen in waterOxygens:
relations.addNode(oxygen, ResType.WATER)
for pair in waterHydroOxyPairs:
oxygen, hydrogen, _ = pair
relations[oxygen].hydrogens.append(hydrogen)
proteinHydrophilic = consideredAtoms.select(
'({0}) and name "{1}" and within {2} of water'.format(considered_atoms_sel,
getElementsRegex(set(donors+acceptors)), distWR))
proteinHydrogens = consideredAtoms.select('({0}) and hydrogen'.format(considered_atoms_sel)) or []
proteinHydroPairs = findNeighbors(
proteinHydrophilic, DIST_COVALENT_H, proteinHydrogens) if proteinHydrogens else []
for hydrophilic in proteinHydrophilic:
if hydrophilic is not None:
if hydrophilic.getFlag('ion'):
relations.addNode(hydrophilic, ResType.ION)
else:
relations.addNode(hydrophilic, ResType.PROTEIN)
for pair in proteinHydroPairs:
hydrophilic, hydrogen, _ = pair
relations[hydrophilic].hydrogens.append(hydrogen)
contactingWaters = findNeighbors(waterOxygens, distDA)
contactingWaterProtein = findNeighbors(
waterOxygens, distDA, proteinHydrophilic)
contactingWaterNodes = list(
map(lambda ww: (relations[ww[0]], relations[ww[1]]), contactingWaters))
contactingWaterProteinNodes = list(
map(lambda wp: (relations[wp[0]], relations[wp[1]]), contactingWaterProtein))
constraints = HBondConstraints(acceptors, donors)
if len(waterHydrogens) + len(proteinHydrogens):
constraints.addAngles(angleWW, anglePAWD, anglePDWA)
else:
LOGGER.info('No hydrogens detected, angle criteria will not be used.')
constraints2 = HBondConstraints(acceptors, donors) # not taking angles
for pair in contactingWaterNodes + contactingWaterProteinNodes:
for a, b in [(0, 1), (1, 0)]:
if HydrogenBond.checkIsHBond(pair[a], pair[b], constraints):
newHBond = HydrogenBond(pair[a].atom, pair[b].atom)
relations.addHBond(newHBond)
elif CoordinationBond.checkIsCoorBond(pair[a], pair[b], constraints2):
newBond = CoordinationBond(pair[a].atom, pair[b].atom)
relations.addHBond(newBond)
relations.removeUnbonded()
waterBridgesWithIndices = calcBridges(
relations, proteinHydrophilic, method, maxDepth, maxNumResidues)
if method == 'chain':
waterBridgesWithIndices = getUniqueElements(
waterBridgesWithIndices, getChainBridgeTuple)
log_string = '{0} water bridges detected using method {1}'.format(
len(waterBridgesWithIndices), method)
if prefix != '':
log_string += ' for ' + prefix
LOGGER.info(log_string)
if method == 'atomic':
LOGGER.info('Call getInfoOutput to convert atomic to info output.')
atomicOutput = getAtomicOutput(waterBridgesWithIndices, relations)
infoOutput = getInfoOutput(atomicOutput)
if isInfoLog:
for bridge in infoOutput:
LOGGER.info(' '.join(map(str, bridge)))
if outputType == 'info':
return infoOutput
if outputType == 'atomic':
return atomicOutput
return waterBridgesWithIndices
# took from interactions.py
[docs]
def calcWaterBridgesTrajectory(atoms, trajectory, **kwargs):
"""Computes water bridges for a given trajectory. Kwargs for options are the same as in calcWaterBridges.
:arg atoms: Atomic object from which atoms are considered
:type atoms: :class:`.Atomic`
:arg trajectory: Trajectory data coming from a DCD, ensemble or multi-model PDB file.
:type trajectory: :class:`.Trajectory', :class:`.Ensemble`, :class:`.Atomic`
:arg start_frame: frame to start from
:type start_frame: int
:arg stop_frame: frame to stop
:type stop_frame: int
:arg max_proc: maximum number of processes to use
default is half of the number of CPUs
:type max_proc: int
:arg selstr: selection string for focusing analysis
default of **None** focuses on everything
:type selstr: str
:arg expand_selection: whether to expand the selection with
:func:`.selectSurroundingsBox`, selecting a box surrounding it.
Default is **False**
:type expand_selection: bool
If selstr is provided, a common selection will be found across all frames
combining selections satifying the criteria in each.
:arg return_selection: whether to return the combined common selection
Default is **False** to keep expected behaviour.
However, this output is required when using selstr.
:type return_selection: bool
:arg considered_atoms_sel: selection string for which atoms to consider
Default is **"protein"**
:type considered_atoms_sel: str
"""
start_frame = kwargs.pop('start_frame', 0)
stop_frame = kwargs.pop('stop_frame', -1)
max_proc = kwargs.pop('max_proc', mp.cpu_count()//2)
selstr = kwargs.pop('selstr', None)
expand_selection = kwargs.pop('expand_selection', False)
return_selection = kwargs.pop('return_selection', False)
padding = kwargs.pop('padding', 0)
indices = None
if trajectory is not None:
if isinstance(trajectory, Atomic):
trajectory = Ensemble(trajectory)
# nfi = trajectory._nfi
# trajectory.reset()
# numFrames = trajectory._n_csets
if stop_frame == -1:
traj = trajectory[start_frame:]
else:
traj = trajectory[start_frame:stop_frame+1]
atoms_copy = atoms.copy()
indices = None
if selstr is not None:
selection, indices = findCommonSelectionTraj(atoms, traj, selstr,
expand_selection=expand_selection,
return_selection=False,
padding=padding)
LOGGER.info('Common selection found with {0} atoms and {1} protein chains'.format(
selection.numAtoms(), len(list(selection.protein.getHierView()))))
def analyseFrame(j0, start_frame, frame0, interactions_all):
LOGGER.info('Frame: {0}'.format(j0))
atoms_copy = atoms.copy()
atoms_copy.setCoords(frame0.getCoords())
if indices is not None:
atoms_copy = atoms_copy[indices]
kwargs['selstr'] = atoms_copy.getSelstr()
interactions = calcWaterBridges(
atoms_copy, isInfoLog=False,
prefix='frame {0}'.format(j0),
**kwargs)
interactions_all[j0-start_frame] = interactions
if max_proc == 1:
interactions_all = []
for j0, frame0 in enumerate(traj, start=start_frame):
interactions_all.append([])
analyseFrame(j0, start_frame, frame0, interactions_all)
else:
with mp.Manager() as manager:
interactions_all = manager.list()
for j0, frame0 in enumerate(traj, start=start_frame):
interactions_all.append([])
j0 = start_frame
while j0 < traj.numConfs()+start_frame:
processes = []
for _ in range(max_proc):
frame0 = traj[j0-start_frame]
p = mp.Process(target=analyseFrame, args=(j0, start_frame,
frame0,
interactions_all))
p.start()
processes.append(p)
j0 += 1
if j0 >= traj.numConfs()+start_frame:
break
for p in processes:
p.join()
interactions_all = interactions_all[:]
# trajectory._nfi = nfi
else:
if atoms.numCoordsets() > 1:
def analyseFrame(i, interactions_all):
frameNum = i+start_frame
LOGGER.info('Model: {0}'.format(frameNum))
atoms.setACSIndex(i+start_frame)
interactions = calcWaterBridges(
atoms, isInfoLog=False, prefix='frame {0}'.format(frameNum),
**kwargs)
interactions_all[i] = interactions
if max_proc == 1:
interactions_all = []
for i in range(len(atoms.getCoordsets()[start_frame:stop_frame])):
interactions_all.append([])
analyseFrame(i, interactions_all)
else:
with mp.Manager() as manager:
interactions_all = manager.list()
for i in range(len(atoms.getCoordsets()[start_frame:stop_frame])):
interactions_all.append([])
i = start_frame
while i < len(atoms.getCoordsets()[start_frame:stop_frame]):
processes = []
for _ in range(max_proc):
p = mp.Process(target=analyseFrame, args=(i, interactions_all))
p.start()
processes.append(p)
i += 1
if i >= len(atoms.getCoordsets()[start_frame:stop_frame]):
break
for p in processes:
p.join()
interactions_all = interactions_all[:]
else:
LOGGER.info('Include trajectory or use multi-model PDB file.')
if return_selection:
if indices is not None:
atoms_copy = atoms_copy[indices]
kwargs['selstr'] = atoms_copy.getSelstr()
return interactions_all, atoms_copy
return interactions_all
def getResidueName(atom, use_segname=False):
result = f'{atom.getResname()}{atom.getResnum()}{atom.getChid()}'
if use_segname:
result += f'{atom.getSegname()}'
return result
class DictionaryList:
def __init__(self, initialValue):
self.values = {}
self.initialValue = initialValue
def __getitem__(self, key):
if key in self.values:
return self.values[key]
return copy(self.initialValue)
def __setitem__(self, key, value):
self.values[key] = value
def keys(self):
return self.values.keys()
def removeDuplicateKeys(self, criterion):
keysCopy = list(self.values.keys())
for key in keysCopy:
if criterion(self.values.keys(), key):
del self.values[key]
def getResInfo(atoms, **kwargs):
considered_atoms_sel = kwargs.pop('considered_atoms_sel', "protein")
dict = {}
nums = atoms.select(considered_atoms_sel).getResnums()
names = atoms.select(considered_atoms_sel).getResnames()
chids = atoms.select(considered_atoms_sel).getChids()
for i, num in enumerate(nums):
dict[num] = "{0}{1}{2}".format(names[i], num, chids[i])
return dict
[docs]
def getWaterBridgeStatInfo(stats, atoms, **kwargs):
"""Converts calcWaterBridgesStatistic indices output to info output from stat.
:arg stats: statistics returned by calcWaterBridgesStatistics, output='indices'
:type stats: dictionary
:arg atoms: Atomic object from which atoms are considered
:type atoms: :class:`.Atomic`
"""
residueInfo = getResInfo(atoms, **kwargs)
infoOutput = {}
for key, value in stats.items():
x_id, y_id = key
x_info, y_info = residueInfo[x_id], residueInfo[y_id]
newKey = (x_info, y_info)
infoOutput[newKey] = value
return infoOutput
[docs]
def calcWaterBridgesStatistics(frames, trajectory, **kwargs):
"""Returns statistics. Value is percentage of bridge appearance of frames for each residue.
:arg frames: list of water bridges from calcWaterBridgesTrajectory(), output='atomic'
:type frames: list
:arg output: return dictorinary whose keys are tuples of resnames or resids
default is 'indices'
:type output: 'info' | 'indices'
:arg filename: name of file to save statistic information if wanted
default is None
:type filename: string
:arg considered_atoms_sel: selection string for which atoms to consider
Default is **"protein"**
:type considered_atoms_sel: str
"""
output = kwargs.pop('output', 'indices')
filename = kwargs.pop('filename', None)
if output not in ['info', 'indices']:
raise TypeError('Output should be info or indices!')
considered_atoms_sel = kwargs.pop('considered_atoms_sel', None)
if considered_atoms_sel is not None:
trajectory.select(considered_atoms_sel)
allCoordinates = trajectory.getCoordsets()
interactionCount = DictionaryList(0)
distances = DictionaryList([])
resNames = {}
for frameIndex, frame in enumerate(frames):
frameCombinations = []
for bridge in frame:
proteinAtoms = bridge.proteins
for atom_1, atom_2 in combinations(proteinAtoms, r=2):
ind_1, ind_2 = atom_1.getIndex(), atom_2.getIndex()
res_1, res_2 = atom_1.getResnum(), atom_2.getResnum()
if res_1 == res_2:
continue
if (res_1, res_2) not in frameCombinations:
interactionCount[(res_1, res_2)] += 1
interactionCount[(res_2, res_1)] += 1
frameCombinations += [(res_1, res_2), (res_2, res_1)]
coords = allCoordinates[frameIndex]
atom_1_coords, atom_2_coords = coords[ind_1], coords[ind_2]
distance = calcDistance(atom_1_coords, atom_2_coords)
distances[(res_1, res_2)] += [distance]
distances[(res_2, res_1)] += [distance]
res_1_name, res_2_name = getResidueName(
atom_1), getResidueName(atom_2)
resNames[res_1] = res_1_name
resNames[res_2] = res_2_name
interactionCount.removeDuplicateKeys(
lambda keys, key: (key[1], key[0]) in keys)
tableHeader = '{0:<15}{1:<15}{2:<10}{3:<10}{4:<10}'.format(
"RES1", "RES2", "PERC", "DIST_AVG", "DIST_STD")
LOGGER.info(tableHeader)
info = {}
file = open(filename, 'w') if filename else None
if file:
file.write(tableHeader + '\n')
for key in interactionCount.keys():
percentage = 100 * interactionCount[key]/len(frames)
distAvg = np.average(distances[key])
distStd = np.std(distances[key])
pairInfo = {"percentage": percentage,
"distAvg": distAvg, "distStd": distStd}
outputKey = key
x, y = key
if output == 'info':
outputKey = (resNames[x], resNames[y])
info[outputKey] = pairInfo
elif output == 'indices':
key1, key2 = (x, y), (y, x)
info[key1], info[key2] = pairInfo, pairInfo
tableRow = '{0:<15}{1:<15}{2:<10.3f}{3:<10.3f}{4:<10.3f}'.format(
resNames[x], resNames[y], percentage, distAvg, distStd)
LOGGER.info(tableRow)
if file:
file.write(tableRow + '\n')
if file:
file.close()
return info
[docs]
def calcWaterBridgeMatrix(data, metric):
"""Returns matrix which has metric as value and residue ids as ax indices.
:arg data: dictionary returned by calcWaterBridgesStatistics, output='indices'
:type data: dict
:arg metric: dict key from data
:type metric: 'percentage' | 'distAvg' | 'distStd'
"""
maxResnum = max(max(key) for key in data.keys()) + 1
resMatrix = np.zeros((maxResnum, maxResnum), dtype=float)
for key, value in data.items():
resMatrix[key] = value[metric]
return resMatrix
[docs]
def showWaterBridgeMatrix(data, metric):
"""Shows matrix which has percentage/avg distance as value and residue ids as ax indices.
:arg data: dictionary returned by calcWaterBridgesStatistics, output='indices'
:type data: dict
:arg metric: dict key from data
:type metric: 'percentage' | 'distAvg' | 'distStd'
"""
import matplotlib.pyplot as plt
matrix = calcWaterBridgeMatrix(data, metric)
titles = {
'percentage': 'Interaction percentage',
'distAvg': 'Average distance',
'distStd': 'Distance standard deviation'
}
showMatrix(matrix)
plt.title(titles[metric])
def reduceTo1D(list, elementSel=lambda x: x, sublistSel=lambda x: x):
return [elementSel(element) for sublist in list for element in sublistSel(sublist)]
def mofifyBeta(bridgeFrames, atoms):
residueOccurances = {}
for frame in bridgeFrames:
frameResnums = set(reduceTo1D(
frame, lambda a: a.getResnum(), lambda b: b.proteins))
for res in frameResnums:
residueOccurances[res] = residueOccurances.get(res, 0) + 1
atoms.setBetas(0)
for resnum, value in residueOccurances.items():
residueAtoms = atoms.select(
'resnum {0}'.format(resnum))
beta = value/len(bridgeFrames)
residueAtoms.setBetas(beta)
[docs]
def calcBridgingResiduesHistogram(frames, **kwargs):
"""Calculates, plots and returns number of frames that each residue is involved in making water bridges, sorted by value.
:arg frames: list of water bridges from calcWaterBridgesTrajectory(), output='atomic'
:type frames: list
:arg clip: maximal number of residues on graph; to represent all set None
default is 20
:type clip: int
:arg use_segname: whether to use segname to label residues
default is False, because then the labels get long
:type use_segname: bool
"""
show_plot = kwargs.pop('show_plot', False)
use_segname = kwargs.get('use_segname', False)
clip = kwargs.pop('clip', 20)
if clip == None:
clip = 0
residuesWithCount = {}
for frame in frames:
frameResidues = set(reduceTo1D(
frame, lambda x: getResidueName(x, use_segname=use_segname),
lambda wb: wb.proteins))
for res in frameResidues:
residuesWithCount[res] = residuesWithCount.get(res, 0) + 1
sortedResidues = sorted(residuesWithCount.items(),
key=lambda r: r[1])
labels, values = zip(*sortedResidues[-clip:])
if show_plot:
import matplotlib.pyplot as plt
plt.figure(figsize=(5, 3 + 0.11 * len(labels)))
plt.barh(labels, values)
plt.xlabel('Number of frame appearances')
plt.ylabel('Residue')
plt.title('Water bridging residues')
plt.tight_layout()
plt.margins(y=0.01)
plt.gca().xaxis.set_label_position('top')
plt.gca().xaxis.tick_top()
if SETTINGS['auto_show']:
showFigure()
return sortedResidues
def showBridgingResiduesHistogram(frames, **kwargs):
kwargs['show_plot'] = True
return calcBridgingResiduesHistogram(frames, **kwargs)
def getBridgingResidues(frames, residue):
residuesWithCount = {}
for frame in frames:
frameResidues = []
for bridge in frame:
resNames = list(map(getResidueName, bridge.proteins))
if residue in resNames:
frameResidues += resNames
if not frameResidues:
continue
frameResidues = set(frameResidues)
frameResidues.remove(residue)
for res in frameResidues:
residuesWithCount[res] = residuesWithCount.get(res, 0) + 1
sortedResidues = sorted(residuesWithCount.items(),
key=lambda r: r[1], reverse=True)
return sortedResidues
def getWaterCountDistribution(frames, res_a, res_b):
waters = []
for frame in frames:
for bridge in frame:
resNames = list(map(getResidueName, bridge.proteins))
if not (res_a in resNames and res_b in resNames):
continue
waterInvolvedCount = len(bridge.waters)
waters.append(waterInvolvedCount)
return waters
def getDistanceDistribution(frames, res_a, res_b, trajectory):
distances = []
allCoordinates = trajectory.getCoordsets()
for frameIndex, frame in enumerate(frames):
for bridge in frame:
resNames = list(map(getResidueName, bridge.proteins))
if not (res_a in resNames and res_b in resNames):
continue
for atom_1, atom_2 in combinations(bridge.proteins, r=2):
coords = allCoordinates[frameIndex]
atom_1_coords, atom_2_coords = coords[atom_1.getIndex(
)], coords[atom_2.getIndex()]
dist = calcDistance(atom_1_coords, atom_2_coords)
distances.append(dist)
return distances
def getResidueLocationDistribution(frames, res_a, res_b):
locationInfo = {"backbone": 0, "side": 0}
result = {res_a: locationInfo.copy(), res_b: locationInfo.copy()}
for frame in frames:
for bridge in frame:
atoms_a = filter(lambda a: getResidueName(a)
== res_a, bridge.proteins)
atoms_b = filter(lambda a: getResidueName(a)
== res_b, bridge.proteins)
if not atoms_a or not atoms_b:
continue
def atomType(a): return 'backbone' if a.getName() in [
'CA', 'C', 'N', 'O'] else 'side'
for atom in atoms_a:
result[res_a][atomType(atom)] += 1
for atom in atoms_b:
result[res_b][atomType(atom)] += 1
return result
[docs]
def calcWaterBridgesDistribution(frames, res_a, res_b=None, **kwargs):
"""Returns distribution for certain metric and plots if possible.
:arg res_a: name of first residue
:type frames: str
:arg res_b: name of second residue
default is None
:type frames: str
:arg metric: 'residues' returns names and frame count of residues interacting with res_a,
'waters' returns water count for each bridge between res_a and res_b
'distance' returns distance between each pair of protein atoms involved in bridge between res_a and res_b
'location' returns dictionary with backbone/sidechain count information
:type metric: 'residues' | 'waters' | 'distance' | 'location'
:trajectory: DCD file - necessary for distance distribution
:arg output: return 2D matrices or dictionary where key is residue info
default is 'dict'
:type output: 'dict' | 'indices'
"""
show_plot = kwargs.pop('show_plot', False)
metric = kwargs.pop('metric', 'residues')
trajectory = kwargs.pop('trajectory', None)
if metric == 'distance' and not trajectory:
raise TypeError(
'Distance distribution measurement needs trajectory argument!')
methods = {
'residues': lambda: getBridgingResidues(frames, res_a),
'waters': lambda: getWaterCountDistribution(frames, res_a, res_b),
'distance': lambda: getDistanceDistribution(frames, res_a, res_b, trajectory),
'location': lambda: getResidueLocationDistribution(frames, res_a, res_b)
}
result = methods[metric]()
if metric in ['waters', 'distance'] and show_plot:
import matplotlib.pyplot as plt
plt.hist(result, rwidth=0.95, density=True)
plt.xlabel('Value')
plt.ylabel('Probability')
plt.title('Distribution: {0}'.format(metric))
if SETTINGS['auto_show']:
showFigure()
return result
def showWaterBridgesDistribution(frames, res_a, res_b=None, **kwargs):
kwargs['show_plot'] = True
return calcWaterBridgesDistribution(frames, res_a, res_b, **kwargs)
[docs]
def savePDBWaterBridges(bridges, atoms, filename):
"""Saves single PDB with occupancy on protein atoms and waters involved bridges.
:arg bridges: atomic output from calcWaterBridges
:type bridges: list
:arg atoms: Atomic object from which atoms are considered
:type atoms: :class:`.Atomic`
:arg filename: name of file to be saved
:type filename: string
"""
atoms = atoms.copy()
mofifyBeta([bridges], atoms)
atoms.setOccupancies(0)
atoms.select('beta = 1').setOccupancies(1)
proteinAtoms = atoms.select('protein')
waterOxygens = reduceTo1D(
bridges, lambda w: w.getIndex(), lambda b: b.waters)
waterAtoms = atoms.select(
'same residue as water within 1.6 of index {0}'.format(
" ".join(map(str, waterOxygens))))
atomsToSave = proteinAtoms.toAtomGroup() + waterAtoms.toAtomGroup()
return writePDB(filename, atomsToSave)
[docs]
def savePDBWaterBridgesTrajectory(bridgeFrames, atoms, filename, trajectory=None, max_proc=1):
"""Saves one PDB per frame with occupancy and beta on protein atoms and waters forming bridges in frame.
:arg bridgeFrames: atomic output from calcWaterBridgesTrajectory
:type bridgeFrames: list
:arg atoms: Atomic object from which atoms are considered
:type atoms: :class:`.Atomic`
:arg filename: name of file to be saved; must end in .pdb
:type filename: string
:arg trajectory: trajectory data (not needed for multi-model PDB)
:type trajectory: :class:`.Trajectory', :class:`.Ensemble`, :class:`.Atomic`
"""
if not trajectory and atoms.numCoordsets() < len(bridgeFrames):
raise TypeError('Provide parsed trajectory!')
filename = filename[:filename.rfind('.')]
atoms = atoms.copy()
mofifyBeta(bridgeFrames, atoms)
def saveBridgesFrame(trajectory, atoms, frameIndex, frame):
LOGGER.info('Frame: {0}'.format(frameIndex))
if trajectory:
coords = trajectory[frameIndex].getCoords()
atoms.setCoords(coords)
else:
atoms.setACSIndex(frameIndex)
waterAtoms = reduceTo1D(frame, sublistSel=lambda b: b.waters)
waterResidues = atoms.select(
'same residue as water within 1.6 of index {0}'.format(
" ".join(map(lambda a: str(a.getIndex()), waterAtoms))))
bridgeProteinAtoms = reduceTo1D(
frame, lambda p: p.getResnum(), lambda b: b.proteins)
atoms.setOccupancies(0)
atoms.select('resid {0}'.format(
" ".join(map(str, bridgeProteinAtoms)))).setOccupancies(1)
atomsToSave = atoms.select(
'protein').toAtomGroup() + waterResidues.toAtomGroup()
if trajectory:
writePDB('{0}_{1}.pdb'.format(filename, frameIndex),
atomsToSave)
else:
writePDB('{0}_{1}.pdb'.format(filename, frameIndex),
atomsToSave, csets=frameIndex)
if max_proc == 1:
for frameIndex, frame in enumerate(bridgeFrames):
saveBridgesFrame(trajectory, atoms, frameIndex, frame)
else:
frameIndex = 0
numFrames = len(bridgeFrames)
while frameIndex < numFrames:
processes = []
for _ in range(max_proc):
p = mp.Process(target=saveBridgesFrame, args=(trajectory, atoms, frameIndex,
bridgeFrames[frameIndex]))
p.start()
processes.append(p)
frameIndex += 1
if frameIndex >= numFrames:
break
for p in processes:
p.join()
def getBridgeIndicesString(bridge):
return ' '.join(map(lambda a: str(a.getIndex()), bridge.proteins))\
+ '|'\
+ ' '.join(map(lambda a: str(a.getIndex()), bridge.waters))
[docs]
def saveWaterBridges(atomicBridges, filename):
"""Save water bridges as information (.txt) or WaterBridges (.wb) parsable file.
:arg atomicBridges: atomic output from calcWaterBridges/Trajectory
:type atomicBridges: list
:arg filename: path where file should be saved
:type filename: string
"""
isSingleBridge = isinstance(atomicBridges[0], AtomicOutput)
isInfoOutput = filename.split('.')[-1] == 'txt'
file = open(filename, 'w')
if isSingleBridge:
atomicBridges = [atomicBridges]
if isInfoOutput:
info = getWaterBridgesInfoOutput(atomicBridges)
for frameIndex, frame in enumerate(info):
file.write('FRAME {0}\n'.format(frameIndex))
for bridge in frame:
file.write(' '.join(map(str, bridge)) + '\n')
return file.close()
for frame in atomicBridges:
for bridge in frame:
line = getBridgeIndicesString(bridge)
file.write(line + '\n')
file.write('ENDFRAME\n')
return file.close()
def getBridgeFromIndices(proteinIndices, waterIndices, atoms):
proteins = list(map(lambda i: atoms[i], proteinIndices))
waters = list(map(lambda i: atoms[i], waterIndices))
atomicBridge = AtomicOutput(proteins, waters)
return atomicBridge
[docs]
def parseWaterBridges(filename, atoms):
"""Parse water bridges from .wb file saved by saveWaterBridges, returns atomic type.
:arg filename: path of file where bridges are stored
:type filename: string
:arg atoms: Atomic object on which calcWaterBridges was performed
:type atoms: :class:`.Atomic`
"""
bridgesFrames = []
currentBridges = []
with open(filename, 'r') as file:
for line in file:
line = line.strip()
if line == 'ENDFRAME':
bridgesFrames.append(currentBridges)
currentBridges = []
continue
proteinString, waterString = line.split('|')
proteinIndices = map(int, proteinString.split())
waterIndices = map(int, waterString.split())
bridge = getBridgeFromIndices(proteinIndices, waterIndices, atoms)
currentBridges.append(bridge)
if len(bridgesFrames) == 1:
bridgesFrames = bridgesFrames[0]
return bridgesFrames
[docs]
def findClusterCenters(file_pattern, **kwargs):
""" Find molecules that are forming cluster in 3D space.
:arg file_pattern: file pattern for anlaysis
it can include '*'
example:'file_*.pdb' will analyze file_1.pdb, file_2.pdb, etc.
:type file_pattern: str
:arg selection: selection string
by default 'water and name "O.*"' is used
:type selection: str
:arg distC: distance to other molecules
:type distC: int, float
default is 0.3
:arg numC: min number of molecules in a cluster
default is 3
:type numC: int
:arg filename: filename for output pdb file with clusters
Default of **None** leads to
'clusters_'+file_pattern.split("*")[0]+'.pdb'
:type filename: str
"""
import glob
import numpy as np
selection = kwargs.pop('selection', 'water and name "O.*"')
distC = kwargs.pop('distC', 0.3)
numC = kwargs.pop('numC', 3)
filename = kwargs.pop('filename', None)
matching_files = glob.glob(file_pattern)
matching_files.sort()
coords_all = parsePDB(matching_files[0]).select(selection).toAtomGroup()
for i in matching_files[1:]:
coords = parsePDB(i).select(selection).toAtomGroup()
coords_all += coords
removeResid = []
removeCoords = []
for ii in range(len(coords_all)):
if 'water' in selection.split() or np.any([water in selection.split() for water in DEFAULTS['water']]):
sel = coords_all.select('water within '+str(distC)+' of center',
center=coords_all.getCoords()[ii])
else:
sel = coords_all.select(str(selection)+' within '+str(distC)+' of center',
center=coords_all.getCoords()[ii])
if sel is not None and len(sel) <= int(numC):
removeResid.append(coords_all.getResnums()[ii])
removeCoords.append(list(coords_all.getCoords()[ii]))
if len(removeCoords) == coords_all.numAtoms():
raise ValueError('No waters were selected. You may need to align your trajectory \
or change default parameters for detecting water clusters (increase distC \
or decrease numC)')
selectedWaters = AtomGroup()
sel_waters = []
for j in coords_all.getCoordsets()[0].tolist():
if j not in removeCoords:
sel_waters.append(j)
coords_wat = np.array([sel_waters], dtype=float)
selectedWaters.setCoords(coords_wat)
selectedWaters.setNames(['DUM']*len(selectedWaters))
selectedWaters.setResnums(range(1, len(selectedWaters)+1))
selectedWaters.setResnames(['DUM']*len(selectedWaters))
if filename is None:
try:
filename = 'clusters_'+file_pattern.split("*")[0]+'.pdb'
except:
filename = 'clusters.pdb'
writePDB(filename, selectedWaters)
LOGGER.info("Results are saved in {0}.".format(filename))
[docs]
def filterStructuresWithoutWater(structures, min_water=0, filenames=None):
"""This function will filter out structures from *structures* that have no water
or fewer water molecules than *min_water*.
:arg structures: list of :class:`.Atomic` structures to be filtered
:type structures: list
:arg min_water: minimum number of water O atoms,
default is 0
:type min_water: int
:arg filenames: an optional list of filenames to filter too
This is an output argument
:type filenames: list
"""
if not isinstance(structures, list):
raise TypeError('structures should be a list')
if not np.all([isinstance(struct, Atomic) for struct in structures]):
raise ValueError('elements of structures should be Atomic objects')
if not isinstance(min_water, int):
raise TypeError('min_water should be an integer')
if filenames is None: filenames = []
if not isinstance(filenames, list):
raise TypeError('filenames should be None or a list')
if len(filenames) not in [0, len(structures)]:
raise TypeError('filenames should have the same length as structures')
if not np.all([isinstance(filename, str) for filename in filenames]):
raise ValueError('elements of filenames should be strings')
if not np.all([os.path.exists(filename) for filename in filenames]):
raise ValueError('at least one of the filenames does not exist')
have_filenames = len(filenames)>0
new_structures = []
numStructures = len(structures)
for i, struct in enumerate(reversed(structures)):
title = struct.getTitle()
waters = struct.select('water and name O')
if waters == None:
LOGGER.warn(title+" doesn't contain water molecules")
if have_filenames:
filenames.pop(numStructures-i-1)
continue
numWaters = waters.numAtoms()
if numWaters < min_water:
LOGGER.warn(title+" doesn't contain enough water molecules ({0})".format(numWaters))
if have_filenames:
filenames.pop(numStructures-i-1)
continue
new_structures.append(struct)
return list(reversed(new_structures))
[docs]
def selectSurroundingsBox(atoms, select, **kwargs):
"""Select the surroundings of *select* within *atoms* using
a bounding box with optional *padding*.
:arg return_selstr: whether to return the final selstr
Default False
:type return_selstr: bool
"""
if not isinstance(atoms, Atomic):
raise TypeError('atoms should be an Atomic object')
if isinstance(select, str):
select = atoms.select(select)
if not isinstance(select, Atomic):
raise TypeError('select should be a valid selection or selection string')
padding = kwargs.get('padding', 0)
if not isinstance(padding, Number):
raise TypeError('padding should be a number')
if padding < 0:
raise ValueError('padding should be a positive number')
return_selstr = kwargs.get('return_selstr', False)
if not isinstance(return_selstr, bool):
raise TypeError('return_selstr should be a bool')
minCoords = select.getCoords().min(axis=0)
maxCoords = select.getCoords().max(axis=0)
if padding > 0:
minCoords -= padding
maxCoords += padding
selstr = 'same residue as ((x `{0} to {1}`) and (y `{2} to {3}`) and (z `{4} to {5}`))'.format(
minCoords[0], maxCoords[0], minCoords[1], maxCoords[1], minCoords[2], maxCoords[2])
if return_selstr:
return selstr
return atoms.select(selstr)
[docs]
def findCommonSelectionTraj(atoms, traj, selstr, **kwargs):
"""Select *selstr* within *atoms* for each frame in *traj*
using a bounding box with optional *padding*.
:arg expand_selection: whether to expand selections with
:meth:`.selectSurroundingsBox`. Default False
:type expand_selection: bool
Returns the common selection and corresponding indices and
optionally the corresponding selstr if *return_selstr* is **True**
"""
if not isinstance(atoms, Atomic):
raise TypeError('atoms should be an Atomic object')
if not isinstance(selstr, str):
raise TypeError('selstr should be a string')
expand_selection = kwargs.get('expand_selection', False)
if not isinstance(expand_selection, bool):
raise TypeError('expand_selection should be a bool')
return_selstr = kwargs.pop('return_selstr', False) # not passed on
if not isinstance(return_selstr, bool):
raise TypeError('return_selstr should be a bool')
indices = None
if selstr is not None:
LOGGER.info('Finding common selection')
indices = []
for frame0 in traj:
atoms_copy = atoms.copy()
atoms_copy.setCoords(frame0.getCoords())
selection = atoms_copy.select(selstr)
if expand_selection:
selection = selectSurroundingsBox(atoms_copy, selection, **kwargs)
indices.extend(list(selection.getIndices()))
indices = np.unique(indices)
selection = atoms_copy[indices]
if return_selstr:
return selection, indices, selstr
return selection, indices