Commit 2c43abe4 authored by Mitar

Updating coverage validation script.

parent b9e2306f
......@@ -2,13 +2,16 @@
import argparse
import glob
import gzip
import itertools
import json
import os.path
import yaml
# Module-level command-line parsing: accepts zero or more positional INTERFACE
# arguments, stored as ``arguments.interface_versions`` (empty tuple when none
# are given).
# NOTE(review): the same parser setup also appears inside main() further down;
# in this diff view these three lines look like the pre-change copy - confirm
# which one is live before relying on the module-level ``arguments``.
parser = argparse.ArgumentParser(description="Compute pipeline coverage.")
parser.add_argument('interface_versions', metavar='INTERFACE', nargs='*', help="interface version(s) to compute coverage for", default=())
arguments = parser.parse_args()
# Pipeline IDs for which resolve_pipeline() stays silent: when no pipeline file
# is found for one of these IDs it returns None without printing the
# "Could not resolve" message.
# NOTE(review): presumably these are pipelines provided outside the scanned
# directories, so no local file is expected - confirm.
D3M_PIPELINES = {
'f596cd77-25f8-4d4c-a350-bb30ab1e58f6',
}
def primitives_used_in_pipeline(pipeline):
......@@ -25,7 +28,46 @@ def primitives_used_in_pipeline(pipeline):
return primitives
for interface_version in arguments.interface_versions:
def pipelines_used_in_pipeline_run(pipeline_run):
    """Return the set of pipeline IDs referenced by a pipeline run document.

    The ID under ``pipeline_run['pipeline']`` is always included. The IDs
    under ``pipeline_run['run']['data_preparation']`` and
    ``pipeline_run['run']['scoring']`` are added when those sections exist.
    """
    used_ids = {pipeline_run['pipeline']['id']}

    run_section = pipeline_run['run']
    # Both phases are optional parts of a run; each names its own pipeline.
    for phase in ('data_preparation', 'scoring'):
        if phase in run_section:
            used_ids.add(run_section[phase]['pipeline']['id'])

    return used_ids
def resolve_pipeline(pipelines_dir, pipeline_id):
    """Load the pipeline description with the given ID from ``pipelines_dir``.

    Files are probed in the order ``<id>.json``, ``<id>.yml``, ``<id>.yaml``;
    the first one that exists is parsed (JSON with ``json.load``, YAML with
    ``yaml.safe_load``) and returned.

    Returns ``None`` when none of the files exists. In that case a
    "Could not resolve" message is printed, unless the ID is listed in
    ``D3M_PIPELINES``.
    """
    for extension in ('json', 'yml', 'yaml'):
        candidate_path = os.path.join(
            pipelines_dir,
            '{pipeline_id}.{extension}'.format(pipeline_id=pipeline_id, extension=extension),
        )
        try:
            with open(candidate_path, 'r', encoding='utf8') as candidate_file:
                if extension == 'json':
                    return json.load(candidate_file)
                return yaml.safe_load(candidate_file)
        except FileNotFoundError:
            # Not stored under this extension; try the next one.
            continue

    # Only IDs outside the known set are reported as unresolved.
    if pipeline_id not in D3M_PIPELINES:
        print("Could not resolve '{pipeline_id}' in '{pipelines_dir}'.".format(
            pipeline_id=pipeline_id,
            pipelines_dir=pipelines_dir,
        ))

    return None
def process_interface_version(interface_version):
known_primitives = {}
primitives_used_in_pipelines = set()
......@@ -35,24 +77,35 @@ for interface_version in arguments.interface_versions:
known_primitives[primitive_annotation['id']] = primitive_annotation
for pipeline_path in glob.iglob('{interface_version}/*/*/*/pipelines/*.json'.format(interface_version=interface_version)):
with open(pipeline_path, 'r', encoding='utf8') as pipeline_file:
pipeline = json.load(pipeline_file)
primitives_used_in_pipelines.update(primitives_used_in_pipeline(pipeline))
for pipeline_run_path in itertools.chain(
glob.iglob('{interface_version}/*/*/*/pipeline_runs/*.yml.gz'.format(interface_version=interface_version)),
glob.iglob('{interface_version}/*/*/*/pipeline_runs/*.yml.gz'.format(interface_version=interface_version)),
):
pipelines_dir = os.path.sep.join(pipeline_run_path.split(os.path.sep)[:-2] + ['pipelines'])
for pipeline_path in glob.iglob('{interface_version}/*/*/*/pipelines/*.yml'.format(interface_version=interface_version)):
with open(pipeline_path, 'r', encoding='utf8') as pipeline_file:
pipeline = yaml.safe_load(pipeline_file)
with gzip.open(pipeline_run_path, 'rt', encoding='utf8') as pipeline_run_file:
pipeline_runs = yaml.safe_load_all(pipeline_run_file)
primitives_used_in_pipelines.update(primitives_used_in_pipeline(pipeline))
for pipeline_path in glob.iglob('{interface_version}/*/*/*/pipelines/*.yaml'.format(interface_version=interface_version)):
with open(pipeline_path, 'r', encoding='utf8') as pipeline_file:
pipeline = yaml.safe_load(pipeline_file)
for pipeline_run in pipeline_runs:
for pipeline_id in pipelines_used_in_pipeline_run(pipeline_run):
pipeline = resolve_pipeline(pipelines_dir, pipeline_id)
primitives_used_in_pipelines.update(primitives_used_in_pipeline(pipeline))
if pipeline:
primitives_used_in_pipelines.update(primitives_used_in_pipeline(pipeline))
for primitive_id, primitive in known_primitives.items():
if primitive_id not in primitives_used_in_pipelines:
print(primitive['source']['name'], primitive['id'], primitive['python_path'])
def main():
    """Parse the command line and process every requested interface version.

    Each positional INTERFACE argument is passed, in order, to
    ``process_interface_version``. With no arguments nothing is processed.
    """
    cli = argparse.ArgumentParser(description="Compute pipeline coverage.")
    cli.add_argument(
        'interface_versions',
        metavar='INTERFACE',
        nargs='*',
        default=(),
        help="interface version(s) to compute coverage for",
    )
    options = cli.parse_args()

    for version in options.interface_versions:
        process_interface_version(version)


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment