diff --git a/g2p_social_registry_importer/data/search_criteria.xml b/g2p_social_registry_importer/data/search_criteria.xml
index cd3c6f60..c42aaa13 100644
--- a/g2p_social_registry_importer/data/search_criteria.xml
+++ b/g2p_social_registry_importer/data/search_criteria.xml
@@ -6,6 +6,12 @@
False
group
+ {
+ "limit": 10000,
+ "offset": 1000,
+ "order": "id asc"
+ }
+
{
"name": .name,
@@ -68,8 +74,7 @@
}
- {
- getRegistrants(limit:2){
+ {
name,
isGroup,
givenName,
@@ -119,7 +124,7 @@
expiryDate
}
}
- }
+
diff --git a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
index be71d1b5..0296c4d4 100644
--- a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
+++ b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py
@@ -35,11 +35,9 @@ class G2PFetchSocialRegistryBeneficiary(models.Model):
"g2p.program",
domain=("[('target_type', '=', target_registry)]"),
)
-
- query = fields.Text(
- required=True,
- )
- output_mapping = fields.Text(required=True)
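+    # query_params holds optional JSON fetch options, e.g. {"limit": 10000, "offset": 1000, "order": "id asc"}
+    # (see the sample default in data/search_criteria.xml)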
+ query_params = fields.Text(default="""{}""")
+ query_fields = fields.Text(required=True, default="""{}""")
+ output_mapping = fields.Text(required=True, default="""{}""")
last_sync_date = fields.Datetime(string="Last synced on", required=False)
@@ -148,6 +146,24 @@ def onchange_target_registry(self):
for rec in self:
rec.target_program = None
+ @api.constrains("query_params")
+ def _validate_query_params(self):
+ for rec in self:
+ if rec.query_params:
+ try:
+ json.loads(rec.query_params)
+ except json.JSONDecodeError as e:
+ raise ValidationError(_("Query Parameters must be valid JSON. Error: %s") % str(e)) from e
+
+ def get_parsed_query_params(self):
+ """Parse query_params JSON and return as dict"""
+ self.ensure_one()
+ try:
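+            # e.g. '{"limit": 10000, "offset": 1000}' -> {"limit": 10000, "offset": 1000}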
+ return json.loads(self.query_params) if self.query_params else {}
+ except json.JSONDecodeError:
+ _logger.error("Invalid JSON in query_params for record %s", self.id)
+ return {}
+
@api.constrains("output_mapping")
def constraint_json_fields(self):
for rec in self:
@@ -228,10 +244,12 @@ def get_auth_token(self, auth_url):
else:
raise ValidationError(_("{reason}: Unable to connect to API.").format(reason=response.reason))
- def get_header_for_body(self, social_registry_version, today_isoformat, message_id):
+ def build_request_body(self, today_isoformat, message_id, transaction_id, reference_id, query):
+ """Build the request body for API call"""
sender_id = self.env["ir.config_parameter"].sudo().get_param("web.base.url") or ""
receiver_id = "Social Registry"
- return {
+
+ header = {
"version": "1.0.0",
"message_id": message_id,
"message_ts": today_isoformat,
@@ -240,82 +258,170 @@ def get_header_for_body(self, social_registry_version, today_isoformat, message_
"sender_uri": "",
"receiver_id": receiver_id,
"total_count": 0,
+ "is_msg_encrypted": False,
+ "meta": {},
}
- def get_graphql_query(self):
- query = self.query.strip()
-
- graphql_query = query[0:-1] + "totalRegistrantCount }"
- _logger.debug(query)
-
- if self.target_registry:
- index = graphql_query.find("(") + 1
- is_group = str(self.target_registry == "group").lower()
- if not index:
- get_registrants_index = graphql_query.find("getRegistrants") + 14
- graphql_query = (
- graphql_query[:get_registrants_index] + "()" + graphql_query[get_registrants_index:]
- )
- index = graphql_query.find("(") + 1
-
- graphql_query = graphql_query[:index] + f"isGroup: {is_group}" + graphql_query[index:]
-
- else:
- graphql_query = graphql_query[:index] + f"isGroup: {is_group}," + graphql_query[index:]
-
- if self.last_sync_date:
- index = graphql_query.find("(") + 1
- if not index:
- get_registrants_index = graphql_query.find("getRegistrants") + 14
- graphql_query = (
- graphql_query[:get_registrants_index] + "()" + graphql_query[get_registrants_index:]
- )
- index = graphql_query.find("(") + 1
- graphql_query = (
- graphql_query[:index]
- + f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}"'
- + graphql_query[index:]
- )
-
- else:
- graphql_query = (
- graphql_query[:index]
- + f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}",'
- + graphql_query[index:]
- )
-
- _logger.debug("updated graphql query", graphql_query)
- return graphql_query.strip()
-
- def get_search_request(self, reference_id, today_isoformat):
- search_requests = {
+ search_request = {
"reference_id": reference_id,
"timestamp": today_isoformat,
"search_criteria": {
+ "version": "1.0.0",
"reg_type": "G2P:RegistryType:Individual",
+ "reg_sub_type": "Individual",
"query_type": "graphql",
- "query": self.get_graphql_query(),
+ "query": query,
},
+ "locale": "en",
}
- return search_requests
-
- def get_message(self, today_isoformat, transaction_id, reference_id):
- # Define Search Requests
- search_request = self.get_search_request(reference_id, today_isoformat)
-
- return {
+ message = {
"transaction_id": transaction_id,
"search_request": [search_request],
}
- def get_data(self, signature, header, message):
return {
- "sinature": signature,
+ "signature": "",
"header": header,
"message": message,
}
+ def get_graphql_query(self, query_fields=None, effective_limit=None, effective_offset=None, **kwargs):
+ """
+ Build GraphQL query with pagination and filtering.
+
+ Args:
+ query_fields: GraphQL fields to include in the query
+ effective_limit: Explicit limit (takes precedence over kwargs['limit'])
+ effective_offset: Explicit offset (takes precedence over kwargs['offset'])
+ **kwargs: Additional query parameters (limit/offset here are ignored)
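+
+        Illustrative result (assuming target_registry is "individual" and no last_sync_date):
+            get_graphql_query("{ name }", 100, 0)
+            -> '{ getRegistrants(isGroup: false, limit: 100, offset: 0) { name } totalRegistrantCount }'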
+ """
+ params = []
+
+ if self.target_registry:
+ is_group = str(self.target_registry == "group").lower()
+ params.append(f"isGroup: {is_group}")
+
+ if self.last_sync_date:
+ params.append(f'lastSyncDate: "{self.last_sync_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")}"')
+
+ # Add pagination parameters explicitly
+ if effective_limit is not None:
+ params.append(f"limit: {effective_limit}")
+ if effective_offset is not None:
+ params.append(f"offset: {effective_offset}")
+
+ # Handle additional kwargs (skip ones already handled)
+ for key, value in kwargs.items():
+ if key in ["limit", "offset", "isGroup", "lastSyncDate"]:
+ continue
+
+ if value is not None:
+ if isinstance(value, str):
+ params.append(f'{key}: "{value}"')
+ elif isinstance(value, bool):
+ params.append(f"{key}: {str(value).lower()}")
+ else:
+ params.append(f"{key}: {value}")
+
+ # Build the parameters string
+ params_str = ", ".join(params)
+
+ # Construct the final GraphQL query
+ if query_fields is None:
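+            # Count-only query; no getRegistrants selection or filter parameters are applied here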
+ graphql_query = "{totalRegistrantCount}"
+ else:
+ graphql_query = f"{{ getRegistrants({params_str}) {query_fields} totalRegistrantCount }}"
+
+        _logger.debug("Built GraphQL query: %s", graphql_query)
+ return graphql_query.strip()
+
+ def get_total_registrant_count(self):
+ self.ensure_one()
+
+ try:
+ # Get paths and auth
+ paths = self.get_data_source_paths()
+ auth_url = self.get_social_registry_auth_url(paths)
+ auth_token = self.get_auth_token(auth_url)
+ search_url = self.get_social_registry_search_url(paths)
+
+ # Prepare count query
+ today_isoformat = datetime.now(timezone.utc).isoformat()
+ message_id = str(uuid.uuid4())
+ transaction_id = str(uuid.uuid4())
+ reference_id = str(uuid.uuid4())
+
+ graphql_query = self.get_graphql_query()
+ data = self.build_request_body(
+ today_isoformat, message_id, transaction_id, reference_id, graphql_query
+ )
+
+ # Make request
+ response = requests.post(
+ search_url,
+ data=json.dumps(data),
+ headers={"Authorization": auth_token},
+ timeout=constants.REQUEST_TIMEOUT,
+ )
+
+ if response.ok:
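+                # Expected response shape:
+                # {"message": {"search_response": [{"data": {"reg_records": {"totalRegistrantCount": N}}}]}}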
+ search_responses = response.json().get("message", {}).get("search_response", [])
+ if search_responses:
+ reg_record = search_responses[0].get("data", {}).get("reg_records", {})
+ return reg_record.get("totalRegistrantCount", 0)
+
+ return 0
+
+ except Exception as e:
+ _logger.error("Error getting total registrant count: %s", str(e))
+ return 0
+
+ def fetch_registrants_batch(self, paginated_query):
+ """Fetch a batch of registrants with pagination"""
+ self.ensure_one()
+
+ try:
+ # Get paths and auth
+ paths = self.get_data_source_paths()
+ auth_url = self.get_social_registry_auth_url(paths)
+ auth_token = self.get_auth_token(auth_url)
+ search_url = self.get_social_registry_search_url(paths)
+
+ # Prepare paginated query
+ today_isoformat = datetime.now(timezone.utc).isoformat()
+ message_id = str(uuid.uuid4())
+ transaction_id = str(uuid.uuid4())
+ reference_id = str(uuid.uuid4())
+
+ data = self.build_request_body(
+ today_isoformat, message_id, transaction_id, reference_id, paginated_query
+ )
+
+ response = requests.post(
+ search_url,
+ data=json.dumps(data),
+ headers={"Authorization": auth_token},
+ timeout=constants.REQUEST_TIMEOUT,
+ )
+ if not response.ok:
+ _logger.error("Social Registry Search API response: %s", response.text)
+ response.raise_for_status()
+
+ # Process response
+ search_responses = response.json().get("message", {}).get("search_response", [])
+ if search_responses:
+ reg_record = search_responses[0].get("data", {}).get("reg_records", {})
+ registrants = reg_record.get("getRegistrants", [])
+
+ return registrants
+
+ return []
+
+ except Exception as e:
+ _logger.error("Error fetching registrant batch: %s", str(e))
+ raise
+
def get_partner_and_clean_identifier(self, identifiers):
clean_identifiers = []
partner_id = None
@@ -560,111 +666,162 @@ def process_registrants_async(self, registrants, count):
main_job.delay()
def fetch_social_registry_beneficiary(self):
- self.write({"job_status": "running", "start_datetime": fields.Datetime.now()})
-
- config_parameters = self.env["ir.config_parameter"].sudo()
- today_isoformat = datetime.now(timezone.utc).isoformat()
- social_registry_version = config_parameters.get_param("social_registry_version")
- max_registrant = int(
- config_parameters.get_param("g2p_import_social_registry.max_registrants_count_job_queue")
- )
-
- message_id = str(uuid.uuid4())
- transaction_id = str(uuid.uuid4())
- reference_id = str(uuid.uuid4())
-
- # Define Data Source
- paths = self.get_data_source_paths()
-
- # Define Social Registry auth url
+ """
+        Main method to fetch beneficiaries from the Social Registry.
- full_social_registry_auth_url = self.get_social_registry_auth_url(paths)
+ Supports both synchronous and asynchronous processing based on batch size.
+ Uses pagination to handle large datasets efficiently.
- # Retrieve auth token
+ Returns:
+ dict: Action dictionary for UI notification
+ """
+ sticky = False
+ kind = "info"
+ message = _("No registrants were processed.")
- auth_token = self.get_auth_token(full_social_registry_auth_url)
+ try:
+ self.write({"job_status": "running", "start_datetime": fields.Datetime.now()})
- # Define Social Registry search url
- full_social_registry_search_url = self.get_social_registry_search_url(paths)
+ query_fields = self.query_fields.strip()
+ query_params = self.get_parsed_query_params()
- # Define header
- header = self.get_header_for_body(
- social_registry_version,
- today_isoformat,
- message_id,
- )
+ max_registrants_per_batch = int(
+ self.env["ir.config_parameter"]
+ .sudo()
+ .get_param("g2p_import_social_registry.max_registrants_count_job_queue", 100)
+ )
- # Define message
- message = self.get_message(
- today_isoformat,
- transaction_id=transaction_id,
- reference_id=reference_id,
- )
+ total_processed_registrants = 0
- signature = ""
+            if not query_params:
+                # No query parameters provided → fetch all records from the beginning
+                # Example: query_params = {} (get_parsed_query_params never returns None)
+                # → fetch all records (e.g., 500), starting from index 0
- # Define data
- data = self.get_data(
- signature,
- header,
- message,
- )
+ total_count = self.get_total_registrant_count()
+ if total_count == 0:
+ message = _("No registrants found in the Social Registry.")
+ kind = "warning"
+ effective_limit = total_count
+ effective_offset = 0
+ query_params = {}
+ else:
+ # Query parameters provided
+ limit = query_params.get("limit")
+ offset = query_params.get("offset", 0)
- data = json.dumps(data)
+ if limit is not None:
+ # 'limit' is provided → use given limit and offset
+ # Example: query_params = {'limit': 100, 'offset': 200}
+ # → fetch 100 records starting from the 201st record
- # POST Request
- response = requests.post(
- full_social_registry_search_url,
- data=data,
- headers={"Authorization": auth_token},
- timeout=constants.REQUEST_TIMEOUT,
- )
+ effective_limit = limit
+ effective_offset = offset
+ else:
+ # 'limit' is not provided → treat as fetch all from 'offset'
+ # Example: query_params = {'offset': 50}
+ # → fetch all remaining records starting from the 51st
+
+ total_count = self.get_total_registrant_count()
+ if total_count == 0:
+ message = _("No registrants found in the Social Registry.")
+ kind = "warning"
+ effective_limit = total_count
+ effective_offset = offset
+
+ _logger.info(
+ "Starting Social Registry fetch - Limit: %s, Offset: %s, Batch size: %s",
+ effective_limit,
+ effective_offset,
+ max_registrants_per_batch,
+ )
- if not response.ok:
- _logger.error("Social Registry Search API response: %s", response.text)
- response.raise_for_status()
+ # Process based on batch size threshold
+ if max_registrants_per_batch >= effective_limit:
+ # Synchronous processing
+ paginated_query = self.get_graphql_query(
+ query_fields, effective_limit, effective_offset, **query_params
+ )
- sticky = False
+ registrants = self.fetch_registrants_batch(paginated_query)
- # Process response
- if response.ok:
- kind = "success"
- message = _("Successfully Imported Social Registry Beneficiaries")
+ if registrants:
+ sticky = True
+ message = _("Successfully processed %s registrants.") % len(registrants)
+ kind = "success"
+ self.process_registrants(registrants)
+ total_processed_registrants = len(registrants)
+ else:
+                    message = _(
+                        "No registrants found. "
+                        "Verify the last sync date or "
+                        "check whether the Social Registry contains data."
+                    )
+ kind = "warning"
- search_responses = response.json().get("message", {}).get("search_response", [])
- if not search_responses:
- kind = "warning"
- message = _("No imported beneficiary")
+ else:
+ # Process asynchronously in batches
+ current_offset = effective_offset
- for search_response in search_responses:
- reg_record = search_response.get("data", {}).get("reg_records", [])
- registrants = reg_record.get("getRegistrants", [])
- total_partners_count = reg_record.get("totalRegistrantCount", "")
+ while total_processed_registrants < effective_limit:
+ # Calculate batch size for this iteration
+ remaining_records = effective_limit - total_processed_registrants
+ batch_size = min(max_registrants_per_batch, remaining_records)
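+                    # e.g. effective_limit=250 with max_registrants_per_batch=100 -> batch sizes 100, 100, 50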
- if total_partners_count:
- if total_partners_count < max_registrant:
- self.process_registrants(registrants)
+ # Build and execute paginated query
+ paginated_query = self.get_graphql_query(
+ query_fields, batch_size, current_offset, **query_params
+ )
- else:
- self.process_registrants_async(registrants, total_partners_count)
- kind = "success"
- message = _("Fetching from Social Registry Started Asynchronously.")
- sticky = True
+ registrants = self.fetch_registrants_batch(paginated_query)
+
+ if not registrants:
+ message = (
+ _("No more registrants to process. Processed %s total.")
+ % total_processed_registrants
+ )
+ kind = "warning" if total_processed_registrants == 0 else "success"
+ break
+
+ # Process batch asynchronously
+ sticky = True
+ self.process_registrants_async(registrants, len(registrants))
+
+ # Update counters
+ batch_count = len(registrants)
+ total_processed_registrants += batch_count
+ current_offset += batch_count
+
+ _logger.info(
+ "Processed batch: %s registrants (total: %s/%s)",
+ batch_count,
+ total_processed_registrants,
+ effective_limit,
+ )
- else:
+ # Set final message for successful async processing
+ if total_processed_registrants > 0 and kind != "warning":
+ message = (
+ _("Successfully queued %s registrants for asynchronous processing.")
+ % total_processed_registrants
+ )
kind = "success"
- message = _("No matching records found.")
+
+ _logger.info(
+ "Completed Social Registry fetch. Processed %s registrants", total_processed_registrants
+ )
self.last_sync_date = fields.Datetime.now()
+ end_time = fields.Datetime.now()
+ self.write({"job_status": "completed", "end_datetime": end_time})
- else:
- self.write({"job_status": "failed", "end_datetime": fields.Datetime.now()})
+ except Exception as e:
+ _logger.exception("Failed to fetch registrants from Social Registry: %s", e)
+ end_time = fields.Datetime.now()
+ self.write({"job_status": "failed", "end_datetime": end_time})
+ message = _("Failed to fetch registrants: %s") % str(e)
kind = "danger"
- message = response.json().get("error", {}).get("message", "")
- if not message:
- message = _("{reason}: Unable to connect to API.").format(reason=response.reason)
-
- self.write({"job_status": "completed", "end_datetime": fields.Datetime.now()})
+ sticky = True
action = {
"type": "ir.actions.client",
diff --git a/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml b/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
index 10414145..b6490d79 100644
--- a/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
+++ b/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml
@@ -109,7 +109,8 @@
-                <field name="query" />
+                <field name="query_params" />
+                <field name="query_fields" />