Commit 7ab61a57 authored by Matthew Berkeley's avatar Matthew Berkeley

BUSCO 4.0.2

parent 69cbc465
4.0.2
- Issue #182 partially fixed
4.0.1
- Issue #164 fixed
- Issue #166 fixed
......
......@@ -29,6 +29,14 @@ To get help on BUSCO use: ``busco -h`` and ``python3 scripts/generate_plot.py -h
**!!!** Don't use "odb9" datasets with BUSCOv4. If you need to reproduce previous analyses, use BUSCOv3 (https://gitlab.com/ezlab/busco/-/tags/3.0.2)
Note: When running auto-lineage, the initial results for eukaryotes are incomplete. This is deliberate, as these initial
results are used merely to determine whether the genome scores highest against the bacteria, archaea or eukaryota
datasets. If the eukaryota dataset is selected, BUSCO then attempts to place the input assembly on the eukaryote
phylogenetic tree before running a complete BUSCO assessment using the selected child dataset. Unless the top-level
eukaryota dataset is selected as the best match for the input file, the eukaryota dataset run will not complete. So
while the specific dataset run will return accurate results, the generic eukaryota dataset run should be considered
unreliable.
**How to cite BUSCO**
*BUSCO: Assessing Genome Assembly and Annotation Completeness.*
......
import argparse
from busco.BuscoConfig import PseudoConfig
from busco.ConfigManager import BuscoConfigManager
from busco.BuscoLogger import BuscoLogger
import os
import sys
logger = BuscoLogger.get_logger(__name__)
class ListLineagesAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=0, default="==SUPPRESS==", **kwargs):
super().__init__(option_strings, dest, nargs=nargs, default=default, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
try:
self.config_manager = BuscoConfigManager({})
except SystemExit as se:
logger.error("The config file is necessary here as it contains remote and local path locations for "
"downloading dataset information")
logger.error(se)
raise SystemExit
self.config = PseudoConfig(self.config_manager.config_file)
try:
self.config.load()
self.print_lineages()
except SystemExit as se:
logger.error(se)
finally:
os.remove("busco_{}.log".format(BuscoLogger.random_id))
parser.exit()
def print_lineages(self):
if self.config.update:
lineages_list_file = self.download_lineages_list()
else:
lineages_list_file = self.config.existing_downloads[0]
with open(lineages_list_file, "r") as f:
print("".join(f.readlines()))
def download_lineages_list(self):
lineages_list_file = self.config.downloader.get("lineages_list.txt", "information")
return lineages_list_file
class CleanHelpAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=0, default="==SUPPRESS==", **kwargs):
super().__init__(option_strings, dest, nargs=nargs, default=default, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
parser.print_help()
try:
os.remove("busco_{}.log".format(BuscoLogger.random_id))
except OSError:
pass
parser.exit()
class CleanVersionAction(argparse.Action):
def __init__(self, option_strings, version=None, dest="==SUPPRESS==", nargs=0, default="==SUPPRESS==", **kwargs):
super().__init__(option_strings, dest, nargs=nargs, default=default, **kwargs)
self.version = version
def __call__(self, parser, namespace, values, option_string=None):
version = self.version
if version is None:
version = parser.version
formatter = parser._get_formatter()
formatter.add_text(version)
parser._print_message(formatter.format_help(), sys.stdout)
try:
os.remove("busco_{}.log".format(BuscoLogger.random_id))
except OSError:
pass
parser.exit()
from Bio import SeqIO
from busco.BuscoTools import TBLASTNRunner, MKBLASTRunner
from busco.Toolset import Tool
from busco.BuscoLogger import BuscoLogger
from busco.BuscoLogger import LogDecorator as log
import subprocess
import os
from abc import ABCMeta, abstractmethod
logger = BuscoLogger.get_logger(__name__)
class NucleotideAnalysis(metaclass=ABCMeta):
LETTERS = ["A", "C", "T", "G", "N"]
# explanation of ambiguous codes found here: https://www.dnabaser.com/articles/IUPAC%20ambiguity%20codes.html
AMBIGUOUS_CODES = ["Y", "R", "W", "S", "K", "M", "D", "V", "H", "B"]
MAX_FLANK = 20000
def __init__(self, config):
# Variables inherited from BuscoAnalysis
self._config = None
self._cpus = None
self._input_file = None
super().__init__(config) # Initialize BuscoAnalysis
self._long = self._config.getboolean("busco_run", "long")
self._flank = self._define_flank()
self._ev_cutoff = self._config.getfloat("busco_run", "evalue")
self._region_limit = self._config.getint("busco_run", "limit")
self.blast_cpus = self._cpus
if not self.check_nucleotide_file(self._input_file):
raise SystemExit("Please provide a nucleotide file as input")
def check_nucleotide_file(self, filename):
i = 0
for record in SeqIO.parse(filename, "fasta"):
for letter in record.seq.upper():
if i > 5000:
break
i += 1
if letter not in type(self).LETTERS and letter not in type(self).AMBIGUOUS_CODES:
return False
else:
continue # only continue to next record of 5000 has not been hit
break # If for loop exits with "break", the else clause is skipped and the outer loop also breaks.
return True
def _define_flank(self):
"""
TODO: Add docstring
:return:
"""
try:
size = os.path.getsize(self._input_file) / 1000 # size in mb
flank = int(size / 50) # proportional flank size
# Ensure value is between 5000 and MAX_FLANK
flank = min(max(flank, 5000), type(self).MAX_FLANK)
except IOError: # Input data is only validated during run_analysis. This will catch any IO issues before that.
raise SystemExit("Impossible to read the fasta file {}".format(self._input_file))
return flank
@abstractmethod
def init_tools(self): # todo: This should be an abstract method
"""
Initialize all required tools for Genome Eukaryote Analysis:
MKBlast, TBlastn, Augustus and Augustus scripts: GFF2GBSmallDNA, new_species, etraining
:return:
"""
super().init_tools()
def check_tool_dependencies(self):
super().check_tool_dependencies()
def _get_blast_version(self):
blast_version_call = subprocess.check_output([self._tblastn_tool.cmd, "-version"], shell=False)
blast_version = ".".join(blast_version_call.decode("utf-8").split("\n")[0].split()[1].rsplit(".")[:-1])
return blast_version
def _run_mkblast(self):
self.mkblast_runner = MKBLASTRunner(self._mkblast_tool, self._input_file, self.main_out, self._cpus)
self.mkblast_runner.run()
@log("Running a BLAST search for BUSCOs against created database", logger)
def _run_tblastn(self, missing_and_frag_only=False, ancestral_variants=False):
incomplete_buscos = (self.hmmer_runner.missing_buscos + list(self.hmmer_runner.fragmented_buscos.keys())
if missing_and_frag_only else None) # This parameter is only used on the re-run
self.tblastn_runner = TBLASTNRunner(self._tblastn_tool, self._input_file, self.run_folder, self._lineage_dataset,
self.mkblast_runner.output_db, self._ev_cutoff, self.blast_cpus,
self._region_limit, self._flank, missing_and_frag_only, ancestral_variants,
incomplete_buscos)
self.tblastn_runner.run()
coords = self.tblastn_runner._get_coordinates()
coords = self.tblastn_runner._filter_best_matches(coords) # Todo: remove underscores from non-hidden methods
self.tblastn_runner._write_coordinates_to_file(coords) # writes to "coordinates.tsv"
self.tblastn_runner._write_contigs(coords)
return coords
class ProteinAnalysis:
LETTERS = ["F", "L", "I", "M", "V", "S", "P", "T", "A", "Y", "X", "H", "Q", "N", "K", "D", "E", "C", "W", "R", "G"]
NUCL_LETTERS = ["A", "C", "T", "G", "N"]
def __init__(self, config):
super().__init__(config)
if not self.check_protein_file(self._input_file):
raise SystemExit('Please provide a protein file as input')
def check_protein_file(self, filename):
for i, record in enumerate(SeqIO.parse(filename, "fasta")):
if i > 10:
break
for letter in record.seq:
if letter.upper() not in type(self).NUCL_LETTERS and letter.upper() in type(self).LETTERS:
return True
elif letter.upper() not in type(self).LETTERS:
return False
else:
continue
return False # if file only contains "A", "T", "C", "G", "N", it is unlikely to be a protein file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/env python
# coding: utf-8
"""
.. module:: BuscoDownloadManager
:synopsis: BuscoDownloadManager manage the version and download the most recent file
.. versionadded:: 4.0.0
.. versionchanged:: 4.0.0
Copyright (c) 2016-2020, Evgeny Zdobnov ([email protected])
Licensed under the MIT license. See LICENSE.md file.
"""
import os
import time
import glob
import tarfile
import hashlib
import urllib.request
from urllib.error import URLError
import gzip
from busco.BuscoLogger import BuscoLogger
from busco.BuscoLogger import LogDecorator as log
logger = BuscoLogger.get_logger(__name__)
class BuscoDownloadManager:
"""
This class obtains and manages the version of data files required to run a BUSCO analysis.
When the config parameter `offline` is set, no attempt to download is made.
When the config parameter `auto_update_files` is set, new versions replace old versions.s
Else, a warning is produced.
"""
version_files = {}
def __init__(self, config):
"""
:param config: Values of all parameters to be used during the analysis
:type config: BuscoConfig
"""
self.offline = config.getboolean("busco_run", "offline")
self.update_data = config.getboolean("busco_run", "update-data")
self.download_base_url = config.get("busco_run", "download_base_url")
self.local_download_path = config.get("busco_run", "download_path")
self._create_main_download_dir()
if not type(self).version_files and not self.offline:
self._load_versions()
def _create_main_download_dir(self):
if not os.path.exists(self.local_download_path):
os.makedirs(self.local_download_path)
def _load_versions(self):
try:
versions_file = self._obtain_versions_file()
with open(versions_file, "r") as v_file:
for line in v_file:
line = line.strip().split("\t")
dataset_name = line[0]
dataset_date = line[1]
dataset_hash = line[2]
type(self).version_files.update({dataset_name: (dataset_date, dataset_hash)})
except URLError as e:
if self.offline:
logger.warning("Unable to verify BUSCO datasets because of offline mode")
else:
SystemExit(e)
return
@log("Downloading information on latest versions of BUSCO data...", logger)
def _obtain_versions_file(self):
remote_filepath = os.path.join(self.download_base_url, "file_versions.tsv")
local_filepath = os.path.join(self.local_download_path, "file_versions.tsv")
try:
urllib.request.urlretrieve(remote_filepath, local_filepath)
except URLError:
SystemExit("Cannot reach {}".format(remote_filepath))
return local_filepath
def _create_category_dir(self, category):
# if the category folder does not exist, create it
category_folder = os.path.join(self.local_download_path, category)
if not os.path.exists(category_folder):
os.mkdir(category_folder)
return
@staticmethod
def _extract_creation_date(dataset_config_file):
dataset_date = None
with open(dataset_config_file, "r") as data_config:
for line in data_config:
line = line.strip().split("=")
if line[0] == "creation_date":
dataset_date = line[1]
break
if not dataset_date:
raise SystemExit("Creation date could not be extracted from dataset.cfg file.")
return dataset_date
def _check_existing_version(self, local_filepath, category, data_basename):
latest_update = type(self).version_files[data_basename][0]
path_basename, extension = os.path.splitext(data_basename)
if category == "lineages":
latest_version = ".".join([path_basename, latest_update])
try:
dataset_date = self._extract_creation_date(os.path.join(local_filepath, "dataset.cfg"))
up_to_date = dataset_date == latest_update
present = True
except FileNotFoundError:
up_to_date = False
present = False
else:
latest_version = ".".join([path_basename, latest_update, extension.lstrip(".")])
local_filepath = local_filepath.replace(data_basename, latest_version)
up_to_date = os.path.exists(local_filepath)
path_to_check, extension_to_check = os.path.splitext(local_filepath)
present = len(glob.glob("{}.*.{}".format(path_to_check[0:-11], extension_to_check[1:]))) > 0
hash = type(self).version_files[data_basename][1]
return present, up_to_date, latest_version, local_filepath, hash
def get(self, data_name, category):
if os.path.exists(data_name) and "/" in data_name:
logger.info("Using local {} directory {}".format(category, data_name))
return data_name
elif "/" in data_name:
raise SystemExit("{} does not exist".format(data_name))
if self.offline:
if category == 'lineages':
return os.path.join(self.local_download_path, category, data_name)
else:
basename, extension = os.path.splitext(data_name)
return sorted(glob.glob(os.path.join(
self.local_download_path, category, "{}.*.{}".format(basename, extension))))[-1]
data_basename = os.path.basename(data_name)
local_filepath = os.path.join(self.local_download_path, category, data_basename)
present, up_to_date, latest_version, local_filepath, hash = self._check_existing_version(local_filepath, category,
data_basename)
if (not up_to_date and self.update_data) or not present:
# download
self._create_category_dir(category)
compression_extension = ".tar.gz"
remote_filepath = os.path.join(self.download_base_url, category, latest_version+compression_extension)
if present and category == 'lineages':
self._rename_old_version(local_filepath)
download_success = self._download_file(remote_filepath, local_filepath+compression_extension, hash)
if download_success:
local_filepath = self._decompress_file(local_filepath+compression_extension)
if present:
logger.warning("The file or folder {} was updated automatically.".format(data_basename))
elif not up_to_date:
logger.warning("The file or folder {} is not the last available version. "
"To update all data files to the last version, add the parameter "
"--update-data in your next run.".format(local_filepath))
return local_filepath
def _rename_old_version(self, local_filepath):
if os.path.exists(local_filepath):
try:
os.rename(local_filepath, "{}.old".format(local_filepath))
logger.info("Renaming {} into {}.old".format(local_filepath, local_filepath))
except OSError:
try:
timestamp = time.time()
os.rename(local_filepath, "{}.old.{}".format(local_filepath, timestamp))
logger.info("Renaming {} into {}.old.{}".format(local_filepath, local_filepath, timestamp))
except OSError as e:
pass
return
@log("Downloading file {}", logger, func_arg=1)
def _download_file(self, remote_filepath, local_filepath, expected_hash):
try:
urllib.request.urlretrieve(remote_filepath, local_filepath)
observed_hash = type(self)._md5(local_filepath)
if observed_hash != expected_hash:
logger.error("md5 hash is incorrect: {} while {} expected".format(str(observed_hash), str(expected_hash)))
logger.info("deleting corrupted file {}".format(local_filepath))
os.remove(local_filepath)
raise SystemExit("BUSCO was unable to download or update all necessary files")
else:
logger.debug('md5 hash is {}'.format(observed_hash))
except URLError:
logger.error("Cannot reach {}".format(remote_filepath))
return False
return True
@staticmethod
def _md5(fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
@log("Decompressing file {}", logger, func_arg=1)
def _decompress_file(self, local_filepath):
unzipped_filename = local_filepath.replace(".gz", "")
if os.path.splitext(local_filepath)[1] == ".gz":
with gzip.open(os.path.join(local_filepath), "rb") as compressed_file:
with open(unzipped_filename, "wb") as decompressed_file:
for line in compressed_file:
decompressed_file.write(line)
os.remove(local_filepath)
local_filepath = unzipped_filename
if os.path.splitext(local_filepath)[1] == ".tar":
untarred_filename = local_filepath.replace(".tar", "")
with tarfile.open(local_filepath) as tar_file:
tar_file.extractall(os.path.dirname(local_filepath))
os.remove(local_filepath)
local_filepath = untarred_filename
return local_filepath
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from busco.AutoLineage import AutoSelectLineage
from busco.BuscoConfig import BuscoConfigMain
from busco.BuscoLogger import BuscoLogger
from busco.BuscoLogger import LogDecorator as log
import os
import sys
logger = BuscoLogger.get_logger(__name__)
# todo: finalize config file
class BuscoConfigManager:
def __init__(self, params):
self.params = params
self.config_file = None
self.config = None
self.get_config_file()
self.runner = None
@log("Getting config file", logger, debug=True)
def get_config_file(self):
"""
Check for BUSCO config file specified as a command line argument;
if not present check if defined as an environment variable;
if not present use default config file.
:return config: A BuscoConfig object containing all the required configuration parameters
"""
try:
self.config_file = self.params["config_file"]
if self.config_file is not None:
return
except KeyError:
pass
if os.environ.get("BUSCO_CONFIG_FILE") and os.access(os.environ.get("BUSCO_CONFIG_FILE"), os.R_OK):
self.config_file = os.environ.get("BUSCO_CONFIG_FILE")
else:
raise SystemExit("Please specify a BUSCO config file using either "
"(i) an environment variable by entering 'export BUSCO_CONFIG_FILE=/path/to/config.ini' "
"or (ii) using the command line flag --config /path/to/config.ini")
return self.config_file
@log("Configuring BUSCO with {}", logger, attr_name="config_file")
def load_busco_config(self, clargs):
self.config = BuscoConfigMain(self.config_file, self.params, clargs)
self.config.validate()
if not self.config.check_lineage_present():
if not self.config.getboolean("busco_run", "auto-lineage") and not self.config.getboolean("busco_run", "auto-lineage-prok"):# and not self.config.getboolean("busco_run", "auto-lineage-euk"):
logger.warning("Running Auto Lineage Selector as no lineage dataset was specified. This will take a "
"little longer than normal. If you know what lineage dataset you want to use, please "
"specify this in the config file or using the -l (--lineage-dataset) flag in the "
"command line.")
self.config.set("busco_run", "auto-lineage", "True")
lineage_dataset_fullpath = self.auto_select_lineage() # full path
self.config.set("busco_run", "lineage_dataset", lineage_dataset_fullpath)
lineage_dataset = os.path.basename(lineage_dataset_fullpath) # base name
else:
if self.config.getboolean("busco_run", "auto-lineage") or self.config.getboolean("busco_run", "auto-lineage-prok"):# or self.config.getboolean("busco_run", "auto-lineage-euk"):
logger.warning("You have selected auto-lineage but you have also provided a lineage dataset. "
"BUSCO will proceed with the specified dataset. "
"To run auto-lineage do not specify a dataset.")
self.config.set("busco_run", "auto-lineage", "False")
self.config.set("busco_run", "auto-lineage-prok", "False")
self.config.set("busco_run", "auto-lineage-euk", "False")
lineage_dataset = self.config.get("busco_run", "lineage_dataset") # full path
self.config.set_results_dirname(lineage_dataset) # function always only uses basename
self.config.download_lineage_file(lineage_dataset) # full path will return, base name will attempt download
# Todo: clean up error messages
self.config.load_dataset_config()
return
@log("No lineage specified. Running lineage auto selector.\n", logger)
def auto_select_lineage(self):
asl = AutoSelectLineage(self.config)
asl.run_auto_selector()
asl.get_lineage_dataset()
lineage_dataset = asl.best_match_lineage_dataset
self.runner = asl.selected_runner
return lineage_dataset
#!/usr/bin/env python3
# coding: utf-8
"""
.. module:: GeneSetAnalysis
:synopsis: GeneSetAnalysis implements genome analysis specifics
.. versionadded:: 3.0.0
.. versionchanged:: 3.0.0
Copyright (c) 2016-2020, Evgeny Zdobnov ([email protected])
Licensed under the MIT license. See LICENSE.md file.
"""
from busco.BuscoAnalysis import BuscoAnalysis
from busco.BuscoLogger import BuscoLogger
from busco.Analysis import ProteinAnalysis
from Bio import SeqIO
logger = BuscoLogger.get_logger(__name__)
class GeneSetAnalysis(ProteinAnalysis, BuscoAnalysis):
"""
This class runs a BUSCO analysis on a gene set.
"""
_mode = 'proteins'
def __init__(self, config):
"""
Initialize an instance.
:param params: Values of all parameters that have to be defined
:type params: PipeConfig
"""
super().__init__(config)
self.sequences_aa = {record.id: record for record in list(SeqIO.parse(self._input_file, "fasta"))}
def _cleanup(self):
super()._cleanup()
def run_analysis(self):
"""
This function calls all needed steps for running the analysis.
"""
super().run_analysis()
self.run_hmmer(self._input_file)
self._write_buscos_to_file(self.sequences_aa)
self._cleanup()
# if self._tarzip:
# self._run_tarzip_hmmer_output()
return
def create_dirs(self):
super().create_dirs()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/env python
# coding: utf-8
"""
.. package:: busco
:synopsis: BUSCO - Benchmarking Universal Single-Copy Orthologs.
Copyright (c) 2016-2020, Evgeny Zdobnov ([email protected])
Licensed under the MIT license. See LICENSE.md file.
"""
from ._version import __version__ as version
__all__ = ["Actions", "Analysis", "AutoLineage", "BuscoAnalysis","BuscoConfig", "BuscoDownloadManager", "BuscoLogger",
"BuscoPlacer", "BuscoRunner", "BuscoTools", "ConfigManager", "GeneSetAnalysis", "GenomeAnalysis", "Toolset",
"TranscriptomeAnalysis", "BuscoConfig", "BuscoPlacer"]
__version__ = version
#!/usr/bin/env python
# coding: utf-8
"""
Copyright (c) 2016-2020, Evgeny Zdobnov ([email protected])
Licensed under the MIT license. See LICENSE.md file.
"""
__version__ = "4.0.2"
This diff is collapsed.
#!/home/cegg/berkeley/.virtualenvs/busco_env/bin/python
try:
from busco import run_BUSCO
except ImportError as err:
try:
import re
pattern_search = re.search("cannot import name '(?P<module_name>[\w]+)", err.msg)
missing_module = pattern_search.group("module_name")
if missing_module == "run_BUSCO":
print("BUSCO must be installed before it is run. Please enter 'python setup.py install (--user)'. See the user guide for more information.")
elif missing_module == "Bio":
print("Please install BioPython (https://biopython.org/) before running BUSCO.")
elif missing_module == "numpy":
print("Please install NumPy before running BUSCO.")
else:
print("Unable to find module {}. Please make sure it is installed. See the user guide and the GitLab issue "
"board (https://gitlab.com/ezlab/busco/issues) if you need further assistance.".format(missing_module))
except:
print(err.msg)
print("There was a problem installing BUSCO or importing one of its dependencies. See the user guide and the "
"GitLab issue board (https://gitlab.com/ezlab/busco/issues) if you need further assistance.")
raise SystemExit(0)
run_BUSCO.main()
\ No newline at end of file
......@@ -53,7 +53,7 @@ RCODE = '######################################\n'\
'# @version 4.0.0\n'\
'# @since BUSCO 2.0.0\n'\
'# \n' \
'# Copyright (c) 2016-2017, Evgeny Zdobnov ([email protected])\n'\
'# Copyright (c) 2016-2020, Evgeny Zdobnov ([email protected])\n'\
'# Licensed under the MIT license. See LICENSE.md file.\n'\
'#\n'\
'######################################\n'\
......
......@@ -6,7 +6,7 @@
.. versionadded:: 3.0.0
.. versionchanged:: 3.0.1
Copyright (c) 2016-2017, Evgeny Zdobnov ([email protected])
Copyright (c) 2016-2020, Evgeny Zdobnov ([email protected])
Licensed under the MIT license. See LICENSE.md file.
"""
......
......@@ -8,7 +8,7 @@
This is a logger for the pipeline that extends the default Python logger class
Copyright (c) 2016-2017, Evgeny Zdobnov ([email protected])
Copyright (c) 2016-2020, Evgeny Zdobnov ([email protected])
Licensed under the MIT license. See LICENSE.md file.
"""
......