diff --git a/.gitmodules b/.gitmodules index a6b49b86d..bc1adab93 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule "external/nanopb"] path = external/nanopb url = https://github.com/nanopb/nanopb.git +[submodule "external/libcoap"] + path = external/libcoap + url = https://github.com/obgm/libcoap.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 66b128bc7..e8e04c533 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,7 @@ set(ASAN OFF CACHE BOOL "Compile with AddressSanitizer") set(PLATFORM "POSIX" CACHE STRING "Platform to target") set(SCHEDULER "DYNAMIC" CACHE STRING "Scheduler to use") set(NETWORK_CHANNEL_TCP_POSIX OFF CACHE BOOL "Use POSIX TCP NetworkChannel") +set(NETWORK_CHANNEL_COAP OFF CACHE BOOL "Use CoAP NetworkChannel") set(FEDERATED OFF CACHE BOOL "Compile with federated sources") # Code coverage setup @@ -47,6 +48,40 @@ set(NANOPB_PATH external/nanopb) if (PLATFORM STREQUAL "POSIX") add_library(reactor-uc STATIC ${SOURCES}) target_link_libraries(reactor-uc PRIVATE pthread) + + # Set up options before libcoap is included + set(COAP_DISABLE_DTLS ON CACHE BOOL "Disable DTLS" FORCE) + set(COAP_BUILD_DOCS OFF CACHE BOOL "Disable docs" FORCE) + set(COAP_BUILD_EXAMPLES OFF CACHE BOOL "Disable examples" FORCE) + set(COAP_BUILD_TESTS OFF CACHE BOOL "Disable tests" FORCE) + + # Add libcoap as a subdirectory + add_subdirectory(external/libcoap) + + # Include internal libcoap headers if needed + target_include_directories(reactor-uc PUBLIC + external/libcoap/include + external/libcoap/src + ) + + # Link with the libcoap static library + target_link_libraries(reactor-uc PRIVATE coap-3) + + + # TODO: Remove this! + # Add Unity as a subdirectory + add_subdirectory(external/Unity) + + # Include internal Unity headers if needed + target_include_directories(reactor-uc PUBLIC + external/Unity/src + ) + + add_library(Unity STATIC external/Unity/src/unity.c) + target_include_directories(Unity PUBLIC ${UNITY_DIR}/src) + + # Link with the Unity static library + target_link_libraries(reactor-uc PRIVATE Unity) elseif (PLATFORM STREQUAL "FLEXPRET") add_library(reactor-uc STATIC ${SOURCES}) add_subdirectory($ENV{FP_SDK_PATH} BINARY_DIR) @@ -87,6 +122,10 @@ if(NETWORK_CHANNEL_TCP_POSIX) target_compile_definitions(reactor-uc PRIVATE NETWORK_CHANNEL_TCP_POSIX) endif() +if(NETWORK_CHANNEL_COAP) + target_compile_definitions(reactor-uc PUBLIC NETWORK_CHANNEL_COAP) +endif() + if(FEDERATED) target_compile_definitions(reactor-uc PUBLIC FEDERATED) endif() @@ -109,3 +148,4 @@ if(BUILD_UNIT_TESTS) set_target_properties( Unity PROPERTIES C_CLANG_TIDY "") # Disable clang-tidy for this external lib. add_subdirectory(test/unit) endif() + diff --git a/examples/posix/hello/hello.c b/examples/posix/hello/hello.c index 5f6bb4f8b..c4c838474 100644 --- a/examples/posix/hello/hello.c +++ b/examples/posix/hello/hello.c @@ -1,5 +1,5 @@ #include "reactor-uc/reactor-uc.h" -#include "../../common/timer_source.h" +#include "../../common/timer_source.h" LF_DEFINE_REACTION_BODY(TimerSource, r) { LF_SCOPE_SELF(TimerSource); diff --git a/external/libcoap b/external/libcoap new file mode 160000 index 000000000..17c3feeb0 --- /dev/null +++ b/external/libcoap @@ -0,0 +1 @@ +Subproject commit 17c3feeb0837b6583ed698c6d2430c8d19705dba diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 8796528bf..1642990bb 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -132,6 +132,9 @@ struct AsyncNetworkChannel { #ifdef NETWORK_CHANNEL_TCP_POSIX #include "platform/posix/tcp_ip_channel.h" #endif +#ifdef NETWORK_CHANNEL_COAP +#include "platform/posix/coap_udp_ip_channel.h" +#endif #elif defined(PLATFORM_ZEPHYR) #ifdef NETWORK_CHANNEL_TCP_POSIX diff --git a/include/reactor-uc/platform/posix/coap_udp_ip_channel.h b/include/reactor-uc/platform/posix/coap_udp_ip_channel.h new file mode 100644 index 000000000..171d91f1a --- /dev/null +++ b/include/reactor-uc/platform/posix/coap_udp_ip_channel.h @@ -0,0 +1,57 @@ +#ifndef REACTOR_UC_COAP_UDP_IP_CHANNEL_H +#define REACTOR_UC_COAP_UDP_IP_CHANNEL_H +#include "reactor-uc/network_channel.h" +#include "reactor-uc/environment.h" +#include +#include + +#define COAP_UDP_IP_CHANNEL_EXPECTED_CONNECT_DURATION MSEC(10) +#define COAP_UDP_IP_CHANNEL_BUFFERSIZE 1024 +#define COAP_UDP_IP_CHANNEL_RECV_THREAD_STACK_SIZE 2048 + +typedef struct CoapUdpIpChannel CoapUdpIpChannel; +typedef struct FederatedConnectionBundle FederatedConnectionBundle; + +typedef enum { + COAP_REQUEST_TYPE_NONE, + COAP_REQUEST_TYPE_CONNECT, + COAP_REQUEST_TYPE_MESSAGE, + COAP_REQUEST_TYPE_DISCONNECT +} coap_request_type_t; + +typedef struct CoapUdpIpChannel { + NetworkChannel super; + + // Remote address etc. + coap_session_t *session; + + // Threading and synchronization + pthread_mutex_t state_mutex; + pthread_cond_t state_cond; + pthread_mutex_t send_mutex; + pthread_cond_t send_cond; + + NetworkChannelState state; + + FederateMessage output; + + // Handle message callbacks + coap_request_type_t last_request_type; + coap_mid_t last_request_mid; + + FederatedConnectionBundle *federated_connection; + void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message); +} CoapUdpIpChannel; + +/** + * @brief Constructor for the CoapUdpIpChannel. + * + * Initializes a CoapUdpIpChannel instance with the specified remote host and protocol family. + * + * @param self Pointer to the CoapUdpIpChannel instance. + * @param remote_host The remote host address, hostname or domain E.g. 127.0.0.1, [::1] or hostname.local. + * @param remote_protocol_family The protocol family (e.g., AF_INET for IPv4 and AF_INET6 for IPv6). + */ +void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, const char *remote_host, int remote_protocol_family); + +#endif diff --git a/include/reactor-uc/platform/riot/coap_udp_ip_channel.h b/include/reactor-uc/platform/riot/coap_udp_ip_channel.h index 9a0e2ad23..8dd164cbd 100644 --- a/include/reactor-uc/platform/riot/coap_udp_ip_channel.h +++ b/include/reactor-uc/platform/riot/coap_udp_ip_channel.h @@ -5,9 +5,9 @@ #include "net/sock/udp.h" #include "mutex.h" -#define COAP_UDP_IP_CHANNEL_EXPECTED_CONNECT_DURATION MSEC(10); +#define COAP_UDP_IP_CHANNEL_EXPECTED_CONNECT_DURATION MSEC(10) #define COAP_UDP_IP_CHANNEL_BUFFERSIZE 1024 -// #define COAP_UDP_IP_CHANNEL_NUM_RETRIES 255; +// #define COAP_UDP_IP_CHANNEL_NUM_RETRIES 255 // #define COAP_UDP_IP_CHANNEL_RECV_THREAD_STACK_SIZE 2048 // #define COAP_UDP_IP_CHANNEL_RECV_THREAD_STACK_GUARD_SIZE 128 @@ -30,6 +30,15 @@ struct CoapUdpIpChannel { void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message); }; -void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, const char *remote_address, int remote_protocol_family); +/** + * @brief Constructor for the CoapUdpIpChannel. + * + * Initializes a CoapUdpIpChannel instance with the specified remote host and protocol family. + * + * @param self Pointer to the CoapUdpIpChannel instance. + * @param remote_host The remote host address, hostname or domain E.g. 127.0.0.1, [::1] or hostname.local. + * @param remote_protocol_family The protocol family (e.g., AF_INET for IPv4 and AF_INET6 for IPv6). + */ +void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, const char *remote_host, int remote_protocol_family); #endif diff --git a/src/network_channel.c b/src/network_channel.c index f7744489b..2f8d7173b 100644 --- a/src/network_channel.c +++ b/src/network_channel.c @@ -4,6 +4,9 @@ #ifdef NETWORK_CHANNEL_TCP_POSIX #include "platform/posix/tcp_ip_channel.c" #endif +#ifdef NETWORK_CHANNEL_COAP +#include "platform/posix/coap_udp_ip_channel.c" +#endif #elif defined(PLATFORM_ZEPHYR) #ifdef NETWORK_CHANNEL_TCP_POSIX diff --git a/src/platform/posix/coap_udp_ip_channel.c b/src/platform/posix/coap_udp_ip_channel.c new file mode 100644 index 000000000..008685f2e --- /dev/null +++ b/src/platform/posix/coap_udp_ip_channel.c @@ -0,0 +1,597 @@ +#include "reactor-uc/platform/posix/coap_udp_ip_channel.h" +#include "reactor-uc/logging.h" +#include "reactor-uc/environments/federated_environment.h" +#include "reactor-uc/serialization.h" + +#include +#include + +#define COAP_UDP_IP_CHANNEL_ERR(fmt, ...) LF_ERR(NET, "CoapUdpIpChannel: " fmt, ##__VA_ARGS__) +#define COAP_UDP_IP_CHANNEL_WARN(fmt, ...) LF_WARN(NET, "CoapUdpIpChannel: " fmt, ##__VA_ARGS__) +#define COAP_UDP_IP_CHANNEL_INFO(fmt, ...) LF_INFO(NET, "CoapUdpIpChannel: " fmt, ##__VA_ARGS__) +#define COAP_UDP_IP_CHANNEL_DEBUG(fmt, ...) LF_DEBUG(NET, "CoapUdpIpChannel: " fmt, ##__VA_ARGS__) + +static coap_context_t *_coap_context = NULL; +static pthread_t _connection_thread = 0; +static bool _coap_is_globals_initialized = false; +static pthread_mutex_t _global_mutex = PTHREAD_MUTEX_INITIALIZER; + +static void _CoapUdpIpChannel_update_state(CoapUdpIpChannel *self, NetworkChannelState new_state) { + COAP_UDP_IP_CHANNEL_DEBUG("Update state: %s => %s", NetworkChannel_state_to_string(self->state), + NetworkChannel_state_to_string(new_state)); + + // Store old state + NetworkChannelState old_state; + + // Update the state of the channel to its new state + pthread_mutex_lock(&self->state_mutex); + old_state = self->state; + self->state = new_state; + pthread_mutex_unlock(&self->state_mutex); + + // Inform runtime about new state if it changed from or to NETWORK_CHANNEL_STATE_CONNECTED + if ((old_state == NETWORK_CHANNEL_STATE_CONNECTED && new_state != NETWORK_CHANNEL_STATE_CONNECTED) || + (old_state != NETWORK_CHANNEL_STATE_CONNECTED && new_state == NETWORK_CHANNEL_STATE_CONNECTED)) { + _lf_environment->platform->notify(_lf_environment->platform); + } + + // Signal connection thread to evaluate new state + pthread_cond_signal(&self->state_cond); +} + +static void _CoapUdpIpChannel_update_state_if_not(CoapUdpIpChannel *self, NetworkChannelState new_state, + NetworkChannelState if_not) { + // Update the state of the channel itself + pthread_mutex_lock(&self->state_mutex); + if (self->state != if_not) { + COAP_UDP_IP_CHANNEL_DEBUG("Update state: %s => %s", NetworkChannel_state_to_string(self->state), + NetworkChannel_state_to_string(new_state)); + self->state = new_state; + } + pthread_mutex_unlock(&self->state_mutex); + + // Inform runtime about new state + _lf_environment->platform->notify(_lf_environment->platform); +} + +static NetworkChannelState _CoapUdpIpChannel_get_state(CoapUdpIpChannel *self) { + NetworkChannelState state; + + pthread_mutex_lock(&self->state_mutex); + state = self->state; + pthread_mutex_unlock(&self->state_mutex); + + return state; +} + +static CoapUdpIpChannel *_CoapUdpIpChannel_get_coap_channel_by_session(coap_session_t *session) { + CoapUdpIpChannel *channel; + FederatedEnvironment *env = (FederatedEnvironment *)_lf_environment; + + // Get the remote session address + coap_address_t remote_addr; + coap_address_copy(&remote_addr, coap_session_get_addr_remote(session)); + // Set incoming port to COAP_DEFAULT_PORT to ignore it in the address comparison + coap_address_set_port(&remote_addr, COAP_DEFAULT_PORT); + + for (size_t i = 0; i < env->net_bundles_size; i++) { + if (env->net_bundles[i]->net_channel->type == NETWORK_CHANNEL_TYPE_COAP_UDP_IP) { + channel = (CoapUdpIpChannel *)env->net_bundles[i]->net_channel; + + // The port of the channel address is already using COAP_DEFAULT_PORT + if (coap_address_equals(coap_session_get_addr_remote(channel->session), &remote_addr)) { + return channel; + } + } + } + + // Debug print address + char addr_str[INET6_ADDRSTRLEN]; + coap_print_addr(&remote_addr, (unsigned char *)addr_str, sizeof(addr_str)); + COAP_UDP_IP_CHANNEL_ERR("Channel not found by session (addr=%s)", addr_str); + return NULL; +} + +static coap_response_t _CoapUdpIpChannel_client_response_handler(coap_session_t *session, const coap_pdu_t *sent, + const coap_pdu_t *received, const coap_mid_t id) { + (void)sent; + CoapUdpIpChannel *self = _CoapUdpIpChannel_get_coap_channel_by_session(session); + if (self == NULL) { + return COAP_RESPONSE_FAIL; + } + + // Verify this is the NACK that is expected (messages are not in order guaranteed) + if (id != self->last_request_mid) { + COAP_UDP_IP_CHANNEL_WARN("Received response for unexpected MID: %d (expected: %d)", id, self->last_request_mid); + return COAP_RESPONSE_OK; // Ignore out-of-order responses + } + + coap_pdu_code_t code = coap_pdu_get_code(received); + bool success = (COAP_RESPONSE_CLASS(code) == 2); + + switch (self->last_request_type) { + case COAP_REQUEST_TYPE_CONNECT: + if (success) { + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED); + } else { + COAP_UDP_IP_CHANNEL_ERR("CONNECTION REJECTED => Try to connect again"); + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); + } + break; + + case COAP_REQUEST_TYPE_MESSAGE: + pthread_mutex_lock(&self->send_mutex); + if (!success) { + COAP_UDP_IP_CHANNEL_ERR("MESSAGE REJECTED"); + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_LOST_CONNECTION); + } + pthread_cond_signal(&self->send_cond); + pthread_mutex_unlock(&self->send_mutex); + break; + + case COAP_REQUEST_TYPE_DISCONNECT: + break; + + default: + COAP_UDP_IP_CHANNEL_WARN("Received response for unknown request type: %d", self->last_request_type); + break; + } + + // Clear the last request info + self->last_request_type = COAP_REQUEST_TYPE_NONE; + self->last_request_mid = COAP_INVALID_MID; + + return COAP_RESPONSE_OK; +} + +static void _CoapUdpIpChannel_client_nack_handler(coap_session_t *session, const coap_pdu_t *sent, + const coap_nack_reason_t reason, const coap_mid_t mid) { + (void)sent; + (void)reason; + CoapUdpIpChannel *self = _CoapUdpIpChannel_get_coap_channel_by_session(session); + if (self == NULL) { + return; + } + + // Verify this is the NACK that is expected (messages are not in order guaranteed) + if (mid != self->last_request_mid) { + COAP_UDP_IP_CHANNEL_WARN("Received NACK for unexpected MID: %d (expected: %d)", mid, self->last_request_mid); + return; + } + + switch (self->last_request_type) { + case COAP_REQUEST_TYPE_CONNECT: + COAP_UDP_IP_CHANNEL_ERR("TIMEOUT => Try to connect again"); + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); + break; + + case COAP_REQUEST_TYPE_MESSAGE: + COAP_UDP_IP_CHANNEL_ERR("MESSAGE TIMEOUT"); + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_LOST_CONNECTION); + pthread_mutex_lock(&self->send_mutex); + pthread_cond_signal(&self->send_cond); + pthread_mutex_unlock(&self->send_mutex); + break; + + case COAP_REQUEST_TYPE_DISCONNECT: + break; + + default: + COAP_UDP_IP_CHANNEL_WARN("Received NACK for unknown request type: %d", self->last_request_type); + break; + } + + // Clear the last request info + self->last_request_type = COAP_REQUEST_TYPE_NONE; + self->last_request_mid = COAP_INVALID_MID; +} + +static void _CoapUdpIpChannel_server_connect_handler(coap_resource_t *resource, coap_session_t *session, + const coap_pdu_t *request, const coap_string_t *query, + coap_pdu_t *response) { + (void)response; + (void)request; + (void)session; + (void)resource; + (void)query; + COAP_UDP_IP_CHANNEL_DEBUG("Server connect handler"); + CoapUdpIpChannel *self = _CoapUdpIpChannel_get_coap_channel_by_session(session); + + // Error => return 401 (unauthorized) + if (self == NULL) { + COAP_UDP_IP_CHANNEL_ERR("Server connect handler: Client has unknown IP address"); + coap_pdu_set_code(response, COAP_RESPONSE_CODE_UNAUTHORIZED); + return; + } + + // Error => return 503 (service unavailable) + if (_CoapUdpIpChannel_get_state(self) == NETWORK_CHANNEL_STATE_CLOSED) { + COAP_UDP_IP_CHANNEL_ERR("Server connect handler: Channel is closed"); + coap_pdu_set_code(response, COAP_RESPONSE_CODE_SERVICE_UNAVAILABLE); + return; + } + + // Success => return 204 (no content) + coap_pdu_set_code(response, COAP_RESPONSE_CODE_CHANGED); + return; +} + +static void _CoapUdpIpChannel_server_disconnect_handler(coap_resource_t *resource, coap_session_t *session, + const coap_pdu_t *request, const coap_string_t *query, + coap_pdu_t *response) { + (void)resource; + (void)query; + (void)request; + COAP_UDP_IP_CHANNEL_DEBUG("Server disconnect handler"); + CoapUdpIpChannel *self = _CoapUdpIpChannel_get_coap_channel_by_session(session); + + // Error => return 401 (unauthorized) + if (self == NULL) { + COAP_UDP_IP_CHANNEL_ERR("Server disconnect handler: Client has unknown IP address"); + coap_pdu_set_code(response, COAP_RESPONSE_CODE_UNAUTHORIZED); + return; + } + + // Update state because it does not make sense to send data to a closed connection. + if (_CoapUdpIpChannel_get_state(self) != NETWORK_CHANNEL_STATE_UNINITIALIZED && + _CoapUdpIpChannel_get_state(self) != NETWORK_CHANNEL_STATE_CLOSED) { + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CLOSED); + } + + // Success => return 204 (no content) + coap_pdu_set_code(response, COAP_RESPONSE_CODE_CHANGED); + return; +} + +static void _CoapUdpIpChannel_server_message_handler(coap_resource_t *resource, coap_session_t *session, + const coap_pdu_t *request, const coap_string_t *query, + coap_pdu_t *response) { + (void)resource; + (void)query; + COAP_UDP_IP_CHANNEL_DEBUG("Server message handler"); + CoapUdpIpChannel *self = _CoapUdpIpChannel_get_coap_channel_by_session(session); + + // Error => return 401 (unauthorized) + if (self == NULL) { + COAP_UDP_IP_CHANNEL_ERR("Server message handler: Client has unknown IP address"); + coap_pdu_set_code(response, COAP_RESPONSE_CODE_UNAUTHORIZED); + return; + } + + // Get payload from request + const uint8_t *payload; + size_t payload_len; + if (coap_get_data(request, &payload_len, &payload) == 0) { + COAP_UDP_IP_CHANNEL_ERR("Server message handler: No payload in request"); + coap_pdu_set_code(response, COAP_RESPONSE_CODE_BAD_REQUEST); + return; + } + + // Deserialize received message + deserialize_from_protobuf(&self->output, payload, payload_len); + COAP_UDP_IP_CHANNEL_DEBUG("Server message handler: Server received message"); + + // Call registered receive callback to inform runtime about the new message + if (self->receive_callback) { + self->receive_callback(self->federated_connection, &self->output); + } + + // Respond to the other federate + coap_pdu_set_code(response, COAP_RESPONSE_CODE_CHANGED); + return; +} + +static bool _CoapUdpIpChannel_send_coap_message(CoapUdpIpChannel *self, const char *path, + const FederateMessage *message) { + if (!self->session) { + COAP_UDP_IP_CHANNEL_ERR("No session available"); + return false; + } + + coap_pdu_t *pdu = coap_new_pdu(COAP_MESSAGE_CON, COAP_REQUEST_CODE_POST, self->session); + if (!pdu) { + COAP_UDP_IP_CHANNEL_ERR("Failed to create PDU"); + return false; + } + + // Add URI path + coap_add_option(pdu, COAP_OPTION_URI_PATH, strlen(path), (const uint8_t *)path); + + // Add payload if message is provided + if (message) { + uint8_t payload_buffer[COAP_UDP_IP_CHANNEL_BUFFERSIZE]; + int payload_len = serialize_to_protobuf(message, payload_buffer, sizeof(payload_buffer)); + + if (payload_len < 0) { + COAP_UDP_IP_CHANNEL_ERR("Could not encode protobuf"); + coap_delete_pdu(pdu); + return false; + } + + if ((unsigned long)payload_len > sizeof(payload_buffer)) { + COAP_UDP_IP_CHANNEL_ERR("Payload too large (%d > %zu)", payload_len, sizeof(payload_buffer)); + coap_delete_pdu(pdu); + return false; + } + + uint8_t content_format = COAP_MEDIATYPE_APPLICATION_OCTET_STREAM; + coap_add_option(pdu, COAP_OPTION_CONTENT_FORMAT, 1, &content_format); + coap_add_data(pdu, payload_len, payload_buffer); + } + + coap_mid_t mid = coap_send(self->session, pdu); + if (mid == COAP_INVALID_MID) { + COAP_UDP_IP_CHANNEL_ERR("Failed to send CoAP message"); + return false; + } + + // Track this request for response handling + if (strcmp(path, "connect") == 0) { + self->last_request_type = COAP_REQUEST_TYPE_CONNECT; + } else if (strcmp(path, "message") == 0) { + self->last_request_type = COAP_REQUEST_TYPE_MESSAGE; + } else if (strcmp(path, "disconnect") == 0) { + self->last_request_type = COAP_REQUEST_TYPE_DISCONNECT; + } else { + self->last_request_type = COAP_REQUEST_TYPE_NONE; + } + self->last_request_mid = mid; + + COAP_UDP_IP_CHANNEL_DEBUG("CoAP Message sent (MID: %d, Type: %d)", mid, self->last_request_type); + return true; +} + +static lf_ret_t _CoapUdpIpChannel_client_send_connect_message(CoapUdpIpChannel *self) { + if (!_CoapUdpIpChannel_send_coap_message(self, "connect", NULL)) { + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); + COAP_UDP_IP_CHANNEL_ERR("Open connection: Failed to send CoAP message"); + return LF_ERR; + } else { + _CoapUdpIpChannel_update_state_if_not(self, NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS, + NETWORK_CHANNEL_STATE_CONNECTED); + } + + return LF_OK; +} + +static lf_ret_t CoapUdpIpChannel_open_connection(NetworkChannel *untyped_self) { + COAP_UDP_IP_CHANNEL_DEBUG("Open connection"); + CoapUdpIpChannel *self = (CoapUdpIpChannel *)untyped_self; + + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_OPEN); + return LF_OK; +} + +static void CoapUdpIpChannel_close_connection(NetworkChannel *untyped_self) { + COAP_UDP_IP_CHANNEL_DEBUG("Close connection"); + CoapUdpIpChannel *self = (CoapUdpIpChannel *)untyped_self; + + // Immediately close the channel + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CLOSED); + + // Inform the other federate that the channel is closed + if (self->session) { + _CoapUdpIpChannel_send_coap_message(self, "disconnect", NULL); + } +} + +static lf_ret_t CoapUdpIpChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) { + COAP_UDP_IP_CHANNEL_DEBUG("Send blocking"); + CoapUdpIpChannel *self = (CoapUdpIpChannel *)untyped_self; + + // Send message + pthread_mutex_lock(&self->send_mutex); + + if (_CoapUdpIpChannel_send_coap_message(self, "message", message)) { + // Wait until the response handler confirms the ack or times out + pthread_cond_wait(&self->send_cond, &self->send_mutex); + + pthread_mutex_unlock(&self->send_mutex); + + if (_CoapUdpIpChannel_get_state(self) == NETWORK_CHANNEL_STATE_CONNECTED) { + return LF_OK; + } + } + + pthread_mutex_unlock(&self->send_mutex); + + return LF_ERR; +} + +static void CoapUdpIpChannel_register_receive_callback(NetworkChannel *untyped_self, + void (*receive_callback)(FederatedConnectionBundle *conn, + const FederateMessage *msg), + FederatedConnectionBundle *conn) { + COAP_UDP_IP_CHANNEL_INFO("Register receive callback"); + CoapUdpIpChannel *self = (CoapUdpIpChannel *)untyped_self; + + self->receive_callback = receive_callback; + self->federated_connection = conn; +} + +static void CoapUdpIpChannel_free(NetworkChannel *untyped_self) { + COAP_UDP_IP_CHANNEL_DEBUG("Free"); + CoapUdpIpChannel *self = (CoapUdpIpChannel *)untyped_self; + + // Close session if active + if (self->session) { + coap_session_release(self->session); + self->session = NULL; + } + + // Clean up mutexes and condition variables + pthread_mutex_destroy(&self->state_mutex); + pthread_cond_destroy(&self->state_cond); + pthread_mutex_destroy(&self->send_mutex); + pthread_cond_destroy(&self->send_cond); +} + +static bool CoapUdpIpChannel_is_connected(NetworkChannel *untyped_self) { + CoapUdpIpChannel *self = (CoapUdpIpChannel *)untyped_self; + return _CoapUdpIpChannel_get_state(self) == NETWORK_CHANNEL_STATE_CONNECTED; +} + +void *_CoapUdpIpChannel_connection_thread(void *arg) { + (void)arg; + COAP_UDP_IP_CHANNEL_DEBUG("Start connection thread"); + + while (true) { + // Process CoAP events + if (_coap_context) { + coap_io_process(_coap_context, 1000); // 1 second timeout + } + + // Check all channels for state changes + FederatedEnvironment *env = (FederatedEnvironment *)_lf_environment; + for (size_t i = 0; i < env->net_bundles_size; i++) { + if (env->net_bundles[i]->net_channel->type == NETWORK_CHANNEL_TYPE_COAP_UDP_IP) { + CoapUdpIpChannel *self = (CoapUdpIpChannel *)env->net_bundles[i]->net_channel; + + NetworkChannelState state = _CoapUdpIpChannel_get_state(self); + + switch (state) { + case NETWORK_CHANNEL_STATE_OPEN: + /* try to connect */ + _CoapUdpIpChannel_client_send_connect_message(self); + break; + + case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS: + /* nothing to do */ + break; + + case NETWORK_CHANNEL_STATE_LOST_CONNECTION: + case NETWORK_CHANNEL_STATE_CONNECTION_FAILED: + /* try to reconnect */ + _CoapUdpIpChannel_client_send_connect_message(self); + break; + + case NETWORK_CHANNEL_STATE_CONNECTED: + case NETWORK_CHANNEL_STATE_UNINITIALIZED: + case NETWORK_CHANNEL_STATE_CLOSED: + break; + } + } + } + } + + return NULL; +} + +void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, const char *remote_host, int remote_protocol_family) { + assert(self != NULL); + assert(remote_host != NULL); + + // Initialize global coap context if not already done + pthread_mutex_lock(&_global_mutex); + if (!_coap_is_globals_initialized) { + _coap_is_globals_initialized = true; + + // Initialize libcoap + coap_startup(); + coap_set_log_level(COAP_LOG_DEBUG); + + // Create CoAP context and endpoints + _coap_context = coap_new_context(NULL); + if (!_coap_context) { + COAP_UDP_IP_CHANNEL_ERR("Failed to create CoAP context"); + pthread_mutex_unlock(&_global_mutex); + return; + } + + // Set response and NACK handlers + coap_register_response_handler(_coap_context, _CoapUdpIpChannel_client_response_handler); + coap_register_nack_handler(_coap_context, _CoapUdpIpChannel_client_nack_handler); + + // Let libcoap handle multi-block payloads + coap_context_set_block_mode(_coap_context, COAP_BLOCK_USE_LIBCOAP | COAP_BLOCK_SINGLE_BODY); + + // Create CoAP listening endpoint(s) - listen on all interfaces (0.0.0.0) + int scheme_hint_bits = coap_get_available_scheme_hint_bits(0, 0, COAP_PROTO_UDP); + coap_str_const_t *listen_addr = coap_make_str_const("0.0.0.0"); + coap_addr_info_t *info_list = coap_resolve_address_info(listen_addr, COAP_DEFAULT_PORT, 0, 0, 0, AF_UNSPEC, + scheme_hint_bits, COAP_RESOLVE_TYPE_LOCAL); + + bool endpoint_created = false; + for (coap_addr_info_t *info = info_list; info != NULL; info = info->next) { + coap_endpoint_t *ep = coap_new_endpoint(_coap_context, &info->addr, info->proto); + if (ep) { + endpoint_created = true; + COAP_UDP_IP_CHANNEL_DEBUG("Created CoAP endpoint for protocol %u on 0.0.0.0", info->proto); + } else { + COAP_UDP_IP_CHANNEL_WARN("Failed to create endpoint for protocol %u", info->proto); + } + } + coap_free_address_info(info_list); + + if (!endpoint_created) { + COAP_UDP_IP_CHANNEL_ERR("Failed to create any CoAP endpoints"); + coap_free_context(_coap_context); + _coap_context = NULL; + pthread_mutex_unlock(&_global_mutex); + return; + } + + // Create server resources + coap_resource_t *connect_resource = coap_resource_init(coap_make_str_const("connect"), 0); + coap_register_handler(connect_resource, COAP_REQUEST_POST, _CoapUdpIpChannel_server_connect_handler); + coap_add_resource(_coap_context, connect_resource); + + coap_resource_t *disconnect_resource = coap_resource_init(coap_make_str_const("disconnect"), 0); + coap_register_handler(disconnect_resource, COAP_REQUEST_POST, _CoapUdpIpChannel_server_disconnect_handler); + coap_add_resource(_coap_context, disconnect_resource); + + coap_resource_t *message_resource = coap_resource_init(coap_make_str_const("message"), 0); + coap_register_handler(message_resource, COAP_REQUEST_POST, _CoapUdpIpChannel_server_message_handler); + coap_add_resource(_coap_context, message_resource); + + // Create connection thread + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, COAP_UDP_IP_CHANNEL_RECV_THREAD_STACK_SIZE); + if (pthread_create(&_connection_thread, &attr, _CoapUdpIpChannel_connection_thread, NULL) != 0) { + COAP_UDP_IP_CHANNEL_ERR("Failed to create connection thread"); + } + pthread_attr_destroy(&attr); + } + pthread_mutex_unlock(&_global_mutex); + + // Super fields + self->super.expected_connect_duration = COAP_UDP_IP_CHANNEL_EXPECTED_CONNECT_DURATION; + self->super.type = NETWORK_CHANNEL_TYPE_COAP_UDP_IP; + self->super.mode = NETWORK_CHANNEL_MODE_ASYNC; + self->super.is_connected = CoapUdpIpChannel_is_connected; + self->super.open_connection = CoapUdpIpChannel_open_connection; + self->super.close_connection = CoapUdpIpChannel_close_connection; + self->super.send_blocking = CoapUdpIpChannel_send_blocking; + self->super.register_receive_callback = CoapUdpIpChannel_register_receive_callback; + self->super.free = CoapUdpIpChannel_free; + + // Concrete fields + self->session = NULL; + self->receive_callback = NULL; + self->federated_connection = NULL; + self->state = NETWORK_CHANNEL_STATE_UNINITIALIZED; + + // Initialize mutexes and condition variables + pthread_mutex_init(&self->state_mutex, NULL); + pthread_cond_init(&self->state_cond, NULL); + pthread_mutex_init(&self->send_mutex, NULL); + pthread_cond_init(&self->send_cond, NULL); + + // Convert host to coap address + coap_str_const_t *host_str = coap_make_str_const(remote_host); + int scheme_hint_bits = coap_get_available_scheme_hint_bits(0, 0, COAP_PROTO_UDP); + coap_addr_info_t *addr_info = + coap_resolve_address_info(host_str, COAP_DEFAULT_PORT, COAP_DEFAULT_PORT, 0, 0, remote_protocol_family, + scheme_hint_bits, COAP_RESOLVE_TYPE_REMOTE); + if (addr_info) { + // Create client session + self->session = coap_new_client_session(_coap_context, NULL, &addr_info->addr, COAP_PROTO_UDP); + if (!self->session) { + COAP_UDP_IP_CHANNEL_ERR("Failed to create client session"); + } + coap_free_address_info(addr_info); + } else { + COAP_UDP_IP_CHANNEL_ERR("Error resolving remote address: %s", remote_host); + } +} diff --git a/src/platform/riot/coap_udp_ip_channel.c b/src/platform/riot/coap_udp_ip_channel.c index db38ec224..1a56abf20 100644 --- a/src/platform/riot/coap_udp_ip_channel.c +++ b/src/platform/riot/coap_udp_ip_channel.c @@ -397,9 +397,9 @@ void *_CoapUdpIpChannel_connection_thread(void *arg) { return NULL; } -void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, const char *remote_address, int remote_protocol_family) { +void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, const char *remote_host, int remote_protocol_family) { assert(self != NULL); - assert(remote_address != NULL); + assert(remote_host != NULL); // Initialize global coap server if not already done if (!_coap_is_globals_initialized) { @@ -432,7 +432,8 @@ void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, const char *remote_address, i self->state_mutex = (mutex_t)MUTEX_INIT; // Convert host to udp socket - if (inet_pton(remote_protocol_family, remote_address, self->remote.addr.ipv6) == 1) { + // TODO: Support hostnames and domain names + if (inet_pton(remote_protocol_family, remote_host, self->remote.addr.ipv6) == 1) { self->remote.family = remote_protocol_family; self->remote.port = CONFIG_GCOAP_PORT; } else { diff --git a/test/platform/posix/coap_channel_federated_test/receiver/CMakeLists.txt b/test/platform/posix/coap_channel_federated_test/receiver/CMakeLists.txt new file mode 100644 index 000000000..c11883e45 --- /dev/null +++ b/test/platform/posix/coap_channel_federated_test/receiver/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.20.0) +project(reactor-uc-posix-coap-channel-test) + +set(NETWORK_CHANNEL_COAP ON CACHE BOOL "Use CoAP NetworkChannel") +set(FEDERATED ON CACHE BOOL "Compile with federated sources") +set(PLATFORM "POSIX" CACHE STRING "Platform to target") + +add_subdirectory(../../../../../ reactor-uc) + +add_executable(app main.c) + +target_link_libraries(app PRIVATE reactor-uc) diff --git a/test/platform/posix/coap_channel_federated_test/receiver/main.c b/test/platform/posix/coap_channel_federated_test/receiver/main.c new file mode 100755 index 000000000..de59a0e17 --- /dev/null +++ b/test/platform/posix/coap_channel_federated_test/receiver/main.c @@ -0,0 +1,112 @@ +#define FEDERATED true +#define REMOTE_IP_ADDRESS "192.168.100.1" + +#include "reactor-uc/platform/posix/coap_udp_ip_channel.h" +#include "reactor-uc/schedulers/dynamic/scheduler.h" +#include "reactor-uc/reactor-uc.h" + +#define REMOTE_PROTOCOL_FAMILY AF_INET6 + +typedef struct { + int size; + char msg[512]; +} lf_msg_t; + +lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { + (void)msg_size; + + lf_msg_t *msg = user_struct; + memcpy(&msg->size, msg_buf, sizeof(msg->size)); + memcpy(msg->msg, msg_buf + sizeof(msg->size), msg->size); + + return LF_OK; +} + +LF_DEFINE_REACTION_STRUCT(Receiver, r, 0) +LF_DEFINE_REACTION_CTOR(Receiver, r, 0, NULL, NULL) +LF_DEFINE_INPUT_STRUCT(Receiver, in, 1, 0, lf_msg_t, 0) +LF_DEFINE_INPUT_CTOR(Receiver, in, 1, 0, lf_msg_t, 0) + +typedef struct { + Reactor super; + LF_REACTION_INSTANCE(Receiver, r); + LF_PORT_INSTANCE(Receiver, in, 1); + int cnt; + LF_REACTOR_BOOKKEEPING_INSTANCES(1, 1, 0); +} Receiver; + +LF_DEFINE_REACTION_BODY(Receiver, r) { + LF_SCOPE_SELF(Receiver); + LF_SCOPE_ENV(); + LF_SCOPE_PORT(Receiver, in); + printf("Input triggered @ " PRINTF_TIME " with %s size %d\n", env->get_elapsed_logical_time(env), in->value.msg, + in->value.size); + + if (strcmp(in->value.msg, "Hello From Sender") == 0) { + // Exit with 0 to show that the test passed. + exit(0); + } else { + exit(1); + } +} + +LF_REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_external) { + LF_REACTOR_CTOR_PREAMBLE(); + LF_REACTOR_CTOR(Receiver); + LF_INITIALIZE_REACTION(Receiver, r, NEVER); + LF_INITIALIZE_INPUT(Receiver, in, 1, in_external); + + // Register reaction as an effect of in + LF_PORT_REGISTER_EFFECT(self->in, self->r, 1); +} + +LF_DEFINE_FEDERATED_INPUT_CONNECTION_STRUCT(Receiver, in, lf_msg_t, 5); +LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(Receiver, in, lf_msg_t, 5, MSEC(100), false, 0); + +typedef struct { + FederatedConnectionBundle super; + CoapUdpIpChannel channel; + LF_FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in); + LF_FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0) +} LF_FEDERATED_CONNECTION_BUNDLE_TYPE(Receiver, Sender); + +LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) { + LF_FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE(); + CoapUdpIpChannel_ctor(&self->channel, REMOTE_IP_ADDRESS, REMOTE_PROTOCOL_FAMILY); + LF_FEDERATED_CONNECTION_BUNDLE_CALL_CTOR(); + LF_INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_msg_t); +} + +LF_DEFINE_STARTUP_COORDINATOR_STRUCT(Federate, 1, 6); +LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, 1, 1, 6); + +LF_DEFINE_CLOCK_SYNC_STRUCT(Federate, 1, 3); +// LF_DEFINE_CLOCK_SYNC_DEFAULTS_CTOR(Federate, 1, 3, false); + +typedef struct { + Reactor super; + LF_CHILD_REACTOR_INSTANCE(Receiver, receiver, 1); + LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender); + LF_FEDERATE_BOOKKEEPING_INSTANCES(1); + LF_CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0); + LF_DEFINE_STARTUP_COORDINATOR(Federate); + LF_DEFINE_CLOCK_SYNC(Federate); +} MainRecv; + +LF_REACTOR_CTOR_SIGNATURE(MainRecv) { + LF_REACTOR_CTOR(MainRecv); + LF_FEDERATE_CTOR_PREAMBLE(); + LF_DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1); + LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]); + LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender); + lf_connect_federated_input(&self->Receiver_Sender_bundle.inputs[0]->super, &self->receiver->in[0].super); + LF_INITIALIZE_STARTUP_COORDINATOR(Federate); + // LF_INITIALIZE_CLOCK_SYNC(Federate); +} + +LF_ENTRY_POINT_FEDERATED(MainRecv, 32, 32, 32, SEC(1), true, 1, false) + +int main() { + lf_start(); + return 0; +} diff --git a/test/platform/posix/coap_channel_federated_test/run.sh b/test/platform/posix/coap_channel_federated_test/run.sh new file mode 100755 index 000000000..3ed1ee128 --- /dev/null +++ b/test/platform/posix/coap_channel_federated_test/run.sh @@ -0,0 +1,96 @@ +#!/bin/bash +set -e + +# --------------------- Networking setup --------------------- + +# Cleanup network if it exists already +sudo ip netns del sender_ns || true +sudo ip netns del receiver_ns || true + +# 1. Create network namespaces +echo "Setting up network namespaces..." +sudo ip netns add sender_ns +sudo ip netns add receiver_ns + +# Create virtual ethernet pair +sudo ip link add veth_sender type veth peer name veth_receiver + +# Move interfaces to namespaces +sudo ip link set veth_sender netns sender_ns +sudo ip link set veth_receiver netns receiver_ns + +# Configure IP addresses +sudo ip netns exec sender_ns ip addr add 192.168.100.1/24 dev veth_sender +sudo ip netns exec receiver_ns ip addr add 192.168.100.2/24 dev veth_receiver + +# Bring up interfaces +sudo ip netns exec sender_ns ip link set veth_sender up +sudo ip netns exec receiver_ns ip link set veth_receiver up +sudo ip netns exec sender_ns ip link set lo up +sudo ip netns exec receiver_ns ip link set lo up + +echo "Network setup complete. Building applications..." + +# Test connectivity +# sudo ip netns exec sender_ns ping 192.168.100.2 +# sudo ip netns exec receiver_ns ping 192.168.100.1 + + +# --------------------- BUILD AND RUN --------------------- + +# Build sender +cmake -B sender/build -S sender +make -C sender/build + +# Build receiver +cmake -B receiver/build -S receiver +make -C receiver/build + +echo "Building complete. Starting applications..." + +# Start the sender with a 60-second timeout in sender namespace +sudo ip netns exec sender_ns timeout 60 ./sender/build/app & +pid1=$! + +# Wait for 3 seconds +sleep 3 + +# Start the receiver with a 60-second timeout in receiver namespace +sudo ip netns exec receiver_ns timeout 60 ./receiver/build/app & +pid2=$! + +# Wait for both binaries to complete or timeout +wait $pid1 +exit_code_sender=$? + +wait $pid2 +exit_code_receiver=$? + +echo "Test completed" +echo "Sender exit code: $exit_code_sender" +echo "Receiver exit code: $exit_code_receiver" + +# Check if timeout occurred +if [ $exit_code_sender -eq 124 ]; then + echo "Error sender timed out" + echo "Test failed" + exit 1 +else + echo "Exit code of sender: $exit_code_sender" +fi + +if [ $exit_code_receiver -eq 124 ]; then + echo "Error receiver timed out" + echo "Test failed" + exit 1 +else + echo "Exit code of receiver: $exit_code_receiver" +fi + +# Check exit code +if [[ $exit_code_receiver -ne 0 ]]; then + echo "Error: Receiver received wrong message text" + exit 1 +fi + +echo "All tests passed." diff --git a/test/platform/posix/coap_channel_federated_test/sender/CMakeLists.txt b/test/platform/posix/coap_channel_federated_test/sender/CMakeLists.txt new file mode 100644 index 000000000..c11883e45 --- /dev/null +++ b/test/platform/posix/coap_channel_federated_test/sender/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.20.0) +project(reactor-uc-posix-coap-channel-test) + +set(NETWORK_CHANNEL_COAP ON CACHE BOOL "Use CoAP NetworkChannel") +set(FEDERATED ON CACHE BOOL "Compile with federated sources") +set(PLATFORM "POSIX" CACHE STRING "Platform to target") + +add_subdirectory(../../../../../ reactor-uc) + +add_executable(app main.c) + +target_link_libraries(app PRIVATE reactor-uc) diff --git a/test/platform/posix/coap_channel_federated_test/sender/main.c b/test/platform/posix/coap_channel_federated_test/sender/main.c new file mode 100755 index 000000000..d78c6e8eb --- /dev/null +++ b/test/platform/posix/coap_channel_federated_test/sender/main.c @@ -0,0 +1,116 @@ +#define FEDERATED 1 +#define REMOTE_IP_ADDRESS "192.168.100.2" + +#include "reactor-uc/reactor-uc.h" +#include "reactor-uc/schedulers/dynamic/scheduler.h" +#include "reactor-uc/platform/posix/coap_udp_ip_channel.h" + +#define REMOTE_PROTOCOL_FAMILY AF_INET6 + +typedef struct { + int size; + char msg[512]; +} lf_msg_t; + +int serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { + (void)user_struct_size; + const lf_msg_t *msg = user_struct; + + memcpy(msg_buf, &msg->size, sizeof(msg->size)); + memcpy(msg_buf + sizeof(msg->size), msg->msg, msg->size); + + return sizeof(msg->size) + msg->size; +} + +LF_DEFINE_TIMER_STRUCT(Sender, t, 1, 0) +LF_DEFINE_TIMER_CTOR(Sender, t, 1, 0) +LF_DEFINE_REACTION_STRUCT(Sender, r, 1) +LF_DEFINE_REACTION_CTOR(Sender, r, 0, NULL, NULL) +LF_DEFINE_OUTPUT_STRUCT(Sender, out, 1, lf_msg_t) +LF_DEFINE_OUTPUT_CTOR(Sender, out, 1) + +typedef struct { + Reactor super; + LF_TIMER_INSTANCE(Sender, t); + LF_REACTION_INSTANCE(Sender, r); + LF_PORT_INSTANCE(Sender, out, 1); + LF_REACTOR_BOOKKEEPING_INSTANCES(1, 2, 0); +} Sender; + +LF_DEFINE_REACTION_BODY(Sender, r) { + LF_SCOPE_SELF(Sender); + LF_SCOPE_ENV(); + LF_SCOPE_PORT(Sender, out); + + printf("Timer triggered @ " PRINTF_TIME "\n", env->get_elapsed_logical_time(env)); + lf_msg_t val; + strcpy(val.msg, "Hello From Sender"); + val.size = sizeof("Hello From Sender"); + lf_set(out, val); +} + +LF_REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Sender, OutputExternalCtorArgs *out_external) { + LF_REACTOR_CTOR_PREAMBLE(); + LF_REACTOR_CTOR(Sender); + LF_INITIALIZE_REACTION(Sender, r, NEVER); + LF_INITIALIZE_TIMER(Sender, t, MSEC(0), SEC(1)); + LF_INITIALIZE_OUTPUT(Sender, out, 1, out_external); + + LF_TIMER_REGISTER_EFFECT(self->t, self->r); + LF_PORT_REGISTER_SOURCE(self->out, self->r, 1); +} + +LF_DEFINE_FEDERATED_OUTPUT_CONNECTION_STRUCT(Sender, out, lf_msg_t) +LF_DEFINE_FEDERATED_OUTPUT_CONNECTION_CTOR(Sender, out, lf_msg_t, 0) + +typedef struct { + FederatedConnectionBundle super; + CoapUdpIpChannel channel; + LF_FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out); + LF_FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1); +} LF_FEDERATED_CONNECTION_BUNDLE_TYPE(Sender, Receiver); + +LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver) { + LF_FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE(); + CoapUdpIpChannel_ctor(&self->channel, REMOTE_IP_ADDRESS, REMOTE_PROTOCOL_FAMILY); + LF_FEDERATED_CONNECTION_BUNDLE_CALL_CTOR(); + LF_INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_msg_t); +} + +LF_DEFINE_STARTUP_COORDINATOR_STRUCT(Federate, 1, 6); +LF_DEFINE_STARTUP_COORDINATOR_CTOR(Federate, 1, 1, 6); + +LF_DEFINE_CLOCK_SYNC_STRUCT(Federate, 1, 3); +// LF_DEFINE_CLOCK_SYNC_DEFAULTS_CTOR(Federate, 1, 3, true); + +// Reactor main +typedef struct { + Reactor super; + LF_CHILD_REACTOR_INSTANCE(Sender, sender, 1); + LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver); + LF_FEDERATE_BOOKKEEPING_INSTANCES(1); + LF_CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1); + LF_CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0); + LF_CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0); + LF_DEFINE_STARTUP_COORDINATOR(Federate); + LF_DEFINE_CLOCK_SYNC(Federate); +} MainSender; + +LF_REACTOR_CTOR_SIGNATURE(MainSender) { + LF_REACTOR_CTOR(MainSender); + LF_FEDERATE_CTOR_PREAMBLE(); + LF_DEFINE_CHILD_OUTPUT_ARGS(sender, out, 1, 1); + LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, 1, _sender_out_args[i]); + LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver); + LF_INITIALIZE_STARTUP_COORDINATOR(Federate); + // LF_INITIALIZE_CLOCK_SYNC(Federate); + lf_connect_federated_output((Connection *)self->Sender_Receiver_bundle.outputs[0], (Port *)self->sender->out); +} + +LF_ENTRY_POINT_FEDERATED(MainSender, 32, 32, 32, SEC(1), true, 1, false) + +int main() { + lf_start(); + exit(0); + return 0; +} diff --git a/test/platform/posix/coap_channel_test/CMakeLists.txt b/test/platform/posix/coap_channel_test/CMakeLists.txt new file mode 100644 index 000000000..a3ac798ff --- /dev/null +++ b/test/platform/posix/coap_channel_test/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.20.0) +project(reactor-uc-posix-coap-channel-test) + +set(NETWORK_CHANNEL_COAP ON CACHE BOOL "Use CoAP NetworkChannel") +set(FEDERATED ON CACHE BOOL "Compile with federated sources") +set(PLATFORM "POSIX" CACHE STRING "Platform to target") + +add_subdirectory(../../../../ reactor-uc) + +add_executable(app main.c) + +target_link_libraries(app PRIVATE reactor-uc) diff --git a/test/platform/posix/coap_channel_test/main.c b/test/platform/posix/coap_channel_test/main.c new file mode 100644 index 000000000..64bf09c86 --- /dev/null +++ b/test/platform/posix/coap_channel_test/main.c @@ -0,0 +1,101 @@ +#define FEDERATED true + +#include "reactor-uc/platform/posix/coap_udp_ip_channel.h" +#include "reactor-uc/reactor-uc.h" +#include "unity.h" +#include "../../../unit/test_util.h" +#include +#include +#include + +#define MESSAGE_CONTENT "Hello World1234" +#define MESSAGE_CONNECTION_ID 42 +#define REMOTE_ADDRESS "::1" +#define REMOTE_PROTOCOL_FAMILY AF_INET6 + +Reactor parent; +FederatedEnvironment env; +Environment *_lf_environment = &env.super; +FederatedConnectionBundle bundle; +FederatedConnectionBundle *net_bundles[] = {&bundle}; +StartupCoordinator startup_coordinator; + +CoapUdpIpChannel _coap_channel; +NetworkChannel *channel = &_coap_channel.super; + +bool server_callback_called = false; +bool client_callback_called = false; + +void setUp(void) { + /* init environment */ + FederatedEnvironment_ctor(&env, NULL, NULL, false, net_bundles, 1, &startup_coordinator, NULL); + + /* init channel */ + CoapUdpIpChannel_ctor(&_coap_channel, REMOTE_ADDRESS, REMOTE_PROTOCOL_FAMILY); + + /* init bundle */ + FederatedConnectionBundle_ctor(&bundle, &parent, channel, NULL, NULL, 0, NULL, NULL, 0, 0); +} + +void tearDown(void) { + channel->free(channel); +} + +/* TESTS */ +void test_open_connection_non_blocking(void) { + TEST_ASSERT_OK(channel->open_connection(channel)); + + sleep(2); // TODO: 2s is a bit long. 1s even is long. This should not need 2s but work with 1s or less even! + + TEST_ASSERT_TRUE(channel->is_connected(channel)); +} + +void server_callback_handler(FederatedConnectionBundle *self, const FederateMessage *_msg) { + (void)self; + const TaggedMessage *msg = &_msg->message.tagged_message; + printf("\nServer: Received message with connection number %" PRId32 " and content %s\n", msg->conn_id, + (char *)msg->payload.bytes); + TEST_ASSERT_EQUAL_STRING(MESSAGE_CONTENT, (char *)msg->payload.bytes); + TEST_ASSERT_EQUAL(MESSAGE_CONNECTION_ID, msg->conn_id); + + server_callback_called = true; +} + +void test_client_send_and_server_recv(void) { + // open connection + TEST_ASSERT_OK(channel->open_connection(channel)); + + // Wait until channel is connected + while (!channel->is_connected(channel)) { + sleep(1); + } + + // register receive callback for handling incoming messages + channel->register_receive_callback(channel, server_callback_handler, NULL); + + /* create message */ + FederateMessage msg; + msg.which_message = FederateMessage_tagged_message_tag; + + TaggedMessage *port_message = &msg.message.tagged_message; + port_message->conn_id = MESSAGE_CONNECTION_ID; + const char *message = MESSAGE_CONTENT; + memcpy(port_message->payload.bytes, message, sizeof(MESSAGE_CONTENT)); // NOLINT + port_message->payload.size = sizeof(MESSAGE_CONTENT); + + /* send message */ + TEST_ASSERT_OK(channel->send_blocking(channel, &msg)); + + /* wait for the callback */ + sleep(1); + + /* check if the callback was called */ + TEST_ASSERT_TRUE(server_callback_called); +} + +int main(void) { + UNITY_BEGIN(); + RUN_TEST(test_open_connection_non_blocking); + RUN_TEST(test_client_send_and_server_recv); + exit(UNITY_END()); +} diff --git a/test/platform/posix/coap_channel_test/run.sh b/test/platform/posix/coap_channel_test/run.sh new file mode 100755 index 000000000..c9d4e0413 --- /dev/null +++ b/test/platform/posix/coap_channel_test/run.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# Make test +cmake -Bbuild +make -C build + +# Evaluate make output +if [ $? -eq 0 ]; then + echo "All tests passed." +else + echo "$? tests failed." >&2 + exit 1 +fi + +# Run test +./build/app + +# Evaluate test output +if [ $? -eq 0 ]; then + echo "All tests passed." +else + echo "$? tests failed." >&2 + exit 1 +fi \ No newline at end of file diff --git a/test/platform/posix/runAll.sh b/test/platform/posix/runAll.sh new file mode 100755 index 000000000..0c8050bca --- /dev/null +++ b/test/platform/posix/runAll.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +set -e + +# Iterate over each folder and execute the command +for dir in ./*; do + if [ -d $dir ]; then + echo "Entering $dir" + pushd $dir + ./run.sh + popd + fi +done