Changeset 1765 for XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp
- Timestamp: 11/06/19 11:03:38
- File: 1 edited
Legend:
- Unmodified: no marker
- Added: marked "+"
- Removed: marked "-"
XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp
--- r1764
+++ r1765

  CServersRessource* CServer::serversRessource_=nullptr ;

- void CServer::initRessources(void)
- {
-   auto ressourcesManager=CXios::getRessourcesManager() ;
-   auto servicesManager=CXios::getServicesManager() ;
-   auto contextsManager=CXios::getContextsManager() ;
-   auto daemonsManager=CXios::getDaemonsManager() ;
-   auto serversRessource=CServer::getServersRessource() ;
-
-   if (serversRessource->isServerLeader())
-   {
-     // ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()/2) ;
-     // ressourcesManager->createPool("NEMO",ressourcesManager->getRessourcesSize()/2) ;
-     ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()) ;
-     servicesManager->createServices("LMDZ", "ioserver", CServicesManager::IO_SERVER, 8, 5) ;
-     for(int i=0 ; i<5;i++)
-     {
-       contextsManager->createServerContext("LMDZ","ioserver",i,"lmdz") ;
-     }
-   }
-
-
-
-   while (true)
-   {
-     daemonsManager->eventLoop() ;
-   }
-
-
- }

  void CServer::initialize(void)
  {
…
  }

- //---------------------------------------------------------------
- /*!
-  * \fn void CServer::initialize(void)
-  * Creates intraComm for each possible type of servers (classical, primary or secondary).
-  * Creates interComm and stores them into the following lists:
-  *   classical server -- interCommLeft
-  *   primary server -- interCommLeft and interCommRight
-  *   secondary server -- interCommLeft for each pool.
-  * IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead.
-  */
- void CServer::initialize_old(void)
- {
-   int initialized ;
-   MPI_Initialized(&initialized) ;
-   if (initialized) is_MPI_Initialized=true ;
-   else is_MPI_Initialized=false ;
-   int rank ;
-
-   CXios::launchRessourcesManager(true) ;
-   CXios::launchServicesManager(true) ;
-   CXios::launchContextsManager(true) ;
-
-   initRessources() ;
-   // Not using OASIS
-   if (!CXios::usingOasis)
-   {
-
-     if (!is_MPI_Initialized)
-     {
-       MPI_Init(NULL, NULL);
-     }
-     CTimer::get("XIOS").resume() ;
-
-     boost::hash<string> hashString ;
-     unsigned long hashServer = hashString(CXios::xiosCodeId);
-
-     unsigned long* hashAll ;
-     unsigned long* srvLevelAll ;
-
-     int size ;
-     int myColor ;
-     int i,c ;
-     MPI_Comm newComm;
-
-     MPI_Comm_size(CXios::globalComm, &size) ;
-     MPI_Comm_rank(CXios::globalComm, &rank_);
-
-     hashAll=new unsigned long[size] ;
-     MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
-
-     map<unsigned long, int> colors ;
-     map<unsigned long, int> leaders ;
-     map<unsigned long, int>::iterator it ;
-
-     // (1) Establish client leaders, distribute processes between two server levels
-     std::vector<int> srvRanks;
-     for(i=0,c=0;i<size;i++)
-     {
-       if (colors.find(hashAll[i])==colors.end())
-       {
-         colors[hashAll[i]]=c ;
-         leaders[hashAll[i]]=i ;
-         c++ ;
-       }
-       if (CXios::usingServer2)
-         if (hashAll[i] == hashServer)
-           srvRanks.push_back(i);
-     }
-
-     if (CXios::usingServer2)
-     {
-       int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.;
-       if (reqNbProc<1 || reqNbProc==srvRanks.size())
-       {
-         error(0)<<"WARNING: void CServer::initialize(void)"<<endl
-             << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
-             <<" to secondary server. XIOS will run in the classical server mode."<<endl;
-       }
-       else
-       {
-         if (CXios::nbPoolsServer2 == 0) CXios::nbPoolsServer2 = reqNbProc;
-         int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ;
-         int poolLeader = firstSndSrvRank;
-         //*********** (1) Comment out the line below to set one process per pool
-         sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
-         int nbPools = CXios::nbPoolsServer2;
-         if ( nbPools > reqNbProc || nbPools < 1)
-         {
-           error(0)<<"WARNING: void CServer::initialize(void)"<<endl
-               << "It is impossible to allocate the requested number of pools = "<<nbPools
-               <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
-           nbPools = reqNbProc;
-         }
-         int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools;
-         int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools;
-         for (i=0; i<srvRanks.size(); i++)
-         {
-           if (i >= firstSndSrvRank)
-           {
-             if (rank_ == srvRanks[i])
-             {
-               serverLevel=2;
-             }
-             poolLeader += procsPerPool;
-             if (remainder != 0)
-             {
-               ++poolLeader;
-               --remainder;
-             }
-             //*********** (2) Comment out the two lines below to set one process per pool
-             if (poolLeader < srvRanks.size())
-               sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
-             //*********** (3) Uncomment the line below to set one process per pool
-             // sndServerGlobalRanks.push_back(srvRanks[i]);
-           }
-           else
-           {
-             if (rank_ == srvRanks[i]) serverLevel=1;
-           }
-         }
-         if (serverLevel==2)
-         {
-           info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
-           for (i=0; i<sndServerGlobalRanks.size(); i++)
-           {
-             if (rank_>= sndServerGlobalRanks[i])
-             {
-               if ( i == sndServerGlobalRanks.size()-1)
-               {
-                 myColor = colors.size() + sndServerGlobalRanks[i];
-               }
-               else if (rank_< sndServerGlobalRanks[i+1])
-               {
-                 myColor = colors.size() + sndServerGlobalRanks[i];
-                 break;
-               }
-             }
-           }
-         }
-       }
-     }
-
-     // (2) Create intraComm
-     if (serverLevel != 2) myColor=colors[hashServer];
-     MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ;
-
-     // (3) Create interComm
-     if (serverLevel == 0)
-     {
-       int clientLeader;
-       for(it=leaders.begin();it!=leaders.end();it++)
-       {
-         if (it->first!=hashServer)
-         {
-           clientLeader=it->second ;
-           int intraCommSize, intraCommRank ;
-           MPI_Comm_size(intraComm,&intraCommSize) ;
-           MPI_Comm_rank(intraComm,&intraCommRank) ;
-           info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize
-                  <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
-
-           MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
-           interCommLeft.push_back(newComm) ;
-         }
-       }
-     }
-     else if (serverLevel == 1)
-     {
-       int clientLeader, srvSndLeader;
-       int srvPrmLeader ;
-
-       for (it=leaders.begin();it!=leaders.end();it++)
-       {
-         if (it->first != hashServer)
-         {
-           clientLeader=it->second ;
-           int intraCommSize, intraCommRank ;
-           MPI_Comm_size(intraComm, &intraCommSize) ;
-           MPI_Comm_rank(intraComm, &intraCommRank) ;
-           info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
-                  <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
-           MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
-           interCommLeft.push_back(newComm) ;
-         }
-       }
-
-       for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
-       {
-         int intraCommSize, intraCommRank ;
-         MPI_Comm_size(intraComm, &intraCommSize) ;
-         MPI_Comm_rank(intraComm, &intraCommRank) ;
-         info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
-                <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< sndServerGlobalRanks[i]<<endl ;
-         MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ;
-         interCommRight.push_back(newComm) ;
-       }
-     }
-     else
-     {
-       int clientLeader;
-       clientLeader = leaders[hashString(CXios::xiosCodeId)];
-       int intraCommSize, intraCommRank ;
-       MPI_Comm_size(intraComm, &intraCommSize) ;
-       MPI_Comm_rank(intraComm, &intraCommRank) ;
-       info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize
-              <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
-
-       MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ;
-       interCommLeft.push_back(newComm) ;
-     }
-
-     delete [] hashAll ;
-
-   }
-   // using OASIS
-   else
-   {
-     int size;
-     int myColor;
-     int* srvGlobalRanks;
-     if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
-
-     CTimer::get("XIOS").resume() ;
-     MPI_Comm localComm;
-     oasis_get_localcomm(localComm);
-     MPI_Comm_rank(localComm,&rank_) ;
-
-     // (1) Create server intraComm
-     if (!CXios::usingServer2)
-     {
-       MPI_Comm_dup(localComm, &intraComm);
-     }
-     else
-     {
-       int globalRank;
-       MPI_Comm_size(localComm,&size) ;
-       MPI_Comm_rank(CXios::globalComm,&globalRank) ;
-       srvGlobalRanks = new int[size] ;
-       MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ;
-
-       int reqNbProc = size*CXios::ratioServer2/100.;
-       if (reqNbProc < 1 || reqNbProc == size)
-       {
-         error(0)<<"WARNING: void CServer::initialize(void)"<<endl
-             << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
-             <<" to secondary server. XIOS will run in the classical server mode."<<endl;
-         MPI_Comm_dup(localComm, &intraComm);
-       }
-       else
-       {
-         int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ;
-         int poolLeader = firstSndSrvRank;
-         //*********** (1) Comment out the line below to set one process per pool
-         // sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
-         int nbPools = CXios::nbPoolsServer2;
-         if ( nbPools > reqNbProc || nbPools < 1)
-         {
-           error(0)<<"WARNING: void CServer::initialize(void)"<<endl
-               << "It is impossible to allocate the requested number of pools = "<<nbPools
-               <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
-           nbPools = reqNbProc;
-         }
-         int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools;
-         int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools;
-         for (int i=0; i<size; i++)
-         {
-           if (i >= firstSndSrvRank)
-           {
-             if (globalRank == srvGlobalRanks[i])
-             {
-               serverLevel=2;
-             }
-             poolLeader += procsPerPool;
-             if (remainder != 0)
-             {
-               ++poolLeader;
-               --remainder;
-             }
-             //*********** (2) Comment out the two lines below to set one process per pool
-             // if (poolLeader < size)
-             //   sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
-             //*********** (3) Uncomment the line below to set one process per pool
-             sndServerGlobalRanks.push_back(srvGlobalRanks[i]);
-           }
-           else
-           {
-             if (globalRank == srvGlobalRanks[i]) serverLevel=1;
-           }
-         }
-         if (serverLevel==2)
-         {
-           info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
-           for (int i=0; i<sndServerGlobalRanks.size(); i++)
-           {
-             if (globalRank>= sndServerGlobalRanks[i])
-             {
-               if (i == sndServerGlobalRanks.size()-1)
-               {
-                 myColor = sndServerGlobalRanks[i];
-               }
-               else if (globalRank< sndServerGlobalRanks[i+1])
-               {
-                 myColor = sndServerGlobalRanks[i];
-                 break;
-               }
-             }
-           }
-         }
-         if (serverLevel != 2) myColor=0;
-         MPI_Comm_split(localComm, myColor, rank_, &intraComm) ;
-       }
-     }
-
-     string codesId=CXios::getin<string>("oasis_codes_id") ;
-     vector<string> oasisCodeId=splitRegex(codesId,"\\s*,\\s*") ;
-
-     vector<string>::iterator it ;
-
-     MPI_Comm newComm ;
-     int globalRank ;
-     MPI_Comm_rank(CXios::globalComm,&globalRank);
-
-     // (2) Create interComms with models
-     for(it=oasisCodeId.begin();it!=oasisCodeId.end();it++)
-     {
-       oasis_get_intercomm(newComm,*it) ;
-       if ( serverLevel == 0 || serverLevel == 1)
-       {
-         interCommLeft.push_back(newComm) ;
-         if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
-       }
-     }
-
-     // (3) Create interComms between primary and secondary servers
-     int intraCommSize, intraCommRank ;
-     MPI_Comm_size(intraComm,&intraCommSize) ;
-     MPI_Comm_rank(intraComm, &intraCommRank) ;
-
-     if (serverLevel == 1)
-     {
-       for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
-       {
-         int srvSndLeader = sndServerGlobalRanks[i];
-         info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize
-                <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< srvSndLeader<<endl ;
-         MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ;
-         interCommRight.push_back(newComm) ;
-       }
-     }
-     else if (serverLevel == 2)
-     {
-       info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize
-              <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< srvGlobalRanks[0] <<endl ;
-       MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ;
-       interCommLeft.push_back(newComm) ;
-     }
-     if (CXios::usingServer2) delete [] srvGlobalRanks ;
-
-     bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
-     if (!oasisEnddef) oasis_enddef() ;
-   }
-
-
-   MPI_Comm_rank(intraComm, &rank) ;
-   if (rank==0) isRoot=true;
-   else isRoot=false;
-
-   eventScheduler = new CEventScheduler(intraComm) ;
- }

  void CServer::finalize(void)
…
    for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
      MPI_Comm_free(&(*it));
-
-   // for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
-   //   MPI_Comm_free(&(*it));
-
-   // for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
-   //   MPI_Comm_free(&(*it));

    for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
…
    report(100)<<CTimer::getAllCumulatedTime()<<endl ;
  }
-
- void CServer::eventLoop(void)
- {
-   bool stop=false ;
-
-   CTimer::get("XIOS server").resume() ;
-   while(!stop)
-   {
-     if (isRoot)
-     {
-       listenContext();
-       listenRootContext();
-       listenOasisEnddef() ;
-       listenRootOasisEnddef() ;
-       if (!finished) listenFinalize() ;
-     }
-     else
-     {
-       listenRootContext();
-       listenRootOasisEnddef() ;
-       if (!finished) listenRootFinalize() ;
-     }
-
-     contextEventLoop() ;
-     if (finished && contextList.empty()) stop=true ;
-     eventScheduler->checkEvent() ;
-   }
-   CTimer::get("XIOS server").suspend() ;
- }
-
- void CServer::listenFinalize(void)
- {
-   list<MPI_Comm>::iterator it, itr;
-   int msg ;
-   int flag ;
-
-   for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
-   {
-     MPI_Status status ;
-     traceOff() ;
-     MPI_Iprobe(0,0,*it,&flag,&status) ;
-     traceOn() ;
-     if (flag==true)
-     {
-       MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
-       info(20)<<" CServer : Receive client finalize"<<endl ;
-       // Sending server finalize message to secondary servers (if any)
-       for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
-       {
-         MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
-       }
-       MPI_Comm_free(&(*it));
-       interCommLeft.erase(it) ;
-       break ;
-     }
-   }
-
-   if (interCommLeft.empty())
-   {
-     int i,size ;
-     MPI_Comm_size(intraComm,&size) ;
-     MPI_Request* requests= new MPI_Request[size-1] ;
-     MPI_Status* status= new MPI_Status[size-1] ;
-
-     for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
-     MPI_Waitall(size-1,requests,status) ;
-
-     finished=true ;
-     delete [] requests ;
-     delete [] status ;
-   }
- }
-
-
- void CServer::listenRootFinalize()
- {
-   int flag ;
-   MPI_Status status ;
-   int msg ;
-
-   traceOff() ;
-   MPI_Iprobe(0,4,intraComm, &flag, &status) ;
-   traceOn() ;
-   if (flag==true)
-   {
-     MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
-     finished=true ;
-   }
- }
-
-
- /*!
-  * Root process is listening for an order sent by client to call "oasis_enddef".
-  * The root client of a compound sends the order (tag 5). It is probed and received.
-  * When the order has been received from each compound, the server root process pings the order to the root processes of the secondary levels of servers (if any).
-  * Afterwards, it also informs (asynchronous call) the other processes of the communicator that the oasis_enddef call must be done.
-  */
-
- void CServer::listenOasisEnddef(void)
- {
-   int flag ;
-   MPI_Status status ;
-   list<MPI_Comm>::iterator it;
-   int msg ;
-   static int nbCompound=0 ;
-   int size ;
-   static bool sent=false ;
-   static MPI_Request* allRequests ;
-   static MPI_Status* allStatus ;
-
-
-   if (sent)
-   {
-     MPI_Comm_size(intraComm,&size) ;
-     MPI_Testall(size,allRequests, &flag, allStatus) ;
-     if (flag==true)
-     {
-       delete [] allRequests ;
-       delete [] allStatus ;
-       sent=false ;
-     }
-   }
-
-
-   for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
-   {
-     MPI_Status status ;
-     traceOff() ;
-     MPI_Iprobe(0,5,*it,&flag,&status) ; // tag oasis_enddef = 5
-     traceOn() ;
-     if (flag==true)
-     {
-       MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tag oasis_enddef = 5
-       nbCompound++ ;
-       if (nbCompound==interCommLeft.size())
-       {
-         for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
-         {
-           MPI_Send(&msg,1,MPI_INT,0,5,*it) ; // tag oasis_enddef = 5
-         }
-         MPI_Comm_size(intraComm,&size) ;
-         allRequests= new MPI_Request[size] ;
-         allStatus= new MPI_Status[size] ;
-         for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm,&allRequests[i]) ; // tag oasis_enddef = 5
-         sent=true ;
-       }
-     }
-   }
- }
-
- /*!
-  * Each process probes the message from the root process telling whether the oasis_enddef call must be done.
-  * When the order is received, it is scheduled to be treated in a synchronized way by all server processes of the communicator.
-  */
- void CServer::listenRootOasisEnddef(void)
- {
-   int flag ;
-   MPI_Status status ;
-   const int root=0 ;
-   int msg ;
-   static bool eventSent=false ;
-
-   if (eventSent)
-   {
-     boost::hash<string> hashString;
-     size_t hashId = hashString("oasis_enddef");
-     if (eventScheduler->queryEvent(0,hashId))
-     {
-       oasis_enddef() ;
-       eventSent=false ;
-     }
-   }
-
-   traceOff() ;
-   MPI_Iprobe(root,5,intraComm, &flag, &status) ;
-   traceOn() ;
-   if (flag==true)
-   {
-     MPI_Recv(&msg,1,MPI_INT,root,5,intraComm,&status) ; // tag oasis_enddef = 5
-     boost::hash<string> hashString;
-     size_t hashId = hashString("oasis_enddef");
-     eventScheduler->registerEvent(0,hashId);
-     eventSent=true ;
-   }
- }
-
-
-
-
-
- void CServer::listenContext(void)
- {
-
-   MPI_Status status ;
-   int flag ;
-   static char* buffer ;
-   static MPI_Request request ;
-   static bool recept=false ;
-   int rank ;
-   int count ;
-
-   if (recept==false)
-   {
-     traceOff() ;
-     MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
-     traceOn() ;
-     if (flag==true)
-     {
-       rank=status.MPI_SOURCE ;
-       MPI_Get_count(&status,MPI_CHAR,&count) ;
-       buffer=new char[count] ;
-       MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
-       recept=true ;
-     }
-   }
-   else
-   {
-     traceOff() ;
-     MPI_Test(&request,&flag,&status) ;
-     traceOn() ;
-     if (flag==true)
-     {
-       rank=status.MPI_SOURCE ;
-       MPI_Get_count(&status,MPI_CHAR,&count) ;
-       recvContextMessage((void*)buffer,count) ;
-       delete [] buffer ;
-       recept=false ;
-     }
-   }
- }
-
- void CServer::recvContextMessage(void* buff,int count)
- {
-   static map<string,contextMessage> recvContextId;
-   map<string,contextMessage>::iterator it ;
-   CBufferIn buffer(buff,count) ;
-   string id ;
-   int clientLeader ;
-   int nbMessage ;
-
-   buffer>>id>>nbMessage>>clientLeader ;
-
-   it=recvContextId.find(id) ;
-   if (it==recvContextId.end())
-   {
-     contextMessage msg={0,0} ;
-     pair<map<string,contextMessage>::iterator,bool> ret ;
-     ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
-     it=ret.first ;
-   }
-   it->second.nbRecv+=1 ;
-   it->second.leaderRank+=clientLeader ;
-
-   if (it->second.nbRecv==nbMessage)
-   {
-     int size ;
-     MPI_Comm_size(intraComm,&size) ;
-     // MPI_Request* requests= new MPI_Request[size-1] ;
-     // MPI_Status* status= new MPI_Status[size-1] ;
-     MPI_Request* requests= new MPI_Request[size] ;
-     MPI_Status* status= new MPI_Status[size] ;
-
-     CMessage msg ;
-     msg<<id<<it->second.leaderRank;
-     int messageSize=msg.size() ;
-     void * sendBuff = new char[messageSize] ;
-     CBufferOut sendBuffer(sendBuff,messageSize) ;
-     sendBuffer<<msg ;
-
-     // Include root itself in order not to have a divergence
-     for(int i=0; i<size; i++)
-     {
-       MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ;
-     }
-
-     recvContextId.erase(it) ;
-     delete [] requests ;
-     delete [] status ;
-
-   }
- }
-
- void CServer::listenRootContext(void)
- {
-   MPI_Status status ;
-   int flag ;
-   static std::vector<void*> buffers;
-   static std::vector<MPI_Request> requests ;
-   static std::vector<int> counts ;
-   static std::vector<bool> isEventRegistered ;
-   static std::vector<bool> isEventQueued ;
-   MPI_Request request;
-
-   int rank ;
-   const int root=0 ;
-   boost::hash<string> hashString;
-   size_t hashId = hashString("RegisterContext");
-
-   // (1) Receive context id from the root, save it into a buffer
-   traceOff() ;
-   MPI_Iprobe(root,2,intraComm, &flag, &status) ;
-   traceOn() ;
-   if (flag==true)
-   {
-     counts.push_back(0);
-     MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ;
-     buffers.push_back(new char[counts.back()]) ;
-     requests.push_back(request);
-     MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ;
-     isEventRegistered.push_back(false);
-     isEventQueued.push_back(false);
-     nbContexts++;
-   }
-
-   for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ )
-   {
-     // (2) If context id is received, register an event
-     MPI_Test(&requests[ctxNb],&flag,&status) ;
-     if (flag==true && !isEventRegistered[ctxNb])
-     {
-       eventScheduler->registerEvent(ctxNb,hashId);
-       isEventRegistered[ctxNb] = true;
-     }
-     // (3) If event has been scheduled, call register context
-     if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb])
-     {
-       registerContext(buffers[ctxNb],counts[ctxNb]) ;
-       isEventQueued[ctxNb] = true;
-       delete [] buffers[ctxNb] ;
-     }
-   }
-
- }
-
- void CServer::registerContext(void* buff, int count, int leaderRank)
- {
-   string contextId;
-   CBufferIn buffer(buff, count);
-   // buffer >> contextId;
-   buffer >> contextId>>leaderRank;
-   CContext* context;
-
-   info(20) << "CServer : Register new Context : " << contextId << endl;
-
-   if (contextList.find(contextId) != contextList.end())
-     ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
-           << "Context '" << contextId << "' has already been registred");
-
-   context=CContext::create(contextId);
-   contextList[contextId]=context;
-
-   // Primary or classical server: create communication channel with a client
-   // (1) create interComm (with a client)
-   // (2) initialize client and server (contextClient and contextServer)
-   MPI_Comm inter;
-   if (serverLevel < 2)
-   {
-     MPI_Comm contextInterComm;
-     MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
-     MPI_Intercomm_merge(contextInterComm,1,&inter);
-     MPI_Barrier(inter);
-     MPI_Comm_free(&inter);
-     context->initServer(intraComm,contextInterComm);
-     contextInterComms.push_back(contextInterComm);
-
-   }
-   // Secondary server: create communication channel with a primary server
-   // (1) duplicate interComm with a primary server
-   // (2) initialize client and server (contextClient and contextServer)
-   // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
-   //         because interComm of CContext is defined on the same processes as the interComm of CServer.
-   //         So just duplicate it.
-   else if (serverLevel == 2)
-   {
-     MPI_Comm_dup(interCommLeft.front(), &inter);
-     contextInterComms.push_back(inter);
-     context->initServer(intraComm, contextInterComms.back());
-   }
-
-   // Primary server:
-   // (1) send create context message to secondary servers
-   // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
-   if (serverLevel == 1)
-   {
-     int i = 0, size;
-     MPI_Comm_size(intraComm, &size) ;
-     for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
-     {
-       StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
-       CMessage msg;
-       int messageSize;
-       msg<<str<<size<<rank_ ;
-       messageSize = msg.size() ;
-       buff = new char[messageSize] ;
-       CBufferOut buffer(buff,messageSize) ;
-       buffer<<msg ;
-       MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ;
-       MPI_Comm_dup(*it, &inter);
-       contextInterComms.push_back(inter);
-       MPI_Comm_dup(intraComm, &inter);
-       contextIntraComms.push_back(inter);
-       context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
-       delete [] buff ;
-     }
-   }
- }
-
- void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/)
- {
-   bool isFinalized ;
-   map<string,CContext*>::iterator it ;
-
-   for(it=contextList.begin();it!=contextList.end();it++)
-   {
-     isFinalized=it->second->isFinalized();
-     if (isFinalized)
-     {
-       contextList.erase(it) ;
-       break ;
-     }
-     else
-       it->second->eventLoop(enableEventsProcessing);
-     //ym it->second->checkBuffersAndListen(enableEventsProcessing);
-   }
- }
-
- //! Get rank of the current process in the intraComm
- int CServer::getRank()
- {
-   int rank;
-   MPI_Comm_rank(intraComm,&rank);
-   return rank;
- }
-
- vector<int>& CServer::getSecondaryServerGlobalRanks()
- {
-   return sndServerGlobalRanks;
- }

  /*!
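The removed initialize_old() builds the server topology with two MPI primitives: MPI_Comm_split groups the processes that gathered the same code-id hash into an intraComm, and MPI_Intercomm_create then connects each server group to a client leader. Below is a minimal, self-contained sketch of that split-then-connect pattern; it is illustrative only (the coloring rule, leader ranks, and tag are arbitrary stand-ins, not XIOS values) and needs at least two MPI processes.

    // Sketch: split a global communicator into two groups, then connect them
    // through an inter-communicator, as the removed initialize_old() does.
    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank, size;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &size);

      // Color 0 plays the "client" role, color 1 the "server" role, standing in
      // for the hash-based coloring done on CXios::globalComm.
      int color = (rank < size/2) ? 0 : 1;
      MPI_Comm intraComm;
      MPI_Comm_split(MPI_COMM_WORLD, color, rank, &intraComm);

      // Each side names the other group's leader (its lowest global rank) and
      // builds the inter-communicator, as done with the leaders collected
      // through MPI_Allgather in the removed code.
      int remoteLeader = (color == 0) ? size/2 : 0;
      MPI_Comm interComm;
      MPI_Intercomm_create(intraComm, 0, MPI_COMM_WORLD, remoteLeader, 0, &interComm);

      int intraRank;
      MPI_Comm_rank(intraComm, &intraRank);
      printf("global rank %d -> group %d, intra rank %d\n", rank, color, intraRank);

      MPI_Comm_free(&interComm);
      MPI_Comm_free(&intraComm);
      MPI_Finalize();
      return 0;
    }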
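In both the OASIS and non-OASIS branches, the secondary-server processes are divided into nbPools pools of procsPerPool processes each, with the remainder spread one extra process per pool over the first pools. The helper below only restates that arithmetic outside the rank loop; it is not an XIOS function, and the names are invented for the example.

    #include <cstdio>
    #include <vector>

    // Compute the global rank of each pool leader the way the removed loop
    // does: start at the first secondary rank, advance by procsPerPool, and
    // give one extra process to each of the first (nbSndProcs % nbPools) pools.
    std::vector<int> computePoolLeaders(int firstSndSrvRank, int nbSndProcs, int nbPools)
    {
      std::vector<int> leaders;
      int procsPerPool = nbSndProcs / nbPools;
      int remainder    = nbSndProcs % nbPools;
      int leader       = firstSndSrvRank;
      for (int p = 0; p < nbPools; ++p)
      {
        leaders.push_back(leader);
        leader += procsPerPool;
        if (remainder != 0) { ++leader; --remainder; }
      }
      return leaders;
    }

    int main()
    {
      // 10 secondary processes starting at rank 20, 3 pools -> leaders 20, 24, 27
      for (int r : computePoolLeaders(20, 10, 3)) printf("%d ", r);
      printf("\n");
      return 0;
    }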
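The removed listen*() routines all follow the same non-blocking servicing pattern so that eventLoop() never stalls: MPI_Iprobe checks whether a message is pending, MPI_Irecv posts the receive once its size is known, and a later pass through the loop completes it with MPI_Test. A standalone sketch of that two-phase state machine follows; the payload, tag, and rank roles are invented for the example, and it needs at least two MPI processes.

    #include <mpi.h>
    #include <cstdio>
    #include <vector>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      const int tag = 1;                     // arbitrary tag for the example

      if (rank == 0)                         // "client" side: send one message
      {
        char payload[] = "context_id";
        MPI_Send(payload, sizeof(payload), MPI_CHAR, 1, tag, MPI_COMM_WORLD);
      }
      else if (rank == 1)                    // "server" side: poll without blocking
      {
        std::vector<char> buffer;
        MPI_Request request;
        bool receiving = false, done = false;
        while (!done)
        {
          MPI_Status status; int flag = 0;
          if (!receiving)
          {
            // Phase 1: cheap probe; post the receive only once a message is
            // pending, sized with MPI_Get_count as in listenContext().
            MPI_Iprobe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &flag, &status);
            if (flag)
            {
              int count;
              MPI_Get_count(&status, MPI_CHAR, &count);
              buffer.resize(count);
              MPI_Irecv(buffer.data(), count, MPI_CHAR, status.MPI_SOURCE, tag,
                        MPI_COMM_WORLD, &request);
              receiving = true;
            }
          }
          else
          {
            // Phase 2: complete the receive on a later iteration of the loop.
            MPI_Test(&request, &flag, &status);
            if (flag) { printf("received: %s\n", buffer.data()); done = true; }
          }
          // ...a real event loop would service the other listeners here...
        }
      }
      MPI_Finalize();
      return 0;
    }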
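Finally, the removed registerContext() wires each context to its client by creating a dedicated inter-communicator on a per-leader tag (10+leaderRank), merging it just long enough to run a barrier that synchronizes both sides, and keeping only the inter-communicator. A minimal sketch of that handshake under the assumption of a generic intraComm/globalComm pair (the function name is invented; every process of both groups must make the matching calls):

    #include <mpi.h>

    // Create the context channel the way the removed registerContext() does on
    // a classical or primary server.
    MPI_Comm createContextChannel(MPI_Comm intraComm, MPI_Comm globalComm, int leaderRank)
    {
      MPI_Comm contextInterComm, merged;
      // A per-leader tag keeps concurrent registrations apart, as in the diff.
      MPI_Intercomm_create(intraComm, 0, globalComm, leaderRank,
                           10 + leaderRank, &contextInterComm);
      // Merge the two groups into one intra-communicator only to run a barrier
      // covering both sides...
      MPI_Intercomm_merge(contextInterComm, 1, &merged);
      MPI_Barrier(merged);
      // ...then release it; the context keeps only the inter-communicator.
      MPI_Comm_free(&merged);
      return contextInterComm;
    }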