Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 68 additions & 21 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,19 +649,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
# Here we just test regional apps won't adversely break OBO
http_client=None,
):
# 1. An app obtains a token representing a user, for our mid-tier service
pca = msal.PublicClientApplication(
config_pca["client_id"], authority=config_pca["authority"],
azure_region=azure_region,
http_client=http_client or MinimalHttpClient())
pca_result = pca.acquire_token_by_username_password(
config_pca["username"],
config_pca["password"],
scopes=config_pca["scope"],
)
self.assertIsNotNone(
pca_result.get("access_token"),
"PCA failed to get AT because %s" % json.dumps(pca_result, indent=2))
if "client_secret" not in config_pca:
# 1.a An app obtains a token representing a user, for our mid-tier service
result = msal.PublicClientApplication(
config_pca["client_id"], authority=config_pca["authority"],
azure_region=azure_region,
http_client=http_client or MinimalHttpClient(),
).acquire_token_by_username_password(
config_pca["username"], config_pca["password"],
scopes=config_pca["scope"],
)
else: # We repurpose the config_pca to contain client_secret for cca app 1
# 1.b An app obtains a token representing itself, for our mid-tier service
result = msal.ConfidentialClientApplication(
config_pca["client_id"], authority=config_pca["authority"],
client_credential=config_pca["client_secret"],
azure_region=azure_region,
http_client=http_client or MinimalHttpClient(),
).acquire_token_for_client(scopes=config_pca["scope"])
assertion = result.get("access_token")
self.assertIsNotNone(assertion, "First app failed to get AT. {}".format(
json.dumps(result, indent=2)))

# 2. Our mid-tier service uses OBO to obtain a token for downstream service
cca = msal.ConfidentialClientApplication(
Expand All @@ -674,9 +682,9 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
# That's fine if OBO app uses short-lived msal instance per session.
# Otherwise, the OBO app need to implement a one-cache-per-user setup.
)
cca_result = cca.acquire_token_on_behalf_of(
pca_result['access_token'], config_cca["scope"])
self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result))
cca_result = cca.acquire_token_on_behalf_of(assertion, config_cca["scope"])
self.assertIsNotNone(cca_result.get("access_token"), "OBO call failed: {}".format(
json.dumps(cca_result, indent=2)))

# 3. Now the OBO app can simply store downstream token(s) in same session.
# Alternatively, if you want to persist the downstream AT, and possibly
Expand All @@ -685,13 +693,27 @@ def _test_acquire_token_obo(self, config_pca, config_cca,
# Assuming you already did that (which is not shown in this test case),
# the following part shows one of the ways to obtain an AT from cache.
username = cca_result.get("id_token_claims", {}).get("preferred_username")
if username: # It means CCA have requested an IDT w/ "profile" scope
self.assertEqual(config_cca["username"], username)
accounts = cca.get_accounts(username=username)
assert len(accounts) == 1, "App is expected to partition token cache per user"
account = accounts[0]
if username is not None: # It means CCA have requested an IDT w/ "profile" scope
assert config_cca["username"] == username, "Incorrect test case configuration"
self.assertEqual(1, len(accounts), "App is supposed to partition token cache per user")
account = accounts[0] # Alternatively, cca app could just loop through each account
result = cca.acquire_token_silent(config_cca["scope"], account)
self.assertEqual(cca_result["access_token"], result["access_token"])
self.assertTrue(
result and result.get("access_token") == cca_result["access_token"],
"CCA should hit an access token from cache: {}".format(
json.dumps(cca.token_cache._cache, indent=2)))
if "refresh_token" in cca_result:
result = cca.acquire_token_silent(
config_cca["scope"], account=account, force_refresh=True)
self.assertTrue(
result and "access_token" in result,
"CCA should get an AT silently, but we got this instead: {}".format(result))
self.assertNotEqual(
result["access_token"], cca_result["access_token"],
"CCA should get a new AT")
else:
logger.info("AAD did not issue a RT for OBO flow")

def _test_acquire_token_by_client_secret(
self, client_id=None, client_secret=None, authority=None, scope=None,
Expand Down Expand Up @@ -933,6 +955,31 @@ def test_acquire_token_obo(self):

self._test_acquire_token_obo(config_pca, config_cca)

@unittest.skipUnless(
os.path.exists("tests/sp_obo.pem"),
"Need a 'tests/sp_obo.pem' private to run OBO for SP test")
def test_acquire_token_obo_for_sp(self):
authority = "https://login.windows-ppe.net/f686d426-8d16-42db-81b7-ab578e110ccd"
with open("tests/sp_obo.pem") as pem:
client_secret = {
"private_key": pem.read(),
"thumbprint": "378938210C976692D7F523B8C4FFBB645D17CE92",
}
midtier_app = {
"authority": authority,
"client_id": "c84e9c32-0bc9-4a73-af05-9efe9982a322",
"client_secret": client_secret,
"scope": ["23d08a1e-1249-4f7c-b5a5-cb11f29b6923/.default"],
#"username": "OBO-Client-PPE", # We do NOT attempt locating initial_app by name
}
initial_app = {
"authority": authority,
"client_id": "9793041b-9078-4942-b1d2-babdc472cc0c",
"client_secret": client_secret,
"scope": [midtier_app["client_id"] + "/.default"],
}
self._test_acquire_token_obo(initial_app, midtier_app)

def test_acquire_token_by_client_secret(self):
# Vastly different than ArlingtonCloudTestCase.test_acquire_token_by_client_secret()
_app = self.get_lab_app_object(
Expand Down
Loading