Commit 7cdffc4e authored by Christophe Gonzales's avatar Christophe Gonzales

added a simple multithreading architecture for processing the rows of DatabaseTables

parent dd0b1ced
Pipeline #23949440 passed with stages
in 64 minutes 18 seconds
......@@ -33,6 +33,8 @@
#include <numeric>
#include <algorithm>
#include <functional>
#include <exception>
#include <vector>
#include <agrum/agrum.h>
#include <agrum/core/set.h>
......@@ -702,6 +704,29 @@ namespace gum {
DBVector< std::size_t > __getKthIndices(const std::size_t k,
const bool k_is_input_col) const;
/// a method to process the rows of the database in multithreading
/** The function tries to execute function/functor exec_func using one
* or several threads. If an exception is raised by at least one thread,
* then function undo_func is executed to undo what exec_func
* did, and the exception is rethrown.
*
* @param exec_func this should be a function/functor/lambda that
* takes 2 arguments: the first one is an std::size_t containing the
* index of the first row that it should process, the second argument is
* an std::size_t equal to 1 + the index of the last row processed (so
* the processing is performed on [first,last). The return type of exec_func
* is a void. If a thread executing exec_func raises an exception, then
* before exiting, it should undo what it did.
* @param undo_func a Function/functor/lambda with the same
* prototype as exec_func. If a thread raises an exception, those that
* did not raise exceptions should undo what they did in order to restore
* the state that the database had before the execution of the thread. After
* calling undo_func, they should have restored this state.
*/
template <typename Functor1, typename Functor2>
void __threadProcessDatabase ( Functor1& exec_func,
Functor2& undo_func );
#endif /* DOXYGEN_SHOULD_SKIP_THIS */
};
......
......@@ -179,6 +179,118 @@ namespace gum {
return *this;
}
// a method to process the rows of the database in multithreading
template < template < typename > class ALLOC >
template < typename Functor1, typename Functor2>
void DatabaseTable< ALLOC >::__threadProcessDatabase (
Functor1& exec_func,
Functor2& undo_func ) {
// compute the number of threads to execute the code, the number N of
// rows that each thread should process and the number of rows that
// would remain after each thread has processed its N rows. For instance,
// if the database has 105 rows and there are 10 threads, each thread
// should process 10 rows and there would remain 5 rows
const std::size_t db_size = this->_rows.size ();
std::size_t nb_threads = db_size / this->_min_nb_rows_per_thread;
if ( nb_threads < 1 )
nb_threads = 1;
else if ( nb_threads > this->_max_nb_threads )
nb_threads = this->_max_nb_threads;
std::size_t nb_rows_par_thread = db_size / nb_threads;
std::size_t rest_rows = db_size - nb_rows_par_thread * nb_threads;
// if there is just one thread, let it process all the rows
if ( nb_threads == 1 ) {
exec_func (std::size_t(0), db_size);
return;
}
// here, we shall create the threads, but also one std::exception_ptr
// for each thread. This will allow us to catch the exception raised
// by the threads
std::vector<std::thread> threads;
threads.reserve ( nb_threads );
std::vector<std::exception_ptr>
func_exceptions ( nb_threads, nullptr );
// create a lambda that will execute exec_func while catching its exceptions
auto real_exec_func =
[&exec_func] (std::size_t begin,std::size_t end,
std::exception_ptr& exc) -> void {
try { exec_func(begin,end); }
catch ( ... ) { exc = std::current_exception(); }
};
// launch the threads
std::size_t begin_index = std::size_t(0);
for ( std::size_t i = std::size_t(0); i < nb_threads; ++i ) {
std::size_t end_index = begin_index + nb_rows_par_thread;
if ( rest_rows != std::size_t(0) ) {
++end_index;
--rest_rows;
}
threads.push_back(std::thread(std::ref(real_exec_func),
begin_index, end_index,
std::ref( func_exceptions[i])) );
begin_index = end_index;
}
// wait for the threads to complete their executions
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join));
// now, check if one exception has been raised
bool exception_raised = false;
for ( const auto& exc : func_exceptions ) {
if ( exc != nullptr ) {
exception_raised = true;
break;
}
}
if ( exception_raised ) {
// create a lambda that will execute undo_func while catching
// its exceptions
auto real_undo_func =
[&undo_func] (std::size_t begin,std::size_t end,
std::exception_ptr& exc) -> void {
try { undo_func(begin,end); }
catch ( ... ) { exc = std::current_exception(); }
};
// launch the repair threads
threads.clear ();
begin_index = std::size_t(0);
std::vector<std::exception_ptr>
undo_func_exceptions ( nb_threads, nullptr );
for ( std::size_t i = std::size_t(0); i < nb_threads; ++i ) {
std::size_t end_index = begin_index + nb_rows_par_thread;
if ( rest_rows != std::size_t(0) ) {
++end_index;
--rest_rows;
}
// we just need to repair the threads that did not raise exceptions
if ( func_exceptions[i] == nullptr )
threads.push_back(std::thread(std::ref(real_undo_func),
begin_index, end_index,
std::ref( undo_func_exceptions[i])));
begin_index = end_index;
}
// wait for the threads to complete their executions
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join));
// rethrow the exception
for ( const auto& exc : func_exceptions ) {
if ( exc != nullptr ) {
std::rethrow_exception( exc );
}
}
}
}
/// insert a new translator into the database
template < template < typename > class ALLOC >
......@@ -196,8 +308,21 @@ namespace gum {
// reserve some place for the new column in the records of the database
const std::size_t new_size = this->nbVariables() + 1;
for (auto& row : this->_rows)
row.reserve(new_size);
// create the lambda for reserving some memory for the new column
// and the one that undoes what it performed if some thread executing
// it raised an exception
auto reserve_lambda =
[this,new_size](std::size_t begin,std::size_t end) -> void {
for ( std::size_t i = begin; i < end; ++i )
this->_rows[i].row().reserve(new_size);
};
auto undo_reserve_lambda =
[](std::size_t begin,std::size_t end) ->void {};
// launch the threads executing the lambdas
this->__threadProcessDatabase ( reserve_lambda, undo_reserve_lambda );
// insert the translator into the translator set
const std::size_t pos =
......@@ -212,58 +337,35 @@ namespace gum {
}
// if the databaseTable is not empty, fill the column of the database
// corresponding to the translator with
// corresponding to the translator with missing values
if (!IDatabaseTable< DBTranslatedValue, ALLOC >::empty()) {
const DBTranslatedValue missing = __translators[pos].missingValue();
// check if we can parallelize the filling process
const std::size_t db_size = this->_rows.size ();
std::size_t nb_threads = db_size / this->_min_nb_rows_per_thread;
if ( nb_threads < 1 )
nb_threads = 1;
else if ( nb_threads > this->_max_nb_threads )
nb_threads = this->_max_nb_threads;
if ( nb_threads == 1 ) {
// here process in an unparallel fashion
for (auto& row : this->_rows)
row.row().push_back(missing);
for (auto& status_row : this->_has_row_missing_val)
status_row = IsMissing::True;
}
else {
// determine the number of elements per thread
std::size_t nb_rows_par_thread = db_size / nb_threads;
std::size_t rest_rows = db_size - nb_rows_par_thread * nb_threads;
// create the functor used by the threads to insert the missing
// values into the new column of the database
auto fill_functor =
[this,missing](std::size_t begin, std::size_t end) ->void {
for ( std::size_t i = begin; i < end; ++i ) {
// create the lambda for adding a new column filled wih a missing value
auto fill_lambda =
[this,missing] (std::size_t begin,std::size_t end) -> void {
std::size_t i = begin;
try {
for ( ; i < end; ++i ) {
this->_rows[i].row().push_back(missing);
this->_has_row_missing_val[i] = IsMissing::True;
}
};
// create the threads
std::vector<std::thread> threads;
threads.reserve ( nb_threads );
std::size_t begin_index = std::size_t(0);
for ( std::size_t i = std::size_t(0); i < nb_threads; ++i ) {
std::size_t end_index = begin_index + nb_rows_par_thread;
if ( rest_rows != std::size_t(0) ) {
++end_index;
--rest_rows;
}
threads.push_back(std::thread(fill_functor,begin_index,end_index));
begin_index = end_index;
}
// execute the threads
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join));
}
catch ( ... ) {
for ( std::size_t j = begin; j < i; ++j )
this->_rows[i].row().pop_back ();
throw;
}
};
auto undo_fill_lambda =
[this] (std::size_t begin,std::size_t end) -> void {
for ( std::size_t i = begin; i < end; ++i )
this->_rows[i].row().pop_back ();
};
// launch the threads executing the lambdas
this->__threadProcessDatabase ( fill_lambda, undo_fill_lambda );
}
return pos;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment