99EXCHANGE_NAME = 'amqp.opencti'
1010
1111
12- class OpenCTIConnector :
12+ class OpenCTIConnectorHelper :
1313 """
1414 Python API for OpenCTI connector
1515 :param identifier: Connector identifier
@@ -35,40 +35,48 @@ def __init__(self, identifier, config, rabbitmq_hostname, rabbitmq_port, rabbitm
3535 # Encode the configuration
3636 config_encoded = base64 .b64encode (json .dumps (self .config ).encode ('utf-8' )).decode ('utf-8' )
3737
38+ # Connect to RabbitMQ
39+ self .connection = self ._connect ()
40+ self .channel = self ._create_channel ()
41+ logging .info ('Successfully connected to RabbitMQ' )
42+
3843 # Declare the queue for the connector
39- channel = self ._connect ()
40- channel .queue_delete (self .queue_name )
41- channel .queue_declare (self .queue_name , durable = True , arguments = {'config' : config_encoded })
42- channel .queue_bind (queue = self .queue_name , exchange = EXCHANGE_NAME , routing_key = self .routing_key )
43- channel .close ()
44- self ._disconnect ()
44+ self .channel .queue_delete (self .queue_name )
45+ self .channel .queue_declare (self .queue_name , durable = True , arguments = {'config' : config_encoded })
46+ self .channel .queue_bind (queue = self .queue_name , exchange = EXCHANGE_NAME , routing_key = self .routing_key )
4547
4648 def _connect (self ):
4749 try :
4850 credentials = pika .PlainCredentials (self .rabbitmq_username , self .rabbitmq_password )
4951 parameters = pika .ConnectionParameters (self .rabbitmq_hostname , self .rabbitmq_port , '/' , credentials )
50- self .connection = pika .BlockingConnection (parameters )
52+ return pika .BlockingConnection (parameters )
53+ except :
54+ logging .error ('Unable to connect to RabbitMQ with the given parameters' )
55+
56+ def _create_channel (self ):
57+ try :
5158 channel = self .connection .channel ()
5259 channel .exchange_declare (exchange = EXCHANGE_NAME , exchange_type = 'direct' , durable = True )
5360 return channel
5461 except :
55- logging .error ('Unable to connect to RabbitMQ with the given parameters' )
62+ logging .error ('Unable to open channel to RabbitMQ with the given parameters' )
5663
57- def _disconnect (self ):
58- if self .connection is not None and not self .connection . is_closed :
59- self .connection . close ()
64+ def _reconnect (self ):
65+ self .connection = self ._connect ()
66+ self . channel = self ._create_channel ()
6067
6168 def send_stix2_bundle (self , bundle ):
6269 """
6370 This method send a STIX2 bundle to RabbitMQ to be consumed by workers
6471 :param bundle: A valid STIX2 bundle
6572 """
66- logging .info ('Sending a STI2 bundle to RabbitMQ...' )
73+ if not self .channel .is_open :
74+ self ._reconnect ()
6775
6876 # Validate the STIX 2 bundle
6977 validation = validate_string (bundle )
7078 if not validation .is_valid :
71- raise ValueError ('The bundle is not a valid STIX2 JSON' )
79+ raise ValueError ('The bundle is not a valid STIX2 JSON:' + bundle )
7280
7381 # Prepare the message
7482 message = {
@@ -77,8 +85,5 @@ def send_stix2_bundle(self, bundle):
7785 }
7886
7987 # Send the message
80- channel = self ._connect ()
81- channel .basic_publish (EXCHANGE_NAME , self .routing_key , json .dumps (message ))
82- channel .close ()
83- self ._disconnect ()
88+ self .channel .basic_publish (EXCHANGE_NAME , self .routing_key , json .dumps (message ))
8489 logging .info ('STIX2 bundle has been sent' )
0 commit comments