From c57e1b3ac4d3cba2513ee64a7ba4084b5b8e7b6b Mon Sep 17 00:00:00 2001
From: Niranjan Kumar
Date: Mon, 21 Jul 2025 16:15:57 +0530
Subject: [PATCH 1/2] Improve GraphQL query_params handling

---
 .../data/search_criteria.xml                  |   5 +-
 .../fetch_social_registry_beneficiary.py      | 175 +++++++++---------
 ...etch_social_registry_beneficiary_views.xml |   2 +-
 3 files changed, 87 insertions(+), 95 deletions(-)

diff --git a/g2p_social_registry_importer/data/search_criteria.xml b/g2p_social_registry_importer/data/search_criteria.xml
index fd8e3b5b..c42aaa13 100644
--- a/g2p_social_registry_importer/data/search_criteria.xml
+++ b/g2p_social_registry_importer/data/search_criteria.xml
@@ -74,8 +74,7 @@
             }
-            {
-                getRegistrants(limit:2){
+            {
                     name,
                     isGroup,
                     givenName,
@@ -125,7 +124,7 @@
                     expiryDate
                 }
             }
-        }
+
diff --git a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
index 993ce801..0f59cc97 100644
--- a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
+++ b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
@@ -36,7 +36,7 @@ class G2PFetchSocialRegistryBeneficiary(models.Model):
         domain=("[('target_type', '=', target_registry)]"),
     )
     query_params = fields.Text(default="""{}""")
-    query = fields.Text(required=True, default="""{}""")
+    query_fields = fields.Text(required=True, default="""{}""")
     output_mapping = fields.Text(required=True, default="""{}""")

     last_sync_date = fields.Datetime(string="Last synced on", required=False)
@@ -286,72 +286,42 @@ def build_request_body(self, today_isoformat, message_id, transaction_id, refere
             "message": message,
         }

-    def get_graphql_query(self, limit=None, offset=None, order=None):
-        query = self.query.strip()
-
-        graphql_query = query[0:-1] + "totalRegistrantCount }"
-        _logger.debug(query)
+    def get_graphql_query(self, query_fields, current_limit=None, current_offset=None, **kwargs):
+        params = []

         if self.target_registry:
-            index = graphql_query.find("(") + 1
             is_group = str(self.target_registry == "group").lower()
-            if not index:
-                get_registrants_index = graphql_query.find("getRegistrants") + 14
-                graphql_query = (
-                    graphql_query[:get_registrants_index] + "()" + graphql_query[get_registrants_index:]
-                )
-                index = graphql_query.find("(") + 1
-
-                graphql_query = graphql_query[:index] + f"isGroup: {is_group}" + graphql_query[index:]
-
-            else:
-                graphql_query = graphql_query[:index] + f"isGroup: {is_group}," + graphql_query[index:]
+            params.append(f"isGroup: {is_group}")

         if self.last_sync_date:
-            index = graphql_query.find("(") + 1
-            if not index:
-                get_registrants_index = graphql_query.find("getRegistrants") + 14
-                graphql_query = (
-                    graphql_query[:get_registrants_index] + "()" + graphql_query[get_registrants_index:]
-                )
-                index = graphql_query.find("(") + 1
-                graphql_query = (
-                    graphql_query[:index]
-                    + f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}"'
-                    + graphql_query[index:]
-                )
+            params.append(f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}"')
+
+        # Add pagination parameters explicitly
+        if current_limit is not None:
+            params.append(f"limit: {current_limit}")
+        if current_offset is not None:
+            params.append(f"offset: {current_offset}")
+
+        # Handle additional kwargs (skip ones already handled)
+        for key, value in kwargs.items():
+            if key in ["limit", "offset", "isGroup", "lastSyncDate"]:
+                continue
+
+            if value is not None:
+                if isinstance(value, str):
+                    params.append(f'{key}: "{value}"')
+                elif isinstance(value, bool):
+                    params.append(f"{key}: {str(value).lower()}")
+                else:
+                    params.append(f"{key}: {value}")

-            else:
-                graphql_query = (
-                    graphql_query[:index]
-                    + f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}",'
-                    + graphql_query[index:]
-                )
-        # Add pagination parameters
-        if limit is not None or offset is not None:
-            index = graphql_query.find("(") + 1
-            pagination_params = []
-
-            if limit is not None:
-                pagination_params.append(f"limit: {limit}")
-            if offset is not None:
-                pagination_params.append(f"offset: {offset}")
-            if order is not None:
-                pagination_params.append(f'order: "{order}"')
-
-            pagination_str = ", ".join(pagination_params)
-
-            if index == 0:
-                get_registrants_index = graphql_query.find("getRegistrants") + 14
-                graphql_query = (
-                    graphql_query[:get_registrants_index] + "()" + graphql_query[get_registrants_index:]
-                )
-                index = graphql_query.find("(") + 1
-                graphql_query = graphql_query[:index] + pagination_str + graphql_query[index:]
-            else:
-                graphql_query = graphql_query[:index] + pagination_str + "," + graphql_query[index:]
+        # Build the parameters string
+        params_str = ", ".join(params)
+
+        # Construct the final GraphQL query
+        graphql_query = f"{{ getRegistrants({params_str}) {query_fields} totalRegistrantCount }}"

-        _logger.debug("updated graphql query", graphql_query)
+        _logger.debug("updated graphql query: %s", graphql_query)
         return graphql_query.strip()
@@ -370,12 +340,14 @@ def get_total_registrant_count(self):
         transaction_id = str(uuid.uuid4())
         reference_id = str(uuid.uuid4())

-        # Build count-only query
-        count_query = "{totalRegistrantCount}"
+        count_query_fields = "{ totalRegistrantCount }"
+        query_params = self.get_parsed_query_params() or {}
+        # Override the query_fields explicitly
+        graphql_query = self.get_graphql_query(query_fields=count_query_fields, **query_params)

         # Create request body for count
         data = self.build_request_body(
-            today_isoformat, message_id, transaction_id, reference_id, count_query
+            today_isoformat, message_id, transaction_id, reference_id, graphql_query
         )

         # Make request
@@ -702,48 +674,65 @@ def fetch_social_registry_beneficiary(self):

         try:
             self.write({"job_status": "running", "start_datetime": fields.Datetime.now()})
+
+            query_fields = self.query_fields.strip()
             query_params = self.get_parsed_query_params()

-            limit = query_params.get("limit")
-            offset = query_params.get("offset", 0)
-            order = query_params.get("order", "id asc")
-            current_offset = offset or 0
-            total_processed_records = 0
+            max_registrant_per_batch = int(
+                self.env["ir.config_parameter"]
+                .sudo()
+                .get_param("g2p_import_social_registry.max_registrants_count_job_queue", 100)
+            )

-            # If the limit is not specified in query_params, use the total count as the limit
-            if not limit:
+            total_processed_records = 0  # Initialize variable
+
+            # Determine processing parameters based on conditions
+            if query_params is None:
+                # fetch all records
                 total_count = self.get_total_registrant_count()
                 if total_count == 0:
                     message = _("No registrants found in the Social Registry.")
                     kind = "warning"
                     raise ValueError("Empty Social Registry")
-                limit = total_count
+                current_limit = total_count
+                current_offset = 0
+                query_params = {}

-            # max_registrant is the batch size for pagination and queue
-            max_registrant_per_batch = int(
-                self.env["ir.config_parameter"]
-                .sudo()
-                .get_param("g2p_import_social_registry.max_registrants_count_job_queue", 100)
-            )
+            else:
+                limit = query_params.get("limit")
+                offset = query_params.get("offset")
+
+                if offset is not None:
+                    # limit with offset - paginated fetch from specific offset
+                    current_limit = limit
+                    current_offset = offset
+                else:
+                    # limit without offset - fetch from beginning
+                    current_limit = limit
+                    current_offset = 0

             _logger.info(
                 "Starting Social Registry fetch - Limit: %s, Offset: %s, Batch size: %s",
-                limit,
+                current_limit,
                 current_offset,
                 max_registrant_per_batch,
            )

-            if max_registrant_per_batch >= limit:
-                # Fetch Records synchronously
-                paginated_query = self.get_graphql_query(limit, current_offset, order)
+            # Process based on batch size threshold
+            if max_registrant_per_batch >= current_limit:
+                # Synchronous processing
+                paginated_query = self.get_graphql_query(
+                    query_fields, current_limit, current_offset, **query_params
+                )
+
                 registrants = self.fetch_registrants_batch(paginated_query)

                 if registrants:
                     sticky = True
-                    message = _("Successfully processed %s registrants synchronously.") % len(registrants)
+                    message = _("Successfully processed %s registrants.") % len(registrants)
                     kind = "success"
                     self.process_registrants(registrants)
-                    total_processed_records += len(registrants)
+                    total_processed_records = len(registrants)
                 else:
                     message = _(
                         "No registrants found. "
                         "Please check the Search Criteria or "
                         "check if Social Registry contains data."
                     )
                     kind = "warning"
+
             else:
                 # Process asynchronously in batches
-                while total_processed_records < limit:
-                    # Calculate the number of records to fetch in this batch
-                    batch_size = min(max_registrant_per_batch, limit - total_processed_records)
+                while total_processed_records < current_limit:
+                    # Calculate batch size for this iteration
+                    batch_size = min(max_registrant_per_batch, current_limit - total_processed_records)

-                    # Use batch_size as the limit in the paginated GraphQL query
-                    paginated_query = self.get_graphql_query(batch_size, current_offset, order)
+                    # Build and execute paginated query
+                    paginated_query = self.get_graphql_query(
+                        query_fields, batch_size, current_offset, **query_params
+                    )

-                    # Fetch the registrants from the Social Registry
                     registrants = self.fetch_registrants_batch(paginated_query)

                     if not registrants:
                         message = (
                             _("No more registrants to process. Processed %s total.") % total_processed_records
                         )
                         kind = "warning" if total_processed_records == 0 else "success"
                         break

-                    # Process Records asynchronously
+                    # Process batch asynchronously
                     sticky = True
                     self.process_registrants_async(registrants, len(registrants))

+                    # Update counters
                     total_processed_records += len(registrants)
                     current_offset += len(registrants)

                     _logger.info(
                         "Processed batch: %s registrants (total: %s/%s)",
                         len(registrants),
                         total_processed_records,
-                        limit,
+                        current_limit,
                     )

+                # Set final message for successful async processing
                 if total_processed_records > 0 and kind != "warning":
                     message = (
                         _("Successfully queued %s registrants for asynchronous processing.")
diff --git a/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml b/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
index 1cf8009a..b6490d79 100644
--- a/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
+++ b/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
@@ -110,7 +110,7 @@
-                    <field name="query"/>
+                    <field name="query_fields"/>
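
Note (illustration, not part of the patches): a minimal standalone sketch of the string assembly that the reworked get_graphql_query performs after patch 1. The helper name build_query and the sample field block are made up for the example; the real method is an Odoo model method that also folds in the isGroup and lastSyncDate parameters shown above.

# Illustration only: standalone sketch of the query assembly introduced in patch 1,
# without the model's isGroup / lastSyncDate handling.
def build_query(query_fields, limit=None, offset=None, **extra):
    params = []
    if limit is not None:
        params.append(f"limit: {limit}")
    if offset is not None:
        params.append(f"offset: {offset}")
    for key, value in extra.items():
        if key in ("limit", "offset") or value is None:
            continue
        if isinstance(value, bool):
            params.append(f"{key}: {str(value).lower()}")  # GraphQL booleans are lowercase
        elif isinstance(value, str):
            params.append(f'{key}: "{value}"')  # strings are quoted
        else:
            params.append(f"{key}: {value}")
    return f"{{ getRegistrants({', '.join(params)}) {query_fields} totalRegistrantCount }}"

fields_block = "{ name, isGroup, givenName }"
print(build_query(fields_block, limit=100, offset=200))
# { getRegistrants(limit: 100, offset: 200) { name, isGroup, givenName } totalRegistrantCount }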

From: Niranjan Kumar
Date: Tue, 22 Jul 2025 11:11:44 +0530
Subject: [PATCH 2/2] refactor: clean up registrants fetching logic

---
 .../fetch_social_registry_beneficiary.py | 117 +++++++++++-------
 1 file changed, 71 insertions(+), 46 deletions(-)

diff --git a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
index 0f59cc97..0296c4d4 100644
--- a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
+++ b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
@@ -286,7 +286,16 @@ def build_request_body(self, today_isoformat, message_id, transaction_id, refere
             "message": message,
         }

-    def get_graphql_query(self, query_fields, current_limit=None, current_offset=None, **kwargs):
+    def get_graphql_query(self, query_fields=None, effective_limit=None, effective_offset=None, **kwargs):
+        """
+        Build GraphQL query with pagination and filtering.
+
+        Args:
+            query_fields: GraphQL fields to include in the query
+            effective_limit: Explicit limit (takes precedence over kwargs['limit'])
+            effective_offset: Explicit offset (takes precedence over kwargs['offset'])
+            **kwargs: Additional query parameters (limit/offset here are ignored)
+        """
         params = []

         if self.target_registry:
@@ -297,10 +306,10 @@ def get_graphql_query(self, query_fields, current_limit=None, current_offset=Non
             params.append(f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}"')

         # Add pagination parameters explicitly
-        if current_limit is not None:
-            params.append(f"limit: {current_limit}")
-        if current_offset is not None:
-            params.append(f"offset: {current_offset}")
+        if effective_limit is not None:
+            params.append(f"limit: {effective_limit}")
+        if effective_offset is not None:
+            params.append(f"offset: {effective_offset}")

         # Handle additional kwargs (skip ones already handled)
         for key, value in kwargs.items():
@@ -319,7 +328,10 @@ def get_graphql_query(self, query_fields, current_limit=None, current_offset=Non
         params_str = ", ".join(params)

         # Construct the final GraphQL query
-        graphql_query = f"{{ getRegistrants({params_str}) {query_fields} totalRegistrantCount }}"
+        if query_fields is None:
+            graphql_query = "{totalRegistrantCount}"
+        else:
+            graphql_query = f"{{ getRegistrants({params_str}) {query_fields} totalRegistrantCount }}"

         _logger.debug("updated graphql query: %s", graphql_query)
         return graphql_query.strip()
@@ -340,12 +352,7 @@ def get_total_registrant_count(self):
         transaction_id = str(uuid.uuid4())
         reference_id = str(uuid.uuid4())

-        count_query_fields = "{ totalRegistrantCount }"
-        query_params = self.get_parsed_query_params() or {}
-
-        # Override the query_fields explicitly
-        graphql_query = self.get_graphql_query(query_fields=count_query_fields, **query_params)
-        # Create request body for count
+        graphql_query = self.get_graphql_query()
         data = self.build_request_body(
             today_isoformat, message_id, transaction_id, reference_id, graphql_query
         )
@@ -678,51 +685,62 @@ def fetch_social_registry_beneficiary(self):

             query_fields = self.query_fields.strip()
             query_params = self.get_parsed_query_params()

-            max_registrant_per_batch = int(
+            max_registrants_per_batch = int(
                 self.env["ir.config_parameter"]
                 .sudo()
                 .get_param("g2p_import_social_registry.max_registrants_count_job_queue", 100)
             )

-            total_processed_records = 0  # Initialize variable
+            total_processed_registrants = 0

-            # Determine processing parameters based on conditions
             if query_params is None:
-                # fetch all records
+                # No query parameters provided → fetch all records from the beginning
+                # Example: query_params = None
+                # → fetch all records (e.g., 500), starting from index 0
+
                 total_count = self.get_total_registrant_count()
                 if total_count == 0:
                     message = _("No registrants found in the Social Registry.")
                     kind = "warning"
-                    raise ValueError("Empty Social Registry")
-                current_limit = total_count
-                current_offset = 0
+                effective_limit = total_count
+                effective_offset = 0
                 query_params = {}
-
             else:
+                # Query parameters provided
                 limit = query_params.get("limit")
-                offset = query_params.get("offset")
+                offset = query_params.get("offset", 0)
+
+                if limit is not None:
+                    # 'limit' is provided → use given limit and offset
+                    # Example: query_params = {'limit': 100, 'offset': 200}
+                    # → fetch 100 records starting from the 201st record

-                if offset is not None:
-                    # limit with offset - paginated fetch from specific offset
-                    current_limit = limit
-                    current_offset = offset
+                    effective_limit = limit
+                    effective_offset = offset
                 else:
-                    # limit without offset - fetch from beginning
-                    current_limit = limit
-                    current_offset = 0
+                    # 'limit' is not provided → treat as fetch all from 'offset'
+                    # Example: query_params = {'offset': 50}
+                    # → fetch all remaining records starting from the 51st
+
+                    total_count = self.get_total_registrant_count()
+                    if total_count == 0:
+                        message = _("No registrants found in the Social Registry.")
+                        kind = "warning"
+                    effective_limit = total_count
+                    effective_offset = offset

             _logger.info(
                 "Starting Social Registry fetch - Limit: %s, Offset: %s, Batch size: %s",
-                current_limit,
-                current_offset,
-                max_registrant_per_batch,
+                effective_limit,
+                effective_offset,
+                max_registrants_per_batch,
             )

             # Process based on batch size threshold
-            if max_registrant_per_batch >= current_limit:
+            if max_registrants_per_batch >= effective_limit:
                 # Synchronous processing
                 paginated_query = self.get_graphql_query(
-                    query_fields, current_limit, current_offset, **query_params
+                    query_fields, effective_limit, effective_offset, **query_params
                 )

                 registrants = self.fetch_registrants_batch(paginated_query)
@@ -732,7 +750,7 @@ def fetch_social_registry_beneficiary(self):
                     message = _("Successfully processed %s registrants.") % len(registrants)
                     kind = "success"
                     self.process_registrants(registrants)
-                    total_processed_records = len(registrants)
+                    total_processed_registrants = len(registrants)
                 else:
                     message = _(
                         "No registrants found. "
@@ -743,9 +761,12 @@ def fetch_social_registry_beneficiary(self):
             else:
                 # Process asynchronously in batches
-                while total_processed_records < current_limit:
+                current_offset = effective_offset
+
+                while total_processed_registrants < effective_limit:
                     # Calculate batch size for this iteration
-                    batch_size = min(max_registrant_per_batch, current_limit - total_processed_records)
+                    remaining_records = effective_limit - total_processed_registrants
+                    batch_size = min(max_registrants_per_batch, remaining_records)

                     # Build and execute paginated query
                     paginated_query = self.get_graphql_query(
@@ -756,9 +777,10 @@ def fetch_social_registry_beneficiary(self):

                     if not registrants:
                         message = (
-                            _("No more registrants to process. Processed %s total.") % total_processed_records
+                            _("No more registrants to process. Processed %s total.")
+                            % total_processed_registrants
                         )
-                        kind = "warning" if total_processed_records == 0 else "success"
+                        kind = "warning" if total_processed_registrants == 0 else "success"
                         break

                     # Process batch asynchronously
@@ -766,25 +788,28 @@ def fetch_social_registry_beneficiary(self):
                     sticky = True
                     self.process_registrants_async(registrants, len(registrants))

                     # Update counters
-                    total_processed_records += len(registrants)
-                    current_offset += len(registrants)
+                    batch_count = len(registrants)
+                    total_processed_registrants += batch_count
+                    current_offset += batch_count

                     _logger.info(
                         "Processed batch: %s registrants (total: %s/%s)",
-                        len(registrants),
-                        total_processed_records,
-                        current_limit,
+                        batch_count,
+                        total_processed_registrants,
+                        effective_limit,
                     )

                 # Set final message for successful async processing
-                if total_processed_records > 0 and kind != "warning":
+                if total_processed_registrants > 0 and kind != "warning":
                     message = (
                         _("Successfully queued %s registrants for asynchronous processing.")
-                        % total_processed_records
+                        % total_processed_registrants
                     )
                     kind = "success"

-            _logger.info("Completed Social Registry fetch. Processed %s registrants", total_processed_records)
+            _logger.info(
+                "Completed Social Registry fetch. Processed %s registrants", total_processed_registrants
+            )

             self.last_sync_date = fields.Datetime.now()
             end_time = fields.Datetime.now()