Changeset 1025 for XIOS/dev/dev_olga/src/node/field.cpp
- Timestamp:
- 01/11/17 15:14:22 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_olga/src/node/field.cpp
r1021 r1025 23 23 #include "temporal_filter.hpp" 24 24 #include "spatial_transform_filter.hpp" 25 #include "file_server_writer_filter.hpp" 25 26 26 27 namespace xios{ … … 234 235 void CField::recvUpdateData(CEventServer& event) 235 236 { 236 vector<int> ranks; 237 vector<CBufferIn*> buffers; 237 std::map<int,CBufferIn*> rankBuffers; 238 238 239 239 list<CEventServer::SSubEvent>::iterator it; … … 245 245 CBufferIn* buffer = it->buffer; 246 246 *buffer >> fieldId; 247 ranks.push_back(rank); 248 buffers.push_back(buffer); 249 } 250 get(fieldId)->recvUpdateData(ranks,buffers); 247 rankBuffers[rank] = buffer; 248 } 249 get(fieldId)->recvUpdateData(rankBuffers); 251 250 } 252 251 253 void CField::recvUpdateData( vector<int>& ranks, vector<CBufferIn*>& buffers)252 void CField::recvUpdateData(std::map<int,CBufferIn*>& rankBuffers) 254 253 { 255 254 CContext* context = CContext::getCurrent(); 256 255 257 if (data_srv.empty()) 256 size_t sizeData = 0; 257 if (0 == recvDataSrv.numElements()) 258 { 259 for (map<int, CArray<size_t, 1> >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it) 260 { 261 sizeData += it->second.numElements(); 262 } 263 264 // Gather all data from different clients 265 recvDataSrv.resize(sizeData); 266 recvFoperationSrv = boost::shared_ptr<func::CFunctor>(new func::CInstant(recvDataSrv)); 267 } 268 269 CArray<double,1> recv_data_tmp(recvDataSrv.numElements()); 270 sizeData = 0; 271 const CDate& currDate = context->getCalendar()->getCurrentDate(); 272 const CDate opeDate = last_operation_srv +freq_op + freq_operation_srv - freq_op; 273 274 if (opeDate <= currDate) 258 275 { 259 276 for (map<int, CArray<size_t, 1> >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it) 260 { 261 int rank = it->first; 262 data_srv.insert(std::make_pair(rank, CArray<double,1>(it->second.numElements()))); 263 foperation_srv.insert(pair<int,boost::shared_ptr<func::CFunctor> >(rank,boost::shared_ptr<func::CFunctor>(new func::CInstant(data_srv[rank])))); 264 } 265 } 277 { 278 CArray<double,1> tmp; 279 *(rankBuffers[it->first]) >> tmp; 280 recv_data_tmp(Range(sizeData,sizeData+it->second.numElements()-1)) = tmp; 281 sizeData += it->second.numElements(); 282 } 283 } 284 285 this->setData(recv_data_tmp); 286 } 287 288 void CField::writeUpdateData(const CArray<double,1>& data) 289 { 290 CContext* context = CContext::getCurrent(); 266 291 267 292 const CDate& currDate = context->getCalendar()->getCurrentDate(); … … 271 296 if (opeDate <= currDate) 272 297 { 273 for (int n = 0; n < ranks.size(); n++) 274 { 275 CArray<double,1> data_tmp; 276 *buffers[n] >> data_tmp; 277 (*foperation_srv[ranks[n]])(data_tmp); 278 } 298 (*recvFoperationSrv)(data); 279 299 last_operation_srv = currDate; 300 // sendUpdateData(fieldData); 301 // Redirecting data to the correct secondary server 302 //int fileIdx = std::find(context->enabledFiles.begin(), context->enabledFiles.end(), this->file) - context->enabledFiles.begin(); 303 //int srvId = fileIdx % context->clientPrimServer.size(); 304 //sendUpdateData(fieldData, context->clientPrimServer[srvId]); 280 305 } 281 306 282 307 if (writeDate < (currDate + freq_operation_srv)) 283 308 { 284 for (int n = 0; n < ranks.size(); n++) 285 { 286 this->foperation_srv[ranks[n]]->final(); 287 } 288 309 recvFoperationSrv->final(); 289 310 last_Write_srv = writeDate; 290 }291 292 if (context->hasClient && context->hasServer)293 {294 size_t writtenSize;295 // if (field->getUseCompressedOutput())296 // writtenSize = grid->getNumberWrittenIndexes();297 // else298 writtenSize = grid->getWrittenDataSize();299 300 CArray<double,1> fieldData(writtenSize);301 // if (!field->default_value.isEmpty()) fieldData = field->default_value;302 303 // if (field->getUseCompressedOutput())304 // field->outputCompressedField(fieldData);305 // else306 this->outputField(fieldData);307 // sendUpdateData(fieldData);308 // Redirecting data to the correct secondary server309 int fileIdx = std::find(context->enabledFiles.begin(), context->enabledFiles.end(), this->file) - context->enabledFiles.begin();310 int srvId = fileIdx % context->clientPrimServer.size();311 sendUpdateData(fieldData, context->clientPrimServer[srvId]);312 }313 if (!context->hasClient && context->hasServer)314 {315 // size_t writtenSize;316 // if (this->getUseCompressedOutput())317 // writtenSize = grid->getNumberWrittenIndexes();318 // else319 // writtenSize = grid->getWrittenDataSize();320 //321 // CArray<double,1> fieldData(writtenSize);322 323 // if (this->getUseCompressedOutput())324 // this->outputCompressedField(fieldData);325 // else326 // this->outputField(fieldData);327 311 writeField(); 328 } 329 330 lastlast_Write_srv = last_Write_srv; 331 312 lastlast_Write_srv = last_Write_srv; 313 } 332 314 } 315 316 // void CField::recvUpdateData(vector<int>& ranks, vector<CBufferIn*>& buffers) 317 // { 318 // CContext* context = CContext::getCurrent(); 319 320 // if (data_srv.empty()) 321 // { 322 // for (map<int, CArray<size_t, 1> >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it) 323 // { 324 // int rank = it->first; 325 // data_srv.insert(std::make_pair(rank, CArray<double,1>(it->second.numElements()))); 326 // foperation_srv.insert(pair<int,boost::shared_ptr<func::CFunctor> >(rank,boost::shared_ptr<func::CFunctor>(new func::CInstant(data_srv[rank])))); 327 // } 328 // } 329 330 // const CDate& currDate = context->getCalendar()->getCurrentDate(); 331 // const CDate opeDate = last_operation_srv +freq_op + freq_operation_srv - freq_op; 332 // const CDate writeDate = last_Write_srv + freq_write_srv; 333 334 // if (opeDate <= currDate) 335 // { 336 // for (int n = 0; n < ranks.size(); n++) 337 // { 338 // CArray<double,1> data_tmp; 339 // *buffers[n] >> data_tmp; 340 // (*foperation_srv[ranks[n]])(data_tmp); 341 // } 342 // last_operation_srv = currDate; 343 // } 344 345 // if (writeDate < (currDate + freq_operation_srv)) 346 // { 347 // for (int n = 0; n < ranks.size(); n++) 348 // { 349 // this->foperation_srv[ranks[n]]->final(); 350 // } 351 352 // last_Write_srv = writeDate; 353 // } 354 355 // if (context->hasClient && context->hasServer) 356 // { 357 // size_t writtenSize; 358 // // if (field->getUseCompressedOutput()) 359 // // writtenSize = grid->getNumberWrittenIndexes(); 360 // // else 361 // writtenSize = grid->getWrittenDataSize(); 362 363 // CArray<double,1> fieldData(writtenSize); 364 // // if (!field->default_value.isEmpty()) fieldData = field->default_value; 365 366 // // if (field->getUseCompressedOutput()) 367 // // field->outputCompressedField(fieldData); 368 // // else 369 // this->outputField(fieldData); 370 // sendUpdateData(fieldData); 371 // } 372 // if (!context->hasClient && context->hasServer) 373 // { 374 // writeField(); 375 // } 376 377 // lastlast_Write_srv = last_Write_srv; 378 379 // } 333 380 334 381 void CField::writeField(void) … … 656 703 657 704 //---------------------------------------------------------------- 658 659 void CField::solveOnlyReferenceEnabledField(bool doSending2Server)660 {661 CContext* context = CContext::getCurrent();662 if (!isReferenceSolved)663 {664 isReferenceSolved = true;665 666 if (context->hasClient && !context->hasServer)667 // if (context->hasClient)668 {669 solveRefInheritance(true);670 if (hasDirectFieldReference()) getDirectFieldReference()->solveOnlyReferenceEnabledField(false);671 }672 // else if (context->hasServer)673 if (context->hasServer)674 solveServerOperation();675 676 solveGridReference();677 678 if (context->hasClient && !context->hasServer)679 // if (context->hasClient)680 {681 solveGenerateGrid();682 buildGridTransformationGraph();683 }684 }685 }686 705 687 706 /*! … … 784 803 } 785 804 } 786 805 806 void CField::solveAllEnabledFields() 807 { 808 CContext* context = CContext::getCurrent(); 809 bool hasClient = context->hasClient; 810 bool hasServer = context->hasServer; 811 812 if (!isReferenceSolved) 813 { 814 isReferenceSolved = true; 815 816 if (hasClient && !hasServer) 817 { 818 solveRefInheritance(true); 819 if (hasDirectFieldReference()) getDirectFieldReference()->solveAllEnabledFields(); 820 } 821 822 if (hasServer) 823 solveServerOperation(); 824 825 solveGridReference(); 826 827 if (hasClient && !hasServer) 828 { 829 solveGenerateGrid(); 830 buildGridTransformationGraph(); 831 } 832 833 solveGridDomainAxisRef(false); 834 835 if (hasClient && !hasServer) 836 { 837 solveTransformedGrid(); 838 } 839 840 solveGridDomainAxisRef(false); 841 } 842 } 843 844 void CField::checkGridOfEnabledFields() 845 { 846 solveCheckMaskIndex(false); 847 } 848 849 void CField::sendGridOfEnabledFields() 850 { 851 solveGridDomainAxisRef(true); 852 solveCheckMaskIndex(true); 853 } 854 855 856 void CField::solveOnlyReferenceEnabledField(bool doSending2Server) 857 { 858 CContext* context = CContext::getCurrent(); 859 if (!isReferenceSolved) 860 { 861 isReferenceSolved = true; 862 863 if (context->hasClient && !context->hasServer) 864 // if (context->hasClient) 865 { 866 solveRefInheritance(true); 867 if (hasDirectFieldReference()) getDirectFieldReference()->solveOnlyReferenceEnabledField(false); 868 } 869 // else if (context->hasServer) 870 if (context->hasServer) 871 solveServerOperation(); 872 873 solveGridReference(); 874 875 if (context->hasClient && !context->hasServer) 876 // if (context->hasClient) 877 { 878 solveGenerateGrid(); 879 buildGridTransformationGraph(); 880 } 881 } 882 } 883 787 884 void CField::solveAllReferenceEnabledField(bool doSending2Server) 788 885 { … … 794 891 areAllReferenceSolved = true; 795 892 796 //if (context->hasClient)893 // if (context->hasClient) 797 894 if (context->hasClient && !context->hasServer) 798 895 { … … 800 897 if (hasDirectFieldReference()) getDirectFieldReference()->solveAllReferenceEnabledField(false); 801 898 } 802 //else if (context->hasServer)803 if (context->hasServer && !context->hasClient)899 else if (context->hasServer) 900 // if (context->hasServer && !context->hasClient) 804 901 solveServerOperation(); 805 902 … … 885 982 void CField::buildFilterGraph(CGarbageCollector& gc, bool enableOutput) 886 983 { 887 if (!areAllReferenceSolved) solveAllReferenceEnabledField(false); 888 889 // Start by building a filter which can provide the field's instant data 890 if (!instantDataFilter) 891 { 892 // Check if we have an expression to parse 984 // if (!areAllReferenceSolved) solveAllReferenceEnabledField(false); 985 if (!isReferenceSolved) solveAllEnabledFields(); 986 CContext* context = CContext::getCurrent(); 987 bool hasWriterServer = context->hasServer && !context->hasClient; 988 bool hasIntermediateServer = context->hasServer && context->hasClient; 989 990 if (hasWriterServer) 991 { 992 if (!instantDataFilter) 993 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid)); 994 995 // If the field data is to be read by the client or/and written to a file 996 if (enableOutput && !storeFilter && !fileWriterFilter) 997 { 998 if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 999 { 1000 fileServerWriterFilter = boost::shared_ptr<CFileServerWriterFilter>(new CFileServerWriterFilter(gc, this)); 1001 instantDataFilter->connectOutput(fileServerWriterFilter, 0); 1002 } 1003 } 1004 } 1005 else if (hasIntermediateServer) 1006 { 1007 if (!instantDataFilter) 1008 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid)); 1009 1010 // If the field data is to be read by the client or/and written to a file 1011 if (enableOutput && !storeFilter && !fileWriterFilter) 1012 { 1013 if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 1014 { 1015 fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 1016 instantDataFilter->connectOutput(fileWriterFilter, 0); 1017 } 1018 } 1019 } 1020 else 1021 { 1022 // Start by building a filter which can provide the field's instant data 1023 if (!instantDataFilter) 1024 { 1025 // Check if we have an expression to parse 893 1026 if (hasExpression()) 894 {1027 { 895 1028 boost::scoped_ptr<IFilterExprNode> expr(parseExpr(getExpression() + '\0')); 896 1029 boost::shared_ptr<COutputPin> filter = expr->reduce(gc, *this); … … 912 1045 913 1046 instantDataFilter = filter; 914 }915 // Check if we have a reference on another field916 else if (!field_ref.isEmpty())917 instantDataFilter = getFieldReference(gc);918 // Check if the data is to be read from a file919 else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read)1047 } 1048 // Check if we have a reference on another field 1049 else if (!field_ref.isEmpty()) 1050 instantDataFilter = getFieldReference(gc); 1051 // Check if the data is to be read from a file 1052 else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 920 1053 instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 921 1054 freq_offset.isEmpty() ? NoneDu : freq_offset, 922 1055 true)); 923 else // The data might be passed from the model1056 else // The data might be passed from the model 924 1057 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 925 } 926 927 // If the field data is to be read by the client or/and written to a file 928 if (enableOutput && !storeFilter && !fileWriterFilter) 929 { 930 if (!read_access.isEmpty() && read_access) 931 { 932 storeFilter = boost::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid)); 933 instantDataFilter->connectOutput(storeFilter, 0); 934 } 935 936 if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 937 { 938 fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 939 getTemporalDataFilter(gc, file->output_freq)->connectOutput(fileWriterFilter, 0); 1058 } 1059 1060 // If the field data is to be read by the client or/and written to a file 1061 if (enableOutput && !storeFilter && !fileWriterFilter) 1062 { 1063 if (!read_access.isEmpty() && read_access) 1064 { 1065 storeFilter = boost::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid)); 1066 instantDataFilter->connectOutput(storeFilter, 0); 1067 } 1068 1069 if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 1070 { 1071 fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 1072 getTemporalDataFilter(gc, file->output_freq)->connectOutput(fileWriterFilter, 0); 1073 } 940 1074 } 941 1075 } … … 1285 1419 void CField::outputField(CArray<double,1>& fieldOut) 1286 1420 { 1287 map<int, CArray<double,1> >::iterator it; 1288 1289 for (it = data_srv.begin(); it != data_srv.end(); it++) 1290 { 1291 grid->outputField(it->first, it->second, fieldOut.dataFirst()); 1292 } 1421 // map<int, CArray<double,1> >::iterator it; 1422 1423 // for (it = data_srv.begin(); it != data_srv.end(); it++) 1424 // { 1425 // grid->outputField(it->first, it->second, fieldOut.dataFirst()); 1426 // } 1427 grid->outputField(recvDataSrv, fieldOut); 1293 1428 } 1294 1429
Note: See TracChangeset
for help on using the changeset viewer.