Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def close(self) -> None:
def get_token(
self,
*scopes: str,
claims: Optional[str] = None, # pylint:disable=unused-argument
claims: Optional[str] = None,
tenant_id: Optional[str] = None,
**kwargs: Any,
) -> AccessToken:
Expand All @@ -102,20 +102,23 @@ def get_token(
:param str scopes: desired scope for the access token. This credential allows only one scope per request.
For more information about scopes, see
https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
:keyword str claims: not used by this credential; any value provided will be ignored.
:keyword str claims: additional claims required in the token. This credential does not support claims
challenges.
:keyword str tenant_id: optional tenant to include in the token request.

:return: An access token with the desired scopes.
:rtype: ~azure.core.credentials.AccessToken

:raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI.
:raises ~azure.identity.CredentialUnavailableError: the credential was either unable to invoke the Azure CLI
or a claims challenge was provided.
:raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't
receive an access token.
"""

options: TokenRequestOptions = {}
if tenant_id:
options["tenant_id"] = tenant_id
if claims:
options["claims"] = claims

token_info = self._get_token_base(*scopes, options=options, **kwargs)
return AccessToken(token_info.token, token_info.expires_on)
Expand All @@ -136,7 +139,8 @@ def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] =
:rtype: ~azure.core.credentials.AccessTokenInfo
:return: An AccessTokenInfo instance containing information about the token.

:raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI.
:raises ~azure.identity.CredentialUnavailableError: the credential was either unable to invoke the Azure CLI
or a claims challenge was provided.
:raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't
receive an access token.
"""
Expand All @@ -146,6 +150,22 @@ def _get_token_base(
self, *scopes: str, options: Optional[TokenRequestOptions] = None, **kwargs: Any
) -> AccessTokenInfo:

# Check for claims challenge first
claims_value = options.get("claims") if options else None
if claims_value and claims_value.strip():
login_cmd = f"az login --claims-challenge {claims_value}"

# Add tenant if provided in options
tenant_id_from_options = options.get("tenant_id") if options else None
if tenant_id_from_options:
login_cmd += f" --tenant {tenant_id_from_options}"

# Add scope if provided
if scopes:
login_cmd += f" --scope {scopes[0]}"

raise CredentialUnavailableError(f"Failed to get token. Run {login_cmd}")
Copy link
Member

@pvaneck pvaneck Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot Extract the base error message into a constant that can be used in both azure_cli.py files, and make the error message more descriptive. Something like: "Claims challenge received, but not supported. To authenticate with the required claims, please run the following command: ". Then you can append the login_cmd to the message.


tenant_id = options.get("tenant_id") if options else None
if tenant_id:
validate_tenant_id(tenant_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(
async def get_token(
self,
*scopes: str,
claims: Optional[str] = None, # pylint:disable=unused-argument
claims: Optional[str] = None,
tenant_id: Optional[str] = None,
**kwargs: Any,
) -> AccessToken:
Expand All @@ -94,22 +94,26 @@ async def get_token(
:param str scopes: desired scope for the access token. This credential allows only one scope per request.
For more information about scopes, see
https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
:keyword str claims: not used by this credential; any value provided will be ignored.
:keyword str claims: additional claims required in the token. This credential does not support claims
challenges.
:keyword str tenant_id: optional tenant to include in the token request.

:return: An access token with the desired scopes.
:rtype: ~azure.core.credentials.AccessToken
:raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI.
:raises ~azure.identity.CredentialUnavailableError: the credential was either unable to invoke the Azure CLI
or a claims challenge was provided.
:raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't
receive an access token.
"""
# only ProactorEventLoop supports subprocesses on Windows (and it isn't the default loop on Python < 3.8)
if sys.platform.startswith("win") and not isinstance(asyncio.get_event_loop(), asyncio.ProactorEventLoop):
return _SyncAzureCliCredential().get_token(*scopes, tenant_id=tenant_id, **kwargs)
return _SyncAzureCliCredential().get_token(*scopes, claims=claims, tenant_id=tenant_id, **kwargs)

options: TokenRequestOptions = {}
if tenant_id:
options["tenant_id"] = tenant_id
if claims:
options["claims"] = claims

token_info = await self._get_token_base(*scopes, options=options, **kwargs)
return AccessToken(token_info.token, token_info.expires_on)
Expand All @@ -130,7 +134,8 @@ async def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptio
:rtype: ~azure.core.credentials.AccessTokenInfo
:return: An AccessTokenInfo instance containing information about the token.

:raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI.
:raises ~azure.identity.CredentialUnavailableError: the credential was either unable to invoke the Azure CLI
or a claims challenge was provided.
:raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't
receive an access token.
"""
Expand All @@ -142,6 +147,22 @@ async def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptio
async def _get_token_base(
self, *scopes: str, options: Optional[TokenRequestOptions] = None, **kwargs: Any
) -> AccessTokenInfo:
# Check for claims challenge first
claims_value = options.get("claims") if options else None
if claims_value and claims_value.strip():
login_cmd = f"az login --claims-challenge {claims_value}"

# Add tenant if provided in options
tenant_id_from_options = options.get("tenant_id") if options else None
if tenant_id_from_options:
login_cmd += f" --tenant {tenant_id_from_options}"

# Add scope if provided
if scopes:
login_cmd += f" --scope {scopes[0]}"

raise CredentialUnavailableError(f"Failed to get token. Run {login_cmd}")

tenant_id = options.get("tenant_id") if options else None
if tenant_id:
validate_tenant_id(tenant_id)
Expand Down
113 changes: 113 additions & 0 deletions sdk/identity/azure-identity/tests/test_cli_credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,116 @@ def fake_check_output(command_line, **_):
kwargs = {"options": kwargs}
token = getattr(credential, get_token_method)("scope", **kwargs)
assert token.token == expected_token


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
def test_claims_challenge_raises_error(get_token_method):
"""The credential should raise CredentialUnavailableError when claims challenge is provided"""

claims = "test-claims-challenge"
expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --scope scope"

if get_token_method == "get_token":
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
AzureCliCredential().get_token("scope", claims=claims)
else: # get_token_info
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
AzureCliCredential().get_token_info("scope", options={"claims": claims})


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
def test_claims_challenge_without_scopes(get_token_method):
"""The credential should raise CredentialUnavailableError with appropriate message even when no scopes provided"""

claims = "test-claims-challenge"
expected_message = f"Failed to get token. Run az login --claims-challenge {claims}"

if get_token_method == "get_token":
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
AzureCliCredential().get_token(claims=claims) # No scopes provided
else: # get_token_info
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
AzureCliCredential().get_token_info(options={"claims": claims}) # No scopes provided


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
def test_empty_claims_does_not_raise_error(get_token_method):
"""The credential should not raise error when claims parameter is empty or None"""

# Mock the CLI to avoid actual invocation
with mock.patch("shutil.which", return_value="az"):
with mock.patch(
CHECK_OUTPUT, mock.Mock(return_value='{"accessToken": "token", "expiresOn": "2021-10-07 12:00:00.000000"}')
Copy link
Preview

Copilot AI Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded datetime string '2021-10-07 12:00:00.000000' in the mock response should use a dynamic date or a more recent date to avoid potential issues with token expiration validation.

Suggested change
CHECK_OUTPUT, mock.Mock(return_value='{"accessToken": "token", "expiresOn": "2021-10-07 12:00:00.000000"}')
CHECK_OUTPUT, mock.Mock(return_value=json.dumps({"accessToken": "token", "expiresOn": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")}))

Copilot uses AI. Check for mistakes.

):

if get_token_method == "get_token":
# Test with None (default)
token = AzureCliCredential().get_token("scope")
assert token.token == "token"

# Test with empty string
token = AzureCliCredential().get_token("scope", claims="")
assert token.token == "token"

# Test with None explicitly
token = AzureCliCredential().get_token("scope", claims=None)
assert token.token == "token"

# Test with whitespace-only string
token = AzureCliCredential().get_token("scope", claims=" ")
assert token.token == "token"
else: # get_token_info
# Test with None options
token = AzureCliCredential().get_token_info("scope")
assert token.token == "token"

# Test with empty options
token = AzureCliCredential().get_token_info("scope", options={})
assert token.token == "token"

# Test with None claims in options
token = AzureCliCredential().get_token_info("scope", options={"claims": None})
assert token.token == "token"

# Test with empty string claims in options
token = AzureCliCredential().get_token_info("scope", options={"claims": ""})
assert token.token == "token"

# Test with whitespace-only claims in options
token = AzureCliCredential().get_token_info("scope", options={"claims": " "})
assert token.token == "token"


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
def test_claims_challenge_with_tenant(get_token_method):
"""The credential should include tenant in the error message when claims and tenant are provided"""

claims = "test-claims-challenge"
tenant_id = "test-tenant-id"

if get_token_method == "get_token":
# Test with get_token - tenant passed via get_token parameter (should appear in options)
expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id} --scope scope"
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
AzureCliCredential().get_token("scope", claims=claims, tenant_id=tenant_id)
else: # get_token_info
# Test with get_token_info - tenant passed via options
expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id} --scope scope"
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
AzureCliCredential().get_token_info("scope", options={"claims": claims, "tenant_id": tenant_id})


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
def test_claims_challenge_with_tenant_without_scopes(get_token_method):
"""The credential should include tenant in error message when claims and tenant are provided but no scopes"""

claims = "test-claims-challenge"
tenant_id = "test-tenant-id"
expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id}"

if get_token_method == "get_token":
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
AzureCliCredential().get_token(claims=claims, tenant_id=tenant_id) # No scopes provided
else: # get_token_info
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
AzureCliCredential().get_token_info(options={"claims": claims, "tenant_id": tenant_id}) # No scopes provided
123 changes: 123 additions & 0 deletions sdk/identity/azure-identity/tests/test_cli_credential_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,126 @@ async def fake_exec(*args, **_):
kwargs = {"options": kwargs}
token = await getattr(credential, get_token_method)("scope", **kwargs)
assert token.token == expected_token


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
async def test_claims_challenge_raises_error(get_token_method):
"""The credential should raise CredentialUnavailableError when claims challenge is provided"""

claims = "test-claims-challenge"
expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --scope scope"

if get_token_method == "get_token":
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
await AzureCliCredential().get_token("scope", claims=claims)
else: # get_token_info
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
await AzureCliCredential().get_token_info("scope", options={"claims": claims})


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
async def test_claims_challenge_without_scopes(get_token_method):
"""The credential should raise CredentialUnavailableError with appropriate message even when no scopes provided"""

claims = "test-claims-challenge"
expected_message = f"Failed to get token. Run az login --claims-challenge {claims}"

if get_token_method == "get_token":
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
await AzureCliCredential().get_token(claims=claims) # No scopes provided
else: # get_token_info
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
await AzureCliCredential().get_token_info(options={"claims": claims}) # No scopes provided


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
async def test_empty_claims_does_not_raise_error(get_token_method):
"""The credential should not raise error when claims parameter is empty or None"""

successful_output = json.dumps(
{
"expiresOn": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
Copy link
Preview

Copilot AI Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using datetime.now() without importing datetime module from the datetime package. Should be 'from datetime import datetime' or use a fixed future date for consistency.

Suggested change
"expiresOn": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
"expiresOn": "2023-01-01 00:00:00.000000",

Copilot uses AI. Check for mistakes.

"accessToken": "access-token",
"subscription": "subscription",
"tenant": "tenant",
"tokenType": "Bearer",
}
)

# Mock the CLI to avoid actual invocation
with mock.patch("shutil.which", return_value="az"):
with mock.patch(SUBPROCESS_EXEC, mock_exec(successful_output)):

if get_token_method == "get_token":
# Test with None (default)
token = await AzureCliCredential().get_token("scope")
assert token.token == "access-token"

# Test with empty string
token = await AzureCliCredential().get_token("scope", claims="")
assert token.token == "access-token"

# Test with None explicitly
token = await AzureCliCredential().get_token("scope", claims=None)
assert token.token == "access-token"

# Test with whitespace-only string
token = await AzureCliCredential().get_token("scope", claims=" ")
assert token.token == "access-token"
else: # get_token_info
# Test with None options
token = await AzureCliCredential().get_token_info("scope")
assert token.token == "access-token"

# Test with empty options
token = await AzureCliCredential().get_token_info("scope", options={})
assert token.token == "access-token"

# Test with None claims in options
token = await AzureCliCredential().get_token_info("scope", options={"claims": None})
assert token.token == "access-token"

# Test with empty string claims in options
token = await AzureCliCredential().get_token_info("scope", options={"claims": ""})
assert token.token == "access-token"

# Test with whitespace-only claims in options
token = await AzureCliCredential().get_token_info("scope", options={"claims": " "})
assert token.token == "access-token"


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
@pytest.mark.asyncio
async def test_claims_challenge_with_tenant(get_token_method):
"""The credential should include tenant in the error message when claims and tenant are provided"""

claims = "test-claims-challenge"
tenant_id = "test-tenant-id"

if get_token_method == "get_token":
# Test with get_token - tenant passed via get_token parameter (should appear in options)
expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id} --scope scope"
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
await AzureCliCredential().get_token("scope", claims=claims, tenant_id=tenant_id)
else: # get_token_info
# Test with get_token_info - tenant passed via options
expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id} --scope scope"
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
await AzureCliCredential().get_token_info("scope", options={"claims": claims, "tenant_id": tenant_id})


@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS)
@pytest.mark.asyncio
async def test_claims_challenge_with_tenant_without_scopes(get_token_method):
"""The credential should include tenant in error message when claims and tenant are provided but no scopes"""

claims = "test-claims-challenge"
tenant_id = "test-tenant-id"
expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id}"

if get_token_method == "get_token":
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
await AzureCliCredential().get_token(claims=claims, tenant_id=tenant_id) # No scopes provided
else: # get_token_info
with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)):
await AzureCliCredential().get_token_info(options={"claims": claims, "tenant_id": tenant_id}) # No scopes provided
Loading