@@ -1009,161 +1009,6 @@ def test_operation_sub_unsub(self):
10091009 client .stop ()
10101010 callbacks .future_stopped .result (TIMEOUT )
10111011
1012- total_callbacks = 0
1013- all_packets_received = Future ()
1014- mutex = Lock ()
1015- received_subscriptions = [0 ] * 10
1016-
1017- def subscriber1_callback (self , publish_received_data : mqtt5 .PublishReceivedData ):
1018- self .mutex .acquire ()
1019- var = publish_received_data .publish_packet .payload
1020- self .received_subscriptions [int (var )] = 1
1021- self .total_callbacks = self .total_callbacks + 1
1022- if self .total_callbacks == 10 :
1023- self .all_packets_received .set_result (None )
1024- self .mutex .release ()
1025-
1026- def subscriber2_callback (self , publish_received_data : mqtt5 .PublishReceivedData ):
1027- self .mutex .acquire ()
1028- var = publish_received_data .publish_packet .payload
1029- self .received_subscriptions [int (var )] = 1
1030- self .total_callbacks = self .total_callbacks + 1
1031- if self .total_callbacks == 10 :
1032- self .all_packets_received .set_result (None )
1033- self .mutex .release ()
1034-
1035- def test_operation_shared_subscription (self ):
1036- input_host_name = _get_env_variable ("AWS_TEST_MQTT5_IOT_CORE_HOST" )
1037- input_cert = _get_env_variable ("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT" )
1038- input_key = _get_env_variable ("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY" )
1039-
1040- client_id_subscriber1 = create_client_id ()
1041- client_id_subscriber2 = create_client_id ()
1042- client_id_publisher = create_client_id ()
1043-
1044- testTopic = "test/MQTT5_Binding_Python_" + client_id_publisher
1045- sharedTopicfilter = "$share/crttest/test/MQTT5_Binding_Python_" + client_id_publisher
1046-
1047- tls_ctx_options = io .TlsContextOptions .create_client_with_mtls_from_path (
1048- input_cert ,
1049- input_key
1050- )
1051-
1052- # subscriber 1
1053- connect_subscriber1_options = mqtt5 .ConnectPacket (client_id = client_id_subscriber1 )
1054- subscriber1_generic_callback = Mqtt5TestCallbacks ()
1055- subscriber1_options = mqtt5 .ClientOptions (
1056- host_name = input_host_name ,
1057- port = 8883 ,
1058- tls_ctx = io .ClientTlsContext (tls_ctx_options ),
1059- connect_options = connect_subscriber1_options ,
1060- on_publish_callback_fn = self .subscriber1_callback ,
1061- on_lifecycle_event_stopped_fn = subscriber1_generic_callback .on_lifecycle_stopped ,
1062- on_lifecycle_event_attempting_connect_fn = subscriber1_generic_callback .on_lifecycle_attempting_connect ,
1063- on_lifecycle_event_connection_success_fn = subscriber1_generic_callback .on_lifecycle_connection_success ,
1064- on_lifecycle_event_connection_failure_fn = subscriber1_generic_callback .on_lifecycle_connection_failure
1065- )
1066- subscriber1_client = mqtt5 .Client (client_options = subscriber1_options )
1067-
1068- # subscriber 2
1069- connect_subscriber2_options = mqtt5 .ConnectPacket (client_id = client_id_subscriber2 )
1070- subscriber2_generic_callback = Mqtt5TestCallbacks ()
1071- subscriber2_options = mqtt5 .ClientOptions (
1072- host_name = input_host_name ,
1073- port = 8883 ,
1074- tls_ctx = io .ClientTlsContext (tls_ctx_options ),
1075- connect_options = connect_subscriber2_options ,
1076- on_publish_callback_fn = self .subscriber2_callback ,
1077- on_lifecycle_event_stopped_fn = subscriber2_generic_callback .on_lifecycle_stopped ,
1078- on_lifecycle_event_attempting_connect_fn = subscriber2_generic_callback .on_lifecycle_attempting_connect ,
1079- on_lifecycle_event_connection_success_fn = subscriber2_generic_callback .on_lifecycle_connection_success ,
1080- on_lifecycle_event_connection_failure_fn = subscriber2_generic_callback .on_lifecycle_connection_failure
1081- )
1082- subscriber2_client = mqtt5 .Client (client_options = subscriber2_options )
1083-
1084- # publisher
1085- connect_publisher_options = mqtt5 .ConnectPacket (client_id = client_id_publisher )
1086- publisher_generic_callback = Mqtt5TestCallbacks ()
1087-
1088- publisher_options = mqtt5 .ClientOptions (
1089- host_name = input_host_name ,
1090- port = 8883 ,
1091- tls_ctx = io .ClientTlsContext (tls_ctx_options ),
1092- connect_options = connect_publisher_options ,
1093- on_lifecycle_event_stopped_fn = publisher_generic_callback .on_lifecycle_stopped ,
1094- on_lifecycle_event_attempting_connect_fn = publisher_generic_callback .on_lifecycle_attempting_connect ,
1095- on_lifecycle_event_connection_success_fn = publisher_generic_callback .on_lifecycle_connection_success ,
1096- on_lifecycle_event_connection_failure_fn = publisher_generic_callback .on_lifecycle_connection_failure
1097- )
1098- publisher_client = mqtt5 .Client (client_options = publisher_options )
1099-
1100- print ("Connecting all 3 clients\n " )
1101- subscriber1_client .start ()
1102- subscriber1_generic_callback .future_connection_success .result (TIMEOUT )
1103-
1104- subscriber2_client .start ()
1105- subscriber2_generic_callback .future_connection_success .result (TIMEOUT )
1106-
1107- publisher_client .start ()
1108- publisher_generic_callback .future_connection_success .result (TIMEOUT )
1109- print ("All clients connected\n " )
1110-
1111- # Subscriber 1
1112- subscriptions = []
1113- subscriptions .append (mqtt5 .Subscription (topic_filter = sharedTopicfilter , qos = mqtt5 .QoS .AT_LEAST_ONCE ))
1114- subscribe_packet = mqtt5 .SubscribePacket (
1115- subscriptions = subscriptions )
1116- subscribe_future = subscriber1_client .subscribe (subscribe_packet = subscribe_packet )
1117- suback_packet1 = subscribe_future .result (TIMEOUT )
1118- self .assertIsInstance (suback_packet1 , mqtt5 .SubackPacket )
1119-
1120- # Subscriber 2
1121- subscriptions2 = []
1122- subscriptions2 .append (mqtt5 .Subscription (topic_filter = sharedTopicfilter , qos = mqtt5 .QoS .AT_LEAST_ONCE ))
1123- subscribe_packet2 = mqtt5 .SubscribePacket (
1124- subscriptions = subscriptions2 )
1125- subscribe_future2 = subscriber2_client .subscribe (subscribe_packet = subscribe_packet2 )
1126- suback_packet2 = subscribe_future2 .result (TIMEOUT )
1127- self .assertIsInstance (suback_packet2 , mqtt5 .SubackPacket )
1128-
1129- publishes = 10
1130- for x in range (0 , publishes ):
1131- packet = mqtt5 .PublishPacket (
1132- payload = f"{ x } " ,
1133- qos = mqtt5 .QoS .AT_LEAST_ONCE ,
1134- topic = testTopic
1135- )
1136- publish_future = publisher_client .publish (packet )
1137- publish_future .result (TIMEOUT )
1138-
1139- self .all_packets_received .result (TIMEOUT )
1140-
1141- topic_filters = []
1142- topic_filters .append (testTopic )
1143- unsubscribe_packet = mqtt5 .UnsubscribePacket (topic_filters = testTopic )
1144-
1145- unsubscribe_future = subscriber1_client .unsubscribe (unsubscribe_packet )
1146- unsuback_packet = unsubscribe_future .result (TIMEOUT )
1147- self .assertIsInstance (unsuback_packet , mqtt5 .UnsubackPacket )
1148-
1149- unsubscribe_future = subscriber2_client .unsubscribe (unsubscribe_packet )
1150- unsuback_packet = unsubscribe_future .result (TIMEOUT )
1151- self .assertIsInstance (unsuback_packet , mqtt5 .UnsubackPacket )
1152-
1153- self .assertEqual (self .total_callbacks , 10 )
1154-
1155- for e in self .received_subscriptions :
1156- self .assertEqual (e , 1 )
1157-
1158- subscriber1_client .stop ()
1159- subscriber1_generic_callback .future_stopped .result (TIMEOUT )
1160-
1161- subscriber2_client .stop ()
1162- subscriber2_generic_callback .future_stopped .result (TIMEOUT )
1163-
1164- publisher_client .stop ()
1165- publisher_generic_callback .future_stopped .result (TIMEOUT )
1166-
11671012 def test_operation_will (self ):
11681013 input_host_name = _get_env_variable ("AWS_TEST_MQTT5_IOT_CORE_HOST" )
11691014 input_cert = _get_env_variable ("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT" )
0 commit comments