Dispatcher.h 4.25 KB
Newer Older
Danny Perez committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
//
//  Dispatcher.h
//  ParSplice
//
//  Created by Danny Perez on 11/11/15.
//  Copyright (c) 2015 dp. All rights reserved.
//

#ifndef __ParSplice__Dispatcher__
#define __ParSplice__Dispatcher__

#include <stdio.h>
#include <mpi.h>
#include "ParSpliceCommon.h"


class Dispatcher{
public:
    Dispatcher(MPI_Comm commp_,MPI_Comm commc_, int nTask_, int parent){
        commp=commp_;
        commc=commc_;
        nTask=nTask_;
        root=parent;
        
        
        pthread_mutex_init(&lock, NULL);
        
        pthread_attr_t attr;
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
       
        //spin the two threads
        int ret=pthread_create(&serverTh,&attr,&Dispatcher::serverThHandle,this);
        
        ret=pthread_create(&dispatcherTh,&attr,&Dispatcher::dispatcherThHandle,this);
    };
    
    static void *serverThHandle(void *context){
        Dispatcher *p=static_cast< Dispatcher* >(context);
        p->server();
        return NULL;
    };
    
    static void *dispatcherThHandle(void *context){
        Dispatcher *p=static_cast< Dispatcher* >(context);
        p->dispatcher();
        return NULL;
    };
    
    void server(){
        std::vector<parsplice::Label> uintCommBuff(nTask);
        while(true){
            
            MPI_Bcast(&(uintCommBuff[0]), nTask, MPI::UNSIGNED , root, commp);
            BOOST_LOG_SEV(lg,info) <<"#Received tasks from "<<root;
            
            pthread_mutex_lock(&lock);
            tasks.clear();
            for(int i=0;i<nTask;i++){
                tasks.push_back(uintCommBuff[i]);
            }
            {
                logging::record rec = lg.open_record();
                if (rec)
                {
                    logging::record_ostream strm(rec);
                    strm<<"========== TASKS ==========\n";
                    
                    for(int i=0;i<nTask;i++){
                        strm<<tasks[i]<<" ";
                    }
                    
                    
                    strm.flush();
                    lg.push_record(boost::move(rec));
                }
            }
            pthread_mutex_unlock(&lock);
            
        }
    };
    
    void dispatcher(){
        BOOST_LOG_SEV(lg,info) <<"#Dispatcher dispatcher ";
        
        
        std::vector<parsplice::Label> uintCommBuff(1);
        
        MPI_Request req;
        int completed=0;
        MPI_Status status;
        
        //post an initial non-blocking receive
        MPI_Irecv(&(uintCommBuff[0]),1,MPI::UNSIGNED,MPI::ANY_SOURCE,MPI_ANY_TAG,commc,&req);
        while(true){
            MPI_Test(&req,&completed,&status);
            
            if(completed){
                int producerID=status.MPI_SOURCE;
                requestQueue.push_back(producerID);
                BOOST_LOG_SEV(lg,info) <<"#Received request from "<<producerID;
                
                //post the next receive
                completed=0;
                MPI_Irecv(&(uintCommBuff[0]),1,MPI::UNSIGNED,MPI::ANY_SOURCE,MPI_ANY_TAG,commc,&req);
            }
            
            BOOST_LOG_SEV(lg,info) <<"#Processing "<< requestQueue.size() << " requests";
            
            //process
            pthread_mutex_lock(&lock);
            if(tasks.size()>0){
                //randomly sample with replacement
                for(std::list<int>::iterator it=requestQueue.begin();it!=requestQueue.end();it++){
                    boost::random::uniform_int_distribution<int> d(0,int(tasks.size()-1));
                    uintCommBuff[0]=tasks[d(rand)];
                    MPI_Send(&(uintCommBuff[0]),1,MPI::UNSIGNED,*it,1,commc);
                    BOOST_LOG_SEV(lg,info) <<"#Processed request from "<<*it;
                }
                requestQueue.clear();
            }
            pthread_mutex_unlock(&lock);
            
        }
        
    };
protected:
    int nTask;
    int root;
    std::deque<parsplice::Label> tasks;
    boost::random::mt19937 rand;
    std::list<int> requestQueue;
    MPI_Comm commc;
    MPI_Comm commp;
    
    
    pthread_t serverTh;
    pthread_t dispatcherTh;
    pthread_mutex_t lock;
    boost::log::sources::severity_logger< boost::log::trivial::severity_level > lg;
};
#endif /* defined(__ParSplice__Dispatcher__) */