Ignore:
Timestamp:
09/10/20 13:51:02 (4 years ago)
Author:
ymipsl
Message:

Big update on on going work related to data distribution and transfer between clients and servers.
Revisite of the source and store filter using "connectors".

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/domain.cpp

    r1918 r1930  
    17451745      this->completeLonLatClient(); 
    17461746      this->initializeLocalElement() ; 
    1747       this->addFullView() ; 
    1748       this->addWorkflowView() ; 
    1749       this->addModelView() ; 
     1747      this->addFullView() ; // probably do not automatically add View, but only if requested 
     1748      this->addWorkflowView() ; // probably do not automatically add View, but only if requested 
     1749      this->addModelView() ; // probably do not automatically add View, but only if requested 
    17501750      // testing ? 
     1751     /* 
    17511752      CLocalView* local = localElement_->getView(CElementView::WORKFLOW) ; 
    17521753      CLocalView* model = localElement_->getView(CElementView::MODEL) ; 
     
    17691770      gridTest1.transfer(data_i_index,out1,-111) ; 
    17701771      gridTest2.transfer(out1, out2,-111) ; 
    1771        
     1772    */   
    17721773      this->checkAttributes_done_ = true; 
    17731774   } 
     
    18201821     for(int k=0;k<dataSize;k++) 
    18211822     { 
    1822         i=data_i_index(k)+data_ibegin ; // bad 
    1823         j=data_j_index(k)+data_jbegin ; // bad 
    1824         if (i>=0 && i<ni && j>=0 && j<nj) index(k)=i+j*ni ; 
    1825         else index(k)=-1 ; 
     1823        if (data_dim==2) 
     1824        { 
     1825          i=data_i_index(k)+data_ibegin ; // bad 
     1826          j=data_j_index(k)+data_jbegin ; // bad 
     1827          if (i>=0 && i<ni && j>=0 && j<nj) index(k)=i+j*ni ; 
     1828          else index(k)=-1 ; 
     1829        } 
     1830        else if (data_dim==1) 
     1831        { 
     1832          i=data_i_index(k)+data_ibegin ; // bad 
     1833          if (i>=0 && i<ni*nj) index(k)=i ; 
     1834          else index(k)=-1 ; 
     1835        } 
    18261836     } 
    18271837     localElement_->addView(CElementView::MODEL, index) ; 
     
    21942204    this->sendAllAttributesToServer(client)  ; 
    21952205    this->sendDistributionAttributes(client);    
    2196     this->sendIndex(client);        
    2197     this->sendLonLat(client); 
    2198     this->sendArea(client);     
    2199     this->sendDataIndex(client); 
    2200  
    2201     // test new connector functionnality 
    2202     this->sendDomainDistribution(client) ; 
     2206    //this->sendIndex(client);        
     2207    //this->sendLonLat(client); 
     2208    //this->sendArea(client);     
     2209    //this->sendDataIndex(client); 
     2210 
    22032211  } 
    22042212 
     
    22322240  } 
    22332241 
     2242 
     2243  void CDomain::computeRemoteElement(CContextClient* client, EDistributionType type) 
     2244  TRY 
     2245  { 
     2246    CContext* context = CContext::getCurrent(); 
     2247    map<int, CArray<size_t,1>> globalIndex ; 
     2248 
     2249    if (type==EDistributionType::BANDS) // Bands distribution to send to file server 
     2250    { 
     2251      int nbServer = client->serverSize; 
     2252      std::vector<int> nGlobDomain(2); 
     2253      nGlobDomain[0] = this->ni_glo; 
     2254      nGlobDomain[1] = this->nj_glo; 
     2255 
     2256      // to be changed in future, need to rewrite more simply domain distribution 
     2257      CServerDistributionDescription serverDescription(nGlobDomain, nbServer); 
     2258      int distributedPosition ; 
     2259      if (isUnstructed_) distributedPosition = 0 ; 
     2260      else distributedPosition = 1 ; 
     2261       
     2262      std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin(); 
     2263      std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes(); 
     2264      vector<unordered_map<size_t,vector<int>>> indexServerOnElement ; 
     2265      CArray<int,1> axisDomainOrder(1) ; axisDomainOrder(0)=2 ; 
     2266      auto zeroIndex=serverDescription.computeServerGlobalByElement(indexServerOnElement, context->getIntraCommRank(), context->getIntraCommSize(), 
     2267                                                                  axisDomainOrder,distributedPosition) ; 
     2268      // distribution is very bad => to redo 
     2269      // convert indexServerOnElement => map<int,CArray<size_t,1>> - need to be changed later 
     2270      map<int, vector<size_t>> vectGlobalIndex ; 
     2271      for(auto& indexRanks : indexServerOnElement[0]) 
     2272      { 
     2273        size_t index=indexRanks.first ; 
     2274        auto& ranks=indexRanks.second ; 
     2275        for(int rank : ranks) vectGlobalIndex[rank].push_back(index) ; 
     2276      } 
     2277      for(auto& vect : vectGlobalIndex ) globalIndex.emplace(vect.first, CArray<size_t,1>(vect.second.data(), shape(vect.second.size()),duplicateData)) ;  
     2278    } 
     2279    else if (type==EDistributionType::NONE) // domain is not distributed ie all servers get the same local domain 
     2280    { 
     2281      int nbServer = client->serverSize; 
     2282      int nglo=ni_glo*nj_glo ; 
     2283      CArray<size_t,1> indGlo ; 
     2284      for(size_t i=0;i<nglo;i++) indGlo(i) = i ; 
     2285      for (auto& rankServer : client->getRanksServerLeader()) globalIndex[rankServer] = indGlo ;  
     2286    } 
     2287    remoteElement_[client] = new CDistributedElement(ni_glo*nj_glo, globalIndex) ; 
     2288    remoteElement_[client]->addFullView() ; 
     2289  } 
     2290  CATCH 
     2291 
     2292  
     2293 
     2294  void CDomain::distributeToServer(CContextClient* client, map<int, CArray<size_t,1>>& globalIndex, const string& domainId) 
     2295  TRY 
     2296  { 
     2297    string serverDomainId = domainId.empty() ? this->getId() : domainId ; 
     2298    CContext* context = CContext::getCurrent(); 
     2299 
     2300    this->sendAllAttributesToServer(client, serverDomainId)  ; 
     2301 
     2302    CDistributedElement scatteredElement(ni_glo*nj_glo, globalIndex) ; 
     2303    scatteredElement.addFullView() ; 
     2304    CScattererConnector scattererConnector(localElement_->getView(CElementView::FULL), scatteredElement.getView(CElementView::FULL), context->getIntraComm()) ; 
     2305    scattererConnector.computeConnector() ; 
     2306 
     2307    // phase 0 
     2308    // send remote element to construct the full view on server, ie without hole  
     2309    CEventClient event0(getType(), EVENT_ID_DOMAIN_DISTRIBUTION); 
     2310    CMessage message0 ; 
     2311    message0<<serverDomainId<<0 ;  
     2312    remoteElement_[client]->sendToServer(client,event0,message0) ;  
     2313     
     2314    // phase 1 
     2315    // send the full view of element to construct the connector which connect distributed data coming from client to the full local view 
     2316    CEventClient event1(getType(), EVENT_ID_DOMAIN_DISTRIBUTION); 
     2317    CMessage message1 ; 
     2318    message1<<serverDomainId<<1<<localElement_->getView(CElementView::FULL)->getGlobalSize() ;  
     2319    scattererConnector.transfer(localElement_->getView(CElementView::FULL)->getGlobalIndex(),client,event1,message1) ; 
     2320     
     2321    sendDistributedAttributes(client, scattererConnector, domainId) ; 
     2322 
     2323   
     2324    // phase 2 send the mask : data index + mask2D 
     2325    CArray<bool,1> maskIn(localElement_->getView(CElementView::WORKFLOW)->getSize()); 
     2326    CArray<bool,1> maskOut ; 
     2327    CLocalConnector workflowToFull(localElement_->getView(CElementView::WORKFLOW), localElement_->getView(CElementView::FULL)) ; 
     2328    workflowToFull.computeConnector() ; 
     2329    maskIn=true ; 
     2330    workflowToFull.transfer(maskIn,maskOut,false) ; 
     2331 
     2332 
     2333    // phase 3 : prepare grid scatterer connector to send data from client to server 
     2334    map<int,CArray<size_t,1>> workflowGlobalIndex ; 
     2335    map<int,CArray<bool,1>> maskOut2 ;  
     2336    scattererConnector.transfer(maskOut, maskOut2, false) ; 
     2337    scatteredElement.addView(CElementView::WORKFLOW, maskOut2) ; 
     2338    scatteredElement.getView(CElementView::WORKFLOW)->getGlobalIndexView(workflowGlobalIndex) ; 
     2339    // create new workflow view for scattered element 
     2340    CDistributedElement clientToServerElement(scatteredElement.getGlobalSize(), workflowGlobalIndex) ; 
     2341    clientToServerElement.addFullView() ; 
     2342    CEventClient event2(getType(), EVENT_ID_DOMAIN_DISTRIBUTION); 
     2343    CMessage message2 ; 
     2344    message2<<serverDomainId<<2 ;  
     2345    clientToServerElement.sendToServer(client, event2, message2) ;  
     2346    clientToServerConnector_[client] = new CScattererConnector(localElement_->getView(CElementView::WORKFLOW), 
     2347                                                              clientToServerElement.getView(CElementView::FULL), context->getIntraComm()) ; 
     2348    clientToServerConnector_[client]->computeConnector() ; 
     2349 
     2350 
     2351    CEventClient event3(getType(), EVENT_ID_DOMAIN_DISTRIBUTION); 
     2352    CMessage message3 ; 
     2353    message3<<serverDomainId<<3 ;  
     2354    clientToServerConnector_[client]->transfer(maskIn,client,event3,message3) ;  
     2355     
     2356  } 
     2357  CATCH 
     2358  
     2359  void CDomain::recvDomainDistribution(CEventServer& event) 
     2360  TRY 
     2361  { 
     2362    string domainId; 
     2363    int phasis ; 
     2364    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> domainId >> phasis ; 
     2365    get(domainId)->receivedDomainDistribution(event, phasis); 
     2366  } 
     2367  CATCH 
     2368 
     2369  void CDomain::receivedDomainDistribution(CEventServer& event, int phasis) 
     2370  TRY 
     2371  { 
     2372    CContext* context = CContext::getCurrent(); 
     2373    if (phasis==0) // receive the remote element to construct the full view 
     2374    { 
     2375      localElement_ = new  CLocalElement(context->getIntraCommRank(),event) ; 
     2376      localElement_->addFullView() ; 
     2377      // construct the local dimension and indexes 
     2378      auto& globalIndex=localElement_->getGlobalIndex() ; 
     2379      int nij=globalIndex.numElements() ; 
     2380      int minI=ni_glo,maxI=-1,minJ=nj_glo,maxJ=-1 ; 
     2381      int i,j ; 
     2382      int niGlo=ni_glo, njGlo=njGlo ; 
     2383      for(int ij=0;ij<nij;ij++) 
     2384      { 
     2385        j=globalIndex(ij)/niGlo ; 
     2386        i=globalIndex(ij)%niGlo ; 
     2387        if (i<minI) minI=i ; 
     2388        if (i>maxI) maxI=i ; 
     2389        if (j<minJ) minJ=j ; 
     2390        if (j>maxJ) maxJ=j ; 
     2391      }   
     2392      if (maxI>=minI) { ibegin=minI ; ni=maxI-minI+1 ; } 
     2393      else {ibegin=0; ni=0 ;} 
     2394      if (maxJ>=minJ) { jbegin=minJ ; nj=maxJ-minJ+1 ; } 
     2395      else {jbegin=0; nj=0 ;} 
     2396 
     2397    } 
     2398    else if (phasis==1) // receive the sent view from client to construct the full distributed full view on server 
     2399    { 
     2400      CContext* context = CContext::getCurrent(); 
     2401      CDistributedElement* elementFrom = new  CDistributedElement(event) ; 
     2402      elementFrom->addFullView() ; 
     2403      gathererConnector_ = new CGathererConnector(elementFrom->getView(CElementView::FULL), localElement_->getView(CElementView::FULL)) ; 
     2404      gathererConnector_->computeConnector() ;  
     2405    } 
     2406    else if (phasis==2) 
     2407    { 
     2408      delete gathererConnector_ ; 
     2409      elementFrom_ = new  CDistributedElement(event) ; 
     2410      elementFrom_->addFullView() ; 
     2411      gathererConnector_ =  new CGathererConnector(elementFrom_->getView(CElementView::FULL), localElement_->getView(CElementView::FULL)) ; 
     2412      gathererConnector_ -> computeConnector() ; 
     2413    } 
     2414    else if (phasis==3) 
     2415    { 
     2416      CArray<bool,1> localMask ; 
     2417      gathererConnector_->transfer(event,localMask,false) ; 
     2418      localElement_->addView(CElementView::WORKFLOW, localMask) ; 
     2419      mask_1d.reference(localMask.copy()) ; 
     2420  
     2421      serverFromClientConnector_ = new CGathererConnector(elementFrom_->getView(CElementView::FULL), localElement_->getView(CElementView::WORKFLOW)) ; 
     2422      serverFromClientConnector_->computeConnector() ; 
     2423    } 
     2424  } 
     2425  CATCH 
     2426 
     2427 
     2428  void CDomain::sendDistributedAttributes(CContextClient* client, CScattererConnector& scattererConnector,  const string& domainId) 
     2429  { 
     2430    string serverDomainId = domainId.empty() ? this->getId() : domainId ; 
     2431    CContext* context = CContext::getCurrent(); 
     2432 
     2433    if (hasLonLat) 
     2434    { 
     2435      { // send longitude 
     2436        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE); 
     2437        CMessage message ; 
     2438        message<<serverDomainId<<string("lon") ;  
     2439        scattererConnector.transfer(lonvalue, client, event,message) ; 
     2440      } 
     2441       
     2442      { // send latitude 
     2443        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE); 
     2444        CMessage message ; 
     2445        message<<serverDomainId<<string("lat") ;  
     2446        scattererConnector.transfer(latvalue, client, event, message) ; 
     2447      } 
     2448    } 
     2449 
     2450    if (hasBounds) 
     2451    {  
     2452      { // send longitude boudaries 
     2453        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE); 
     2454        CMessage message ; 
     2455        message<<serverDomainId<<string("boundslon") ;  
     2456        scattererConnector.transfer(nvertex, bounds_lonvalue, client, event, message ) ; 
     2457      } 
     2458 
     2459      { // send latitude boudaries 
     2460        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE); 
     2461        CMessage message ; 
     2462        message<<serverDomainId<<string("boundslat") ;  
     2463        scattererConnector.transfer(nvertex, bounds_latvalue, client, event, message ) ; 
     2464      } 
     2465    } 
     2466 
     2467    if (hasArea) 
     2468    {  // send area 
     2469      CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE); 
     2470      CMessage message ; 
     2471      message<<serverDomainId<<string("area") ;  
     2472      scattererConnector.transfer(areavalue, client, event,message) ; 
     2473    } 
     2474  } 
     2475 
     2476  void CDomain::recvDistributedAttributes(CEventServer& event) 
     2477  TRY 
     2478  { 
     2479    string domainId; 
     2480    string type ; 
     2481    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> domainId >> type ; 
     2482    get(domainId)->recvDistributedAttributes(event, type); 
     2483  } 
     2484  CATCH 
     2485 
     2486  void CDomain::recvDistributedAttributes(CEventServer& event, const string& type) 
     2487  TRY 
     2488  { 
     2489    if (type=="lon")  
     2490    { 
     2491      CArray<double,1> value ; 
     2492      gathererConnector_->transfer(event, value, 0.);  
     2493      lonvalue_2d.resize(ni,nj) ; 
     2494      lonvalue_2d=CArray<double,2>(value.dataFirst(),shape(ni,nj),neverDeleteData) ;  
     2495    } 
     2496    else if (type=="lat") 
     2497    { 
     2498      CArray<double,1> value ; 
     2499      gathererConnector_->transfer(event, value, 0.);  
     2500      latvalue_2d.resize(ni,nj) ; 
     2501      latvalue_2d=CArray<double,2>(value.dataFirst(),shape(ni,nj),neverDeleteData) ;  
     2502    } 
     2503    else if (type=="boundslon") 
     2504    { 
     2505      CArray<double,1> value ; 
     2506      gathererConnector_->transfer(event, nvertex, value, 0.);  
     2507      bounds_lon_2d.resize(nvertex,ni,nj) ; 
     2508      bounds_lon_2d=CArray<double,3>(value.dataFirst(),shape(nvertex,ni,nj),neverDeleteData) ;  
     2509    } 
     2510    else if (type=="boundslat") 
     2511    { 
     2512      CArray<double,1> value ; 
     2513      gathererConnector_->transfer(event, nvertex, value, 0.);  
     2514      bounds_lat_2d.resize(nvertex,ni,nj) ; 
     2515      bounds_lat_2d=CArray<double,3>(value.dataFirst(),shape(nvertex,ni,nj),neverDeleteData) ;  
     2516    } 
     2517    else if (type=="area")  
     2518    { 
     2519      CArray<double,1> value ; 
     2520      gathererConnector_->transfer(event, value, 0.);  
     2521      area.resize(ni,nj) ; 
     2522      area=CArray<double,2>(value.dataFirst(),shape(ni,nj),neverDeleteData) ;  
     2523    } 
     2524  } 
     2525  CATCH 
    22342526 
    22352527  void CDomain::sendDomainDistribution(CContextClient* client, const string& domainId) 
     
    23082600   
    23092601 
    2310   void CDomain::recvDomainDistribution(CEventServer& event) 
    2311   TRY 
    2312   { 
    2313     string domainId; 
    2314     int phasis ; 
    2315     for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> domainId >> phasis ; 
    2316     get(domainId)->receivedDomainDistribution(event, phasis); 
    2317   } 
    2318   CATCH 
    2319  
    2320   void CDomain::receivedDomainDistribution(CEventServer& event, int phasis) 
    2321   TRY 
    2322   { 
    2323     CContext* context = CContext::getCurrent(); 
    2324     if (phasis==0) 
    2325     { 
    2326       localElement_ = new  CLocalElement(context->getIntraCommRank(),event) ; 
    2327       localElement_->addFullView() ; 
    2328     } 
    2329     else if (phasis==1) 
    2330     { 
    2331       CContext* context = CContext::getCurrent(); 
    2332       CDistributedElement* elementFrom = new  CDistributedElement(event) ; 
    2333       elementFrom->addFullView() ; 
    2334       gathererConnector_ = new CGathererConnector(elementFrom->getView(CElementView::FULL), localElement_->getView(CElementView::FULL)) ; 
    2335       gathererConnector_->computeConnector() ;  
    2336     } 
    2337     else if (phasis==2) 
    2338     { 
    2339       CArray<size_t,1> globalIndex ; 
    2340       //gathererConnector_->transfer(event,globalIndex) ; 
    2341       CGridGathererConnector gridGathererConnector({gathererConnector_}) ; 
    2342       gridGathererConnector.transfer(event, globalIndex) ; 
    2343     } 
    2344     else if (phasis==3) 
    2345     { 
    2346  
    2347     } 
    2348   } 
    2349   CATCH 
    2350  
     2602  
    23512603   
    23522604 
     
    27072959          return true; 
    27082960          break; 
     2961        case EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE: 
     2962          recvDistributedAttributes(event); 
     2963          return true; 
     2964          break;   
    27092965        default: 
    27102966          ERROR("bool CDomain::dispatchEvent(CEventServer& event)", 
Note: See TracChangeset for help on using the changeset viewer.