diff --git a/CMakeExt/FindTBB.cmake b/CMakeExt/FindTBB.cmake new file mode 100644 index 000000000..36c3866f7 --- /dev/null +++ b/CMakeExt/FindTBB.cmake @@ -0,0 +1,303 @@ +# The MIT License (MIT) +# +# Copyright (c) 2015 Justus Calvin +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# +# FindTBB +# ------- +# +# Find TBB include directories and libraries. +# +# Usage: +# +# find_package(TBB [major[.minor]] [EXACT] +# [QUIET] [REQUIRED] +# [[COMPONENTS] [components...]] +# [OPTIONAL_COMPONENTS components...]) +# +# where the allowed components are tbbmalloc and tbb_preview. Users may modify +# the behavior of this module with the following variables: +# +# * TBB_ROOT_DIR - The base directory of the TBB installation. +# * TBB_INCLUDE_DIR - The directory that contains the TBB header files. +# * TBB_LIBRARY - The directory that contains the TBB library files. +# * TBB_<library>_LIBRARY - The path of the corresponding TBB library. 
+# These libraries, if specified, override the +# corresponding library search results, where <library> +# may be tbb, tbb_debug, tbbmalloc, tbbmalloc_debug, +# tbb_preview, or tbb_preview_debug. +# * TBB_USE_DEBUG_BUILD - The debug version of tbb libraries, if present, will +# be used instead of the release version. +# +# Users may modify the behavior of this module with the following environment +# variables: +# +# * TBB_INSTALL_DIR +# * TBBROOT +# * LIBRARY_PATH +# +# This module will set the following variables: +# +# * TBB_FOUND - Set to false, or undefined, if we haven’t found, or +# don’t want to use TBB. +# * TBB_<component>_FOUND - If False, optional <component> part of TBB system is +# not available. +# * TBB_VERSION - The full version string +# * TBB_VERSION_MAJOR - The major version +# * TBB_VERSION_MINOR - The minor version +# * TBB_INTERFACE_VERSION - The interface version number defined in +# tbb/tbb_stddef.h. +# * TBB_<library>_LIBRARY_RELEASE - The path of the TBB release version of +# <library>, where <library> may be tbb, tbb_debug, +# tbbmalloc, tbbmalloc_debug, tbb_preview, or +# tbb_preview_debug. +# * TBB_<library>_LIBRARY_DEBUG - The path of the TBB debug version of +# <library>, where <library> may be tbb, tbb_debug, +# tbbmalloc, tbbmalloc_debug, tbb_preview, or +# tbb_preview_debug. +# +# The following variables should be used to build and link with TBB: +# +# * TBB_INCLUDE_DIRS - The include directory for TBB. +# * TBB_LIBRARIES - The libraries to link against to use TBB. +# * TBB_LIBRARIES_RELEASE - The release libraries to link against to use TBB. +# * TBB_LIBRARIES_DEBUG - The debug libraries to link against to use TBB. +# * TBB_DEFINITIONS - Definitions to use when compiling code that uses +# TBB. +# * TBB_DEFINITIONS_RELEASE - Definitions to use when compiling release code that +# uses TBB. +# * TBB_DEFINITIONS_DEBUG - Definitions to use when compiling debug code that +# uses TBB. +# +# This module will also create the "tbb" target that may be used when building +# executables and libraries. 
+ +include(FindPackageHandleStandardArgs) + +if(NOT TBB_FOUND) + + ################################## + # Check the build type + ################################## + + if(NOT DEFINED TBB_USE_DEBUG_BUILD) + if(CMAKE_BUILD_TYPE MATCHES "(Debug|DEBUG|debug|RelWithDebInfo|RELWITHDEBINFO|relwithdebinfo)") + set(TBB_BUILD_TYPE DEBUG) + else() + set(TBB_BUILD_TYPE RELEASE) + endif() + elseif(TBB_USE_DEBUG_BUILD) + set(TBB_BUILD_TYPE DEBUG) + else() + set(TBB_BUILD_TYPE RELEASE) + endif() + + ################################## + # Set the TBB search directories + ################################## + + # Define search paths based on user input and environment variables + set(TBB_SEARCH_DIR ${TBB_ROOT_DIR} $ENV{TBB_BASE} $ENV{TBB_INSTALL_DIR} $ENV{TBBROOT}) + + # Define the search directories based on the current platform + if(CMAKE_SYSTEM_NAME STREQUAL "Windows") + set(TBB_DEFAULT_SEARCH_DIR "C:/Program Files/Intel/TBB" + "C:/Program Files (x86)/Intel/TBB") + + # Set the target architecture + if(CMAKE_SIZEOF_VOID_P EQUAL 8) + set(TBB_ARCHITECTURE "intel64") + else() + set(TBB_ARCHITECTURE "ia32") + endif() + + # Set the TBB search library path search suffix based on the version of VC + if(WINDOWS_STORE) + set(TBB_LIB_PATH_SUFFIX "lib/${TBB_ARCHITECTURE}/vc11_ui") + elseif(MSVC14) + set(TBB_LIB_PATH_SUFFIX "lib/${TBB_ARCHITECTURE}/vc14") + elseif(MSVC12) + set(TBB_LIB_PATH_SUFFIX "lib/${TBB_ARCHITECTURE}/vc12") + elseif(MSVC11) + set(TBB_LIB_PATH_SUFFIX "lib/${TBB_ARCHITECTURE}/vc11") + elseif(MSVC10) + set(TBB_LIB_PATH_SUFFIX "lib/${TBB_ARCHITECTURE}/vc10") + endif() + + # Add the library path search suffix for the VC independent version of TBB + list(APPEND TBB_LIB_PATH_SUFFIX "lib/${TBB_ARCHITECTURE}/vc_mt") + + elseif(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + # OS X + set(TBB_DEFAULT_SEARCH_DIR "/opt/intel/tbb") + + # TODO: Check to see which C++ library is being used by the compiler. 
+ if(NOT ${CMAKE_SYSTEM_VERSION} VERSION_LESS 13.0) + # The default C++ library on OS X 10.9 and later is libc++ + set(TBB_LIB_PATH_SUFFIX "lib/libc++" "lib") + else() + set(TBB_LIB_PATH_SUFFIX "lib") + endif() + elseif(CMAKE_SYSTEM_NAME STREQUAL "Linux") + # Linux + set(TBB_DEFAULT_SEARCH_DIR "/opt/intel/tbb") + + # TODO: Check compiler version to see the suffix should be /gcc4.1 or + # /gcc4.1. For now, assume that the compiler is more recent than + # gcc 4.4.x or later. + if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + set(TBB_LIB_PATH_SUFFIX "lib/intel64/gcc4.4") + elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$") + set(TBB_LIB_PATH_SUFFIX "lib/ia32/gcc4.4") + endif() + endif() + + ################################## + # Find the TBB include dir + ################################## + + find_path(TBB_INCLUDE_DIRS tbb/tbb.h + HINTS ${TBB_INCLUDE_DIR} ${TBB_SEARCH_DIR} + PATHS ${TBB_DEFAULT_SEARCH_DIR} + PATH_SUFFIXES include) + + ################################## + # Set version strings + ################################## + + if(TBB_INCLUDE_DIRS) + file(READ "${TBB_INCLUDE_DIRS}/tbb/tbb_stddef.h" _tbb_version_file) + string(REGEX REPLACE ".*#define TBB_VERSION_MAJOR ([0-9]+).*" "\\1" + TBB_VERSION_MAJOR "${_tbb_version_file}") + string(REGEX REPLACE ".*#define TBB_VERSION_MINOR ([0-9]+).*" "\\1" + TBB_VERSION_MINOR "${_tbb_version_file}") + string(REGEX REPLACE ".*#define TBB_INTERFACE_VERSION ([0-9]+).*" "\\1" + TBB_INTERFACE_VERSION "${_tbb_version_file}") + set(TBB_VERSION "${TBB_VERSION_MAJOR}.${TBB_VERSION_MINOR}") + endif() + + ################################## + # Find TBB components + ################################## + + if(TBB_VERSION VERSION_LESS 4.3) + set(TBB_SEARCH_COMPOMPONENTS tbb_preview tbbmalloc tbb) + else() + set(TBB_SEARCH_COMPOMPONENTS tbb_preview tbbmalloc_proxy tbbmalloc tbb) + endif() + + # Find each component + foreach(_comp ${TBB_SEARCH_COMPOMPONENTS}) + if(";${TBB_FIND_COMPONENTS};tbb;" MATCHES ";${_comp};") + + # Search for the 
libraries + find_library(TBB_${_comp}_LIBRARY_RELEASE ${_comp} + HINTS ${TBB_LIBRARY} ${TBB_SEARCH_DIR} + PATHS ${TBB_DEFAULT_SEARCH_DIR} ENV LIBRARY_PATH + PATH_SUFFIXES ${TBB_LIB_PATH_SUFFIX}) + + find_library(TBB_${_comp}_LIBRARY_DEBUG ${_comp}_debug + HINTS ${TBB_LIBRARY} ${TBB_SEARCH_DIR} + PATHS ${TBB_DEFAULT_SEARCH_DIR} ENV LIBRARY_PATH + PATH_SUFFIXES ${TBB_LIB_PATH_SUFFIX}) + + if(TBB_${_comp}_LIBRARY_DEBUG) + list(APPEND TBB_LIBRARIES_DEBUG "${TBB_${_comp}_LIBRARY_DEBUG}") + endif() + if(TBB_${_comp}_LIBRARY_RELEASE) + list(APPEND TBB_LIBRARIES_RELEASE "${TBB_${_comp}_LIBRARY_RELEASE}") + endif() + if(TBB_${_comp}_LIBRARY_${TBB_BUILD_TYPE} AND NOT TBB_${_comp}_LIBRARY) + set(TBB_${_comp}_LIBRARY "${TBB_${_comp}_LIBRARY_${TBB_BUILD_TYPE}}") + endif() + + if(TBB_${_comp}_LIBRARY AND EXISTS "${TBB_${_comp}_LIBRARY}") + set(TBB_${_comp}_FOUND TRUE) + else() + set(TBB_${_comp}_FOUND FALSE) + endif() + + # Mark internal variables as advanced + mark_as_advanced(TBB_${_comp}_LIBRARY_RELEASE) + mark_as_advanced(TBB_${_comp}_LIBRARY_DEBUG) + mark_as_advanced(TBB_${_comp}_LIBRARY) + + endif() + endforeach() + + ################################## + # Set compile flags and libraries + ################################## + + set(TBB_DEFINITIONS_RELEASE "") + set(TBB_DEFINITIONS_DEBUG "-DTBB_USE_DEBUG=1") + + if(TBB_LIBRARIES_${TBB_BUILD_TYPE}) + set(TBB_DEFINITIONS "${TBB_DEFINITIONS_${TBB_BUILD_TYPE}}") + set(TBB_LIBRARIES "${TBB_LIBRARIES_${TBB_BUILD_TYPE}}") + elseif(TBB_LIBRARIES_RELEASE) + set(TBB_DEFINITIONS "${TBB_DEFINITIONS_RELEASE}") + set(TBB_LIBRARIES "${TBB_LIBRARIES_RELEASE}") + elseif(TBB_LIBRARIES_DEBUG) + set(TBB_DEFINITIONS "${TBB_DEFINITIONS_DEBUG}") + set(TBB_LIBRARIES "${TBB_LIBRARIES_DEBUG}") + endif() + + find_package_handle_standard_args(TBB + REQUIRED_VARS TBB_INCLUDE_DIRS TBB_LIBRARIES + HANDLE_COMPONENTS + VERSION_VAR TBB_VERSION) + + ################################## + # Create targets + ################################## + + if(NOT 
CMAKE_VERSION VERSION_LESS 3.0 AND TBB_FOUND) + add_library(tbb SHARED IMPORTED) + set_target_properties(tbb PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES ${TBB_INCLUDE_DIRS} + IMPORTED_LOCATION ${TBB_LIBRARIES}) + if(TBB_LIBRARIES_RELEASE AND TBB_LIBRARIES_DEBUG) + set_target_properties(tbb PROPERTIES + INTERFACE_COMPILE_DEFINITIONS "$<$<OR:$<CONFIG:Debug>,$<CONFIG:RelWithDebInfo>>:TBB_USE_DEBUG=1>" + IMPORTED_LOCATION_DEBUG ${TBB_LIBRARIES_DEBUG} + IMPORTED_LOCATION_RELWITHDEBINFO ${TBB_LIBRARIES_DEBUG} + IMPORTED_LOCATION_RELEASE ${TBB_LIBRARIES_RELEASE} + IMPORTED_LOCATION_MINSIZEREL ${TBB_LIBRARIES_RELEASE} + ) + elseif(TBB_LIBRARIES_RELEASE) + set_target_properties(tbb PROPERTIES IMPORTED_LOCATION ${TBB_LIBRARIES_RELEASE}) + else() + set_target_properties(tbb PROPERTIES + INTERFACE_COMPILE_DEFINITIONS "${TBB_DEFINITIONS_DEBUG}" + IMPORTED_LOCATION ${TBB_LIBRARIES_DEBUG} + ) + endif() + endif() + + mark_as_advanced(TBB_INCLUDE_DIRS TBB_LIBRARIES) + + unset(TBB_ARCHITECTURE) + unset(TBB_BUILD_TYPE) + unset(TBB_LIB_PATH_SUFFIX) + unset(TBB_DEFAULT_SEARCH_DIR) + +endif() diff --git a/CMakeExt/GenerateDASHCXX.cmake b/CMakeExt/GenerateDASHCXX.cmake index 2b99088db..553c23cfb 100644 --- a/CMakeExt/GenerateDASHCXX.cmake +++ b/CMakeExt/GenerateDASHCXX.cmake @@ -39,6 +39,9 @@ if (";${DART_IMPLEMENTATIONS_LIST};" MATCHES ";mpi;") set(ADDITIONAL_LIBRARIES_WRAP "${ADDITIONAL_LIBRARIES_WRAP} ${MPI_C_LIB}") endforeach() + if(MPI_LINK_FLAGS) + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${MPI_LINK_FLAGS}") + endif() set(ADDITIONAL_INCLUDES_WRAP "${ADDITIONAL_INCLUDES_WRAP} -I${MPI_INCLUDE_PATH}") set(DASHCC ${CMAKE_CXX_COMPILER}) set(DART_IMPLEMENTATION "mpi") diff --git a/CMakeExt/ParallelStl.cmake b/CMakeExt/ParallelStl.cmake new file mode 100644 index 000000000..c55225836 --- /dev/null +++ b/CMakeExt/ParallelStl.cmake @@ -0,0 +1,61 @@ +# - Find the Parallel STL (PSTL) headers; requires TBB +# This module defines +# PSTL_FOUND +# PSTL_INCLUDE_DIRS, directory containing headers +# PSTL_LIBRARIES, 
directory containing libraries + +if (NOT ENABLE_PSTL) + return() +else() + if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Intel") + string(REGEX MATCH "([0-9]+)" ICC_VERSION_MAJOR "${CMAKE_CXX_COMPILER_VERSION}") + if (CMAKE_MATCH_1 LESS 18) + message(FATAL_ERROR "Parallel STL requires at least ICC 2018") + endif() + else() + message(WARNING "Parallel STL currently only supported for Intel Compiler") + return() + endif() +endif() + +if(NOT TBB_FOUND) + include(${CMAKE_SOURCE_DIR}/CMakeExt/FindTBB.cmake) +endif() + +if(NOT TBB_FOUND) + message(FATAL_ERROR "TBB is required for PSTL") +endif() + +if (PSTL_PREFIX) + message(STATUS "Searching for PSTL in path " ${PSTL_PREFIX}) +endif() + +# Define search paths based on user input and environment variables +set(PSTL_DEFAULT_SEARCH_DIR "/opt/intel/pstl") +set(PSTL_SEARCH_DIR $ENV{PSTLROOT} $ENV{PSTL_ROOT}) + +if (DEFINED ENV{INTEL_BASE}) + set(PSTL_SEARCH_DIR ${PSTL_SEARCH_DIR} "$ENV{INTEL_BASE}/linux/pstl") +endif() + +find_path( + PSTL_INCLUDE_DIRS pstl/algorithm + HINTS ${PSTL_PREFIX} ${PSTL_SEARCH_DIR} + PATHS ${PSTL_DEFAULT_SEARCH_DIR} + PATH_SUFFIXES include) + +include(FindPackageHandleStandardArgs) + +find_package_handle_standard_args( + PSTL DEFAULT_MSG + PSTL_INCLUDE_DIRS + ) + +if (NOT PSTL_FOUND AND NOT PSTL_FIND_QUIETLY) + set(PSTL_ERR_MSG "Could not find the pmem libraries. 
Looked for headers") + set(PSTL_ERR_MSG "${PSTL_ERR_MSG} in ${PSTL_SEARCH_HEADER_PATH}") +endif() + +mark_as_advanced( + PSTL_INCLUDE_DIRS + ) diff --git a/CMakeLists.txt b/CMakeLists.txt index bc53c27be..4f2b1f992 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -106,6 +106,7 @@ include(${CMAKE_SOURCE_DIR}/CMakeExt/Doxygen.cmake) include(${CMAKE_SOURCE_DIR}/CMakeExt/Platform.cmake) include(${CMAKE_SOURCE_DIR}/CMakeExt/Environment.cmake) include(${CMAKE_SOURCE_DIR}/CMakeExt/StdLib.cmake) +include(${CMAKE_SOURCE_DIR}/CMakeExt/ParallelStl.cmake) if (ENABLE_THREADSUPPORT) include(${CMAKE_SOURCE_DIR}/CMakeExt/Threading.cmake) diff --git a/config/knl.cmake b/config/knl.cmake new file mode 100644 index 000000000..5fe5a7cfa --- /dev/null +++ b/config/knl.cmake @@ -0,0 +1,14 @@ +set(DASH_ENV_HOST_SYSTEM_ID "default" CACHE STRING + "Host system type identifier") + +if (NOT BUILD_GENERIC) + if ("${CMAKE_C_COMPILER_ID}" MATCHES "GNU" + OR "${CMAKE_C_COMPILER_ID}" MATCHES "Clang") + #set specific flags for clang or gcc to use avx-512 + endif() + + if ("${CMAKE_C_COMPILER_ID}" MATCHES "Intel") + set(CC_ENV_SETUP_FLAGS "${CC_ENV_SETUP_FLAGS} -xhost") + set(CXX_ENV_SETUP_FLAGS "${CXX_ENV_SETUP_FLAGS} -xhost") + endif() +endif() diff --git a/config/supermuc.cmake b/config/supermuc.cmake index 8c055b82d..88178ff55 100644 --- a/config/supermuc.cmake +++ b/config/supermuc.cmake @@ -22,13 +22,11 @@ endif() if ("${CMAKE_C_COMPILER_ID}" MATCHES "Intel") set(CC_ENV_SETUP_FLAGS "${CC_ENV_SETUP_FLAGS} -qopenmp -xhost -mkl") - set(CC_ENV_SETUP_FLAGS "${CC_ENV_SETUP_FLAGS} -mt_mpi") set(CC_ENV_SETUP_FLAGS "${CC_ENV_SETUP_FLAGS} -qopt-streaming-stores always") set(CC_ENV_SETUP_FLAGS "${CC_ENV_SETUP_FLAGS} -qopt-prefetch-distance=64,8") endif() if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Intel") set(CXX_ENV_SETUP_FLAGS "${CXX_ENV_SETUP_FLAGS} -qopenmp -xhost -mkl") - set(CXX_ENV_SETUP_FLAGS "${CXX_ENV_SETUP_FLAGS} -mt_mpi") set(CXX_ENV_SETUP_FLAGS "${CXX_ENV_SETUP_FLAGS} -qopt-streaming-stores 
always") set(CXX_ENV_SETUP_FLAGS "${CXX_ENV_SETUP_FLAGS} -qopt-prefetch-distance=64,8") endif() diff --git a/dart-if/include/dash/dart/if/dart_communication.h b/dart-if/include/dash/dart/if/dart_communication.h index 8670be2ef..4409997b3 100644 --- a/dart-if/include/dash/dart/if/dart_communication.h +++ b/dart-if/include/dash/dart/if/dart_communication.h @@ -210,6 +210,57 @@ dart_ret_t dart_alltoall( dart_datatype_t dtype, dart_team_t team) DART_NOTHROW; +/** + * DART Equivalent to MPI alltoallv. + * + * \param sendbuf The buffer containing the data to be sent by each unit. + * \param recvbuf The buffer to hold the received data. + * \param send_counts Number of elements sent to each unit. + * \param send_displ Displacements of elements sent to each unit. + * \param recv_counts Number of elements to receive from each unit. + * \param recv_displ Displacements of the received elements from each unit. + * \param dtype The data type of values in \c sendbuf and \c recvbuf. + * \param teamid The team to participate in the alltoallv. + * + * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. + * + * \threadsafe_data{team} + * \ingroup DartCommunication + */ +dart_ret_t dart_alltoallv( + const void* sendbuf, + void* recvbuf, + size_t const* send_counts, + size_t const* send_displ, + size_t const* recv_counts, + size_t const* recv_displ, + dart_datatype_t dtype, + dart_team_t teamid) DART_NOTHROW; + +/** + * DART Equivalent to MPI Exscan. + * + * \param sendbuf The buffer containing the data to be sent by each unit. + * \param recvbuf The buffer to hold the received data. + * \param nelem Number of elements sent by each process and received from each unit. + * The value of this parameter must not exceed INT_MAX. + * \param dtype The data type of values in \c sendbuf and \c recvbuf to use in \c op. + * \param op The reduction operation to perform. + * \param team The team to participate in the exscan. 
+ * + * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. + * + * \threadsafe_data{team} + * \ingroup DartCommunication + */ +dart_ret_t dart_exscan( + const void* sendbuf, + void* recvbuf, + size_t nelem, + dart_datatype_t dtype, + dart_operation_t op, + dart_team_t team) DART_NOTHROW; + /** * DART Equivalent to MPI_Reduce. * diff --git a/dart-impl/mpi/src/dart_communication.c b/dart-impl/mpi/src/dart_communication.c index 88ca9bc45..8560a11cd 100644 --- a/dart-impl/mpi/src/dart_communication.c +++ b/dart-impl/mpi/src/dart_communication.c @@ -2215,7 +2215,7 @@ dart_ret_t dart_alltoall( dart_datatype_t dtype, dart_team_t teamid) { - DART_LOG_TRACE("dart_alltoall() team:%d nelem:%" PRIu64 "", teamid, nelem); + DART_LOG_TRACE("dart_alltoall < team:%d nelem:%" PRIu64 "", teamid, nelem); CHECK_IS_BASICTYPE(dtype); @@ -2256,6 +2256,149 @@ dart_ret_t dart_alltoall( return DART_OK; } +dart_ret_t dart_alltoallv( + const void * sendbuf, + void * recvbuf, + size_t const * send_counts, + size_t const * send_displ, + size_t const * recv_counts, + size_t const * recv_displ, + dart_datatype_t dtype, + dart_team_t teamid) +{ + DART_LOG_TRACE("dart_alltoallv < team:%d", teamid); + + CHECK_IS_BASICTYPE(dtype); + + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_alltoallv ! 
unknown teamid %d", teamid); + return DART_ERR_INVAL; + } + + if (sendbuf == recvbuf || NULL == sendbuf) { + sendbuf = MPI_IN_PLACE; + } + + MPI_Comm comm = team_data->comm; + + int comm_size; + CHECK_MPI_RET(MPI_Comm_size(comm, &comm_size), "MPI_Comm_size"); + + int *send_counts_int = ALLOC_TMP(comm_size * sizeof(int)); + int *send_displ_int = ALLOC_TMP(comm_size * sizeof(int)); + int *recv_counts_int = ALLOC_TMP(comm_size * sizeof(int)); + int *recv_displ_int = ALLOC_TMP(comm_size * sizeof(int)); + + /* + * MPI uses offset type int, do not copy more than INT_MAX elements: + */ + int found_error = 0; + for(int i = 0; i < comm_size; i++) { + if (dart__unlikely(send_counts[i] > INT_MAX)) { + DART_LOG_ERROR( + "dart_alltoallv ! failed: nelem (%zu) > INT_MAX", send_counts[i]); + found_error = 1; + } + if (dart__unlikely(send_displ[i] > INT_MAX)) { + DART_LOG_ERROR( + "dart_alltoallv ! failed: nelem (%zu) > INT_MAX", send_displ[i]); + found_error = 1; + } + if (dart__unlikely(recv_counts[i] > INT_MAX)) { + DART_LOG_ERROR( + "dart_alltoallv ! failed: nelem (%zu) > INT_MAX", recv_counts[i]); + found_error = 1; + } + if (dart__unlikely(recv_displ[i] > INT_MAX)) { + DART_LOG_ERROR( + "dart_alltoallv ! 
failed: nelem (%zu) > INT_MAX", recv_displ[i]); + found_error = 1; + } + if (dart__unlikely(found_error)) { + FREE_TMP(comm_size, send_counts_int); + FREE_TMP(comm_size, send_displ_int); + FREE_TMP(comm_size, recv_counts_int); + FREE_TMP(comm_size, recv_displ_int); + return DART_ERR_INVAL; + } + + send_counts_int[i] = send_counts[i]; + send_displ_int[i] = send_displ[i]; + recv_counts_int[i] = recv_counts[i]; + recv_displ_int[i] = recv_displ[i]; + } + + MPI_Datatype mpi_dtype = + dart__mpi__datatype_struct(dtype)->contiguous.mpi_type; + + CHECK_MPI_RET( + MPI_Alltoallv( + sendbuf, + send_counts_int, + send_displ_int, + mpi_dtype, + recvbuf, + recv_counts_int, + recv_displ_int, + mpi_dtype, + comm), + "MPI_Alltoallv"); + + FREE_TMP(comm_size, send_counts_int); + FREE_TMP(comm_size, send_displ_int); + FREE_TMP(comm_size, recv_counts_int); + FREE_TMP(comm_size, recv_displ_int); + + DART_LOG_TRACE("dart_alltoallv > team:%d", teamid); + + return DART_OK; +} + +dart_ret_t dart_exscan( + const void * sendbuf, + void * recvbuf, + size_t nelem, + dart_datatype_t dtype, + dart_operation_t op, + dart_team_t team) +{ + DART_LOG_TRACE("dart_exscan < team:%d nelem:%" PRIu64 "", team, nelem); + + CHECK_IS_CONTIGUOUSTYPE(dtype); + + MPI_Op mpi_op = dart__mpi__op(op, dtype); + MPI_Datatype mpi_dtype = dart__mpi__op_type(op, dtype); + + /* + * MPI uses offset type int, do not copy more than INT_MAX elements: + */ + if (dart__unlikely(nelem > MAX_CONTIG_ELEMENTS)) { + DART_LOG_ERROR("dart_exscan ! failed: nelem (%zu) > INT_MAX", nelem); + return DART_ERR_INVAL; + } + + dart_team_data_t *team_data = dart_adapt_teamlist_get(team); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_exscan ! 
unknown team %d", team); + return DART_ERR_INVAL; + } + + MPI_Comm comm = team_data->comm; + CHECK_MPI_RET( + MPI_Exscan( + sendbuf, // send buffer + recvbuf, // receive buffer + nelem, // buffer size + mpi_dtype, // datatype + mpi_op, // reduce operation + comm), + "MPI_Exscan"); + + DART_LOG_TRACE("dart_exscan > team:%d nelem:%" PRIu64 "", team, nelem); + return DART_OK; +} + dart_ret_t dart_reduce( const void * sendbuf, void * recvbuf, @@ -2421,8 +2564,37 @@ dart_ret_t dart_sendrecv( return DART_ERR_INVAL; } - CHECK_UNITID_RANGE(dest, team_data); - CHECK_UNITID_RANGE(src, team_data); + if (dart__unlikely( /* DART_UNDEFINED_UNIT_ID is accepted and mapped to MPI_PROC_NULL below; real ids must lie in [0, size) */ + src.id < DART_UNDEFINED_UNIT_ID || src.id >= team_data->size)) { + DART_LOG_ERROR( + "%s ! failed: unitid out of range 0 <= %d < %d", + __func__, + src.id, + team_data->size); + return DART_ERR_INVAL; + } + + if (dart__unlikely( + dest.id < DART_UNDEFINED_UNIT_ID || dest.id >= team_data->size)) { + DART_LOG_ERROR( + "%s ! failed: unitid out of range 0 <= %d < %d", + __func__, + dest.id, + team_data->size); + return DART_ERR_INVAL; + } + + int source = src.id; + int target = dest.id; + + if (src.id == DART_UNDEFINED_UNIT_ID) { + source = MPI_PROC_NULL; + } + + if (dest.id == DART_UNDEFINED_UNIT_ID) { + target = MPI_PROC_NULL; + } + comm = team_data->comm; CHECK_MPI_RET( @@ -2430,12 +2602,12 @@ dart_ret_t dart_sendrecv( sendbuf, send_nelem, mpi_send_dtype, - dest.id, + target, send_tag, recvbuf, recv_nelem, mpi_recv_dtype, - src.id, + source, recv_tag, comm, MPI_STATUS_IGNORE), diff --git a/dash/CMakeLists.txt b/dash/CMakeLists.txt index b3d41d497..c82ca4eca 100644 --- a/dash/CMakeLists.txt +++ b/dash/CMakeLists.txt @@ -43,6 +43,8 @@ set(ENABLE_COMPTIME_RED ${ENABLE_COMPTIME_RED} PARENT_SCOPE) set(ENABLE_MEMKIND ${ENABLE_MEMKIND} PARENT_SCOPE) +set(ENABLE_PSTL ${ENABLE_PSTL} + PARENT_SCOPE) # Source- and header files to be compiled (OBJ): @@ -174,6 +176,23 @@ if (ENABLE_MEMKIND AND MEMKIND_FOUND) ${MEMKIND_LINKER_FLAGS}) endif() +if (ENABLE_PSTL AND TBB_FOUND) + 
set (ADDITIONAL_LIBRARIES ${ADDITIONAL_LIBRARIES} + ${TBB_LIBRARIES}) +endif() + +if (ENABLE_PSTL AND PSTL_FOUND) + set (CONF_AVAIL_ALGO_PSTL "true") + set (ADDITIONAL_COMPILE_FLAGS + ${ADDITIONAL_COMPILE_FLAGS} -DDASH_ENABLE_PSTL) + set (ADDITIONAL_INCLUDES ${ADDITIONAL_INCLUDES} + ${TBB_INCLUDE_DIRS}) + set (ADDITIONAL_INCLUDES ${ADDITIONAL_INCLUDES} + ${PSTL_INCLUDE_DIRS}) +else() + set (CONF_AVAIL_ALGO_PSTL "false") +endif() + if (ENABLE_MKL AND MKL_FOUND) message (STATUS " Intel MKL enabled") diff --git a/dash/include/cpp17/monotonic_buffer.h b/dash/include/cpp17/monotonic_buffer.h new file mode 100644 index 000000000..a963e1742 --- /dev/null +++ b/dash/include/cpp17/monotonic_buffer.h @@ -0,0 +1,181 @@ +// ============================================================================== +// LLVM Release License +// ============================================================================== +// University of Illinois/NCSA +// Open Source License +// +// Copyright (c) 2003-2018 University of Illinois at Urbana-Champaign. +// All rights reserved. +// +// Developed by: +// +// LLVM Team +// +// University of Illinois at Urbana-Champaign +// +// http://llvm.org +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal with +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +// of the Software, and to permit persons to whom the Software is furnished to do +// so, subject to the following conditions: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimers. 
+// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimers in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the names of the LLVM Team, University of Illinois at +// Urbana-Champaign, nor the names of its contributors may be used to +// endorse or promote products derived from this Software without specific +// prior written permission. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS WITH THE +// SOFTWARE. + +/* + * Source: https://reviews.llvm.org/D47111 + **/ + +#ifndef EXPERIMENTAL_MEMORY_RESOURCE +#define EXPERIMENTAL_MEMORY_RESOURCE + +#if __cpp_lib_memory_resource < 201603 + +#include + +namespace std { +namespace pmr { + +class monotonic_buffer_resource : public memory_resource { + static constexpr const size_t __default_buffer_capacity = 1024; + static constexpr const size_t __default_buffer_alignment = 16; + + struct __chunk_header { + __chunk_header *__next_; + char * __start_; + char * __cur_; + size_t __align_; + size_t __allocation_size() + { + return (reinterpret_cast(this) - __start_) + sizeof(*this); + } + void *__try_allocate_from_chunk(size_t, size_t); + }; + + struct __initial_header { + char *__start_; + char *__cur_; + union { + char * __end_; + size_t __size_; + }; + void *__try_allocate_from_chunk(size_t, size_t); + }; + +public: + monotonic_buffer_resource() + : monotonic_buffer_resource( + nullptr, __default_buffer_capacity, get_default_resource()) + { + } + + explicit 
monotonic_buffer_resource(size_t __initial_size) + : monotonic_buffer_resource( + nullptr, __initial_size, get_default_resource()) + { + } + + monotonic_buffer_resource(void *__buffer, size_t __buffer_size) + : monotonic_buffer_resource( + __buffer, __buffer_size, get_default_resource()) + { + } + + explicit monotonic_buffer_resource(memory_resource *__upstream) + : monotonic_buffer_resource( + nullptr, __default_buffer_capacity, __upstream) + { + } + + monotonic_buffer_resource( + size_t __initial_size, memory_resource *__upstream) + : monotonic_buffer_resource(nullptr, __initial_size, __upstream) + { + } + + monotonic_buffer_resource( + void *__buffer, size_t __buffer_size, memory_resource *__upstream) + : __res_(__upstream) + { + __initial_.__start_ = static_cast(__buffer); + if (__buffer != nullptr) { + __initial_.__cur_ = static_cast(__buffer); + __initial_.__end_ = static_cast(__buffer) + __buffer_size; + } + else { + __initial_.__cur_ = nullptr; + __initial_.__size_ = __buffer_size; + } + __chunks_ = nullptr; + } + + monotonic_buffer_resource(const monotonic_buffer_resource &) = delete; + + ~monotonic_buffer_resource() override + { + release(); + } + + monotonic_buffer_resource &operator=(const monotonic_buffer_resource &) = + delete; + + void release() + { + __initial_.__cur_ = __initial_.__start_; + while (__chunks_ != nullptr) { + __chunk_header *__next = __chunks_->__next_; + __res_->deallocate( + __chunks_->__start_, + __chunks_->__allocation_size(), + __chunks_->__align_); + __chunks_ = __next; + } + } + + memory_resource *upstream_resource() const + { + return __res_; + } + +protected: + void *do_allocate( + size_t __bytes, size_t __alignment) override; // key function + + void do_deallocate(void *, size_t, size_t) override + { + } + + bool do_is_equal(const memory_resource &__other) const noexcept override + { + return this == std::addressof(__other); + } + +private: + __initial_header __initial_; + __chunk_header * __chunks_; + memory_resource 
*__res_; +}; +} // namespace pmr +} // namespace std +#endif +#endif diff --git a/dash/include/dash/algorithm/Sort.h b/dash/include/dash/algorithm/Sort.h index 3cc49e8fd..150360230 100644 --- a/dash/include/dash/algorithm/Sort.h +++ b/dash/include/dash/algorithm/Sort.h @@ -1,27 +1,8 @@ #ifndef DASH__ALGORITHM__SORT_H #define DASH__ALGORITHM__SORT_H -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include - -#include -#include - -namespace dash { - #ifdef DOXYGEN - +namespace dash { /** * Sorts the elements in the range, defined by \c [begin, end) in ascending * order. The order of equal elements is not guaranteed to be preserved. @@ -77,59 +58,89 @@ void sort(GlobRandomIt begin, GlobRandomIt end); * * \ingroup DashAlgorithms */ -template -void sort(GlobRandomIt begin, GlobRandomIt end, SortableHash hash); +template +void sort(GlobRandomIt begin, GlobRandomIt end, Projection projection); + +} // namespace dash #else -#define __DASH_SORT__FINAL_STEP_BY_MERGE (0) -#define __DASH_SORT__FINAL_STEP_BY_SORT (1) -#define __DASH_SORT__FINAL_STEP_STRATEGY (__DASH_SORT__FINAL_STEP_BY_MERGE) +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include -#include +namespace dash { -template -void sort(GlobRandomIt begin, GlobRandomIt end, SortableHash sortable_hash) +template < + class GlobRandomIt, + class Projection, + class MergeStrategy = impl::sort__final_strategy__merge> +void sort( + GlobRandomIt begin, + GlobRandomIt end, + GlobRandomIt out, + Projection projection, + MergeStrategy strategy = MergeStrategy{}) { - using iter_type = GlobRandomIt; - using value_type = typename iter_type::value_type; + using iter_type = GlobRandomIt; + using value_type = typename iter_type::value_type; using mapped_type = typename std::decay::result_type>::type; + 
Projection>::result_type>::type; static_assert( std::is_arithmetic::value, "Only arithmetic types are supported"); - auto pattern = begin.pattern(); - - dash::util::Trace trace("Sort"); - - auto const sort_comp = [&sortable_hash]( - const value_type& a, const value_type& b) { - return sortable_hash(a) < sortable_hash(b); - }; + static_assert( + std::is_same::value, + "incompatible pattern types for input and output iterator"); - if (pattern.team() == dash::Team::Null()) { - DASH_LOG_TRACE("dash::sort", "Sorting on dash::Team::Null()"); + if (begin >= end) { + DASH_LOG_TRACE("dash::sort", "empty range"); + begin.pattern().team().barrier(); return; } - if (pattern.team().size() == 1) { - DASH_LOG_TRACE("dash::sort", "Sorting on a team with only 1 unit"); - trace.enter_state("final_local_sort"); - std::sort(begin.local(), end.local(), sort_comp); - trace.exit_state("final_local_sort"); + + if (begin.pattern().team() == dash::Team::Null() || + out.pattern().team() == dash::Team::Null()) { + DASH_LOG_TRACE("dash::sort", "Sorting on dash::Team::Null()"); return; } - if (begin >= end) { - DASH_LOG_TRACE("dash::sort", "empty range"); - trace.enter_state("final_barrier"); - pattern.team().barrier(); - trace.exit_state("final_barrier"); + if (begin.pattern().team() != out.pattern().team()) { + DASH_LOG_ERROR("dash::sort", "incompatible teams"); return; } + dash::util::Trace trace("Sort"); + + auto const sort_comp = [&projection]( + const value_type& a, const value_type& b) { + return projection(a) < projection(b); + }; + + auto pattern = begin.pattern(); + dash::Team& team = pattern.team(); auto const nunits = team.size(); auto const myid = team.myid(); @@ -139,41 +150,80 @@ void sort(GlobRandomIt begin, GlobRandomIt end, SortableHash sortable_hash) // local distance auto const l_range = dash::local_index_range(begin, end); + // local pointer to input data auto* l_mem_begin = dash::local_begin( static_cast(begin), team.myid()); + // local pointer to output data + auto* 
l_mem_target = dash::local_begin( + static_cast(out), team.myid()); + auto const n_l_elem = l_range.end - l_range.begin; - auto * lbegin = l_mem_begin + l_range.begin; - auto * lend = l_mem_begin + l_range.end; + impl::LocalData local_data{// input + l_mem_begin + l_range.begin, + // output + l_mem_target + l_range.begin}; + + // Request a thread pool based on locality information + dash::util::TeamLocality tloc{pattern.team()}; + auto uloc = tloc.unit_locality(pattern.team().myid()); + auto nthreads = uloc.num_domain_threads(); + + DASH_ASSERT_GE(nthreads, 0, "invalid number of threads"); + + dash::impl::NodeParallelismConfig nodeLevelConfig{ + static_cast(nthreads)}; + + DASH_LOG_TRACE( + "dash::sort", + "nthreads for local parallelism: ", + nodeLevelConfig.parallelism()); // initial local_sort trace.enter_state("1:initial_local_sort"); - std::sort(lbegin, lend, sort_comp); - trace.exit_state("1:initial_local_sort"); - trace.enter_state("2:init_temporary_global_data"); + impl::local_sort( + local_data.input, + local_data.input + n_l_elem, + sort_comp, + nodeLevelConfig.parallelism()); - using array_t = dash::Array; + DASH_LOG_TRACE_RANGE( + "locally sorted array", local_data.input, local_data.input + n_l_elem); - std::size_t gsize = nunits * NLT_NLE_BLOCK * 2; + trace.exit_state("1:initial_local_sort"); - // implicit barrier... 
- array_t g_partition_data(nunits * nunits * 3, dash::BLOCKED, team); - std::uninitialized_fill( - g_partition_data.lbegin(), g_partition_data.lend(), 0); + auto ptr_begin = static_cast( + static_cast(begin)); + auto ptr_out = + static_cast(static_cast(out)); - trace.exit_state("2:init_temporary_global_data"); + auto in_place = ptr_begin.segid == ptr_out.segid; - trace.enter_state("3:find_global_min_max"); + if (pattern.team().size() == 1) { + if (!in_place) { + std::copy( + local_data.input, local_data.input + n_l_elem, local_data.output); + } + DASH_LOG_TRACE("dash::sort", "Sorting on a team with only 1 unit"); + return; + } - // Temporary local buffer (sorted); - std::vector const lcopy(lbegin, lend); + trace.enter_state("2:find_global_min_max"); - auto const min_max = detail::find_global_min_max( - std::begin(lcopy), std::end(lcopy), team.dart_id(), sortable_hash); + auto min_max = impl::minmax( + (n_l_elem > 0) ? std::make_pair( + // local minimum + projection(*local_data.input), + // local maximum + projection(*(local_data.input + n_l_elem - 1))) + : std::make_pair( + std::numeric_limits::max(), + std::numeric_limits::min()), + team.dart_id()); - trace.exit_state("3:find_global_min_max"); + trace.exit_state("2:find_global_min_max"); DASH_LOG_TRACE_VAR("global minimum in range", min_max.first); DASH_LOG_TRACE_VAR("global maximum in range", min_max.second); @@ -184,494 +234,491 @@ void sort(GlobRandomIt begin, GlobRandomIt end, SortableHash sortable_hash) return; } - trace.enter_state("4:init_temporary_local_data"); - - auto const p_unit_info = - detail::psort__find_partition_borders(pattern, begin, end); + trace.enter_state("3:init_temporary_local_data"); - auto const& acc_partition_count = p_unit_info.acc_partition_count; + // find the partition sizes within the global range + auto partition_sizes_psum = impl::psort__partition_sizes(begin, end); - auto const nboundaries = nunits - 1; - std::vector splitters(nboundaries, mapped_type{}); - - 
detail::PartitionBorder p_borders( + auto const nboundaries = nunits - 1; + impl::Splitter splitters( nboundaries, min_max.first, min_max.second); - detail::psort__init_partition_borders(p_unit_info, p_borders); + impl::psort__init_partition_borders(partition_sizes_psum, splitters); - DASH_LOG_TRACE_RANGE( - "locally sorted array", std::begin(lcopy), std::end(lcopy)); DASH_LOG_TRACE_RANGE( "skipped splitters", - p_borders.is_skipped.cbegin(), - p_borders.is_skipped.cend()); - - bool done = false; + std::begin(splitters.is_skipped), + std::end(splitters.is_skipped)); // collect all valid splitters in a temporary vector - std::vector valid_partitions; - + std::vector valid_splitters; + valid_splitters.reserve(nunits); { - // make this as a separately scoped block to deallocate non-required - // temporary memory - std::vector all_borders(splitters.size()); - std::iota(all_borders.begin(), all_borders.end(), 0); - - auto const& is_skipped = p_borders.is_skipped; + auto range = dash::meta::range(nboundaries); std::copy_if( - all_borders.begin(), - all_borders.end(), - std::back_inserter(valid_partitions), - [&is_skipped](size_t idx) { return is_skipped[idx] == false; }); + std::begin(range), + std::end(range), + std::back_inserter(valid_splitters), + [& is_skipped = splitters.is_skipped](size_t idx) { + return is_skipped[idx] == false; + }); } DASH_LOG_TRACE_RANGE( "valid partitions", - std::begin(valid_partitions), - std::end(valid_partitions)); + std::begin(valid_splitters), + std::end(valid_splitters)); - if (valid_partitions.empty()) { + if (valid_splitters.empty()) { // Edge case: We may have a team spanning at least 2 units, however the // global range is owned by only 1 unit team.barrier(); return; } - trace.exit_state("4:init_temporary_local_data"); - - trace.enter_state("5:find_global_partition_borders"); - - size_t iter = 0; - - std::vector global_histo(nunits * NLT_NLE_BLOCK, 0); - - do { - ++iter; - - detail::psort__calc_boundaries(p_borders, splitters); - - 
DASH_LOG_TRACE_VAR("finding partition borders", iter); - - DASH_LOG_TRACE_RANGE( - "partition borders", std::begin(splitters), std::end(splitters)); - - auto const l_nlt_nle = detail::psort__local_histogram( - splitters, - valid_partitions, - p_borders, - std::begin(lcopy), - std::end(lcopy), - sortable_hash); - - detail::trace_local_histo("local histogram", l_nlt_nle); - - // allreduce with implicit barrier - detail::psort__global_histogram( - // first partition - std::begin(l_nlt_nle), - // iterator past last valid partition - std::next( - std::begin(l_nlt_nle), - (valid_partitions.back() + 1) * NLT_NLE_BLOCK), - std::begin(global_histo), - team.dart_id()); - - DASH_LOG_TRACE_RANGE( - "global histogram", - std::next(std::begin(global_histo), myid * NLT_NLE_BLOCK), - std::next(std::begin(global_histo), (myid + 1) * NLT_NLE_BLOCK)); - - done = detail::psort__validate_partitions( - p_unit_info, splitters, valid_partitions, p_borders, global_histo); - } while (!done); + trace.exit_state("3:init_temporary_local_data"); - trace.exit_state("5:find_global_partition_borders"); + { + trace.enter_state("4:find_global_partition_borders"); + + size_t iter = 0; + bool done = false; + + std::vector global_histo(nunits * impl::lower_upper_block, 0); + + do { + ++iter; + + impl::psort__calc_boundaries(splitters); + + DASH_LOG_TRACE_VAR("finding partition borders", iter); + + DASH_LOG_TRACE_RANGE( + "splitters", + std::begin(splitters.threshold), + std::end(splitters.threshold)); + + auto const l_nlt_nle = impl::psort__local_histogram( + splitters, + valid_splitters, + local_data.input, + local_data.input + n_l_elem, + projection); + + DASH_LOG_TRACE_RANGE( + "local histogram ( < )", + impl::make_strided_iterator(std::begin(l_nlt_nle)), + impl::make_strided_iterator(std::begin(l_nlt_nle)) + nunits); + + DASH_LOG_TRACE_RANGE( + "local histogram ( <= )", + impl::make_strided_iterator(std::begin(l_nlt_nle) + 1), + impl::make_strided_iterator(std::begin(l_nlt_nle) + 1) + nunits); + + 
// allreduce with implicit barrier + impl::psort__global_histogram( + // first partition + std::begin(l_nlt_nle), + // iterator past last valid partition + std::next( + std::begin(l_nlt_nle), + (valid_splitters.back() + 1) * impl::lower_upper_block), + std::begin(global_histo), + team.dart_id()); + + DASH_LOG_TRACE_RANGE( + "global histogram", + std::next(std::begin(global_histo), myid * impl::lower_upper_block), + std::next(std::begin(global_histo), (myid + 1) * impl::lower_upper_block)); + + done = impl::psort__validate_partitions( + splitters, partition_sizes_psum, valid_splitters, global_histo); + } while (!done); + + DASH_LOG_TRACE_VAR("partition borders found after N iterations", iter); + trace.exit_state("4:find_global_partition_borders"); + + if (!myid) { + DASH_LOG_TRACE_RANGE( + "final global histogram", + std::begin(global_histo), + std::end(global_histo)); + DASH_LOG_TRACE_RANGE( + "prefix sum capacities", + std::begin(partition_sizes_psum), + std::end(partition_sizes_psum)); + DASH_LOG_WARN( + "dash::sort", "partition borders found after N iterations", iter); + } + DASH_LOG_TRACE( + "local min and max element", min_max.first, min_max.second); + } - DASH_LOG_TRACE_VAR("partition borders found after N iterations", iter); + /********************************************************************/ + /****** Final Histogram *********************************************/ + /********************************************************************/ - trace.enter_state("6:final_local_histogram"); + trace.enter_state("5:final_local_histogram"); /* How many elements are less than P * or less than equals P */ - auto const histograms = detail::psort__local_histogram( + auto const histograms = impl::psort__local_histogram( splitters, - valid_partitions, - p_borders, - std::begin(lcopy), - std::end(lcopy), - sortable_hash); - trace.exit_state("6:final_local_histogram"); - - DASH_LOG_TRACE_RANGE("final splitters", splitters.begin(), splitters.end()); - - 
detail::trace_local_histo("final histograms", histograms); - - trace.enter_state("7:transpose_local_histograms (all-to-all)"); - - if (n_l_elem > 0) { - // TODO(kowalewski): minimize communication to copy only until the last - // valid border - /* - * Transpose (Shuffle) the final histograms to communicate - * the partition distribution - */ - - dash::team_unit_t transposed_unit{0}; - for (auto it = std::begin(histograms); it != std::end(histograms); - it += NLT_NLE_BLOCK, ++transposed_unit) { - auto const& nlt_val = *it; - auto const& nle_val = *std::next(it); - if (transposed_unit != myid) { - auto const offset = transposed_unit * g_partition_data.lsize() + myid; - // We communicate only non-zero values - if (nlt_val > 0) { - g_partition_data.async[offset + IDX_DIST(nunits)].set(&(nlt_val)); - } - - if (nle_val > 0) { - g_partition_data.async[offset + IDX_SUPP(nunits)].set(&(nle_val)); - } - } - else { - g_partition_data.local[myid + IDX_DIST(nunits)] = nlt_val; - g_partition_data.local[myid + IDX_SUPP(nunits)] = nle_val; - } - } - // complete outstanding requests... - g_partition_data.async.flush(); - } - trace.exit_state("7:transpose_local_histograms (all-to-all)"); + valid_splitters, + local_data.input, + local_data.input + n_l_elem, + projection); - trace.enter_state("8:barrier"); - team.barrier(); - trace.exit_state("8:barrier"); + trace.exit_state("5:final_local_histogram"); DASH_LOG_TRACE_RANGE( - "initial partition distribution:", - std::next(g_partition_data.lbegin(), IDX_DIST(nunits)), - std::next(g_partition_data.lbegin(), IDX_DIST(nunits) + nunits)); + "final splitters", + std::begin(splitters.threshold), + std::end(splitters.threshold)); DASH_LOG_TRACE_RANGE( - "initial partition supply:", - std::next(g_partition_data.lbegin(), IDX_SUPP(nunits)), - std::next(g_partition_data.lbegin(), IDX_SUPP(nunits) + nunits)); - - /* Calculate final distribution per partition. Each unit calculates their - * local distribution independently. 
- * All accesses are only to local memory - */ - - trace.enter_state("9:calc_final_partition_dist"); - - detail::psort__calc_final_partition_dist( - acc_partition_count, g_partition_data.local); + "local histogram ( < )", + impl::make_strided_iterator(std::begin(histograms)), + impl::make_strided_iterator(std::begin(histograms)) + nunits); DASH_LOG_TRACE_RANGE( - "final partition distribution", - std::next(g_partition_data.lbegin(), IDX_DIST(nunits)), - std::next(g_partition_data.lbegin(), IDX_DIST(nunits) + nunits)); - - // Reset local elements to 0 since the following matrix transpose - // communicates only non-zero values and writes to exactly these offsets. - std::fill( - &(g_partition_data.local[IDX_TARGET_COUNT(nunits)]), - &(g_partition_data.local[IDX_TARGET_COUNT(nunits) + nunits]), - 0); + "local histogram ( <= )", + impl::make_strided_iterator(std::begin(histograms) + 1), + impl::make_strided_iterator(std::begin(histograms) + 1) + nunits); - trace.exit_state("9:calc_final_partition_dist"); + /********************************************************************/ + /****** Partition Distribution **************************************/ + /********************************************************************/ - trace.enter_state("10:barrier"); - team.barrier(); - trace.exit_state("10:barrier"); - - trace.enter_state("11:transpose_final_partition_dist (all-to-all)"); - /* - * Transpose the final distribution again to obtain the end offsets + /** + * Each unit 0 <= p < P-1 is responsible for a final refinement around the + * borders of bucket B_p. + * + * Parameters: + * - Lower bound ( < S_p): The number of elements which definitely belong to + * Bucket p. + * - Bucket size: Local capacity of unit u_p + * - Uppoer bound ( <= S_p): The number of elements which eventually go into + * Bucket p. + * + * We first calculate the deficit (Bucket size - lower bound). 
If the + * bucket is not fully exhausted (deficit > 0) we fill the space with + * elements from the upper bound until the bucket is full. */ - dash::team_unit_t unit{0}; - auto const last = static_cast(nunits); - for (; unit < last; ++unit) { - if (g_partition_data.local[IDX_DIST(nunits) + unit] == 0) { - continue; - } + trace.enter_state("6:transpose_local_histograms (all-to-all)"); - if (unit != myid) { - // We communicate only non-zero values - auto const offset = unit * g_partition_data.lsize() + myid; - g_partition_data.async[offset + IDX_TARGET_COUNT(nunits)].set( - &(g_partition_data.local[IDX_DIST(nunits) + unit])); - } - else { - g_partition_data.local[IDX_TARGET_COUNT(nunits) + myid] = - g_partition_data.local[IDX_DIST(nunits) + unit]; - } - } - - g_partition_data.async.flush(); + std::vector g_partition_data(nunits * 2); - trace.exit_state("11:transpose_final_partition_dist (all-to-all)"); + DASH_ASSERT_RETURNS( + dart_alltoall( + // send buffer + histograms.data(), + // receive buffer + g_partition_data.data(), + // we send / receive 1 element to / from each process + impl::lower_upper_block, + // dtype + dash::dart_datatype::value, + // teamid + team.dart_id()), + DART_OK); - trace.enter_state("12:barrier"); - team.barrier(); - trace.exit_state("12:barrier"); + DASH_LOG_TRACE_RANGE( + "initial partition distribution", + impl::make_strided_iterator(std::begin(g_partition_data)), + impl::make_strided_iterator(std::begin(g_partition_data)) + nunits); DASH_LOG_TRACE_RANGE( - "final target count", - std::next(g_partition_data.lbegin(), IDX_TARGET_COUNT(nunits)), - std::next( - g_partition_data.lbegin(), IDX_TARGET_COUNT(nunits) + nunits)); + "initial partition supply", + impl::make_strided_iterator(std::begin(g_partition_data) + 1), + impl::make_strided_iterator(std::begin(g_partition_data) + 1) + nunits); - trace.enter_state("13:calc_final_send_count"); + trace.exit_state("6:transpose_local_histograms (all-to-all)"); - std::vector l_send_displs(nunits, 0); 
+ /* Calculate final distribution per partition. Each unit is responsible for + * its own bucket. + */ - if (n_l_elem > 0) { - auto const* l_target_count = - &(g_partition_data.local[IDX_TARGET_COUNT(nunits)]); - auto* l_send_count = &(g_partition_data.local[IDX_SEND_COUNT(nunits)]); + trace.enter_state("7:calc_final_partition_dist"); - detail::psort__calc_send_count( - p_borders, valid_partitions, l_target_count, l_send_count); + auto first_nlt = impl::make_strided_iterator(std::begin(g_partition_data)); - // exclusive scan using partial sum - std::partial_sum( - l_send_count, - std::next(l_send_count, nunits - 1), - std::next(std::begin(l_send_displs)), - std::plus()); - } - else { - std::fill( - std::next(g_partition_data.lbegin(), IDX_SEND_COUNT(nunits)), - std::next(g_partition_data.lbegin(), IDX_SEND_COUNT(nunits) + nunits), - 0); - } + auto first_nle = + impl::make_strided_iterator(std::next(std::begin(g_partition_data))); -#if defined(DASH_ENABLE_ASSERTIONS) && defined(DASH_ENABLE_TRACE_LOGGING) - { - std::vector chksum(nunits, 0); - - DASH_ASSERT_RETURNS( - dart_allreduce( - std::next(g_partition_data.lbegin(), IDX_SEND_COUNT(nunits)), - chksum.data(), - nunits, - dart_datatype::value, - DART_OP_SUM, - team.dart_id()), - DART_OK); - - DASH_ASSERT_EQ( - chksum[myid.id], - n_l_elem, - "send count must match the capacity of the unit"); - } -#endif + impl::psort__calc_final_partition_dist( + first_nlt, + first_nlt + nunits, + first_nle, + partition_sizes_psum[myid + 1]); - DASH_LOG_TRACE_RANGE( - "send count", - std::next(g_partition_data.lbegin(), IDX_SEND_COUNT(nunits)), - std::next(g_partition_data.lbegin(), IDX_SEND_COUNT(nunits) + nunits)); + // let us now collapse the data into a contiguous range with unit stride + std::move( + impl::make_strided_iterator(std::begin(g_partition_data)) + 1, + impl::make_strided_iterator(std::begin(g_partition_data)) + nunits, + std::next(std::begin(g_partition_data))); DASH_LOG_TRACE_RANGE( - "send displs", 
l_send_displs.begin(), l_send_displs.end()); + "final partition distribution", + std::begin(g_partition_data), + std::next(std::begin(g_partition_data), nunits)); - trace.exit_state("13:calc_final_send_count"); + trace.exit_state("7:calc_final_partition_dist"); - trace.enter_state("14:barrier"); - team.barrier(); - trace.exit_state("14:barrier"); + /********************************************************************/ + /****** Source Displacements ****************************************/ + /********************************************************************/ + /** + * Based on the distribution we have to know the source displacements + * (the offset where we have to read from in each unit). This is just a + * ring-communication where each unit shift its local distribution downwards + * to the succeeding neighbor. + * + * Worst Case Communication Complexity: O(P) + * Memory Complexity: O(P) + * + * Only Units which contribute local elements participate in the + * communication + */ - trace.enter_state("15:calc_final_target_displs"); + trace.enter_state("8:comm_source_displs (sendrecv)"); - if (n_l_elem > 0) { - detail::psort__calc_target_displs( - p_borders, valid_partitions, g_partition_data); - } + std::vector source_displs(nunits, 0); - trace.exit_state("15:calc_final_target_displs"); + auto neighbors = + impl::psort__get_neighbors(myid, n_l_elem, splitters, valid_splitters); - trace.enter_state("16:barrier"); - team.barrier(); - trace.exit_state("16:barrier"); + DASH_LOG_TRACE( + "dash::sort", + "shift partition dist", + "my_source", + neighbors.first, + "my_target", + neighbors.second); + + dart_sendrecv( + g_partition_data.data(), + nunits, + dash::dart_datatype::value, + impl::sort_sendrecv_tag, + // dest neighbor (right) + neighbors.second, + source_displs.data(), + nunits, + dash::dart_datatype::value, + impl::sort_sendrecv_tag, + // source neighbor (left) + neighbors.first); DASH_LOG_TRACE_RANGE( - "target displs", - 
&(g_partition_data.local[IDX_TARGET_DISP(nunits)]), - &(g_partition_data.local[IDX_TARGET_DISP(nunits) + nunits])); + "source displs", source_displs.begin(), source_displs.end()); - trace.enter_state("17:exchange_data (all-to-all)"); + trace.exit_state("8:comm_source_displs (sendrecv)"); - std::vector > async_copies{}; - async_copies.reserve(p_unit_info.valid_remote_partitions.size()); + /********************************************************************/ + /****** Send Displacements (all-to-all) *****************************/ + /********************************************************************/ - auto const l_partition_data = g_partition_data.local; + /** + * Send displacements are needed for alltoallv at the data exchange part + * + * Worst Case Communication Complexity: O(P^2) + * Memory Complexity: O(P) + */ + trace.enter_state("9:comm_send_displs (all-to-all)"); + std::vector send_displs(nunits, 0); - auto const get_send_info = [l_partition_data, &l_send_displs, nunits]( - dash::default_index_t const p_idx) { - auto const send_count = l_partition_data[p_idx + IDX_SEND_COUNT(nunits)]; - auto const target_disp = - l_partition_data[p_idx + IDX_TARGET_DISP(nunits)]; - auto const send_disp = l_send_displs[p_idx]; - return std::make_tuple(send_count, send_disp, target_disp); - }; + DASH_ASSERT_RETURNS( + dart_alltoall( + // send buffer + g_partition_data.data(), + // receive buffer + send_displs.data(), + // we send / receive 1 element to / from each process + 1, + // dtype + dash::dart_datatype::value, + // teamid + team.dart_id()), + DART_OK); + + trace.exit_state("9:comm_send_displs (all-to-all)"); + + /********************************************************************/ + /****** Send counts *************************************************/ + /********************************************************************/ + + /** + * Based on the transposed partition data we can calculate the number of + * elements to send to each process. 
With that we can calculate the + * correct send displacements by summing up the send counts. + * + * Communication Complexity: 0 + * Memory Complexity: O(P) + */ - std::size_t send_count, send_disp, target_disp; - - for (auto const& unit : p_unit_info.valid_remote_partitions) { - std::tie(send_count, send_disp, target_disp) = get_send_info(unit); - - // Get a global iterator to the first local element of a unit within the - // range to be sorted [begin, end) - // - iter_type it_copy = - (unit == unit_at_begin) - ? - /* If we are the unit at the beginning of the global range simply - return begin */ - begin - : - /* Otherwise construct an global iterator pointing the first local - element from the correspoding unit */ - iter_type{&(begin.globmem()), - pattern, - pattern.global_index( - static_cast(unit), {})}; - - auto&& fut = dash::copy_async( - &(*(lcopy.begin() + send_disp)), - &(*(lcopy.begin() + send_disp + send_count)), - it_copy + target_disp); - - async_copies.emplace_back(std::move(fut)); - } + trace.enter_state("10:calc_send_counts"); - std::tie(send_count, send_disp, target_disp) = get_send_info(myid); + std::vector send_counts(nunits, 0); - if (send_count) { - std::copy( - std::next(std::begin(lcopy), send_disp), - std::next(std::begin(lcopy), send_disp + send_count), - std::next(lbegin, target_disp)); - } + impl::psort__calc_send_count( + splitters, valid_splitters, send_displs.begin(), send_counts.begin()); - std::for_each( - std::begin(async_copies), - std::end(async_copies), - [](dash::Future& fut) { fut.wait(); }); - - trace.exit_state("17:exchange_data (all-to-all)"); - - /* NOTE: While merging locally sorted sequences is faster than another - * heavy-weight sort it comes at a cost. std::inplace_merge allocates a - * temporary buffer internally which is also documented on cppreference. If - * the allocation of this buffer fails, a less efficient merge method is - * used. 
However, in Linux, the allocation nevers fails since the - * implementation simply allocates memory using malloc and the kernel follows - * the optimistic strategy. This is ugly and can lead to a segmentation fault - * later if no physical pages are available to map the allocated - * virtual memory. + std::partial_sum( + send_counts.begin(), + std::next(send_counts.begin(), nunits - 1), + std::next(send_displs.begin()), + std::plus()); + send_displs[0] = 0; + + DASH_LOG_TRACE_RANGE("send displs", send_displs.begin(), send_displs.end()); + DASH_LOG_TRACE_RANGE("send counts", send_counts.begin(), send_counts.end()); + + trace.exit_state("10:calc_send_counts"); + + /********************************************************************/ + /****** Target Counts ***********************************************/ + /********************************************************************/ + + /** + * Based on the distribution and the source displacements we can determine + * the number of elemens we have to copy from each unit (target count) to + * obtain the finally sorted sequence. This is just a mapping operation + * where we calculcate for all elements 0 <= i < P: * + * target_count[i] = partition_dist[i+1] - source_displacements[i] * - * std::sort does not suffer from this problem and may be a more safe - * variant, especially if the user wants to utilize the fully available - * memory capacity on its own. 
+ * Communication Complexity: 0 + * Memory Complexity: O(P) */ + trace.enter_state("11:calc_target_offsets"); + + std::vector target_counts(nunits, 0); + + if (n_l_elem) { + if (myid) { + std::transform( + // in_first + g_partition_data.data(), + // in_last + std::next(g_partition_data.data(), nunits), + // in_second + std::begin(source_displs), + // out_first + std::begin(target_counts), + // operation + std::minus()); + } + else { + std::copy( + g_partition_data.data(), + std::next(g_partition_data.data(), nunits), + std::begin(target_counts)); + } + } -#if (__DASH_SORT__FINAL_STEP_STRATEGY == __DASH_SORT__FINAL_STEP_BY_SORT) - trace.enter_state("18:barrier"); - team.barrier(); - trace.exit_state("18:barrier"); + DASH_LOG_TRACE_RANGE( + "target counts", target_counts.begin(), target_counts.end()); - trace.enter_state("19:final_local_sort"); - std::sort(lbegin, lend); - trace.exit_state("19:final_local_sort"); -#else - trace.enter_state("18:calc_recv_count (all-to-all)"); + /********************************************************************/ + /****** Target Displs ***********************************************/ + /********************************************************************/ - std::vector recv_count(nunits, 0); + /** + * Based on the target count we calculate the target displace (the offset to + * which we have to copy remote data). This is just an exclusive scan with a + * plus opertion. 
+ * + * Communication Complexity: 0 + * Memory Complexity: O(P) + */ + std::vector target_displs(nunits + 1, 0); - DASH_ASSERT_RETURNS( - dart_alltoall( - // send buffer - std::next(g_partition_data.lbegin(), IDX_SEND_COUNT(nunits)), - // receive buffer - recv_count.data(), - // we send / receive 1 element to / from each process - 1, - // dtype - dash::dart_datatype::value, - // teamid - team.dart_id()), DART_OK); + std::partial_sum( + std::begin(target_counts), + std::prev(std::end(target_counts)), + std::begin(target_displs) + 1, + std::plus()); - DASH_LOG_TRACE_RANGE( - "recv count", std::begin(recv_count), std::end(recv_count)); + target_displs.back() = n_l_elem; - trace.exit_state("18:calc_recv_count (all-to-all)"); + DASH_LOG_TRACE_RANGE( + "target displs", target_displs.begin(), target_displs.end() - 1); - trace.enter_state("19:merge_local_sequences"); + trace.exit_state("11:calc_target_offsets"); - // merging sorted sequences - auto nsequences = nunits; - // number of merge steps in the tree - auto const depth = static_cast(std::ceil(std::log2(nsequences))); + trace.enter_state("12:exchange_data (all-to-all)"); - // calculate the prefix sum among all receive counts to find the offsets for - // merging - std::vector recv_count_psum; - recv_count_psum.reserve(nsequences + 1); - recv_count_psum.emplace_back(0); + /********************************************************************/ + /****** Exchange Data (All-To-All) **********************************/ + /********************************************************************/ - std::partial_sum( - std::begin(recv_count), - std::end(recv_count), - std::back_inserter(recv_count_psum)); + /** + * Based on the information calculate above we initiate the data exchange. + * Each process copies P chunks from each Process to the local portion. 
+ * Assuming all local portions are of equal local size gives us the + * following complexity: + * + * Average Communication Traffic: O(N) + * Aerage Comunication Overhead: O(P^2) + */ - DASH_LOG_TRACE_RANGE( - "recv count prefix sum", - std::begin(recv_count_psum), - std::end(recv_count_psum)); - - for (std::size_t d = 0; d < depth; ++d) { - // distance between first and mid iterator while merging - auto const step = std::size_t(0x1) << d; - // distance between first and last iterator while merging - auto const dist = step << 1; - // number of merges - auto const nmerges = nsequences >> 1; - - // These merges are independent from each other and are candidates for - // shared memory parallelism - for (std::size_t m = 0; m < nmerges; ++m) { - auto first = std::next(lbegin, recv_count_psum[m * dist]); - auto mid = std::next(lbegin, recv_count_psum[m * dist + step]); - // sometimes we have a lonely merge in the end, so we have to guarantee - // that we do not access out of bounds - auto last = std::next( - lbegin, - recv_count_psum[std::min( - m * dist + dist, recv_count_psum.size() - 1)]); - - std::inplace_merge(first, mid, last); - } + // allocate a temporary buffer: + // we explcitly do not use std::make_unique because we do want to have any + // construction + local_data.buffer = + std::move(std::unique_ptr{new value_type[n_l_elem]}); - nsequences -= nmerges; + if (n_l_elem) { + // local copy + std::copy( + std::next(local_data.input, source_displs[myid]), + std::next( + local_data.input, source_displs[myid] + target_counts[myid]), + std::next(local_data.buffer.get(), target_displs[myid])); + + impl::alltoallv( + local_data.input, + local_data.buffer.get(), + std::move(send_counts), + std::move(send_displs), + std::move(target_counts), + std::move(target_displs), + team.dart_id()); } - trace.exit_state("19:merge_local_sequences"); -#endif + trace.exit_state("12:exchange_data (all-to-all)"); + + trace.enter_state("13:final_local_sort"); + impl::local_sort( + 
local_data.buffer.get(), + local_data.buffer.get() + n_l_elem, + sort_comp, + nodeLevelConfig.parallelism()); + trace.exit_state("13:final_local_sort"); + - DASH_LOG_TRACE_RANGE("finally sorted range", lbegin, lend); + trace.enter_state("14:final_local_copy"); + std::copy( + local_data.buffer.get(), + local_data.buffer.get() + n_l_elem, + local_data.output); + trace.exit_state("14:final_local_copy"); - trace.enter_state("20:final_barrier"); + DASH_LOG_TRACE_RANGE( + "finally sorted range", + local_data.output, + local_data.output + n_l_elem); + + trace.enter_state("15:final_barrier"); team.barrier(); - trace.exit_state("20:final_barrier"); -} + trace.exit_state("15:final_barrier"); +} // namespace dash -namespace detail { +namespace impl { template struct identity_t : std::unary_function { constexpr T&& operator()(T&& t) const noexcept @@ -680,7 +727,21 @@ struct identity_t : std::unary_function { return std::forward(t); } }; -} // namespace detail +} // namespace impl + +template +inline void sort(GlobRandomIt begin, GlobRandomIt end, GlobRandomIt out) +{ + using value_t = typename std::remove_cv< + typename dash::iterator_traits::value_type>::type; + + dash::sort( + begin, + end, + out, + impl::identity_t{}, + impl::sort__final_strategy__merge{}); +} template inline void sort(GlobRandomIt begin, GlobRandomIt end) @@ -688,7 +749,12 @@ inline void sort(GlobRandomIt begin, GlobRandomIt end) using value_t = typename std::remove_cv< typename dash::iterator_traits::value_type>::type; - dash::sort(begin, end, detail::identity_t()); + dash::sort( + begin, + end, + begin, + impl::identity_t{}, + impl::sort__final_strategy__merge{}); } #endif // DOXYGEN diff --git a/dash/include/dash/algorithm/internal/ParallelStl.h b/dash/include/dash/algorithm/internal/ParallelStl.h new file mode 100644 index 000000000..fa7e0c331 --- /dev/null +++ b/dash/include/dash/algorithm/internal/ParallelStl.h @@ -0,0 +1,8 @@ +#ifndef DASH__ALGORITHM__INTERNAL__PARALLEL_STL_H__INCLUDED + +#ifdef 
DASH_ENABLE_PSTL +#include +#include + +#endif // DASH_ENABLE_PSTL +#endif // DASH__ALGORITHM__INTERNAL__PARALLEL_STL_H__INCLUDED diff --git a/dash/include/dash/algorithm/internal/Sort-inl.h b/dash/include/dash/algorithm/internal/Sort-inl.h deleted file mode 100644 index 427c22039..000000000 --- a/dash/include/dash/algorithm/internal/Sort-inl.h +++ /dev/null @@ -1,784 +0,0 @@ -#ifndef DASH__ALGORITHM__INTERNAL__SORT_H__INCLUDED -#define DASH__ALGORITHM__INTERNAL__SORT_H__INCLUDED - -#define IDX_DIST(nunits) ((nunits)*0) -#define IDX_SUPP(nunits) ((nunits)*1) -#define IDX_TARGET_DISP(nunits) ((nunits)*2) - -#define IDX_SEND_COUNT(nunits) IDX_DIST(nunits) -#define IDX_TARGET_COUNT(nunits) IDX_SUPP(nunits) - -#define NLT_NLE_BLOCK 2 - -#include -#include -#include -#include -#include - -#include -#include - -#include - -namespace detail { - -struct UnitInfo { - std::size_t nunits; - // prefix sum over the number of local elements of all unit - std::vector acc_partition_count; - std::vector valid_remote_partitions; - - explicit UnitInfo(std::size_t p_nunits) - : nunits(p_nunits) - , acc_partition_count(nunits + 1) - { - valid_remote_partitions.reserve(nunits - 1); - } -}; - -template -struct PartitionBorder { -public: - // tracks if we have found a stable partition border - std::vector is_stable; - // tracks if a partition is skipped - std::vector is_skipped; - // lower bound of each partition - std::vector lower_bound; - // upper bound of each partition - std::vector upper_bound; - // Special case for the last iteration in finding partition borders - std::vector is_last_iter; - - // The right unit is always right next to the border. For this reason we - // track only the left unit. 
- std::vector left_partition; - - PartitionBorder(size_t nsplitter, T _lower_bound, T _upper_bound) - : is_stable(nsplitter, false) - , is_skipped(nsplitter, false) - , lower_bound(nsplitter, _lower_bound) - , upper_bound(nsplitter, _upper_bound) - , is_last_iter(nsplitter, false) - , left_partition( - nsplitter, std::numeric_limits::min()) - { - } -}; - -template -inline void psort__calc_boundaries( - PartitionBorder& p_borders, std::vector& splitters) -{ - DASH_LOG_TRACE("< psort__calc_boundaries "); - DASH_ASSERT_EQ( - p_borders.is_stable.size(), - splitters.size(), - "invalid number of partition borders"); - - // recalculate partition boundaries - for (std::size_t idx = 0; idx < splitters.size(); ++idx) { - DASH_ASSERT(p_borders.lower_bound[idx] <= p_borders.upper_bound[idx]); - // case A: partition is already stable or skipped - if (p_borders.is_stable[idx]) { - continue; - } - // case B: we have the last iteration - //-> test upper bound directly - if (p_borders.is_last_iter[idx]) { - splitters[idx] = p_borders.upper_bound[idx]; - p_borders.is_stable[idx] = true; - } - else { - // case C: ordinary iteration - - splitters[idx] = - p_borders.lower_bound[idx] + - ((p_borders.upper_bound[idx] - p_borders.lower_bound[idx]) / 2); - - if (splitters[idx] == p_borders.lower_bound[idx]) { - // if we cannot move the partition to the left - //-> last iteration - p_borders.is_last_iter[idx] = true; - } - } - } - DASH_LOG_TRACE("psort__calc_boundaries >"); -} - -template -inline const std::vector psort__local_histogram( - std::vector const& splitters, - std::vector const& valid_partitions, - PartitionBorder const& p_borders, - Iter data_lbegin, - Iter data_lend, - SortableHash sortable_hash) -{ - DASH_LOG_TRACE("< psort__local_histogram"); - - auto const nborders = splitters.size(); - // The first element is 0 and the last element is the total number of local - // elements in this unit - auto const sz = splitters.size() + 1; - // Number of elements less than P - 
std::vector l_nlt_nle(NLT_NLE_BLOCK * sz, 0); - - auto const n_l_elem = std::distance(data_lbegin, data_lend); - - // The value type of the iterator is not necessarily const, however, the - // reference should definitely be. If that isn't the case the compiler - // will complain anyway since our lambda required const qualifiers. - using reference = typename std::iterator_traits::reference; - - if (n_l_elem > 0) { - for (auto const& idx : valid_partitions) { - // search lower bound of partition value - auto lb_it = std::lower_bound( - data_lbegin, - data_lend, - splitters[idx], - [&sortable_hash](reference a, const MappedType& b) { - return sortable_hash(a) < b; - }); - // search upper bound by starting from the lower bound - auto ub_it = std::upper_bound( - lb_it, - data_lend, - splitters[idx], - [&sortable_hash](const MappedType& b, reference a) { - return b < sortable_hash(a); - }); - - auto const p_left = p_borders.left_partition[idx]; - DASH_ASSERT_NE(p_left, dash::team_unit_t{}, "invalid bounding unit"); - - auto const nlt_idx = (p_left)*NLT_NLE_BLOCK; - - l_nlt_nle[nlt_idx] = std::distance(data_lbegin, lb_it); - l_nlt_nle[nlt_idx + 1] = std::distance(data_lbegin, ub_it); - } - - auto const last_valid_border_idx = *std::prev(valid_partitions.cend()); - auto const p_left = p_borders.left_partition[last_valid_border_idx]; - - // fill trailing partitions with local capacity - std::fill( - std::next(std::begin(l_nlt_nle), (p_left + 1) * NLT_NLE_BLOCK), - std::end(l_nlt_nle), - n_l_elem); - } - - DASH_LOG_TRACE("psort__local_histogram >"); - return l_nlt_nle; -} - -template -inline void psort__global_histogram( - InputIt local_histo_begin, - InputIt local_histo_end, - OutputIt output_it, - dart_team_t dart_team_id) -{ - DASH_LOG_TRACE("< psort__global_histogram "); - - auto const nels = std::distance(local_histo_begin, local_histo_end); - - dart_allreduce( - &(*local_histo_begin), - &(*output_it), - nels, - dash::dart_datatype::value, - DART_OP_SUM, - 
dart_team_id); - - DASH_LOG_TRACE("psort__global_histogram >"); -} - -template -inline bool psort__validate_partitions( - UnitInfo const& p_unit_info, - std::vector const& splitters, - std::vector const& valid_partitions, - PartitionBorder& p_borders, - std::vector const& global_histo) -{ - DASH_LOG_TRACE("< psort__validate_partitions"); - - if (valid_partitions.empty()) { - return true; - } - - auto const& acc_partition_count = p_unit_info.acc_partition_count; - - // This validates if all partititions have been correctly determined. The - // example below shows 4 units where unit 1 is empty (capacity 0). Thus - // we have only two valid partitions, i.e. partition borders 1 and 2, - // respectively. Partition 0 is skipped because the bounding unit on the - // right-hand side is empty. For partition one, the bounding unit is unit 0, - // one the right hand side it is 2. - // - // The right hand side unit is always (partition index + 1), the unit on - // the left hand side is calculated at the beginning of dash::sort (@see - // psort__init_partition_borders) and stored in a vector for lookup. - // - // Given this information the validation checks the following constraints - // - // - The number of elements in the global histrogram less than the - // partitition value must be smaller than the "accumulated" partition size - // - The "accumulated" partition size must be less than or equal the number - // of elements which less than or equal the partition value - // - // If either of these two constraints cannot be satisfied we have to move - // the upper or lower bound of the partition value, respectively. 
- - // -------|-------|-------|------- - // Partition Index u0 | u1 | u2 | u3 - // -------|-------|-------|------- - // Partition Size 10 | 0 | 10 | 10 - // ^ ^ ^ - // | | | - // -------Partition-- - // | Border 1 | - // Left Unit | Right Unit - // | | | - // | | | - // -------|-------|-------|------- - // Acc Partition Count 10 | 10 | 20 | 30 - // - - for (auto const& border_idx : valid_partitions) { - auto const p_left = p_borders.left_partition[border_idx]; - auto const nlt_idx = p_left * NLT_NLE_BLOCK; - - auto const peer_idx = p_left + 1; - - if (global_histo[nlt_idx] < acc_partition_count[peer_idx] && - acc_partition_count[peer_idx] <= global_histo[nlt_idx + 1]) { - p_borders.is_stable[border_idx] = true; - } - else { - if (global_histo[nlt_idx] >= acc_partition_count[peer_idx]) { - p_borders.upper_bound[border_idx] = splitters[border_idx]; - } - else { - p_borders.lower_bound[border_idx] = splitters[border_idx]; - } - } - } - - // Exit condition: is there any non-stable partition - auto const nonstable_it = std::find( - p_borders.is_stable.cbegin(), p_borders.is_stable.cend(), false); - - DASH_LOG_TRACE("psort__validate_partitions >"); - // exit condition - return nonstable_it == p_borders.is_stable.cend(); -} - -template -inline void psort__calc_final_partition_dist( - std::vector const& acc_partition_count, - LocalArrayT& l_partition_dist) -{ - /* Calculate number of elements to receive for each partition: - * We first assume that we we receive exactly the number of elements which - * are less than P. 
- * The output are the end offsets for each partition - */ - DASH_LOG_TRACE("< psort__calc_final_partition_dist"); - - auto const myid = l_partition_dist.pattern().team().myid(); - auto const nunits = l_partition_dist.pattern().team().size(); - auto const supp_begin = l_partition_dist.begin() + IDX_SUPP(nunits); - auto dist_begin = l_partition_dist.begin() + IDX_DIST(nunits); - - auto const n_my_elements = std::accumulate( - dist_begin, dist_begin + nunits, static_cast(0)); - - // Calculate the deficit - auto my_deficit = acc_partition_count[myid + 1] - n_my_elements; - - // If there is a deficit, look how much unit j can supply - for (auto unit = dash::team_unit_t{0}; unit < nunits && my_deficit > 0; - ++unit) { - auto const supply_unit = *(supp_begin + unit) - *(dist_begin + unit); - - DASH_ASSERT_GE(supply_unit, 0, "invalid supply of target unit"); - if (supply_unit <= my_deficit) { - *(dist_begin + unit) += supply_unit; - my_deficit -= supply_unit; - } - else { - *(dist_begin + unit) += my_deficit; - my_deficit = 0; - } - } - - DASH_ASSERT_GE(my_deficit, 0, "Invalid local deficit"); - DASH_LOG_TRACE("psort__calc_final_partition_dist >"); -} - -template -inline void psort__calc_send_count( - PartitionBorder const& p_borders, - std::vector const& valid_partitions, - InputIt target_count, - OutputIt send_count) -{ - using value_t = typename std::iterator_traits::value_type; - - static_assert( - std::is_same< - value_t, - typename std::iterator_traits::value_type>::value, - "value types must be equal"); - - DASH_LOG_TRACE("< psort__calc_send_count"); - - // The number of units is the number of splitters + 1 - auto const nunits = p_borders.lower_bound.size() + 1; - std::vector tmp_target_count; - tmp_target_count.reserve(nunits + 1); - tmp_target_count.emplace_back(0); - - std::copy( - target_count, - std::next(target_count, nunits), - // we copy to index 1 since tmp_target_count[0] == 0 - std::back_inserter(tmp_target_count)); - - auto tmp_target_count_begin = 
std::next(std::begin(tmp_target_count)); - - auto const last_skipped = p_borders.is_skipped.cend(); - auto it_skipped = - std::find(p_borders.is_skipped.cbegin(), last_skipped, true); - - auto it_valid = valid_partitions.cbegin(); - - std::size_t skipped_idx = 0; - - while (std::find(it_skipped, last_skipped, true) != last_skipped) { - skipped_idx = std::distance(p_borders.is_skipped.cbegin(), it_skipped); - - it_valid = - std::upper_bound(it_valid, valid_partitions.cend(), skipped_idx); - - if (it_valid == valid_partitions.cend()) { - break; - } - - auto const p_left = p_borders.left_partition[*it_valid]; - auto const n_contig_skips = *it_valid - p_left; - - std::fill_n( - std::next(tmp_target_count_begin, p_left + 1), - n_contig_skips, - *std::next(tmp_target_count_begin, p_left)); - - std::advance(it_skipped, n_contig_skips); - std::advance(it_valid, 1); - } - - std::transform( - tmp_target_count.begin() + 1, - tmp_target_count.end(), - tmp_target_count.begin(), - send_count, - std::minus()); - - DASH_LOG_TRACE("psort__calc_send_count >"); -} - -template -inline void psort__calc_target_displs( - PartitionBorder const& p_borders, - std::vector const& valid_partitions, - dash::Array& g_partition_data) -{ - DASH_LOG_TRACE("< psort__calc_target_displs"); - auto const nunits = g_partition_data.team().size(); - auto const myid = g_partition_data.team().myid(); - - auto* l_target_displs = &(g_partition_data.local[IDX_TARGET_DISP(nunits)]); - - if (0 == myid) { - // Unit 0 always writes to target offset 0 - std::fill(l_target_displs, l_target_displs + nunits, 0); - } - - std::vector target_displs(nunits, 0); - - auto const u_blocksize = g_partition_data.lsize(); - - // What this algorithm does is basically an exclusive can over all send - // counts across all participating units to find the target displacements of - // a unit for all partitions. More precisely, each unit has to know the - // starting offset in each partition where the elements should be copied to. 
- // - // Note: The one-sided approach here is - // probably not the most efficient way. Something like dart_exscan should be - // more efficient in large scale scenarios - - for (auto const& border_idx : valid_partitions) { - auto const left_u = p_borders.left_partition[border_idx]; - auto const right_u = border_idx + 1; - size_t const val = - (left_u == myid) - ? - /* if we are the bounding unit on the left-hand side we can access - * the value in local memory */ - g_partition_data.local[left_u + IDX_SEND_COUNT(nunits)] - : - /* Otherwise we have to read the send count remotely from the - * corresponding offset at the unit's memory */ - g_partition_data - [left_u * u_blocksize + myid + IDX_SEND_COUNT(nunits)]; - target_displs[right_u] = val + target_displs[left_u]; - - if (right_u == myid) { - // we are local - g_partition_data.local[IDX_TARGET_DISP(nunits) + myid] = - target_displs[right_u]; - } - else { - auto const target_offset = - right_u * u_blocksize + myid + IDX_TARGET_DISP(nunits); - - g_partition_data.async[target_offset].set(&(target_displs[right_u])); - } - } - - DASH_LOG_TRACE("psort__calc_target_displs >"); - g_partition_data.async.flush(); -} - -template -inline UnitInfo psort__find_partition_borders( - typename GlobIterT::pattern_type const& pattern, - GlobIterT const begin, - GlobIterT const end) -{ - DASH_LOG_TRACE("< psort__find_partition_borders"); - - auto const nunits = pattern.team().size(); - auto const myid = pattern.team().myid(); - - dash::team_unit_t unit{0}; - const dash::team_unit_t last{static_cast(nunits)}; - - auto const unit_first = pattern.unit_at(begin.pos()); - auto const unit_last = pattern.unit_at(end.pos() - 1); - - // Starting offsets of all units - UnitInfo unit_info(nunits); - auto& acc_partition_count = unit_info.acc_partition_count; - acc_partition_count[0] = 0; - - for (; unit < last; ++unit) { - // Number of elements located at current source unit: - auto const u_extents = pattern.local_extents(unit); - auto const 
u_size = std::accumulate( - std::begin(u_extents), - std::end(u_extents), - 1, - std::multiplies()); - // first linear global index of unit - auto const u_gidx_begin = - (unit == myid) ? pattern.lbegin() : pattern.global_index(unit, {}); - // last global index of unit - auto const u_gidx_end = u_gidx_begin + u_size; - - DASH_LOG_TRACE( - "local indexes", - unit, - ": ", - u_gidx_begin, - " ", - u_size, - " ", - u_gidx_end); - - if (u_size == 0 || u_gidx_end - 1 < begin.pos() || - u_gidx_begin >= end.pos()) { - // This unit does not participate... - acc_partition_count[unit + 1] = acc_partition_count[unit]; - } - else { - std::size_t n_u_elements; - if (unit == unit_last) { - // The local range of this unit has the global end - n_u_elements = end.pos() - u_gidx_begin; - } - else if (unit == unit_first) { - // The local range of this unit has the global begin - auto const u_begin_disp = begin.pos() - u_gidx_begin; - n_u_elements = u_size - u_begin_disp; - } - else { - // This is an inner unit - // TODO(kowalewski): Is this really necessary or can we assume that - // n_u_elements == u_size, i.e., local_pos.index == 0? 
- auto const local_pos = pattern.local(u_gidx_begin); - - n_u_elements = u_size - local_pos.index; - - DASH_ASSERT_EQ(local_pos.unit, unit, "units must match"); - } - - acc_partition_count[unit + 1] = - n_u_elements + acc_partition_count[unit]; - if (unit != myid) { - unit_info.valid_remote_partitions.emplace_back(unit); - } - } - } - - DASH_LOG_TRACE("psort__find_partition_borders >"); - return unit_info; -} - -template -inline void psort__init_partition_borders( - UnitInfo const& unit_info, detail::PartitionBorder& p_borders) -{ - DASH_LOG_TRACE("< psort__init_partition_borders"); - - auto const& acc_partition_count = unit_info.acc_partition_count; - - auto const last = acc_partition_count.cend(); - - // find the first non-empty unit - auto left = - std::upper_bound(std::next(acc_partition_count.cbegin()), last, 0); - - if (left == last) { - std::fill(p_borders.is_skipped.begin(), p_borders.is_skipped.end(), true); - return; - } - - // find next unit with a non-zero local portion to obtain first partition - // border - auto right = std::upper_bound(left, last, *left); - - if (right == last) { - std::fill(p_borders.is_skipped.begin(), p_borders.is_skipped.end(), true); - return; - } - - auto const get_border_idx = [](std::size_t const& idx) { - return (idx % NLT_NLE_BLOCK) ? 
(idx / NLT_NLE_BLOCK) * NLT_NLE_BLOCK - : idx - 1; - }; - - auto p_left = std::distance(acc_partition_count.cbegin(), left) - 1; - auto right_u = std::distance(acc_partition_count.cbegin(), right) - 1; - auto border_idx = get_border_idx(right_u); - - // mark everything as skipped until the first partition border - std::fill( - p_borders.is_skipped.begin(), - p_borders.is_skipped.begin() + border_idx, - true); - - p_borders.left_partition[border_idx] = p_left; - - // find subsequent splitters - left = right; - - while ((right = std::upper_bound(right, last, *right)) != last) { - auto const last_border_idx = border_idx; - - p_left = std::distance(acc_partition_count.cbegin(), left) - 1; - right_u = std::distance(acc_partition_count.cbegin(), right) - 1; - border_idx = get_border_idx(right_u); - - auto const dist = border_idx - last_border_idx; - - // mark all skipped splitters as stable and skipped - std::fill_n( - std::next(p_borders.is_skipped.begin(), last_border_idx + 1), - dist - 1, - true); - - p_borders.left_partition[border_idx] = p_left; - - left = right; - } - - // mark trailing empty parititons as stable and skipped - std::fill( - std::next(p_borders.is_skipped.begin(), border_idx + 1), - p_borders.is_skipped.end(), - true); - - std::copy( - p_borders.is_skipped.begin(), - p_borders.is_skipped.end(), - p_borders.is_stable.begin()); - - DASH_LOG_TRACE("psort__init_partition_borders >"); -} - -template -inline auto find_global_min_max( - Iter lbegin, Iter lend, dart_team_t teamid, SortableHash sortable_hash) - -> std::pair< - typename std::decay::result_type>::type, - typename std::decay::result_type>::type> -{ - using mapped_type = - typename std::decay::result_type>::type; - - auto const n_l_elem = std::distance(lbegin, lend); - - std::array min_max_in{ - // local minimum - (n_l_elem > 0) ? sortable_hash(*lbegin) - : std::numeric_limits::max(), - (n_l_elem > 0) ? 
sortable_hash(*(std::prev(lend))) - : std::numeric_limits::min()}; - std::array min_max_out{}; - - DASH_ASSERT_RETURNS( - dart_allreduce( - &min_max_in, // send buffer - &min_max_out, // receive buffer - 2, // buffer size - dash::dart_datatype::value, // data type - DART_OP_MINMAX, // operation - teamid // team - ), - DART_OK); - - return std::make_pair(std::get<0>(min_max_out), std::get<1>(min_max_out)); -} - -#ifdef DASH_ENABLE_TRACE_LOGGING - -template < - class Iterator, - typename std::iterator_traits::difference_type Stride> -class StridedIterator { - using iterator_traits = std::iterator_traits; - using stride_t = typename std::iterator_traits::difference_type; - -public: - using value_type = typename iterator_traits::value_type; - using difference_type = typename iterator_traits::difference_type; - using reference = typename iterator_traits::reference; - using pointer = typename iterator_traits::pointer; - using iterator_category = std::bidirectional_iterator_tag; - - StridedIterator() = default; - - constexpr StridedIterator(Iterator first, Iterator it, Iterator last) - : m_first(first) - , m_iter(it) - , m_last(last) - { - } - - StridedIterator(const StridedIterator& other) = default; - StridedIterator(StridedIterator&& other) noexcept = default; - StridedIterator& operator=(StridedIterator const& other) = default; - StridedIterator& operator=(StridedIterator&& other) noexcept = default; - ~StridedIterator() = default; - - StridedIterator operator++() - { - increment(); - return *this; - } - - StridedIterator operator--() - { - decrement(); - return *this; - } - - StridedIterator operator++(int) const noexcept - { - Iterator tmp = *this; - tmp.increment(); - return tmp; - } - - StridedIterator operator--(int) const noexcept - { - Iterator tmp = *this; - tmp.decrement(); - return tmp; - } - - reference operator*() const noexcept - { - return *m_iter; - } - -private: - void increment() - { - for (difference_type i = 0; (m_iter != m_last) && (i < Stride); 
++i) { - ++m_iter; - } - } - - void decrement() - { - for (difference_type i = 0; (m_iter != m_first) && (i < Stride); ++i) { - --m_iter; - } - } - -public: - friend bool operator==( - const StridedIterator& lhs, const StridedIterator rhs) noexcept - { - return lhs.m_iter == rhs.m_iter; - } - friend bool operator!=( - const StridedIterator& lhs, const StridedIterator rhs) noexcept - { - return !(lhs.m_iter == rhs.m_iter); - } - -private: - Iterator m_first{}; - Iterator m_iter{}; - Iterator m_last{}; -}; - -#endif - -inline void trace_local_histo( - std::string&& ctx, std::vector const& histograms) -{ -#ifdef DASH_ENABLE_TRACE_LOGGING - using strided_iterator_t = detail::StridedIterator< - typename std::vector::const_iterator, - NLT_NLE_BLOCK>; - - strided_iterator_t nlt_first{ - std::begin(histograms), std::begin(histograms), std::end(histograms)}; - strided_iterator_t nlt_last{ - std::begin(histograms), std::end(histograms), std::end(histograms)}; - - DASH_LOG_TRACE_RANGE(ctx.c_str(), nlt_first, nlt_last); - - strided_iterator_t nle_first{std::begin(histograms), - std::next(std::begin(histograms)), - std::end(histograms)}; - strided_iterator_t nle_last{ - std::begin(histograms), std::end(histograms), std::end(histograms)}; - - DASH_LOG_TRACE_RANGE(ctx.c_str(), nle_first, nle_last); -#endif -} - -} // namespace detail -#endif diff --git a/dash/include/dash/algorithm/sort/Communication.h b/dash/include/dash/algorithm/sort/Communication.h new file mode 100644 index 000000000..53ad357da --- /dev/null +++ b/dash/include/dash/algorithm/sort/Communication.h @@ -0,0 +1,102 @@ +#ifndef DASH__ALGORITHM__SORT__COMMUNICATION_H +#define DASH__ALGORITHM__SORT__COMMUNICATION_H + +#include +#include + +namespace dash { + +template < + class LocalInputIter, + class LocalOutputIter, + class BinaryOperation = dash::plus< + typename dash::iterator_traits::value_type>, + typename = typename std::enable_if< + !dash::iterator_traits::is_global_iterator::value && + 
!dash::iterator_traits::is_global_iterator::value>:: + type> +LocalOutputIter exclusive_scan( + LocalInputIter in_first, + LocalInputIter in_last, + LocalOutputIter out_first, + typename dash::iterator_traits::value_type init, + BinaryOperation op = BinaryOperation{}, + dash::Team const& team = dash::Team::All()) +{ + using value_t = typename dash::iterator_traits::value_type; + + auto nel = std::distance(in_first, in_last); + + DASH_ASSERT_EQ(nel, team.size(), "invalid number of elements to scan"); + + DASH_ASSERT_RETURNS( + dart_exscan( + // send buffer + std::addressof(*in_first), + // receive buffer + std::addressof(*out_first), + // buffer size + nel, + // data type + dash::dart_datatype::value, + // operation + dash::internal::dart_reduce_operation::value, + // team + team.dart_id()), + DART_OK); + + return std::next(out_first, nel); +} + +namespace impl { +template +void alltoallv( + InputIt input, + OutputIt output, + std::vector sendCounts, + std::vector sendDispls, + std::vector targetCounts, + std::vector targetDispls, + dart_team_t dartTeam) +{ + using value_type = typename std::iterator_traits::value_type; + + // check whether value_type is supported by dart, else switch to byte + auto dart_value_t = dash::dart_datatype::value; + if (dart_value_t == DART_TYPE_UNDEFINED) { + dart_value_t = DART_TYPE_BYTE; + + auto to_bytes = [](auto v) { return v * sizeof(value_type); }; + + std::transform( + sendCounts.begin(), sendCounts.end(), sendCounts.begin(), to_bytes); + std::transform( + sendDispls.begin(), sendDispls.end(), sendDispls.begin(), to_bytes); + std::transform( + targetCounts.begin(), + targetCounts.end(), + targetCounts.begin(), + to_bytes); + std::transform( + targetDispls.begin(), + targetDispls.end(), + targetDispls.begin(), + to_bytes); + } + + DASH_ASSERT_RETURNS( + dart_alltoallv( + input, + output, + sendCounts.data(), + sendDispls.data(), + targetCounts.data(), + targetDispls.data(), + dart_value_t, + dartTeam), + DART_OK); +} +} // 
namespace impl + +} // namespace dash +#endif diff --git a/dash/include/dash/algorithm/sort/Histogram.h b/dash/include/dash/algorithm/sort/Histogram.h new file mode 100644 index 000000000..573d64696 --- /dev/null +++ b/dash/include/dash/algorithm/sort/Histogram.h @@ -0,0 +1,110 @@ +#ifndef DASH__ALGORITHM__SORT__HISTOGRAM_H +#define DASH__ALGORITHM__SORT__HISTOGRAM_H + +#include +#include + +#include +#include + +namespace dash { +namespace impl { + +template +inline const std::vector psort__local_histogram( + Splitter const& splitters, + std::vector const& valid_partitions, + Iter data_lbegin, + Iter data_lend, + Projection projection) +{ + DASH_LOG_TRACE("dash::sort", "< psort__local_histogram"); + + auto const nborders = splitters.count(); + // The first element is 0 and the last element is the total number of local + // elements in this unit + auto const sz = splitters.count() + 1; + // Number of elements less than P + std::vector l_nlt_nle(impl::lower_upper_block * sz, 0); + + auto const n_l_elem = std::distance(data_lbegin, data_lend); + + // The value type of the iterator is not necessarily const, however, the + // reference should definitely be. If that isn't the case the compiler + // will complain anyway since our lambda required const qualifiers. 
+ using reference = typename std::iterator_traits::reference; + + if (n_l_elem > 0) { + for (auto const& idx : valid_partitions) { + // search lower bound of partition value + auto lb_it = std::lower_bound( + data_lbegin, + data_lend, + splitters.threshold[idx], + [&projection](reference a, const MappedType& b) { + return projection(a) < b; + }); + // search upper bound by starting from the lower bound + auto ub_it = std::upper_bound( + lb_it, + data_lend, + splitters.threshold[idx], + [&projection](const MappedType& b, reference a) { + return b < projection(a); + }); + + DASH_LOG_TRACE( + "dash::sort", + "local histogram", + "distance between ub and lb", + ub_it - lb_it); + + auto const p_left = splitters.left_partition[idx]; + DASH_ASSERT_NE(p_left, dash::team_unit_t{}, "invalid bounding unit"); + + auto const nlt_idx = p_left * impl::lower_upper_block; + + l_nlt_nle[nlt_idx] = std::distance(data_lbegin, lb_it); + l_nlt_nle[nlt_idx + 1] = std::distance(data_lbegin, ub_it); + } + + auto const last_valid_border_idx = *std::prev(valid_partitions.cend()); + auto const p_left = splitters.left_partition[last_valid_border_idx]; + + // fill trailing partitions with local capacity + std::fill( + std::next(std::begin(l_nlt_nle), (p_left + 1) * impl::lower_upper_block), + std::end(l_nlt_nle), + n_l_elem); + } + + DASH_LOG_TRACE("dash::sort", "psort__local_histogram >"); + return l_nlt_nle; +} + +template +inline void psort__global_histogram( + InputIt local_histo_begin, + InputIt local_histo_end, + OutputIt output_it, + dart_team_t dart_team_id) +{ + DASH_LOG_TRACE("dash::sort", "< psort__global_histogram "); + + auto const nels = std::distance(local_histo_begin, local_histo_end); + + dart_allreduce( + &(*local_histo_begin), + &(*output_it), + nels, + dash::dart_datatype::value, + DART_OP_SUM, + dart_team_id); + + DASH_LOG_TRACE("dash::sort", "psort__global_histogram >"); +} + +} // namespace impl +} // namespace dash + +#endif diff --git 
a/dash/include/dash/algorithm/sort/NodeParallelismConfig.h b/dash/include/dash/algorithm/sort/NodeParallelismConfig.h new file mode 100644 index 000000000..a59d54013 --- /dev/null +++ b/dash/include/dash/algorithm/sort/NodeParallelismConfig.h @@ -0,0 +1,89 @@ +#ifndef DASH__ALGORITHM__SORT__NODE_PARALLELISM_CONFIG_H +#define DASH__ALGORITHM__SORT__NODE_PARALLELISM_CONFIG_H + +#include +#include +#include +#include + +#ifdef DASH_ENABLE_PSTL +#include +#endif +#ifdef DASH_ENABLE_OPENMP +#include +#endif + +namespace dash { +namespace impl { +class NodeParallelismConfig { + uint32_t m_nthreads{}; +#ifdef DASH_ENABLE_PSTL + // We use the default number of threads + tbb::task_scheduler_init m_init{}; +#endif +public: + NodeParallelismConfig(uint32_t nthreads = 0) +#ifdef DASH_ENABLE_PSTL + : m_nthreads( + + nthreads == 0 ? tbb::task_scheduler_init::default_num_threads() + : nthreads) + , m_init(m_nthreads) +#endif + { +#ifndef DASH_ENABLE_PSTL + // If we use TBB we cannot do that + setNumThreads(nthreads); +#endif + } + + void setNumThreads(uint32_t nthreadsRequested) DASH_NOEXCEPT + { + m_nthreads = getNThreads(nthreadsRequested); + +#if defined(DASH_ENABLE_OPENMP) + omp_set_num_threads(m_nthreads); +#endif + } + + auto parallelism() const noexcept + { + if (NodeParallelismConfig::hasNodeLevelParallelism()) { + return m_nthreads; + } + else { + return 1u; + } + } + +private: + constexpr static bool hasNodeLevelParallelism() noexcept + { +#if (defined(DASH_ENABLE_PSTL) || defined(DASH_ENABLE_OPENMP)) + return true; +#endif + return false; + } + + static uint32_t getNThreads(uint32_t nthreads) noexcept + { + if (!NodeParallelismConfig::hasNodeLevelParallelism()) { + return 1u; + } + + if (nthreads > 0) { + return nthreads; + } + +#if defined(DASH_ENABLE_OPENMP) + return omp_get_max_threads(); +#else + // always create at least one thread... 
+ return std::max(std::thread::hardware_concurrency(), 2u) - 1u; +#endif + } +}; +} // namespace impl +} // namespace dash + +#endif // DASH__ALGORITHM__SORT__NODE_PARALLELISM_CONFIG_H diff --git a/dash/include/dash/algorithm/sort/Partition.h b/dash/include/dash/algorithm/sort/Partition.h new file mode 100644 index 000000000..f3ffc06d1 --- /dev/null +++ b/dash/include/dash/algorithm/sort/Partition.h @@ -0,0 +1,350 @@ +#ifndef DASH__ALGORITHM__SORT__PARTITION_H +#define DASH__ALGORITHM__SORT__PARTITION_H + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace dash { + +namespace impl { + +template +inline auto psort__partition_sizes(GlobIter const begin, GlobIter const end) +{ + auto const& pattern = begin.pattern(); + + auto nunits = pattern.team().size(); + auto unit_begin = pattern.unit_at(begin.pos()); + auto unit_last = pattern.unit_at(end.pos() - 1); + + std::vector partition_sizes_psum; + partition_sizes_psum.reserve(nunits + 1); + + auto local_extent = [&pattern](auto unit) { + // Number of elements located at current source unit: + auto const u_extents = pattern.local_extents(unit); + + return std::accumulate( + std::begin(u_extents), + std::end(u_extents), + 1, + std::multiplies()); + }; + + auto gidx_begin = [&pattern](auto unit) { + // global start index of local segment + return (unit == pattern.team().myid()) ? pattern.lbegin() + : pattern.global_index(unit, {}); + }; + + // 1. fill leading partition with 0 until we reach the first non-empty + // partition + std::fill_n( + std::back_inserter(partition_sizes_psum), + unit_begin + 1, + std::size_t{0}); + + // 2. first unit: consider the case that we do not sort the full range but + // start somewhere in the middle of the unit's segment + auto ucap = local_extent(unit_begin); + partition_sizes_psum.emplace_back( + ucap == 0 ? 0 : ucap - (begin.pos() - gidx_begin(unit_begin))); + + if (unit_begin == unit_last) { + return partition_sizes_psum; + } + + // 3. 
units in the middle + auto range = dash::meta::range( + static_cast(unit_begin + 1), unit_last); + + std::transform( + std::begin(range), + std::end(range), + std::back_inserter(partition_sizes_psum), + [local_extent](auto unit) -> std::size_t { + // This is an inner unit + // TODO(kowalewski): Is this really necessary or can we assume that + // n_u_elements == u_size, i.e., local_pos.index == 0? + // + // auto const local_pos = pattern.local(u_gidx_begin); + + return local_extent(unit); + }); + + // 4. last unit: consider the case that we do not sort the full range but + // end somewhere in the middle of the unit's segment + partition_sizes_psum.emplace_back(end.pos() - gidx_begin(unit_last)); + + std::fill_n( + std::back_inserter(partition_sizes_psum), + nunits - unit_last - 1, + std::size_t{0}); + + DASH_LOG_TRACE_RANGE( + "partition sizes", + std::begin(partition_sizes_psum), + std::end(partition_sizes_psum)); + + // calculate the prefix sum + std::partial_sum( + std::begin(partition_sizes_psum), + std::end(partition_sizes_psum), + std::begin(partition_sizes_psum)); + + DASH_LOG_TRACE_RANGE( + "partition sizes prefix sum", + std::begin(partition_sizes_psum), + std::end(partition_sizes_psum)); + + return partition_sizes_psum; +} + +template +inline void psort__init_partition_borders( + std::vector const& acc_partition_count, + impl::Splitter& p_borders) +{ + DASH_LOG_TRACE("dash::sort", "< psort__init_partition_borders"); + + auto const last = acc_partition_count.cend(); + + // find the first non-empty unit + auto left = + std::upper_bound(std::next(acc_partition_count.cbegin()), last, 0); + + if (left == last) { + std::fill(p_borders.is_skipped.begin(), p_borders.is_skipped.end(), true); + return; + } + + // find next unit with a non-zero local portion to obtain first partition + // border + auto right = std::upper_bound(left, last, *left); + + if (right == last) { + std::fill(p_borders.is_skipped.begin(), p_borders.is_skipped.end(), true); + return; + } + + 
auto const get_border_idx = [](std::size_t const idx) { + return (idx % impl::lower_upper_block) ? (idx / impl::lower_upper_block) * impl::lower_upper_block + : idx - 1; + }; + + auto p_left = std::distance(acc_partition_count.cbegin(), left) - 1; + auto right_u = std::distance(acc_partition_count.cbegin(), right) - 1; + auto border_idx = get_border_idx(right_u); + + // mark everything as skipped until the first partition border + std::fill( + p_borders.is_skipped.begin(), + p_borders.is_skipped.begin() + border_idx, + true); + + p_borders.left_partition[border_idx] = p_left; + + // find subsequent splitters + left = right; + + while ((right = std::upper_bound(right, last, *right)) != last) { + auto const last_border_idx = border_idx; + + p_left = std::distance(acc_partition_count.cbegin(), left) - 1; + right_u = std::distance(acc_partition_count.cbegin(), right) - 1; + border_idx = get_border_idx(right_u); + + auto const dist = border_idx - last_border_idx; + + // mark all skipped splitters as stable and skipped + std::fill_n( + std::next(p_borders.is_skipped.begin(), last_border_idx + 1), + dist - 1, + true); + + p_borders.left_partition[border_idx] = p_left; + + left = right; + } + + // mark trailing empty parititons as stable and skipped + std::fill( + std::next(p_borders.is_skipped.begin(), border_idx + 1), + p_borders.is_skipped.end(), + true); + + std::copy( + p_borders.is_skipped.begin(), + p_borders.is_skipped.end(), + p_borders.is_stable.begin()); + + DASH_LOG_TRACE("dash::sort", "psort__init_partition_borders >"); +} + +template +inline void psort__calc_boundaries(Splitter& splitters) +{ + DASH_LOG_TRACE("dash::sort", "< psort__calc_boundaries "); + + // recalculate partition boundaries + for (std::size_t idx = 0; idx < splitters.count(); ++idx) { + DASH_ASSERT(splitters.lower_bound[idx] <= splitters.upper_bound[idx]); + // case A: partition is already stable or skipped + if (splitters.is_stable[idx]) { + continue; + } + // case B: we have the last 
iteration + //-> test upper bound directly + if (splitters.is_last_iter[idx]) { + splitters.threshold[idx] = splitters.upper_bound[idx]; + splitters.is_stable[idx] = true; + } + else { + // case C: ordinary iteration + + splitters.threshold[idx] = + splitters.lower_bound[idx] + + ((splitters.upper_bound[idx] - splitters.lower_bound[idx]) / 2); + + if (splitters.threshold[idx] == splitters.lower_bound[idx]) { + // if we cannot move the partition to the left + //-> last iteration + splitters.is_last_iter[idx] = true; + } + } + } + DASH_LOG_TRACE("dash::sort", "psort__calc_boundaries >"); +} + +template +inline bool psort__validate_partitions( + Splitter& splitters, + std::vector const& acc_partition_count, + std::vector const& valid_partitions, + std::vector const& global_histo) +{ + DASH_LOG_TRACE("dash::sort", "< psort__validate_partitions"); + + if (valid_partitions.empty()) { + return true; + } + + // This validates if all partititions have been correctly determined. The + // example below shows 4 units where unit 1 is empty (capacity 0). Thus + // we have only two valid partitions, i.e. partition borders 1 and 2, + // respectively. Partition 0 is skipped because the bounding unit on the + // right-hand side is empty. For partition one, the bounding unit is unit 0, + // one the right hand side it is 2. + // + // The right hand side unit is always (partition index + 1), the unit on + // the left hand side is calculated at the beginning of dash::sort (@see + // psort__init_partition_borders) and stored in a vector for lookup. 
+ // + // Given this information the validation checks the following constraints + // + // - The number of elements in the global histrogram less than the + // partitition value must be smaller than the "accumulated" partition size + // - The "accumulated" partition size must be less than or equal the number + // of elements which less than or equal the partition value + // + // If either of these two constraints cannot be satisfied we have to move + // the upper or lower bound of the partition value, respectively. + + // -------|-------|-------|------- + // Partition Index u0 | u1 | u2 | u3 + // -------|-------|-------|------- + // Partition Size 10 | 0 | 10 | 10 + // ^ ^ ^ + // | | | + // -------Partition-- + // | Border 1 | + // Left Unit | Right Unit + // | | | + // | | | + // -------|-------|-------|------- + // Acc Partition Count 10 | 10 | 20 | 30 + // + + for (auto const& border_idx : valid_partitions) { + auto const p_left = splitters.left_partition[border_idx]; + auto const nlt_idx = p_left * impl::lower_upper_block; + + auto const peer_idx = p_left + 1; + + if (global_histo[nlt_idx] < acc_partition_count[peer_idx] && + acc_partition_count[peer_idx] <= global_histo[nlt_idx + 1]) { + splitters.is_stable[border_idx] = true; + } + else { + if (global_histo[nlt_idx] >= acc_partition_count[peer_idx]) { + splitters.upper_bound[border_idx] = splitters.threshold[border_idx]; + } + else { + splitters.lower_bound[border_idx] = splitters.threshold[border_idx]; + } + } + } + + // Exit condition: is there any non-stable partition + auto const nonstable_it = std::find( + std::begin(splitters.is_stable), std::end(splitters.is_stable), false); + + DASH_LOG_TRACE("dash::sort", "psort__validate_partitions >"); + // exit condition + return nonstable_it == splitters.is_stable.cend(); +} + +template +inline void psort__calc_final_partition_dist( + Iter nlt_first, + Iter nlt_last, + Iter nle_first, + typename std::iterator_traits::value_type partition_size) +{ + using value_t 
= typename std::iterator_traits::value_type; + + /* Calculate number of elements to receive for each partition: + * We first assume that we we receive exactly the number of elements which + * are less than P. + * The output are the end offsets for each partition + */ + DASH_LOG_TRACE("dash::sort", "< psort__calc_final_partition_dist"); + + auto const nunits = std::distance(nlt_first, nlt_last); + + auto const n_my_elements = std::accumulate(nlt_first, nlt_last, value_t{0}); + + // Calculate the deficit + auto my_deficit = partition_size - n_my_elements; + + // If there is a deficit, look how much unit j can supply + for (auto unit = dash::team_unit_t{0}; unit < nunits && my_deficit > 0; + ++unit, ++nlt_first, ++nle_first) { + auto const supply_unit = *nle_first - *nlt_first; + + DASH_ASSERT_GE(supply_unit, 0, "invalid supply of target unit"); + if (supply_unit <= my_deficit) { + *(nlt_first) += supply_unit; + my_deficit -= supply_unit; + } + else { + *(nlt_first) += my_deficit; + my_deficit = 0; + } + } + + DASH_ASSERT_GE(my_deficit, 0, "Invalid local deficit"); + DASH_LOG_TRACE("dash::sort", "psort__calc_final_partition_dist >"); +} + +} // namespace impl +} // namespace dash + +#endif diff --git a/dash/include/dash/algorithm/sort/Sampling.h b/dash/include/dash/algorithm/sort/Sampling.h new file mode 100644 index 000000000..52844c401 --- /dev/null +++ b/dash/include/dash/algorithm/sort/Sampling.h @@ -0,0 +1,70 @@ +#ifndef DASH__ALGORITHM__SORT__SAMPLING_H +#define DASH__ALGORITHM__SORT__SAMPLING_H + +#include +#include + +#include +#include + +namespace dash { +namespace impl { + +using UIntType = std::uintptr_t; + +// using Knuth LCG Constants, see The Art of Computer Programming +constexpr UIntType multiplier = 6364136223846793005u; +constexpr UIntType increment = 1442695040888963407u; +constexpr UIntType modulus = 0u; +using generator = + std::linear_congruential_engine; + +template +inline auto minmax(std::pair input, dart_team_t teamid) +{ + std::array 
in{input.first, input.second}; + std::array out{}; + + DASH_ASSERT_RETURNS( + dart_allreduce( + &in, // send buffer + &out, // receive buffer + 2, // buffer size + dash::dart_datatype::value, // data type + DART_OP_MINMAX, // operation + teamid // team + ), + DART_OK); + + return std::make_pair(out[DART_OP_MINMAX_MIN], out[DART_OP_MINMAX_MAX]); +} + +inline std::size_t oversamplingFactor( + std::size_t N, std::uint32_t P, double epsilon) +{ + return 0; +} + +template +void sample( + LocalIter begin, + LocalIter end, + typename std::iterator_traits::difference_type num_samples, + Generator& gen) +{ + using std::swap; + + auto n = std::distance(begin, end); + + for (; num_samples; --num_samples, ++begin) { + const auto pos = std::uniform_int_distribution< + typename std::iterator_traits::difference_type>{0, + --n}(gen); + + swap(*begin, *std::next(begin, pos)); + } +} +} // namespace impl +} // namespace dash + +#endif diff --git a/dash/include/dash/algorithm/sort/Sort-inl.h b/dash/include/dash/algorithm/sort/Sort-inl.h new file mode 100644 index 000000000..0e0f4180e --- /dev/null +++ b/dash/include/dash/algorithm/sort/Sort-inl.h @@ -0,0 +1,150 @@ +#ifndef DASH__ALGORITHM__INTERNAL__SORT_H__INCLUDED +#define DASH__ALGORITHM__INTERNAL__SORT_H__INCLUDED + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +namespace dash { + +namespace impl { + +template +inline void psort__calc_send_count( + Splitter const& p_borders, + std::vector const& valid_partitions, + InputIt target_count, + OutputIt send_count) +{ + using value_t = typename std::iterator_traits::value_type; + + static_assert( + std::is_same< + value_t, + typename std::iterator_traits::value_type>::value, + "value types must be equal"); + + DASH_LOG_TRACE("< psort__calc_send_count"); + + // The number of units is the number of splitters + 1 + auto const nunits = p_borders.count() + 1; + std::vector tmp_target_count; + tmp_target_count.reserve(nunits + 1); + 
tmp_target_count.emplace_back(0); + + std::copy( + target_count, + std::next(target_count, nunits), + // we copy to index 1 since tmp_target_count[0] == 0 + std::back_inserter(tmp_target_count)); + + auto tmp_target_count_begin = std::next(std::begin(tmp_target_count)); + + auto const last_skipped = p_borders.is_skipped.cend(); + // find the first empty partition + auto it_skipped = + std::find(p_borders.is_skipped.cbegin(), last_skipped, true); + + auto it_valid = valid_partitions.cbegin(); + + std::size_t skipped_idx = 0; + + while (std::find(it_skipped, last_skipped, true) != last_skipped) { + skipped_idx = std::distance(p_borders.is_skipped.cbegin(), it_skipped); + + it_valid = + std::upper_bound(it_valid, valid_partitions.cend(), skipped_idx); + + if (it_valid == valid_partitions.cend()) { + break; + } + + auto const p_left = p_borders.left_partition[*it_valid]; + auto const n_contig_skips = *it_valid - p_left; + + std::fill_n( + std::next(tmp_target_count_begin, p_left + 1), + n_contig_skips, + *std::next(tmp_target_count_begin, p_left)); + + std::advance(it_skipped, n_contig_skips); + std::advance(it_valid, 1); + } + + std::transform( + tmp_target_count.begin() + 1, + tmp_target_count.end(), + tmp_target_count.begin(), + send_count, + std::minus()); + + DASH_LOG_TRACE("psort__calc_send_count >"); +} + +template +inline void local_sort(RAI first, RAI last, Cmp sort_comp, int nthreads = 1) +{ +#ifdef DASH_ENABLE_PSTL + if (nthreads > 1) { + DASH_LOG_TRACE( + "dash::sort", "local_sort", "Calling parallel sort using PSTL"); + ::std::sort(pstl::execution::par_unseq, first, last, sort_comp); + } + else { + ::std::sort(first, last, sort_comp); + } +#else + DASH_LOG_TRACE("dash::sort", "local_sort", "Calling std::sort"); + ::std::sort(first, last, sort_comp); +#endif +} + +template +inline auto psort__get_neighbors( + dash::team_unit_t whoami, + std::size_t n_myelems, + Splitter const& splitters, + std::vector const& valid_partitions) +{ + auto it_left_splitter = 
std::lower_bound( + std::begin(valid_partitions), std::end(valid_partitions), (whoami - 1)); + + auto has_left_splitter = + (n_myelems > 0) && whoami && + (it_left_splitter != std::end(valid_partitions)) + ? (*it_left_splitter == whoami - 1) + : false; + + auto nunits = splitters.count() + 1; + + dash::global_unit_t my_source{ + (has_left_splitter) ? static_cast( + splitters.left_partition[*it_left_splitter]) + : DART_UNDEFINED_UNIT_ID}; + + auto it_right_splitter = (n_myelems > 0 && whoami < nunits) + ? std::lower_bound( + std::begin(valid_partitions), + std::end(valid_partitions), + whoami) + : std::end(valid_partitions); + + dash::global_unit_t my_target{ + (it_right_splitter != std::end(valid_partitions)) + ? static_cast(*it_right_splitter) + 1 + : DART_UNDEFINED_UNIT_ID}; + + return std::make_pair(my_source, my_target); +} +} // namespace impl +} // namespace dash +#endif diff --git a/dash/include/dash/algorithm/sort/ThreadSafeQueue.h b/dash/include/dash/algorithm/sort/ThreadSafeQueue.h new file mode 100644 index 000000000..4cbccb917 --- /dev/null +++ b/dash/include/dash/algorithm/sort/ThreadSafeQueue.h @@ -0,0 +1,160 @@ +#ifndef DASH__ALGORITHM__SORT__THREADSAVEQUEUE_H +#define DASH__ALGORITHM__SORT__THREADSAVEQUEUE_H + + +#include +#include +#include +#include +#include + +namespace dash { +namespace impl { + +/** + * The ThreadSafeQueue class. + * Provides a wrapper around a basic queue to provide thread safety. + * + * @see http://roar11.com/2016/01/a-platform-independent-thread-pool-using-c14/ + * + * This code is released under the BSD-2-Clause license. + +Copyright (c) 2018, Will Pearce + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, +this list of conditions and the following disclaimer. 
+ + Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ +template +class ThreadSafeQueue { +public: + /** + * Destructor. + */ + ~ThreadSafeQueue(void) + { + invalidate(); + } + + /** + * Attempt to get the first value in the queue. + * Returns true if a value was successfully written to the out parameter, + * false otherwise. + */ + bool tryPop(T& out) + { + std::lock_guard lock{m_mutex}; + if (m_queue.empty() || !m_valid) { + return false; + } + out = std::move(m_queue.front()); + m_queue.pop(); + return true; + } + + /** + * Get the first value in the queue. + * Will block until a value is available unless clear is called or the + * instance is destructed. Returns true if a value was successfully written + * to the out parameter, false otherwise. 
+ */ + bool waitPop(T& out) + { + std::unique_lock lock{m_mutex}; + m_condition.wait(lock, [this]() { return !m_queue.empty() || !m_valid; }); + /* + * Using the condition in the predicate ensures that spurious wakeups with + * a valid but empty queue will not proceed, so only need to check for + * validity before proceeding. + */ + if (!m_valid) { + return false; + } + out = std::move(m_queue.front()); + m_queue.pop(); + return true; + } + + /** + * Push a new value onto the queue. + */ + void push(T value) + { + std::lock_guard lock{m_mutex}; + m_queue.push(std::move(value)); + m_condition.notify_one(); + } + + /** + * Check whether or not the queue is empty. + */ + bool empty(void) const + { + std::lock_guard lock{m_mutex}; + return m_queue.empty(); + } + + /** + * Clear all items from the queue. + */ + void clear(void) + { + std::lock_guard lock{m_mutex}; + while (!m_queue.empty()) { + m_queue.pop(); + } + m_condition.notify_all(); + } + + /** + * Invalidate the queue. + * Used to ensure no conditions are being waited on in waitPop when + * a thread or the application is trying to exit. + * The queue is invalid after calling this method and it is an error + * to continue using a queue after this method has been called. + */ + void invalidate(void) + { + std::lock_guard lock{m_mutex}; + m_valid = false; + m_condition.notify_all(); + } + + /** + * Returns whether or not this queue is valid. 
+ */ + bool isValid(void) const + { + std::lock_guard lock{m_mutex}; + return m_valid; + } + +private: + std::atomic_bool m_valid{true}; + mutable std::mutex m_mutex; + std::queue m_queue; + std::condition_variable m_condition; +}; + +} // namespace impl +} // namespace dash + +#endif diff --git a/dash/include/dash/algorithm/sort/Types.h b/dash/include/dash/algorithm/sort/Types.h new file mode 100644 index 000000000..594c26005 --- /dev/null +++ b/dash/include/dash/algorithm/sort/Types.h @@ -0,0 +1,305 @@ +#ifndef DASH__ALGORITHM__SORT__TYPES_H +#define DASH__ALGORITHM__SORT__TYPES_H + +#include +#include +#include +#include +#include +#include + +namespace dash { + +namespace impl { + +// Final Step Strategy +struct sort__final_strategy__merge { +}; + +struct sort__final_strategy__sort { +}; + + +constexpr size_t lower_upper_block = 2; +constexpr int sort_sendrecv_tag = 0xdea110c; + +template +struct LocalData { +private: + using element_t = T; +public: + element_t* input{}; + element_t* output{}; + std::unique_ptr buffer{}; +}; + +template +struct Splitter { +public: + // tracks if we have found a stable partition border + std::vector is_stable; + // tracks if a partition is skipped + std::vector is_skipped; + // lower bound of each partition + std::vector lower_bound; + // the splitter values + std::vector threshold; + // upper bound of each partition + std::vector upper_bound; + // Special case for the last iteration in finding partition borders + std::vector is_last_iter; + + // The right unit is always right next to the border. For this reason we + // track only the left unit. 
+ std::vector left_partition; + + constexpr Splitter(size_t nsplitter, T _lower_bound, T _upper_bound) + : is_stable(nsplitter, false) + , is_skipped(nsplitter, false) + , lower_bound(nsplitter, _lower_bound) + , threshold(nsplitter, T{}) + , upper_bound(nsplitter, _upper_bound) + , is_last_iter(nsplitter, false) + , left_partition( + nsplitter, std::numeric_limits::min()) + { + } + + constexpr size_t count() const noexcept + { + return threshold.size(); + } +}; + +template < + class Iterator, + typename std::iterator_traits::difference_type Stride> +class StridedIterator { + using iterator_traits = std::iterator_traits; + using stride_t = typename std::iterator_traits::difference_type; + + static_assert( + std::is_same< + typename std::iterator_traits::iterator_category, + std::random_access_iterator_tag>::value, + "only random access iterators are supported for strided iteration"); + +public: + using value_type = typename iterator_traits::value_type; + using difference_type = typename iterator_traits::difference_type; + using reference = typename iterator_traits::reference; + using pointer = typename iterator_traits::pointer; + using iterator_category = std::random_access_iterator_tag; + + constexpr StridedIterator() = default; + + constexpr StridedIterator(Iterator begin, Iterator it) + : m_iter(it) + , m_begin(begin) + { + } + + StridedIterator(const StridedIterator& other) = default; + StridedIterator(StridedIterator&& other) noexcept = default; + StridedIterator& operator=(StridedIterator const& other) = default; + StridedIterator& operator=(StridedIterator&& other) noexcept = default; + ~StridedIterator() = default; + + constexpr StridedIterator& operator++() noexcept + { + increment(1); + return *this; + } + + constexpr StridedIterator operator++(int) const noexcept + { + StridedIterator tmp = *this; + tmp.increment(1); + return tmp; + } + + constexpr StridedIterator& operator--() noexcept + { + decrement(1); + return *this; + } + + constexpr 
StridedIterator operator--(int) const noexcept + { + StridedIterator tmp = *this; + tmp.decrement(1); + return tmp; + } + + constexpr StridedIterator& operator+=(const difference_type n) noexcept + { + increment(n); + return *this; + } + + constexpr StridedIterator operator+(const difference_type n) const noexcept + { + StridedIterator tmp = *this; + tmp.increment(n); + return tmp; + } + + constexpr StridedIterator& operator-=(const difference_type n) noexcept + { + decrement(n); + return *this; + } + + constexpr StridedIterator operator-(const difference_type n) const noexcept + { + StridedIterator tmp = *this; + tmp.decrement(n); + return tmp; + } + + constexpr reference operator*() const noexcept + { + return *m_iter; + } + +private: + constexpr void increment(difference_type n) + { + std::advance(m_iter, n * Stride); + } + + constexpr void decrement(difference_type n) + { + std::advance(m_iter, -n * Stride); + } + +public: + template ::difference_type S> + friend DASH_CONSTEXPR bool operator==( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT; + + template ::difference_type S> + friend DASH_CONSTEXPR bool operator!=( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT; + + template ::difference_type S> + friend DASH_CONSTEXPR bool operator<( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT; + + template ::difference_type S> + friend DASH_CONSTEXPR bool operator<=( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT; + + template ::difference_type S> + friend DASH_CONSTEXPR bool operator>( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT; + + template ::difference_type S> + friend DASH_CONSTEXPR bool operator>=( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT; + + template ::difference_type S> + friend DASH_CONSTEXPR typename std::iterator_traits::difference_type + operator-( + const StridedIterator& lhs, + const 
StridedIterator& rhs) DASH_NOEXCEPT; + +private: + Iterator m_iter{}; + Iterator const m_begin{}; +}; + +template < + class Iterator, + typename std::iterator_traits::difference_type Stride> +DASH_CONSTEXPR bool operator==( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT +{ + DASH_ASSERT(lhs.m_begin == rhs.m_begin); + return lhs.m_iter == rhs.m_iter; +} + +template < + class Iterator, + typename std::iterator_traits::difference_type Stride> +DASH_CONSTEXPR bool operator!=( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT +{ + DASH_ASSERT(lhs.m_begin == rhs.m_begin); + return lhs.m_iter != rhs.m_iter; +} + +template < + class Iterator, + typename std::iterator_traits::difference_type Stride> +DASH_CONSTEXPR bool operator<( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT +{ + DASH_ASSERT(lhs.m_begin == rhs.m_begin); + return (lhs.m_iter < rhs.m_iter); +} + +template < + class Iterator, + typename std::iterator_traits::difference_type Stride> +DASH_CONSTEXPR bool operator<=( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT +{ + DASH_ASSERT(lhs.m_begin == rhs.m_begin); + return (lhs.m_iter <= rhs.m_iter); +} + +template < + class Iterator, + typename std::iterator_traits::difference_type Stride> +DASH_CONSTEXPR bool operator>( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT +{ + DASH_ASSERT(lhs.m_begin == rhs.m_begin); + return lhs.m_iter > rhs.m_iter; +} + +template < + class Iterator, + typename std::iterator_traits::difference_type Stride> +DASH_CONSTEXPR bool operator>=( + const StridedIterator& lhs, + const StridedIterator& rhs) DASH_NOEXCEPT +{ + DASH_ASSERT(lhs.m_begin == rhs.m_begin); + return lhs.m_iter >= rhs.m_iter; +} + +template < + class Iterator, + typename std::iterator_traits::difference_type Stride> +DASH_CONSTEXPR typename std::iterator_traits::difference_type +operator-( + const StridedIterator& lhs, + const 
StridedIterator& rhs) DASH_NOEXCEPT +{ + DASH_ASSERT(lhs.m_begin == rhs.m_begin); + + return (lhs.m_iter - rhs.m_iter) / 2; +} + +template +constexpr StridedIterator make_strided_iterator(Iter begin) +{ + return StridedIterator{begin, begin}; +} + +} // namespace impl +} // namespace dash +#endif diff --git a/dash/include/dash/internal/Logging.h b/dash/include/dash/internal/Logging.h index c3019c1ea..017dc1a8b 100644 --- a/dash/include/dash/internal/Logging.h +++ b/dash/include/dash/internal/Logging.h @@ -85,13 +85,15 @@ typename std::iterator_traits::value_type; \ using difference_t = \ typename std::iterator_traits::difference_type; \ - auto const nelems = std::distance(begin, end); \ + auto first = (begin); \ + auto last = (end); \ + auto const nelems = std::distance(first, last); \ auto const max_elems = \ std::min(nelems, MAX_ELEMS_RANGE_LOGGING__); \ std::ostringstream os; \ std::copy( \ - begin, \ - std::next(begin, max_elems), \ + first, \ + std::next(first, max_elems), \ std::ostream_iterator(os, " ")); \ if (nelems > MAX_ELEMS_RANGE_LOGGING__) os << "..."; \ DASH_LOG_TRACE(ctx, os.str()); \ diff --git a/dash/include/dash/iterator/internal/GlobPtrBase.h b/dash/include/dash/iterator/internal/GlobPtrBase.h index 4fd0a093c..78cf0742c 100644 --- a/dash/include/dash/iterator/internal/GlobPtrBase.h +++ b/dash/include/dash/iterator/internal/GlobPtrBase.h @@ -258,11 +258,6 @@ dart_gptr_t increment( // and in order to prevent this we set the local offset to 0. // Log the number of positions beyond the global end. - DASH_LOG_ERROR( - "GlobPtr.increment", - "offset goes beyond the global memory end", - offs == lsize ? 
1 : offs - lsize + 1); - offs = 0; ++current_uid; DASH_ASSERT_EQ( diff --git a/dash/include/dash/meta/NumericRange.h b/dash/include/dash/meta/NumericRange.h new file mode 100644 index 000000000..7f6c6c1c0 --- /dev/null +++ b/dash/include/dash/meta/NumericRange.h @@ -0,0 +1,226 @@ +#ifndef DASH__META__NUMERICRANGE_H +#define DASH__META__NUMERICRANGE_H +// -*- C++ -*- +// Copyright (c) 2017, Just Software Solutions Ltd +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or +// without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above +// copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following +// disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of +// its contributors may be used to endorse or promote products +// derived from this software without specific prior written +// permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND +// CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT +// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +// Slightly modified by the DASH contributors + +#include +#include +#include + +namespace dash { +namespace meta { + +template +struct IncrementValue { + void operator()(T& x) const + { + ++x; + } +}; + +template +struct IncrementBy { + T delta; + + IncrementBy(T delta_) + : delta(std::move(delta_)) + { + } + + void operator()(T& x) const + { + x += delta; + } +}; + +template > +class numeric_range { +public: + enum class direction { increasing, decreasing }; + +private: + T m_current; + T m_final; + Increment m_inc; + direction m_dir; + + bool at_end() + { + if (m_dir == direction::increasing) { + return m_current >= m_final; + } + else { + return m_current <= m_final; + } + } + +public: + class iterator { + numeric_range* range; + + void check_done() + { + if (range->at_end()) { + range = nullptr; + } + } + + class postinc_return { + T value; + + public: + postinc_return(T value_) + : value(std::move(value_)) + { + } + T operator*() + { + return std::move(value); + } + }; + + public: + using value_type = T; + using reference = T; + using iterator_category = std::input_iterator_tag; + using pointer = T*; + using difference_type = void; + + iterator(numeric_range* range_) + : range(range_) + { + if (range) check_done(); + } + + T operator*() const + { + return range->m_current; + } + + T* operator->() const + { + return &range->m_current; + } + + iterator& operator++() + { + if (!range) + throw std::runtime_error("Increment a past-the-end iterator"); + range->m_inc(range->m_current); + check_done(); + return *this; + } + + postinc_return operator++(int) + { + postinc_return temp(**this); + ++*this; + return temp; + } + + friend bool operator==(iterator const& lhs, iterator const& rhs) + { + return lhs.range == rhs.range; + } + friend bool operator!=(iterator const& lhs, iterator const& rhs) + { + return !(lhs == rhs); + } + }; + + iterator begin() + { + return iterator(this); + } + + iterator end() + { + return iterator(nullptr); + } + + numeric_range(T 
initial_, T final_) + : m_current(std::move(initial_)) + , m_final(std::move(final_)) + , m_dir(direction::increasing) + { + } + numeric_range(T initial_, T final_, Increment inc_) + : m_current(std::move(initial_)) + , m_final(std::move(final_)) + , m_inc(std::move(inc_)) + , m_dir(direction::increasing) + { + } + numeric_range(T initial_, T final_, Increment inc_, direction dir_) + : m_current(std::move(initial_)) + , m_final(std::move(final_)) + , m_inc(std::move(inc_)) + , m_dir(dir_) + { + } +}; + +template +numeric_range range(T from, T to) +{ + if (to < from) throw std::runtime_error("Cannot count down "); + return numeric_range(std::move(from), std::move(to)); +} + +template +numeric_range range(T to) +{ + return range(T(), std::move(to)); +} + +template +numeric_range> range(T from, T to, T delta) +{ + if (!delta) throw std::runtime_error("Step must be non-zero"); + using direction = typename numeric_range>::direction; + direction const m_dir = + (delta > T()) ? direction::increasing : direction::decreasing; + return numeric_range>( + std::move(from), + std::move(to), + IncrementBy(std::move(delta)), + m_dir); +} +} // namespace meta +} // namespace dash +#endif diff --git a/dash/include/dash/util/StaticConfig.h.in b/dash/include/dash/util/StaticConfig.h.in index df04b65e3..c9ba7ba25 100644 --- a/dash/include/dash/util/StaticConfig.h.in +++ b/dash/include/dash/util/StaticConfig.h.in @@ -27,6 +27,7 @@ namespace util { bool avail_memkind = @CONF_AVAIL_MEMKIND@; /* Available Algorithms */ bool avail_algo_summa = @CONF_AVAIL_ALGO_SUMMA@; + bool avail_algo_pstl = @CONF_AVAIL_ALGO_PSTL@; } DashConfig; } diff --git a/dash/scripts/dash-ci-deploy.sh b/dash/scripts/dash-ci-deploy.sh index 910599876..cfd9d66aa 100755 --- a/dash/scripts/dash-ci-deploy.sh +++ b/dash/scripts/dash-ci-deploy.sh @@ -69,6 +69,7 @@ if [ "$BUILD_TYPE" = "Release" ]; then -DINSTALL_PREFIX=$INSTALL_PATH \ -DDART_IMPLEMENTATIONS=mpi \ -DENABLE_ASSERTIONS=OFF \ + -DENABLE_THREADSUPPORT=OFF \ 
-DENABLE_SHARED_WINDOWS=ON \ -DENABLE_UNIFIED_MEMORY_MODEL=ON \ -DENABLE_DEFAULT_INDEX_TYPE_LONG=ON \ @@ -141,6 +142,7 @@ elif [ "$BUILD_TYPE" = "Minimal" ]; then -DENABLE_COMPILER_WARNINGS=ON \ -DENABLE_LT_OPTIMIZATION=OFF \ -DENABLE_ASSERTIONS=OFF \ + -DENABLE_THREADSUPPORT=OFF \ -DENABLE_SHARED_WINDOWS=OFF \ -DENABLE_UNIFIED_MEMORY_MODEL=ON \ -DENABLE_DEFAULT_INDEX_TYPE_LONG=OFF \ diff --git a/dash/src/cpp17/monotonic_buffer.cc b/dash/src/cpp17/monotonic_buffer.cc new file mode 100644 index 000000000..80bed7f6d --- /dev/null +++ b/dash/src/cpp17/monotonic_buffer.cc @@ -0,0 +1,133 @@ +// ============================================================================== +// LLVM Release License +// ============================================================================== +// University of Illinois/NCSA +// Open Source License +// +// Copyright (c) 2003-2018 University of Illinois at Urbana-Champaign. +// All rights reserved. +// +// Developed by: +// +// LLVM Team +// +// University of Illinois at Urbana-Champaign +// +// http://llvm.org +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal with the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// * Redistributions of source code must retain the above copyright +// notice, +// this list of conditions and the following disclaimers. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, +// this list of conditions and the following disclaimers in the +// documentation and/or other materials provided with the distribution. 
+// +// * Neither the names of the LLVM Team, University of Illinois at +// Urbana-Champaign, nor the names of its contributors may be used to +// endorse or promote products derived from this Software without +// specific prior written permission. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +// THE CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR +// OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS WITH THE SOFTWARE. + +/* + * Source: https://reviews.llvm.org/D47111 + **/ + +#if __cpp_lib_memory_resource < 201603 + +#include + +namespace std { +namespace pmr { + +static size_t roundup(size_t count, size_t alignment) +{ + size_t mask = alignment - 1; + return (count + mask) & ~mask; +} + +void *monotonic_buffer_resource::__initial_header::__try_allocate_from_chunk( + size_t bytes, size_t align) +{ + if (!__cur_) return nullptr; + void * new_ptr = static_cast(__cur_); + size_t new_capacity = (__end_ - __cur_); + void * aligned_ptr = std::align(align, bytes, new_ptr, new_capacity); + if (aligned_ptr != nullptr) __cur_ = static_cast(new_ptr) + bytes; + return aligned_ptr; +} + +void *monotonic_buffer_resource::__chunk_header::__try_allocate_from_chunk( + size_t bytes, size_t align) +{ + void * new_ptr = static_cast(__cur_); + size_t new_capacity = (reinterpret_cast(this) - __cur_); + void * aligned_ptr = std::align(align, bytes, new_ptr, new_capacity); + if (aligned_ptr != nullptr) __cur_ = static_cast(new_ptr) + bytes; + return aligned_ptr; +} + +void *monotonic_buffer_resource::do_allocate(size_t bytes, size_t align) +{ + const size_t header_size = sizeof(__chunk_header); + const size_t header_align = alignof(__chunk_header); + + auto 
previous_allocation_size = [&]() { + if (__chunks_ != nullptr) return __chunks_->__allocation_size(); + + size_t newsize = (__initial_.__start_ != nullptr) + ? (__initial_.__end_ - __initial_.__start_) + : __initial_.__size_; + + return roundup(newsize, header_align) + header_size; + }; + + if (void *result = __initial_.__try_allocate_from_chunk(bytes, align)) + return result; + if (__chunks_ != nullptr) { + if (void *result = __chunks_->__try_allocate_from_chunk(bytes, align)) + return result; + } + + // Allocate a brand-new chunk. + + if (align < header_align) align = header_align; + + size_t aligned_capacity = roundup(bytes, header_align) + header_size; + size_t previous_capacity = previous_allocation_size(); + + if (aligned_capacity <= previous_capacity) { + size_t newsize = 2 * (previous_capacity - header_size); + aligned_capacity = roundup(newsize, header_align) + header_size; + } + + char * start = (char *)__res_->allocate(aligned_capacity, align); + __chunk_header *header = + (__chunk_header *)(start + aligned_capacity - header_size); + header->__next_ = __chunks_; + header->__start_ = start; + header->__cur_ = start; + header->__align_ = align; + __chunks_ = header; + + return __chunks_->__try_allocate_from_chunk(bytes, align); +} +} // namespace pmr +} // namespace std + +#endif diff --git a/dash/test/algorithm/SortTest.cc b/dash/test/algorithm/SortTest.cc index c8e6d4acf..6a04a0b49 100644 --- a/dash/test/algorithm/SortTest.cc +++ b/dash/test/algorithm/SortTest.cc @@ -18,11 +18,31 @@ using random_dev_t = sense_of_life_dev; #endif class sense_of_life_dev { - unsigned int operator()() const { + unsigned int operator()() const + { return 42; } }; +struct random_seed_seq { + template + void generate(It begin, It end) + { + for (; begin != end; ++begin) { + *begin = device(); + } + } + + static random_seed_seq& get_instance() + { + static thread_local random_seed_seq result; + return result; + } + +private: + random_dev_t device; +}; + template static void 
perform_test(GlobIter begin, GlobIter end); @@ -32,10 +52,11 @@ template < typename GlobIter::value_type>::value>::type* = nullptr> static void rand_range(GlobIter begin, GlobIter end) { - static std::uniform_int_distribution - distribution(-1E6, 1E6); - static random_dev_t rd; - static std::mt19937 generator(rd() + begin.team().myid()); + static thread_local std::mt19937_64 generator ( + random_seed_seq::get_instance()); + static thread_local std::uniform_int_distribution< + typename GlobIter::value_type> + distribution(-1E6, 1E6); dash::generate(begin, end, []() { return distribution(generator); }); } @@ -47,7 +68,7 @@ template < static void rand_range(GlobIter begin, GlobIter end) { static std::uniform_real_distribution - distribution(-1.0, 1.0); + distribution(-1.0, 1.0); static random_dev_t rd; static std::mt19937 generator(rd() + begin.team().myid()); @@ -265,7 +286,9 @@ TEST_F(SortTest, ArrayOfPoints) array.barrier(); - dash::sort(array.begin(), array.end(), [](const Point& p) { return p.x; }); + dash::sort(array.begin(), array.end(), array.begin(), [](const Point& p) { + return p.x; + }); if (dash::myid() == 0) { for (auto it = array.begin() + 1; it < array.end(); ++it) { @@ -278,7 +301,7 @@ TEST_F(SortTest, ArrayOfPoints) } template -static void perform_test(GlobIter begin, GlobIter end) +static void perform_test(GlobIter begin, GlobIter end, GlobIter out) { using Element_t = typename decltype(begin)::value_type; Element_t true_sum = 0, actual_sum = 0, mysum; @@ -292,8 +315,8 @@ static void perform_test(GlobIter begin, GlobIter end) auto const n_l_elem = l_range.end - l_range.begin; - auto const * lbegin = l_mem_begin + l_range.begin; - auto const * lend = l_mem_begin + l_range.end; + auto const* lbegin = l_mem_begin + l_range.begin; + auto const* lend = l_mem_begin + l_range.end; mysum = std::accumulate(lbegin, lend, 0); @@ -306,7 +329,7 @@ static void perform_test(GlobIter begin, GlobIter end) 0, begin.pattern().team().dart_id()); - dash::sort(begin, 
end); + dash::sort(begin, end, out); mysum = std::accumulate(lbegin, lend, 0); @@ -322,17 +345,23 @@ static void perform_test(GlobIter begin, GlobIter end) if (dash::myid() == 0) { EXPECT_EQ_U(true_sum, actual_sum); - for (auto it = begin + 1; it < end; ++it) { + for (auto it = out + 1; it < out + dash::distance(begin, end); ++it) { auto const a = static_cast(*(it - 1)); auto const b = static_cast(*it); - EXPECT_FALSE_U(b < a); + EXPECT_LE_U(a, b); } } begin.pattern().team().barrier(); } +template +static void perform_test(GlobIter begin, GlobIter end) +{ + perform_test(begin, end, begin); +} + TEST_F(SortTest, PlausibilityWithStdSort) { auto const ThisTask = dash::myid(); @@ -430,5 +459,69 @@ TEST_F(SortTest, ExtremValues) perform_test(arr.begin(), arr.end()); } -// TODO: add additional unit tests with various pattern types and containers -// +TEST_F(SortTest, StridedIteratorTest) +{ + std::vector v(10, 0); + std::iota(std::begin(v), std::end(v), 0); + auto begin = std::begin(v); + auto it_6 = begin + 6; + + auto s_begin = dash::impl::make_strided_iterator(std::begin(v)); + auto s_it_6 = dash::impl::make_strided_iterator(std::begin(v)) + 3; + + EXPECT_EQ_U(*begin, *s_begin); + EXPECT_EQ_U(*it_6, *s_it_6); +} + +TEST_F(SortTest, ArrayBlockedFullRangeNonInPlace) +{ + using Element_t = int32_t; + using Array_t = dash::Array; + + LOG_MESSAGE("SortTest.ArrayBlockedFullRange: allocate array"); + // Initialize global array: + Array_t array(num_local_elem * dash::size()); + Array_t out(num_local_elem * dash::size()); + + rand_range(array.begin(), array.end()); + + array.barrier(); + + perform_test(array.begin(), array.end(), out.begin()); +} + +TEST_F(SortTest, ArrayOfPointsFinalSort) +{ + using Element_t = Point; + using Array_t = dash::Array; + + LOG_MESSAGE("SortTest.ArrayOfPoints: allocate array"); + // Initialize global array: + Array_t array(num_local_elem * dash::size()); + + static std::uniform_int_distribution distribution(-1000, 1000); + static random_dev_t rd; 
+ static std::mt19937 generator(rd() + array.team().myid()); + + dash::generate(array.begin(), array.end(), []() { + return Point{distribution(generator), distribution(generator)}; + }); + + array.barrier(); + + dash::sort( + array.begin(), + array.end(), + array.begin(), + [](const Point& p) { return p.x; }, + dash::impl::sort__final_strategy__sort{}); + + if (dash::myid() == 0) { + for (auto it = array.begin() + 1; it < array.end(); ++it) { + auto const a = static_cast(*(it - 1)); + auto const b = static_cast(*it); + + EXPECT_FALSE_U(b < a); + } + } +}