Commits (19)
__version__ = '2.1.0'
__version__ = '2.1.6'
......@@ -31,13 +31,15 @@ from datetime import datetime
import sympy as sym
import pylab as pl
import copy
import logging
from tqdm import tqdm
from .scripttime import ScriptTime
from .spectrum import Spectrum, weighted_average
from . import mass_dictionaries # import mass dictionaries
from itertools import combinations_with_replacement as cwr
from IsoSpecPy import IsoThreshold
logger = logging.getLogger(__name__)
# flag for reminding folk to cite people
_CITATION_REMINDER = False
......@@ -67,9 +69,6 @@ mass_dict = getattr(
MASS_KEY,
)
st = ScriptTime(profile=True)
# valid start and end brackets
OPENING_BRACKETS = ['(', '{', '['] # opening brackets
CLOSING_BRACKETS = [')', '}', ']'] # closing brackets
......@@ -473,7 +472,7 @@ def bar_isotope_pattern(
if method not in VALID_GROUP_METHODS:
raise ValueError(f'The grouping method {method} is invalid. Choose from {", ".join(VALID_GROUP_METHODS)}')
if verbose is True:
sys.stdout.write('Generating bar isotope pattern')
logger.info('generating bar isotope pattern')
if isinstance(rawip, Spectrum): # if handed a Spectrum object, trim before continuing
rawip = rawip.trim()
groupedip = group_masses(rawip, delta / 2)
......@@ -490,7 +489,7 @@ def bar_isotope_pattern(
out[0][ind] = out[0][ind] # / abs(charge)
out[1][ind] = val / maxint * 100. # normalize to 100
if verbose is True:
sys.stdout.write(' DONE\n')
logger.info('bar isotope pattern generation complete')
return out
......@@ -555,6 +554,7 @@ def gaussian_isotope_pattern(
``[[m/z values],[intensity values]]``
:rtype: list
"""
logger.info('generating gaussian isotope pattern')
spec = Spectrum( # generate Spectrum object to encompass the entire region
autodec(fwhm),
start=min(barip[0]) - fwhm * 2,
......@@ -562,15 +562,14 @@ def gaussian_isotope_pattern(
empty=False, # whether or not to use emptyspec
filler=0., # fill with zeros, not None
)
for ind, val in enumerate(barip[0]): # generate normal distributions for each peak
# if verbose is True:
# sys.stdout.write('\rSumming m/z %.3f %d/%d' %(val,ind+1,len(self.barip[0])))
msg = 'calculating gaussian isotope pattern'
# generate normal distributions for each peak
for ind, val in enumerate(tqdm(barip[0], desc=msg, total=len(barip[0]), disable=not verbose)):
nd = normal_distribution(val, fwhm, barip[1][ind]) # generate normal distribution for that peak
spec.add_spectrum(nd[0], nd[1]) # add the generated spectrum to the total spectrum
spec.normalize() # normalize
gausip = spec.trim() # trim None values and output
if verbose is True:
sys.stdout.write(' DONE\n')
logger.info('gaussian isotope pattern generation complete')
return gausip
......@@ -603,6 +602,7 @@ def isotope_pattern_hybrid(
:return: isotope pattern as a Spectrum object
:rtype: Spectrum
"""
logger.info('generating hybrid isotope pattern')
eleips = {} # dictionary for storing the isotope patterns of each element
for element, number in composition.items():
eleips[element] = isotope_pattern_combinatoric( # calculate the isotope pattern for each element
......@@ -644,6 +644,7 @@ def isotope_pattern_hybrid(
threshold,
3 * 10 ** -consolidate
)
logger.info('hybrid isotope pattern generation complete')
return spec
......@@ -657,7 +658,6 @@ class ReiterableCWR(object):
return cwr(self.isos, self.number)
@st.profilefn
def num_permu(lst, isos):
"""
Calculates the number of unique permutations of the given set of isotopes for an element.
......@@ -677,7 +677,6 @@ def num_permu(lst, isos):
return int((num / denom).evalf()) # divide, evaluate, and return integer
@st.profilefn
def product(*iterables):
"""
cartesian product of iterables
......@@ -692,7 +691,6 @@ def product(*iterables):
yield (item,) + items
@st.profilefn
def numberofcwr(n, k):
"""
calculates the number of combinations with repitition
......@@ -713,7 +711,6 @@ def cpu_list_product(iterable):
return prod
@st.profilefn
def isotope_pattern_combinatoric(
comp: dict,
decpl: int,
......@@ -730,6 +727,7 @@ def isotope_pattern_combinatoric(
:return: raw isotope pattern as a Spectrum object
:rtype: Spectrum
"""
logger.info('generating combinatoric isotope pattern')
speciso = False # set state for specific isotope
isos = {} # isotopes dictionary
isosets = {} # set of isotopes for each element
......@@ -790,10 +788,10 @@ def isotope_pattern_combinatoric(
ele, iso = string_to_isotope(element) # determine element and isotope
spec.shift_x(mass_dict[ele][iso][0] * comp[element]) # shift the x values by the isotopic mass
spec.normalize() # normalize the spectrum object
logger.info('combinatoric isotope pattern generation complete')
return spec
@st.profilefn
def isotope_pattern_multiplicative(
comp: dict,
decpl: int,
......@@ -823,8 +821,7 @@ def isotope_pattern_multiplicative(
:rtype: Spectrum
"""
spec = None # initial state of spec
if verbose is True:
sys.stdout.write('Generating raw isotope pattern.\n')
logger.info('generating multiplicative isotope pattern')
for key in comp: # for each element
if key in mass_dict: # if not a single isotope
......@@ -877,8 +874,7 @@ def isotope_pattern_multiplicative(
# todo add tqdm progress bar
spec.shift_x(mass_dict[ele][iso][0]) # offset spectrum object by the mass of that
spec.normalize()
if verbose is True:
sys.stdout.write('DONE\n')
logger.info('multiplicative isotope pattern generation complete')
return spec
......@@ -899,6 +895,7 @@ def isotope_pattern_isospec(
:param kwargs: catch for extra kwargs
:return: Spectrum object
"""
logger.info('retrieving isotope pattern from isospec')
global _CITATION_REMINDER
if _CITATION_REMINDER is False: # remind the user on the first use
print('IsoSpecPy package was used, please cite https://dx.doi.org/10.1021/acs.analchem.6b01459')
......@@ -930,6 +927,7 @@ def isotope_pattern_isospec(
abund
)
spec.normalize() # normalize values to 100.
logger.info('isospec isotope pattern retrieval complete')
return spec
......@@ -1006,8 +1004,7 @@ class Molecule(object):
of a string formula
"""
if verbose is True:
sys.stdout.write(f'Generating molecule object from input {string}\n')
logger.debug(f'generating {self.__class__.__name__} object from input {string}')
# split charge into value and sign
self.charge, self.sign = interpret_charge(charge)
self.mass_key = mass_key # store mass dictionary that the script will use
......@@ -1019,9 +1016,6 @@ class Molecule(object):
else:
raise TypeError(f'The provided string type is not interpretable: {type(string)}')
if self.verbose is True:
self.print_details()
def __repr__(self):
return f'{self.__class__.__name__}({self.molecular_formula})'
......@@ -1415,19 +1409,18 @@ class Molecule(object):
def print_details(self):
"""prints the details of the generated molecule"""
sys.stdout.write(f'{self}\n')
sys.stdout.write(f'formula: {self.molecular_formula}\n')
sys.stdout.write(f'molecular weight: {round(self.molecular_weight, 6)}\n')
sys.stdout.write(f'monoisotopic mass: {round(self.monoisotopic_mass, 6)}\n')
sys.stdout.write('\n')
print(f'{self}')
print(f'formula: {self.molecular_formula}')
print(f'molecular weight: {round(self.molecular_weight, 6)}')
print(f'monoisotopic mass: {round(self.monoisotopic_mass, 6)}')
print('')
self.print_percent_composition()
def print_percent_composition(self):
"""prints the percent composition in a reader-friendly format"""
sys.stdout.write('elemental percent composition:\n')
pcomp = self.percent_composition
for element, percent in sorted(pcomp.items()):
sys.stdout.write(f'{element}: {percent * 100.:6.4}%\n')
print('elemental percent composition:')
for element, percent in sorted(self.percent_composition.items()):
print(f'{element}: {percent * 100.:6.4}%')
class IPMolecule(Molecule):
......@@ -1803,8 +1796,7 @@ class IPMolecule(Molecule):
'1116.14036964': {'bounds': (1115.9557393427547, 1116.3249999321531)}}
"""
if self.verbose is True:
sys.stdout.write('Calculating bounds from simulated gaussian isotope pattern')
logger.info('calculating bounds from simulated gaussian isotope pattern')
threshold = threshold * max(self.bar_isotope_pattern[1])
tempip = [[], []]
for ind, inten in enumerate(self.bar_isotope_pattern[1]): # checks for intensities above threshold
......@@ -1819,10 +1811,7 @@ class IPMolecule(Molecule):
else: # a general range that covers the entire isotope pattern
out = [stats.norm.interval(conf, tempip[0][0], scale=self.sigma)[0],
stats.norm.interval(conf, tempip[0][-1], scale=self.sigma)[1]]
if self.verbose is True:
if perpeak is False:
sys.stdout.write(': %.3f-%.3f' % (out[0], out[1]))
sys.stdout.write(' DONE\n')
logger.debug(f'caclulated bounds: {out[0]:.3f}-{out[1]:.3f}')
return out
def _calculate_ips(self):
......@@ -1860,6 +1849,8 @@ class IPMolecule(Molecule):
self.raw_isotope_pattern,
self.fwhm
)
if abs(self.error) > self.criticalerror:
logger.warning(f'the error of {self} is {self.error} (critical: {self.criticalerror})')
def compare(self, exp):
"""
......@@ -1975,15 +1966,15 @@ class IPMolecule(Molecule):
def print_details(self):
"""prints the details of the generated molecule"""
sys.stdout.write(f'{self}\n')
sys.stdout.write(f'formula: {self.molecular_formula}\n')
sys.stdout.write(f'molecular weight: {round(self.molecular_weight, self.decpl)}\n')
sys.stdout.write(f'monoisotopic mass: {round(self.monoisotopic_mass, self.decpl)}\n')
sys.stdout.write(f'estimated exact mass: {round(self.estimated_exact_mass, self.decpl)}\n')
sys.stdout.write(f'error: {self.error:.3}\n')
print(f'{self}')
print(f'formula: {self.molecular_formula}')
print(f'molecular weight: {round(self.molecular_weight, self.decpl)}')
print(f'monoisotopic mass: {round(self.monoisotopic_mass, self.decpl)}')
print(f'estimated exact mass: {round(self.estimated_exact_mass, self.decpl)}')
print(f'error: {self.error:.3}')
if abs(self.error) > self.criticalerror:
sys.stdout.write(f'WARNING: Error is greater than {self.criticalerror}!\n')
sys.stdout.write('\n')
print(f'WARNING: Error is greater than {self.criticalerror}!')
print('')
self.print_percent_composition()
def plot_bar_pattern(self):
......@@ -2037,9 +2028,7 @@ class IPMolecule(Molecule):
elif name.lower().endswith('.jdx') is False:
name += '.jdx'
if self.verbose is True:
sys.stdout.write(f'Saving {name} to {os.path.join(os.getcwd(), "molecules")}')
sys.stdout.flush()
logger.info(f'saving {name} to {os.path.join(os.getcwd(), "molecules")}')
header = [ # comment lines to put before data
# header items
......@@ -2087,9 +2076,7 @@ class IPMolecule(Molecule):
elif name.lower().endswith('.mol') is False:
name += '.mol'
if self.verbose is True:
sys.stdout.write(f'Saving {name} to {os.path.join(os.getcwd(), "molecules")}')
sys.stdout.flush()
logger.info(f'saving {name} to {os.path.join(os.getcwd(), "molecules")}')
with open(os.path.join(os.getcwd(), "molecules", name), 'wb') as outfile:
pickle.dump(
......@@ -2101,7 +2088,6 @@ class IPMolecule(Molecule):
if __name__ == '__main__': # for testing and troubleshooting
# st.printstart()
mol = IPMolecule(
'L2PdAr+I',
# charge= 2, # specify charge (if not specified in formula)
......@@ -2115,5 +2101,3 @@ if __name__ == '__main__': # for testing and troubleshooting
# keepall=True,
)
# mol.print_details()
# st.printelapsed()
# st.printprofiles()
......@@ -10,7 +10,7 @@ from xml.etree import ElementTree
from functools import wraps
from collections.abc import MutableSequence
import scipy as sci
import numpy as np
from tqdm import tqdm
from .logging import logger
......@@ -482,14 +482,14 @@ class mzML(object):
end = start + split
splity = []
for i in range(npeaks):
splity.append(sci.asarray(y[start:end]))
splity.append(np.asarray(y[start:end]))
start += split
end += split
out = []
for ind, section in enumerate(splity):
maxy = max(section)
if maxy == max(section[1:-1]): # if max is not at the edge of the spectrum
out.append(sci.where(section == maxy)[0][0] + split * ind)
out.append(np.where(section == maxy)[0][0] + split * ind)
return out
if function is None: # if no function is provided, use first
......
"""module for managing data and attributes required for RSIR analysis"""
import re
from typing import Union, List, Iterable
from typing import Union, List, Tuple, Iterable
import numpy as np
from openpyxl.cell import Cell
import matplotlib.pyplot as plt
from ..molecule import IPMolecule
from ..mzml.structure import mzML, Trace
from ..spectrum import Spectrum
from ..tome import slice_array, bindata
from ..tome import slice_array, normalize
# pattern kwarg map
......@@ -112,6 +113,75 @@ def get_kwargs_from_row(row: Iterable[Cell],
return out
def expand_bounds(left: float, right: float, value: float) -> List[float]:
"""
Expands the provided bounds by the provided value
:param left: left bound
:param right: right bound
:param value: expansion value
:return: new bounds
"""
return [
left - value,
right + value,
]
def visualize_target_bounds(spectrum: Union[Spectrum, List],
target: 'RSIRTarget',
ip_style='bar',
) -> Tuple[plt.Figure, plt.Axes]:
"""
Visualizes the target bounds overlaid on the spectrum.
:param spectrum: Spectrum instance or [x,y] list of the spectrum
:param target: RSIR target instance
:param ip_style: isotope pattern style to plot (bar or gaussian)
:return: figure, axes
"""
fig, ax = plt.subplots()
if isinstance(spectrum, Spectrum):
spectrum = spectrum.trim()
ax.plot(
*spectrum
)
for bound in target.bounds:
ax.axvline(bound, color='r', linestyle='-')
# todo include bound label
if target.molecule is not None:
if ip_style.lower() == 'gaussian':
gausip = target.molecule.gaussian_isotope_pattern
gausip[1] = normalize(
gausip[1],
max(spectrum[1])
)
ax.fill(
*gausip,
alpha=0.25
)
elif ip_style.lower() == 'bar':
barip = target.molecule.bar_isotope_pattern
barip[1] = normalize(
barip[1],
max(spectrum[1])
)
ax.bar(
*barip,
width=target.molecule.fwhm,
align='center',
alpha=0.25,
)
# expand bounds by 1
ax.set_xlim(*expand_bounds(*target.bounds, 1))
ax.set_ylabel('intensity')
ax.set_xlabel('m/z')
ax.set_title(target.name)
fig.tight_layout()
# todo set ylim
return fig, ax
class RSIRTarget:
# precision to use for sumspectrum
SPECTRUM_PRECISION = 3
......@@ -125,6 +195,8 @@ class RSIRTarget:
name: str = None,
level: int = 1,
conf_interval: float = 0.95,
resolution: float = 5000,
expand_bounds: float = 0.,
):
"""
Data class for RSIR data
......@@ -138,6 +210,9 @@ class RSIRTarget:
:param name: species name
:param level: MSn level (unless you're performing MS/MS experiments, this will be 1)
:param conf_interval: confidence interval for use in automatic bounds determination
:param resolution: resolution to use for bounds determination using IPMolecule instances
:param expand_bounds: parameter for automatically expanding bounds by some value. For example, if the bounds are
automatically determined from a molecule instance, the determined bounds will be expanded by 2* this value.
"""
# todo support manually specified resolution
self._molecule: IPMolecule = None
......@@ -150,8 +225,10 @@ class RSIRTarget:
self._spectrum: Spectrum = None
self._conf_interval = conf_interval
self.expand_bounds = expand_bounds
self.name = name
self.formula = formula
self.resolution = resolution
self.bounds = bounds
self.affinity = affinity
self.function = function
......@@ -185,7 +262,8 @@ class RSIRTarget:
@name.setter
def name(self, value: str):
self._name = str(value)
if value is not None:
self._name = str(value)
@name.deleter
def name(self):
......@@ -212,14 +290,18 @@ class RSIRTarget:
"""m/z bounds to use for intensity tracking"""
# return manually set bounds if specified
if self._bounds is not None:
return self._bounds
out = self._bounds
# or determined bounds from molecule class
elif self._molecule is not None:
return self._molecule.calculate_bounds(
out = self._molecule.calculate_bounds(
conf=self.conf_interval,
)
else:
raise AttributeError(f'Bounds have not been set for this instance')
return expand_bounds(
*out,
self.expand_bounds,
)
@bounds.setter
def bounds(self, value: List[float]):
......@@ -285,6 +367,17 @@ class RSIRTarget:
"""molecule instance associated with the target (only applicable if a formula is being used)"""
return self._molecule
@property
def resolution(self):
"""resolution associated with the instance"""
if self.molecule is not None:
return self.molecule.resolution
@resolution.setter
def resolution(self, value):
if self.molecule is not None:
self.molecule.resolution = value
@property
def confidence_interval(self) -> float:
"""confidence interval to use when determining bounds from an IPMolecule instance"""
......@@ -338,3 +431,17 @@ class RSIRTarget:
:return: list of binned intensities
"""
return self.raw_data.get_binned_data(bin_num)
def visualize_bounds(self, spectrum: Union[Spectrum, List], ip_style: str = 'bar') -> Tuple[plt.Figure, plt.Axes]:
"""
Visualizes the bounds of the species overlaid on the provided spectrum
:param spectrum: Spectrum instance or [x,y] list of the spectrum
:param ip_style: isotope pattern style to plot (bar or gaussian)
:return: figure, axes
"""
return visualize_target_bounds(
spectrum,
target=self,
ip_style=ip_style,
)
......@@ -181,6 +181,7 @@ class RSIR:
self.full_spectra: Mapping[int, Spectrum] = {}
self.logger = logger.getChild(self.__class__.__name__) # todo logger child from mzml filename
self.mzml = mzml_file
self.mzml.extract_function_time_tic()
if 'n' in kwargs:
warnings.warn( # v2.1.0
'the "n" kwarg will be deprecated, please use "bin_numbers" instead',
......@@ -241,6 +242,7 @@ class RSIR:
@store_full_spectra.setter
def store_full_spectra(self, value: bool):
# todo consider creating a designated class to associate a spectrum to a mzML function
# todo accept kwargs to use for instantiating Spectrum instances (e.g. narrow bounds, specify precision, etc.)
if value is True and len(self.full_spectra) == 0:
for func in self.mzml.functions:
self.full_spectra[func] = Spectrum(
......@@ -371,6 +373,8 @@ class RSIR:
n_plots,
sharex=True,
)
if n_plots == 1:
ax = [ax]
# plot raw data
for target in self.targets:
ax[0].plot(
......@@ -418,7 +422,7 @@ class RSIR:
:param save: whether to save after writing
"""
if isinstance(xlfile, XLSX) is False:
xlfile = XLSX(xlfile)
xlfile = XLSX(xlfile, create=True)
self.logger.info(f'writing data to {xlfile}')
# for each function defined in the mzML
for func_num, details in self.mzml.functions.items():
......@@ -475,7 +479,7 @@ class RSIR:
:param save: whether to save after writing
"""
if isinstance(xlfile, XLSX) is False:
xlfile = XLSX(xlfile)
xlfile = XLSX(xlfile, create=True)
# identify any targets with retrieved isotope patterns and write those to file
targets_with_spectra = sorted(
[
......@@ -512,12 +516,12 @@ class RSIR:
:param save: whether to save after writing
"""
if isinstance(xlfile, XLSX) is False:
xlfile = XLSX(xlfile)
xlfile = XLSX(xlfile, create=True)
if self.store_full_spectra is True:
self.logger.info(f'writing full spectra to file')
for func, spectrum in self.full_spectra.items():
details = self.mzml.functions[func]
sheet_name = f'{details["type"]}{details["mode"]} m/z {details["window"][0]}-{details["window"][1]}'
sheet_name = f'{details["type"]}{details["mode"]} {details["window"][0]}-{details["window"][1]}'
xlfile.writespectrum(
*spectrum.trim(),
sheet=sheet_name
......@@ -534,7 +538,7 @@ class RSIR:
:param xlfile: XLSX instance or path to excel file
"""
if isinstance(xlfile, XLSX) is False:
xlfile = XLSX(xlfile)
xlfile = XLSX(xlfile, create=True)
self.write_rsir_to_excel(xlfile, save=False)
self.write_isotope_patterns_to_excel(xlfile, save=False)
self.write_full_spectra_to_excel(xlfile, save=False)
......
......@@ -42,7 +42,6 @@ IGNORE
import os
import sys
import warnings
import scipy as sci
import numpy as np
from typing import Iterable
from .spectrum import Spectrum
......@@ -66,10 +65,10 @@ def resolution(x, y, index=None, threshold=5):
:param threshold: signal to noise threshold required to output a resolution
:return: resolution
"""
y = sci.asarray(y) # convert to array for efficiency
y = np.asarray(y) # convert to array for efficiency
if index is None: # find index and value of maximum
maxy = max(y)
index = sci.where(y == maxy)[0][0]
index = np.where(y == maxy)[0][0]
else:
maxy = y[index]
# if intensity to average is below this threshold (rough estimate of signal/noise)
......@@ -124,14 +123,14 @@ def autoresolution(x, y, n=10, v=True):
end = start + split
splity = []
for i in range(n):
splity.append(sci.asarray(y[start:end]))
splity.append(np.asarray(y[start:end]))
start += split
end += split
inds = []
for ind, section in enumerate(splity):
maxy = max(section)
if maxy == max(section[1:-1]): # if max is not at the edge of the spectrum
inds.append(sci.where(section == maxy)[0][0] + split * ind)
inds.append(np.where(section == maxy)[0][0] + split * ind)
res = []
for ind in inds: # for each of those peaks
......