44from typing import Optional
55
66from cryptojwt .jwk .jwk import key_from_jwk_dict
7- from cryptojwt .jws .jws import JWS
87from cryptojwt .jws .jws import factory
8+ from cryptojwt .jws .jws import JWS
99from cryptojwt .key_bundle import key_by_alg
1010
11+ from idpyoidc .client .client_auth import BearerHeader
12+ from idpyoidc .client .client_auth import find_token_info
1113from idpyoidc .client .service_context import ServiceContext
14+ from idpyoidc .message import Message
1215from idpyoidc .message import SINGLE_OPTIONAL_STRING
1316from idpyoidc .message import SINGLE_REQUIRED_INT
1417from idpyoidc .message import SINGLE_REQUIRED_JSON
1518from idpyoidc .message import SINGLE_REQUIRED_STRING
16- from idpyoidc .message import Message
1719from idpyoidc .metadata import get_signing_algs
1820from idpyoidc .time_util import utc_time_sans_frac
1921
@@ -91,13 +93,13 @@ def verify_header(self, dpop_header) -> Optional["DPoPProof"]:
9193
9294
9395def dpop_header (
94- service_context : ServiceContext ,
95- service_endpoint : str ,
96- http_method : str ,
97- headers : Optional [dict ] = None ,
98- token : Optional [str ] = "" ,
99- nonce : Optional [str ] = "" ,
100- ** kwargs
96+ service_context : ServiceContext ,
97+ service_endpoint : str ,
98+ http_method : str ,
99+ headers : Optional [dict ] = None ,
100+ token : Optional [str ] = "" ,
101+ nonce : Optional [str ] = "" ,
102+ ** kwargs
101103) -> dict :
102104 """
103105
@@ -159,7 +161,7 @@ def dpop_header(
159161 return headers
160162
161163
162- def add_support (services , dpop_signing_alg_values_supported ):
164+ def add_support (services , dpop_signing_alg_values_supported , with_dpop_header = None ):
163165 """
164166 Add the necessary pieces to make pushed authorization happen.
165167
@@ -185,3 +187,53 @@ def add_support(services, dpop_signing_alg_values_supported):
185187 _userinfo_service = services .get ("userinfo" )
186188 if _userinfo_service :
187189 _userinfo_service .construct_extra_headers .append (dpop_header )
190+ # To be backward compatible
191+ if with_dpop_header is None :
192+ with_dpop_header = ["userinfo" ]
193+
194+ # Add dpop HTTP header to these
195+ for _srv in with_dpop_header :
196+ if _srv == "accesstoken" :
197+ continue
198+ _service = services .get (_srv )
199+ if _service :
200+ _service .construct_extra_headers .append (dpop_header )
201+
202+
203+ class DPoPClientAuth (BearerHeader ):
204+ tag = "dpop_client_auth"
205+
206+ def construct (self , request = None , service = None , http_args = None , ** kwargs ):
207+ """
208+ Constructing the Authorization header. The value of
209+ the Authorization header is "Bearer <access_token>".
210+
211+ :param request: Request class instance
212+ :param service: The service this authentication method applies to.
213+ :param http_args: HTTP header arguments
214+ :param kwargs: extra keyword arguments
215+ :return:
216+ """
217+
218+ _token_type = "access_token"
219+
220+ _token_info = find_token_info (request , _token_type , service , ** kwargs )
221+
222+ if not _token_info :
223+ raise KeyError ("No bearer token available" )
224+
225+ # The authorization value starts with the token_type
226+ # if _token_info["token_type"].to_lower() != "bearer":
227+ _bearer = f"DPoP { _token_info [_token_type ]} "
228+
229+ # Add 'Authorization' to the headers
230+ if http_args is None :
231+ http_args = {"headers" : {}}
232+ http_args ["headers" ]["Authorization" ] = _bearer
233+ else :
234+ try :
235+ http_args ["headers" ]["Authorization" ] = _bearer
236+ except KeyError :
237+ http_args ["headers" ] = {"Authorization" : _bearer }
238+
239+ return http_args
0 commit comments