diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 621bb561b..ec423d2f7 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -30,28 +30,32 @@ List, Literal, Optional, - Sequence, ) from urllib.parse import quote import dlt -from dlt.common import pendulum +from dlt.common import logger, pendulum from dlt.common.typing import TDataItems from dlt.sources import DltResource from .helpers import ( _get_property_names_types, _to_dlt_columns_schema, + search_data_since, fetch_data, fetch_property_history, get_properties_labels, + SearchOutOfBoundsException, ) from .settings import ( ALL_OBJECTS, ARCHIVED_PARAM, + CRM_OBJECT_ASSOCIATIONS, CRM_OBJECT_ENDPOINTS, CRM_PIPELINES_ENDPOINT, ENTITY_PROPERTIES, + LAST_MODIFIED_PROPERTY, + HUBSPOT_CREATION_DATE, MAX_PROPS_LENGTH, OBJECT_TYPE_PLURAL, OBJECT_TYPE_SINGULAR, @@ -73,6 +77,7 @@ def fetch_data_for_properties( api_key: str, object_type: str, soft_delete: bool, + last_modified: str = None, ) -> Iterator[TDataItems]: """ Fetch data for a given set of properties from the HubSpot API. @@ -82,20 +87,50 @@ def fetch_data_for_properties( api_key (str): HubSpot API key for authentication. object_type (str): The type of HubSpot object (e.g., 'company', 'contact'). soft_delete (bool): Flag to fetch soft-deleted (archived) records. + last_modified (str): The date from which to fetch records. If None, get all records. Yields: Iterator[TDataItems]: Data retrieved from the HubSpot API. """ + logger.info(f"Fetching data for {object_type}.") # The Hubspot API expects a comma separated string as properties joined_props = ",".join(sorted(props)) - params: Dict[str, Any] = {"properties": joined_props, "limit": 100} + associations = CRM_OBJECT_ASSOCIATIONS[object_type] + joined_associations = ",".join(associations) + params: Dict[str, Any] = { + "properties": joined_props, + "limit": 100, + } + if associations: + params["associations"] = joined_associations + context: Optional[Dict[str, Any]] = ( {SOFT_DELETE_KEY: False} if soft_delete else None ) - yield from fetch_data( - CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context - ) + if last_modified is not None: + try: + yield from search_data_since( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + last_modified, + LAST_MODIFIED_PROPERTY[object_type], + props=props, + associations=associations, + context=context, + ) + except SearchOutOfBoundsException: + logger.info("Search out of bounds, fetching all data") + yield from fetch_data( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + params=params, + context=context, + ) + else: + yield from fetch_data( + CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context + ) if soft_delete: yield from fetch_data( CRM_OBJECT_ENDPOINTS[object_type], @@ -109,6 +144,7 @@ def crm_objects( object_type: str, api_key: str, props: List[str], + last_modified: dlt.sources.incremental[str], include_custom_props: bool = True, archived: bool = False, ) -> Iterator[TDataItems]: @@ -119,6 +155,7 @@ def crm_objects( object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). api_key (str): API key for HubSpot authentication. props (List[str]): List of properties to retrieve. + last_modified (str): The date from which to fetch records include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False. @@ -135,8 +172,17 @@ def crm_objects( prop: _to_dlt_columns_schema({prop: hb_type}) for prop, hb_type in props_to_type.items() } + last_modified_on = ( + None + if last_modified.start_value == last_modified.initial_value + else last_modified.start_value + ) for batch in fetch_data_for_properties( - list(props_to_type.keys()), api_key, object_type, archived + list(props_to_type.keys()), + api_key, + object_type, + archived, + last_modified_on, ): yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints)) @@ -337,7 +383,7 @@ def properties_custom_labels(api_key: str = api_key) -> Iterator[TDataItems]: """ def get_properties_description( - properties_list_inner: List[Dict[str, Any]] + properties_list_inner: List[Dict[str, Any]], ) -> Iterator[Dict[str, Any]]: """Fetch properties.""" for property_info in properties_list_inner: @@ -411,6 +457,10 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]: object_type=obj, api_key=api_key, props=properties.get(obj), + last_modified=dlt.sources.incremental( + LAST_MODIFIED_PROPERTY[obj], + initial_value=HUBSPOT_CREATION_DATE.isoformat(), + ), include_custom_props=include_custom_props, archived=soft_delete, ) diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 5f66afbab..36fc3ab61 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -5,14 +5,24 @@ import urllib.parse from typing import Any, Dict, Iterator, List, Optional +from dlt.common import logger from dlt.common.schema.typing import TColumnSchema from dlt.sources.helpers import requests -from .settings import OBJECT_TYPE_PLURAL, HS_TO_DLT_TYPE +from .settings import ( + CRM_ASSOCIATIONS_ENDPOINT, + CRM_SEARCH_ENDPOINT, + OBJECT_TYPE_PLURAL, + HS_TO_DLT_TYPE, +) BASE_URL = "https://api.hubapi.com/" +class SearchOutOfBoundsException(Exception): + pass + + def get_url(endpoint: str) -> str: """Get absolute hubspot endpoint URL""" return urllib.parse.urljoin(BASE_URL, endpoint) @@ -48,6 +58,21 @@ def pagination( return None +def search_pagination( + url: str, + _data: Dict[str, Any], + headers: Dict[str, Any], + params: Optional[Dict[str, Any]] = None, +) -> Optional[Dict[str, Any]]: + _after = _data.get("paging", {}).get("next", {}).get("after", False) + if _after and _after != "10000": + # Get the next page response + r = requests.post(url, headers=headers, json={**params, "after": _after}) + return r.json() # type: ignore + else: + return None + + def extract_association_data( _obj: Dict[str, Any], data: Dict[str, Any], @@ -126,6 +151,101 @@ def fetch_property_history( _data = None +def search_data_since( + endpoint: str, + api_key: str, + last_modified: str, + last_modified_prop: str, + props: List[str], + associations: Optional[List[str]] = None, + context: Optional[Dict[str, Any]] = None, +) -> Iterator[List[Dict[str, Any]]]: + """ + Fetch data from the HUBSPOT search endpoint, based on a given root endpoint, using a specified + API key and yield the properties of each result. This function yields results from a last modified + point in time based on the provided last modified property. + + Args: + endpoint (str): The root endpoint to fetch data from, as a string. + api_key (str): The API key to use for authentication, as a string. + last_modified (str): The date from which to start the search, as a string in ISO format. + last_modified_prop (str): The property used to check the last modified date against, as a string. + props: The list of properties to include for the object in the request. + associations: Optional dict of associations to search for for each object. + context (Optional[Dict[str, Any]]): Additional data which need to be added in the resulting page. + + Yields: + A List of CRM object dicts + + Raises: + requests.exceptions.HTTPError: If the API returns an HTTP error status code. + + Notes: + This function uses the `requests` library to make a POST request to the specified endpoint, with + the API key included in the headers. If the API returns a non-successful HTTP status code (e.g. + 404 Not Found), a `requests.exceptions.HTTPError` exception will be raised. + + The `endpoint` argument should be a relative URL, which will be modified to a search endpoint + and then appended to the base URL for the API. `last_modified`, `last_modified_prop`, and `props` + are used to pass additional parameters to the request + """ + # Construct the URL and headers for the API request + url = get_url(CRM_SEARCH_ENDPOINT.format(crm_endpoint=endpoint)) + headers = _get_headers(api_key) + body: Dict[str, Any] = { + "properties": sorted(props), + "limit": 200, + "filterGroups": [ + { + "filters": [ + { + "propertyName": last_modified_prop, + "operator": "GTE", + "value": last_modified, + } + ] + } + ], + "sorts": [{"propertyName": last_modified_prop, "direction": "ASCENDING"}], + } + + # Make the API request + r = requests.post(url, headers=headers, json=body) + # Parse the API response and yield the properties of each result + # Parse the response JSON data + _data = r.json() + + _total = _data.get("total", 0) + logger.info(f"Getting {_total} new objects from {url} starting at {last_modified}") + _max_last_modified = last_modified + # Yield the properties of each result in the API response + while _data is not None: + if "results" in _data: + for _result in _data["results"]: + if _result["updatedAt"]: + _max_last_modified = max(_max_last_modified, _result["updatedAt"]) + yield _data_to_objects( + _data, endpoint, headers, associations=associations, context=context + ) + + # Follow pagination links if they exist + _data = search_pagination(url, _data, headers, body) + + if _total > 9999: + if _max_last_modified == last_modified: + raise SearchOutOfBoundsException + logger.info(f"Starting new search iteration at {_max_last_modified}") + yield from search_data_since( + endpoint, + api_key, + _max_last_modified, + last_modified_prop, + props, + associations, + context, + ) + + def fetch_data( endpoint: str, api_key: str, @@ -168,38 +288,62 @@ def fetch_data( # Parse the API response and yield the properties of each result # Parse the response JSON data _data = r.json() + # Yield the properties of each result in the API response while _data is not None: if "results" in _data: - _objects: List[Dict[str, Any]] = [] - for _result in _data["results"]: - _obj = _result.get("properties", _result) - if "id" not in _obj and "id" in _result: - # Move id from properties to top level - _obj["id"] = _result["id"] - if "associations" in _result: - for association in _result["associations"]: - __data = _result["associations"][association] - - __values = extract_association_data( - _obj, __data, association, headers - ) - - # remove duplicates from list of dicts - __values = [ - dict(t) for t in {tuple(d.items()) for d in __values} - ] - - _obj[association] = __values - if context: - _obj.update(context) - _objects.append(_obj) - yield _objects + yield _data_to_objects(_data, endpoint, headers, context=context) # Follow pagination links if they exist _data = pagination(_data, headers) +def _data_to_objects( + data: Any, + endpoint: str, + headers: Dict[str, str], + associations: Optional[List[str]] = None, + context: Optional[Dict[str, Any]] = None, +) -> List[Dict[str, Any]]: + _objects: List[Dict[str, Any]] = [] + for _result in data["results"]: + _obj = _result.get("properties", _result) + if "id" not in _obj and "id" in _result: + # Move id from properties to top level + _obj["id"] = _result["id"] + if "associations" in _result: + for association in _result["associations"]: + __data = _result["associations"][association] + _add_association_data(__data, association, headers, _obj) + elif associations is not None: + for association in associations: + __endpoint = get_url( + CRM_ASSOCIATIONS_ENDPOINT.format( + crm_endpoint=endpoint, + object_id=_result["id"], + association=association, + ) + ) + r = requests.get(__endpoint, headers=headers, params={"limit": 500}) + __data = r.json() + _add_association_data(__data, association, headers, _obj) + if context: + _obj.update(context) + _objects.append(_obj) + return _objects + + +def _add_association_data( + data: Any, association: str, headers: Dict[str, str], obj: Any +) -> None: + __values = extract_association_data(obj, data, association, headers) + + # remove duplicates from list of dicts + __values = [dict(t) for t in {tuple(d.items()) for d in __values}] + + obj[association] = __values + + def _get_property_names_types( api_key: str, object_type: str ) -> Dict[str, Union[str, None]]: diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index 213a3be0a..8fcf09c9e 100644 --- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -1,17 +1,18 @@ """Hubspot source settings and constants""" + from typing import Dict from dlt.common import pendulum from dlt.common.data_types import TDataType +HUBSPOT_CREATION_DATE = pendulum.datetime(year=2006, month=6, day=1) STARTDATE = pendulum.datetime(year=2024, month=2, day=10) -CRM_CONTACTS_ENDPOINT = ( - "/crm/v3/objects/contacts?associations=deals,products,tickets,quotes" -) -CRM_COMPANIES_ENDPOINT = ( - "/crm/v3/objects/companies?associations=contacts,deals,products,tickets,quotes" -) +CRM_CONTACTS_ENDPOINT = "/crm/v3/objects/contacts" +CRM_COMPANIES_ENDPOINT = "/crm/v3/objects/companies" CRM_DEALS_ENDPOINT = "/crm/v3/objects/deals" +CRM_CALLS_ENDPOINT = "/crm/v3/objects/calls" +CRM_EMAILS_ENDPOINT = "/crm/v3/objects/emails" +CRM_MEETINGS_ENDPOINT = "/crm/v3/objects/meetings" CRM_PRODUCTS_ENDPOINT = "/crm/v3/objects/products" CRM_TICKETS_ENDPOINT = "/crm/v3/objects/tickets" CRM_QUOTES_ENDPOINT = "/crm/v3/objects/quotes" @@ -19,22 +20,44 @@ CRM_PROPERTIES_ENDPOINT = "/crm/v3/properties/{objectType}/{property_name}" CRM_PIPELINES_ENDPOINT = "/crm/v3/pipelines/{objectType}" +CRM_SEARCH_ENDPOINT = "{crm_endpoint}/search" +CRM_ASSOCIATIONS_ENDPOINT = "{crm_endpoint}/{object_id}/associations/{association}" + CRM_OBJECT_ENDPOINTS = { "contact": CRM_CONTACTS_ENDPOINT, "company": CRM_COMPANIES_ENDPOINT, "deal": CRM_DEALS_ENDPOINT, + "call": CRM_CALLS_ENDPOINT, + "email": CRM_EMAILS_ENDPOINT, + "meeting": CRM_MEETINGS_ENDPOINT, "product": CRM_PRODUCTS_ENDPOINT, "ticket": CRM_TICKETS_ENDPOINT, "quote": CRM_QUOTES_ENDPOINT, "owner": CRM_OWNERS_ENDPOINT, } +CRM_OBJECT_ASSOCIATIONS = { + "contact": ["deals", "products", "tickets", "quotes"], + "company": ["contacts", "deals", "products", "tickets", "quotes"], + "deal": [], + "call": [], + "email": [], + "meeting": [], + "product": [], + "ticket": [], + "quote": [], + "owner": [], +} + WEB_ANALYTICS_EVENTS_ENDPOINT = "/events/v3/events?objectType={objectType}&objectId={objectId}&occurredAfter={occurredAfter}&occurredBefore={occurredBefore}&sort=-occurredAt" OBJECT_TYPE_SINGULAR = { "companies": "company", "contacts": "contact", "deals": "deal", + "calls": "call", + "emails": "email", + "meetings": "meeting", "tickets": "ticket", "products": "product", "quotes": "quote", @@ -72,6 +95,39 @@ "pipeline", ] +DEFAULT_CALL_PROPS = [ + "hs_object_id", + "hs_createdate", + "hs_call_title", + "hs_body_preview", + "hs_activity_type", + "hubspot_owner_id", + "hs_lastmodifieddate", +] + +DEFAULT_EMAIL_PROPS = [ + "hs_object_id", + "hs_createdate", + "hs_email_subject", + "hs_body_preview", + "hs_email_from_email", + "hs_email_to_email", + "hs_email_status", + "hubspot_owner_id", + "hs_lastmodifieddate", +] + +DEFAULT_MEETING_PROPS = [ + "hs_object_id", + "hs_createdate", + "hs_meeting_title", + "hs_body_preview", + "hs_meeting_start_time", + "hs_activity_type", + "hubspot_owner_id", + "hs_lastmodifieddate", +] + DEFAULT_TICKET_PROPS = [ "createdate", "content", @@ -107,11 +163,25 @@ "company": DEFAULT_COMPANY_PROPS, "contact": DEFAULT_CONTACT_PROPS, "deal": DEFAULT_DEAL_PROPS, + "call": DEFAULT_CALL_PROPS, + "email": DEFAULT_EMAIL_PROPS, + "meeting": DEFAULT_MEETING_PROPS, "ticket": DEFAULT_TICKET_PROPS, "product": DEFAULT_PRODUCT_PROPS, "quote": DEFAULT_QUOTE_PROPS, } +LAST_MODIFIED_PROPERTY = { + "company": "hs_lastmodifieddate", + "contact": "lastmodifieddate", + "deal": "hs_lastmodifieddate", + "call": "hs_lastmodifieddate", + "email": "hs_lastmodifieddate", + "meeting": "hs_lastmodifieddate", + "ticket": "hs_lastmodifieddate", + "product": "hs_lastmodifieddate", + "quote": "hs_lastmodifieddate", +} PIPELINES_OBJECTS = ["deals", "tickets"] SOFT_DELETE_KEY = "is_deleted"