11# distutils: language = c++
22# cython: always_allow_keywords=True
3- from libcpp.deque cimport deque as cppdq
43
54from dxfeed.core.utils.helpers cimport *
65from dxfeed.core.utils.helpers import *
@@ -12,6 +11,7 @@ from datetime import datetime
1211import pandas as pd
1312from typing import Optional, Union, Iterable
1413from warnings import warn
14+ from weakref import WeakSet
1515
1616# for importing variables
1717import dxfeed.core.listeners.listener as lis
@@ -53,24 +53,30 @@ cdef class ConnectionClass:
5353 Data structure that contains connection
5454 """
5555 cdef clib.dxf_connection_t connection
56- # sub_ptr_list contains pointers to all subscriptions related to current connection
57- cdef cppdq[clib.dxf_subscription_t * ] sub_ptr_list
58- # each subscription has its own index in a list
59- cdef int subs_order
56+ cdef object __sub_refs
6057
6158 def __init__ (self ):
62- self .subs_order = 0
59+ self .__sub_refs = WeakSet()
6360
6461 def __dealloc__ (self ):
6562 dxf_close_connection(self )
6663
64+ def get_sub_refs (self ):
65+ """
66+ Method to get list of references to all subscriptions related to current connection
67+
68+ Returns
69+ -------
70+ :list
71+ List of weakref objects. Empty list if no refs
72+ """
73+ refs = list (self .__sub_refs) if self .__sub_refs else list ()
74+ return refs
75+
6776 cpdef SubscriptionClass make_new_subscription(self , data_len: int ):
6877 cdef SubscriptionClass out = SubscriptionClass(data_len)
6978 out.connection = self .connection
70- self .sub_ptr_list.push_back(& out.subscription) # append pointer to new subscription
71- out.subscription_order = self .subs_order # assign each subscription an index
72- self .subs_order += 1
73- out.con_sub_list_ptr = & self .sub_ptr_list # reverse pointer to pointers list
79+ self .__sub_refs.add(out)
7480 return out
7581
7682
@@ -80,9 +86,8 @@ cdef class SubscriptionClass:
8086 """
8187 cdef clib.dxf_connection_t connection
8288 cdef clib.dxf_subscription_t subscription
83- cdef int subscription_order # index in list of subscription pointers
84- cdef cppdq[clib.dxf_subscription_t * ] * con_sub_list_ptr # pointer to list of subscription pointers
8589 cdef dxf_event_listener_t listener
90+ cdef object __weakref__ # Weak referencing enabling
8691 cdef object event_type_str
8792 cdef list columns
8893 cdef object data
@@ -185,7 +190,7 @@ def dxf_create_connection_auth_bearer(address: Union[str, unicode, bytes],
185190 address = address.encode(' utf-8' )
186191 token = token.encode(' utf-8' )
187192 clib.dxf_create_connection_auth_bearer(address, token,
188- NULL , NULL , NULL , NULL , NULL , & cc.connection)
193+ NULL , NULL , NULL , NULL , NULL , & cc.connection)
189194 error_code = process_last_error(verbose = False )
190195 if error_code:
191196 raise RuntimeError (f" In underlying C-API library error {error_code} occurred!" )
@@ -366,14 +371,9 @@ def dxf_close_connection(ConnectionClass cc):
366371 cc: ConnectionClass
367372 Variable with connection information
368373 """
369-
370- # close all subscriptions before closing the connection
371- for i in range (cc.sub_ptr_list.size()):
372- if cc.sub_ptr_list[i]: # subscription should not be closed previously
373- clib.dxf_close_subscription(cc.sub_ptr_list[i][0 ])
374- cc.sub_ptr_list[i][0 ] = NULL # mark subscription as closed
375-
376- cc.sub_ptr_list.clear()
374+ related_subs = cc.get_sub_refs()
375+ for sub in related_subs:
376+ dxf_close_subscription(sub)
377377
378378 if cc.connection:
379379 clib.dxf_close_connection(cc.connection)
@@ -391,7 +391,6 @@ def dxf_close_subscription(SubscriptionClass sc):
391391 if sc.subscription:
392392 clib.dxf_close_subscription(sc.subscription)
393393 sc.subscription = NULL
394- sc.con_sub_list_ptr[0 ][sc.subscription_order] = NULL
395394
396395def dxf_get_current_connection_status (ConnectionClass cc , return_str: bool = True ):
397396 """
0 commit comments