Commit 58b1419f authored by David Hendriks's avatar David Hendriks
Browse files

working on a better way of setting up the different processes

parent 856e96e4
Loading
Loading
Loading
Loading
+74 −26
Original line number Diff line number Diff line
@@ -661,6 +661,9 @@ class Population:
                )
            )

        # print("During this run {} systems failed with a total probability of {}".format(self.grid_options['failed_count'], self.grid_options['failed_prob']))
        # print("The full argline command strings are stored in {}".format(os.path.join(self.grid_options['tmp_dir'], 'failed_systems.txt')))

        ##
        # Clean up code: remove files, unset values.
        self._cleanup()
@@ -680,14 +683,26 @@ class Population:
        # https://stackoverflow.com/questions/28740955/working-with-pathos-multiprocessing-tool-in-python-and

        # TODO: make good example of how to deal with a result_dict
        # https://www.programcreek.com/python/example/58176/multiprocessing.Value
        # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing
        manager = pathos_multiprocess.Manager()
        self.grid_options["result_dict"] = manager.dict()
        prob_counter = manager.Value('i', 0)
        count_counter = manager.Value('i', 0)
        counter_lock = manager.Lock()
        error_exceeded_counter = manager.Value('i', 0)

        self.grid_options['error_exceeded'] = error_exceeded_counter
        self.grid_options['failed_prob'] = prob_counter
        self.grid_options['failed_count'] = count_counter
        self.custom_options['counter_lock'] = counter_lock

        # Create pool
        pool = Pool(processes=self.grid_options["amt_cores"])

        # Execute
        # TODO: calculate the chunksize value based on: total starcount and cores used.

        _ = list(
            pool.imap_unordered(
                self.evolve_system_mp, self._yield_system_mp(), chunksize=20
@@ -699,6 +714,15 @@ class Population:
        pool.close()
        pool.join()

        print("Total shit: {}".format(self.grid_options['failed_prob'].value))
        print("Total amt: {}".format(self.grid_options['failed_count'].value))

        # if results:
        #     print(results)
        #     self.grid_options['error'] = 1
        #     self.grid_options['failed_count'] += len(results)
        #     self.grid_options['failed_prob'] += sum(results)

    def _evolve_population_lin(self):
        """
        Function to evolve the population linearly (i.e. 1 core, no multiprocessing methods)
@@ -723,11 +747,17 @@ class Population:
            if self.grid_options["parse_function"]:
                self.grid_options["parse_function"](self, out)

    def evolve_system_mp(self, binary_cmdline_string):

    def evolve_system_mp(self, full_system_dict):
        """
        Function that the multiprocessing evolution method calls to evolve a system
        """

        binary_cmdline_string = self._return_argline(full_system_dict)

        if not self.grid_options['error_exceeded'].value==1:
            print("NO MORE SYSTEMS")

            out = _binary_c_bindings.run_system(
                argstring=binary_cmdline_string,
                custom_logging_func_memaddr=self.grid_options[
@@ -737,8 +767,13 @@ class Population:
                population=1,
            )

            err = self._check_binary_c_error(out, full_system_dict)

            if err: return err
            if self.grid_options["parse_function"]:
                self.grid_options["parse_function"](self, out)
        else:
            print("NO MORE SYSTEMS")

    def _yield_system_mp(self):
        """
@@ -749,12 +784,12 @@ class Population:
            full_system_dict = self.bse_options.copy()
            full_system_dict.update(system)

            binary_cmdline_string = self._return_argline(full_system_dict)
            # binary_cmdline_string = self._return_argline(full_system_dict)

            self._print_info(
                i + 1, self.grid_options["total_starcount"], full_system_dict
            )
            yield binary_cmdline_string
            yield full_system_dict

        print("generator done")

@@ -870,6 +905,7 @@ class Population:
                "start_time_evolution"
            ] = time.time()  # Setting start time of grid


            #
            self._generate_grid_code(dry_run=False)

@@ -932,6 +968,9 @@ class Population:
        self.grid_options["count"] = 0
        self.grid_options["probtot"] = 0
        self.grid_options["system_generator"] = None
        self.grid_options["error"] = 0
        self.grid_options["failed_count"] = 0
        self.grid_options["failed_prob"] = 0

        # Remove files

@@ -2211,35 +2250,44 @@ class Population:
    #     Function to join the result dictionaries
    #     """

    def _check_binary_c_error(self, binary_c_output, argstring):
    def _check_binary_c_error(self, binary_c_output, system_dict):
        """
        Function to check whether binary_c throws an error and handle accordingly. 
    
        TODO: build mechanism to stop the run if too many failing systems found
        TODO: Clean up everything here
        """

        # print(binary_c_output)
        # print(argstring)

        if binary_c_output.startswith("SYSTEM_ERROR"):
            # print('error found')
            print("FAILING SYSTEM FAILING SYSTEM")

            with self.custom_options['counter_lock']:
                self.grid_options['failed_prob'].set(self.grid_options['failed_prob'].value + system_dict['probability'])
                self.grid_options['failed_count'].set(self.grid_options['failed_count'].value + 1)

                if self.grid_options['failed_count'].value > 20:
                    print("{}".format(self.grid_options['failed_count'].value))
                    print("stopping logging to file")

                    self.grid_options['error_exceeded'].set(1)


            # Set values
            self.grid_options['error'] = 1

            # Write arglines to file
            argstring = self._return_argline(system_dict)
            with open(os.path.join(self.grid_options['tmp_dir'], 'failed_systems.txt'), 'a+') as f:
                f.write(argstring)
                f.write(argstring+"\n")

            try:
                error_code = int(binary_c_output.splitlines()[0].split("with error code")[-1].split(":")[0].strip())

                print(int(error_code))
                # print(binary_c_output.splitlines()[0].split("with error code")[-1].split(":")[0])
                if not error_code in self.grid_options['failing_systems_error_codes']:
                    self.grid_options['failing_systems_error_codes'].apppend(error_code)
                # print(int(error_code))
                if not error_code in self.grid_options['failed_systems_error_codes']:
                    self.grid_options['failed_systems_error_codes'].append(error_code)
            except ValueError:
                print("failed to extract the error-code")

            self.grid_options['failed_count'] += 1
            self.grid_options['']


################################################################################################