New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2020/dev_r13296_HPC-07_mocavero_mpi3/src/OCE/LBC – NEMO

source: NEMO/branches/2020/dev_r13296_HPC-07_mocavero_mpi3/src/OCE/LBC/lib_mpp.F90 @ 13571

Last change on this file since 13571 was 13571, checked in by mocavero, 4 years ago

Align branch with trunk

  • Property svn:keywords set to Id
File size: 73.9 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mpp_ini_nc
69   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
71   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
72   PUBLIC   mpp_report
73   PUBLIC   mpp_bcast_nml
74   PUBLIC   tic_tac
75#if ! defined key_mpp_mpi
76   PUBLIC MPI_wait
77   PUBLIC MPI_Wtime
78#endif
79   
80   !! * Interfaces
81   !! define generic interface for these routine as they are called sometimes
82   !! with scalar arguments instead of array arguments, which causes problems
83   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
84   INTERFACE mpp_min
85      MODULE PROCEDURE mppmin_a_int, mppmin_int
86      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
87      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
88   END INTERFACE
89   INTERFACE mpp_max
90      MODULE PROCEDURE mppmax_a_int, mppmax_int
91      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
92      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
93   END INTERFACE
94   INTERFACE mpp_sum
95      MODULE PROCEDURE mppsum_a_int, mppsum_int
96      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
97      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
98      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
99   END INTERFACE
100   INTERFACE mpp_minloc
101      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
102      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
103   END INTERFACE
104   INTERFACE mpp_maxloc
105      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
106      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
107   END INTERFACE
108
109   !! ========================= !!
110   !!  MPI  variable definition !!
111   !! ========================= !!
112#if   defined key_mpp_mpi
113!$AGRIF_DO_NOT_TREAT
114   INCLUDE 'mpif.h'
115!$AGRIF_END_DO_NOT_TREAT
116   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
117#else   
118   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
119   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
120   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
121   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
122#endif
123
124   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
125
126   INTEGER, PUBLIC ::   mppsize        ! number of process
127   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
128!$AGRIF_DO_NOT_TREAT
129   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
130!$AGRIF_END_DO_NOT_TREAT
131
132   INTEGER :: MPI_SUMDD
133
134   ! variables used for zonal integration
135   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
136   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
137   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
138   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
139   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
140
141   ! variables used for MPI3 neighbourhood collectives
142   INTEGER, PUBLIC :: mpi_nc_com                   ! MPI3 neighbourhood collectives communicator
143   INTEGER, PUBLIC :: mpi_nc_all_com               ! MPI3 neighbourhood collectives communicator (with diagionals)
144
145   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
146   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
147   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
148   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
149   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
150   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
151   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
152   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
153   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
154
155   ! Communications summary report
156   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
157   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
158   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
159   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
160   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
161   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
162   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
163   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
164   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
165   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
166   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
167   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
168   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
169   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
170   !: name (used as id) of allreduce-delayed operations
171   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
172   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
173   !: component name where the allreduce-delayed operation is performed
174   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
175   TYPE, PUBLIC ::   DELAYARR
176      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
177      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
178   END TYPE DELAYARR
179   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
180   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
181
182   ! timing summary report
183   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
184   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
185   
186   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
187
188   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
189   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
190   
191   !! * Substitutions
192#  include "do_loop_substitute.h90"
193   !!----------------------------------------------------------------------
194   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
195   !! $Id$
196   !! Software governed by the CeCILL license (see ./LICENSE)
197   !!----------------------------------------------------------------------
198CONTAINS
199
200   SUBROUTINE mpp_start( localComm )
201      !!----------------------------------------------------------------------
202      !!                  ***  routine mpp_start  ***
203      !!
204      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
205      !!----------------------------------------------------------------------
206      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
207      !
208      INTEGER ::   ierr
209      LOGICAL ::   llmpi_init
210      !!----------------------------------------------------------------------
211#if defined key_mpp_mpi
212      !
213      CALL mpi_initialized ( llmpi_init, ierr )
214      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
215
216      IF( .NOT. llmpi_init ) THEN
217         IF( PRESENT(localComm) ) THEN
218            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
219            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
220            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
221         ENDIF
222         CALL mpi_init( ierr )
223         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
224      ENDIF
225       
226      IF( PRESENT(localComm) ) THEN
227         IF( Agrif_Root() ) THEN
228            mpi_comm_oce = localComm
229         ENDIF
230      ELSE
231         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
232         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
233      ENDIF
234
235# if defined key_agrif
236      IF( Agrif_Root() ) THEN
237         CALL Agrif_MPI_Init(mpi_comm_oce)
238      ELSE
239         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
240      ENDIF
241# endif
242
243      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
244      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
245      !
246      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
247      !
248#else
249      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
250      mppsize = 1
251      mpprank = 0
252#endif
253   END SUBROUTINE mpp_start
254
255
256   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
257      !!----------------------------------------------------------------------
258      !!                  ***  routine mppsend  ***
259      !!
260      !! ** Purpose :   Send messag passing array
261      !!
262      !!----------------------------------------------------------------------
263      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
264      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
265      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
266      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
267      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
268      !!
269      INTEGER ::   iflag
270      INTEGER :: mpi_working_type
271      !!----------------------------------------------------------------------
272      !
273#if defined key_mpp_mpi
274      IF (wp == dp) THEN
275         mpi_working_type = mpi_double_precision
276      ELSE
277         mpi_working_type = mpi_real
278      END IF
279      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
280#endif
281      !
282   END SUBROUTINE mppsend
283
284
285   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
286      !!----------------------------------------------------------------------
287      !!                  ***  routine mppsend  ***
288      !!
289      !! ** Purpose :   Send messag passing array
290      !!
291      !!----------------------------------------------------------------------
292      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
293      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
294      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
295      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
296      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
297      !!
298      INTEGER ::   iflag
299      !!----------------------------------------------------------------------
300      !
301#if defined key_mpp_mpi
302      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
303#endif
304      !
305   END SUBROUTINE mppsend_dp
306
307
308   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
309      !!----------------------------------------------------------------------
310      !!                  ***  routine mppsend  ***
311      !!
312      !! ** Purpose :   Send messag passing array
313      !!
314      !!----------------------------------------------------------------------
315      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
316      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
317      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
318      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
319      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
320      !!
321      INTEGER ::   iflag
322      !!----------------------------------------------------------------------
323      !
324#if defined key_mpp_mpi
325      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
326#endif
327      !
328   END SUBROUTINE mppsend_sp
329
330
331   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
332      !!----------------------------------------------------------------------
333      !!                  ***  routine mpprecv  ***
334      !!
335      !! ** Purpose :   Receive messag passing array
336      !!
337      !!----------------------------------------------------------------------
338      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
339      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
340      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
341      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
342      !!
343      INTEGER :: istatus(mpi_status_size)
344      INTEGER :: iflag
345      INTEGER :: use_source
346      INTEGER :: mpi_working_type
347      !!----------------------------------------------------------------------
348      !
349#if defined key_mpp_mpi
350      ! If a specific process number has been passed to the receive call,
351      ! use that one. Default is to use mpi_any_source
352      use_source = mpi_any_source
353      IF( PRESENT(ksource) )   use_source = ksource
354      !
355      IF (wp == dp) THEN
356         mpi_working_type = mpi_double_precision
357      ELSE
358         mpi_working_type = mpi_real
359      END IF
360      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
361#endif
362      !
363   END SUBROUTINE mpprecv
364
365   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
366      !!----------------------------------------------------------------------
367      !!                  ***  routine mpprecv  ***
368      !!
369      !! ** Purpose :   Receive messag passing array
370      !!
371      !!----------------------------------------------------------------------
372      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
373      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
374      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
375      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
376      !!
377      INTEGER :: istatus(mpi_status_size)
378      INTEGER :: iflag
379      INTEGER :: use_source
380      !!----------------------------------------------------------------------
381      !
382#if defined key_mpp_mpi
383      ! If a specific process number has been passed to the receive call,
384      ! use that one. Default is to use mpi_any_source
385      use_source = mpi_any_source
386      IF( PRESENT(ksource) )   use_source = ksource
387      !
388      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
389#endif
390      !
391   END SUBROUTINE mpprecv_dp
392
393
394   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
395      !!----------------------------------------------------------------------
396      !!                  ***  routine mpprecv  ***
397      !!
398      !! ** Purpose :   Receive messag passing array
399      !!
400      !!----------------------------------------------------------------------
401      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
402      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
403      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
404      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
405      !!
406      INTEGER :: istatus(mpi_status_size)
407      INTEGER :: iflag
408      INTEGER :: use_source
409      !!----------------------------------------------------------------------
410      !
411#if defined key_mpp_mpi
412      ! If a specific process number has been passed to the receive call,
413      ! use that one. Default is to use mpi_any_source
414      use_source = mpi_any_source
415      IF( PRESENT(ksource) )   use_source = ksource
416      !
417      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
418#endif
419      !
420   END SUBROUTINE mpprecv_sp
421
422
423   SUBROUTINE mppgather( ptab, kp, pio )
424      !!----------------------------------------------------------------------
425      !!                   ***  routine mppgather  ***
426      !!
427      !! ** Purpose :   Transfert between a local subdomain array and a work
428      !!     array which is distributed following the vertical level.
429      !!
430      !!----------------------------------------------------------------------
431      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
432      INTEGER                           , INTENT(in   ) ::   kp     ! record length
433      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
434      !!
435      INTEGER :: itaille, ierror   ! temporary integer
436      !!---------------------------------------------------------------------
437      !
438      itaille = jpi * jpj
439#if defined key_mpp_mpi
440      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
441         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
442#else
443      pio(:,:,1) = ptab(:,:)
444#endif
445      !
446   END SUBROUTINE mppgather
447
448
449   SUBROUTINE mppscatter( pio, kp, ptab )
450      !!----------------------------------------------------------------------
451      !!                  ***  routine mppscatter  ***
452      !!
453      !! ** Purpose :   Transfert between awork array which is distributed
454      !!      following the vertical level and the local subdomain array.
455      !!
456      !!----------------------------------------------------------------------
457      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
458      INTEGER                             ::   kp     ! Tag (not used with MPI
459      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
460      !!
461      INTEGER :: itaille, ierror   ! temporary integer
462      !!---------------------------------------------------------------------
463      !
464      itaille = jpi * jpj
465      !
466#if defined key_mpp_mpi
467      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
468         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
469#else
470      ptab(:,:) = pio(:,:,1)
471#endif
472      !
473   END SUBROUTINE mppscatter
474
475   
476   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
477     !!----------------------------------------------------------------------
478      !!                   ***  routine mpp_delay_sum  ***
479      !!
480      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
481      !!
482      !!----------------------------------------------------------------------
483      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
484      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
485      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
486      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
487      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
488      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
489      !!
490      INTEGER ::   ji, isz
491      INTEGER ::   idvar
492      INTEGER ::   ierr, ilocalcomm
493      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
494      !!----------------------------------------------------------------------
495#if defined key_mpp_mpi
496      ilocalcomm = mpi_comm_oce
497      IF( PRESENT(kcom) )   ilocalcomm = kcom
498
499      isz = SIZE(y_in)
500     
501      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
502
503      idvar = -1
504      DO ji = 1, nbdelay
505         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
506      END DO
507      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
508
509      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
510         !                                       --------------------------
511         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
512            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
513            DEALLOCATE(todelay(idvar)%z1d)
514            ndelayid(idvar) = -1                                      ! do as if we had no restart
515         ELSE
516            ALLOCATE(todelay(idvar)%y1d(isz))
517            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
518         END IF
519      ENDIF
520     
521      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
522         !                                       --------------------------
523         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
524         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
525         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
526      ENDIF
527
528      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
529
530      ! send back pout from todelay(idvar)%z1d defined at previous call
531      pout(:) = todelay(idvar)%z1d(:)
532
533      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
534# if defined key_mpi2
535      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
536      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
537      ndelayid(idvar) = 1
538      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
539# else
540      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
541# endif
542#else
543      pout(:) = REAL(y_in(:), wp)
544#endif
545
546   END SUBROUTINE mpp_delay_sum
547
548   
549   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
550      !!----------------------------------------------------------------------
551      !!                   ***  routine mpp_delay_max  ***
552      !!
553      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
554      !!
555      !!----------------------------------------------------------------------
556      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
557      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
558      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
559      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
560      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
561      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
562      !!
563      INTEGER ::   ji, isz
564      INTEGER ::   idvar
565      INTEGER ::   ierr, ilocalcomm
566      INTEGER ::   MPI_TYPE
567      !!----------------------------------------------------------------------
568     
569#if defined key_mpp_mpi
570      if( wp == dp ) then
571         MPI_TYPE = MPI_DOUBLE_PRECISION
572      else if ( wp == sp ) then
573         MPI_TYPE = MPI_REAL
574      else
575        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
576   
577      end if
578
579      ilocalcomm = mpi_comm_oce
580      IF( PRESENT(kcom) )   ilocalcomm = kcom
581
582      isz = SIZE(p_in)
583
584      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
585
586      idvar = -1
587      DO ji = 1, nbdelay
588         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
589      END DO
590      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
591
592      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
593         !                                       --------------------------
594         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
595            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
596            DEALLOCATE(todelay(idvar)%z1d)
597            ndelayid(idvar) = -1                                      ! do as if we had no restart
598         END IF
599      ENDIF
600
601      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
602         !                                       --------------------------
603         ALLOCATE(todelay(idvar)%z1d(isz))
604         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
605      ENDIF
606
607      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
608
609      ! send back pout from todelay(idvar)%z1d defined at previous call
610      pout(:) = todelay(idvar)%z1d(:)
611
612      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
613# if defined key_mpi2
614      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
615      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
616      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
617# else
618      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
619# endif
620#else
621      pout(:) = p_in(:)
622#endif
623
624   END SUBROUTINE mpp_delay_max
625
626   
627   SUBROUTINE mpp_delay_rcv( kid )
628      !!----------------------------------------------------------------------
629      !!                   ***  routine mpp_delay_rcv  ***
630      !!
631      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
632      !!
633      !!----------------------------------------------------------------------
634      INTEGER,INTENT(in   )      ::  kid 
635      INTEGER ::   ierr
636      !!----------------------------------------------------------------------
637#if defined key_mpp_mpi
638      IF( ndelayid(kid) /= -2 ) THEN 
639#if ! defined key_mpi2
640         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
641         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
642         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
643#endif
644         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
645         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
646      ENDIF
647#endif
648   END SUBROUTINE mpp_delay_rcv
649
650   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
651      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
652      INTEGER                          , INTENT(INOUT) :: kleng
653      !!----------------------------------------------------------------------
654      !!                  ***  routine mpp_bcast_nml  ***
655      !!
656      !! ** Purpose :   broadcast namelist character buffer
657      !!
658      !!----------------------------------------------------------------------
659      !!
660      INTEGER ::   iflag
661      !!----------------------------------------------------------------------
662      !
663#if defined key_mpp_mpi
664      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
665      call MPI_BARRIER(mpi_comm_oce, iflag)
666!$AGRIF_DO_NOT_TREAT
667      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
668!$AGRIF_END_DO_NOT_TREAT
669      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
670      call MPI_BARRIER(mpi_comm_oce, iflag)
671#endif
672      !
673   END SUBROUTINE mpp_bcast_nml
674
675   
676   !!----------------------------------------------------------------------
677   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
678   !!   
679   !!----------------------------------------------------------------------
680   !!
681#  define OPERATION_MAX
682#  define INTEGER_TYPE
683#  define DIM_0d
684#     define ROUTINE_ALLREDUCE           mppmax_int
685#     include "mpp_allreduce_generic.h90"
686#     undef ROUTINE_ALLREDUCE
687#  undef DIM_0d
688#  define DIM_1d
689#     define ROUTINE_ALLREDUCE           mppmax_a_int
690#     include "mpp_allreduce_generic.h90"
691#     undef ROUTINE_ALLREDUCE
692#  undef DIM_1d
693#  undef INTEGER_TYPE
694!
695   !!
696   !!   ----   SINGLE PRECISION VERSIONS
697   !!
698#  define SINGLE_PRECISION
699#  define REAL_TYPE
700#  define DIM_0d
701#     define ROUTINE_ALLREDUCE           mppmax_real_sp
702#     include "mpp_allreduce_generic.h90"
703#     undef ROUTINE_ALLREDUCE
704#  undef DIM_0d
705#  define DIM_1d
706#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
707#     include "mpp_allreduce_generic.h90"
708#     undef ROUTINE_ALLREDUCE
709#  undef DIM_1d
710#  undef SINGLE_PRECISION
711   !!
712   !!
713   !!   ----   DOUBLE PRECISION VERSIONS
714   !!
715!
716#  define DIM_0d
717#     define ROUTINE_ALLREDUCE           mppmax_real_dp
718#     include "mpp_allreduce_generic.h90"
719#     undef ROUTINE_ALLREDUCE
720#  undef DIM_0d
721#  define DIM_1d
722#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
723#     include "mpp_allreduce_generic.h90"
724#     undef ROUTINE_ALLREDUCE
725#  undef DIM_1d
726#  undef REAL_TYPE
727#  undef OPERATION_MAX
728   !!----------------------------------------------------------------------
729   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
730   !!   
731   !!----------------------------------------------------------------------
732   !!
733#  define OPERATION_MIN
734#  define INTEGER_TYPE
735#  define DIM_0d
736#     define ROUTINE_ALLREDUCE           mppmin_int
737#     include "mpp_allreduce_generic.h90"
738#     undef ROUTINE_ALLREDUCE
739#  undef DIM_0d
740#  define DIM_1d
741#     define ROUTINE_ALLREDUCE           mppmin_a_int
742#     include "mpp_allreduce_generic.h90"
743#     undef ROUTINE_ALLREDUCE
744#  undef DIM_1d
745#  undef INTEGER_TYPE
746!
747   !!
748   !!   ----   SINGLE PRECISION VERSIONS
749   !!
750#  define SINGLE_PRECISION
751#  define REAL_TYPE
752#  define DIM_0d
753#     define ROUTINE_ALLREDUCE           mppmin_real_sp
754#     include "mpp_allreduce_generic.h90"
755#     undef ROUTINE_ALLREDUCE
756#  undef DIM_0d
757#  define DIM_1d
758#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
759#     include "mpp_allreduce_generic.h90"
760#     undef ROUTINE_ALLREDUCE
761#  undef DIM_1d
762#  undef SINGLE_PRECISION
763   !!
764   !!   ----   DOUBLE PRECISION VERSIONS
765   !!
766
767#  define DIM_0d
768#     define ROUTINE_ALLREDUCE           mppmin_real_dp
769#     include "mpp_allreduce_generic.h90"
770#     undef ROUTINE_ALLREDUCE
771#  undef DIM_0d
772#  define DIM_1d
773#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
774#     include "mpp_allreduce_generic.h90"
775#     undef ROUTINE_ALLREDUCE
776#  undef DIM_1d
777#  undef REAL_TYPE
778#  undef OPERATION_MIN
779
780   !!----------------------------------------------------------------------
781   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
782   !!   
783   !!   Global sum of 1D array or a variable (integer, real or complex)
784   !!----------------------------------------------------------------------
785   !!
786#  define OPERATION_SUM
787#  define INTEGER_TYPE
788#  define DIM_0d
789#     define ROUTINE_ALLREDUCE           mppsum_int
790#     include "mpp_allreduce_generic.h90"
791#     undef ROUTINE_ALLREDUCE
792#  undef DIM_0d
793#  define DIM_1d
794#     define ROUTINE_ALLREDUCE           mppsum_a_int
795#     include "mpp_allreduce_generic.h90"
796#     undef ROUTINE_ALLREDUCE
797#  undef DIM_1d
798#  undef INTEGER_TYPE
799
800   !!
801   !!   ----   SINGLE PRECISION VERSIONS
802   !!
803#  define OPERATION_SUM
804#  define SINGLE_PRECISION
805#  define REAL_TYPE
806#  define DIM_0d
807#     define ROUTINE_ALLREDUCE           mppsum_real_sp
808#     include "mpp_allreduce_generic.h90"
809#     undef ROUTINE_ALLREDUCE
810#  undef DIM_0d
811#  define DIM_1d
812#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
813#     include "mpp_allreduce_generic.h90"
814#     undef ROUTINE_ALLREDUCE
815#  undef DIM_1d
816#  undef REAL_TYPE
817#  undef OPERATION_SUM
818
819#  undef SINGLE_PRECISION
820
821   !!
822   !!   ----   DOUBLE PRECISION VERSIONS
823   !!
824#  define OPERATION_SUM
825#  define REAL_TYPE
826#  define DIM_0d
827#     define ROUTINE_ALLREDUCE           mppsum_real_dp
828#     include "mpp_allreduce_generic.h90"
829#     undef ROUTINE_ALLREDUCE
830#  undef DIM_0d
831#  define DIM_1d
832#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
833#     include "mpp_allreduce_generic.h90"
834#     undef ROUTINE_ALLREDUCE
835#  undef DIM_1d
836#  undef REAL_TYPE
837#  undef OPERATION_SUM
838
839#  define OPERATION_SUM_DD
840#  define COMPLEX_TYPE
841#  define DIM_0d
842#     define ROUTINE_ALLREDUCE           mppsum_realdd
843#     include "mpp_allreduce_generic.h90"
844#     undef ROUTINE_ALLREDUCE
845#  undef DIM_0d
846#  define DIM_1d
847#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
848#     include "mpp_allreduce_generic.h90"
849#     undef ROUTINE_ALLREDUCE
850#  undef DIM_1d
851#  undef COMPLEX_TYPE
852#  undef OPERATION_SUM_DD
853
854   !!----------------------------------------------------------------------
855   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
856   !!   
857   !!----------------------------------------------------------------------
858   !!
859   !!
860   !!   ----   SINGLE PRECISION VERSIONS
861   !!
862#  define SINGLE_PRECISION
863#  define OPERATION_MINLOC
864#  define DIM_2d
865#     define ROUTINE_LOC           mpp_minloc2d_sp
866#     include "mpp_loc_generic.h90"
867#     undef ROUTINE_LOC
868#  undef DIM_2d
869#  define DIM_3d
870#     define ROUTINE_LOC           mpp_minloc3d_sp
871#     include "mpp_loc_generic.h90"
872#     undef ROUTINE_LOC
873#  undef DIM_3d
874#  undef OPERATION_MINLOC
875
876#  define OPERATION_MAXLOC
877#  define DIM_2d
878#     define ROUTINE_LOC           mpp_maxloc2d_sp
879#     include "mpp_loc_generic.h90"
880#     undef ROUTINE_LOC
881#  undef DIM_2d
882#  define DIM_3d
883#     define ROUTINE_LOC           mpp_maxloc3d_sp
884#     include "mpp_loc_generic.h90"
885#     undef ROUTINE_LOC
886#  undef DIM_3d
887#  undef OPERATION_MAXLOC
888#  undef SINGLE_PRECISION
889   !!
890   !!   ----   DOUBLE PRECISION VERSIONS
891   !!
892#  define OPERATION_MINLOC
893#  define DIM_2d
894#     define ROUTINE_LOC           mpp_minloc2d_dp
895#     include "mpp_loc_generic.h90"
896#     undef ROUTINE_LOC
897#  undef DIM_2d
898#  define DIM_3d
899#     define ROUTINE_LOC           mpp_minloc3d_dp
900#     include "mpp_loc_generic.h90"
901#     undef ROUTINE_LOC
902#  undef DIM_3d
903#  undef OPERATION_MINLOC
904
905#  define OPERATION_MAXLOC
906#  define DIM_2d
907#     define ROUTINE_LOC           mpp_maxloc2d_dp
908#     include "mpp_loc_generic.h90"
909#     undef ROUTINE_LOC
910#  undef DIM_2d
911#  define DIM_3d
912#     define ROUTINE_LOC           mpp_maxloc3d_dp
913#     include "mpp_loc_generic.h90"
914#     undef ROUTINE_LOC
915#  undef DIM_3d
916#  undef OPERATION_MAXLOC
917
918
919   SUBROUTINE mppsync()
920      !!----------------------------------------------------------------------
921      !!                  ***  routine mppsync  ***
922      !!
923      !! ** Purpose :   Massively parallel processors, synchroneous
924      !!
925      !!-----------------------------------------------------------------------
926      INTEGER :: ierror
927      !!-----------------------------------------------------------------------
928      !
929#if defined key_mpp_mpi
930      CALL mpi_barrier( mpi_comm_oce, ierror )
931#endif
932      !
933   END SUBROUTINE mppsync
934
935
936   SUBROUTINE mppstop( ld_abort ) 
937      !!----------------------------------------------------------------------
938      !!                  ***  routine mppstop  ***
939      !!
940      !! ** purpose :   Stop massively parallel processors method
941      !!
942      !!----------------------------------------------------------------------
943      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
944      LOGICAL ::   ll_abort
945      INTEGER ::   info
946      !!----------------------------------------------------------------------
947      ll_abort = .FALSE.
948      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
949      !
950#if defined key_mpp_mpi
951      IF(ll_abort) THEN
952         CALL mpi_abort( MPI_COMM_WORLD )
953      ELSE
954         CALL mppsync
955         CALL mpi_finalize( info )
956      ENDIF
957#endif
958      IF( ll_abort ) STOP 123
959      !
960   END SUBROUTINE mppstop
961
962
963   SUBROUTINE mpp_comm_free( kcom )
964      !!----------------------------------------------------------------------
965      INTEGER, INTENT(in) ::   kcom
966      !!
967      INTEGER :: ierr
968      !!----------------------------------------------------------------------
969      !
970#if defined key_mpp_mpi
971      CALL MPI_COMM_FREE(kcom, ierr)
972#endif
973      !
974   END SUBROUTINE mpp_comm_free
975
976
977   SUBROUTINE mpp_ini_znl( kumout )
978      !!----------------------------------------------------------------------
979      !!               ***  routine mpp_ini_znl  ***
980      !!
981      !! ** Purpose :   Initialize special communicator for computing zonal sum
982      !!
983      !! ** Method  : - Look for processors in the same row
984      !!              - Put their number in nrank_znl
985      !!              - Create group for the znl processors
986      !!              - Create a communicator for znl processors
987      !!              - Determine if processor should write znl files
988      !!
989      !! ** output
990      !!      ndim_rank_znl = number of processors on the same row
991      !!      ngrp_znl = group ID for the znl processors
992      !!      ncomm_znl = communicator for the ice procs.
993      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
994      !!
995      !!----------------------------------------------------------------------
996      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
997      !
998      INTEGER :: jproc      ! dummy loop integer
999      INTEGER :: ierr, ii   ! local integer
1000      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1001      !!----------------------------------------------------------------------
1002#if defined key_mpp_mpi
1003      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
1004      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
1005      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
1006      !
1007      ALLOCATE( kwork(jpnij), STAT=ierr )
1008      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1009
1010      IF( jpnj == 1 ) THEN
1011         ngrp_znl  = ngrp_world
1012         ncomm_znl = mpi_comm_oce
1013      ELSE
1014         !
1015         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1016         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1017         !-$$        CALL flush(numout)
1018         !
1019         ! Count number of processors on the same row
1020         ndim_rank_znl = 0
1021         DO jproc=1,jpnij
1022            IF ( kwork(jproc) == njmpp ) THEN
1023               ndim_rank_znl = ndim_rank_znl + 1
1024            ENDIF
1025         END DO
1026         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1027         !-$$        CALL flush(numout)
1028         ! Allocate the right size to nrank_znl
1029         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1030         ALLOCATE(nrank_znl(ndim_rank_znl))
1031         ii = 0
1032         nrank_znl (:) = 0
1033         DO jproc=1,jpnij
1034            IF ( kwork(jproc) == njmpp) THEN
1035               ii = ii + 1
1036               nrank_znl(ii) = jproc -1
1037            ENDIF
1038         END DO
1039         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1040         !-$$        CALL flush(numout)
1041
1042         ! Create the opa group
1043         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1044         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1045         !-$$        CALL flush(numout)
1046
1047         ! Create the znl group from the opa group
1048         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1049         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1050         !-$$        CALL flush(numout)
1051
1052         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1053         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1054         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1055         !-$$        CALL flush(numout)
1056         !
1057      END IF
1058
1059      ! Determines if processor if the first (starting from i=1) on the row
1060      IF ( jpni == 1 ) THEN
1061         l_znl_root = .TRUE.
1062      ELSE
1063         l_znl_root = .FALSE.
1064         kwork (1) = nimpp
1065         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1066         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1067      END IF
1068
1069      DEALLOCATE(kwork)
1070#endif
1071
1072   END SUBROUTINE mpp_ini_znl
1073
1074   SUBROUTINE mpp_ini_nc
1075      !!----------------------------------------------------------------------
1076      !!               ***  routine mpp_ini_nc  ***
1077      !!
1078      !! ** Purpose :   Initialize special communicators for MPI3 neighbourhood
1079      !!                collectives
1080      !!
1081      !! ** Method  : - Create graph communicators starting from the processes   
1082      !!                distribution along i and j directions
1083      !
1084      !! ** output
1085      !!         mpi_nc_com = MPI3 neighbourhood collectives communicator
1086      !!         mpi_nc_all_com = MPI3 neighbourhood collectives communicator
1087      !!                          (with diagonals)
1088      !!
1089      !!----------------------------------------------------------------------
1090      INTEGER, DIMENSION(:), ALLOCATABLE :: ineigh, ineighalls, ineighallr
1091      INTEGER :: ideg, idegalls, idegallr, icont, icont1
1092      INTEGER :: ierr
1093      LOGICAL, PARAMETER :: ireord = .FALSE.
1094
1095#if defined key_mpp_mpi
1096
1097      ideg = 0
1098      idegalls = 0
1099      idegallr = 0
1100      icont = 0
1101      icont1 = 0
1102
1103      IF (nbondi .eq. 1) THEN
1104         ideg = ideg + 1
1105      ELSEIF (nbondi .eq. -1) THEN
1106         ideg = ideg + 1
1107      ELSEIF (nbondi .eq. 0) THEN
1108         ideg = ideg + 2
1109      ENDIF
1110
1111      IF (nbondj .eq. 1) THEN
1112         ideg = ideg + 1
1113      ELSEIF (nbondj .eq. -1) THEN
1114         ideg = ideg + 1
1115      ELSEIF (nbondj .eq. 0) THEN
1116         ideg = ideg + 2
1117      ENDIF
1118
1119      idegalls = ideg
1120      idegallr = ideg
1121
1122      IF (nones .ne. -1) idegalls = idegalls + 1
1123      IF (nonws .ne. -1) idegalls = idegalls + 1
1124      IF (noses .ne. -1) idegalls = idegalls + 1
1125      IF (nosws .ne. -1) idegalls = idegalls + 1
1126      IF (noner .ne. -1) idegallr = idegallr + 1
1127      IF (nonwr .ne. -1) idegallr = idegallr + 1
1128      IF (noser .ne. -1) idegallr = idegallr + 1
1129      IF (noswr .ne. -1) idegallr = idegallr + 1
1130
1131      ALLOCATE(ineigh(ideg))
1132      ALLOCATE(ineighalls(idegalls))
1133      ALLOCATE(ineighallr(idegallr))
1134
1135      IF (nbondi .eq. 1) THEN
1136         icont = icont + 1
1137         ineigh(icont) = nowe
1138         ineighalls(icont) = nowe
1139         ineighallr(icont) = nowe
1140      ELSEIF (nbondi .eq. -1) THEN
1141         icont = icont + 1
1142         ineigh(icont) = noea
1143         ineighalls(icont) = noea
1144         ineighallr(icont) = noea
1145      ELSEIF (nbondi .eq. 0) THEN
1146         icont = icont + 1
1147         ineigh(icont) = nowe
1148         ineighalls(icont) = nowe
1149         ineighallr(icont) = nowe
1150         icont = icont + 1
1151         ineigh(icont) = noea
1152         ineighalls(icont) = noea
1153         ineighallr(icont) = noea
1154      ENDIF
1155
1156      IF (nbondj .eq. 1) THEN
1157         icont = icont + 1
1158         ineigh(icont) = noso
1159         ineighalls(icont) = noso
1160         ineighallr(icont) = noso
1161      ELSEIF (nbondj .eq. -1) THEN
1162         icont = icont + 1
1163         ineigh(icont) = nono
1164         ineighalls(icont) = nono
1165         ineighallr(icont) = nono
1166      ELSEIF (nbondj .eq. 0) THEN
1167         icont = icont + 1
1168         ineigh(icont) = noso
1169         ineighalls(icont) = noso
1170         ineighallr(icont) = noso
1171         icont = icont + 1
1172         ineigh(icont) = nono
1173         ineighalls(icont) = nono
1174         ineighallr(icont) = nono
1175      ENDIF
1176
1177      icont1 = icont
1178      IF (nosws .ne. -1) THEN
1179         icont = icont + 1
1180         ineighalls(icont) = nosws
1181      ENDIF
1182      IF (noses .ne. -1) THEN
1183         icont = icont + 1
1184         ineighalls(icont) = noses
1185      ENDIF
1186      IF (nonws .ne. -1) THEN
1187         icont = icont + 1
1188         ineighalls(icont) = nonws
1189      ENDIF
1190      IF (nones .ne. -1) THEN
1191         icont = icont + 1
1192         ineighalls(icont) = nones
1193      ENDIF
1194      IF (noswr .ne. -1) THEN
1195         icont1 = icont1 + 1
1196         ineighallr(icont1) = noswr
1197      ENDIF
1198      IF (noser .ne. -1) THEN
1199         icont1 = icont1 + 1
1200         ineighallr(icont1) = noser
1201      ENDIF
1202      IF (nonwr .ne. -1) THEN
1203         icont1 = icont1 + 1
1204         ineighallr(icont1) = nonwr
1205      ENDIF
1206      IF (noner .ne. -1) THEN
1207         icont1 = icont1 + 1
1208         ineighallr(icont1) = noner
1209      ENDIF
1210
1211      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, ideg, ineigh, MPI_UNWEIGHTED, ideg, ineigh, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_com, ierr)
1212      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, idegallr, ineighallr, MPI_UNWEIGHTED, idegalls, ineighalls, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_all_com, ierr)
1213
1214      DEALLOCATE (ineigh)
1215      DEALLOCATE (ineighalls)
1216      DEALLOCATE (ineighallr)
1217#endif
1218   END SUBROUTINE mpp_ini_nc
1219
1220
1221
1222   SUBROUTINE mpp_ini_north
1223      !!----------------------------------------------------------------------
1224      !!               ***  routine mpp_ini_north  ***
1225      !!
1226      !! ** Purpose :   Initialize special communicator for north folding
1227      !!      condition together with global variables needed in the mpp folding
1228      !!
1229      !! ** Method  : - Look for northern processors
1230      !!              - Put their number in nrank_north
1231      !!              - Create groups for the world processors and the north processors
1232      !!              - Create a communicator for northern processors
1233      !!
1234      !! ** output
1235      !!      njmppmax = njmpp for northern procs
1236      !!      ndim_rank_north = number of processors in the northern line
1237      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1238      !!      ngrp_world = group ID for the world processors
1239      !!      ngrp_north = group ID for the northern processors
1240      !!      ncomm_north = communicator for the northern procs.
1241      !!      north_root = number (in the world) of proc 0 in the northern comm.
1242      !!
1243      !!----------------------------------------------------------------------
1244      INTEGER ::   ierr
1245      INTEGER ::   jjproc
1246      INTEGER ::   ii, ji
1247      !!----------------------------------------------------------------------
1248      !
1249#if defined key_mpp_mpi
1250      njmppmax = MAXVAL( njmppt )
1251      !
1252      ! Look for how many procs on the northern boundary
1253      ndim_rank_north = 0
1254      DO jjproc = 1, jpni
1255         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1256      END DO
1257      !
1258      ! Allocate the right size to nrank_north
1259      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1260      ALLOCATE( nrank_north(ndim_rank_north) )
1261
1262      ! Fill the nrank_north array with proc. number of northern procs.
1263      ! Note : the rank start at 0 in MPI
1264      ii = 0
1265      DO ji = 1, jpni
1266         IF ( nfproc(ji) /= -1   ) THEN
1267            ii=ii+1
1268            nrank_north(ii)=nfproc(ji)
1269         END IF
1270      END DO
1271      !
1272      ! create the world group
1273      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1274      !
1275      ! Create the North group from the world group
1276      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1277      !
1278      ! Create the North communicator , ie the pool of procs in the north group
1279      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1280      !
1281#endif
1282   END SUBROUTINE mpp_ini_north
1283
1284
1285   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1286      !!---------------------------------------------------------------------
1287      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1288      !!
1289      !!   Modification of original codes written by David H. Bailey
1290      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1291      !!---------------------------------------------------------------------
1292      INTEGER                     , INTENT(in)    ::   ilen, itype
1293      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1294      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1295      !
1296      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1297      INTEGER  :: ji, ztmp           ! local scalar
1298      !!---------------------------------------------------------------------
1299      !
1300      ztmp = itype   ! avoid compilation warning
1301      !
1302      DO ji=1,ilen
1303      ! Compute ydda + yddb using Knuth's trick.
1304         zt1  = real(ydda(ji)) + real(yddb(ji))
1305         zerr = zt1 - real(ydda(ji))
1306         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1307                + aimag(ydda(ji)) + aimag(yddb(ji))
1308
1309         ! The result is zt1 + zt2, after normalization.
1310         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1311      END DO
1312      !
1313   END SUBROUTINE DDPDD_MPI
1314
1315
1316   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1317      !!----------------------------------------------------------------------
1318      !!                  ***  routine mpp_report  ***
1319      !!
1320      !! ** Purpose :   report use of mpp routines per time-setp
1321      !!
1322      !!----------------------------------------------------------------------
1323      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1324      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1325      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1326      !!
1327      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1328      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1329      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1330      !!----------------------------------------------------------------------
1331#if defined key_mpp_mpi
1332      !
1333      ll_lbc = .FALSE.
1334      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1335      ll_glb = .FALSE.
1336      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1337      ll_dlg = .FALSE.
1338      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1339      !
1340      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1341      ncom_freq = ncom_fsbc
1342      !
1343      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1344         IF( ll_lbc ) THEN
1345            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1346            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1347            n_sequence_lbc = n_sequence_lbc + 1
1348            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1349            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1350            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1351            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1352         ENDIF
1353         IF( ll_glb ) THEN
1354            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1355            n_sequence_glb = n_sequence_glb + 1
1356            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1357            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1358         ENDIF
1359         IF( ll_dlg ) THEN
1360            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1361            n_sequence_dlg = n_sequence_dlg + 1
1362            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1363            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1364         ENDIF
1365      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1366         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1367         WRITE(numcom,*) ' '
1368         WRITE(numcom,*) ' ------------------------------------------------------------'
1369         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1370         WRITE(numcom,*) ' ------------------------------------------------------------'
1371         WRITE(numcom,*) ' '
1372         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1373         jj = 0; jk = 0; jf = 0; jh = 0
1374         DO ji = 1, n_sequence_lbc
1375            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1376            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1377            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1378            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1379         END DO
1380         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1381         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1382         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1383         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1384         WRITE(numcom,*) ' '
1385         WRITE(numcom,*) ' lbc_lnk called'
1386         DO ji = 1, n_sequence_lbc - 1
1387            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1388               ccountname = crname_lbc(ji)
1389               crname_lbc(ji) = 'already counted'
1390               jcount = 1
1391               DO jj = ji + 1, n_sequence_lbc
1392                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1393                     jcount = jcount + 1
1394                     crname_lbc(jj) = 'already counted'
1395                  END IF
1396               END DO
1397               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1398            END IF
1399         END DO
1400         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1401            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1402         END IF
1403         WRITE(numcom,*) ' '
1404         IF ( n_sequence_glb > 0 ) THEN
1405            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1406            jj = 1
1407            DO ji = 2, n_sequence_glb
1408               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1409                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1410                  jj = 0
1411               END IF
1412               jj = jj + 1 
1413            END DO
1414            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1415            DEALLOCATE(crname_glb)
1416         ELSE
1417            WRITE(numcom,*) ' No MPI global communication '
1418         ENDIF
1419         WRITE(numcom,*) ' '
1420         IF ( n_sequence_dlg > 0 ) THEN
1421            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1422            jj = 1
1423            DO ji = 2, n_sequence_dlg
1424               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1425                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1426                  jj = 0
1427               END IF
1428               jj = jj + 1 
1429            END DO
1430            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1431            DEALLOCATE(crname_dlg)
1432         ELSE
1433            WRITE(numcom,*) ' No MPI delayed global communication '
1434         ENDIF
1435         WRITE(numcom,*) ' '
1436         WRITE(numcom,*) ' -----------------------------------------------'
1437         WRITE(numcom,*) ' '
1438         DEALLOCATE(ncomm_sequence)
1439         DEALLOCATE(crname_lbc)
1440      ENDIF
1441#endif
1442   END SUBROUTINE mpp_report
1443
1444   
1445   SUBROUTINE tic_tac (ld_tic, ld_global)
1446
1447    LOGICAL,           INTENT(IN) :: ld_tic
1448    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1449    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1450    REAL(dp),               SAVE :: tic_ct = 0._dp
1451    INTEGER :: ii
1452#if defined key_mpp_mpi
1453
1454    IF( ncom_stp <= nit000 ) RETURN
1455    IF( ncom_stp == nitend ) RETURN
1456    ii = 1
1457    IF( PRESENT( ld_global ) ) THEN
1458       IF( ld_global ) ii = 2
1459    END IF
1460   
1461    IF ( ld_tic ) THEN
1462       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1463       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1464    ELSE
1465       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1466       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1467    ENDIF
1468#endif
1469   
1470   END SUBROUTINE tic_tac
1471
1472#if ! defined key_mpp_mpi
1473   SUBROUTINE mpi_wait(request, status, ierror)
1474      INTEGER                            , INTENT(in   ) ::   request
1475      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1476      INTEGER                            , INTENT(  out) ::   ierror
1477   END SUBROUTINE mpi_wait
1478
1479   
1480   FUNCTION MPI_Wtime()
1481      REAL(wp) ::  MPI_Wtime
1482      MPI_Wtime = -1.
1483   END FUNCTION MPI_Wtime
1484#endif
1485
1486   !!----------------------------------------------------------------------
1487   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1488   !!----------------------------------------------------------------------
1489
1490   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1491      &                 cd6, cd7, cd8, cd9, cd10 )
1492      !!----------------------------------------------------------------------
1493      !!                  ***  ROUTINE  stop_opa  ***
1494      !!
1495      !! ** Purpose :   print in ocean.outpput file a error message and
1496      !!                increment the error number (nstop) by one.
1497      !!----------------------------------------------------------------------
1498      CHARACTER(len=*), INTENT(in   )           ::   cd1
1499      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1500      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1501      !
1502      CHARACTER(LEN=8) ::   clfmt            ! writing format
1503      INTEGER          ::   inum
1504      !!----------------------------------------------------------------------
1505      !
1506      nstop = nstop + 1
1507      !
1508      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1509         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1510         WRITE(inum,*)
1511         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1512         CLOSE(inum)
1513      ENDIF
1514      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1515         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1516      ENDIF
1517      !
1518                            WRITE(numout,*)
1519                            WRITE(numout,*) ' ===>>> : E R R O R'
1520                            WRITE(numout,*)
1521                            WRITE(numout,*) '         ==========='
1522                            WRITE(numout,*)
1523                            WRITE(numout,*) TRIM(cd1)
1524      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1525      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1526      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1527      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1528      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1529      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1530      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1531      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1532      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1533                            WRITE(numout,*)
1534      !
1535                               CALL FLUSH(numout    )
1536      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1537      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1538      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1539      !
1540      IF( cd1 == 'STOP' ) THEN
1541         WRITE(numout,*) 
1542         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1543         WRITE(numout,*) 
1544         CALL FLUSH(numout)
1545         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1546         CALL mppstop( ld_abort = .true. )
1547      ENDIF
1548      !
1549   END SUBROUTINE ctl_stop
1550
1551
1552   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1553      &                 cd6, cd7, cd8, cd9, cd10 )
1554      !!----------------------------------------------------------------------
1555      !!                  ***  ROUTINE  stop_warn  ***
1556      !!
1557      !! ** Purpose :   print in ocean.outpput file a error message and
1558      !!                increment the warning number (nwarn) by one.
1559      !!----------------------------------------------------------------------
1560      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1561      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1562      !!----------------------------------------------------------------------
1563      !
1564      nwarn = nwarn + 1
1565      !
1566      IF(lwp) THEN
1567                               WRITE(numout,*)
1568                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1569                               WRITE(numout,*)
1570                               WRITE(numout,*) '         ==============='
1571                               WRITE(numout,*)
1572         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1573         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1574         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1575         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1576         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1577         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1578         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1579         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1580         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1581         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1582                               WRITE(numout,*)
1583      ENDIF
1584      CALL FLUSH(numout)
1585      !
1586   END SUBROUTINE ctl_warn
1587
1588
1589   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1590      !!----------------------------------------------------------------------
1591      !!                  ***  ROUTINE ctl_opn  ***
1592      !!
1593      !! ** Purpose :   Open file and check if required file is available.
1594      !!
1595      !! ** Method  :   Fortan open
1596      !!----------------------------------------------------------------------
1597      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1598      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1599      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1600      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1601      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1602      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1603      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1604      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1605      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1606      !
1607      CHARACTER(len=80) ::   clfile
1608      CHARACTER(LEN=10) ::   clfmt            ! writing format
1609      INTEGER           ::   iost
1610      INTEGER           ::   idg              ! number of digits
1611      !!----------------------------------------------------------------------
1612      !
1613      ! adapt filename
1614      ! ----------------
1615      clfile = TRIM(cdfile)
1616      IF( PRESENT( karea ) ) THEN
1617         IF( karea > 1 ) THEN
1618            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1619            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1620            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1621            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1622         ENDIF
1623      ENDIF
1624#if defined key_agrif
1625      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1626      knum=Agrif_Get_Unit()
1627#else
1628      knum=get_unit()
1629#endif
1630      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1631      !
1632      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1633         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1634      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1635         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1636      ELSE
1637         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1638      ENDIF
1639      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1640         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1641      IF( iost == 0 ) THEN
1642         IF(ldwp .AND. kout > 0) THEN
1643            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1644            WRITE(kout,*) '     unit   = ', knum
1645            WRITE(kout,*) '     status = ', cdstat
1646            WRITE(kout,*) '     form   = ', cdform
1647            WRITE(kout,*) '     access = ', cdacce
1648            WRITE(kout,*)
1649         ENDIF
1650      ENDIF
1651100   CONTINUE
1652      IF( iost /= 0 ) THEN
1653         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1654         WRITE(ctmp2,*) ' =======   ===  '
1655         WRITE(ctmp3,*) '           unit   = ', knum
1656         WRITE(ctmp4,*) '           status = ', cdstat
1657         WRITE(ctmp5,*) '           form   = ', cdform
1658         WRITE(ctmp6,*) '           access = ', cdacce
1659         WRITE(ctmp7,*) '           iostat = ', iost
1660         WRITE(ctmp8,*) '           we stop. verify the file '
1661         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1662      ENDIF
1663      !
1664   END SUBROUTINE ctl_opn
1665
1666
1667   SUBROUTINE ctl_nam ( kios, cdnam )
1668      !!----------------------------------------------------------------------
1669      !!                  ***  ROUTINE ctl_nam  ***
1670      !!
1671      !! ** Purpose :   Informations when error while reading a namelist
1672      !!
1673      !! ** Method  :   Fortan open
1674      !!----------------------------------------------------------------------
1675      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1676      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1677      !
1678      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1679      !!----------------------------------------------------------------------
1680      !
1681      WRITE (clios, '(I5.0)')   kios
1682      IF( kios < 0 ) THEN         
1683         CALL ctl_warn( 'end of record or file while reading namelist '   &
1684            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1685      ENDIF
1686      !
1687      IF( kios > 0 ) THEN
1688         CALL ctl_stop( 'misspelled variable in namelist '   &
1689            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1690      ENDIF
1691      kios = 0
1692      !
1693   END SUBROUTINE ctl_nam
1694
1695
1696   INTEGER FUNCTION get_unit()
1697      !!----------------------------------------------------------------------
1698      !!                  ***  FUNCTION  get_unit  ***
1699      !!
1700      !! ** Purpose :   return the index of an unused logical unit
1701      !!----------------------------------------------------------------------
1702      LOGICAL :: llopn
1703      !!----------------------------------------------------------------------
1704      !
1705      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1706      llopn = .TRUE.
1707      DO WHILE( (get_unit < 998) .AND. llopn )
1708         get_unit = get_unit + 1
1709         INQUIRE( unit = get_unit, opened = llopn )
1710      END DO
1711      IF( (get_unit == 999) .AND. llopn ) THEN
1712         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1713      ENDIF
1714      !
1715   END FUNCTION get_unit
1716
1717   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1718      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1719      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1720      CHARACTER(LEN=256)                           :: chline
1721      CHARACTER(LEN=1)                             :: csp
1722      INTEGER, INTENT(IN)                          :: kout
1723      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1724      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1725      !
1726      !csp = NEW_LINE('A')
1727      ! a new line character is the best seperator but some systems (e.g.Cray)
1728      ! seem to terminate namelist reads from internal files early if they
1729      ! encounter new-lines. Use a single space for safety.
1730      csp = ' '
1731      !
1732      ! Check if the namelist buffer has already been allocated. Return if it has.
1733      !
1734      IF ( ALLOCATED( cdnambuff ) ) RETURN
1735      IF( ldwp ) THEN
1736         !
1737         ! Open namelist file
1738         !
1739         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1740         !
1741         ! First pass: count characters excluding comments and trimable white space
1742         !
1743         itot=0
1744     10  READ(iun,'(A256)',END=20,ERR=20) chline
1745         iltc = LEN_TRIM(chline)
1746         IF ( iltc.GT.0 ) THEN
1747          inl = INDEX(chline, '!') 
1748          IF( inl.eq.0 ) THEN
1749           itot = itot + iltc + 1                                ! +1 for the newline character
1750          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1751           itot = itot + inl                                  !  includes +1 for the newline character
1752          ENDIF
1753         ENDIF
1754         GOTO 10
1755     20  CONTINUE
1756         !
1757         ! Allocate text cdnambuff for condensed namelist
1758         !
1759!$AGRIF_DO_NOT_TREAT
1760         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1761!$AGRIF_END_DO_NOT_TREAT
1762         itotsav = itot
1763         !
1764         ! Second pass: read and transfer pruned characters into cdnambuff
1765         !
1766         REWIND(iun)
1767         itot=1
1768     30  READ(iun,'(A256)',END=40,ERR=40) chline
1769         iltc = LEN_TRIM(chline)
1770         IF ( iltc.GT.0 ) THEN
1771          inl = INDEX(chline, '!')
1772          IF( inl.eq.0 ) THEN
1773           inl = iltc
1774          ELSE
1775           inl = inl - 1
1776          ENDIF
1777          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1778             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1779             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1780             itot = itot + inl + 1
1781          ENDIF
1782         ENDIF
1783         GOTO 30
1784     40  CONTINUE
1785         itot = itot - 1
1786         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1787         !
1788         ! Close namelist file
1789         !
1790         CLOSE(iun)
1791         !write(*,'(32A)') cdnambuff
1792      ENDIF
1793#if defined key_mpp_mpi
1794      CALL mpp_bcast_nml( cdnambuff, itot )
1795#endif
1796  END SUBROUTINE load_nml
1797
1798
1799   !!----------------------------------------------------------------------
1800END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.