Ignore:
Timestamp:
09/10/20 13:51:02 (4 years ago)
Author:
ymipsl
Message:

Big update on on going work related to data distribution and transfer between clients and servers.
Revisite of the source and store filter using "connectors".

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/axis.cpp

    r1918 r1930  
    344344 
    345345 
    346    void CAxis::initializeLocalElement(void) 
    347    { 
    348       // after checkAttribute index of size n 
    349       int rank = CContext::getCurrent()->getIntraCommRank() ; 
    350        
    351       CArray<size_t,1> ind(n) ; 
    352       for (int i=0;i<n;i++) ind(i)=index(i) ; 
    353  
    354       localElement_ = new CLocalElement(rank, n_glo, ind) ; 
    355    } 
    356  
    357    void CAxis::addFullView(void) 
    358    { 
    359       CArray<int,1> index(n) ; 
    360       for(int i=0; i<n ; i++) index(i)=i ; 
    361       localElement_ -> addView(CElementView::FULL, index) ; 
    362    } 
    363  
    364    void CAxis::addWorkflowView(void) 
    365    { 
    366      // mask + data are included into data_index 
    367      int nk=data_index.numElements() ; 
    368      int nMask=0 ; 
    369      for(int k=0;k<nk;k++) if (data_index(k)>=0 && data_index(k)<n) nMask++ ; 
    370       
    371      CArray<int,1> index(nMask) ; 
    372      nMask=0 ; 
    373      for(int k=0;k<nk;k++)  
    374        if (data_index(k)>=0 && data_index(k)<n)  
    375        { 
    376          index(nMask) = data_index(k) ; 
    377          nMask++ ; 
    378        } 
    379      localElement_ -> addView(CElementView::WORKFLOW, index) ; 
    380    } 
    381  
    382    void CAxis::addModelView(void) 
    383    { 
    384      // information for model view is stored in data_index 
    385      localElement_->addView(CElementView::MODEL, data_index) ; 
    386    } 
    387  
    388    void CAxis::computeModelToWorkflowConnector(void) 
    389    {  
    390      CLocalView* srcView=getLocalView(CElementView::MODEL) ; 
    391      CLocalView* dstView=getLocalView(CElementView::WORKFLOW) ; 
    392      modelToWorkflowConnector_ = new CLocalConnector(srcView, dstView);  
    393      modelToWorkflowConnector_->computeConnector() ; 
    394    } 
    395  
    396346   /*! 
    397347      Check the validity of data, fill in values if any, and apply mask. 
     
    543493           break; 
    544494         case EVENT_ID_DISTRIBUTED_ATTRIBUTES: 
     495           recvDistributedAttributes_old(event); 
     496           return true; 
     497           break; 
     498         case EVENT_ID_AXIS_DISTRIBUTION: 
     499           recvAxisDistribution(event); 
     500           return true; 
     501           break; 
     502         case EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE: 
    545503           recvDistributedAttributes(event); 
    546504           return true; 
     
    671629         || (index.numElements() != n_glo)) 
    672630     { 
    673        sendDistributedAttributes(client, axisId);        
     631       sendDistributedAttributes_old(client, axisId);        
    674632     } 
    675633     else 
     
    11711129    In future, if new attributes are added, they should also be processed in this function 
    11721130  */ 
    1173   void CAxis::sendDistributedAttributes(CContextClient* client, const string& axisId) 
     1131  void CAxis::sendDistributedAttributes_old(CContextClient* client, const string& axisId) 
    11741132  TRY 
    11751133  { 
     
    12781236    \param [in] event event containing data of these attributes 
    12791237  */ 
    1280   void CAxis::recvDistributedAttributes(CEventServer& event) 
     1238  void CAxis::recvDistributedAttributes_old(CEventServer& event) 
    12811239  TRY 
    12821240  { 
     
    12931251      buffers.push_back(buffer); 
    12941252    } 
    1295     get(axisId)->recvDistributedAttributes(ranks, buffers); 
     1253    get(axisId)->recvDistributedAttributes_old(ranks, buffers); 
    12961254  } 
    12971255  CATCH 
     
    13021260    \param [in] buffers buffer containing data sent from the sender 
    13031261  */ 
    1304   void CAxis::recvDistributedAttributes(vector<int>& ranks, vector<CBufferIn*> buffers) 
     1262  void CAxis::recvDistributedAttributes_old(vector<int>& ranks, vector<CBufferIn*> buffers) 
    13051263  TRY 
    13061264  { 
     
    15851543  CATCH_DUMP_ATTR 
    15861544 
     1545 
     1546   ////////////////////////////////////////////////////////////////////////////////////// 
     1547   //  this part is related to distribution, element definition, views and connectors  // 
     1548   ////////////////////////////////////////////////////////////////////////////////////// 
     1549 
     1550   void CAxis::initializeLocalElement(void) 
     1551   { 
     1552      // after checkAttribute index of size n 
     1553      int rank = CContext::getCurrent()->getIntraCommRank() ; 
     1554       
     1555      CArray<size_t,1> ind(n) ; 
     1556      for (int i=0;i<n;i++) ind(i)=index(i) ; 
     1557 
     1558      localElement_ = new CLocalElement(rank, n_glo, ind) ; 
     1559   } 
     1560 
     1561   void CAxis::addFullView(void) 
     1562   { 
     1563      CArray<int,1> index(n) ; 
     1564      for(int i=0; i<n ; i++) index(i)=i ; 
     1565      localElement_ -> addView(CElementView::FULL, index) ; 
     1566   } 
     1567 
     1568   void CAxis::addWorkflowView(void) 
     1569   { 
     1570     // mask + data are included into data_index 
     1571     int nk=data_index.numElements() ; 
     1572     int nMask=0 ; 
     1573     for(int k=0;k<nk;k++) if (data_index(k)>=0 && data_index(k)<n) nMask++ ; 
     1574      
     1575     CArray<int,1> index(nMask) ; 
     1576     nMask=0 ; 
     1577     for(int k=0;k<nk;k++)  
     1578       if (data_index(k)>=0 && data_index(k)<n)  
     1579       { 
     1580         index(nMask) = data_index(k) ; 
     1581         nMask++ ; 
     1582       } 
     1583     localElement_ -> addView(CElementView::WORKFLOW, index) ; 
     1584   } 
     1585 
     1586   void CAxis::addModelView(void) 
     1587   { 
     1588     // information for model view is stored in data_index 
     1589     localElement_->addView(CElementView::MODEL, data_index) ; 
     1590   } 
     1591 
     1592   void CAxis::computeModelToWorkflowConnector(void) 
     1593   {  
     1594     CLocalView* srcView=getLocalView(CElementView::MODEL) ; 
     1595     CLocalView* dstView=getLocalView(CElementView::WORKFLOW) ; 
     1596     modelToWorkflowConnector_ = new CLocalConnector(srcView, dstView);  
     1597     modelToWorkflowConnector_->computeConnector() ; 
     1598   } 
     1599 
     1600 
     1601   void CAxis::computeRemoteElement(CContextClient* client, EDistributionType type) 
     1602  { 
     1603    CContext* context = CContext::getCurrent(); 
     1604    map<int, CArray<size_t,1>> globalIndex ; 
     1605 
     1606    if (type==EDistributionType::BANDS) // Bands distribution to send to file server 
     1607    { 
     1608      int nbServer = client->serverSize; 
     1609      int nbClient = client->clientSize ; 
     1610      int rankClient = client->clientRank ; 
     1611      int size = nbServer / nbClient ; 
     1612      int start ; 
     1613      if (nbServer%nbClient > rankClient) 
     1614      { 
     1615       start = (size+1) * rankClient ; 
     1616       size++ ; 
     1617      } 
     1618      else start = size*rankClient + nbServer%nbClient ; 
     1619      
     1620      for(int i=0; i<size; i++) 
     1621      {  
     1622        int rank=start+i ;  
     1623        size_t indSize = n_glo/nbServer ; 
     1624        size_t indStart ; 
     1625        if (n_glo % nbServer > rank) 
     1626        { 
     1627          indStart = (indSize+1) * rank ; 
     1628          indSize++ ; 
     1629        } 
     1630        else indStart = indSize*rank + n_glo%nbServer ; 
     1631        
     1632        auto& globalInd =  globalIndex[rank] ; 
     1633        globalInd.resize(indSize) ; 
     1634        for(size_t n = 0 ; n<indSize; n++) globalInd(n)=indStart+n ; 
     1635      } 
     1636    } 
     1637    else if (type==EDistributionType::NONE) // domain is not distributed ie all servers get the same local domain 
     1638    { 
     1639      int nbServer = client->serverSize; 
     1640      size_t nglo=n_glo ; 
     1641      CArray<size_t,1> indGlo(nglo) ; 
     1642      for(size_t i=0;i<nglo;i++) indGlo(i) = i ; 
     1643      for (auto& rankServer : client->getRanksServerLeader()) globalIndex[rankServer].reference(indGlo.copy()); ;  
     1644    } 
     1645    remoteElement_[client] = new CDistributedElement(n_glo, globalIndex) ; 
     1646    remoteElement_[client]->addFullView() ; 
     1647  } 
     1648  
     1649  void CAxis::distributeToServer(CContextClient* client, std::map<int, CArray<size_t,1>>& globalIndex, const string& axisId) 
     1650  { 
     1651    string serverAxisId = axisId.empty() ? this->getId() : axisId ; 
     1652    CContext* context = CContext::getCurrent(); 
     1653 
     1654    this->sendAllAttributesToServer(client, serverAxisId)  ; 
     1655 
     1656    CDistributedElement scatteredElement(n_glo,globalIndex) ; 
     1657    scatteredElement.addFullView() ; 
     1658    CScattererConnector scattererConnector(localElement_->getView(CElementView::FULL), scatteredElement.getView(CElementView::FULL), context->getIntraComm()) ; 
     1659    scattererConnector.computeConnector() ; 
     1660     
     1661    // phase 0 
     1662    // send remote element to construct the full view on server, ie without hole  
     1663    CEventClient event0(getType(), EVENT_ID_AXIS_DISTRIBUTION); 
     1664    CMessage message0 ; 
     1665    message0<<serverAxisId<<0 ;  
     1666    remoteElement_[client]->sendToServer(client,event0,message0) ;  
     1667     
     1668    // phase 1 
     1669    // send the full view of element to construct the connector which connect distributed data coming from client to the full local view 
     1670    CEventClient event1(getType(), EVENT_ID_AXIS_DISTRIBUTION); 
     1671    CMessage message1 ; 
     1672    message1<<serverAxisId<<1<<localElement_->getView(CElementView::FULL)->getGlobalSize() ;  
     1673    scattererConnector.transfer(localElement_->getView(CElementView::FULL)->getGlobalIndex(),client,event1,message1) ; 
     1674 
     1675    sendDistributedAttributes(client, scattererConnector, axisId) ; 
     1676   
     1677    // phase 2 send the mask : data index + mask2D 
     1678    CArray<bool,1> maskIn(localElement_->getView(CElementView::WORKFLOW)->getSize()); 
     1679    CArray<bool,1> maskOut ; 
     1680    CLocalConnector workflowToFull(localElement_->getView(CElementView::WORKFLOW), localElement_->getView(CElementView::FULL)) ; 
     1681    workflowToFull.computeConnector() ; 
     1682    maskIn=true ; 
     1683    workflowToFull.transfer(maskIn,maskOut,false) ; 
     1684 
     1685    // phase 3 : prepare grid scatterer connector to send data from client to server 
     1686    map<int,CArray<size_t,1>> workflowGlobalIndex ; 
     1687    map<int,CArray<bool,1>> maskOut2 ;  
     1688    scattererConnector.transfer(maskOut, maskOut2) ; 
     1689    scatteredElement.addView(CElementView::WORKFLOW, maskOut2) ; 
     1690    scatteredElement.getView(CElementView::WORKFLOW)->getGlobalIndexView(workflowGlobalIndex) ; 
     1691    // create new workflow view for scattered element 
     1692    CDistributedElement clientToServerElement(scatteredElement.getGlobalSize(), workflowGlobalIndex) ; 
     1693    clientToServerElement.addFullView() ; 
     1694    CEventClient event2(getType(), EVENT_ID_AXIS_DISTRIBUTION); 
     1695    CMessage message2 ; 
     1696    message2<<serverAxisId<<2 ;  
     1697    clientToServerElement.sendToServer(client, event2, message2) ;  
     1698    clientToServerConnector_[client] = new CScattererConnector(localElement_->getView(CElementView::WORKFLOW), 
     1699                                                              clientToServerElement.getView(CElementView::FULL), context->getIntraComm()) ; 
     1700    clientToServerConnector_[client]->computeConnector() ; 
     1701 
     1702 
     1703    CEventClient event3(getType(), EVENT_ID_AXIS_DISTRIBUTION); 
     1704    CMessage message3 ; 
     1705    message3<<serverAxisId<<3 ;  
     1706    clientToServerConnector_[client]->transfer(maskIn,client,event3,message3) ;  
     1707 
     1708 
     1709 
     1710  } 
     1711 
     1712  void CAxis::recvAxisDistribution(CEventServer& event) 
     1713  TRY 
     1714  { 
     1715    string axisId; 
     1716    int phasis ; 
     1717    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> axisId >> phasis ; 
     1718    get(axisId)->receivedAxisDistribution(event, phasis); 
     1719  } 
     1720  CATCH 
     1721 
     1722 
     1723  void CAxis::receivedAxisDistribution(CEventServer& event, int phasis) 
     1724  TRY 
     1725  { 
     1726    CContext* context = CContext::getCurrent(); 
     1727    if (phasis==0) // receive the remote element to construct the full view 
     1728    { 
     1729      localElement_ = new  CLocalElement(context->getIntraCommRank(),event) ; 
     1730      localElement_->addFullView() ; 
     1731      // construct the local dimension and indexes 
     1732      auto& globalIndex=localElement_->getGlobalIndex() ; 
     1733      int nk=globalIndex.numElements() ; 
     1734      int minK=n_glo,maxK=-1 ; 
     1735      int nGlo=n_glo ; 
     1736      int indGlo ; 
     1737      for(int k=0;k<nk;k++) 
     1738      { 
     1739        indGlo=globalIndex(k) ; 
     1740        if (indGlo<minK) minK=indGlo ; 
     1741        if (indGlo>maxK) maxK=indGlo ; 
     1742      }   
     1743      if (maxK>=minK) { begin=minK ; n=maxK-minK+1 ; } 
     1744      else {begin=0; n=0 ;} 
     1745 
     1746    } 
     1747    else if (phasis==1) // receive the sent view from client to construct the full distributed full view on server 
     1748    { 
     1749      CContext* context = CContext::getCurrent(); 
     1750      CDistributedElement* elementFrom = new  CDistributedElement(event) ; 
     1751      elementFrom->addFullView() ; 
     1752      gathererConnector_ = new CGathererConnector(elementFrom->getView(CElementView::FULL), localElement_->getView(CElementView::FULL)) ; 
     1753      gathererConnector_->computeConnector() ;  
     1754    } 
     1755    else if (phasis==2) 
     1756    { 
     1757      delete gathererConnector_ ; 
     1758      elementFrom_ = new  CDistributedElement(event) ; 
     1759      elementFrom_->addFullView() ; 
     1760      gathererConnector_ =  new CGathererConnector(elementFrom_->getView(CElementView::FULL), localElement_->getView(CElementView::FULL)) ; 
     1761      gathererConnector_ -> computeConnector() ; 
     1762    } 
     1763    else if (phasis==3) 
     1764    { 
     1765      CArray<bool,1> localMask ; 
     1766      gathererConnector_->transfer(event,localMask,false) ; 
     1767      localElement_->addView(CElementView::WORKFLOW, localMask) ; 
     1768      mask.reference(localMask.copy()) ; 
     1769  
     1770      serverFromClientConnector_ = new CGathererConnector(elementFrom_->getView(CElementView::FULL), localElement_->getView(CElementView::WORKFLOW)) ; 
     1771      serverFromClientConnector_->computeConnector() ; 
     1772    } 
     1773  } 
     1774  CATCH 
     1775 
     1776  void CAxis::sendDistributedAttributes(CContextClient* client, CScattererConnector& scattererConnector, const string& axisId) 
     1777  { 
     1778    string serverAxisId = axisId.empty() ? this->getId() : axisId ; 
     1779    CContext* context = CContext::getCurrent(); 
     1780 
     1781    if (hasValue) 
     1782    { 
     1783      { // send level value 
     1784        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE); 
     1785        CMessage message ; 
     1786        message<<serverAxisId<<string("value") ;  
     1787        scattererConnector.transfer(value, client, event,message) ; 
     1788      } 
     1789    } 
     1790 
     1791    if (hasBounds) 
     1792    { 
     1793      { // send bounds level value 
     1794        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE); 
     1795        CMessage message ; 
     1796        message<<serverAxisId<<string("bounds") ;  
     1797        scattererConnector.transfer(2, bounds, client, event,message) ; 
     1798      } 
     1799    } 
     1800 
     1801    if (hasLabel) 
     1802    { 
     1803      { // send label 
     1804        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE); 
     1805        CMessage message ; 
     1806        message<<serverAxisId<<string("label") ; 
     1807        // something to do ? => convert string label into char ? 
     1808        //clientToServerConnector_[client]->transfer(2, bounds, client, event,message) ; 
     1809      } 
     1810    } 
     1811  } 
     1812 
     1813  void CAxis::recvDistributedAttributes(CEventServer& event) 
     1814  TRY 
     1815  { 
     1816    string axisId; 
     1817    string type ; 
     1818    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> axisId >> type ; 
     1819    get(axisId)->recvDistributedAttributes(event, type); 
     1820  } 
     1821  CATCH 
     1822 
     1823  void CAxis::recvDistributedAttributes(CEventServer& event, const string& type) 
     1824  TRY 
     1825  { 
     1826    if (type=="value")  
     1827    { 
     1828      gathererConnector_->transfer(event, value, 0.);  
     1829    } 
     1830    else if (type=="bounds") 
     1831    { 
     1832      CArray<double,1> value ; 
     1833      gathererConnector_->transfer(event, 2, value, 0.);  
     1834      bounds.resize(2,n) ; 
     1835      bounds=CArray<double,2>(bounds.dataFirst(),shape(2,n),neverDeleteData) ;  
     1836    } 
     1837    else if (type=="label") 
     1838    { 
     1839        
     1840    } 
     1841  } 
     1842  CATCH 
     1843 
    15871844  DEFINE_REF_FUNC(Axis,axis) 
    15881845 
Note: See TracChangeset for help on using the changeset viewer.