Ignore:
Timestamp:
12/12/19 18:15:14 (5 years ago)
Author:
ymipsl
Message:
  • Preparing coupling functionalities.
  • Make some cleaner things

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp

    r1766 r1784  
    2424#include "cxios.hpp" 
    2525#include "client.hpp" 
     26#include "coupler_in.hpp" 
     27#include "coupler_out.hpp" 
    2628 
    2729namespace xios { 
     
    353355   { 
    354356      intraComm_=intraComm ; 
     357      MPI_Comm_rank(intraComm_, &intraCommRank_) ; 
     358      MPI_Comm_size(intraComm_, &intraCommSize_) ; 
     359 
    355360      serviceType_ = CServicesManager::CLIENT ; 
    356361      if (serviceType_==CServicesManager::CLIENT) 
     
    385390     hasServer=true; 
    386391     intraComm_=intraComm ; 
     392     MPI_Comm_rank(intraComm_, &intraCommRank_) ; 
     393     MPI_Comm_size(intraComm_, &intraCommSize_) ; 
     394 
    387395     serviceType_=serviceType ; 
    388396 
     
    544552    } 
    545553 
     554    for (auto it=couplerClient_.begin(); it!=couplerClient_.end(); ++it) 
     555    { 
     556      if (!finalized) it->second->checkBuffers(); 
     557    } 
     558 
     559    for (auto it=couplerServer_.begin(); it!=couplerServer_.end(); ++it) 
     560    { 
     561      if (!finalized) it->second->eventLoop(enableEventsProcessing); 
     562    } 
     563 
    546564    if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing); 
    547565   
     
    549567  } 
    550568 
    551   
     569  void CContext::addCouplingChanel(const std::string& context, bool out) 
     570  { 
     571     vector<string> vectStr=splitRegex(context,"::") ; 
     572     string poolId=vectStr[0] ; 
     573     string serviceId=poolId ; 
     574     string contextId=vectStr[1] ; 
     575 
     576     int contextLeader ; 
     577     int type = CServicesManager::CLIENT ; 
     578     string contextName=CXios::getContextsManager()->getServerContextName(poolId, serviceId, 0, type, contextId) ; 
     579      
     580     if (couplerClient_.find(contextName)==couplerClient_.end()) 
     581     { 
     582       bool ok=CXios::getContextsManager()->getContextLeader(contextName, contextLeader, getIntraComm()) ; 
     583      
     584       MPI_Comm interComm, interCommClient, interCommServer  ; 
     585       MPI_Comm intraCommClient, intraCommServer ; 
     586 
     587       if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
     588 
     589       MPI_Comm_dup(intraComm_, &intraCommClient) ; 
     590       MPI_Comm_dup(intraComm_, &intraCommServer) ; 
     591       if (out) 
     592       { 
     593         MPI_Comm_dup(interComm, &interCommClient) ; 
     594         MPI_Comm_dup(interComm, &interCommServer) ; 
     595         CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 
     596         CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 
     597       } 
     598       else 
     599       { 
     600          MPI_Comm_dup(interComm, &interCommServer) ; 
     601          MPI_Comm_dup(interComm, &interCommClient) ; 
     602          CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 
     603          CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 
     604       } 
     605       MPI_Comm_free(&interComm) ; 
     606 
     607 
     608      // for now, we don't now which beffer size must be used for client coupler 
     609      // It will be evaluated later. Fix a constant size for now... 
     610      // set to 10Mb for development 
     611       map<int,size_t> bufferSize, maxEventSize ; 
     612       for(int i=0;i<client->getRemoteSize();i++) 
     613       { 
     614         bufferSize[i]=10000000 ; 
     615         maxEventSize[i]=10000000 ; 
     616       } 
     617 
     618       client->setBufferSize(bufferSize, maxEventSize);     
     619        
     620       couplerClient_[contextName] = client ; 
     621       couplerServer_[contextName] = server ; 
     622     } 
     623  } 
    552624   
    553625  void CContext::globalEventLoop(void) 
     
    649721     postProcessing(); 
    650722 
     723     // Distribute files between secondary servers according to the data size 
     724     distributeFiles(); 
     725 
    651726     // Check grid and calculate its distribution 
    652727     checkGridEnabledFields(); 
    653   
    654      // Distribute files between secondary servers according to the data size 
    655      distributeFiles(); 
    656728 
    657729     setClientServerBuffer(client, (hasClient && !hasServer)); 
     
    659731         setClientServerBuffer(clientPrimServer[i], true); 
    660732 
     733     
    661734     if (hasClient) 
    662      { 
    663       // Send all attributes of current context to server 
    664       this->sendAllAttributesToServer(); 
    665  
    666       // Send all attributes of current calendar 
    667       CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(); 
     735     {  
     736       if (hasServer) 
     737       {  
     738         for (auto it=clientPrimServer.begin(); it!=clientPrimServer.end();++it)  
     739         { 
     740           this->sendAllAttributesToServer(*it); // Send all attributes of current context to server 
     741           CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(*it); // Send all attributes of current calendar 
     742         } 
     743       } 
     744       else 
     745       { 
     746         this->sendAllAttributesToServer(client);   // Send all attributes of current context to server 
     747         CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(client); // Send all attributes of current calendar 
     748       } 
     749 
    668750 
    669751      // We have enough information to send to server 
     
    779861    { 
    780862      buildFilterGraphOfFieldsWithReadAccess(); 
    781       postProcessFilterGraph(); 
     863      postProcessFilterGraph(); // For coupling in, modify this later 
    782864    } 
    783865     
     
    849931       enabledFiles[i]->checkGridOfEnabledFields();        
    850932     } 
     933 
     934     size = enabledCouplerOut.size(); 
     935     for (int i = 0; i < size; ++i) 
     936     { 
     937       enabledCouplerOut[i]->checkGridOfEnabledFields();        
     938     } 
    851939   } 
    852940   CATCH_DUMP_ATTR 
     
    872960      \param [in] sendToServer Flag to indicate whether calculated information will be sent 
    873961   */ 
    874    void CContext::solveOnlyRefOfEnabledFields(bool sendToServer) 
     962   void CContext::solveOnlyRefOfEnabledFields(void) 
    875963   TRY 
    876964   { 
     
    878966     for (int i = 0; i < size; ++i) 
    879967     { 
    880        this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer); 
     968       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(); 
    881969     } 
    882970 
     
    884972     { 
    885973       this->enabledFiles[i]->generateNewTransformationGridDest(); 
     974     } 
     975 
     976     size = this->enabledCouplerOut.size(); 
     977     for (int i = 0; i < size; ++i) 
     978     { 
     979       this->enabledCouplerOut[i]->solveOnlyRefOfEnabledFields(); 
     980     } 
     981 
     982     for (int i = 0; i < size; ++i) 
     983     { 
     984       this->enabledCouplerOut[i]->generateNewTransformationGridDest(); 
    886985     } 
    887986   } 
     
    894993      \param [in] sendToServer Flag to indicate whether calculated information will be sent 
    895994   */ 
    896    void CContext::solveAllRefOfEnabledFieldsAndTransform(bool sendToServer) 
     995   void CContext::solveAllRefOfEnabledFieldsAndTransform(void) 
    897996   TRY 
    898997   { 
     
    900999     for (int i = 0; i < size; ++i) 
    9011000     { 
    902        this->enabledFiles[i]->solveAllRefOfEnabledFieldsAndTransform(sendToServer); 
    903      } 
     1001       this->enabledFiles[i]->solveAllRefOfEnabledFieldsAndTransform(); 
     1002     } 
     1003 
     1004     size = this->enabledCouplerOut.size(); 
     1005     for (int i = 0; i < size; ++i) 
     1006     { 
     1007       this->enabledCouplerOut[i]->solveAllRefOfEnabledFieldsAndTransform(); 
     1008     } 
     1009 
    9041010   } 
    9051011   CATCH_DUMP_ATTR 
     
    9121018     { 
    9131019       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector); 
     1020     } 
     1021 
     1022     size = this->enabledCouplerOut.size(); 
     1023     for (int i = 0; i < size; ++i) 
     1024     { 
     1025       this->enabledCouplerOut[i]->buildFilterGraphOfEnabledFields(garbageCollector); 
    9141026     } 
    9151027   } 
     
    10021114     // Résolution des héritages par référence au niveau des fichiers. 
    10031115      const vector<CFile*> allFiles=CFile::getAll(); 
     1116      const vector<CCouplerIn*> allCouplerIn=CCouplerIn::getAll(); 
     1117      const vector<CCouplerOut*> allCouplerOut=CCouplerOut::getAll(); 
    10041118      const vector<CGrid*> allGrids= CGrid::getAll(); 
    10051119 
     
    10091123        for (unsigned int i = 0; i < allFiles.size(); i++) 
    10101124          allFiles[i]->solveFieldRefInheritance(apply); 
     1125 
     1126        for (unsigned int i = 0; i < allCouplerIn.size(); i++) 
     1127          allCouplerIn[i]->solveFieldRefInheritance(apply); 
     1128 
     1129        for (unsigned int i = 0; i < allCouplerOut.size(); i++) 
     1130          allCouplerOut[i]->solveFieldRefInheritance(apply); 
    10111131      } 
    10121132 
     
    10711191   CATCH_DUMP_ATTR 
    10721192 
     1193   void CContext::findEnabledCouplerIn(void) 
     1194   TRY 
     1195   { 
     1196      const std::vector<CCouplerIn*> allCouplerIn = CCouplerIn::getAll(); 
     1197      bool enabled ; 
     1198      for (size_t i = 0; i < allCouplerIn.size(); i++) 
     1199      { 
     1200        if (allCouplerIn[i]->enabled.isEmpty()) enabled=true ; 
     1201        else enabled=allCouplerIn[i]->enabled ; 
     1202        if (enabled) enabledCouplerIn.push_back(allCouplerIn[i]) ; 
     1203      } 
     1204   } 
     1205   CATCH_DUMP_ATTR 
     1206 
     1207   void CContext::findEnabledCouplerOut(void) 
     1208   TRY 
     1209   { 
     1210      const std::vector<CCouplerOut*> allCouplerOut = CCouplerOut::getAll(); 
     1211      bool enabled ; 
     1212      for (size_t i = 0; i < allCouplerOut.size(); i++) 
     1213      { 
     1214        if (allCouplerOut[i]->enabled.isEmpty()) enabled=true ; 
     1215        else enabled=allCouplerOut[i]->enabled ; 
     1216        if (enabled) enabledCouplerOut.push_back(allCouplerOut[i]) ; 
     1217      } 
     1218   } 
     1219   CATCH_DUMP_ATTR 
     1220 
     1221 
     1222 
     1223 
    10731224   void CContext::distributeFiles(void) 
    10741225   TRY 
     
    16691820      findEnabledWriteModeFiles(); 
    16701821      findEnabledReadModeFiles(); 
     1822      findEnabledCouplerIn(); 
     1823      findEnabledCouplerOut(); 
     1824       
     1825      createCouplerInterCommunicator() ; 
    16711826 
    16721827      // For now, only read files with client and only one level server 
     
    16891844 
    16901845      // Only search and rebuild all reference objects of enable fields, don't transform 
    1691       this->solveOnlyRefOfEnabledFields(false); 
     1846      this->solveOnlyRefOfEnabledFields(); 
    16921847 
    16931848      // Search and rebuild all reference object of enabled fields, and transform 
    1694       this->solveAllRefOfEnabledFieldsAndTransform(false); 
     1849      this->solveAllRefOfEnabledFieldsAndTransform(); 
    16951850 
    16961851      // Find all fields with read access from the public API 
     
    17001855 
    17011856      isPostProcessed = true; 
     1857   } 
     1858   CATCH_DUMP_ATTR 
     1859 
     1860   void CContext::createCouplerInterCommunicator(void) 
     1861   TRY 
     1862   { 
     1863      // juste for test now, in future need an scheduler to avoid dead-lock 
     1864      for(auto it=enabledCouplerOut.begin();it!=enabledCouplerOut.end();++it) 
     1865      { 
     1866        (*it)->createInterCommunicator() ; 
     1867      } 
     1868 
     1869      for(auto it=enabledCouplerIn.begin();it!=enabledCouplerIn.end();++it) 
     1870      { 
     1871        (*it)->createInterCommunicator() ; 
     1872      } 
    17021873   } 
    17031874   CATCH_DUMP_ATTR 
     
    19532124   TRY 
    19542125   { 
    1955      std::set<StdString> gridIds; 
     2126     std::set<pair<StdString,CContextClient*>> gridIds; 
     2127 
    19562128     int sizeFile = activeFiles.size(); 
    19572129     CFile* filePtr(NULL); 
     
    19662138       { 
    19672139         if (0 != enabledFields[numField]->getRelGrid()) 
    1968            gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId()); 
     2140           gridIds.insert(make_pair(CGrid::get(enabledFields[numField]->getRelGrid())->getId(),enabledFields[numField]->getContextClient())); 
    19692141       } 
    19702142     } 
     
    19732145     StdString gridDefRoot("grid_definition"); 
    19742146     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot); 
    1975      std::set<StdString>::const_iterator it, itE = gridIds.end(); 
    1976      for (it = gridIds.begin(); it != itE; ++it) 
    1977      { 
    1978        gridPtr->sendCreateChild(*it); 
    1979        CGrid::get(*it)->sendAllAttributesToServer(); 
    1980        CGrid::get(*it)->sendAllDomains(); 
    1981        CGrid::get(*it)->sendAllAxis(); 
    1982        CGrid::get(*it)->sendAllScalars(); 
     2147     for (auto it = gridIds.begin(); it != gridIds.end(); ++it) 
     2148     { 
     2149       gridPtr->sendCreateChild(it->first,it->second); 
     2150       CGrid::get(it->first)->sendAllAttributesToServer(it->second); 
     2151       CGrid::get(it->first)->sendAllDomains(it->second); 
     2152       CGrid::get(it->first)->sendAllAxis(it->second); 
     2153       CGrid::get(it->first)->sendAllScalars(it->second); 
    19832154     } 
    19842155   } 
     
    19892160   TRY 
    19902161   { 
    1991      std::set<StdString> domainIds, axisIds, scalarIds; 
     2162     std::set<pair<StdString,CContextClient*>> domainIds, axisIds, scalarIds; 
    19922163 
    19932164     // Find all reference domain and axis of all active fields 
     
    19992170       for (int j = 0; j < numEnabledFields; ++j) 
    20002171       { 
     2172         CContextClient* contextClient=enabledFields[j]->getContextClient() ; 
    20012173         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds(); 
    2002          if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]); 
    2003          if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]); 
    2004          if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]); 
     2174         if ("" != prDomAxisScalarId[0]) domainIds.insert(make_pair(prDomAxisScalarId[0],contextClient)); 
     2175         if ("" != prDomAxisScalarId[1]) axisIds.insert(make_pair(prDomAxisScalarId[1],contextClient)); 
     2176         if ("" != prDomAxisScalarId[2]) scalarIds.insert(make_pair(prDomAxisScalarId[2],contextClient)); 
    20052177       } 
    20062178     } 
     
    20122184     StdString scalarDefRoot("scalar_definition"); 
    20132185     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot); 
    2014      itE = scalarIds.end(); 
    2015      for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar) 
    2016      { 
    2017        if (!itScalar->empty()) 
     2186      
     2187     for (auto itScalar = scalarIds.begin(); itScalar != scalarIds.end(); ++itScalar) 
     2188     { 
     2189       if (!itScalar->first.empty()) 
    20182190       { 
    2019          scalarPtr->sendCreateChild(*itScalar); 
    2020          CScalar::get(*itScalar)->sendAllAttributesToServer(); 
     2191         scalarPtr->sendCreateChild(itScalar->first,itScalar->second); 
     2192         CScalar::get(itScalar->first)->sendAllAttributesToServer(itScalar->second); 
    20212193       } 
    20222194     } 
     
    20242196     StdString axiDefRoot("axis_definition"); 
    20252197     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot); 
    2026      itE = axisIds.end(); 
    2027      for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis) 
    2028      { 
    2029        if (!itAxis->empty()) 
     2198      
     2199     for (auto itAxis = axisIds.begin(); itAxis != axisIds.end(); ++itAxis) 
     2200     { 
     2201       if (!itAxis->first.empty()) 
    20302202       { 
    2031          axisPtr->sendCreateChild(*itAxis); 
    2032          CAxis::get(*itAxis)->sendAllAttributesToServer(); 
     2203         axisPtr->sendCreateChild(itAxis->first, itAxis->second); 
     2204         CAxis::get(itAxis->first)->sendAllAttributesToServer(itAxis->second); 
    20332205       } 
    20342206     } 
     
    20372209     StdString domDefRoot("domain_definition"); 
    20382210     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot); 
    2039      itE = domainIds.end(); 
    2040      for (itDom = domainIds.begin(); itDom != itE; ++itDom) 
    2041      { 
    2042        if (!itDom->empty()) { 
    2043           domPtr->sendCreateChild(*itDom); 
    2044           CDomain::get(*itDom)->sendAllAttributesToServer(); 
     2211      
     2212     for (auto itDom = domainIds.begin(); itDom != domainIds.end(); ++itDom) 
     2213     { 
     2214       if (!itDom->first.empty()) { 
     2215          domPtr->sendCreateChild(itDom->first, itDom->second); 
     2216          CDomain::get(itDom->first)->sendAllAttributesToServer(itDom->second); 
    20452217       } 
    20462218     } 
Note: See TracChangeset for help on using the changeset viewer.