diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index f8afef687..fe364b593 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -115,6 +115,7 @@ void usage(int argc, const char* argv[]) { lf_print(" -a, --auth Turn on HMAC authentication options.\n"); lf_print(" -t, --tracing Turn on tracing.\n"); lf_print(" -d, --disable_dnet Turn off the use of DNET signals.\n"); + lf_print(" -sst, --sst SST config path for RTI.\n"); lf_print("Command given:"); for (int i = 0; i < argc; i++) { @@ -220,7 +221,7 @@ int process_args(int argc, const char* argv[]) { rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { -#ifdef COMM_TYPE_TCP +#if defined(COMM_TYPE_TCP) || defined(COMM_TYPE_SST) if (argc < i + 2) { lf_print_error("--port needs a short unsigned integer argument ( > 0 and < %d).", UINT16_MAX); usage(argc, argv); @@ -252,6 +253,15 @@ int process_args(int argc, const char* argv[]) { return 0; #endif rti.authentication_enabled = true; + } else if (strcmp(argv[i], "-sst") == 0 || strcmp(argv[i], "--sst") == 0) { +#ifndef COMM_TYPE_SST + lf_print_error("--sst requires the RTI to be built with the --DCOMM_TYPE=SST option."); + usage(argc, argv); + return 0; +#else + i++; + lf_set_sst_config_path(argv[i]); +#endif } else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) { rti.base.tracing_enabled = true; } else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet_disabled") == 0) { diff --git a/core/federated/federate.c b/core/federated/federate.c index c5fa94284..fc86f4fe6 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -708,7 +708,6 @@ static int handle_port_absent_message(net_abstraction_t net_abstraction, int fed * network abstraction in _fed.net_abstractions_for_inbound_p2p_connections * to -1 and returns, terminating the thread. * @param _args The remote federate ID (cast to void*). - * @param fed_id_ptr A pointer to a uint16_t containing federate ID being listened to. * This procedure frees the memory pointed to before returning. */ static void* listen_to_federates(void* _args) { diff --git a/core/reactor_common.c b/core/reactor_common.c index 09eaba005..9a2f90ff2 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -974,6 +974,9 @@ void usage(int argc, const char* argv[]) { printf(" The address of the RTI, which can be in the form of user@host:port or ip:port.\n\n"); printf(" -l\n"); printf(" Send stdout to individual log files for each federate.\n\n"); +#ifdef COMM_TYPE_SST + printf(" -sst, --sst \n"); +#endif #endif printf("Command given:\n"); @@ -1123,6 +1126,17 @@ int process_args(int argc, const char* argv[]) { return 0; } } +#endif +#ifdef COMM_TYPE_SST + else if (strcmp(arg, "-sst") == 0 || strcmp(arg, "--sst") == 0) { + if (argc < i + 1) { + lf_print_error("--sst needs a string argument."); + usage(argc, argv); + return 0; + } + const char* fid = argv[i++]; + lf_set_sst_config_path(fid); + } #endif else if (strcmp(arg, "--ros-args") == 0) { // FIXME: Ignore ROS arguments for now diff --git a/logging/api/logging_macros.h b/logging/api/logging_macros.h index 2a8a04c1b..b471f3a1e 100644 --- a/logging/api/logging_macros.h +++ b/logging/api/logging_macros.h @@ -169,4 +169,4 @@ static const bool _lf_log_level_is_debug = LOG_LEVEL >= LOG_LEVEL_DEBUG; } \ } while (0) -#endif // LOGGING_MACROS_H \ No newline at end of file +#endif // LOGGING_MACROS_H diff --git a/network/api/lf_sst_support.h b/network/api/lf_sst_support.h new file mode 100644 index 000000000..7fdb941ab --- /dev/null +++ b/network/api/lf_sst_support.h @@ -0,0 +1,15 @@ +#ifndef LF_SST_SUPPORT_H +#define LF_SST_SUPPORT_H + +#include "socket_common.h" +#include + +typedef struct sst_priv_t { + socket_priv_t* socket_priv; + SST_ctx_t* sst_ctx; + SST_session_ctx_t* session_ctx; +} sst_priv_t; + +void lf_set_sst_config_path(const char* config_path); + +#endif /* LF_SST_SUPPORT_H */ diff --git a/network/api/net_abstraction.h b/network/api/net_abstraction.h index 834cc8497..37d617071 100644 --- a/network/api/net_abstraction.h +++ b/network/api/net_abstraction.h @@ -16,6 +16,9 @@ #define NET_ABSTRACTION_H #include "socket_common.h" +#if defined(COMM_TYPE_SST) +#include "lf_sst_support.h" +#endif typedef void* net_abstraction_t; // net_abstraction_t diff --git a/network/impl/CMakeLists.txt b/network/impl/CMakeLists.txt index 225edf3d5..116f7db1e 100644 --- a/network/impl/CMakeLists.txt +++ b/network/impl/CMakeLists.txt @@ -10,6 +10,11 @@ target_sources(lf-network-impl PUBLIC if(COMM_TYPE MATCHES TCP) target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_socket_support.c) +elseif(COMM_TYPE MATCHES SST) + find_package(OpenSSL REQUIRED) + find_package(sst-lib REQUIRED) + target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_sst_support.c) + target_link_libraries(lf-network-impl PUBLIC sst-lib::sst-c-api) else() message(FATAL_ERROR "Your communication type is not supported! The C target supports TCP.") endif() diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c new file mode 100644 index 000000000..35ee94ad4 --- /dev/null +++ b/network/impl/src/lf_sst_support.c @@ -0,0 +1,247 @@ +#include // malloc() +#include // strncpy() + +#include "net_abstraction.h" +#include "lf_sst_support.h" +#include "util.h" + +const char* sst_config_path; // The SST's configuration file path. + +static sst_priv_t* get_sst_priv_t(net_abstraction_t net_abs) { + if (net_abs == NULL) { + lf_print_error("Network abstraction is already closed."); + return NULL; + } + return (sst_priv_t*)net_abs; +} + +net_abstraction_t initialize_net_abstraction() { + // Initialize sst_priv. + sst_priv_t* sst_priv = malloc(sizeof(sst_priv_t)); + if (sst_priv == NULL) { + lf_print_error_and_exit("Falied to malloc sst_priv_t."); + } + // Initialize socket_priv. + socket_priv_t* socket_priv = malloc(sizeof(socket_priv_t)); + if (socket_priv == NULL) { + lf_print_error_and_exit("Falied to malloc socket_priv_t."); + } + + // Server initialization. + socket_priv->port = 0; + socket_priv->user_specified_port = 0; + socket_priv->socket_descriptor = -1; + + // Federate initialization + strncpy(socket_priv->server_hostname, "localhost", INET_ADDRSTRLEN); + socket_priv->server_ip_addr.s_addr = 0; + socket_priv->server_port = -1; + + sst_priv->socket_priv = socket_priv; + + // SST initialization. Only set pointers to NULL. + sst_priv->sst_ctx = NULL; + sst_priv->session_ctx = NULL; + + return (net_abstraction_t)sst_priv; +} + +void free_net_abstraction(net_abstraction_t net_abs) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + free(priv->socket_priv); + free(priv); +} + +int create_server(net_abstraction_t net_abs, bool increment_port_on_retry) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + SST_ctx_t* ctx = init_SST(sst_config_path); + priv->sst_ctx = ctx; + return create_socket_server(priv->socket_priv->user_specified_port, &priv->socket_priv->socket_descriptor, + &priv->socket_priv->port, TCP, increment_port_on_retry); +} + +net_abstraction_t accept_net_abstraction(net_abstraction_t server_chan, net_abstraction_t rti_chan) { + sst_priv_t* serv_priv = get_sst_priv_t(server_chan); + int rti_socket; + if (rti_chan == NULL) { + // Set to -1, to indicate that this accept_net_abstraction() call is not trying to check if the rti_chan is + // available, inside the accept_socket() function. + rti_socket = -1; + } else { + sst_priv_t* rti_priv = get_sst_priv_t(rti_chan); + rti_socket = rti_priv->socket_priv->socket_descriptor; + } + net_abstraction_t fed_net_abstraction = initialize_net_abstraction(); + sst_priv_t* fed_priv = get_sst_priv_t(fed_net_abstraction); + + int sock = accept_socket(serv_priv->socket_priv->socket_descriptor, rti_socket); + if (sock == -1) { + free_net_abstraction(fed_net_abstraction); + return NULL; + } + fed_priv->socket_priv->socket_descriptor = sock; + // Get the peer address from the connected socket_id. Saving this for the address query. + if (get_peer_address(fed_priv->socket_priv) != 0) { + lf_print_error("RTI failed to get peer address."); + }; + + // TODO: Do we need to copy sst_ctx form server_chan to fed_chan? + session_key_list_t* s_key_list = init_empty_session_key_list(); + SST_session_ctx_t* session_ctx = + server_secure_comm_setup(serv_priv->sst_ctx, fed_priv->socket_priv->socket_descriptor, s_key_list); + // Session key used is copied to the session_ctx. + free_session_key_list_t(s_key_list); + fed_priv->session_ctx = session_ctx; + return fed_net_abstraction; +} + +void create_client(net_abstraction_t net_abs) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + priv->socket_priv->socket_descriptor = create_real_time_tcp_socket_errexit(); + SST_ctx_t* ctx = init_SST(sst_config_path); + priv->sst_ctx = ctx; +} + +int connect_to_net_abstraction(net_abstraction_t net_abs) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + int ret = connect_to_socket(priv->socket_priv->socket_descriptor, priv->socket_priv->server_hostname, + priv->socket_priv->server_port); + if (ret != 0) { + return ret; + } + session_key_list_t* s_key_list = get_session_key(priv->sst_ctx, NULL); + SST_session_ctx_t* session_ctx = + secure_connect_to_server_with_socket(&s_key_list->s_key[0], priv->socket_priv->socket_descriptor); + priv->session_ctx = session_ctx; + return 0; +} + +// TODO: Still need to fix... +int read_from_net_abstraction(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + return read_from_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); +} + +int read_from_net_abstraction_close_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + int read_failed = read_from_net_abstraction(net_abs, num_bytes, buffer); + if (read_failed) { + // Read failed. + // Socket has probably been closed from the other side. + // Shut down and close the socket from this side. + shutdown_socket(&priv->socket_priv->socket_descriptor, false); + return -1; + } + return 0; +} + +void read_from_net_abstraction_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer, + char* format, ...) { + va_list args; + int read_failed = read_from_net_abstraction_close_on_error(net_abs, num_bytes, buffer); + if (read_failed) { + // Read failed. + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error_system_failure("Failed to read from socket."); + } + } +} + +int write_to_net_abstraction(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + return write_to_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); +} + +int write_to_net_abstraction_close_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + int result = write_to_net_abstraction(net_abs, num_bytes, buffer); + if (result) { + // Write failed. + // Socket has probably been closed from the other side. + // Shut down and close the socket from this side. + shutdown_socket(&priv->socket_priv->socket_descriptor, false); + } + return result; +} + +void write_to_net_abstraction_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer, + lf_mutex_t* mutex, char* format, ...) { + va_list args; + int result = write_to_net_abstraction_close_on_error(net_abs, num_bytes, buffer); + if (result) { + // Write failed. + if (mutex != NULL) { + LF_MUTEX_UNLOCK(mutex); + } + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error("Failed to write to socket. Closing it."); + } + } +} + +bool check_net_abstraction_closed(net_abstraction_t net_abs) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + return check_socket_closed(priv->socket_priv->socket_descriptor); +} + +int shutdown_net_abstraction(net_abstraction_t net_abs, bool read_before_closing) { + if (net_abs == NULL) { + lf_print("Socket already closed."); + return 0; + } + sst_priv_t* priv = get_sst_priv_t(net_abs); + int ret = shutdown_socket(&priv->socket_priv->socket_descriptor, read_before_closing); + if (ret != 0) { + lf_print_error("Failed to shutdown socket."); + } + free_net_abstraction(net_abs); + return ret; +} +// END of TODO: + +// Get/set functions. +int32_t get_my_port(net_abstraction_t net_abs) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + return priv->socket_priv->port; +} + +int32_t get_server_port(net_abstraction_t net_abs) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + return priv->socket_priv->server_port; +} + +struct in_addr* get_ip_addr(net_abstraction_t net_abs) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + return &priv->socket_priv->server_ip_addr; +} + +char* get_server_hostname(net_abstraction_t net_abs) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + return priv->socket_priv->server_hostname; +} + +void set_my_port(net_abstraction_t net_abs, int32_t port) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + priv->socket_priv->port = port; +} + +void set_server_port(net_abstraction_t net_abs, int32_t port) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + priv->socket_priv->server_port = port; +} + +void set_server_hostname(net_abstraction_t net_abs, const char* hostname) { + sst_priv_t* priv = get_sst_priv_t(net_abs); + memcpy(priv->socket_priv->server_hostname, hostname, INET_ADDRSTRLEN); +} + +// Helper function. +void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; }