Commit 0c88363f authored by David Hendriks's avatar David Hendriks
Browse files

REplaced the method where all the threads use their own generator with one...

REplaced the method where all the threads use their own generator with one where there is a centralized queue. Now debugging
parent d6de79e7
Loading
Loading
Loading
Loading
+182 −139
Original line number Diff line number Diff line
@@ -832,6 +832,54 @@ class Population:
                0,
            )


    def get_stream_logger(self, level=logging.DEBUG):
        """Return logger with configured StreamHandler."""
        stream_logger = logging.getLogger('stream_logger')
        stream_logger.handlers = []
        stream_logger.setLevel(level)
        sh = logging.StreamHandler()
        sh.setLevel(level)
        fmt = '[%(asctime)s %(levelname)-8s %(processName)s] --- %(message)s'
        formatter = logging.Formatter(fmt)
        sh.setFormatter(formatter)
        stream_logger.addHandler(sh)

        return stream_logger

    def system_queue_filler(self, job_queue, amt_cores):
        """
        Function that is responsible for keeping the queue filled.

        This will generate the systems until it is full, and then keeps trying to fill it.
        Will have to play with the size of this. 
        """
        stream_logger = self.get_stream_logger()
        stream_logger.debug(f"setting up the system_queue_filler now")


        # Setup of the generator
        self._generate_grid_code(dry_run=False)

        self._load_grid_function()

        generator = self.grid_options["_system_generator"](self, print_results=False)
        
        # TODO: build in method to handle with the HPC.
        # Continously fill the queue
        for system_number, system_dict in enumerate(generator):
            stream_logger.debug(f"producing: {system_number}")  # DEBUG
            job_queue.put((system_number, system_dict))

            # Print current size
            # print("Current size: {}".format(save_que.qsize()))


        # Send closing signal to workers. When they receive this they will terminate
        stream_logger.debug(f"Signaling stop to processes")  # DEBUG
        for _ in range(amt_cores):
            job_queue.put("STOP")

    def _evolve_population_grid(self):
        """
        Function to evolve the population with multiprocessing approach.
@@ -867,30 +915,52 @@ class Population:
        # https://www.programcreek.com/python/example/58176/multiprocessing.Value
        # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing

        # Create the pool
        pool = Pool(processes=self.grid_options["amt_cores"])

        # start the processes by giving them an ID value
        result = list(
            pool.imap_unordered(
                self._process_run_population_grid, range(self.grid_options["amt_cores"])
            )
        )
        # Set up the manager object that can share info between processes
        manager = pathos_multiprocess.Manager()
        job_queue = manager.Queue(maxsize=10)
        result_queue = manager.Queue(maxsize=self.grid_options['amt_cores'])

        # Handle clean termination of the whole multiprocessing (making sure there are no zombie
        # processes (https://en.wikipedia.org/wiki/Zombie_process))
        pool.close()
        pool.join()
        # Create process instances
        processes = []
        for ID in range(self.grid_options["amt_cores"]):
            processes.append(pathos_multiprocess.Process(target=self._process_run_population_grid, args=(job_queue, result_queue, ID)))

        print("OUTSIDE THREAD")
        print(Moecache.keys())
        print("OUTSIDE THREAD")
        # Activate the processes
        for p in processes:
            p.start()

        # Set up the system_queue
        self.system_queue_filler(job_queue, amt_cores=self.grid_options["amt_cores"])

        # Join the processes
        for p in processes:
            p.join()

        # Handle the results by merging all the dictionaries. How that merging happens exactly is
        # described in the merge_dicts description.
        combined_output_dict = {}
        for output_dict in result:

        sentinel = object()
        for output_dict in iter(result_queue.get, sentinel):
            combined_output_dict = merge_dicts(combined_output_dict, output_dict)
            if result_queue.empty():
                break

        # # Create the pool
        # pool = Pool(processes=self.grid_options["amt_cores"])

        # # start the processes by giving them an ID value
        # result = list(
        #     pool.imap_unordered(
        #         self._process_run_population_grid, range(self.grid_options["amt_cores"])
        #     )
        # )

        # # Handle clean termination of the whole multiprocessing (making sure there are no zombie
        # # processes (https://en.wikipedia.org/wiki/Zombie_process))
        # pool.close()
        # pool.join()

        # Put the values back as object properties
        self.grid_results = combined_output_dict["results"]
@@ -945,7 +1015,7 @@ class Population:
        if self.grid_options["parse_function"]:
            self.grid_options["parse_function"](self, out)

    def _process_run_population_grid(self, ID):
    def _process_run_population_grid(self, job_queue, result_queue, ID):
        """
        Function that loops over the whole generator, but only runs
        systems that fit to: if (localcounter+ID) % self.grid_options["amt_cores"] == 0
@@ -963,6 +1033,9 @@ class Population:
            ID  # Store the ID as a object property again, lets see if that works.
        )

        stream_logger = self.get_stream_logger()
        stream_logger.debug(f"Setting up processor: process-{self.process_ID}")

        # Set to starting up
        with open(
            os.path.join(
@@ -1003,17 +1076,7 @@ class Population:
                0,
            )

        #
        self._generate_grid_code(dry_run=False)

        # apparently we have to re-load this for every process, otherwise NameErrors arise (seems like a bug but I'm not sure)
        self._load_grid_function()

        # Set up generator
        generator = self.grid_options["_system_generator"](self, print_results=False)

        # Set up local variables
        running = True
        localcounter = (
            0  # global counter for the whole loop. (need to be ticked every loop)
        )
@@ -1024,17 +1087,13 @@ class Population:
            0  # counter for the actual amt of systems this thread ran
        )

        round_number_mod = 0  # rotating modulo

        total_time_calling_binary_c = 0

        total_mass_run = 0
        total_probability_weighted_mass_run = 0

        # Go over the generator
        while running:
            # round_number_mod = (localcounter+1)%self.grid_options["amt_cores"]

        # Go over the queue
        for system_number, system_dict in iter(job_queue.get, 'STOP'):
            if localcounter == 0:

                # Set status to running
@@ -1047,19 +1106,9 @@ class Population:
                ) as f:
                    f.write("RUNNING")

            try:
                # Get the system
                system = next(generator)

                # Check if the ID is the correct one for this process. This is the method we use to split this calculation over many cores and or machines
                if (localcounter + (ID + round_number_mod)) % self.grid_options[
                    "amt_cores"
                ] == 0:

            # Combine that with the other settings
            full_system_dict = self.bse_options.copy()
                    full_system_dict.update(system)

            full_system_dict.update(system_dict)

            # In the first system, explicitly check all the keys that are passed to see if
            # they match the keys known to binary_c.
@@ -1091,10 +1140,11 @@ class Population:

            #
            verbose_print(
                        "Process {} is handling system {}".format(ID, localcounter),
                "Process {} is handling system {}".format(ID, system_number),
                self.grid_options["verbosity"],
                2,
            )
            stream_logger.debug("Process {} is handling system {}".format(ID, system_number))

            # In some cases, the whole run crashes. To be able to figure out which system
            # that was on, we log each current system to a file (each thread has one).
@@ -1143,25 +1193,14 @@ class Population:
            # Keep track of systems:
            probability_of_systems_run += full_system_dict["probability"]
            number_of_systems_run += 1
            localcounter += 1

            # Tally up some numbers
            total_mass_system = full_system_dict.get("M_1", 0) + full_system_dict.get("M_1", 0) + full_system_dict.get("M_1", 0) + full_system_dict.get("M_1", 0)
            total_mass_run += total_mass_system
            total_probability_weighted_mass_run += total_mass_system * full_system_dict["probability"]

            except StopIteration:
                running = False

            # Rotate the round number mod. The idea here is to prevent a thread from always getting the same sampled period of whatever. This just rotates everyone
            if (localcounter + 1) % self.grid_options["amt_cores"] == 0:
                round_number_mod += 1

            # print("thread {} round_nr_mod {}. localcounter {}".format(ID, round_number_mod, localcounter))

            # Has to be here because this one is used for the (localcounter+ID) % (self..)
            localcounter += 1

        # Set status to running
        # Set status to finishing
        with open(
            os.path.join(
                self.grid_options["tmp_dir"], "process_status",
@@ -1170,6 +1209,7 @@ class Population:
            "w",
        ) as f:
            f.write("FINISHING")
        stream_logger.debug(f"Process-{self.process_ID} is finishing.")

        # Handle ensemble output: is ensemble==1, then either directly write that data to a file, or combine everything into 1 file.
        ensemble_json = {}  # Make sure it exists already
@@ -1293,7 +1333,10 @@ class Population:
        ) as f:
            f.write("FINISHED")

        return output_dict
        result_queue.put(output_dict)
        stream_logger.debug(f"Process-{self.process_ID} is finished.")

        return

    # Single system
    def evolve_single(self, clean_up_custom_logging_files: bool = True) -> Any: