Commit 215aa308 authored by Jan Oliver Oelerich's avatar Jan Oliver Oelerich

Completely changed the communication / storage of intermediate results. This...

Completely changed the communication / storage of intermediate results. This is supposed to scale better when network is the bottle neck. We'll see.
parent 3099a266
This diff is collapsed.
......@@ -895,6 +895,112 @@ void IO::writeCBEDIntensities(unsigned int idefocus, unsigned int iconf,
nc_close(f_id);
}
string getTempFileName() {
Params &p = Params::getInstance();
auto &mpi_env = mpi::Environment::getInstance();
int mpi_rank = mpi_env.rank();
return output::fmt("%s/%s_%d.bin", p.tmpDir(), p.uuid(), mpi_rank);
}
void IO::writeTemporaryResult(unsigned int idefocus, unsigned int iconf,
const shared_ptr<GridManager> &gridman, ScanPoint &point,
shared_ptr<memory::buffer::number_buffer<float>> &ibuf,
shared_ptr<memory::buffer::number_buffer<float>> &cbuf) {
auto &mpi_env = mpi::Environment::getInstance();
int mpi_rank = mpi_env.rank();
auto tmpfile = std::ofstream(getTempFileName(), std::ios::binary | std::ios::app);
// the format is as follows:
// int rank
// for each pixel:
// unsigned int idefocus
// unsigned int iconf
// unsigned int index
// float[] adf_intensities
// float[] cbed_intensities
if (tmpfile.fail()) {
output::error("Couldn't open temporary binary file for writing.\n");
}
if(tmpfile.tellp() == 0) {
// empty file, write rank
tmpfile.write(reinterpret_cast<const char *>(&mpi_rank), sizeof(mpi_rank));
}
tmpfile.write(reinterpret_cast<const char *>(&idefocus), sizeof(idefocus));
tmpfile.write(reinterpret_cast<const char *>(&iconf), sizeof(iconf));
tmpfile.write(reinterpret_cast<const char *>(&point.index), sizeof(point.index));
if(point.hasAdfIntensities()) {
tmpfile.write(reinterpret_cast<const char *>(ibuf->ptr(point.adf_intensities)), point.adf_intensities.byte_size());
point.clearAdfIntensities(ibuf);
}
if(point.hasCbedIntensities()) {
tmpfile.write(reinterpret_cast<const char *>(cbuf->ptr(point.cbed_intensities)), point.cbed_intensities.byte_size());
point.clearCBEDIntensities(cbuf);
}
tmpfile.close();
}
void IO::copyFromTemporaryFile(const shared_ptr<GridManager> &gridman,
shared_ptr<memory::buffer::number_buffer<float>> &ibuf,
shared_ptr<memory::buffer::number_buffer<float>> &cbuf) {
auto &mpi_env = mpi::Environment::getInstance();
auto tmpfile = std::ifstream(getTempFileName(), std::ifstream::binary);
tmpfile.seekg(0, std::ios::end);
const int length = tmpfile.tellg();
tmpfile.seekg(0, std::ios::beg);
// first, read rank (and check, if it is correct...)
int mpi_rank_from_file;
tmpfile.read(reinterpret_cast<char *>(&mpi_rank_from_file), sizeof(mpi_rank_from_file));
if(mpi_rank_from_file != mpi_env.rank())
output::error("Reading wrong temporary file!\n");
unsigned int idefocus;
unsigned int iconf;
unsigned int pix_idx;
while((int)tmpfile.tellg() < length) {
tmpfile.read(reinterpret_cast<char *>(&idefocus), sizeof(idefocus));
tmpfile.read(reinterpret_cast<char *>(&iconf), sizeof(iconf));
tmpfile.read(reinterpret_cast<char *>(&pix_idx), sizeof(pix_idx));
ScanPoint p = gridman->scanPoints()[pix_idx];
if(p.adf) {
auto & entry = ibuf->add_empty();
tmpfile.read(reinterpret_cast<char *>(ibuf->ptr(entry)), entry.byte_size());
p.storeAdfIntensities(entry);
writeAdfIntensities(idefocus, iconf, gridman, p, ibuf);
p.clearAdfIntensities(ibuf);
}
if(p.cbed) {
auto & entry = cbuf->add_empty();
tmpfile.read(reinterpret_cast<char *>(cbuf->ptr(entry)), entry.byte_size());
p.storeCBEDIntensities(entry);
writeCBEDIntensities(idefocus, iconf, gridman, p, cbuf);
p.clearCBEDIntensities(cbuf);
}
}
tmpfile.close();
// remove the file, as all the data is now transferred.
remove(getTempFileName().c_str());
}
void IO::writeGrids(const shared_ptr<GridManager> &gridman) {
......
......@@ -102,6 +102,16 @@ namespace stemsalabim {
const std::shared_ptr<GridManager> &gridman, const ScanPoint &point,
std::shared_ptr<memory::buffer::number_buffer<float>> &cbuf);
static void writeTemporaryResult(unsigned int idefocus, unsigned int iconf,
const std::shared_ptr<GridManager> &gridman, ScanPoint &point,
std::shared_ptr<memory::buffer::number_buffer<float>> &ibuf,
std::shared_ptr<memory::buffer::number_buffer<float>> &cbuf);
void copyFromTemporaryFile(const std::shared_ptr<GridManager> &gridman,
std::shared_ptr<memory::buffer::number_buffer<float>> &ibuf,
std::shared_ptr<memory::buffer::number_buffer<float>> &cbuf);
/*!
* Write the various grids to the output file.
* @param gridman reference to the GridManager
......
......@@ -298,8 +298,11 @@ void Params::readParamsFromString(const string &prms) {
if(_tmp_dir.empty()) {
// use the path of the output file directory for tmp
size_t found = _output_file.find_last_of("/\\");
_tmp_dir = _output_file.substr(0,found);
int found = _output_file.find_last_of("/\\");
if(found < 0)
_tmp_dir = ".";
else
_tmp_dir = _output_file.substr(0,found);
}
if(_random_seed == 0)
......
This diff is collapsed.
......@@ -91,6 +91,7 @@ namespace stemsalabim {
_dirty_indices.push_back(id);
_finished = _finished_indices.size() == _tasks.size();
//printf("%lu %lu\n", _finished_indices.size(), _tasks.size());
if(_finished) {
_finished_conditional.notify_all();
}
......
......@@ -118,6 +118,10 @@ namespace stemsalabim {
size_t size() const { return _end - _start; }
size_t byte_size() const {
return size() * sizeof(prec_t);
}
private:
size_t _chunk{0};
size_t _start{0};
......
......@@ -541,7 +541,7 @@ namespace stemsalabim {
MPI_Probe(source, tag, MPI_COMM_WORLD, status._mpi_status.get());
MPI_Get_count(status._mpi_status.get(), dt, &count);
if((int) data.size() < count)
if((int) data.size() != count)
data.resize(count);
MPI_Recv(data.data(),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment