diff --git a/sdk/identity/azure-identity/azure/identity/_credentials/azure_cli.py b/sdk/identity/azure-identity/azure/identity/_credentials/azure_cli.py index 0c05f9ba95e9..715ef07fef8f 100644 --- a/sdk/identity/azure-identity/azure/identity/_credentials/azure_cli.py +++ b/sdk/identity/azure-identity/azure/identity/_credentials/azure_cli.py @@ -90,7 +90,7 @@ def close(self) -> None: def get_token( self, *scopes: str, - claims: Optional[str] = None, # pylint:disable=unused-argument + claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any, ) -> AccessToken: @@ -102,20 +102,23 @@ def get_token( :param str scopes: desired scope for the access token. This credential allows only one scope per request. For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc. - :keyword str claims: not used by this credential; any value provided will be ignored. + :keyword str claims: additional claims required in the token. This credential does not support claims + challenges. :keyword str tenant_id: optional tenant to include in the token request. :return: An access token with the desired scopes. :rtype: ~azure.core.credentials.AccessToken - :raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI. + :raises ~azure.identity.CredentialUnavailableError: the credential was either unable to invoke the Azure CLI + or a claims challenge was provided. :raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't receive an access token. """ - options: TokenRequestOptions = {} if tenant_id: options["tenant_id"] = tenant_id + if claims: + options["claims"] = claims token_info = self._get_token_base(*scopes, options=options, **kwargs) return AccessToken(token_info.token, token_info.expires_on) @@ -136,7 +139,8 @@ def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = :rtype: ~azure.core.credentials.AccessTokenInfo :return: An AccessTokenInfo instance containing information about the token. - :raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI. + :raises ~azure.identity.CredentialUnavailableError: the credential was either unable to invoke the Azure CLI + or a claims challenge was provided. :raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't receive an access token. """ @@ -146,6 +150,22 @@ def _get_token_base( self, *scopes: str, options: Optional[TokenRequestOptions] = None, **kwargs: Any ) -> AccessTokenInfo: + # Check for claims challenge first + claims_value = options.get("claims") if options else None + if claims_value and claims_value.strip(): + login_cmd = f"az login --claims-challenge {claims_value}" + + # Add tenant if provided in options + tenant_id_from_options = options.get("tenant_id") if options else None + if tenant_id_from_options: + login_cmd += f" --tenant {tenant_id_from_options}" + + # Add scope if provided + if scopes: + login_cmd += f" --scope {scopes[0]}" + + raise CredentialUnavailableError(f"Failed to get token. Run {login_cmd}") + tenant_id = options.get("tenant_id") if options else None if tenant_id: validate_tenant_id(tenant_id) diff --git a/sdk/identity/azure-identity/azure/identity/aio/_credentials/azure_cli.py b/sdk/identity/azure-identity/azure/identity/aio/_credentials/azure_cli.py index 90fa6af8442c..0b08cb85a879 100644 --- a/sdk/identity/azure-identity/azure/identity/aio/_credentials/azure_cli.py +++ b/sdk/identity/azure-identity/azure/identity/aio/_credentials/azure_cli.py @@ -82,7 +82,7 @@ def __init__( async def get_token( self, *scopes: str, - claims: Optional[str] = None, # pylint:disable=unused-argument + claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any, ) -> AccessToken: @@ -94,22 +94,26 @@ async def get_token( :param str scopes: desired scope for the access token. This credential allows only one scope per request. For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc. - :keyword str claims: not used by this credential; any value provided will be ignored. + :keyword str claims: additional claims required in the token. This credential does not support claims + challenges. :keyword str tenant_id: optional tenant to include in the token request. :return: An access token with the desired scopes. :rtype: ~azure.core.credentials.AccessToken - :raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI. + :raises ~azure.identity.CredentialUnavailableError: the credential was either unable to invoke the Azure CLI + or a claims challenge was provided. :raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't receive an access token. """ # only ProactorEventLoop supports subprocesses on Windows (and it isn't the default loop on Python < 3.8) if sys.platform.startswith("win") and not isinstance(asyncio.get_event_loop(), asyncio.ProactorEventLoop): - return _SyncAzureCliCredential().get_token(*scopes, tenant_id=tenant_id, **kwargs) + return _SyncAzureCliCredential().get_token(*scopes, claims=claims, tenant_id=tenant_id, **kwargs) options: TokenRequestOptions = {} if tenant_id: options["tenant_id"] = tenant_id + if claims: + options["claims"] = claims token_info = await self._get_token_base(*scopes, options=options, **kwargs) return AccessToken(token_info.token, token_info.expires_on) @@ -130,7 +134,8 @@ async def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptio :rtype: ~azure.core.credentials.AccessTokenInfo :return: An AccessTokenInfo instance containing information about the token. - :raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI. + :raises ~azure.identity.CredentialUnavailableError: the credential was either unable to invoke the Azure CLI + or a claims challenge was provided. :raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't receive an access token. """ @@ -142,6 +147,22 @@ async def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptio async def _get_token_base( self, *scopes: str, options: Optional[TokenRequestOptions] = None, **kwargs: Any ) -> AccessTokenInfo: + # Check for claims challenge first + claims_value = options.get("claims") if options else None + if claims_value and claims_value.strip(): + login_cmd = f"az login --claims-challenge {claims_value}" + + # Add tenant if provided in options + tenant_id_from_options = options.get("tenant_id") if options else None + if tenant_id_from_options: + login_cmd += f" --tenant {tenant_id_from_options}" + + # Add scope if provided + if scopes: + login_cmd += f" --scope {scopes[0]}" + + raise CredentialUnavailableError(f"Failed to get token. Run {login_cmd}") + tenant_id = options.get("tenant_id") if options else None if tenant_id: validate_tenant_id(tenant_id) diff --git a/sdk/identity/azure-identity/tests/test_cli_credential.py b/sdk/identity/azure-identity/tests/test_cli_credential.py index 039e7be199e0..4a566a408e01 100644 --- a/sdk/identity/azure-identity/tests/test_cli_credential.py +++ b/sdk/identity/azure-identity/tests/test_cli_credential.py @@ -395,3 +395,116 @@ def fake_check_output(command_line, **_): kwargs = {"options": kwargs} token = getattr(credential, get_token_method)("scope", **kwargs) assert token.token == expected_token + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +def test_claims_challenge_raises_error(get_token_method): + """The credential should raise CredentialUnavailableError when claims challenge is provided""" + + claims = "test-claims-challenge" + expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --scope scope" + + if get_token_method == "get_token": + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + AzureCliCredential().get_token("scope", claims=claims) + else: # get_token_info + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + AzureCliCredential().get_token_info("scope", options={"claims": claims}) + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +def test_claims_challenge_without_scopes(get_token_method): + """The credential should raise CredentialUnavailableError with appropriate message even when no scopes provided""" + + claims = "test-claims-challenge" + expected_message = f"Failed to get token. Run az login --claims-challenge {claims}" + + if get_token_method == "get_token": + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + AzureCliCredential().get_token(claims=claims) # No scopes provided + else: # get_token_info + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + AzureCliCredential().get_token_info(options={"claims": claims}) # No scopes provided + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +def test_empty_claims_does_not_raise_error(get_token_method): + """The credential should not raise error when claims parameter is empty or None""" + + # Mock the CLI to avoid actual invocation + with mock.patch("shutil.which", return_value="az"): + with mock.patch( + CHECK_OUTPUT, mock.Mock(return_value='{"accessToken": "token", "expiresOn": "2021-10-07 12:00:00.000000"}') + ): + + if get_token_method == "get_token": + # Test with None (default) + token = AzureCliCredential().get_token("scope") + assert token.token == "token" + + # Test with empty string + token = AzureCliCredential().get_token("scope", claims="") + assert token.token == "token" + + # Test with None explicitly + token = AzureCliCredential().get_token("scope", claims=None) + assert token.token == "token" + + # Test with whitespace-only string + token = AzureCliCredential().get_token("scope", claims=" ") + assert token.token == "token" + else: # get_token_info + # Test with None options + token = AzureCliCredential().get_token_info("scope") + assert token.token == "token" + + # Test with empty options + token = AzureCliCredential().get_token_info("scope", options={}) + assert token.token == "token" + + # Test with None claims in options + token = AzureCliCredential().get_token_info("scope", options={"claims": None}) + assert token.token == "token" + + # Test with empty string claims in options + token = AzureCliCredential().get_token_info("scope", options={"claims": ""}) + assert token.token == "token" + + # Test with whitespace-only claims in options + token = AzureCliCredential().get_token_info("scope", options={"claims": " "}) + assert token.token == "token" + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +def test_claims_challenge_with_tenant(get_token_method): + """The credential should include tenant in the error message when claims and tenant are provided""" + + claims = "test-claims-challenge" + tenant_id = "test-tenant-id" + + if get_token_method == "get_token": + # Test with get_token - tenant passed via get_token parameter (should appear in options) + expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id} --scope scope" + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + AzureCliCredential().get_token("scope", claims=claims, tenant_id=tenant_id) + else: # get_token_info + # Test with get_token_info - tenant passed via options + expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id} --scope scope" + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + AzureCliCredential().get_token_info("scope", options={"claims": claims, "tenant_id": tenant_id}) + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +def test_claims_challenge_with_tenant_without_scopes(get_token_method): + """The credential should include tenant in error message when claims and tenant are provided but no scopes""" + + claims = "test-claims-challenge" + tenant_id = "test-tenant-id" + expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id}" + + if get_token_method == "get_token": + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + AzureCliCredential().get_token(claims=claims, tenant_id=tenant_id) # No scopes provided + else: # get_token_info + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + AzureCliCredential().get_token_info(options={"claims": claims, "tenant_id": tenant_id}) # No scopes provided diff --git a/sdk/identity/azure-identity/tests/test_cli_credential_async.py b/sdk/identity/azure-identity/tests/test_cli_credential_async.py index b5b727c6a238..b41a936e7593 100644 --- a/sdk/identity/azure-identity/tests/test_cli_credential_async.py +++ b/sdk/identity/azure-identity/tests/test_cli_credential_async.py @@ -389,3 +389,126 @@ async def fake_exec(*args, **_): kwargs = {"options": kwargs} token = await getattr(credential, get_token_method)("scope", **kwargs) assert token.token == expected_token + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +async def test_claims_challenge_raises_error(get_token_method): + """The credential should raise CredentialUnavailableError when claims challenge is provided""" + + claims = "test-claims-challenge" + expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --scope scope" + + if get_token_method == "get_token": + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + await AzureCliCredential().get_token("scope", claims=claims) + else: # get_token_info + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + await AzureCliCredential().get_token_info("scope", options={"claims": claims}) + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +async def test_claims_challenge_without_scopes(get_token_method): + """The credential should raise CredentialUnavailableError with appropriate message even when no scopes provided""" + + claims = "test-claims-challenge" + expected_message = f"Failed to get token. Run az login --claims-challenge {claims}" + + if get_token_method == "get_token": + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + await AzureCliCredential().get_token(claims=claims) # No scopes provided + else: # get_token_info + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + await AzureCliCredential().get_token_info(options={"claims": claims}) # No scopes provided + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +async def test_empty_claims_does_not_raise_error(get_token_method): + """The credential should not raise error when claims parameter is empty or None""" + + successful_output = json.dumps( + { + "expiresOn": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), + "accessToken": "access-token", + "subscription": "subscription", + "tenant": "tenant", + "tokenType": "Bearer", + } + ) + + # Mock the CLI to avoid actual invocation + with mock.patch("shutil.which", return_value="az"): + with mock.patch(SUBPROCESS_EXEC, mock_exec(successful_output)): + + if get_token_method == "get_token": + # Test with None (default) + token = await AzureCliCredential().get_token("scope") + assert token.token == "access-token" + + # Test with empty string + token = await AzureCliCredential().get_token("scope", claims="") + assert token.token == "access-token" + + # Test with None explicitly + token = await AzureCliCredential().get_token("scope", claims=None) + assert token.token == "access-token" + + # Test with whitespace-only string + token = await AzureCliCredential().get_token("scope", claims=" ") + assert token.token == "access-token" + else: # get_token_info + # Test with None options + token = await AzureCliCredential().get_token_info("scope") + assert token.token == "access-token" + + # Test with empty options + token = await AzureCliCredential().get_token_info("scope", options={}) + assert token.token == "access-token" + + # Test with None claims in options + token = await AzureCliCredential().get_token_info("scope", options={"claims": None}) + assert token.token == "access-token" + + # Test with empty string claims in options + token = await AzureCliCredential().get_token_info("scope", options={"claims": ""}) + assert token.token == "access-token" + + # Test with whitespace-only claims in options + token = await AzureCliCredential().get_token_info("scope", options={"claims": " "}) + assert token.token == "access-token" + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +@pytest.mark.asyncio +async def test_claims_challenge_with_tenant(get_token_method): + """The credential should include tenant in the error message when claims and tenant are provided""" + + claims = "test-claims-challenge" + tenant_id = "test-tenant-id" + + if get_token_method == "get_token": + # Test with get_token - tenant passed via get_token parameter (should appear in options) + expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id} --scope scope" + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + await AzureCliCredential().get_token("scope", claims=claims, tenant_id=tenant_id) + else: # get_token_info + # Test with get_token_info - tenant passed via options + expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id} --scope scope" + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + await AzureCliCredential().get_token_info("scope", options={"claims": claims, "tenant_id": tenant_id}) + + +@pytest.mark.parametrize("get_token_method", GET_TOKEN_METHODS) +@pytest.mark.asyncio +async def test_claims_challenge_with_tenant_without_scopes(get_token_method): + """The credential should include tenant in error message when claims and tenant are provided but no scopes""" + + claims = "test-claims-challenge" + tenant_id = "test-tenant-id" + expected_message = f"Failed to get token. Run az login --claims-challenge {claims} --tenant {tenant_id}" + + if get_token_method == "get_token": + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + await AzureCliCredential().get_token(claims=claims, tenant_id=tenant_id) # No scopes provided + else: # get_token_info + with pytest.raises(CredentialUnavailableError, match=re.escape(expected_message)): + await AzureCliCredential().get_token_info(options={"claims": claims, "tenant_id": tenant_id}) # No scopes provided