Commit f9083b3e authored by David Hendriks's avatar David Hendriks
Browse files

polishing the new processes and error handling

parent a8ef8e1f
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -929,6 +929,10 @@ def merge_dicts(dict_1, dict_2):
            elif isinstance(dict_1[key], dict) and isinstance(dict_2[key], dict):
                new_dict[key] = merge_dicts(dict_1[key], dict_2[key])

            # Booleans (has to be the type Bool, not just a 0 or 1)
            elif isinstance(dict_1[key], bool) and isinstance(dict_2[key], bool):
                new_dict[key] = dict_1[key] * dict_2[key]

            else:
                print(
                    "Object types {},{} not supported".format(
+87 −99
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@ import copy
import json
import datetime
import time
import uuid
import logging
import argparse
import subprocess
@@ -20,7 +21,6 @@ import importlib.util

from pathos.helpers import mp as pathos_multiprocess

# from pathos.multiprocessing import ProcessingPool as Pool
from pathos.pools import _ProcessPool as Pool

from binarycpython.utils.grid_options_defaults import grid_options_defaults_dict
@@ -99,6 +99,9 @@ class Population:
        # Set some memory dicts
        self.persistent_data_memory_dict = {}

        # ID of the process working with this object; stays 0 until a worker process stores its own ID
        self.process_ID = 0

    ###################################################
    # Argument functions
    ###################################################
@@ -637,6 +640,9 @@ class Population:
        TODO: include options for different ways of generating a population here.
        """

        # Set a unique name for this population run
        self.grid_options["unique_population_name"] = uuid.uuid4().hex

        ##
        # Prepare code/initialise grid.
        # set custom logging, set up store_memaddr, build grid code. dry run grid code.
@@ -661,8 +667,25 @@ class Population:
                )
            )

        # print("During this run {} systems failed with a total probability of {}".format(self.grid_options['failed_count'], self.grid_options['failed_prob']))
        # print("The full argline command strings are stored in {}".format(os.path.join(self.grid_options['tmp_dir'], 'failed_systems.txt')))
        verbose_print(
            "Population-{} finished!".format(self.grid_options['unique_population_name']), 
            self.grid_options["verbosity"], 
            0
        )

        if self.grid_options['errors_found']:
            # Some information afterwards
            verbose_print(
                "During the run {} failed systems were found, with a total probability of {} and with the following unique error codes: {} ".format(self.grid_options['failed_count'], self.grid_options['failed_prob'], self.grid_options['failed_systems_error_codes']), 
                self.grid_options["verbosity"], 
                0
            )
            # Tell the user where the argline commands of the failing systems were written
            verbose_print(
                "The full argline commands for {} these systems have been written to {}".format("ALL" if not self.grid_options['error_exceeded'] else "SOME (only the first ones, as there were too many to log all of them)", os.path.join(self.grid_options['tmp_dir'], 'failed_systemsX.txt')),
                self.grid_options["verbosity"], 
                0
            )

        ##
        # Clean up code: remove files, unset values.
@@ -677,16 +700,10 @@ class Population:
        This function is called by _evolve_population_mp
        """

        # apparently we have to re-load this for every process, otherwise NameErrors arise (seems like a bug but I'm not sure)
        spec = importlib.util.spec_from_file_location(
            "binary_c_python_grid",
            os.path.join(self.grid_options["gridcode_filename"]),
        )
        grid_file = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(grid_file)
        generator = grid_file.grid_code
        self.process_ID = ID # Store the ID as an object property again; let's see if that works.

        self.grid_options["system_generator"] = generator
        # apparently we have to re-load this for every process, otherwise NameErrors arise (seems like a bug but I'm not sure)
        self._load_grid_function()

        # Set up generator
        generator = self.grid_options["system_generator"](self)
@@ -695,7 +712,7 @@ class Population:
        running = True
        localcounter = 0

        print("Process {} started".format(ID))
        verbose_print("Process {} started".format(ID), self.grid_options["verbosity"], 0)

        # Go over the generator
        while running:
@@ -705,6 +722,7 @@ class Population:

                # Check if the ID is the correct one for this process
                if (localcounter+ID) % self.grid_options["amt_cores"] == 0:

                    # Combine that with the other settings
                    full_system_dict = self.bse_options.copy()
                    full_system_dict.update(system)
@@ -720,7 +738,7 @@ class Population:
                    self._evolve_system_mp(full_system_dict)

            except StopIteration:
                print("Process {}: generator done".format(ID))
                verbose_print("Process {}: generator done".format(ID), self.grid_options["verbosity"], 0)
                running = False

            localcounter += 1
@@ -729,7 +747,9 @@ class Population:
        output_dict = {
            "results": self.grid_options["results"], 
            "total_errors": self.grid_options['failed_count'], 
            "total_probability_errors": self.grid_options['failed_prob']
            "total_probability_errors": self.grid_options['failed_prob'],
            "unique_error_codes": self.grid_options['failed_systems_error_codes'],
            "error_exceeded": self.grid_options['error_exceeded'],
        }

        return output_dict
@@ -765,9 +785,6 @@ class Population:
        # https://www.programcreek.com/python/example/58176/multiprocessing.Value
        # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing
    

        pathos_multiprocess.freeze_support()  # needed for Windows
    
        # Create the pool
        pool = Pool(processes=self.grid_options["amt_cores"])

@@ -779,53 +796,19 @@ class Population:
        pool.close()
        pool.join()

        # Handle the results
        print(result)

        # Handle the results by merging all the dictionaries. How that merging happens exactly is described in the merge_dicts description.
        combined_output_dict = {}
        for output_dict in result:
            combined_output_dict = merge_dicts(combined_output_dict, output_dict)

        # Put the values back as object properties
        print(combined_output_dict)


        # manager = pathos_multiprocess.Manager()
        # self.grid_options["result_dict"] = manager.dict()
        # prob_counter = manager.Value('i', 0)
        # count_counter = manager.Value('i', 0)
        # counter_lock = manager.Lock()
        # error_exceeded_counter = manager.Value('i', 0)

        # self.grid_options['error_exceeded'] = error_exceeded_counter
        # self.grid_options['failed_prob'] = prob_counter
        # self.grid_options['failed_count'] = count_counter
        # self.custom_options['counter_lock'] = counter_lock

        # # Create pool
        # pool = Pool(processes=self.grid_options["amt_cores"])

        # # Execute
        # # TODO: calculate the chunksize value based on: total starcount and cores used.

        # _ = list(
        #     pool.imap_unordered(
        #         self.evolve_system_mp, self._yield_system_mp(), chunksize=20
        #     )
        # )

        # # Handle clean termination of the whole multiprocessing (making sure there are no zombie
        # # processes (https://en.wikipedia.org/wiki/Zombie_process))
        # pool.close()
        # pool.join()

        # print("Total shit: {}".format(self.grid_options['failed_prob'].value))
        # print("Total amt: {}".format(self.grid_options['failed_count'].value))

        # # if results:
        # #     print(results)
        # #     self.grid_options['error'] = 1
        # #     self.grid_options['failed_count'] += len(results)
        # #     self.grid_options['failed_prob'] += sum(results)
        self.grid_options["results"] = combined_output_dict["results"]
        self.grid_options["failed_count"] = combined_output_dict["total_errors"]
        self.grid_options["failed_prob"] = combined_output_dict["total_probability_errors"]
        self.grid_options["failed_systems_error_codes"] = list(set(combined_output_dict["unique_error_codes"]))        
        self.grid_options["error_exceeded"] = combined_output_dict["error_exceeded"]

    def _evolve_population_lin(self):
        """
@@ -861,8 +844,6 @@ class Population:

        binary_cmdline_string = self._return_argline(full_system_dict)

        # Check whether the amount of errors has been exceeded
        if not self.grid_options['error_exceeded']==1:
        # Get
        out = _binary_c_bindings.run_system(
            argstring=binary_cmdline_string,
@@ -873,15 +854,12 @@ class Population:
            population=1,
        )

            err = self._check_binary_c_error(out, full_system_dict)
        # Check for errors
        _ = self._check_binary_c_error(out, full_system_dict)

            # TODO: fix that this goes good. 
            if err: return err
        # Have some user-defined function do stuff with the data. 
        if self.grid_options["parse_function"]:
            self.grid_options["parse_function"](self, out)
        else:
            # Handle this better
            print("NO MORE SYSTEMS")

    # Single system
    def evolve_single(self, clean_up_custom_logging_files=True):
@@ -1486,6 +1464,8 @@ class Population:

    def _load_grid_function(self):
        """
        Function that loads the script containing the grid code.

        TODO: Update this description
        Test function to run grid stuff. mostly to test the import
        """
@@ -2343,40 +2323,48 @@ class Population:
    def _check_binary_c_error(self, binary_c_output, system_dict):
        """
        Function to check whether binary_c throws an error and handle accordingly. 
    
        TODO: build mechanism to stop the run if too many failing systems found
        TODO: Clean up everything here
        """

        if binary_c_output.startswith("SYSTEM_ERROR"):
            print("FAILING SYSTEM FAILING SYSTEM")
            verbose_print(
                "FAILING SYSTEM FOUND",
                self.grid_options["verbosity"],
                0,
            )

            # Keep track of the amount of failed systems and their error codes
            self.grid_options['failed_prob'] += system_dict['probability']
            self.grid_options['failed_count'] += 1 
            self.grid_options['errors_found'] = True

            if self.grid_options['failed_count'] > 20:
                print("failed_count: {}".format(self.grid_options['failed_count']))
                print("stopping logging to file")

                self.grid_options['error_exceeded'] = 1


            # Set values
            self.grid_options['error'] = 1

            # Write arglines to file
            argstring = self._return_argline(system_dict)
            with open(os.path.join(self.grid_options['tmp_dir'], 'failed_systems.txt'), 'a+') as f:
                f.write(argstring+"\n")

            # Try catching the error code and keep track of the unique ones. 
            try:
                error_code = int(binary_c_output.splitlines()[0].split("with error code")[-1].split(":")[0].strip())

                # print(int(error_code))
                if not error_code in self.grid_options['failed_systems_error_codes']:
                    self.grid_options['failed_systems_error_codes'].append(error_code)
            except ValueError:
                print("failed to extract the error-code")
                verbose_print(
                    "Failed to extract the error-code",
                    self.grid_options["verbosity"],
                    1,
                )

            # Check if we have exceeded the amount of errors
            if self.grid_options['failed_count'] > self.grid_options['failed_systems_threshold']:
                if not self.grid_options['error_exceeded']:
                    verbose_print(
                        "Process {} exceeded the maximum ({}) amount of failing systems. Stopped logging them to files now".format(self.process_ID, self.grid_options['failed_systems_threshold']),
                        self.grid_options["verbosity"],
                        1,
                    )
                    self.grid_options['error_exceeded'] = True

            # If not, write the failing systems to files unique to each process
            else:
                # Write arglines to file
                argstring = self._return_argline(system_dict)
                with open(os.path.join(self.grid_options['tmp_dir'], 'failed_systems_process_{}.txt'.format(self.process_ID)), 'a+') as f:
                    f.write(argstring+"\n")

################################################################################################
+5 −3
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ grid_options_defaults_dict = {
    ##########################
    # general (or unordered..)
    ##########################
    "unique_population_name": 0 # Unique code for the population. Should be set only once by the controller process. 
    "amt_cores": 1,  # total amount of cores used to evolve the population
    "binary": 0,  # Flag on whether the systems are binary systems or single systems.
    "parse_function": None,  # Function to parse the output with.
@@ -80,11 +81,12 @@ grid_options_defaults_dict = {
    "results": {}, # dict to store the results. Every process fills this on its own and then it will be joined later
    "start_time_evolution": 0,  # Start time of the grid
    "end_time_evolution": 0,  # end time of the grid
    "error": 0,  # error?
    "error_exceeded": 0, # Flag whether the amt of errors have exceeded the limit
    "errors_found": False,  # Flag whether there are any errors from binary_c
    "error_exceeded": False, # Flag whether the amt of errors have exceeded the limit
    "failed_count": 0,  # amt of failed systems
    "failed_prob": 0,  # Summed probability of failed systems
    "failed_systems_error_codes": [],
    "failed_systems_threshold": 20, # Maximum failed systems per process allowed to fail before the process stops logging the failing systems. 
    "failed_systems_error_codes": [], # List to store the unique error codes
    "id": 0,  # Random id of this grid/population run,
    "modulo": 1,  # run modulo n of the grid.
    ## Grid type evolution