diff --git a/g2p_social_registry_importer/data/search_criteria.xml b/g2p_social_registry_importer/data/search_criteria.xml
index cd3c6f60..c42aaa13 100644
--- a/g2p_social_registry_importer/data/search_criteria.xml
+++ b/g2p_social_registry_importer/data/search_criteria.xml
@@ -6,6 +6,12 @@
             False
             group
+            {
+                "limit": 10000,
+                "offset": 1000,
+                "order": "id asc"
+            }
+
             {
                 "name": .name,
@@ -68,8 +74,7 @@
                 }
-            {
-                getRegistrants(limit:2){
+            {
                     name,
                     isGroup,
                     givenName,
@@ -119,7 +124,7 @@
                     expiryDate
                 }
             }
-        }
+
diff --git a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
index be71d1b5..0296c4d4 100644
--- a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
+++ b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
@@ -35,11 +35,9 @@ class G2PFetchSocialRegistryBeneficiary(models.Model):
         "g2p.program",
         domain=("[('target_type', '=', target_registry)]"),
     )
-
-    query = fields.Text(
-        required=True,
-    )
-    output_mapping = fields.Text(required=True)
+    query_params = fields.Text(default="""{}""")
+    query_fields = fields.Text(required=True, default="""{}""")
+    output_mapping = fields.Text(required=True, default="""{}""")

     last_sync_date = fields.Datetime(string="Last synced on", required=False)

@@ -148,6 +146,24 @@ def onchange_target_registry(self):
         for rec in self:
             rec.target_program = None

+    @api.constrains("query_params")
+    def _validate_query_params(self):
+        for rec in self:
+            if rec.query_params:
+                try:
+                    json.loads(rec.query_params)
+                except json.JSONDecodeError as e:
+                    raise ValidationError(_("Query Parameters must be valid JSON. Error: %s") % str(e)) from e
+
+    def get_parsed_query_params(self):
+        """Parse the query_params JSON and return it as a dict."""
+        self.ensure_one()
+        try:
+            return json.loads(self.query_params) if self.query_params else {}
+        except json.JSONDecodeError:
+            _logger.error("Invalid JSON in query_params for record %s", self.id)
+            return {}
+
     @api.constrains("output_mapping")
     def constraint_json_fields(self):
         for rec in self:
@@ -228,10 +244,12 @@ def get_auth_token(self, auth_url):
         else:
             raise ValidationError(_("{reason}: Unable to connect to API.").format(reason=response.reason))

-    def get_header_for_body(self, social_registry_version, today_isoformat, message_id):
+    def build_request_body(self, today_isoformat, message_id, transaction_id, reference_id, query):
+        """Build the request body for the Social Registry search API call."""
         sender_id = self.env["ir.config_parameter"].sudo().get_param("web.base.url") or ""
         receiver_id = "Social Registry"
-        return {
+
+        header = {
             "version": "1.0.0",
             "message_id": message_id,
             "message_ts": today_isoformat,
@@ -240,82 +258,170 @@ def get_header_for_body(self, social_registry_version, today_isoformat, message_
             "sender_uri": "",
             "receiver_id": receiver_id,
             "total_count": 0,
+            "is_msg_encrypted": False,
+            "meta": {},
         }

-    def get_graphql_query(self):
-        query = self.query.strip()
-
-        graphql_query = query[0:-1] + "totalRegistrantCount }"
-        _logger.debug(query)
-
-        if self.target_registry:
-            index = graphql_query.find("(") + 1
-            is_group = str(self.target_registry == "group").lower()
-            if not index:
-                get_registrants_index = graphql_query.find("getRegistrants") + 14
-                graphql_query = (
-                    graphql_query[:get_registrants_index] + "()" + graphql_query[get_registrants_index:]
-                )
-                index = graphql_query.find("(") + 1
-
-                graphql_query = graphql_query[:index] + f"isGroup: {is_group}" + graphql_query[index:]
-
-            else:
-                graphql_query = graphql_query[:index] + f"isGroup: {is_group}," + graphql_query[index:]
-
-        if self.last_sync_date:
-            index = graphql_query.find("(") + 1
-            if not index:
-                get_registrants_index = graphql_query.find("getRegistrants") + 14
-                graphql_query = (
-                    graphql_query[:get_registrants_index] + "()" + graphql_query[get_registrants_index:]
-                )
-                index = graphql_query.find("(") + 1
-                graphql_query = (
-                    graphql_query[:index]
-                    + f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}"'
-                    + graphql_query[index:]
-                )
-
-            else:
-                graphql_query = (
-                    graphql_query[:index]
-                    + f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}",'
-                    + graphql_query[index:]
-                )
-
-        _logger.debug("updated graphql query", graphql_query)
-        return graphql_query.strip()
-
-    def get_search_request(self, reference_id, today_isoformat):
-        search_requests = {
+        search_request = {
             "reference_id": reference_id,
             "timestamp": today_isoformat,
             "search_criteria": {
+                "version": "1.0.0",
                 "reg_type": "G2P:RegistryType:Individual",
+                "reg_sub_type": "Individual",
                 "query_type": "graphql",
-                "query": self.get_graphql_query(),
+                "query": query,
             },
+            "locale": "en",
         }

-        return search_requests
-
-    def get_message(self, today_isoformat, transaction_id, reference_id):
-        # Define Search Requests
-        search_request = self.get_search_request(reference_id, today_isoformat)
-
-        return {
+        message = {
             "transaction_id": transaction_id,
             "search_request": [search_request],
         }

-    def get_data(self, signature, header, message):
         return {
-            "sinature": signature,
+            "signature": "",
             "header": header,
             "message": message,
         }

+    def get_graphql_query(self, query_fields=None, effective_limit=None, effective_offset=None, **kwargs):
+        """
+        Build a GraphQL query with pagination and filtering.
+
+        Args:
+            query_fields: GraphQL fields to include in the query.
+            effective_limit: Explicit limit (takes precedence over kwargs["limit"]).
+            effective_offset: Explicit offset (takes precedence over kwargs["offset"]).
+            **kwargs: Additional query parameters (limit/offset here are ignored).
+        """
+        params = []
+
+        if self.target_registry:
+            is_group = str(self.target_registry == "group").lower()
+            params.append(f"isGroup: {is_group}")
+
+        if self.last_sync_date:
+            params.append(f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}"')
+
+        # Add pagination parameters explicitly
+        if effective_limit is not None:
+            params.append(f"limit: {effective_limit}")
+        if effective_offset is not None:
+            params.append(f"offset: {effective_offset}")
+
+        # Handle additional kwargs (skip ones already handled)
+        for key, value in kwargs.items():
+            if key in ["limit", "offset", "isGroup", "lastSyncDate"]:
+                continue
+
+            if value is not None:
+                if isinstance(value, str):
+                    params.append(f'{key}: "{value}"')
+                elif isinstance(value, bool):
+                    params.append(f"{key}: {str(value).lower()}")
+                else:
+                    params.append(f"{key}: {value}")
+
+        # Build the parameters string
+        params_str = ", ".join(params)
+
+        # Construct the final GraphQL query
+        if query_fields is None:
+            graphql_query = "{totalRegistrantCount}"
+        else:
+            graphql_query = f"{{ getRegistrants({params_str}) {query_fields} totalRegistrantCount }}"
+
+        _logger.debug("updated graphql query: %s", graphql_query)
+        return graphql_query.strip()
+
+    def get_total_registrant_count(self):
+        self.ensure_one()
+
+        try:
+            # Get paths and auth
+            paths = self.get_data_source_paths()
+            auth_url = self.get_social_registry_auth_url(paths)
+            auth_token = self.get_auth_token(auth_url)
+            search_url = self.get_social_registry_search_url(paths)
+
+            # Prepare count query
+            today_isoformat = datetime.now(timezone.utc).isoformat()
+            message_id = str(uuid.uuid4())
+            transaction_id = str(uuid.uuid4())
+            reference_id = str(uuid.uuid4())
+
+            graphql_query = self.get_graphql_query()
+            data = self.build_request_body(
+                today_isoformat, message_id, transaction_id, reference_id, graphql_query
+            )
+
+            # Make request
+            response = requests.post(
+                search_url,
+                data=json.dumps(data),
+                headers={"Authorization": auth_token},
+                timeout=constants.REQUEST_TIMEOUT,
+            )
+
+            if response.ok:
+                search_responses = response.json().get("message", {}).get("search_response", [])
+                if search_responses:
+                    reg_record = search_responses[0].get("data", {}).get("reg_records", {})
+                    return reg_record.get("totalRegistrantCount", 0)
+
+            return 0
+
+        except Exception as e:
+            _logger.error("Error getting total registrant count: %s", str(e))
+            return 0
+
+    def fetch_registrants_batch(self, paginated_query):
+        """Fetch a single batch of registrants using a paginated query."""
+        self.ensure_one()
+
+        try:
+            # Get paths and auth
+            paths = self.get_data_source_paths()
+            auth_url = self.get_social_registry_auth_url(paths)
+            auth_token = self.get_auth_token(auth_url)
+            search_url = self.get_social_registry_search_url(paths)
+
+            # Prepare paginated query
+            today_isoformat = datetime.now(timezone.utc).isoformat()
+            message_id = str(uuid.uuid4())
+            transaction_id = str(uuid.uuid4())
+            reference_id = str(uuid.uuid4())
+
+            data = self.build_request_body(
+                today_isoformat, message_id, transaction_id, reference_id, paginated_query
+            )
+
+            response = requests.post(
+                search_url,
+                data=json.dumps(data),
+                headers={"Authorization": auth_token},
+                timeout=constants.REQUEST_TIMEOUT,
+            )
+            if not response.ok:
+                _logger.error("Social Registry Search API response: %s", response.text)
+                response.raise_for_status()
+
+            # Process response
+            search_responses = response.json().get("message", {}).get("search_response", [])
+            if search_responses:
+                reg_record = search_responses[0].get("data", {}).get("reg_records", {})
+                registrants = reg_record.get("getRegistrants", [])
+
+                return registrants
+
+            return []
+
+        except Exception as e:
+            _logger.error("Error fetching registrant batch: %s", str(e))
+            raise
+
     def get_partner_and_clean_identifier(self, identifiers):
         clean_identifiers = []
         partner_id = None
@@ -560,111 +666,162 @@ def process_registrants_async(self, registrants, count):
         main_job.delay()

     def fetch_social_registry_beneficiary(self):
-        self.write({"job_status": "running", "start_datetime": fields.Datetime.now()})
-
-        config_parameters = self.env["ir.config_parameter"].sudo()
-        today_isoformat = datetime.now(timezone.utc).isoformat()
-        social_registry_version = config_parameters.get_param("social_registry_version")
-        max_registrant = int(
-            config_parameters.get_param("g2p_import_social_registry.max_registrants_count_job_queue")
-        )
-
-        message_id = str(uuid.uuid4())
-        transaction_id = str(uuid.uuid4())
-        reference_id = str(uuid.uuid4())
-
-        # Define Data Source
-        paths = self.get_data_source_paths()
-
-        # Define Social Registry auth url
+        """
+        Fetch beneficiaries from the Social Registry.

-        full_social_registry_auth_url = self.get_social_registry_auth_url(paths)
+        Supports both synchronous and asynchronous processing based on batch size.
+        Uses pagination to handle large datasets efficiently.

-        # Retrieve auth token
+        Returns:
+            dict: Action dictionary for the UI notification.
+        """
+        sticky = False
+        kind = "info"
+        message = _("No registrants were processed.")

-        auth_token = self.get_auth_token(full_social_registry_auth_url)
+        try:
+            self.write({"job_status": "running", "start_datetime": fields.Datetime.now()})

-        # Define Social Registry search url
-        full_social_registry_search_url = self.get_social_registry_search_url(paths)
+            query_fields = self.query_fields.strip()
+            query_params = self.get_parsed_query_params()

-        # Define header
-        header = self.get_header_for_body(
-            social_registry_version,
-            today_isoformat,
-            message_id,
-        )
+            max_registrants_per_batch = int(
+                self.env["ir.config_parameter"]
+                .sudo()
+                .get_param("g2p_import_social_registry.max_registrants_count_job_queue", 100)
+            )

-        # Define message
-        message = self.get_message(
-            today_isoformat,
-            transaction_id=transaction_id,
-            reference_id=reference_id,
-        )
+            total_processed_registrants = 0

-        signature = ""
+            if not query_params:
+                # No query parameters provided (get_parsed_query_params always
+                # returns a dict, never None) → fetch all records from the beginning
+                # Example: query_params = {}
+                #   → fetch all records (e.g., 500), starting from index 0

-        # Define data
-        data = self.get_data(
-            signature,
-            header,
-            message,
-        )
+                total_count = self.get_total_registrant_count()
+                if total_count == 0:
+                    message = _("No registrants found in the Social Registry.")
+                    kind = "warning"
+                effective_limit = total_count
+                effective_offset = 0
+            else:
+                # Query parameters provided
+                limit = query_params.get("limit")
+                offset = query_params.get("offset", 0)

-        data = json.dumps(data)
+                if limit is not None:
+                    # 'limit' is provided → use the given limit and offset
+                    # Example: query_params = {'limit': 100, 'offset': 200}
+                    #   → fetch 100 records starting from the 201st record

-        # POST Request
-        response = requests.post(
-            full_social_registry_search_url,
-            data=data,
-            headers={"Authorization": auth_token},
-            timeout=constants.REQUEST_TIMEOUT,
-        )
+                    effective_limit = limit
+                    effective_offset = offset
+                else:
+                    # 'limit' is not provided → fetch all records from 'offset'
+                    # Example: query_params = {'offset': 50}
+                    #   → fetch all remaining records starting from the 51st
+
+                    total_count = self.get_total_registrant_count()
+                    if total_count == 0:
+                        message = _("No registrants found in the Social Registry.")
+                        kind = "warning"
+                    effective_limit = total_count
+                    effective_offset = offset
+
+            _logger.info(
+                "Starting Social Registry fetch - Limit: %s, Offset: %s, Batch size: %s",
+                effective_limit,
+                effective_offset,
+                max_registrants_per_batch,
+            )

-        if not response.ok:
-            _logger.error("Social Registry Search API response: %s", response.text)
-            response.raise_for_status()
+            # Process based on batch size threshold
+            if max_registrants_per_batch >= effective_limit:
+                # Synchronous processing
+                paginated_query = self.get_graphql_query(
+                    query_fields, effective_limit, effective_offset, **query_params
+                )

-        sticky = False
+                registrants = self.fetch_registrants_batch(paginated_query)

-        # Process response
-        if response.ok:
-            kind = "success"
-            message = _("Successfully Imported Social Registry Beneficiaries")
+                if registrants:
+                    sticky = True
+                    message = _("Successfully processed %s registrants.") % len(registrants)
+                    kind = "success"
+                    self.process_registrants(registrants)
+                    total_processed_registrants = len(registrants)
+                else:
+                    message = _(
+                        "No registrants found. "
+                        "Verify the last sync date or "
+                        "check whether the Social Registry contains data."
+                    )
+                    kind = "warning"

-            search_responses = response.json().get("message", {}).get("search_response", [])
-            if not search_responses:
-                kind = "warning"
-                message = _("No imported beneficiary")
+            else:
+                # Process asynchronously in batches
+                current_offset = effective_offset

-            for search_response in search_responses:
-                reg_record = search_response.get("data", {}).get("reg_records", [])
-                registrants = reg_record.get("getRegistrants", [])
-                total_partners_count = reg_record.get("totalRegistrantCount", "")
+                while total_processed_registrants < effective_limit:
+                    # Calculate batch size for this iteration
+                    remaining_records = effective_limit - total_processed_registrants
+                    batch_size = min(max_registrants_per_batch, remaining_records)

-                if total_partners_count:
-                    if total_partners_count < max_registrant:
-                        self.process_registrants(registrants)
+                    # Build and execute paginated query
+                    paginated_query = self.get_graphql_query(
+                        query_fields, batch_size, current_offset, **query_params
+                    )

-                    else:
-                        self.process_registrants_async(registrants, total_partners_count)
-                        kind = "success"
-                        message = _("Fetching from Social Registry Started Asynchronously.")
-                        sticky = True
+                    registrants = self.fetch_registrants_batch(paginated_query)
+
+                    if not registrants:
+                        message = (
+                            _("No more registrants to process. Processed %s total.")
+                            % total_processed_registrants
+                        )
+                        kind = "warning" if total_processed_registrants == 0 else "success"
+                        break
+
+                    # Process batch asynchronously
+                    sticky = True
+                    self.process_registrants_async(registrants, len(registrants))
+
+                    # Update counters
+                    batch_count = len(registrants)
+                    total_processed_registrants += batch_count
+                    current_offset += batch_count
+
+                    _logger.info(
+                        "Processed batch: %s registrants (total: %s/%s)",
+                        batch_count,
+                        total_processed_registrants,
+                        effective_limit,
+                    )

-                else:
+                # Set final message for successful async processing
+                if total_processed_registrants > 0 and kind != "warning":
+                    message = (
+                        _("Successfully queued %s registrants for asynchronous processing.")
+                        % total_processed_registrants
+                    )
                     kind = "success"
-                    message = _("No matching records found.")
+
+            _logger.info(
+                "Completed Social Registry fetch. Processed %s registrants", total_processed_registrants
+            )

             self.last_sync_date = fields.Datetime.now()
+            end_time = fields.Datetime.now()
+            self.write({"job_status": "completed", "end_datetime": end_time})

-        else:
-            self.write({"job_status": "failed", "end_datetime": fields.Datetime.now()})
+        except Exception as e:
+            _logger.exception("Failed to fetch registrants from Social Registry: %s", e)
+            end_time = fields.Datetime.now()
+            self.write({"job_status": "failed", "end_datetime": end_time})
+            message = _("Failed to fetch registrants: %s") % str(e)
             kind = "danger"
-            message = response.json().get("error", {}).get("message", "")
-            if not message:
-                message = _("{reason}: Unable to connect to API.").format(reason=response.reason)
-
-        self.write({"job_status": "completed", "end_datetime": fields.Datetime.now()})
+            sticky = True

         action = {
             "type": "ir.actions.client",
diff --git a/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml b/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
index 10414145..b6490d79 100644
--- a/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
+++ b/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
@@ -109,7 +109,8 @@
-                            <field name="query" />
+                            <field name="query_params" />
+                            <field name="query_fields" />
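
Reviewer note, not part of the patch: the query_params → window mapping in fetch_social_registry_beneficiary has three cases that are easy to check in isolation. A minimal standalone sketch (effective_window is a hypothetical helper; total_count stands in for get_total_registrant_count()):

    # Sketch: how query_params selects the effective (limit, offset) window.
    # Mirrors the branches in fetch_social_registry_beneficiary above.
    def effective_window(query_params, total_count):
        if not query_params:                   # no params → everything, from index 0
            return total_count, 0
        limit = query_params.get("limit")
        offset = query_params.get("offset", 0)
        if limit is not None:                  # explicit window
            return limit, offset
        return total_count, offset             # everything after offset

    assert effective_window({}, 500) == (500, 0)
    assert effective_window({"limit": 100, "offset": 200}, 500) == (100, 200)
    assert effective_window({"offset": 50}, 500) == (500, 50)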
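Likewise, the argument-rendering rule in get_graphql_query can be exercised on its own. A sketch under the same rules (render_params is a hypothetical stand-in for the method's kwargs handling; the limit/offset/order values only mirror the defaults added in search_criteria.xml):

    # Sketch: render query_params entries into GraphQL arguments the way
    # get_graphql_query does; explicit limit/offset win over kwargs entries.
    def render_params(effective_limit=None, effective_offset=None, **kwargs):
        params = []
        if effective_limit is not None:
            params.append(f"limit: {effective_limit}")
        if effective_offset is not None:
            params.append(f"offset: {effective_offset}")
        for key, value in kwargs.items():
            if key in ("limit", "offset") or value is None:
                continue  # already handled explicitly, or empty
            if isinstance(value, str):
                params.append(f'{key}: "{value}"')
            elif isinstance(value, bool):
                params.append(f"{key}: {str(value).lower()}")
            else:
                params.append(f"{key}: {value}")
        return ", ".join(params)

    args = render_params(effective_limit=100, effective_offset=200, order="id asc")
    print(f"{{ getRegistrants({args}) {{ name, isGroup }} totalRegistrantCount }}")
    # { getRegistrants(limit: 100, offset: 200, order: "id asc") { name, isGroup } totalRegistrantCount }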
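Finally, the async path walks that window in fixed-size steps. A sketch of the arithmetic only, assuming every batch comes back full (the real loop advances by len(registrants), so a short batch ends it early):

    # Sketch: batch windows for the async path, assuming full batches.
    def batch_windows(effective_limit, effective_offset, max_per_batch):
        processed, offset = 0, effective_offset
        while processed < effective_limit:
            size = min(max_per_batch, effective_limit - processed)
            yield offset, size
            processed += size
            offset += size

    print(list(batch_windows(effective_limit=250, effective_offset=50, max_per_batch=100)))
    # [(50, 100), (150, 100), (250, 50)]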