Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions observations/obs_converters/GSI2DART/enkf/mpi_readobs.f90
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ module mpi_readobs
!$$$

use kinds, only: r_kind, r_single, i_kind
use types_mod, only: r8
use radinfo, only: npred
use readconvobs
use readsatobs
use readozobs
use mpisetup
use mpi_utilities_mod, only : initialize_mpi_utilities, broadcast_send, broadcast_recv, send_to, receive_from

implicit none

Expand All @@ -62,6 +64,7 @@ subroutine mpi_getobs(obspath, datestring, nobs_conv, nobs_oz, nobs_sat, nobs_to
integer(i_kind) nob, ierr, iozproc, isatproc, &
nobs_conv, nobs_oz, nobs_sat, nobs_tot, nanal
integer(i_kind), intent(in) :: nanals
real(r8), allocatable, dimension(:) :: buffer
!include 'mpif.h'
!integer mpi_status(mpi_status_size)
iozproc=max(0,min(1,numproc-1))
Expand All @@ -71,9 +74,32 @@ subroutine mpi_getobs(obspath, datestring, nobs_conv, nobs_oz, nobs_sat, nobs_to
if(nproc == 0)call get_num_convobs(obspath,datestring,nobs_conv,id)
if(nproc == iozproc)call get_num_ozobs(obspath,datestring,nobs_oz,id)
if(nproc == isatproc)call get_num_satobs(obspath,datestring,nobs_sat,id)
call mpi_bcast(nobs_conv,1,mpi_integer,0,mpi_comm_world,ierr)
call mpi_bcast(nobs_oz,1,mpi_integer,iozproc,mpi_comm_world,ierr)
call mpi_bcast(nobs_sat,1,mpi_integer,isatproc,mpi_comm_world,ierr)
! call mpi_bcast(nobs_conv,1,mpi_integer,0,mpi_comm_world,ierr)
allocate(buffer(1))
buffer(1) = real(nobs_conv, 8)
if (nproc == 0) then
call broadcast_send(0, array1=buffer)
else
call broadcast_recv(0, array1=buffer)
nobs_conv = int(buffer(1))
end if
buffer(1) = real(nobs_oz, 8)
if (nproc == iozproc) then
call broadcast_send(iozproc, array1=buffer)
else
call broadcast_recv(iozproc, array1=buffer)
nobs_oz = int(buffer(1))
end if
buffer(1) = real(nobs_sat, 8)
if (nproc == isatproc) then
call broadcast_send(isatproc, array1=buffer)
else
call broadcast_recv(isatproc, array1=buffer)
nobs_sat = int(buffer(1))
end if
if (allocated(buffer)) deallocate(buffer)
! call mpi_bcast(nobs_oz,1,mpi_integer,iozproc,mpi_comm_world,ierr)
! call mpi_bcast(nobs_sat,1,mpi_integer,isatproc,mpi_comm_world,ierr)
if(nproc == 0)print *,'nobs_conv, nobs_oz, nobs_sat = ',nobs_conv,nobs_oz,nobs_sat
nobs_tot = nobs_conv + nobs_oz + nobs_sat
! if nobs_tot != 0 (there were some obs to read)
Expand Down Expand Up @@ -145,11 +171,14 @@ subroutine mpi_getobs(obspath, datestring, nobs_conv, nobs_oz, nobs_sat, nobs_to
! if (nanal <= nanals) print *,id,id2,'read sat obs'
end if

allocate(buffer(nobs_tot))
if (nanal <= nanals) then
if (nanal == 0) then
do nanal=1,nanals
call mpi_recv(h_xnobc,nobs_tot,mpi_real4,nanal, &
1,mpi_comm_world,mpi_status,ierr)
!call mpi_recv(h_xnobc,nobs_tot,mpi_real4,nanal, &
! 1,mpi_comm_world,mpi_status,ierr)
call receive_from(nanal, buffer)
h_xnobc(1:nobs_tot) = buffer(1:nobs_tot)
anal_ob(nanal,:) = h_xnobc(:)
enddo
analsi=1._r_single/float(nanals)
Expand All @@ -166,12 +195,30 @@ subroutine mpi_getobs(obspath, datestring, nobs_conv, nobs_oz, nobs_sat, nobs_to
!$omp end parallel do
else ! nanal/nproc != 0
! send to root.
call mpi_send(h_xnobc,nobs_tot,mpi_real4,0,1,mpi_comm_world,ierr)
! call mpi_send(h_xnobc,nobs_tot,mpi_real4,0,1,mpi_comm_world,ierr)
buffer(1:nobs_tot) = h_xnobc(1:nobs_tot)
call send_to(0, buffer)
end if ! if nanal == 0
end if ! nproc <= nanals
call mpi_bcast(ensmean_ob,nobs_tot,mpi_real4,0,mpi_comm_world,ierr)
call mpi_bcast(sprd_ob,nobs_tot,mpi_real4,0,mpi_comm_world,ierr)
!call mpi_bcast(ensmean_ob,nobs_tot,mpi_real4,0,mpi_comm_world,ierr)
! allocate(buffer(nobs_tot))
buffer(1:nobs_tot) = ensmean_ob(1:nobs_tot)
if (nproc == 0) then
call broadcast_send(0, buffer)
else
call broadcast_recv(0, buffer)
ensmean_ob(1:nobs_tot) = buffer(1:nobs_tot)
end if
! call mpi_bcast(sprd_ob,nobs_tot,mpi_real4,0,mpi_comm_world,ierr)
buffer(1:nobs_tot) = sprd_ob(1:nobs_tot)
if (nproc == 0) then
call broadcast_send(0, buffer)
else
call broadcast_recv(0, buffer)
sprd_ob(1:nobs_tot) = buffer(1:nobs_tot)
end if
deallocate(h_xnobc)
deallocate(buffer)

end subroutine mpi_getobs

Expand Down
61 changes: 51 additions & 10 deletions observations/obs_converters/GSI2DART/gsi_to_dart.f90
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ program gsi_to_dart
use mpi_readobs, only : mpi_getobs
use dart_obs_seq_mod, only : dart_obs_seq, set_debug
use utilities_mod, only : find_namelist_in_file, check_namelist_read
use mpi_utilities_mod, only : initialize_mpi_utilities, finalize_mpi_utilities
use mpi_utilities_mod, only : initialize_mpi_utilities, finalize_mpi_utilities, broadcast_send, broadcast_recv, send_to, &
receive_from
use mpi, only : mpi_barrier, mpi_comm_world
use types_mod, only: r8

implicit none

logical :: debug = .false.
integer(i_kind) :: nobs_sat, nobs_oz, nobs_conv, nobstot
integer(i_kind) :: nobs_start, nobs_end
integer(i_kind) :: i, irank
integer(i_kind) :: i, irank, j
integer(i_kind) :: ierr
integer(i_kind) :: unitnml, io
integer(i_kind) :: pe_write_conv,pe_write_rad
Expand All @@ -30,6 +33,7 @@ program gsi_to_dart
character(len=256) :: my_output_filename
character(len=256) :: obs_seq_out_filename_conv, obs_seq_out_filename_sat
real(r_single), allocatable, dimension(:) :: workgrid_in, workgrid_out
real(r8), allocatable, dimension(:) :: multibuffer

! namelist variables are declared and initialized in params and radinfo
namelist /gsi_to_dart_nml/ ens_size, &
Expand All @@ -52,7 +56,7 @@ program gsi_to_dart
call mpi_initialize

! Print out some info
call initialize_mpi_utilities('gsi_to_dart',communicator=mpi_comm_world)
! call initialize_mpi_utilities('gsi_to_dart',communicator=mpi_comm_world)

! The barrier makes sure the 'starting' banner from initialize_mpi_utilities is
! printed before any subsequent print statements. Some tasks were printing the
Expand Down Expand Up @@ -127,8 +131,25 @@ program gsi_to_dart
! output obs_sequence file
if ( output_option .eq. 3 ) then
if ( .not. allocated(anal_ob)) allocate(anal_ob(ens_size,nobstot)) ! may have been allocated in mpi_getobs
call mpi_bcast(anal_ob,nobstot*ens_size,mpi_real4,0,mpi_comm_world,ierr) ! should really do mpi_scatterv...but doing so doesn't seem to save memory overall...

!call mpi_bcast(anal_ob,nobstot*ens_size,mpi_real4,0,mpi_comm_world,ierr) ! should really do mpi_scatterv...but doing so doesn't seem to save memory overall...
allocate(multibuffer(ens_size*nobstot))
! note: column-major order for faster copy
do j = 1, nobstot
do i = 1, ens_size
multibuffer(((i-1)*nobstot) + j) = anal_ob(i, j)
enddo
enddo
if (nproc == 0) then
call broadcast_send(0, array1=multibuffer)
else
call broadcast_recv(0, array1=multibuffer)
do j = 1, nobstot
do i = 1, ens_size
anal_ob(i, j) = multibuffer(((i-1)*nobstot) + j)
enddo
enddo
end if
deallocate(multibuffer)
if ( convert_conv ) then
nobs_start = 1
nobs_end = nobs_conv
Expand Down Expand Up @@ -185,8 +206,27 @@ program gsi_to_dart
if ( convert_sat .and. nproc == pe_write_rad) then
if ( .not. allocated(anal_ob)) allocate(anal_ob(ens_size,nobstot)) ! may have been allocated in mpi_getobs
endif
if ( convert_sat .and. nproc==0) call mpi_send(anal_ob,nobstot*ens_size,mpi_real4,pe_write_rad,1,mpi_comm_world,ierr) ! send from nproc=0
if ( convert_sat .and. nproc==pe_write_rad) call mpi_recv(anal_ob,nobstot*ens_size,mpi_real4,0,1,mpi_comm_world,mpi_status,ierr) ! recieve on nproc=pe_write_rad
! if ( convert_sat .and. nproc==0) call mpi_send(anal_ob,nobstot*ens_size,mpi_real4,pe_write_rad,1,mpi_comm_world,ierr) ! send from nproc=0
allocate(multibuffer(nobstot*ens_size))
do j = 1, nobstot
do i = 1, ens_size
multibuffer(((i-1)*nobstot) + j) = anal_ob(i, j)
enddo
enddo
if (convert_sat .and. nproc==0) then
call send_to(1, multibuffer)
end if
! if ( convert_sat .and. nproc==pe_write_rad) call mpi_recv(anal_ob,nobstot*ens_size,mpi_real4,0,1,mpi_comm_world,mpi_status,ierr) ! recieve on nproc=pe_write_rad
if ( convert_sat .and. nproc == pe_write_rad) then
call receive_from(0, multibuffer)
do j = 1, nobstot
do i = 1, ens_size
! multibuffer(((i-1)*nobstot) + j) = anal_ob(i, j)
anal_ob(i, j) = multibuffer(((i-1)*nobstot) + j)
enddo
enddo
end if
deallocate(multibuffer)
! what if pe_write_rad == 0? will the mpi_send/recv fail (sending and receiving from same processor)?

obs_seq_out_filename_conv = trim(adjustl(obs_seq_out_filename))//'.conv'
Expand Down Expand Up @@ -228,7 +268,7 @@ program gsi_to_dart
if ( convert_sat ) call radinfo_clean ! from radinfo

! print ending info
call finalize_mpi_utilities(callfinalize=.false.)
! call finalize_mpi_utilities(callfinalize=.false.)

! finalize MPI
call mpi_cleanup
Expand All @@ -241,15 +281,16 @@ subroutine stop2(ierror_code)
! adapted from GSI/src/main/stop1.f90

use kinds, only: i_kind
use mpisetup, only : mpi_comm_world, MPI_SUCCESS
! use mpisetup, only : mpi_comm_world, MPI_SUCCESS
implicit none

integer(i_kind), intent(in) :: ierror_code
integer(i_kind) :: ierr

write(*,*)'****STOP2**** ABORTING EXECUTION w/code=',ierror_code
write(0,*)'****STOP2**** ABORTING EXECUTION w/code=',ierror_code
call mpi_abort(mpi_comm_world,ierror_code,ierr)
!call mpi_abort(mpi_comm_world,ierror_code,ierr)
call exit_all(ierror_code)
stop
return
end subroutine stop2
Expand Down
37 changes: 22 additions & 15 deletions observations/obs_converters/GSI2DART/mpisetup.f90
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,38 @@ module mpisetup
!$$$

use kinds, only: r_kind, r_single, r_double
use mpi_utilities_mod, only : task_count, my_task_id, initialize_mpi_utilities, finalize_mpi_utilities
implicit none
! mpi definitions.
include 'mpif.h'
! include 'mpif.h'
integer, public :: numproc, nproc
integer, public :: mpi_status(mpi_status_size)
integer, public :: mpi_realkind
! integer, public :: mpi_status(mpi_status_size)
! integer, public :: mpi_realkind

contains

subroutine mpi_initialize()
integer :: ierr
call mpi_init(ierr)
! call mpi_init(ierr)

call initialize_mpi_utilities('gsi_to_dart')
! nproc is process number, numproc is total number of processes.
call mpi_comm_rank(mpi_comm_world,nproc,ierr)
call mpi_comm_size(mpi_comm_world,numproc,ierr)
! call mpi_comm_rank(mpi_comm_world,nproc,ierr)
! call mpi_comm_size(mpi_comm_world,numproc,ierr)
numproc = task_count()
nproc = my_task_id()

if (nproc == 0) print *,'running on ',numproc,' processors ...'
if (r_kind == r_single) then
mpi_realkind = mpi_real4
if (nproc == 0) print *,'mpi_realkind set to mpi_real4',mpi_real4
! mpi_realkind = mpi_real4
! if (nproc == 0) print *,'mpi_realkind set to mpi_real4',mpi_real4
else if (r_kind == r_double) then
mpi_realkind = mpi_real8
if (nproc == 0) print *,'mpi_realkind set to mpi_real8',mpi_real8
! mpi_realkind = mpi_real8
! if (nproc == 0) print *,'mpi_realkind set to mpi_real8',mpi_real8
else
print *,'ERROR: illegal r_kind (must be single or double)'
call mpi_cleanup()
! call finalize_mpi_utilities()
endif

end subroutine mpi_initialize
Expand All @@ -67,12 +73,13 @@ subroutine mpi_cleanup()
flush(6,err=10)
flush(0,err=10)
10 continue
call mpi_barrier(mpi_comm_world,ierr)
!call mpi_barrier(mpi_comm_world,ierr)
if (nproc == 0) write(6,*) 'all done!'
call mpi_finalize(ierr)
if (ierr /= 0) then
print *, 'MPI_Finalize error status = ',ierr
end if
!call mpi_finalize(ierr)
call finalize_mpi_utilities()
! if (ierr /= 0) then
! print *, 'MPI_Finalize error status = ',ierr
! end if
end subroutine mpi_cleanup

end module mpisetup