Commits (26)
__version__ = '1.6.4'
__version__ = '2.0.0'
......@@ -31,9 +31,9 @@ from datetime import datetime
import sympy as sym
import pylab as pl
import copy
from tqdm import tqdm
from .scripttime import ScriptTime
from .spectrum import Spectrum, weighted_average
from .progress import Progress
from . import mass_dictionaries # import mass dictionaries
from itertools import combinations_with_replacement as cwr
from IsoSpecPy import IsoThreshold
......@@ -619,18 +619,10 @@ def isotope_pattern_hybrid(
))
sortlist = sorted(sortlist) # sorted list of elements based on the length of their isotope patterns
sortlist.reverse()
if verbose is True:
prog = Progress(
last=len(sortlist) - 1,
percent=False,
fraction=False,
)
spec = None
for lenlist, element in sortlist:
if verbose is True:
prog.string = f'Adding element {element} to isotope pattern'
prog.write(1)
# todo convert to context tqdm (update string)
for lenlist, element in tqdm(sortlist, desc='adding element to isotope pattern', disable=not verbose):
if spec is None:
spec = Spectrum(
autodec(fwhm), # decimal places
......@@ -640,9 +632,6 @@ def isotope_pattern_hybrid(
filler=0., # fill with zeros, not None
specin=eleips[element], # supply masses and abundances as initialization spectrum
)
if verbose is True:
prog.fin()
continue
spec.add_element(eleips[element][0], eleips[element][1])
spec.normalize(100.) # normalize spectrum object
......@@ -655,8 +644,6 @@ def isotope_pattern_hybrid(
threshold,
3 * 10 ** -consolidate
)
if verbose is True:
sys.stdout.write(' DONE\n')
return spec
......@@ -776,19 +763,10 @@ def isotope_pattern_combinatoric(
empty=True, # whether or not to use emptyspec
filler=0., # fill with zeros, not None
)
if verbose is True:
counter = 0 # create a counter
iterations = int(cpu_list_product([numberofcwr(n, k) for n, k in nk])) # number of iterations
prog = Progress( # create a progress instance
string='Processing isotope combination',
last=iterations
)
for comb in product(*iterators):
if verbose is True:
counter += 1
# remaining = st.progress(counter,iterations,'combinations')
prog.write(counter)
iterations = int(cpu_list_product([numberofcwr(n, k) for n, k in nk])) # number of iterations
for comb in tqdm(product(*iterators), desc='processing isotope combination', total=iterations, disable=not verbose):
num = 1 # number of combinations counter
x = 0. # mass value
y = 1. # intensity value
......@@ -812,8 +790,6 @@ def isotope_pattern_combinatoric(
ele, iso = string_to_isotope(element) # determine element and isotope
spec.shift_x(mass_dict[ele][iso][0] * comp[element]) # shift the x values by the isotopic mass
spec.normalize() # normalize the spectrum object
if verbose is True:
prog.fin()
return spec
......@@ -852,8 +828,6 @@ def isotope_pattern_multiplicative(
for key in comp: # for each element
if key in mass_dict: # if not a single isotope
if verbose is True:
prog = Progress(string=f'Processing element {key}', last=comp[key])
masses = [] # list for masses of each isotope
abunds = [] # list for abundances
for mass in mass_dict[key]:
......@@ -861,9 +835,8 @@ def isotope_pattern_multiplicative(
if mass_dict[key][mass][1] > 0: # if abundance is nonzero
masses.append(mass_dict[key][mass][0])
abunds.append(mass_dict[key][mass][1])
for n in range(comp[key]): # for n number of each element
if verbose is True:
prog.write(n + 1)
msg = f'Processing element {key}'
for n in tqdm(range(comp[key]), desc=msg, disable=not verbose): # for n number of each element
if spec is None: # if spectrum object has not been defined
spec = Spectrum(
decpl, # decimal places
......@@ -890,8 +863,6 @@ def isotope_pattern_multiplicative(
)
else: # if specific isotope
ele, iso = string_to_isotope(key) # find element and isotope
if verbose is True:
prog = Progress(string=f'Processing isotope {key}', fraction=False, percent=False)
if spec is None: # if spectrum object has not been defined
spec = Spectrum(
decpl, # decimal places
......@@ -903,9 +874,8 @@ def isotope_pattern_multiplicative(
filler=0. # fill with zeros, not None
)
continue
# todo add tqdm progress bar
spec.shift_x(mass_dict[ele][iso][0]) # offset spectrum object by the mass of that
if verbose is True:
prog.fin(' ')
spec.normalize()
if verbose is True:
sys.stdout.write('DONE\n')
......@@ -1474,7 +1444,7 @@ class IPMolecule(Molecule):
dropmethod=None,
emptyspec=True,
groupmethod='weighted',
ipmethod='hybrid',
ipmethod='isospec',
keepall=False,
npeaks=5000,
resolution=5000,
......
"""
Module for interacting with mzML files
"""
from .structure import mzML
__all__ = [
'mzML',
]
if __name__ == '__main__':
filename = 'MultiTest'
mzml = mzML(filename, verbose=True, ftt=True)
# sp = {
# 'pos':{'bounds':[325,327],'affin':'+','spectrum':Spectrum(3),'raw':[]},
# 'neg':{'bounds':[348,350],'affin':'-','spectrum':Spectrum(3),'raw':[]},
# 'uv':{'bounds':[378,None],'affin':'UV','raw':[]}
# }
import os
import pathlib
import subprocess
import sys
from .logging import logger
def file_present(filepath):
"""checks for the presence of the specified file or directory in the current working directory"""
tf = os.path.isfile(filepath) # look for file first
if tf is False: # if file cannot be found, look for directory
tf = os.path.isdir(filepath)
return tf
def pw_convert(filename,
bit=64,
compression=True,
gzip=True,
verbose=True,
out_directory=None
):
"""
Runs msconvert.exe from ProteoWizard to convert Waters .RAW format to .mzXML
which can then be parsed by python.
module requirements: os, subprocess, sys
ProteoWizard must be installed for this script to function.
go to
http://proteowizard.sourceforge.net/downloads.shtml
to download
This script assumes that the ProteoWizard is installed under either
c:\\program files\\proteowizard
or
c:\\program files (x86)\\proteowizard
If you use this python script to convert to mzML, you should cite the paper of the folks who wrote the program
Chambers, M.C. Nature Biotechnology 2012, 30, 918-920
doi 10.1038/nbt.2377
:param filename: file name to convert
:param bit: floating point bit precision (32 or 64)
:param compression: enable zlib compression of data in output file (saves space but increases processing time)
:param gzip: enable gzip compression of output file (saves disk space but increases processing time)
:param verbose: verbose for subprocess call
:param out_directory: optional output directory (if not specified, the file will be saved to the same directory as
the data)
:return: the file path for the generated file
"""
def find_all(fname, path):
"""
Finds all files of a given name within a specified directory.
Adapted from http://stackoverflow.com/questions/1724693/find-a-file-in-python
Module dependancies: os
"""
locations = []
for root, dirs, files in os.walk(path):
if fname in files:
locations.append(os.path.join(root, fname))
return locations
if sys.platform != 'win32':
raise OSError(
'The function that converts to mzML is limited to Windows operating systems.\n'
'You can manually convert to *.mzML using the proteowizard standalone package '
'and supply that mzML file to this script')
locs = []
for val in ['c:\\program files\\proteowizard',
'c:\\program files (x86)\\proteowizard']: # searches for msconvert.exe in expected folders
locs.extend(find_all('msconvert.exe', val))
if len(locs) == 0: # if script cannot find msconvert.exe
raise IOError(
'The python script could not find msconvert.exe\n'
'Please ensure that ProteoWizard is installed in either:\n'
'c:\\program files\\proteowizard\nor\nc:\\program files (x86)\\proteowizard')
filename = pathlib.Path(filename)
if out_directory is None:
out_directory = filename.parent
if bit not in [32, 64]:
raise ValueError(
f'an invalid floating point precision was specified"{bit}".')
callstring = " ".join([
f'{locs[-1]} "{filename}"', # main call
f'-o "{out_directory}"', # output directory
'--mzML',
'--gzip' if gzip else '', # gzip compression
'--zlib' if compression else '', # zlib compression
f'--{bit}', # floating point precision
'--verbose' if verbose else '', # verbose mode
])
out_exten = f'.mzML{".gz" if gzip else ""}'
logger.info(f'Generating mzML file from {filename}')
subprocess.call(callstring)
logger.info('conversion DONE')
filename.with_suffix(f'.mzML{".gz" if gzip else ""}')
def fix_extension(fn):
"""tries to fix invalid file extensions"""
oopsx = {'.mzm': 'l', '.mz': 'ml', '.m': 'zml', '.': 'mzml'} # incomplete mzml extensions
oopsr = {'.ra': 'w', '.r': 'aw', '.': 'raw'} # incomplete raw extionsions
oopsg = {'.mzml.g': 'z', '.mzml.': 'gz', '.mzml': '.gz', '.mzm': 'l.gz', '.mz': 'ml.gz', '.m': 'zml.gz',
'.': 'mzml.gz'} # incomplete gz extensions
# looks for missing extensions first
if file_present(fn + '.mzml.gz') is True:
return fn + '.mzml.gz'
if file_present(fn + '.mzml') is True:
return fn + '.mzml'
for key in oopsg: # tries to complete mzml.gz shortenings
if fn.lower().endswith(key) is True:
if file_present(fn + oopsg[key]) is True:
return fn + oopsg[key]
for key in oopsx: # tries to complete mzml shortenings
if fn.lower().endswith(key) is True:
if file_present(fn + oopsx[key]) is True:
return fn + oopsx[key]
for key in oopsr: # tries to complete raw shortenings
if fn.lower().endswith(key) is True:
if file_present(fn + oopsr[key]) is True:
return fn + oopsr[key]
if file_present(fn + '.raw') is True: # finally looks for raw file
return fn + '.raw'
raise FileNotFoundError(f'The file {fn} could not be located in the current working directory')
\ No newline at end of file
import logging
logger = logging.getLogger(__name__)
import base64
import re
import zlib
import warnings
from typing import Union, Iterable
from xml.etree import ElementTree
import numpy as np
from .psims import CVParameterSet, stringtodigit
# decoding formats for decoding mzML binary data array strings
decode_formats = {
'MS:1000519': ['<', 'i'], # signed 32-bit little-endian integer
# 'MS:1000520':['',''], # [OBSOLETE] Signed 16-bit float
'MS:1000521': ['<', 'f'], # 32-bit precision little-endian floating point conforming to IEEE-754
'MS:1000522': ['<', 'l'], # Signed 64-bit little-endian integer
'MS:1000523': ['<', 'd'], # 64-bit precision little-endian floating point conforming to IEEE-754.
}
# id string parsing for different manufacturers
_waters_id_re = re.compile('function=(?P<fn>\d+)\sprocess=(?P<proc>\d+)\sscan=(?P<scan>\d+)')
_agilent_id_re = re.compile('scanId=(?P<scan>\d+)')
# prefix for xml elements
_xml_element_prefix = '{http://psi.hupo.org/ms/mzml}'
def branch_attributes(branch: ElementTree.Element):
"""
Pulls all the attributes of an xml.dom.minidom xml branch.
These are generally things like index, id, etc.
:param xml.dom.minidom branch: An xml.dom.minidom object.
:return: A dictionary of attributes with each key being the attribute name and its value being the value of that
attribute.
:rtype: dict
**Notes**
The script will attempt to convert any values to float or
integer in order to reduce TypeErrors when trying to use
the extracted values.
"""
return {key: stringtodigit(val) for key, val in branch.attrib.items()}
def decodeformat(p: CVParameterSet, speclen: int):
"""
Determines the decode format from the accession parameter
:param p: extracted CVParamterSet of the data array
:param speclen: length of the spectrum (retrievable from the XML file)
:return: decode format
:rtype: str
"""
for key in set(decode_formats) & p.keys(): # find the set combination of the possibilities
return f'{decode_formats[key][0]}{speclen}{decode_formats[key][1]}' # create the decode format
def associate_data_type(cv_params: CVParameterSet):
"""
Associates a parameter set with a numpy data type
:param cv_params: set of cv parameters associated with a list of values
:return: numpy data type
"""
param_map = {
'523': np.float64,
'521': np.float32,
'522': np.int64,
'519': np.int32,
'520': np.float16,
}
for key, dtype in param_map.items():
if f'MS:1000{key}' in cv_params:
return dtype
raise ValueError('a data type could not be determined from this parameter set')
def gettext(nodelist):
"""gets text from a simple XML object"""
rc = []
for node in nodelist:
if node.nodeType == node.TEXT_NODE:
rc.append(node.data)
return ''.join(rc)
def spectrum_array(spectrum: ElementTree.Element) -> np.ndarray:
"""
Extracts and converts binary data to a numpy ndarray.
:param spectrum: A spectrum branch element. This element is expected to have two child nodes containing
binaryDataArrays.
"""
# spectrum length (defined in the spectrum attributes)
speclen = int(spectrum.attrib.get('defaultArrayLength'))
out = np.ndarray((2, speclen))
# iterate over the binary data arrays
for ind, bda in enumerate(_findall_ele('binaryDataArrayList', 'binaryDataArray', parent=spectrum)):
p = CVParameterSet.create_from_branch(bda) # grab cvparameters
# pull the binary string
binary_string = _find_ele('binary', parent=bda).text
# decode the string
decoded = base64.standard_b64decode(binary_string)
# if the string is compressed, decompress
if 'MS:1000574' in p:
decoded = zlib.decompress(decoded)
# retrieve the data from buffer with the associated format
out[ind] = np.frombuffer(
decoded,
associate_data_type(p)
)
return out
def get_element_units(spectrum: ElementTree.Element):
"""
Retrieves the units of the provided element. Assumes that the element has binaryDataArrayList and binaryDataArray
children which each have a unit-type controlled variable.
:param spectrum: XML spectrum Element
:return: x and y units corresponding to the spectrum
"""
units = []
# iterate over the binary data arrays
for bda in _findall_ele('binaryDataArrayList', 'binaryDataArray', parent=spectrum):
p = CVParameterSet.create_from_branch(bda) # grab cvparameters
for cv in p:
if cv.unit is not None:
units.append(cv.unit)
return units
def extract_spectrum(spectrum: ElementTree.Element, units: bool = False) -> list:
"""
Extracts and converts binary data to two lists.
:param spectrum: A spectrum branch element. This element is expected to have two child nodes containing
binaryDataArrays.
:param units: whether to extract the units from the spectrum
:return:
"""
warnings.warn(
'list retrieval is no longer recommended, please refactor your code to use ndarrays and spectrum_array',
DeprecationWarning,
stacklevel=2,
)
# todo go through all usages and refactor to array handling
out = spectrum_array(spectrum).tolist()
if units is not False: # extends the units onto out
out.extend(get_element_units(spectrum))
return out
def fps(element: ElementTree.Element):
"""
Extracts function #, process #, and scan # from the idstring of a spectrum branch
:param element: XML element to retrieve from
:return:
"""
# pull id string from scan attribute
idstring = element.attrib['id']
match = _waters_id_re.match(idstring)
if match is not None:
return (
int(match.group('fn')),
int(match.group('proc')),
int(match.group('scan')),
)
else:
# todo create generalized catch
match = _agilent_id_re.match(idstring)
return (
1,
None,
int(match.group('scan'))
)
def scan_properties(parameters: Union[CVParameterSet, ElementTree.Element]):
"""
Determines the scan properties of the provided spectrum.
:param parameters: CVParam parameters
:return:
"""
"""determines the scan properties of the provided spectrum"""
mstypes = { # ms accession keys and their respective names (for spectrum identification)
'MS:1000928': 'calibration spectrum',
'MS:1000294': 'mass spectrum',
'MS:1000322': 'charge inversion mass spectrum',
'MS:1000325': 'constant neutral gain spectrum',
'MS:1000326': 'constant neutral loss spectrum',
'MS:1000328': 'e/2 mass spectrum',
'MS:1000341': 'precursor ion spectrum',
'MS:1000343': 'product ion spectrum',
'MS:1000579': 'MS1 spectrum',
'MS:1000580': 'MSn spectrum',
'MS:1000581': 'CRM spectrum',
'MS:1000582': 'SIM spectrum',
'MS:1000583': 'SRM spectrum',
}
othertypes = { # other accession keys (non-MS)
'MS:1000620': 'PDA spectrum',
'MS:1000804': 'electromagnetic radiation spectrum',
'MS:1000805': 'emission spectrum',
'MS:1000806': 'absorption spectrum',
}
out = {}
if isinstance(parameters, CVParameterSet): # handed a cvparam class object (expected)
p = parameters
else: # handed a tree or branch (generate the cvparam class object)
p = CVParameterSet.create_from_branch(parameters)
for acc in p.keys() & mstypes.keys(): # check for ms spectrum
out['acc'] = acc # accession code
out['name'] = mstypes[acc] # name of spectrum
out['type'] = 'MS' # it is a mass spectrum
out['level'] = p['MS:1000511'].value # ms level
out['window'] = [p['MS:1000501'].value, p['MS:1000500'].value] # scan window
if 'MS:1000129' in p: # negative scan
out['mode'] = '-'
elif 'MS:1000130' in p: # positive scan
out['mode'] = '+'
if 'MS:1000827' in p: # if there is an isolation window target m/z
out['target'] = p['MS:1000827'].value
# if MSn > 2, not sure how to handle this (will have to be hard coded later as I have no examples)
elif out['level'] > 2:
raise ValueError(
'This script has not been coded to handle MSn > 2, please contact the author of the class')
return out
for acc in p.keys() & othertypes.keys(): # if the scan is something else
out['acc'] = acc # accession code
out['name'] = othertypes[acc] # name of spectrum
if 'MS:1000804' in p: # if it is a UV-Vis
out['type'] = 'UV'
else: # other other type (not handled by script)
raise KeyError(
'The script has not been coded to handle spectra types other than MS and UV-Vis. '
'Please contact the authors to get this functionality included.')
return out
def _find_ele(*paths, parent: ElementTree.Element = None) -> ElementTree.Element:
"""
Performs a find on the element tree.
Each element name must be prefixed by "{http://psi.hupo.org/ms/mzml}", so this function saves that step.
:param paths: path names
:param parent: parent to search (if not provided, root will be used)
:return: element tree at location
"""
return parent.find(
"/".join([
f'{_xml_element_prefix}{path}' for path in paths
])
)
def _findall_ele(*paths, parent: ElementTree.Element = None) -> Iterable[ElementTree.Element]:
"""
Performs a findall on the element tree.
Each element name must be prefixed by "{http://psi.hupo.org/ms/mzml}", so this function saves that step.
:param paths: path names
:param parent: parent to search (if not provided, root will be used)
:return: element tree at location
"""
return parent.findall(
"/".join([
f'{_xml_element_prefix}{path}' for path in paths
])
)
......@@ -3,6 +3,7 @@ import os
import obonet
import textwrap
import urllib
from xml.etree import ElementTree
cv_param_def = None # default state for loaded CV parameters
......@@ -360,6 +361,57 @@ class CVParameterSet(object):
if self.cv_values[key].value is not None:
print(f'value: {self.cv_values[key].value}') # followed by the value
@classmethod
def branch_to_dict(cls,
branch: ElementTree.Element,
search_children: bool = True,
) -> dict:
"""
Searches through children of a branch and retrieves cvparameter values and details.
:param branch: branch to search
:param search_children: whether to search child branches for additional cvparameters
:return: dictionary of cv parameters
"""
_cvparam_name = '{http://psi.hupo.org/ms/mzml}cvParam'
out = {}
for param_element in branch.findall(_cvparam_name):
acc = param_element.attrib.get('accession') # accession key
out[acc] = {}
for attribute, value in param_element.attrib.items(): # pull all the attributes
if attribute != 'accession':
# attempt to convert to integer or float, keep as string otherwise
out[acc][attribute] = stringtodigit(value)
if search_children is True:
for child in branch:
if child.tag != _cvparam_name:
out.update(
cls.branch_to_dict(
child,
search_children=True,
),
)
return out
@classmethod
def create_from_branch(cls,
branch: ElementTree.Element,
search_children: bool = True,
) -> "CVParameterSet":
"""
Creates a class instance from an XML branch
:param branch: XML branch to interpret
:param search_children: whether to search child branches for additional cvparameters
:return: cv parameter set associated with that branch
"""
return cls(
**cls.branch_to_dict(
branch=branch,
search_children=search_children,
)
)
class CVParameterDefinitions(CVParameterSet):
def __init__(self,
......
import sys
import warnings
warnings.warn(
'the pythoms.progress module as been deprecated, please switch to tqdm',
DeprecationWarning,
stacklevel=2,
)
class Progress(object):
......
......@@ -41,6 +41,7 @@ IGNORE
"""
import os
import sys
import warnings
import scipy as sci
import numpy as np
from .spectrum import Spectrum
......@@ -525,6 +526,14 @@ def trimspectrum(x: list, y: list, left: float, right: float, outside: bool = Fa
:return: new spectrum
:rtype: tuple of list
"""
if isinstance(x, np.ndarray) and isinstance(np.ndarray):
return slice_array(
y,
x,
left,
right,
outside=outside,
)
# find indicies
l = locate_in_list(x, left, 'greater')
r = locate_in_list(x, right, 'lesser')
......@@ -1235,3 +1244,76 @@ def strtolist(string):
except ValueError:
out.append(float(temp))
return out
def check_integer(val, name):
"""
This function checks that the supplied values are integers greater than 1
A integer value that is non-negative is required for the summing function.
Please check your input value.
"""
if type(val) != list and type(val) != tuple: # if only one value given for n
val = [val]
for num in val:
if type(num) != int:
raise ValueError(f'The {name} value ({num}) is not an integer.')
if num < 1:
raise ValueError(f'The {name} value ({num}) is less than 1.')
return val
def slice_array(value_array: np.ndarray,
check_array: np.ndarray = True,
lower_bound: float = None,
upper_bound: float = None,
outside: bool = False,
) -> np.ndarray:
"""
Slices a numpy ndarray with the provided conditions
:param value_array: array of values to integrate
:param check_array: array of values to check against (the value of the check array must be between the lower and
upper bounds
:param lower_bound: lower bound for the check array
:param upper_bound: upper bound for the check array
:param outside: Whether to include the next point outside of the trimmed spectrum. This provides continuity if the
spectrum is to be used for image generation.
:return: sliced x,y array
"""
# todo support outside argument
if outside:
warnings.warn(
'the outside argument is not supported by the slice_array function, convert to list and use trimspectrum',
stacklevel=2,
)
if lower_bound is not None:
if upper_bound is None: # todo accept single condition
raise ValueError(f'upper bound must also be specified')
truth_array = (lower_bound <= check_array) & (check_array <= upper_bound)
else:
truth_array = check_array
# todo see if there's a more efficient way to do this
out = np.ndarray((2, truth_array.sum()))
out[0] = check_array[truth_array]
out[1] = value_array[truth_array]
return out
def integrate_numpy(value_array: np.ndarray,
check_array: np.ndarray = True,
lower_bound: float = None,
upper_bound: float = None,
) -> float:
"""
Integrates a value array with the provided conditions.
:param value_array: array of values to integrate
:param check_array: array of values to check against (the value of the check array must be between the lower and
upper bounds
:param lower_bound: lower bound for the check array
:param upper_bound: upper bound for the check array
:return: integral (sum)
"""
_, y = slice_array(value_array, check_array, lower_bound, upper_bound)
return y.sum()
......@@ -45,8 +45,8 @@ setup(
'scipy>=1.1.0',
'sympy>=1.1.1',
'obonet==0.2.5', # they changed attribute names without deprecationwarnings, so only this version is verified
'numpy',
'isospecpy>=2.0.2',
'tqdm>=4.46.0',
],
keywords=KEYWORDS,
)
......@@ -57,5 +57,8 @@ class TestMolecule(unittest.TestCase):
)
self.ipmol - 'PPh3' # test subtraction
self.ipmol + 'PPh3' # test addition
mol2 = IPMolecule('N(Et)2(CH2(13C)H2(2H))2')
self.ipmol + mol2 # test class addition
mol2 = IPMolecule(
'N(Et)2(CH2(13C)H2(2H))2',
ipmethod='multiplicative',
)
mol2 + self.ipmol # test class addition
import unittest
import os
from pythoms.mzml import mzML, branch_attributes, branch_cvparams
from pythoms.mzml import mzML
from pythoms.mzml.parsing import branch_attributes
from pythoms.mzml.psims import CVParameterSet
class TestmzML(unittest.TestCase):
......@@ -26,7 +28,7 @@ class TestmzML(unittest.TestCase):
@mzml.foreachscan
def testperspec(spectrum):
p = branch_cvparams(spectrum)
p = CVParameterSet.create_from_branch(spectrum)
return p["MS:1000016"].value
self.assertEqual( # test spectrum decorator
......