Loading python_ags4/AGS4.py +10 −56 Original line number Diff line number Diff line Loading @@ -64,8 +64,6 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r function. """ from rich import print as rprint if _is_file_like(filepath_or_buffer): f = filepath_or_buffer f.seek(0) Loading Loading @@ -105,7 +103,6 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r msg = f"{group} group duplicated in Line {i}. Cannot parse file without overwriting data, "\ "therefore please combine all duplicate groups first." rprint(f"[red] ERROR: {msg}[/red]") logger.error(msg) raise AGS4Error(msg) Loading @@ -126,7 +123,6 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r if rename_duplicate_headers is False: raise AGS4Error(f"HEADER row in {group} (Line {i}) has duplicate entries") rprint(f"[yellow] WARNING: HEADER row in [bold]{group}[/bold] (Line {i}) has duplicate entries.[/yellow]") logger.warning(f"HEADER row in {group} (Line {i}) has duplicate entries.") # Rename duplicate headers by appending a number Loading @@ -142,9 +138,6 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r temp[i] = temp[i]+'_'+str(item_count[item]['count']) rprint(f'[blue] INFO: Duplicate column {item} found and renamed as {item}_{count}.[/blue]') rprint('[blue] Automatically renamed columns do not conform to AGS4 Rules 19a and 19b.[/blue]') rprint('[blue] Therefore, please review the data and rename or drop duplicate columns as appropriate.[/blue]') logger.info(f'Duplicate column {item} found and renamed as {item}_{count}. ' 'Automatically renamed columns do not conform to AGS4 Rules 19a and 19b. ' 'Therefore, please review the data and rename or drop duplicate columns as appropriate.') Loading @@ -170,7 +163,7 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r # Check whether line has the same number of entries as the # number of headings in the group. If not, print error and exit. if len(temp) != len(headings[group]): rprint(f"[red] Error: Line {i} does not have the same number of entries as the HEADING row in [bold]{group}[/bold].[/red]") logger.error(f"Line {i} does not have the same number of entries as the HEADING row in {group}.") raise AGS4Error(f"Line {i} does not have the same number of entries as the HEADING row in {group}.") for i in range(0, len(temp)): Loading Loading @@ -282,7 +275,6 @@ def AGS4_to_excel(input_file, output_file, encoding='utf-8', rename_duplicate_he """ from pandas import ExcelWriter from rich import print as rprint from openpyxl.utils import get_column_letter # Extract AGS4 file into a dictionary of dictionaries Loading @@ -295,7 +287,6 @@ def AGS4_to_excel(input_file, output_file, encoding='utf-8', rename_duplicate_he 'alphabetical': 'alphabetically', 'hierarchical': 'according to the hierarchy defined in the dictionary'} msg = f'WARNING: Worksheets in Excel file will be sorted {sorting_desc[sorting_strategy]}. The original group order will be lost.' rprint(f"[yellow]{msg}[/yellow]") logger.warning(f"{msg}") list_of_tables = sort_groups(tables, sorting_strategy=sorting_strategy) Loading @@ -305,20 +296,20 @@ def AGS4_to_excel(input_file, output_file, encoding='utf-8', rename_duplicate_he # Exit if there is no AGS4 tables in the input file if len(list_of_tables) == 0: rprint('[red] ERROR: No valid AGS4 data found in input file.[/red]') logger.error('No valid AGS4 data found in input file.') raise AGS4Error('No valid AGS4 data found in input file.') # Write to Excel file with ExcelWriter(output_file, engine='openpyxl') as writer: for key in list_of_tables: rprint(f'[green]Writing data from... [bold]{key}[/bold][/green]') logger.info(f'Writing data from... {key}') # Check table size and issue warning for large files that could crash the program if 25000 < tables[key].shape[0] < 100000: rprint(f'[blue] INFO: {key} has {tables[key].shape[0]} rows, so it will take about a minute to export.[/blue]') logger.info(f'{key} has {tables[key].shape[0]} rows, so it will take about a minute to export.') elif tables[key].shape[0] > 100000: rprint(f'[yellow] WARNING: {key} has {tables[key].shape[0]} rows, so it may take a few minutes to export.[/yellow]') rprint('[yellow] The program will terminate if it runs out of memory in the process.[/yellow]') logger.warning(f'{key} has {tables[key].shape[0]} rows, so it may take a few minutes to export. ' 'The program will terminate if it runs out of memory in the process.') tables[key].to_excel(writer, sheet_name=key, index=False) Loading Loading @@ -362,8 +353,6 @@ def dataframe_to_AGS4(tables, headings, filepath, mode='w', index=False, encodin None """ from rich import print as rprint # Open file and write/append data with open(filepath, mode, newline='', encoding=encoding) as f: for key in tables: Loading @@ -384,16 +373,10 @@ def dataframe_to_AGS4(tables, headings, filepath, mode='w', index=False, encodin df.loc[mask, :] = df.loc[mask, :].apply(lambda x: x.str.replace('""', '"')) # Write table to file rprint(f'[green]Writing data from... [bold]{key}[/bold][green]') logger.info(f'Writing data from... {key}') f.write('"GROUP"'+","+'"'+key+'"'+'\r\n') if key not in headings: if warnings is True: rprint(f"[yellow] WARNING: Input 'headings' dictionary does not have an entry named [bold]{key}[/bold].[/yellow]") rprint(f"[italic yellow] All columns in the {key} table will be exported in the default order.[/italic yellow]") rprint("[italic yellow] Please check column order and ensure AGS4 Rule 7 is still satisfied.[/italic yellow]") logger.warning(f"Input 'headings' dictionary does not have an entry named {key}. " f"All columns in the {key} table will be exported in the default order. " "Please check column order and ensure AGS4 Rule 7 is still satisfied.") Loading @@ -412,10 +395,6 @@ def dataframe_to_AGS4(tables, headings, filepath, mode='w', index=False, encodin missing_cols = set(headings[key]).difference(set(df.columns)) columns = [x for x in headings[key] if x not in missing_cols] if warnings is True: rprint(f"[yellow] WARNING: Columns {', '.join(missing_cols)} not found in the {key} table" " although they are in the headings dictionary..[/yellow]") logger.warning(f"Columns {', '.join(missing_cols)} not found in the {key} table although they are in the headings dictionary.") df.to_csv(f, index=index, quoting=1, columns=columns, lineterminator='\r\n', encoding=encoding) Loading Loading @@ -450,7 +429,6 @@ def excel_to_AGS4(input_file, output_file, format_numeric_columns=True, dictiona """ from pandas import read_excel from rich import print as rprint # Read data from Excel file in to a dictionary of dataframes tables = read_excel(input_file, sheet_name=None, engine='openpyxl') Loading @@ -464,13 +442,11 @@ def excel_to_AGS4(input_file, output_file, format_numeric_columns=True, dictiona if 'HEADING' in df: valid_tables.append(key) else: rprint(f'[yellow] WARNING: Worksheet [bold]{key}[/bold] dropped as it does not have a HEADING column.[/yellow]') logger.warning(f'Worksheet {key} dropped as it does not have a HEADING column.') continue # List column names that don't conform to Rule 19 (using a negative look-ahead regex) for col_name in df.filter(regex=r'^(?!HEADING|^[A-Z0-9]{4}_[A-Z0-9]{1,4}$)', axis='columns'): rprint(f'[yellow] WARNING: Column [bold]{col_name}[/bold] dropped as name does not conform to AGS4 Rule 19.[/yellow]') logger.warning(f'Column {col_name} dropped as name does not conform to AGS4 Rule 19.') # Drop columns that don't conform to Rule 19 Loading @@ -481,13 +457,11 @@ def excel_to_AGS4(input_file, output_file, format_numeric_columns=True, dictiona # Finally format numeric column if required if format_numeric_columns is True: rprint(f'[green]Formatting columns in... [bold]{key}[/bold][/green]') logger.info(f'Formatting columns in... {key}') tables[key] = convert_to_text(df, dictionary=dictionary) # Export dictionary of DataFrames to AGS4 file if len(valid_tables) == 0: rprint('[red] ERROR: No valid AGS4 data found in input file. Please see warning messages above.[/red]') logger.warning('No valid AGS4 data found in input file. Please see warning messages above.') else: dataframe_to_AGS4({key: tables[key] for key in valid_tables}, {}, output_file, warnings=False) Loading Loading @@ -562,7 +536,6 @@ def convert_to_text(dataframe, dictionary=None): """ from python_ags4 import check from rich import print as rprint # Make copy of dataframe and reset index to make sure numbering # starts from zero Loading @@ -579,8 +552,6 @@ def convert_to_text(dataframe, dictionary=None): df = format_numeric_column(df, col, TYPE) else: rprint("[red] ERROR: Cannot convert to text as UNIT and/or TYPE row(s) are missing.") rprint("[red] Please provide dictonary file or add UNIT & TYPE rows to input file to proceed.[/red]") logger.error('Cannot convert to text as UNIT and/or TYPE row(s) are missing. ' 'Please provide dictonary file or add UNIT & TYPE rows to input file to proceed.') raise AGS4Error("Cannot convert to text as UNIT and/or TYPE row(s) are missing. " Loading Loading @@ -637,7 +608,6 @@ def convert_to_text(dataframe, dictionary=None): df = format_numeric_column(df, col, TYPE) except IndexError: rprint(f"[yellow] WARNING: [bold]{col}[/bold] not found in the dictionary file.[/yellow]") logger.warning(f'{col} not found in the dictionary file.') return df.sort_index().reset_index(drop=True) Loading @@ -661,8 +631,6 @@ def format_numeric_column(dataframe, column_name, TYPE): Dataframe with formatted data. ''' from rich import print as rprint df = dataframe.copy() col = column_name Loading Loading @@ -692,11 +660,9 @@ def format_numeric_column(dataframe, column_name, TYPE): pass except ValueError: rprint(f"[yellow] WARNING: Numeric data in [bold]{col:<9}[/bold] not reformatted as it had one or more non-numeric entries.[/yellow]") logger.warning(f"Numeric data in {col:<9} not reformatted as it had one or more non-numeric entries.") except TypeError: rprint(f"[yellow] WARNING: Numeric data in [bold]{col:<9}[/bold] not reformatted as it had one or more non-numeric entries.[/yellow]") logger.warning(f"Numeric data in {col:<9} not reformatted as it had one or more non-numeric entries.") return df Loading Loading @@ -750,7 +716,6 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica """ from python_ags4 import check from rich import print as rprint import traceback ags_errors = {} Loading Loading @@ -787,7 +752,7 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica logger.info('Checking lines...') if print_output: rprint('[green] Checking lines...[/green]') logger.info('Checking lines...') for i, line in enumerate(f, start=1): Loading Loading @@ -835,16 +800,12 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica # Import data into Pandas dataframes to run group checks logger.info('Loading tables...') if print_output: rprint('[green] Loading tables...[/green]') f.seek(0) tables, headings, line_numbers = AGS4_to_dataframe(f, get_line_numbers=True, rename_duplicate_headers=rename_duplicate_headers) # Group Checks logger.info('Checking headings and groups...') if print_output: rprint('[green] Checking headings and groups...[/green]') ags_errors = check.rule_2(tables, headings, line_numbers, ags_errors=ags_errors) ags_errors = check.rule_2b(tables, headings, line_numbers, ags_errors=ags_errors) Loading Loading @@ -878,8 +839,6 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica dictionary = check.combine_DICT_tables(tables_std_dict, tables) logger.info('Checking file schema...') if print_output: rprint('[green] Checking file schema...[/green]') ags_errors = check.rule_7_2(headings, dictionary, line_numbers, ags_errors=ags_errors) ags_errors = check.rule_9(headings, dictionary, line_numbers, ags_errors=ags_errors) Loading Loading @@ -928,8 +887,6 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica except Exception as err: logger.exception(err) if print_output: rprint(f'[red]\n{traceback.format_exc()}[/red]') ags_errors = check.add_error_msg(ags_errors, 'General', '-', '', 'Could not complete validation. Please fix listed errors and try again.') Loading Loading @@ -966,7 +923,6 @@ def write_error_report(ags_errors, output_file, show_warnings=False, show_fyi=Fa None ''' from rich import print as rprint import textwrap error_count, warnings_count, fyi_count = count_errors(ags_errors) Loading Loading @@ -1045,11 +1001,11 @@ def write_error_report(ags_errors, output_file, show_warnings=False, show_fyi=Fa f.write(f''' Line {entry['line']:<8} {entry['group'].strip('"'):<7} {entry['desc']}\r\n''') f.write('\r\n') rprint(f'\n[yellow]Error report saved in {output_file}[/yellow]\n') logger.info(f'Error report saved in {output_file}') except FileNotFoundError: rprint('[red]\nERROR: Invalid output file path. Error report could not be saved.[/red]') rprint('[red] Please ensure that the specified directory exists.[/red]') logger.error('Invalid output file path. Error report could not be saved. ' 'Please ensure that the specified directory exists.') except TypeError: # Nothing to do if output_file is None Loading Loading @@ -1104,7 +1060,6 @@ def sort_groups(tables, sorting_strategy='dictionary'): """ from .check import pick_standard_dictionary, combine_DICT_tables from rich import print as rprint # Combine standard dictionary with DICT table in input file to create an extended dictionary # This extended dictionary is used to check the table order Loading Loading @@ -1146,7 +1101,6 @@ def sort_groups(tables, sorting_strategy='dictionary'): for item in sorted(set(tables.keys()).difference(set(sorted_tables.keys()))): msg = f'WARNING:Table {item} appended to the end as it was either not found in the dictionary '\ 'or its parent group is not defined under DICT_PGRP.' rprint(f"[yellow]{msg}[/yellow]") logger.warning(f"{msg}") sorted_tables[item] = tables[item] Loading python_ags4/check.py +0 −10 Original line number Diff line number Diff line Loading @@ -96,7 +96,6 @@ def combine_DICT_tables(*ags_tables): from pandas import DataFrame, concat from .AGS4 import AGS4Error from rich import print as rprint # Initialize DataFrame to hold all dictionary entries master_DICT = DataFrame() Loading @@ -108,15 +107,12 @@ def combine_DICT_tables(*ags_tables): except KeyError: # KeyError if there is no DICT table in an input file rprint('[yellow] WARNING: DICT group not found in input file.[/yellow]') logger.warning('DICT group not found in input file.') # Check whether master_DICT is empty if master_DICT.shape[0] == 0: msg = 'No DICT groups available to proceed with checking. '\ 'Please ensure the input file has a DICT group or provide file with standard AGS4 dictionary.' rprint(f'[red] ERROR: {msg}[/red]') logger.error(msg) raise AGS4Error(msg) Loading Loading @@ -198,7 +194,6 @@ def pick_standard_dictionary(tables=None, dict_version=None): """ from pathlib import Path from rich import print as rprint # Select standard dictionary based on TRAN_AGS try: Loading @@ -210,27 +205,22 @@ def pick_standard_dictionary(tables=None, dict_version=None): path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[dict_version] else: rprint('[yellow] WARNING: Standard dictionary for AGS4 version specified in TRAN_AGS not available.[/yellow]') rprint(f'[yellow] Defaulting to standard dictionary v{LATEST_DICT_VERSION}.[/yellow]') logger.warning('Standard dictionary for AGS4 version specified in TRAN_AGS not available. ' f'Defaulting to standard dictionary v{LATEST_DICT_VERSION}.') path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[LATEST_DICT_VERSION] except KeyError: # TRAN table not in file rprint(f'[yellow] WARNING: TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.[/yellow]') logger.warning(f'TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.') path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[LATEST_DICT_VERSION] except IndexError: # No DATA rows in TRAN table rprint(f'[yellow] WARNING: TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.[/yellow]') logger.warning(f'TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.') path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[LATEST_DICT_VERSION] except TypeError: # TRAN table not found and dict_version not valid rprint(f'[yellow] WARNING: Neither TRAN_AGS nor dict_version is valid. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.[/yellow]') logger.warning(f'TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.') path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[LATEST_DICT_VERSION] Loading Loading
python_ags4/AGS4.py +10 −56 Original line number Diff line number Diff line Loading @@ -64,8 +64,6 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r function. """ from rich import print as rprint if _is_file_like(filepath_or_buffer): f = filepath_or_buffer f.seek(0) Loading Loading @@ -105,7 +103,6 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r msg = f"{group} group duplicated in Line {i}. Cannot parse file without overwriting data, "\ "therefore please combine all duplicate groups first." rprint(f"[red] ERROR: {msg}[/red]") logger.error(msg) raise AGS4Error(msg) Loading @@ -126,7 +123,6 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r if rename_duplicate_headers is False: raise AGS4Error(f"HEADER row in {group} (Line {i}) has duplicate entries") rprint(f"[yellow] WARNING: HEADER row in [bold]{group}[/bold] (Line {i}) has duplicate entries.[/yellow]") logger.warning(f"HEADER row in {group} (Line {i}) has duplicate entries.") # Rename duplicate headers by appending a number Loading @@ -142,9 +138,6 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r temp[i] = temp[i]+'_'+str(item_count[item]['count']) rprint(f'[blue] INFO: Duplicate column {item} found and renamed as {item}_{count}.[/blue]') rprint('[blue] Automatically renamed columns do not conform to AGS4 Rules 19a and 19b.[/blue]') rprint('[blue] Therefore, please review the data and rename or drop duplicate columns as appropriate.[/blue]') logger.info(f'Duplicate column {item} found and renamed as {item}_{count}. ' 'Automatically renamed columns do not conform to AGS4 Rules 19a and 19b. ' 'Therefore, please review the data and rename or drop duplicate columns as appropriate.') Loading @@ -170,7 +163,7 @@ def AGS4_to_dict(filepath_or_buffer, encoding='utf-8', get_line_numbers=False, r # Check whether line has the same number of entries as the # number of headings in the group. If not, print error and exit. if len(temp) != len(headings[group]): rprint(f"[red] Error: Line {i} does not have the same number of entries as the HEADING row in [bold]{group}[/bold].[/red]") logger.error(f"Line {i} does not have the same number of entries as the HEADING row in {group}.") raise AGS4Error(f"Line {i} does not have the same number of entries as the HEADING row in {group}.") for i in range(0, len(temp)): Loading Loading @@ -282,7 +275,6 @@ def AGS4_to_excel(input_file, output_file, encoding='utf-8', rename_duplicate_he """ from pandas import ExcelWriter from rich import print as rprint from openpyxl.utils import get_column_letter # Extract AGS4 file into a dictionary of dictionaries Loading @@ -295,7 +287,6 @@ def AGS4_to_excel(input_file, output_file, encoding='utf-8', rename_duplicate_he 'alphabetical': 'alphabetically', 'hierarchical': 'according to the hierarchy defined in the dictionary'} msg = f'WARNING: Worksheets in Excel file will be sorted {sorting_desc[sorting_strategy]}. The original group order will be lost.' rprint(f"[yellow]{msg}[/yellow]") logger.warning(f"{msg}") list_of_tables = sort_groups(tables, sorting_strategy=sorting_strategy) Loading @@ -305,20 +296,20 @@ def AGS4_to_excel(input_file, output_file, encoding='utf-8', rename_duplicate_he # Exit if there is no AGS4 tables in the input file if len(list_of_tables) == 0: rprint('[red] ERROR: No valid AGS4 data found in input file.[/red]') logger.error('No valid AGS4 data found in input file.') raise AGS4Error('No valid AGS4 data found in input file.') # Write to Excel file with ExcelWriter(output_file, engine='openpyxl') as writer: for key in list_of_tables: rprint(f'[green]Writing data from... [bold]{key}[/bold][/green]') logger.info(f'Writing data from... {key}') # Check table size and issue warning for large files that could crash the program if 25000 < tables[key].shape[0] < 100000: rprint(f'[blue] INFO: {key} has {tables[key].shape[0]} rows, so it will take about a minute to export.[/blue]') logger.info(f'{key} has {tables[key].shape[0]} rows, so it will take about a minute to export.') elif tables[key].shape[0] > 100000: rprint(f'[yellow] WARNING: {key} has {tables[key].shape[0]} rows, so it may take a few minutes to export.[/yellow]') rprint('[yellow] The program will terminate if it runs out of memory in the process.[/yellow]') logger.warning(f'{key} has {tables[key].shape[0]} rows, so it may take a few minutes to export. ' 'The program will terminate if it runs out of memory in the process.') tables[key].to_excel(writer, sheet_name=key, index=False) Loading Loading @@ -362,8 +353,6 @@ def dataframe_to_AGS4(tables, headings, filepath, mode='w', index=False, encodin None """ from rich import print as rprint # Open file and write/append data with open(filepath, mode, newline='', encoding=encoding) as f: for key in tables: Loading @@ -384,16 +373,10 @@ def dataframe_to_AGS4(tables, headings, filepath, mode='w', index=False, encodin df.loc[mask, :] = df.loc[mask, :].apply(lambda x: x.str.replace('""', '"')) # Write table to file rprint(f'[green]Writing data from... [bold]{key}[/bold][green]') logger.info(f'Writing data from... {key}') f.write('"GROUP"'+","+'"'+key+'"'+'\r\n') if key not in headings: if warnings is True: rprint(f"[yellow] WARNING: Input 'headings' dictionary does not have an entry named [bold]{key}[/bold].[/yellow]") rprint(f"[italic yellow] All columns in the {key} table will be exported in the default order.[/italic yellow]") rprint("[italic yellow] Please check column order and ensure AGS4 Rule 7 is still satisfied.[/italic yellow]") logger.warning(f"Input 'headings' dictionary does not have an entry named {key}. " f"All columns in the {key} table will be exported in the default order. " "Please check column order and ensure AGS4 Rule 7 is still satisfied.") Loading @@ -412,10 +395,6 @@ def dataframe_to_AGS4(tables, headings, filepath, mode='w', index=False, encodin missing_cols = set(headings[key]).difference(set(df.columns)) columns = [x for x in headings[key] if x not in missing_cols] if warnings is True: rprint(f"[yellow] WARNING: Columns {', '.join(missing_cols)} not found in the {key} table" " although they are in the headings dictionary..[/yellow]") logger.warning(f"Columns {', '.join(missing_cols)} not found in the {key} table although they are in the headings dictionary.") df.to_csv(f, index=index, quoting=1, columns=columns, lineterminator='\r\n', encoding=encoding) Loading Loading @@ -450,7 +429,6 @@ def excel_to_AGS4(input_file, output_file, format_numeric_columns=True, dictiona """ from pandas import read_excel from rich import print as rprint # Read data from Excel file in to a dictionary of dataframes tables = read_excel(input_file, sheet_name=None, engine='openpyxl') Loading @@ -464,13 +442,11 @@ def excel_to_AGS4(input_file, output_file, format_numeric_columns=True, dictiona if 'HEADING' in df: valid_tables.append(key) else: rprint(f'[yellow] WARNING: Worksheet [bold]{key}[/bold] dropped as it does not have a HEADING column.[/yellow]') logger.warning(f'Worksheet {key} dropped as it does not have a HEADING column.') continue # List column names that don't conform to Rule 19 (using a negative look-ahead regex) for col_name in df.filter(regex=r'^(?!HEADING|^[A-Z0-9]{4}_[A-Z0-9]{1,4}$)', axis='columns'): rprint(f'[yellow] WARNING: Column [bold]{col_name}[/bold] dropped as name does not conform to AGS4 Rule 19.[/yellow]') logger.warning(f'Column {col_name} dropped as name does not conform to AGS4 Rule 19.') # Drop columns that don't conform to Rule 19 Loading @@ -481,13 +457,11 @@ def excel_to_AGS4(input_file, output_file, format_numeric_columns=True, dictiona # Finally format numeric column if required if format_numeric_columns is True: rprint(f'[green]Formatting columns in... [bold]{key}[/bold][/green]') logger.info(f'Formatting columns in... {key}') tables[key] = convert_to_text(df, dictionary=dictionary) # Export dictionary of DataFrames to AGS4 file if len(valid_tables) == 0: rprint('[red] ERROR: No valid AGS4 data found in input file. Please see warning messages above.[/red]') logger.warning('No valid AGS4 data found in input file. Please see warning messages above.') else: dataframe_to_AGS4({key: tables[key] for key in valid_tables}, {}, output_file, warnings=False) Loading Loading @@ -562,7 +536,6 @@ def convert_to_text(dataframe, dictionary=None): """ from python_ags4 import check from rich import print as rprint # Make copy of dataframe and reset index to make sure numbering # starts from zero Loading @@ -579,8 +552,6 @@ def convert_to_text(dataframe, dictionary=None): df = format_numeric_column(df, col, TYPE) else: rprint("[red] ERROR: Cannot convert to text as UNIT and/or TYPE row(s) are missing.") rprint("[red] Please provide dictonary file or add UNIT & TYPE rows to input file to proceed.[/red]") logger.error('Cannot convert to text as UNIT and/or TYPE row(s) are missing. ' 'Please provide dictonary file or add UNIT & TYPE rows to input file to proceed.') raise AGS4Error("Cannot convert to text as UNIT and/or TYPE row(s) are missing. " Loading Loading @@ -637,7 +608,6 @@ def convert_to_text(dataframe, dictionary=None): df = format_numeric_column(df, col, TYPE) except IndexError: rprint(f"[yellow] WARNING: [bold]{col}[/bold] not found in the dictionary file.[/yellow]") logger.warning(f'{col} not found in the dictionary file.') return df.sort_index().reset_index(drop=True) Loading @@ -661,8 +631,6 @@ def format_numeric_column(dataframe, column_name, TYPE): Dataframe with formatted data. ''' from rich import print as rprint df = dataframe.copy() col = column_name Loading Loading @@ -692,11 +660,9 @@ def format_numeric_column(dataframe, column_name, TYPE): pass except ValueError: rprint(f"[yellow] WARNING: Numeric data in [bold]{col:<9}[/bold] not reformatted as it had one or more non-numeric entries.[/yellow]") logger.warning(f"Numeric data in {col:<9} not reformatted as it had one or more non-numeric entries.") except TypeError: rprint(f"[yellow] WARNING: Numeric data in [bold]{col:<9}[/bold] not reformatted as it had one or more non-numeric entries.[/yellow]") logger.warning(f"Numeric data in {col:<9} not reformatted as it had one or more non-numeric entries.") return df Loading Loading @@ -750,7 +716,6 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica """ from python_ags4 import check from rich import print as rprint import traceback ags_errors = {} Loading Loading @@ -787,7 +752,7 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica logger.info('Checking lines...') if print_output: rprint('[green] Checking lines...[/green]') logger.info('Checking lines...') for i, line in enumerate(f, start=1): Loading Loading @@ -835,16 +800,12 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica # Import data into Pandas dataframes to run group checks logger.info('Loading tables...') if print_output: rprint('[green] Loading tables...[/green]') f.seek(0) tables, headings, line_numbers = AGS4_to_dataframe(f, get_line_numbers=True, rename_duplicate_headers=rename_duplicate_headers) # Group Checks logger.info('Checking headings and groups...') if print_output: rprint('[green] Checking headings and groups...[/green]') ags_errors = check.rule_2(tables, headings, line_numbers, ags_errors=ags_errors) ags_errors = check.rule_2b(tables, headings, line_numbers, ags_errors=ags_errors) Loading Loading @@ -878,8 +839,6 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica dictionary = check.combine_DICT_tables(tables_std_dict, tables) logger.info('Checking file schema...') if print_output: rprint('[green] Checking file schema...[/green]') ags_errors = check.rule_7_2(headings, dictionary, line_numbers, ags_errors=ags_errors) ags_errors = check.rule_9(headings, dictionary, line_numbers, ags_errors=ags_errors) Loading Loading @@ -928,8 +887,6 @@ def check_file(filepath_or_buffer, standard_AGS4_dictionary=None, rename_duplica except Exception as err: logger.exception(err) if print_output: rprint(f'[red]\n{traceback.format_exc()}[/red]') ags_errors = check.add_error_msg(ags_errors, 'General', '-', '', 'Could not complete validation. Please fix listed errors and try again.') Loading Loading @@ -966,7 +923,6 @@ def write_error_report(ags_errors, output_file, show_warnings=False, show_fyi=Fa None ''' from rich import print as rprint import textwrap error_count, warnings_count, fyi_count = count_errors(ags_errors) Loading Loading @@ -1045,11 +1001,11 @@ def write_error_report(ags_errors, output_file, show_warnings=False, show_fyi=Fa f.write(f''' Line {entry['line']:<8} {entry['group'].strip('"'):<7} {entry['desc']}\r\n''') f.write('\r\n') rprint(f'\n[yellow]Error report saved in {output_file}[/yellow]\n') logger.info(f'Error report saved in {output_file}') except FileNotFoundError: rprint('[red]\nERROR: Invalid output file path. Error report could not be saved.[/red]') rprint('[red] Please ensure that the specified directory exists.[/red]') logger.error('Invalid output file path. Error report could not be saved. ' 'Please ensure that the specified directory exists.') except TypeError: # Nothing to do if output_file is None Loading Loading @@ -1104,7 +1060,6 @@ def sort_groups(tables, sorting_strategy='dictionary'): """ from .check import pick_standard_dictionary, combine_DICT_tables from rich import print as rprint # Combine standard dictionary with DICT table in input file to create an extended dictionary # This extended dictionary is used to check the table order Loading Loading @@ -1146,7 +1101,6 @@ def sort_groups(tables, sorting_strategy='dictionary'): for item in sorted(set(tables.keys()).difference(set(sorted_tables.keys()))): msg = f'WARNING:Table {item} appended to the end as it was either not found in the dictionary '\ 'or its parent group is not defined under DICT_PGRP.' rprint(f"[yellow]{msg}[/yellow]") logger.warning(f"{msg}") sorted_tables[item] = tables[item] Loading
python_ags4/check.py +0 −10 Original line number Diff line number Diff line Loading @@ -96,7 +96,6 @@ def combine_DICT_tables(*ags_tables): from pandas import DataFrame, concat from .AGS4 import AGS4Error from rich import print as rprint # Initialize DataFrame to hold all dictionary entries master_DICT = DataFrame() Loading @@ -108,15 +107,12 @@ def combine_DICT_tables(*ags_tables): except KeyError: # KeyError if there is no DICT table in an input file rprint('[yellow] WARNING: DICT group not found in input file.[/yellow]') logger.warning('DICT group not found in input file.') # Check whether master_DICT is empty if master_DICT.shape[0] == 0: msg = 'No DICT groups available to proceed with checking. '\ 'Please ensure the input file has a DICT group or provide file with standard AGS4 dictionary.' rprint(f'[red] ERROR: {msg}[/red]') logger.error(msg) raise AGS4Error(msg) Loading Loading @@ -198,7 +194,6 @@ def pick_standard_dictionary(tables=None, dict_version=None): """ from pathlib import Path from rich import print as rprint # Select standard dictionary based on TRAN_AGS try: Loading @@ -210,27 +205,22 @@ def pick_standard_dictionary(tables=None, dict_version=None): path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[dict_version] else: rprint('[yellow] WARNING: Standard dictionary for AGS4 version specified in TRAN_AGS not available.[/yellow]') rprint(f'[yellow] Defaulting to standard dictionary v{LATEST_DICT_VERSION}.[/yellow]') logger.warning('Standard dictionary for AGS4 version specified in TRAN_AGS not available. ' f'Defaulting to standard dictionary v{LATEST_DICT_VERSION}.') path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[LATEST_DICT_VERSION] except KeyError: # TRAN table not in file rprint(f'[yellow] WARNING: TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.[/yellow]') logger.warning(f'TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.') path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[LATEST_DICT_VERSION] except IndexError: # No DATA rows in TRAN table rprint(f'[yellow] WARNING: TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.[/yellow]') logger.warning(f'TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.') path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[LATEST_DICT_VERSION] except TypeError: # TRAN table not found and dict_version not valid rprint(f'[yellow] WARNING: Neither TRAN_AGS nor dict_version is valid. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.[/yellow]') logger.warning(f'TRAN_AGS not found. Defaulting to standard dictionary v{LATEST_DICT_VERSION}.') path_to_standard_dictionary = Path(__file__).parent / STANDARD_DICT_FILES[LATEST_DICT_VERSION] Loading