Commit a8ef8e1f authored by David Hendriks's avatar David Hendriks
Browse files

restructured the multiprocessing so that the processes just use a %==0 kind of method

parent 58b1419f
Loading
Loading
Loading
Loading
+146 −57
Original line number Diff line number Diff line
@@ -668,10 +668,89 @@ class Population:
        # Clean up code: remove files, unset values.
        self._cleanup()

    def _process_run_population(self, ID):
        """
        Run this worker's share of the population.

        Every worker loops over the whole system generator, but only evolves the systems for which
        (localcounter + ID) % self.grid_options["amt_cores"] == 0.

        With 4 processes that means: ID 0 runs systems 0, 4, 8, ...; ID 1 runs systems 3, 7, 11, ...;
        ID 2 runs systems 2, 6, 10, ...; ID 3 runs systems 1, 5, 9, ...
        When the IDs are 0 .. amt_cores - 1 every system is handled by exactly one worker.

        This function is called by _evolve_population_mp

        Args:
            ID: integer offset of this worker, used in the modulo test above.

        Returns:
            dict with the keys "results", "total_errors" and "total_probability_errors", filled from
            self.grid_options["results"], ["failed_count"] and ["failed_prob"] respectively.
        """

        # The grid code is (re-)loaded in every process; the original author observed NameErrors
        # when this was not done.
        spec = importlib.util.spec_from_file_location(
            "binary_c_python_grid",
            # os.fspath is what the former single-argument os.path.join call amounted to
            os.fspath(self.grid_options["gridcode_filename"]),
        )
        grid_file = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(grid_file)

        # Store the freshly loaded generator function and instantiate it for this population
        self.grid_options["system_generator"] = grid_file.grid_code
        generator = self.grid_options["system_generator"](self)

        amt_cores = self.grid_options["amt_cores"]

        print("Process {} started".format(ID))

        # Iterating with a for-loop (instead of next() inside a try/except StopIteration) makes
        # sure that a StopIteration raised while evolving a system is not mistaken for the end of
        # the generator, which would silently skip all remaining systems of this worker.
        for localcounter, system in enumerate(generator):
            # Only handle the systems that belong to this process
            if (localcounter + ID) % amt_cores != 0:
                continue

            # Combine the system with the other settings
            full_system_dict = self.bse_options.copy()
            full_system_dict.update(system)

            # TODO: re-enable progress reporting through self._print_info

            print("Process {} is handling system {}".format(ID, localcounter))

            # Evolve the system
            self._evolve_system_mp(full_system_dict)

        print("Process {}: generator done".format(ID))

        # Return a set of results and errors
        output_dict = {
            "results": self.grid_options["results"],
            "total_errors": self.grid_options["failed_count"],
            "total_probability_errors": self.grid_options["failed_prob"],
        }

        return output_dict

    def _evolve_population_mp(self):
        """
        Function to evolve the population with multiprocessing approach.
        Using pathos to be able to include class-owned functions.

        This function will create a pool with <self.grid_options["amt_cores"]> processes, and perform an imap_unordered to run the different `threads`. 
        Before this was done by giving a generator as the iterable, and have the processes get a certain chunksize each round. 
        Later on this seemed to be a bad decision, because it is difficult to pass information back to the main controller, and because with each new batch of systems a new object instance was created. 

        What I do now is spawn this number of processes and pass range(self.grid_options["amt_cores"]) as the iterable.
        That way each process fetches a `job` only once, and that job is just an ID number.
        With this ID number each thread/process loops over the whole generator,
        but only runs the systems that match its ID (if (localcounter+ID) % self.grid_options["amt_cores"]==0).

        When they are finished, these jobs are instructed to return a set of information (the result dict, TODO: describe what more)

        The resulting dictionaries are then merged and stored as object properties again.
        """

        # TODO: make further use of a queue to handle jobs or at least
@@ -685,43 +764,68 @@ class Population:
        # TODO: make good example of how to deal with a result_dict
        # https://www.programcreek.com/python/example/58176/multiprocessing.Value
        # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing
        manager = pathos_multiprocess.Manager()
        self.grid_options["result_dict"] = manager.dict()
        prob_counter = manager.Value('i', 0)
        count_counter = manager.Value('i', 0)
        counter_lock = manager.Lock()
        error_exceeded_counter = manager.Value('i', 0)

        self.grid_options['error_exceeded'] = error_exceeded_counter
        self.grid_options['failed_prob'] = prob_counter
        self.grid_options['failed_count'] = count_counter
        self.custom_options['counter_lock'] = counter_lock

        # Create pool
        pool = Pool(processes=self.grid_options["amt_cores"])

        # Execute
        # TODO: calculate the chunksize value based on: total starcount and cores used.

        _ = list(
            pool.imap_unordered(
                self.evolve_system_mp, self._yield_system_mp(), chunksize=20
            )
        )
        pathos_multiprocess.freeze_support()  # needed for Windows
    
        # Create the pool
        pool = Pool(processes=self.grid_options["amt_cores"])

        # start the processes by giving them an ID value
        result = list(pool.imap_unordered(self._process_run_population, range(self.grid_options["amt_cores"])))

        # Handle clean termination of the whole multiprocessing (making sure there are no zombie
        # processes (https://en.wikipedia.org/wiki/Zombie_process))
        pool.close()
        pool.join()

        print("Total shit: {}".format(self.grid_options['failed_prob'].value))
        print("Total amt: {}".format(self.grid_options['failed_count'].value))
        # Handle the results
        print(result)

        combined_output_dict = {}
        for output_dict in result:
            combined_output_dict = merge_dicts(combined_output_dict, output_dict)

        print(combined_output_dict)


        # manager = pathos_multiprocess.Manager()
        # self.grid_options["result_dict"] = manager.dict()
        # prob_counter = manager.Value('i', 0)
        # count_counter = manager.Value('i', 0)
        # counter_lock = manager.Lock()
        # error_exceeded_counter = manager.Value('i', 0)

        # self.grid_options['error_exceeded'] = error_exceeded_counter
        # self.grid_options['failed_prob'] = prob_counter
        # self.grid_options['failed_count'] = count_counter
        # self.custom_options['counter_lock'] = counter_lock

        # # Create pool
        # pool = Pool(processes=self.grid_options["amt_cores"])

        # # Execute
        # # TODO: calculate the chunksize value based on: total starcount and cores used.

        # _ = list(
        #     pool.imap_unordered(
        #         self.evolve_system_mp, self._yield_system_mp(), chunksize=20
        #     )
        # )

        # # Handle clean termination of the whole multiprocessing (making sure there are no zombie
        # # processes (https://en.wikipedia.org/wiki/Zombie_process))
        # pool.close()
        # pool.join()

        # if results:
        #     print(results)
        #     self.grid_options['error'] = 1
        #     self.grid_options['failed_count'] += len(results)
        #     self.grid_options['failed_prob'] += sum(results)
        # print("Total shit: {}".format(self.grid_options['failed_prob'].value))
        # print("Total amt: {}".format(self.grid_options['failed_count'].value))

        # # if results:
        # #     print(results)
        # #     self.grid_options['error'] = 1
        # #     self.grid_options['failed_count'] += len(results)
        # #     self.grid_options['failed_prob'] += sum(results)

    def _evolve_population_lin(self):
        """
@@ -748,16 +852,18 @@ class Population:
                self.grid_options["parse_function"](self, out)


    def evolve_system_mp(self, full_system_dict):
    def _evolve_system_mp(self, full_system_dict):
        """
        Function that the multiprocessing evolution method calls to evolve a system

        this function is called by _process_run_population
        """

        binary_cmdline_string = self._return_argline(full_system_dict)

        if not self.grid_options['error_exceeded'].value==1:
            print("NO MORE SYSTEMS")

        # Check whether the amount of errors has been exceeded
        if not self.grid_options['error_exceeded']==1:
            # Get
            out = _binary_c_bindings.run_system(
                argstring=binary_cmdline_string,
                custom_logging_func_memaddr=self.grid_options[
@@ -769,30 +875,14 @@ class Population:

            err = self._check_binary_c_error(out, full_system_dict)

            # TODO: fix that this goes good. 
            if err: return err
            if self.grid_options["parse_function"]:
                self.grid_options["parse_function"](self, out)
        else:
            # Handle this better
            print("NO MORE SYSTEMS")

    def _yield_system_mp(self):
        """
        Generator that hands fully specified systems to the multiprocessing evolution method.

        Every system produced by self.grid_options["system_generator"] is laid over a copy of
        self.bse_options; progress is reported through self._print_info (1-based system number)
        before the combined dict is yielded.
        """

        system_source = self.grid_options["system_generator"](self)

        for system_number, grid_system in enumerate(system_source, start=1):
            # Start from the general settings and let the grid values override them
            merged_system = self.bse_options.copy()
            merged_system.update(grid_system)

            self._print_info(
                system_number, self.grid_options["total_starcount"], merged_system
            )
            yield merged_system

        print("generator done")

    # Single system
    def evolve_single(self, clean_up_custom_logging_files=True):
        """
@@ -2261,15 +2351,14 @@ class Population:
        if binary_c_output.startswith("SYSTEM_ERROR"):
            print("FAILING SYSTEM FAILING SYSTEM")

            with self.custom_options['counter_lock']:
                self.grid_options['failed_prob'].set(self.grid_options['failed_prob'].value + system_dict['probability'])
                self.grid_options['failed_count'].set(self.grid_options['failed_count'].value + 1)
            self.grid_options['failed_prob'] += system_dict['probability']
            self.grid_options['failed_count'] += 1 

                if self.grid_options['failed_count'].value > 20:
                    print("{}".format(self.grid_options['failed_count'].value))
            if self.grid_options['failed_count'] > 20:
                print("failed_count: {}".format(self.grid_options['failed_count']))
                print("stopping logging to file")

                    self.grid_options['error_exceeded'].set(1)
                self.grid_options['error_exceeded'] = 1


            # Set values
+2 −1
Original line number Diff line number Diff line
@@ -77,10 +77,11 @@ grid_options_defaults_dict = {
    "probtot": 0,  # total probability
    "weight": 1.0,  # weighting for the probability
    "repeat": 1.0,  # number of times to repeat each system (probability is adjusted to be 1/repeat)
    "results_per_worker": {},  # dict which can store info per worker. meh. doesnt work properly
    "results": {}, # dict to store the results. Every process fills this on its own and then it will be joined later
    "start_time_evolution": 0,  # Start time of the grid
    "end_time_evolution": 0,  # end time of the grid
    "error": 0,  # error?
    "error_exceeded": 0, # Flag whether the amt of errors have exceeded the limit
    "failed_count": 0,  # amt of failed systems
    "failed_prob": 0,  # Summed probability of failed systems
    "failed_systems_error_codes": [],