Commit 2098ca65 authored by Lars Yunker's avatar Lars Yunker
Browse files

Merge branch '47-support-time-restriction-in-rsir' into 'master'

Resolve "support time restriction in RSIR"

Closes #47

See merge request !42
parents d1ff7283 ccd505dc
Pipeline #247214253 passed with stages
in 3 minutes and 6 seconds
__version__ = '2.2.3'
__version__ = '2.2.4'
......@@ -22,6 +22,31 @@ from .parsing import fps, scan_properties, branch_attributes, spectrum_array, ge
from .io import file_present, pw_convert, fix_extension
def find_some_peaks(values, peaks: int = 4) -> List[int]:
Roughly locates maximum peaks in each section of the provided list. The provided list will be broken into
the number of chunks defined by peaks and the maximum peak will be located within those chunks.
:param values: list of values to process
:param peaks: number of peaks to locate
:return: list of indicies of peak maximum
split = int(len(values) / peaks)
start = 0
end = start + split
splity = []
for i in range(peaks):
start += split
end += split
out = []
for ind, section in enumerate(splity):
maxy = max(section)
if maxy == max(section[1:-1]): # if max is not at the edge of the spectrum
out.append(np.where(section == maxy)[0][0] + split * ind)
return out
class BoundsError(Warning):
"""A warning class to handle bounds errors when integrating (used only by PyRSIR)"""
......@@ -482,23 +507,6 @@ class mzML(object):
:return: Estimated resolution of the spectrum
:rtype: float
def findsomepeaks(y):
"""roughly locates 4 peaks by maximum values in the spectrum and returns their index"""
split = int(len(y) / npeaks)
start = 0
end = start + split
splity = []
for i in range(npeaks):
start += split
end += split
out = []
for ind, section in enumerate(splity):
maxy = max(section)
if maxy == max(section[1:-1]): # if max is not at the edge of the spectrum
out.append(np.where(section == maxy)[0][0] + split * ind)
return out
if function is None: # if no function is provided, use first
function = self.associate_to_function()
if self.functions[function]['type'] != 'MS':
......@@ -527,7 +535,7 @@ class mzML(object):
res = []
for spec in summed: # calculate resolution for each scan range
inds = findsomepeaks(spec[1]) # find some peaks
inds = find_some_peaks(spec[1], npeaks) # find some peaks
for ind in inds: # for each of those peaks
res.append(resolution(spec[0], spec[1], ind, threshold=10))
res = [y for y in res if y is not None] # removes None values (below S/N)
......@@ -15,6 +15,7 @@ from tqdm import tqdm
from ..mzml import mzML
from ..mzml.parsing import spectrum_array, fps
from ..mzml.psims import CVParameterSet
from ..tome import bindata, normalize_lists
from ..xlsx import XLSX
from ..spectrum import Spectrum
......@@ -295,13 +296,18 @@ class RSIR:
def extract_data(self,
resolution_override: float = None,
start_time: float = None,
stop_time: float = None,
Extracts data from the loaded mzML file.
Extracts data from the loaded mzML file. Note that if either start or stop time are specified, additional context
associations will be required to retrieve the accompanying time array for the targets.
:param resolution_override: optional specification of the resolution to use for calculating integration bounds
from IPMolecule instances. If not specified, the resolution will be automatically calculated from
the mzML file.
:param start_time: optional restriction of time for analysis (data from before this will not be processed)
:param stop_time: optional restriction of stop time for analysis (data following this time will not be processed)
if self.mzml is None:
self.logger.error('an mzML instance has not been loaded')
......@@ -333,6 +339,12 @@ class RSIR:
for element in tqdm(self.mzml.spectra_elements, desc='performing rsir analysis', total=self.mzml.nscans):
spectrum = spectrum_array(element)
func, proc, scan = fps(element)
cv_params = CVParameterSet.create_from_branch(element)
# do not process if outside of specified range
if start_time and cv_params['MS:1000016'].value < start_time:
elif stop_time and stop_time < cv_params['MS:1000016'].value:
for target in self.targets:
if target.function == func:
......@@ -341,6 +353,31 @@ class RSIR:'data extraction complete')
self._processed = True
def get_time_of_function(self,
function: int,
start_time: float = None,
stop_time: float = None,
) -> List[float]:
Retrieve the time list of the provided function.
:param function: function number
:param start_time: optional start time (times smaller than this will not be included)
:param stop_time: optional end time (times larger than this will not be included
:return: list of scan times
out = self.mzml.get_timepoints_of_function(function)
# restrict times if specified
if start_time or stop_time:
start_time = start_time or 0.
stop_time = stop_time or self.mzml.duration
out = [
for val in out
if start_time <= val <= stop_time
return out
def bin_tic_of_function(self, function: int, bin_num: int) -> List[float]:
Bins the TIC values of the specified function
......@@ -2,7 +2,7 @@ import os
import shutil
import unittest
from pythoms.rsir import RSIR
from pythoms.rsir import RSIR, RSIRTarget
multitest_xl = os.path.join(os.path.dirname(__file__), 'multitest_pyrsir_validation.xlsx')
multitest_mz = os.path.join(os.path.dirname(__file__), 'MultiTest.mzML.gz')
......@@ -35,6 +35,25 @@ class TestPyRSIR(unittest.TestCase):
def test_time_restriction(self):
"""tests the time restriction capability of RSIR"""
target = RSIRTarget(bounds=(241.6, 243.2))
rsir = RSIR(multitest_mz)
len(rsir.get_time_of_function(1, start_time=0.1, stop_time=0.4))
len(target.raw_data) == 5,
'ensure traces have been restricted'
# def test_rxn(self):
# """runs the reaction profiling stage"""
# shutil.copy(
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment