__version__ = '2.0.0'
__version__ = '2.1.0'
"""
Module for interacting with mzML files
"""
from .structure import mzML
from .structure import mzML, Trace
__all__ = [
'mzML',
'Trace',
]
......
......@@ -5,8 +5,10 @@ import gzip
import sys
import warnings
from random import random
from typing import Generator
from typing import Generator, List, MutableMapping
from xml.etree import ElementTree
from functools import wraps
from collections.abc import MutableSequence
import scipy as sci
from tqdm import tqdm
......@@ -45,6 +47,110 @@ class BoundsError(Warning):
self.warned[name] += 1
class Trace(MutableSequence):
def __init__(self,
*incoming_values: float,
center_bin: bool = False,
):
"""
A data class for managing binned values of a trace. The class stores raw data and retains binned traces
after they are generated (prevents re-binning data when the data is used in multiple locations).
:param incoming_values: values to add on instantiation
:param center_bin: whether to center the bin value
"""
self._raw_data: List[float] = []
self._binned_data: MutableMapping[int, List[float]] = {}
self.center_bin: bool = center_bin
# self._bin_valid: bool = False # todo enable tracking of bin validity (once new value added, prev bins invalid)
for value in incoming_values:
self.append(value)
def __getitem__(self, item) -> float:
return self._raw_data[item]
def __setitem__(self, key, value):
self._raw_data[key] = value
def __delitem__(self, key):
del self._raw_data[key]
def __len__(self):
return len(self._raw_data)
def __copy__(self):
return self.__class__(
*self._raw_data,
center_bin=self.center_bin,
)
@property
def raw_data(self) -> List[float]:
"""raw data list (unbinned)"""
return self._raw_data
def insert(self, index: int, val: float) -> None:
"""
Inserts a value at the specified position
:param index: index at which to insert
:param val: value to insert
"""
self._raw_data.insert(index, val)
def get_binned_data(self, bin_num: int) -> List[float]:
"""
Bins the data into bins of the size specified. If the data has already been binned to that level, returns the
previously binned data.
:param bin_num: bin number to use
:return: list of binned values
"""
# if previously binned, return
if bin_num in self._binned_data:
return self._binned_data[bin_num]
# bin data
out = self.bin_trace(
bin_num,
self.raw_data,
bin_denom=bin_num if self.center_bin else 1,
)
# store data for later retrieval
self._binned_data[bin_num] = out
return out
@staticmethod
def bin_trace(bin_num: int, target_list: List[float], bin_denom=1):
"""
Bins a list of values into bins of size *bin_num*.
:param bin_num: Number of values to bin together. e.g. ``bin_num = 4`` would bin the first four values into a
single value, then the next four, and so on.
:param target_list: List of values to bin.
:param bin_denom: Bin denominator. Each calculated bin value will be divided by this value. e.g. if
``bin_denom == bin_num`` the output values will be the average of each bin.
:return: binned list
**Notes**
- If the length of the list is not divisible by ``bin_num``, the final (partial) bin will not be included in the
output list (the trailing values are discarded).
"""
out = []
delta = 0
ttemp = 0
for ind, val in enumerate(target_list):
delta += 1
ttemp += val # add current value to growing sum
if delta == bin_num: # critical number is reached
out.append(ttemp / float(bin_denom)) # append sum to list
delta = 0 # reset critical count and sum
ttemp = 0
return out
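A quick usage sketch of the binning behaviour described above (illustrative only, not part of the module; the values follow directly from the algorithm):

trace = Trace(1.0, 2.0, 3.0, 4.0, 5.0, center_bin=True)
trace.get_binned_data(2)    # [1.5, 3.5]; the trailing 5.0 is discarded as an incomplete bin
summed = Trace(1.0, 2.0, 3.0, 4.0, 5.0)   # center_bin defaults to False
summed.get_binned_data(2)   # [3.0, 7.0]; each bin is summed rather than averaged
summed.get_binned_data(2)   # repeated calls return the cached result from _binned_data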
class mzML(object):
def __init__(self,
filename: str,
......@@ -124,7 +230,7 @@ class mzML(object):
self.nchroms = 0
self.functions = {}
for spectrum in self._spectra_elements:
for spectrum in self.spectra_elements:
try:
# try to retrieve function, process, and scan from attributes
func, proc, scan = fps(spectrum) # extract each value and convert to integer
......@@ -150,9 +256,9 @@ class mzML(object):
self.duration = None
self._BE = BoundsError() # load warning instance for integration
self.ftt = False
self._ftt_extracted = False
if ftt is True:
self.function_timetic()
self.extract_function_time_tic()
def __str__(self):
"""The string that is returned when printed"""
......@@ -187,7 +293,7 @@ class mzML(object):
if ind < 0 or ind > self.nscans:
raise IndexError("The scan index number #%d is outside of the mzML's scan index range (0-%d)" % (
ind, self.nscans - 1))
for spectrum in self._spectra_elements:
for spectrum in self.spectra_elements:
attr = branch_attributes(spectrum)
if attr['index'] == ind:
return spectrum_array(spectrum)
......@@ -198,19 +304,19 @@ class mzML(object):
raise ValueError(
"The supplied time %.3f is outside of this file's time range (0 - %.3f)" % (ind, self.duration))
ind = self.scan_index(ind)
for spectrum in self._spectra_elements:
for spectrum in self.spectra_elements:
attr = branch_attributes(spectrum)
if attr['index'] == ind:
return spectrum_array(spectrum)
@property
def _spectra_elements(self) -> Generator[ElementTree.Element, None, None]:
def spectra_elements(self) -> Generator[ElementTree.Element, None, None]:
"""generator of spectra elements"""
for element in _findall_ele('mzML', 'run', 'spectrumList', 'spectrum', parent=self.root):
yield element
@property
def _chromatogram_elements(self) -> Generator[ElementTree.Element, None, None]:
def chromatogram_elements(self) -> Generator[ElementTree.Element, None, None]:
"""generator of chromatogram elements"""
for element in _findall_ele('mzML', 'run', 'chromatogramList', 'chromatogram', parent=self.root):
yield element
......@@ -243,7 +349,7 @@ class mzML(object):
"""decorates the supplied function to run for every scan"""
out = []
msg = f'applying function to chromatogram'
for chromatogram in tqdm(self._chromatogram_elements, desc=msg, disable=self._mute_tqdm):
for chromatogram in tqdm(self.chromatogram_elements, desc=msg, disable=self._mute_tqdm):
out.append(fn(chromatogram, *args, **kwargs))
return out
......@@ -274,7 +380,7 @@ class mzML(object):
"""decorates the supplied function to run for every scan"""
out = []
msg = 'applying function to spectrum'
for spectrum in tqdm(self._spectra_elements, desc=msg, disable=self._mute_tqdm):
for spectrum in tqdm(self.spectra_elements, desc=msg, disable=self._mute_tqdm):
out.append(fn(spectrum, *args, **kwargs))
return out
......@@ -324,7 +430,7 @@ class mzML(object):
if 'level' in dct:
level = dct['level']
if affin is None and level is None:
if affin is None and level in [None, 1]:
return min(self.functions.keys()) # assume first function
elif affin == 'UV': # if UV-Vis affinity
......@@ -455,26 +561,60 @@ class mzML(object):
return pw_convert(fn, self.precision, self.compression, self.gzip_file, verbose=self.verbose)
return fn
def function_timetic(self):
def _ensure_ftt_extracted(fn):
"""
checks whether function, time, and TIC lists have been extracted and extracts them if not
(runtime method of ensuring the data is there)
"""
@wraps(fn)
def decorated(self: 'mzML', *args, **kwargs):
if self._ftt_extracted is False:
self.extract_function_time_tic()
return fn(self, *args, **kwargs)
return decorated
def extract_function_time_tic(self):
"""
extracts timepoint and total ion current (TIC) traces for each function
this is kept separate from the initial mzML parsing because performing it there would increase load times significantly (~6x)
"""
# todo see if there's a way to sneakily do this the first time the user iterates through spectra
# todo come up with a data class to manage function metadata
msg = 'extracting timepoints and total ion current values'
for function in self.functions: # add timepoint and tic lists
self.functions[function]['timepoints'] = [] # list for timepoints
self.functions[function]['tic'] = [] # list for total ion current values
self.functions[function]['timepoints'] = Trace(center_bin=True) # list for timepoints
self.functions[function]['tic'] = Trace() # list for total ion current values
if 'level' in self.functions[function] and self.functions[function]['level'] > 1:
self.functions[function]['ce'] = [] # list for collision energies
for spectrum in tqdm(self._spectra_elements, desc=msg, disable=self._mute_tqdm):
attr = branch_attributes(spectrum)
self.functions[function]['ce'] = Trace() # list for collision energies
for spectrum in tqdm(self.spectra_elements, desc=msg, disable=self._mute_tqdm, total=self.nscans):
# attr = branch_attributes(spectrum)
function, proc, scan = fps(spectrum) # determine function, process, and scan numbers
p = CVParameterSet.create_from_branch(spectrum) # pull spectrum's cvparameters
self.functions[function]['timepoints'].append(p['MS:1000016'].value) # start scan time
self.functions[function]['tic'].append(p['MS:1000285'].value) # total ion current
if 'MS:1000045' in p:
self.functions[function]['ce'].append(p['MS:1000045'].value) # collision energy
self.ftt = True
self._ftt_extracted = True
@_ensure_ftt_extracted
def get_timepoints_of_function(self, function: int) -> Trace:
"""
Retrieves the timepoints of the specified function
:param function: function number
:return: Trace of data
"""
return self.functions[function]['timepoints']
@_ensure_ftt_extracted
def get_tic_of_function(self, function: int) -> Trace:
"""
Retrieves the total ion current (TIC) of the specified function
:param function: function number
:return: Trace of data
"""
return self.functions[function]['tic']
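A usage sketch of the lazy extraction these accessors provide (illustrative only; 'example.mzML' is a hypothetical file path):

mzml = mzML('example.mzML', ftt=False)         # no time/TIC extraction at load time
func = min(mzml.functions)                     # first function number in the file
times = mzml.get_timepoints_of_function(func)  # extraction runs here, on first access
tic = mzml.get_tic_of_function(func)           # already extracted, returned immediately
smoothed = tic.get_binned_data(5)              # Trace instances support binning directly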
def integrate(self, name, start, end, x, y):
"""
......@@ -523,7 +663,7 @@ class mzML(object):
"""
msg = 'extracting chromatogram'
chroms = {} # dictionary of chromatograms
for chromatogram in tqdm(self._chromatogram_elements, desc=msg, disable=self._mute_tqdm):
for chromatogram in tqdm(self.chromatogram_elements, desc=msg, disable=self._mute_tqdm):
attr = branch_attributes(chromatogram) # pull attributes
chrom_array = spectrum_array(chromatogram)
xunit, yunit = get_element_units(chromatogram)
......@@ -535,6 +675,7 @@ class mzML(object):
}
return chroms
@_ensure_ftt_extracted
def pull_species_data(self, sp, sumspec=False):
"""
Extracts integrated data at every timepoint for all species specified in the sp dictionary
......@@ -578,11 +719,9 @@ class mzML(object):
dct=sp[species]) # associate each species in the spectrum with a function
if 'raw' not in sp[species]: # look for empty raw list
sp[species]['raw'] = []
if self.ftt is False: # if timepoints and tic values have not been extracted yet, extract those
self.function_timetic()
msg = 'extracting species data from spectrum'
for spectrum in tqdm(self._spectra_elements, desc=msg, disable=self._mute_tqdm):
for spectrum in tqdm(self.spectra_elements, desc=msg, disable=self._mute_tqdm, total=self.nscans):
function, proc, scan = fps(spectrum) # pull function, process, and scan numbers
# attr = branch_attributes(spectrum) # get attributes
spec_array = spectrum_array(spectrum) # generate spectrum
......@@ -610,6 +749,7 @@ class mzML(object):
"""a generator for scans in """
return
@_ensure_ftt_extracted
def retrieve_scans(self, start=None, end=None, mzstart=None, mzend=None, function=None, mute=False, outside=False):
"""
Retrieves the specified scans or time range from the specified function
......@@ -643,11 +783,9 @@ class mzML(object):
raise ValueError('The function "%d" is not in this mzml file.' % function)
start = self.scan_index(start, function, bias='greater')
end = self.scan_index(end, function, bias='lesser')
if self.ftt is False: # extract the timepoints and etc from the mzml
self.function_timetic()
msg = 'retrieving scans'
out = []
for spectrum in tqdm(self._spectra_elements, desc=msg, disable=self._mute_tqdm, total=end-start):
for spectrum in tqdm(self.spectra_elements, desc=msg, disable=self._mute_tqdm, total=end - start):
attr = branch_attributes(spectrum)
# func,proc,scan = self.fps(spectrum) # determine function, process, and scan numbers
# p = CVParameterSet.create_from_branch(spectrum)
......@@ -668,6 +806,7 @@ class mzML(object):
return out[0]
return out
@_ensure_ftt_extracted
def scan_index(self, scan=None, function=1, bias='lesser'):
"""
Determines the index for a scan or timepoint in a given function
......@@ -686,8 +825,6 @@ class mzML(object):
if bias == 'lesser': # used for end point
return self.functions[function]['sr'][1]
if type(scan) is float: # timepoint
if self.ftt is False:
self.function_timetic()
# return located index plus start of the scan range
return locate_in_list(self.functions[function]['timepoints'], scan, bias=bias) + self.functions[function]['sr'][0]
elif type(scan) is int: # scan number
......@@ -745,7 +882,7 @@ class mzML(object):
)
msg = 'combining spectra'
for spectrum in tqdm(self._spectra_elements, desc=msg, disable=self._mute_tqdm or mute, total=end-start):
for spectrum in tqdm(self.spectra_elements, desc=msg, disable=self._mute_tqdm or mute, total=end - start):
attr = branch_attributes(spectrum) # get attributes
if attr['index'] > end:
break
......@@ -754,3 +891,6 @@ class mzML(object):
spec.add_spectrum(x, y) # add spectrum to Spectrum object
out = spec.trim()
return out
# set as static method for decorator to work
_ensure_ftt_extracted = staticmethod(_ensure_ftt_extracted)
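The rebinding above keeps the decorator usable while the class body is being executed. A minimal standalone sketch of the same pattern (not part of the module):

class Lazy:
    def _ensure_loaded(fn):
        # at class-definition time this is a plain function, so it can decorate methods directly
        def wrapper(self, *args, **kwargs):
            if not getattr(self, '_loaded', False):
                self._loaded = True  # stand-in for the expensive extraction step
            return fn(self, *args, **kwargs)
        return wrapper

    @_ensure_loaded
    def value(self):
        return self._loaded

    # rebind afterwards so the attribute is not treated as an instance method
    _ensure_loaded = staticmethod(_ensure_loaded)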
"""
Classes and methods for reconstructed single ion recording extraction from mzML files
"""
from .data import RSIRTarget
from .processing import RSIR
__all__ = [
'RSIRTarget',
'RSIR'
]
"""module for managing data and attributes required for RSIR analysis"""
import re
from typing import Union, List, Iterable
import numpy as np
from openpyxl.cell import Cell
from ..molecule import IPMolecule
from ..mzml.structure import mzML, Trace
from ..spectrum import Spectrum
from ..tome import slice_array, bindata
# pattern kwarg map
_kwarg_map = {
'name': 'name',
'function': 'func(tion)?',
'affinity': 'affin(ity)?',
'formula': r'(molecular([\s_])?)?formula',
}
_kwarg_map = {
key: re.compile(pattern, re.IGNORECASE)
for key, pattern in _kwarg_map.items()
}
# regex for start and end mz
_start_mz_re = re.compile(
'(mz)?start([-_])?(mz)?',
re.IGNORECASE
)
_end_mz_re = re.compile(
'(mz)?end([-_])?(mz)?',
re.IGNORECASE
)
def _find_column(header_row: Iterable[Cell], pattern: re.Pattern):
"""
Finds the column index which matches the pattern
:param header_row: row of cells in the header of an Excel file
:param pattern: regex pattern
:return: column index for value
"""
for ind, cell in enumerate(header_row):
val = cell.value
if val is None:
continue
if pattern.search(val):
return ind
return None
def _find_columns(header_row: Iterable[Cell]) -> dict:
"""
Finds the column indices for RSIRTarget kwargs
:param header_row: row of cells in the header of an Excel file
:return: kwarg/column index map
"""
out = {}
# find standard mappings
for key, pattern in _kwarg_map.items():
ind = _find_column(header_row, pattern)
if ind is not None:
out[key] = ind
return out
def get_bounds(row: Iterable[Cell], header_row: Iterable[Cell]):
"""
Gets the bounds from a species row based on the header row
:param header_row: row of cells in the header of an Excel file
:param row: row to search
:return: bounds
"""
out = [None, None]
for ind, header_cell in enumerate(header_row):
value = header_cell.value
if value is None:
continue
if _start_mz_re.search(header_cell.value) is not None:
out[0] = row[ind].value
if _end_mz_re.search(header_cell.value) is not None:
out[1] = row[ind].value
# if values were found, return out, otherwise return None
if any(val is not None for val in out):
return out
return None
def get_kwargs_from_row(row: Iterable[Cell],
column_map: dict,
header_row: Iterable[Cell],
) -> dict:
"""
Retrieves RSIRTarget kwargs from a row
:param row: excel row
:param column_map: column map for the sheet
:param header_row: row of cells in the header of an Excel file
:return: instantiation kwargs for RSIRTarget
"""
out = {}
for key, ind in column_map.items():
val = row[ind].value
if val is not None:
out[key] = val
out['bounds'] = get_bounds(row, header_row)
return out
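A sketch of how these helpers compose when reading a parameter sheet (illustrative only; 'targets.xlsx' and the sheet layout are hypothetical):

import openpyxl

wb = openpyxl.load_workbook('targets.xlsx', read_only=True)
rows = list(wb.active.rows)
header, first_species = rows[0], rows[1]
column_map = _find_columns(header)   # e.g. {'name': 0, 'formula': 2, ...}
kwargs = get_kwargs_from_row(first_species, column_map, header)
# kwargs now holds any name/function/affinity/formula values plus 'bounds' from the start/end m/z columns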
class RSIRTarget:
# precision to use for sumspectrum
SPECTRUM_PRECISION = 3
def __init__(self,
formula: Union[IPMolecule, str] = None,
bounds: List[float] = None,
affinity: str = None,
function: int = None,
store_spectra: bool = False,
name: str = None,
level: int = 1,
conf_interval: float = 0.95,
):
"""
Data class for RSIR data
:param formula: molecular formula (or IPMolecule instance)
:param bounds: manually specified bounds
:param affinity: MS mode affinity (+ or -)
:param function: MS function to find the data in
:param store_spectra: flag to enable spectral storage if desired. If enabled, creates a Spectrum object
associated with the instance.
:param name: species name
:param level: MSn level (unless you're performing MS/MS experiments, this will be 1)
:param conf_interval: confidence interval for use in automatic bounds determination
"""
# todo support manually specified resolution
self._molecule: IPMolecule = None
self._bounds = None
self._affinity = None
self._name = None
# bounds error counter
self._bounds_warnings = 0
self._store_spectra = False
self._spectrum: Spectrum = None
self._conf_interval = conf_interval
self.name = name
self.formula = formula
self.bounds = bounds
self.affinity = affinity
self.function = function
self.level = level
self.conf_interval = conf_interval
# storage container for raw data
self.raw_data = Trace()
self.store_spectra = store_spectra
def __str__(self):
return f'{self.__class__.__name__} {self.name}'
def __repr__(self):
return f'{self.__class__.__name__}({self.name})'
@property
def name(self) -> str:
"""species name"""
if self._name is not None:
return self._name
elif self._molecule is not None:
return self._molecule.molecular_formula
elif self._bounds is not None:
out = f'm/z {self._bounds[0]:.1f}'
if self._bounds[1] is not None:
out += f' - {self._bounds[1]:.1f}'
return out
return 'unnamed'
@name.setter
def name(self, value: str):
self._name = str(value)
@name.deleter
def name(self):
self.name = None
@property
def formula(self) -> IPMolecule:
"""molecule object to reference"""
return self._molecule
@formula.setter
def formula(self, value: Union[IPMolecule, str]):
if value is None:
self._molecule = None
else:
self._molecule = IPMolecule(value)
@formula.deleter
def formula(self):
self._molecule = None
@property
def bounds(self) -> List[float]:
"""m/z bounds to use for intensity tracking"""
# return manually set bounds if specified
if self._bounds is not None:
return self._bounds
# or determined bounds from molecule class
elif self._molecule is not None:
return self._molecule.calculate_bounds(
conf=self.conf_interval,
)
else:
raise AttributeError(f'Bounds have not been set for this instance')
@bounds.setter
def bounds(self, value: List[float]):
if value is None:
self._bounds = None
return
# if a single value was specified
if len(value) != 2:
raise ValueError('current functionality requires bounds to be fully specified')
# todo enable ability to just specify single m/z
value = [value, None]
# if two values were specified, check range
elif value[0] > value[1]:
raise ValueError('the start m/z must be less than the end m/z')
self._bounds = value
@bounds.deleter
def bounds(self):
self.bounds = None
@property
def bounds_warnings(self) -> int:
"""count of the number of bounds warnings during processing"""
return self._bounds_warnings
@property
def affinity(self) -> str:
"""MS type affinity (+/-/UV)"""
return self._affinity
@affinity.setter
def affinity(self, value: str):
if value is None:
self._affinity = None
else:
if value not in ['+', '-', 'UV']:
raise ValueError(f'the provided value "{value}" is not a valid affinity (+, -, or UV)')
self._affinity = value
@property
def store_spectra(self) -> bool:
"""whether to store spectra when processing"""
return self._store_spectra
@store_spectra.setter
def store_spectra(self, value: bool):
if value is True and self._spectrum is None:
bounds = self.bounds
self._spectrum = Spectrum(
self.SPECTRUM_PRECISION,
start=bounds[0],
end=bounds[1],
)
self._store_spectra = value
@property
def spectrum(self) -> Spectrum:
"""spectrum object associated with the instance"""
return self._spectrum
@property
def molecule(self) -> IPMolecule:
"""molecule instance associated with the target (only applicable if a formula is being used)"""
return self._molecule
@property
def confidence_interval(self) -> float:
"""confidence interval to use when determining bounds from an IPMolecule instance"""
return self._conf_interval
@confidence_interval.setter
def confidence_interval(self, value: float):
self._conf_interval = value
def pull_from_mzml(self, mzml: mzML):
"""
Retrieves corresponding data from an mzML file. The instance will automatically associate itself with the
appropriate mzML function.
:param mzml: mzML instance
"""
pass
def add_from_spectrum(self, x: Union[np.ndarray, List[float]], y: Union[np.ndarray, List[float]]):
"""
Finds the bounded region within the provided arrays, integrates it, and appends that value to the list of raw data.
:param x: x array
:param y: y array
:return:
"""
# casting to array is highly inefficient with the current structure but it works
if isinstance(x, np.ndarray) is False:
x = np.asarray(x)
if isinstance(y, np.ndarray) is False:
y = np.asarray(y)
# slice array
self_slice = slice_array(
y,
x,
*self.bounds
)
if self.spectrum is not None:
self.spectrum.add_spectrum(*self_slice)
intensity = self_slice[1].sum()
self.raw_data.append(intensity)
return intensity
def get_binned_data(self, bin_num: int) -> List[float]:
"""
Bins the raw data into the averaged bins of the provided number (smoothing by average). Note that if the last
bin is not of the size prescribed by bin_num, the last bin will be excluded from the result.
:param bin_num: number of consecutive scans to bin.
:return: list of binned intensities
"""
return self.raw_data.get_binned_data(bin_num)
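Putting the pieces together, a target can be fed spectra scan-by-scan and then binned; a sketch with made-up bounds ('scans' is a hypothetical iterable of (x, y) arrays):

target = RSIRTarget(name='example species', bounds=[200.0, 205.0])
for x, y in scans:
    target.add_from_spectrum(x, y)    # integrates the m/z 200-205 slice and appends it to raw_data
binned = target.get_binned_data(3)    # bin three consecutive scans together (see Trace.get_binned_data)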
"""
Legacy processing method used by pythoms < 2.0
"""
import sys
import warnings
import pylab as pl
from ..molecule import IPMolecule
from ..mzml import mzML
from ..scripttime import ScriptTime
from ..spectrum import Spectrum
from ..tome import check_integer, bindata, normalize_lists
from ..xlsx import XLSX
from .processing import RSIR
def get_pyrsir_output(rsir_instance: RSIR):
"""
Legacy method for retrieving data from an RSIR instance in the format expected by legacy usages of the pyrsir
function.
:return: expected return format from the pyrsir function
"""
warnings.warn(
'get_pyrsir_output uses the outdated method of retrieving data from a PyRSIR analysis; please access the '
'data directly from the instance as needed. The data is reprocessed here to generate the legacy output format.',
DeprecationWarning,
stacklevel=2,
)
# total ion current dictionary mimic
tic_mimic = {
f'raw{rsir_instance.mzml.functions[func]["mode"]}': rsir_instance.mzml.get_tic_of_function(func)
for func in rsir_instance.mzml.functions
}
# time dictionary mimic
time_mimic = {
f'raw{rsir_instance.mzml.functions[func]["mode"]}': rsir_instance.mzml.get_timepoints_of_function(func)
for func in rsir_instance.mzml.functions
}
# bin the data
for bin_num in rsir_instance.bin_numbers:
for mode in ['+', '-']: # note that this won't work for all cases
for func in rsir_instance.mzml.functions:
if mode == rsir_instance.mzml.functions[func]['mode']:
key = f'{bin_num}sum{mode}'
tic_mimic[key] = rsir_instance.bin_tic_of_function(func, bin_num)
time_mimic[key] = rsir_instance.bin_time_of_function(func, bin_num)
# species dictionary mimic
sp_mimic = {
target.name: {
'bounds': target.bounds,
'function': target.function,
'formula': target.formula,
'affin': target.affinity,
'raw': target.raw_data,
'spectrum': target.spectrum.trim(),
}
for target in rsir_instance.targets
}
for target in sp_mimic:
affin = sp_mimic[target]["affin"]
# sp_mimic[target][f'norm{affin}'] = normalize_lists(
# sp_mimic[target]['raw'],
# tic_mimic[f'raw{affin}']
# )
for bin_num in rsir_instance.bin_numbers:
sp_mimic[target][f'{bin_num}sum'] = bindata(bin_num, sp_mimic[target]['raw'])
sp_mimic[target][f'{bin_num}norm'] = normalize_lists(
sp_mimic[target][f'{bin_num}sum'],
tic_mimic[f'{bin_num}sum{affin}']
)
return (
rsir_instance.mzml, # mzml instance
sp_mimic, # species dictionary
time_mimic, # time dictionary
tic_mimic, # TIC dictionary
rsir_instance.mzml.pull_chromatograms(), # chromatograms
)
def pyrsir(
filename,
xlsx,
n,
plot=True, # plot the data for a quick look
verbose=True, # chatty
bounds_confidence=0.99, # confidence interval for automatically generated bounds
combine_spectra=True, # whether or not to output a summed spectrum
return_data=False, # whether to return the calculated data
):
"""
A method for generating reconstructed single ion monitoring traces.
:param filename: path to mzML or raw file to process
:param xlsx: path to excel file with correctly formatted columns
:param n: number of scans to sum together (for binning algorithm)
:param plot: whether to plot and show the data for a quick look
:param verbose: chatty mode
:param bounds_confidence: confidence interval for automatically generated bounds (only applicable if molecular
formulas are provided).
:param combine_spectra: whether to output a summed spectrum
:param return_data: whether to return data (if the data from the function is required by another function)
:return:
"""
warnings.warn(
'the pyrsir function has been replaced with functionality in the RSIR class. Please update your code to leverage '
'that class and its functionality. ',
DeprecationWarning,
stacklevel=2,
)
def plots():
"""
Function for generating a set of plots for rapid visual assessment of the supplied n-level
Outputs all MS species with the same sum level onto the same plot
requirements: pylab as pl
"""
pl.clf() # clears and closes old figure (if still open)
pl.close()
nplots = len(n) + 1
# raw data
pl.subplot(nplots, 1, 1) # top plot
for mode in mskeys:
modekey = 'raw' + mode
if modekey in rtime.keys():
pl.plot(rtime[modekey], tic[modekey], linewidth=0.75, label='TIC') # plot tic
for key in sp: # plot each species
if sp[key]['affin'] == mode:
pl.plot(rtime[modekey], sp[key]['raw'], linewidth=0.75, label=key)
pl.title('Raw Data')
pl.ylabel('Intensity')
pl.tick_params(axis='x', labelbottom=False)
# summed data
loc = 2
for num in n:
pl.subplot(nplots, 1, loc)
sumkey = str(num) + 'sum'
for mode in mskeys:
modekey = str(num) + 'sum' + mode
if modekey in rtime.keys():
pl.plot(rtime[modekey], tic[modekey], linewidth=0.75, label='TIC') # plot tic
for key in sp:
if sp[key]['affin'] == mode: # if a MS species
pl.plot(rtime[modekey], sp[key][sumkey], linewidth=0.75, label=key)
pl.title('Summed Data (n=%i)' % (num))
pl.ylabel('Intensity')
pl.tick_params(axis='x', labelbottom=False)
loc += 1
pl.tick_params(axis='x', labelbottom=True)
pl.show()
def output():
"""
Writes the retrieved and calculated values to the excel workbook using the XLSX object
"""
if newpeaks is True: # looks for and deletes any sheets where the data will be changed
if verbose is True:
sys.stdout.write('Clearing duplicate XLSX sheets.')
delete = []
for key in newsp: # generate strings to look for in excel file
delete.append('Raw Data (' + sp[key]['affin'] + ')')
for num in n:
delete.append(str(num) + ' Sum (' + sp[key]['affin'] + ')')
delete.append(str(num) + ' Normalized (' + sp[key]['affin'] + ')')
delete.append('Isotope Patterns')
xlfile.removesheets(delete) # remove those sheets
if verbose is True:
sys.stdout.write(' DONE.\n')
if verbose is True:
sys.stdout.write('Writing to "%s"' % xlfile.bookname)
sys.stdout.flush()
for mode in mskeys: # write raw data to sheets
modekey = 'raw' + mode
if modekey in rtime.keys():
sheetname = 'Raw Data (' + mode + ')'
xlfile.writersim(sp, rtime[modekey], 'raw', sheetname, mode, tic[modekey])
for num in n: # write summed and normalized data to sheets
sumkey = str(num) + 'sum'
normkey = str(num) + 'norm'
for mode in mskeys:
modekey = 'raw' + mode
if modekey in rtime.keys():
if max(n) > 1: # if data were summed
sheetname = str(num) + ' Sum (' + mode + ')'
xlfile.writersim(sp, rtime[sumkey + mode], sumkey, sheetname, mode,
tic[sumkey + mode]) # write summed data
sheetname = str(num) + ' Normalized (' + mode + ')'
xlfile.writersim(sp, rtime[sumkey + mode], normkey, sheetname, mode) # write normalized data
for key in sorted(sp.keys(), key=lambda x: str(x)): # write isotope patterns
if sp[key]['affin'] in mskeys:
xlfile.writemultispectrum(
sp[key]['spectrum'][0], # x values
sp[key]['spectrum'][1], # y values
key, # name of the spectrum
xunit='m/z', # x unit
yunit='Intensity (counts)', # y unit
sheetname='Isotope Patterns', # sheet name
chart=True, # output excel chart
)
if rd is None:
for key, val in sorted(chroms.items()): # write chromatograms
xlfile.writemultispectrum(chroms[key]['x'], chroms[key]['y'], chroms[key]['xunit'],
chroms[key]['yunit'], 'Function Chromatograms', key)
uvstuff = False
for key in sp: # check for UV-Vis spectra
if sp[key]['affin'] == 'UV':
uvstuff = True
break
if uvstuff is True:
for ind, val in enumerate(tic['rawUV']): # normalize the UV intensities
tic['rawUV'][ind] = val / 1000000.
xlfile.writersim(sp, rtime['rawUV'], 'raw', 'UV-Vis', 'UV', tic['rawUV']) # write UV-Vis data to sheet
if sum_spectra is not None: # write all summed spectra
for fn in sum_spectra:
specname = '%s %s' % (mzml.functions[fn]['mode'], mzml.functions[fn]['level'])
if 'target' in mzml.functions[fn]:
specname += ' %.3f' % mzml.functions[fn]['target']
specname += ' (%.3f-%.3f)' % (mzml.functions[fn]['window'][0], mzml.functions[fn]['window'][1])
xlfile.writemultispectrum(
sum_spectra[fn][0], # x values
sum_spectra[fn][1], # y values
specname, # name of the spectrum
xunit='m/z', # x unit
yunit='Intensity (counts)', # y unit
sheetname='Summed Spectra', # sheet name
chart=True, # output excel chart
)
if verbose is True:
sys.stdout.write(' DONE\n')
def prepformula(dct):
"""looks for formulas in a dictionary and prepares them for pullspeciesdata"""
for species in dct:
if 'affin' not in dct[species]: # set affinity if not specified
fn = dct[species]['function']
if mzml.functions[fn]['type'] == 'MS':
dct[species]['affin'] = mzml.functions[fn]['mode']
if mzml.functions[fn]['type'] == 'UV':
dct[species]['affin'] = 'UV'
if 'formula' in dct[species] and dct[species]['formula'] is not None:
try:
dct[species]['mol'].res = res # sets resolution in Molecule object
except NameError:
res = int(mzml.auto_resolution())
dct[species]['mol'].res = res
# dct[species]['mol'].sigma = dct[species]['mol'].sigmafwhm()[1] # recalculates sigma with new resolution
dct[species]['bounds'] = dct[species]['mol'].bounds # calculates bounds
return dct
# ----------------------------------------------------------
# -------------------PROGRAM BEGINS-------------------------
# ----------------------------------------------------------
if verbose is True:
stime = ScriptTime()
stime.printstart()
n = check_integer(n, 'number of scans to sum') # checks integer input and converts to list
if type(xlsx) != dict:
if verbose is True:
sys.stdout.write('Loading processing parameters from excel file')
sys.stdout.flush()
xlfile = XLSX(xlsx, verbose=verbose)
sp = xlfile.pullrsimparams()
else: # if parameters were provided in place of an excel file
sp = xlsx
mskeys = ['+', '-']
for key in sp:
if 'formula' in sp[key] and sp[key]['formula'] is not None: # if formula is specified
sp[key]['mol'] = IPMolecule(sp[key]['formula']) # create Molecule object
sp[key]['bounds'] = sp[key]['mol'].calculate_bounds(
bounds_confidence
) # generate bounds from molecule object with this confidence interval
if verbose is True:
sys.stdout.write(' DONE\n')
rtime = {} # empty dictionaries for time and tic
tic = {}
rd = False
for mode in mskeys: # look for existing positive and negative mode raw data
try:
modedata, modetime, modetic = xlfile.pullrsim('Raw Data (' + mode + ')')
except KeyError:
continue
except UnboundLocalError: # catch for if pyrsir was not handed an excel file
continue
if verbose is True:
sys.stdout.write('Existing (%s) mode raw data were found, grabbing those values.' % mode)
sys.stdout.flush()
rd = True # bool that rd is present
modekey = 'raw' + mode
sp.update(modedata) # update sp dictionary with raw data
for key in modedata: # check for affinities
if 'affin' not in sp[key]:
sp[key]['affin'] = mode
rtime[modekey] = list(modetime) # update time list
tic[modekey] = list(modetic) # update tic list
if verbose is True:
sys.stdout.write(' DONE\n')
# sp = prepformula(sp)
newpeaks = False
if rd is True:
newsp = {}
sum_spectra = None
for key in sp: # checks whether there is a MS species that does not have raw data
if 'raw' not in sp[key]:
newsp[key] = sp[key] # create references in the namespace
if len(newsp) != 0:
newpeaks = True
if verbose is True:
sys.stdout.write('Some peaks are not in the raw data, extracting these from raw file.\n')
ips = xlfile.pullmultispectrum(
'Isotope Patterns') # pull predefined isotope patterns and add them to species
for species in ips: # set spectrum list
sp[species]['spectrum'] = [ips[species]['x'], ips[species]['y']]
mzml = mzML(filename) # load mzML class
sp = prepformula(sp) # prep formula etc for summing
newsp = prepformula(newsp) # prep formula species for summing
for species in newsp:
if 'spectrum' not in newsp[species]:
newsp[species]['spectrum'] = Spectrum(3, newsp[species]['bounds'][0], newsp[species]['bounds'][1])
newsp = mzml.pull_species_data(newsp) # pull data
else:
if verbose is True:
sys.stdout.write('No new peaks were specified. Proceeding directly to summing and normalization.\n')
if rd is False: # if no raw data is present, process mzML file
mzml = mzML(filename, verbose=verbose) # load mzML class
sp = prepformula(sp)
sp, sum_spectra = mzml.pull_species_data(sp, combine_spectra) # pull relevant data from mzML
chroms = mzml.pull_chromatograms() # pull chromatograms from mzML
rtime = {}
tic = {}
for key in sp: # compare predicted isotope patterns to the real spectrum and save standard error of the regression
func = sp[key]['function']
if mzml.functions[func]['type'] == 'MS': # determine mode key
if combine_spectra is True:
sp[key]['spectrum'] = sum_spectra[sp[key]['function']].trim(
xbounds=sp[key]['bounds']) # extract the spectrum object
mode = 'raw' + mzml.functions[func]['mode']
if mzml.functions[func]['type'] == 'UV':
mode = 'rawUV'
if mode not in rtime: # if rtime and tic have not been pulled from that function
rtime[mode] = mzml.functions[func]['timepoints']
tic[mode] = mzml.functions[func]['tic']
# if 'formula' in sp[key] and sp[key]['formula'] is not None:
# sp[key]['match'] = sp[key]['mol'].compare(sp[key]['spectrum'])
if combine_spectra is True:
for fn in sum_spectra:
sum_spectra[fn] = sum_spectra[fn].trim() # convert Spectrum objects into x,y lists
# if max(n) > 1: # run combine functions if n > 1
for num in n: # for each n to sum
if verbose is True:
sys.stdout.write('\r%d Summing species traces.' % num)
sumkey = str(num) + 'sum'
for key in sp: # bin each species
if sp[key]['affin'] in mskeys or mzml.functions[sp[key]['function']][
'type'] == 'MS': # if species is MS related
sp[key][sumkey] = bindata(num, sp[key]['raw'])
for mode in mskeys:
sumkey = str(num) + 'sum' + mode
modekey = 'raw' + mode
if modekey in rtime.keys(): # if there is data for that mode
rtime[sumkey] = bindata(num, rtime[modekey], num)
tic[sumkey] = bindata(num, tic[modekey])
if verbose is True:
sys.stdout.write(' DONE\n')
sys.stdout.flush()
# else:
# for key in sp: # create key for normalization
# sp[key]['1sum'] = sp[key]['raw']
for num in n: # normalize each peak's chromatogram
if verbose is True:
sys.stdout.write('\r%d Normalizing species traces.' % num)
sys.stdout.flush()
sumkey = str(num) + 'sum'
normkey = str(num) + 'norm'
for mode in mskeys:
modekey = 'raw' + mode
if modekey in rtime.keys(): # if there is data for that mode
for key in sp: # for each species
if sp[key]['affin'] in mskeys or mzml.functions[sp[key]['function']][
'type'] == 'MS': # if species has affinity
sp[key][normkey] = []
for ind, val in enumerate(sp[key][sumkey]):
# sp[key][normkey].append(val/(mzml.function[func]['tic'][ind]+0.01)) #+0.01 to avoid div/0 errors
sp[key][normkey].append(
val / (tic[sumkey + sp[key]['affin']][ind] + 0.01)) # +0.01 to avoid div/0 errors
if verbose is True:
sys.stdout.write(' DONE\n')
if return_data is True: # if data is to be used by another function, return the calculated data
return mzml, sp, rtime, tic, chroms
# import pickle #pickle objects (for troubleshooting)
# pickle.dump(rtime,open("rtime.p","wb"))
# pickle.dump(tic,open("tic.p","wb"))
# pickle.dump(chroms,open("chroms.p","wb"))
# pickle.dump(sp,open("sp.p","wb"))
output() # write data to excel file
if verbose is True:
sys.stdout.write('\rUpdating parameters')
sys.stdout.flush()
xlfile.updatersimparams(sp) # update summing parameters
if verbose is True:
sys.stdout.write(' DONE\n')
if verbose is True:
sys.stdout.write('\rSaving "%s" (this may take some time)' % xlfile.bookname)
sys.stdout.flush()
xlfile.save()
if verbose is True:
sys.stdout.write(' DONE\n')
if verbose is True:
sys.stdout.write('Plotting traces')
if plot is True:
plots() # plots for quick review
if verbose is True:
sys.stdout.write(' DONE\n')
if verbose is True:
stime.printelapsed()
......@@ -44,6 +44,7 @@ import sys
import warnings
import scipy as sci
import numpy as np
from typing import Iterable
from .spectrum import Spectrum
from bisect import bisect_left, bisect_right
from .colour import Colour
......@@ -497,6 +498,29 @@ def normalize(lst, maxval=1.):
return lst
def normalize_lists(target: Iterable[float], divisor: Iterable[float]) -> Iterable:
"""
Normalizes a target list value-by-value by the divisor list. Lists must be the same size
:param target: target list
:param divisor: list to divide each element of the target list by
:return: list of normalized target values
"""
if len(target) != len(divisor):
raise ValueError(
f'the length of the target list ({len(target)}) is not equal to the length of the divisor list ({len(divisor)})'
)
out = []
for num, denom in zip(target, divisor):
try:
out.append(num / denom)
except (ValueError, ZeroDivisionError):
# original method of avoiding zero division error
# todo consider appending None instead
out.append(num / 0.0001)
return out
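For example (made-up values; note how a zero in the divisor falls back to dividing by 0.0001):

species = [10., 20., 30.]
tic = [100., 200., 0.]
normalize_lists(species, tic)  # -> [0.1, 0.1, ~300000.] (last value hits the zero-division fallback)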
def localmax(x: list, y: list, xval: float, lookwithin: float = 1.):
"""
Finds the local maximum within +/- lookwithin of the xval
......
......@@ -3,6 +3,8 @@ Class for opening and handling excel files with commonly used data formats
"""
import sys
import openpyxl as op
import numpy as np
from typing import Iterable
from openpyxl.utils.exceptions import InvalidFileException
......@@ -1005,6 +1007,30 @@ class XLSX(object):
chart.series.append(series) # add the data to the chart
ws.add_chart(chart, 'E1') # add the chart to the worksheet
def write_array_to_sheet(self,
arr: np.ndarray,
header_row: Iterable = None,
sheetname: str = None,
save: bool = False,
):
"""
Writes the provided data array to a sheet.
:param arr: numpy ndarray. The first dimension of the array is expected to be row-like
(the function iterates over the first dimension and writes a number of columns equal to the
size of the second dimension).
:param header_row: optional header row to write before the data
:param sheetname: sheet to write the data to
:param save: whether to save the workbook after writing
"""
sheet = self.wb.create_sheet(sheetname)
# if a header row was provided
if header_row is not None:
sheet.append(header_row)
for row in arr:
sheet.append(list(row))
if save is True:
self.save()
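A usage sketch (illustrative only; the workbook path and arrays are hypothetical):

import numpy as np

xlfile = XLSX('output.xlsx')                   # hypothetical workbook opened with the XLSX wrapper
data = np.column_stack([times, intensities])   # two hypothetical 1D arrays of equal length
xlfile.write_array_to_sheet(
    data,
    header_row=['time (min)', 'intensity (counts)'],
    sheetname='binned trace',
    save=True,
)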
if __name__ == '__main__':
name = 'Useless delete this'
......
......@@ -2,43 +2,58 @@ import os
import shutil
import unittest
from PyRSIR import pyrsir
from pythoms.rsir import RSIR
multitest_xl = os.path.join(os.path.dirname(__file__), 'multitest_pyrsir_validation.xlsx')
multitest_mz = os.path.join(os.path.dirname(__file__), 'MultiTest.mzML.gz')
rxn_xl = os.path.join(os.path.dirname(__file__), 'LY-2015-09-15 06 pyrsir example.xlsx')
rxn_mz = os.path.join(os.path.dirname(__file__), 'LY-2015-09-15 06.mzML.gz')
class TestPyRSIR(unittest.TestCase):
def setUp(self) -> None:
# target xlsx and mzml pairings
self.targets = [
[
os.path.join(os.path.dirname(__file__), 'multitest_pyrsir_validation.xlsx'),
os.path.join(os.path.dirname(__file__), 'MultiTest.mzML.gz')
],
[
os.path.join(os.path.dirname(__file__), 'LY-2015-09-15 06 pyrsir example.xlsx'),
os.path.join(os.path.dirname(__file__), 'LY-2015-09-15 06.mzML.gz'),
]
]
# create backups
for xlfile, _ in self.targets:
def test_multitest(self):
"""runs the multitest stage"""
shutil.copy(
multitest_xl,
f'{multitest_xl}.bak'
)
try:
rsir = RSIR(
multitest_mz,
bin_numbers=[3, 5],
)
rsir.add_targets_from_xlsx(
multitest_xl,
)
rsir.extract_data()
rsir.write_rsir_to_excel(f'{multitest_xl}')
finally:
shutil.copy(
xlfile,
f'{xlfile}.bak'
f'{multitest_xl}.bak',
multitest_xl,
)
os.remove(f'{multitest_xl}.bak')
def test(self):
for xlfile, msfile in self.targets:
pyrsir(
msfile,
xlfile,
3,
plot=False,
verbose=False,
def test_rxn(self):
"""runs the reaction profiling stage"""
shutil.copy(
rxn_xl,
f'{rxn_xl}.bak'
)
try:
rsir = RSIR(
rxn_mz,
bin_numbers=[3, 5, 10],
)
def tearDown(self) -> None:
for xlfile, _ in self.targets:
rsir.add_targets_from_xlsx(
rxn_xl,
)
rsir.extract_data()
rsir.write_rsir_to_excel(f'{rxn_xl}')
finally:
shutil.copy(
f'{xlfile}.bak',
xlfile,
f'{rxn_xl}.bak',
rxn_xl,
)
os.remove(f'{xlfile}.bak')
os.remove(f'{rxn_xl}.bak')
......@@ -12,7 +12,7 @@ accomplished using Rapid Environment Editor (https://www.rapidee.com/).
"""
import matplotlib.animation as animation
from PyRSIR import pyrsir
from pythoms.rsir.legacy import pyrsir
from pythoms.tome import bindata, binnspectra
from pythoms.colour import Colour
from bisect import bisect_left, bisect_right
......