# Quickbooks

QuickBooks is cloud-based accounting software designed for small to medium-sized businesses. This QuickBooks `dlt` verified source and pipeline example offers the capability to load QuickBooks endpoints such as "Customer" to a destination of your choosing. It enables you to conveniently load the following endpoint as a start:

### Single loading endpoints (replace mode)

| Endpoint | Mode | Description |
| --- | --- | --- |
| Customer | replace | A customer is a consumer of the service or product that your business offers. An individual customer can have an underlying nested structure, with a parent customer (the top-level object) having zero or more sub-customers and jobs associated with it. |


## Initialize the pipeline with Quickbooks verified source
```bash
dlt init quickbooks_online duckdb
```

Here, we chose DuckDB as the destination. Alternatively, you can also choose Redshift, Snowflake, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Setup verified source and pipeline example

### Add credentials

1. Open `.dlt/secrets.toml`.
2. Add your credentials. They can be obtained from the [QuickBooks developer portal and the QuickBooks OAuth playground](https://developer.intuit.com/app/developer/qbo/docs/develop/authentication-and-authorization/oauth-2.0#authorization-request):
    ```toml
    # put your secret values and credentials here. do not share this file and do not push it to github
    [sources.quickbooks_online]
    company_id=""
    client_id=""
    client_secret=""
    refresh_token=""
    redirect_url=""
    ```

### Run the pipeline example

1. Install the necessary dependencies by running the following command:
   ```bash
   pip install -r requirements.txt
   ```

2. Now the pipeline can be run by using the command:
   ```bash
   python3 quickbooks_online_pipeline.py
   ```

3. To make sure that everything is loaded as expected, use the command:
   ```bash
   dlt pipeline <pipeline_name> show
   ```

   For example, the pipeline_name for the above pipeline is `quickbooks_customer`; you may also use any custom name instead.
def _iter_entity_records(
    entity: type, client: QuickBooks, page_size: int = 1000
) -> Iterable[TDataItem]:
    """Yield every record of a python-quickbooks entity as a dict, page by page.

    `entity.all()` returns a single page only (100 records unless
    `max_results` is given), so the query is repeated with an advancing
    `start_position` until a short page signals the end of the data.

    Args:
        entity: A python-quickbooks object class exposing `all()`, e.g. `Customer`.
        client: An authenticated `QuickBooks` client.
        page_size: Number of records requested per query.

    Yields:
        TDataItem: One record converted with `to_dict()`.
    """
    start_position = 1  # QuickBooks query positions are 1-based
    while True:
        page = entity.all(  # type: ignore[attr-defined]
            start_position=start_position, max_results=page_size, qb=client
        )
        for record in page:
            yield record.to_dict()
        # a page shorter than requested means there is nothing left to fetch
        if len(page) < page_size:
            return
        start_position += page_size


@dlt.source(name="quickbooks_online")
def quickbooks_online(
    environment: str,
    client_id: str = dlt.secrets.value,
    client_secret: str = dlt.secrets.value,
    refresh_token: str = dlt.secrets.value,
    company_id: str = dlt.secrets.value,
    redirect_url: str = dlt.secrets.value,
) -> Sequence[DltResource]:
    """
    Retrieves data from Quickbooks using the Quickbooks API.

    Args:
        environment (str): The environment used for authentication, only "sandbox" or "production" values are allowed
        client_id (str): The client id provided by quickbooks for authentication. Defaults to the value in the `dlt.secrets` object.
        client_secret (str): The client secret provided by quickbooks for authentication. Defaults to the value in the `dlt.secrets` object.
        refresh_token (str): The refresh token given a quickbooks scope. Defaults to the value in the `dlt.secrets` object.
        company_id (str): The company id / realm id provided by quickbooks. Defaults to the value in the `dlt.secrets` object.
        redirect_url (str): The redirect uri end user creates in quickbooks, found in the developer application created. Defaults to the value in the `dlt.secrets` object.
    Returns:
        Sequence[DltResource]: The `customer` and `invoice` resources.
    Raises:
        ValueError: If `environment` is neither "sandbox" nor "production".
    """
    # Reject anything else up front: the token request below and AuthClient
    # would otherwise each interpret an unexpected value on their own.
    if environment not in (sandbox_env, production_env):
        raise ValueError(
            f"environment must be '{sandbox_env}' or '{production_env}', "
            f"got {environment!r}"
        )

    # exchange the long-lived refresh token for a short-lived access token
    bearer_access_token = QuickBooksAuth(
        client_id=client_id,
        client_secret=client_secret,
        company_id=company_id,
        redirect_url=redirect_url,
        refresh_token=refresh_token,
        is_sandbox=environment != production_env,
    ).get_bearer_token_from_refresh_token()

    auth_client = AuthClient(
        client_id=client_id,
        client_secret=client_secret,
        environment=environment,
        redirect_uri=redirect_url,
        access_token=bearer_access_token.accessToken,
    )

    client = QuickBooks(
        auth_client=auth_client, refresh_token=refresh_token, company_id=company_id
    )

    # define resources; both are full snapshots, so each run replaces the table
    @dlt.resource(write_disposition="replace")
    def customer() -> Iterable[TDataItem]:
        yield from _iter_entity_records(Customer, client)

    @dlt.resource(write_disposition="replace")
    def invoice() -> Iterable[TDataItem]:
        yield from _iter_entity_records(Invoice, client)

    return [customer, invoice]
class QuickBooksAuth:
    """Helper for the QuickBooks Online OAuth 2.0 flow.

    Builds the authorization URL and exchanges either an authorization code
    or a stored refresh token for a `Bearer`, using the endpoints listed in
    the discovery document of the selected environment.
    """

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        company_id: str,
        redirect_url: str,
        refresh_token: Union[str, None] = None,
        is_sandbox: Union[bool, None] = True,
    ):
        """
        End user should use this class to generate refresh token once manually and store in secrets.toml
        and continually use it to generate access tokens

        Should the user need to change scopes, then this should be generated again and stored safely

        Source code used is from: https://github.com/IntuitDeveloper/OAuth2PythonSampleApp/blob/master/sampleAppOAuth2/services.py

        Args:
            client_id: OAuth client id of the application.
            client_secret: OAuth client secret of the application.
            company_id: QuickBooks company id; stored on the instance.
            redirect_url: Redirect URI sent with the authorization request.
            refresh_token: Refresh token used by `get_bearer_token_from_refresh_token`.
            is_sandbox: Truthy selects the sandbox discovery document;
                `False` or `None` selects the production one.
        """
        # normalise to a real bool so the attribute is never None
        self.is_sandbox = bool(is_sandbox)
        self.client_id = client_id
        self.client_secret = client_secret
        self.company_id = company_id
        self.redirect_url = redirect_url
        self.refresh_token = refresh_token

    @staticmethod
    def string_to_base64(s: str) -> str:
        """Return the base64 encoding of the UTF-8 bytes of `s` as text."""
        return base64.b64encode(s.encode("utf-8")).decode()

    @staticmethod
    def get_random_string(
        length: int = 64,
        allowed_chars: str = "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789",
    ) -> str:
        """Return `length` characters drawn from `allowed_chars`.

        The result is used as the OAuth `state` value, so it is drawn from
        the `secrets` module rather than the predictable `random` generator.
        """
        import secrets

        return "".join(secrets.choice(allowed_chars) for _ in range(length))

    @staticmethod
    def _token_request_headers(client_id: str, client_secret: str) -> dict:
        """Build the headers shared by both token requests (HTTP Basic auth)."""
        credentials = QuickBooksAuth.string_to_base64(f"{client_id}:{client_secret}")
        return {
            "Accept": "application/json",
            "content-type": "application/x-www-form-urlencoded",
            "Authorization": f"Basic {credentials}",
        }

    @staticmethod
    def _bearer_from_payload(bearer_raw: dict) -> "Bearer":
        """Convert a decoded token-endpoint response into a `Bearer`."""
        return Bearer(
            bearer_raw["x_refresh_token_expires_in"],
            bearer_raw["access_token"],
            bearer_raw["token_type"],
            bearer_raw["refresh_token"],
            bearer_raw["expires_in"],
            id_token=bearer_raw.get("id_token"),
        )

    def get_discovery_document(self) -> "OAuth2Config":
        """Fetch the discovery document for the configured environment.

        Returns:
            OAuth2Config: The endpoints read from the discovery document.

        Raises:
            ConnectionError: If the response status is 400 or above.
        """
        if self.is_sandbox:
            discovery_document_url = discovery_document_url_sandbox
        else:
            discovery_document_url = discovery_document_url_prod
        r = requests.get(discovery_document_url)
        if r.status_code >= 400:
            # report the raw body: an error response is not guaranteed to be JSON
            raise ConnectionError(
                f"Discovery document request failed with HTTP {r.status_code}: {r.text}"
            )

        discovery_doc_json = r.json()
        return OAuth2Config(
            issuer=discovery_doc_json["issuer"],
            auth_endpoint=discovery_doc_json["authorization_endpoint"],
            userinfo_endpoint=discovery_doc_json["userinfo_endpoint"],
            revoke_endpoint=discovery_doc_json["revocation_endpoint"],
            token_endpoint=discovery_doc_json["token_endpoint"],
            jwks_uri=discovery_doc_json["jwks_uri"],
        )

    def get_auth_url(self, scope: "Union[str, Scopes]") -> str:
        """
        scopes available in settings.py from intuitlib.enums

        Returns the authorization URL for `scope`, including a freshly
        generated random `state` parameter.
        """
        auth_endpoint = self.get_discovery_document().auth_endpoint
        auth_url_params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_url,
            "response_type": "code",
            # an enum member must be sent as its value, not as its repr
            "scope": getattr(scope, "value", scope),
            "state": self.get_random_string(),
        }
        return f"{auth_endpoint}?{urlencode(auth_url_params)}"

    def get_bearer_token(
        self, auth_code: str, client_id: str, client_secret: str, redirect_uri: str
    ) -> "Union[str, Bearer]":
        """Exchange an authorization code for a bearer token.

        Returns:
            The `Bearer` on HTTP 200; otherwise the raw response text, which
            callers must check for.
        """
        token_endpoint = self.get_discovery_document().token_endpoint
        payload = {
            "code": auth_code,
            "redirect_uri": redirect_uri,
            "grant_type": "authorization_code",
        }
        r = requests.post(
            token_endpoint,
            data=payload,
            headers=self._token_request_headers(client_id, client_secret),
        )
        if r.status_code != 200:
            return r.text
        return self._bearer_from_payload(r.json())

    def get_bearer_token_from_refresh_token(self) -> "Bearer":
        """Exchange the stored refresh token for a new bearer token.

        Raises:
            ValueError: If no refresh token was given to the constructor.
            ConnectionError: If the token endpoint does not answer with HTTP 200.
        """
        # fail before any network traffic when there is nothing to exchange
        if not self.refresh_token:
            raise ValueError(
                "A refresh token is required to request a new access token."
            )
        token_endpoint = self.get_discovery_document().token_endpoint
        payload = {"refresh_token": self.refresh_token, "grant_type": "refresh_token"}
        r = requests.post(
            token_endpoint,
            data=payload,
            headers=self._token_request_headers(self.client_id, self.client_secret),
        )
        if r.status_code != 200:
            # without this check a failed refresh surfaces as an opaque KeyError
            raise ConnectionError(
                f"QuickBooks token refresh failed with HTTP {r.status_code}: {r.text}"
            )
        return self._bearer_from_payload(r.json())
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_quickbooks_online(destination_name: str) -> None:
    """Load the source into each destination and check both tables have rows."""
    from sources.quickbooks_online.settings import sandbox_env

    pipeline = dlt.pipeline(
        pipeline_name="quickbooks_customer",
        destination=destination_name,
        dataset_name="duckdb_customer",
        dev_mode=True,
    )
    # `environment` is a required argument of the source; omitting it raises
    # TypeError before anything is loaded
    data = quickbooks_online(environment=sandbox_env)
    load_info = pipeline.run(data)
    assert_load_info(load_info)

    expected_tables = {"customer", "invoice"}
    # only those tables in the schema
    assert {t["name"] for t in pipeline.default_schema.data_tables()} == expected_tables
    # get counts
    table_counts = load_table_counts(pipeline, *expected_tables)
    # all tables loaded
    assert set(table_counts) == expected_tables
    assert all(count > 0 for count in table_counts.values())