Source code for msp2db.parse

# coding: utf-8
from __future__ import absolute_import, unicode_literals, print_function
import datetime
import re
import os
import pubchempy as pcp
import csv
import uuid
import six
from .re import get_compound_regex, get_meta_regex
from .db import get_connection, insert_query_m, _make_sql_compatible, db_dict
from .utils import get_precursor_mz, line_count, get_blank_dict

try:
    # For Python 3.0 and later
    from urllib.request import URLError
except ImportError:
    # Fall back to Python 2's urllib2
    from urllib2 import URLError

try:
    from http.client import BadStatusLine
except ImportError:
    from httplib import BadStatusLine


class LibraryData(object):
    """MSP file parser to SQL databases

    After creating a SQL database for the library spectra using create_db, MSP files can be parsed into the
    database using the LibraryData class.

    Example:
        >>> from msp2db.db import create_db
        >>> from msp2db.parse import LibraryData

        >>> db_pth = 'spectral_library.db'
        >>> create_db(file_pth=db_pth, db_type='sqlite', db_name='spectra')

        >>> libdata = LibraryData(msp_pth='MoNA-export-FAHFA.msp',
        >>>                       db_pth=db_pth,
        >>>                       db_type='sqlite',
        >>>                       schema='mona',
        >>>                       source='fahfa',
        >>>                       chunk=200)

    Args:
        msp_pth (str): Path to the MSP file or directory of MSP files [required]
        db_pth (str): Path to the SQLite database (only required when using a SQLite database) [default None]
        source (str): Source of the MSP files (e.g. massbank) [default 'unknown']
        mslevel (int): If the MSP file does not contain the MS level it can be defined here [default None]
        polarity (str): If the MSP file does not contain the polarity it can be defined here [default None]
        db_type (str): The type of database to submit to (either 'sqlite', 'mysql' or 'django_mysql')
                       [default 'sqlite']
        user (str): Username for the database (only required for non-Django MySQL databases) [default None]
        password (str): Password for the database (only required for non-Django MySQL databases) [default None]
        mysql_db_name (str): Name of the MySQL database (only required for non-Django MySQL databases)
                             [default None]
        chunk (int): Number of spectra to parse per chunk (useful to control memory usage) [default 200]
        schema (str): MSP files vary based on how they were made. Two standard schemas are available:
                      'mona', based on the MassBank of North America (MoNA) MSP files, and 'massbank',
                      based on the more controlled MassBank MSP files
                      (https://github.com/MassBank/MassBank-data) [default 'mona']
        user_meta_regex (dict): For MSP files not derived from either MoNA or MassBank, a custom
                                dictionary of regexes can be used [default None]
        user_compound_regex (dict): For MSP files not derived from either MoNA or MassBank, a custom
                                    dictionary of regexes can be used [default None]
        compound_lookup (boolean): Include compound lookup [default True]
        celery_obj (boolean): If using Django, a Celery task object can be used to keep track of
                              ongoing tasks [default False]

    Returns:
        LibraryData object
    """
    def __init__(self, msp_pth, db_pth=None, mslevel=None, polarity=None, source='unknown',
                 db_type='sqlite', password=None, user=None, mysql_db_name=None, chunk=200,
                 schema='mona', user_meta_regex=None, user_compound_regex=None,
                 compound_lookup=True, celery_obj=False):
        # get the database connection (either sqlite, mysql or Django mysql)
        conn = get_connection(db_type, db_pth, user, password, mysql_db_name)

        # set up object variables
        self.c = conn.cursor()
        self.conn = conn
        self.db_pth = db_pth
        self.meta_info_all = []
        self.compound_info_all = []
        self.compound_ids = []
        self.get_compound_ids()
        self.spectra_all = []
        self.spectra_annotation_all = []
        self.start_spectra = False
        self.start_spectra_annotation = False
        self.ignore_additional_spectra_info = False
        self.collect_meta = True
        self.update_source = True
        self.source = source
        self.mslevel = mslevel
        self.polarity = polarity
        self.other_names = []

        # Either get the standard regexes or the user-provided regexes
        if user_meta_regex:
            self.meta_regex = user_meta_regex
        else:
            self.meta_regex = get_meta_regex(schema=schema)

        if user_compound_regex:
            self.compound_regex = user_compound_regex
        else:
            self.compound_regex = get_compound_regex(schema=schema)

        # initiate the meta data
        self.meta_info = get_blank_dict(self.meta_regex)
        self.compound_info = get_blank_dict(self.compound_regex)

        self._get_current_ids()

        # parse the file(s)
        self._parse_files(msp_pth, chunk, db_type, celery_obj=celery_obj, compound_lookup=compound_lookup)

    def _get_current_ids(self, source=True, meta=True, spectra=True, spectra_annotation=True):
        """Get the current id for each table in the database

        Args:
            source (boolean): get the id for the table "library_spectra_source"; will update
                              self.current_id_origin
            meta (boolean): get the id for the table "library_spectra_meta"; will update
                            self.current_id_meta
            spectra (boolean): get the id for the table "library_spectra"; will update
                               self.current_id_spectra
            spectra_annotation (boolean): get the id for the table "library_spectra_annotation"; will
                                          update self.current_id_spectra_annotation
        """
        # get the cursor for the database connection
        c = self.c

        # Get the last uid for the spectra_info table
        if source:
            c.execute('SELECT max(id) FROM library_spectra_source')
            last_id_origin = c.fetchone()[0]
            if last_id_origin:
                self.current_id_origin = last_id_origin + 1
            else:
                self.current_id_origin = 1

        if meta:
            c.execute('SELECT max(id) FROM library_spectra_meta')
            last_id_meta = c.fetchone()[0]
            if last_id_meta:
                self.current_id_meta = last_id_meta + 1
            else:
                self.current_id_meta = 1

        if spectra:
            c.execute('SELECT max(id) FROM library_spectra')
            last_id_spectra = c.fetchone()[0]
            if last_id_spectra:
                self.current_id_spectra = last_id_spectra + 1
            else:
                self.current_id_spectra = 1

        if spectra_annotation:
            c.execute('SELECT max(id) FROM library_spectra_annotation')
            last_id_spectra_annotation = c.fetchone()[0]
            if last_id_spectra_annotation:
                self.current_id_spectra_annotation = last_id_spectra_annotation + 1
            else:
                self.current_id_spectra_annotation = 1

    def _parse_files(self, msp_pth, chunk, db_type, celery_obj=False, compound_lookup=True):
        """Parse the MSP files and insert into the database

        Args:
            msp_pth (str): path to MSP file or directory [required]
            chunk (int): Number of spectra to parse per chunk (useful to control memory usage) [required]
            db_type (str): The type of database to submit to (either 'sqlite', 'mysql' or
                           'django_mysql') [required]
            celery_obj (boolean): If using Django, a Celery task object can be used to keep track of
                                  ongoing tasks [default False]
            compound_lookup (boolean): Include compound lookup
        """
        if os.path.isdir(msp_pth):
            c = 0
            for folder, subs, files in sorted(os.walk(msp_pth)):
                for msp_file in sorted(files):
                    msp_file_pth = os.path.join(folder, msp_file)
                    if os.path.isdir(msp_file_pth) or not msp_file_pth.lower().endswith(('txt', 'msp')):
                        continue
                    print('MSP FILE PATH', msp_file_pth)
                    self.num_lines = line_count(msp_file_pth)
                    # each file is processed separately, but we still want to process in chunks, so we
                    # track the number of spectra currently being processed with the c variable
                    with open(msp_file_pth, "r") as f:
                        c = self._parse_lines(f, chunk, db_type, celery_obj, c,
                                              compound_lookup=compound_lookup)
        else:
            self.num_lines = line_count(msp_pth)
            with open(msp_pth, "r") as f:
                self._parse_lines(f, chunk, db_type, celery_obj, compound_lookup=compound_lookup)

        self.insert_data(remove_data=True, db_type=db_type)

    def _parse_lines(self, f, chunk, db_type, celery_obj=False, c=0, compound_lookup=True):
        """Parse the lines of an MSP file and insert into the database

        Args:
            f (file object): the opened file object
            chunk (int): Number of spectra to parse per chunk (useful to control memory usage) [required]
            db_type (str): The type of database to submit to (either 'sqlite', 'mysql' or
                           'django_mysql') [required]
            celery_obj (boolean): If using Django, a Celery task object can be used to keep track of
                                  ongoing tasks [default False]
            c (int): Number of spectra currently processed (will reset to 0 after that chunk of
                     spectra has been inserted into the database)
            compound_lookup (bool): Include compound lookup
        """
        old = 0
        for i, line in enumerate(f):
            line = line.rstrip()
            if i == 0:
                old = self.current_id_meta
            self._update_libdata(line, compound_lookup)

            if self.current_id_meta > old:
                old = self.current_id_meta
                c += 1

            if c > chunk:
                if celery_obj:
                    celery_obj.update_state(state='current spectra {}'.format(str(i)),
                                            meta={'current': i, 'total': self.num_lines})
                print(self.current_id_meta)
                self.insert_data(remove_data=True, db_type=db_type)
                self.update_source = False
                c = 0
        return c

    def _update_libdata(self, line, compound_lookup=True):
        """Update the library meta data from the current line being parsed

        Args:
            line (str): The current line of the file being parsed
            compound_lookup (bool): Include compound lookup
        """
        ####################################################
        # parse MoNA Comments line
        ####################################################
        # The MoNA MSP files contain a "Comments" line with lots of additional information, the
        # individual entries being enclosed in double quotes
        if re.match('^Comment.*$', line, re.IGNORECASE):
            comments = re.findall('"([^"]*)"', line)
            for c in comments:
                self._parse_meta_info(c)
                self._parse_compound_info(c)

        ####################################################
        # parse meta and compound info lines
        ####################################################
        # check the current line for both general meta data
        # and compound information
        self._parse_meta_info(line)
        if compound_lookup:
            self._parse_compound_info(line)

        ####################################################
        # End of meta data
        ####################################################
        # Most MSP files have a standard line of text before the spectra information begins. Here we
        # check for this line and store the relevant details for the compound and meta information,
        # ready for insertion into the database
        if self.collect_meta and (re.match('^Num Peaks(.*)$', line, re.IGNORECASE)
                                  or re.match('^PK\$PEAK:(.*)', line, re.IGNORECASE)
                                  or re.match('^PK\$ANNOTATION(.*)', line, re.IGNORECASE)):
            self._store_compound_info()
            self._store_meta_info()

            # Reset the temp meta and compound information
            self.meta_info = get_blank_dict(self.meta_regex)
            self.compound_info = get_blank_dict(self.compound_regex)
            self.other_names = []
            self.collect_meta = False

        # ignore additional information in the 3rd column if using the MassBank spectra schema
        if re.match('^PK\$PEAK: m/z int\. rel\.int\.$', line, re.IGNORECASE):
            self.ignore_additional_spectra_info = True

        # Check if annotation or spectra are in the next lines to be parsed
        if re.match('^Num Peaks(.*)$', line, re.IGNORECASE) or re.match('^PK\$PEAK:(.*)', line,
                                                                        re.IGNORECASE):
            self.start_spectra = True
            return
        elif re.match('^PK\$ANNOTATION(.*)', line, re.IGNORECASE):
            self.start_spectra_annotation = True
            match = re.match('^PK\$ANNOTATION:(.*)', line, re.IGNORECASE)
            columns = match.group(1)
            cl = columns.split()
            self.spectra_annotation_indexes = {i: cl.index(i) for i in cl}
            return

        ####################################################
        # Process annotation details
        ####################################################
        # e.g. the molecular formula for each peak in the spectra
        if self.start_spectra_annotation:
            self._parse_spectra_annotation(line)

        ####################################################
        # Process spectra
        ####################################################
        if self.start_spectra:
            self._parse_spectra(line)
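
    # For orientation, a minimal MoNA-style record (contents illustrative, not from a real library)
    # that the state machine above walks through: meta/compound lines are collected until
    # "Num Peaks:" triggers _store_compound_info/_store_meta_info, the following "m/z intensity"
    # pairs go through _parse_spectra, and the blank line ends the record:
    #
    #   Name: example compound
    #   Precursor_type: [M+H]+
    #   Spectrum_type: MS2
    #   Comments: "SMILES=CCO" "InChIKey=LFQSCWFLJHTTHZ-UHFFFAOYSA-N"
    #   Num Peaks: 2
    #   75.02 12.5
    #   103.05 100.0
    #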
    def get_compound_ids(self):
        """Extract the current compound ids in the database. Updates the self.compound_ids list
        """
        cursor = self.conn.cursor()
        cursor.execute('SELECT inchikey_id FROM metab_compound')
        self.conn.commit()
        for row in cursor:
            if not row[0] in self.compound_ids:
                self.compound_ids.append(row[0])
    def _store_compound_info(self):
        """Update the compound_info dictionary with the current chunk of compound details

        Note that we use the InChIKey as the unique identifier. If we can't find an appropriate
        InChIKey we just use a random string (uuid4) prefixed with UNKNOWN_
        """
        other_name_l = [name for name in self.other_names if name != self.compound_info['name']]
        self.compound_info['other_names'] = ' <#> '.join(other_name_l)

        if not self.compound_info['inchikey_id']:
            self._set_inchi_pcc(self.compound_info['pubchem_id'], 'cid', 0)

        if not self.compound_info['inchikey_id']:
            self._set_inchi_pcc(self.compound_info['smiles'], 'smiles', 0)

        if not self.compound_info['inchikey_id']:
            self._set_inchi_pcc(self.compound_info['name'], 'name', 0)

        if not self.compound_info['inchikey_id']:
            print("WARNING, can't get inchikey for ", self.compound_info)
            print(self.meta_info)
            print('#########################')
            self.compound_info['inchikey_id'] = 'UNKNOWN_' + str(uuid.uuid4())

        if not self.compound_info['pubchem_id'] and self.compound_info['inchikey_id']:
            self._set_inchi_pcc(self.compound_info['inchikey_id'], 'inchikey', 0)

        if not self.compound_info['name']:
            self.compound_info['name'] = 'unknown name'

        if not self.compound_info['inchikey_id'] in self.compound_ids:
            self.compound_info_all.append(tuple(self.compound_info.values()) + (
                str(datetime.datetime.now()),
                str(datetime.datetime.now()),
            ))
            self.compound_ids.append(self.compound_info['inchikey_id'])

    def _store_meta_info(self):
        """Update the meta dictionary with the current chunk of meta data details
        """
        # In the MassBank MSP files the precursor_mz is sometimes missing, but we have the neutral
        # (exact) mass and the precursor_type (i.e. the adduct), so we can calculate the precursor_mz
        if not self.meta_info['precursor_mz'] and self.meta_info['precursor_type'] and \
                self.compound_info['exact_mass']:
            self.meta_info['precursor_mz'] = get_precursor_mz(
                float(self.compound_info['exact_mass']), self.meta_info['precursor_type'])

        if not self.meta_info['polarity']:
            # have to do a special check for polarity (as it sometimes gets missed)
            m = re.search('^\[.*\](\-|\+)', self.meta_info['precursor_type'], re.IGNORECASE)
            if m:
                polarity = m.group(1).strip()
                if polarity == '+':
                    self.meta_info['polarity'] = 'positive'
                elif polarity == '-':
                    self.meta_info['polarity'] = 'negative'

        if not self.meta_info['accession']:
            self.meta_info['accession'] = 'unknown accession'

        self.meta_info_all.append(
            (str(self.current_id_meta),) +
            tuple(self.meta_info.values()) +
            (str(self.current_id_origin), self.compound_info['inchikey_id'],)
        )

    def _parse_spectra_annotation(self, line):
        """Parse and store the spectral annotation details
        """
        if re.match('^PK\$NUM_PEAK(.*)', line, re.IGNORECASE):
            self.start_spectra_annotation = False
            return

        saplist = line.split()

        sarow = (
            self.current_id_spectra_annotation,
            float(saplist[self.spectra_annotation_indexes['m/z']])
            if 'm/z' in self.spectra_annotation_indexes else None,
            saplist[self.spectra_annotation_indexes['tentative_formula']]
            if 'tentative_formula' in self.spectra_annotation_indexes else None,
            float(saplist[self.spectra_annotation_indexes['mass_error(ppm)']])
            if 'mass_error(ppm)' in self.spectra_annotation_indexes else None,
            self.current_id_meta)

        self.spectra_annotation_all.append(sarow)

        self.current_id_spectra_annotation += 1

    def _parse_spectra(self, line):
        """Parse and store the spectral details
        """
        if line in ['\n', '\r\n', '//\n', '//\r\n', '', '//']:
            self.start_spectra = False
            self.current_id_meta += 1
            self.collect_meta = True
            return

        splist = line.split()

        if len(splist) > 2 and not self.ignore_additional_spectra_info:
            additional_info = ''.join(map(str, splist[2:len(splist)]))
        else:
            additional_info = ''

        srow = (
            self.current_id_spectra, float(splist[0]), float(splist[1]),
            additional_info, self.current_id_meta)

        self.spectra_all.append(srow)

        self.current_id_spectra += 1

    def _set_inchi_pcc(self, in_str, pcp_type, elem):
        """Check PubChem compounds via the API for both an InChIKey and any available compound details
        """
        if not in_str:
            return 0

        try:
            pccs = pcp.get_compounds(in_str, pcp_type)
        except pcp.BadRequestError as e:
            print(e)
            return 0
        except pcp.TimeoutError as e:
            print(e)
            return 0
        except pcp.ServerError as e:
            print(e)
            return 0
        except URLError as e:
            print(e)
            return 0
        except BadStatusLine as e:
            print(e)
            return 0

        if pccs:
            pcc = pccs[elem]
            self.compound_info['inchikey_id'] = pcc.inchikey
            self.compound_info['pubchem_id'] = pcc.cid
            self.compound_info['molecular_formula'] = pcc.molecular_formula
            self.compound_info['molecular_weight'] = pcc.molecular_weight
            self.compound_info['exact_mass'] = pcc.exact_mass
            self.compound_info['smiles'] = pcc.canonical_smiles

            if len(pccs) > 1:
                print('WARNING, multiple compounds for ', self.compound_info)

    def _get_other_names(self, line):
        """Parse and extract any other names that might be recorded for the compound

        Args:
            line (str): line of the MSP file
        """
        m = re.search(self.compound_regex['other_names'][0], line, re.IGNORECASE)
        if m:
            self.other_names.append(m.group(1).strip())

    def _parse_meta_info(self, line):
        """Parse and extract all meta data by looping through the dictionary of meta_info regexes

        Updates self.meta_info

        Args:
            line (str): line of the MSP file
        """
        if self.mslevel:
            self.meta_info['ms_level'] = self.mslevel

        if self.polarity:
            self.meta_info['polarity'] = self.polarity

        for k, regexes in six.iteritems(self.meta_regex):
            for reg in regexes:
                m = re.search(reg, line, re.IGNORECASE)
                if m:
                    self.meta_info[k] = m.group(1).strip()

    def _parse_compound_info(self, line):
        """Parse and extract all compound data by looping through the dictionary of compound_info
        regexes

        Updates self.compound_info

        Args:
            line (str): line of the MSP file
        """
        for k, regexes in six.iteritems(self.compound_regex):
            for reg in regexes:
                if self.compound_info[k]:
                    continue
                m = re.search(reg, line, re.IGNORECASE)
                if m:
                    self.compound_info[k] = m.group(1).strip()

        self._get_other_names(line)
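
    # A minimal sketch of the dict shape that user_meta_regex / user_compound_regex are assumed to
    # take, inferred from the two parsers above: each key is a database column name mapping to a
    # list of regexes whose first capture group provides the value (matched case-insensitively).
    # The keys and patterns below are illustrative only:
    #
    #   custom_meta_regex = {
    #       'ms_level': [r'^ms.?level:\s*(\d+)\s*$'],
    #       'collision_energy': [r'^collision energy:\s*(.*)$'],
    #   }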
    def insert_data(self, remove_data=False, db_type='sqlite'):
        """Insert data stored in the current chunk of parsing into the selected database

        Args:
            remove_data (boolean): Remove the data stored within the LibraryData object for the
                                   current chunk of processing
            db_type (str): The type of database to submit to (either 'sqlite', 'mysql' or
                           'django_mysql') [default 'sqlite']
        """
        if self.update_source:
            # print "insert ref id"
            import msp2db
            self.c.execute(
                "INSERT INTO library_spectra_source (id, name, parsing_software) VALUES"
                " ({a}, '{b}', 'msp2db-v{c}')".format(a=self.current_id_origin, b=self.source,
                                                      c=msp2db.__version__))
            self.conn.commit()

        if self.compound_info_all:
            self.compound_info_all = _make_sql_compatible(self.compound_info_all)
            cn = ', '.join(self.compound_info.keys()) + ',created_at,updated_at'
            insert_query_m(self.compound_info_all, columns=cn, conn=self.conn,
                           table='metab_compound', db_type=db_type)

        self.meta_info_all = _make_sql_compatible(self.meta_info_all)

        cn = 'id,' + ', '.join(self.meta_info.keys()) + ',library_spectra_source_id, inchikey_id'
        insert_query_m(self.meta_info_all, columns=cn, conn=self.conn,
                       table='library_spectra_meta', db_type=db_type)

        cn = "id, mz, i, other, library_spectra_meta_id"
        insert_query_m(self.spectra_all, columns=cn, conn=self.conn,
                       table='library_spectra', db_type=db_type)

        if self.spectra_annotation_all:
            cn = "id, mz, tentative_formula, mass_error, library_spectra_meta_id"
            insert_query_m(self.spectra_annotation_all, columns=cn, conn=self.conn,
                           table='library_spectra_annotation', db_type=db_type)

        # self.conn.close()
        if remove_data:
            self.meta_info_all = []
            self.spectra_all = []
            self.spectra_annotation_all = []
            self.compound_info_all = []
            self._get_current_ids(source=False)
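
    # For reference, each library_spectra row handed to insert_query_m above is a tuple matching
    # the "id, mz, i, other, library_spectra_meta_id" column string, built in _parse_spectra,
    # e.g. (1, 103.05, 100.0, '', 1); the values shown are illustrative.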
    def get_db_dict(self):
        """Get a dictionary of the library spectra from the associated database

        Example:
            >>> from msp2db.db import create_db
            >>> from msp2db.parse import LibraryData

            >>> db_pth = 'spectral_library.db'
            >>> create_db(file_pth=db_pth, db_type='sqlite', db_name='spectra')

            >>> libdata = LibraryData(msp_pth='MoNA-export-FAHFA.msp',
            >>>                       db_pth=db_pth,
            >>>                       db_type='sqlite',
            >>>                       schema='mona',
            >>>                       source='fahfa',
            >>>                       chunk=200)

            >>> libdata.get_db_dict()

        If using a large database the resulting dictionary will be very large!

        Returns:
            A dictionary with the following keys: 'library_spectra', 'library_spectra_meta',
            'library_spectra_annotations', 'library_spectra_source' and 'metab_compound', where the
            corresponding value for each key is a list of lists containing all the rows in the
            database.
        """
        return db_dict(self.c)
    def close(self):
        """Close the database connection
        """
        self.conn.close()
# build up list of inserts
# add in bulk the splash keys
def add_splash_ids(splash_mapping_file_pth, conn, db_type='sqlite'):
    """Add splash ids to the database (in case they are stored in a different file to the MSP files,
    as for MoNA)

    Example:
        >>> from msp2db.db import get_connection
        >>> from msp2db.parse import add_splash_ids

        >>> conn = get_connection('sqlite', 'library.db')
        >>> add_splash_ids('splash_mapping_file.csv', conn, db_type='sqlite')

    Args:
        splash_mapping_file_pth (str): Path to the splash mapping file (needs to be in CSV format
            with no headers and two columns: the first the accession number, the second the splash,
            e.g. "AU100601, splash10-0a4i-1900000000-d2bc1c887f6f99ed0f74")
        conn: Database connection object
        db_type (str): The type of database to submit to (either 'sqlite', 'mysql' or
            'django_mysql') [default 'sqlite']
    """
    # get dictionary of accession and library_spectra_meta_id
    cursor = conn.cursor()
    cursor.execute("SELECT id, accession FROM library_spectra_meta")
    accession_d = {row[1]: row[0] for row in cursor}

    if db_type == 'sqlite':
        type_sign = '?'
    else:
        type_sign = '%s'

    rows = []
    c = 0

    # loop through the splash mapping file
    with open(splash_mapping_file_pth, "r") as f:
        for line in f:
            c += 1
            line = line.rstrip()
            line_l = line.split(',')

            accession = line_l[0]
            splash = line_l[1]

            try:
                aid = accession_d[accession]
            except KeyError:
                print("can't find accession {}".format(accession))
                continue

            row = (splash, aid)
            rows.append(row)

            # periodically flush the accumulated updates (roughly every 200 lines)
            if c > 200:
                print(row)
                cursor.executemany("UPDATE library_spectra_meta SET splash = {t} "
                                   "WHERE id = {t}".format(t=type_sign), rows)
                conn.commit()
                rows = []
                c = 0

    # write any remaining rows
    cursor.executemany("UPDATE library_spectra_meta SET splash = {t} "
                       "WHERE id = {t}".format(t=type_sign), rows)
    conn.commit()
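
# A minimal end-to-end sketch of the module, guarded so it only runs when this file is executed
# directly; the file names below are placeholders and the MSP file is assumed to exist:
if __name__ == '__main__':
    from msp2db.db import create_db

    example_db_pth = 'spectral_library.db'
    create_db(file_pth=example_db_pth, db_type='sqlite', db_name='spectra')

    libdata = LibraryData(msp_pth='MoNA-export-FAHFA.msp',
                          db_pth=example_db_pth,
                          db_type='sqlite',
                          schema='mona',
                          source='fahfa',
                          chunk=200)

    # print the table names and row counts collected from the database
    for table_name, table_rows in libdata.get_db_dict().items():
        print(table_name, len(table_rows))

    libdata.close()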