1+ import os
12import json
3+ import base64
24from urllib .parse import urlparse , urlencode , parse_qsl
35
46import pytest
2022
2123
2224CLIENT_ID = "client1"
25+ CLIENT_SECRET = "secret"
26+ CLIENT_REDIRECT_URI = "https://client.example.com/cb"
2327REDIRECT_URI = "https://client.example.com/cb"
2428
29+ @pytest .fixture (scope = "session" )
30+ def client_db_path (tmpdir_factory ):
31+ tmpdir = str (tmpdir_factory .getbasetemp ())
32+ path = os .path .join (tmpdir , "cdb.json" )
33+ cdb_json = {
34+ CLIENT_ID : {
35+ "response_types" : ["id_token" , "code" ],
36+ "redirect_uris" : [
37+ CLIENT_REDIRECT_URI
38+ ],
39+ "client_secret" : CLIENT_SECRET
40+ }
41+ }
42+ with open (path , "w" ) as f :
43+ f .write (json .dumps (cdb_json ))
44+
45+ return path
2546
2647@pytest .fixture
2748def oidc_frontend_config (signing_key_path , mongodb_instance ):
@@ -47,6 +68,25 @@ def oidc_frontend_config(signing_key_path, mongodb_instance):
4768 return data
4869
4970
71+ @pytest .fixture
72+ def oidc_stateless_frontend_config (signing_key_path , client_db_path ):
73+ data = {
74+ "module" : "satosa.frontends.openid_connect.OpenIDConnectFrontend" ,
75+ "name" : "OIDCFrontend" ,
76+ "config" : {
77+ "issuer" : "https://proxy-op.example.com" ,
78+ "signing_key_path" : signing_key_path ,
79+ "client_db_path" : client_db_path ,
80+ "db_uri" : "stateless://user:abc123@localhost" ,
81+ "provider" : {
82+ "response_types_supported" : ["id_token" , "code" ]
83+ }
84+ }
85+ }
86+
87+ return data
88+
89+
5090class TestOIDCToSAML :
5191 def test_full_flow (self , satosa_config_dict , oidc_frontend_config , saml_backend_config , idp_conf ):
5292 subject_id = "testuser1"
@@ -105,3 +145,134 @@ def test_full_flow(self, satosa_config_dict, oidc_frontend_config, saml_backend_
105145 (name , values ) in id_token_claims .items ()
106146 for name , values in OIDC_USERS [subject_id ].items ()
107147 )
148+
149+ def test_full_stateless_id_token_flow (self , satosa_config_dict , oidc_stateless_frontend_config , saml_backend_config , idp_conf ):
150+ subject_id = "testuser1"
151+
152+ # proxy config
153+ satosa_config_dict ["FRONTEND_MODULES" ] = [oidc_stateless_frontend_config ]
154+ satosa_config_dict ["BACKEND_MODULES" ] = [saml_backend_config ]
155+ satosa_config_dict ["INTERNAL_ATTRIBUTES" ]["attributes" ] = {attr_name : {"openid" : [attr_name ],
156+ "saml" : [attr_name ]}
157+ for attr_name in USERS [subject_id ]}
158+ _ , backend_metadata = create_entity_descriptors (SATOSAConfig (satosa_config_dict ))
159+
160+ # application
161+ test_client = Client (make_app (SATOSAConfig (satosa_config_dict )), Response )
162+
163+ # get frontend OP config info
164+ provider_config = json .loads (test_client .get ("/.well-known/openid-configuration" ).data .decode ("utf-8" ))
165+
166+ # create auth req
167+ claims_request = ClaimsRequest (id_token = Claims (** {k : None for k in USERS [subject_id ]}))
168+ req_args = {"scope" : "openid" , "response_type" : "id_token" , "client_id" : CLIENT_ID ,
169+ "redirect_uri" : REDIRECT_URI , "nonce" : "nonce" ,
170+ "claims" : claims_request .to_json ()}
171+ auth_req = urlparse (provider_config ["authorization_endpoint" ]).path + "?" + urlencode (req_args )
172+
173+ # make auth req to proxy
174+ proxied_auth_req = test_client .get (auth_req )
175+ assert proxied_auth_req .status == "303 See Other"
176+
177+ # config test IdP
178+ backend_metadata_str = str (backend_metadata [saml_backend_config ["name" ]][0 ])
179+ idp_conf ["metadata" ]["inline" ].append (backend_metadata_str )
180+ fakeidp = FakeIdP (USERS , config = IdPConfig ().load (idp_conf ))
181+
182+ # create auth resp
183+ req_params = dict (parse_qsl (urlparse (proxied_auth_req .data .decode ("utf-8" )).query ))
184+ url , authn_resp = fakeidp .handle_auth_req (
185+ req_params ["SAMLRequest" ],
186+ req_params ["RelayState" ],
187+ BINDING_HTTP_REDIRECT ,
188+ subject_id ,
189+ response_binding = BINDING_HTTP_REDIRECT )
190+
191+ # make auth resp to proxy
192+ authn_resp_req = urlparse (url ).path + "?" + urlencode (authn_resp )
193+ authn_resp = test_client .get (authn_resp_req )
194+ assert authn_resp .status == "303 See Other"
195+
196+ # verify auth resp from proxy
197+ resp_dict = dict (parse_qsl (urlparse (authn_resp .data .decode ("utf-8" )).fragment ))
198+ signing_key = RSAKey (key = rsa_load (oidc_stateless_frontend_config ["config" ]["signing_key_path" ]),
199+ use = "sig" , alg = "RS256" )
200+ id_token_claims = JWS ().verify_compact (resp_dict ["id_token" ], keys = [signing_key ])
201+
202+ assert all (
203+ (name , values ) in id_token_claims .items ()
204+ for name , values in OIDC_USERS [subject_id ].items ()
205+ )
206+
207+ def test_full_stateless_code_flow (self , satosa_config_dict , oidc_stateless_frontend_config , saml_backend_config , idp_conf ):
208+ subject_id = "testuser1"
209+
210+ # proxy config
211+ satosa_config_dict ["FRONTEND_MODULES" ] = [oidc_stateless_frontend_config ]
212+ satosa_config_dict ["BACKEND_MODULES" ] = [saml_backend_config ]
213+ satosa_config_dict ["INTERNAL_ATTRIBUTES" ]["attributes" ] = {attr_name : {"openid" : [attr_name ],
214+ "saml" : [attr_name ]}
215+ for attr_name in USERS [subject_id ]}
216+ _ , backend_metadata = create_entity_descriptors (SATOSAConfig (satosa_config_dict ))
217+
218+ # application
219+ test_client = Client (make_app (SATOSAConfig (satosa_config_dict )), Response )
220+
221+ # get frontend OP config info
222+ provider_config = json .loads (test_client .get ("/.well-known/openid-configuration" ).data .decode ("utf-8" ))
223+
224+ # create auth req
225+ claims_request = ClaimsRequest (id_token = Claims (** {k : None for k in USERS [subject_id ]}))
226+ req_args = {"scope" : "openid" , "response_type" : "code" , "client_id" : CLIENT_ID ,
227+ "redirect_uri" : REDIRECT_URI , "nonce" : "nonce" ,
228+ "claims" : claims_request .to_json ()}
229+ auth_req = urlparse (provider_config ["authorization_endpoint" ]).path + "?" + urlencode (req_args )
230+
231+ # make auth req to proxy
232+ proxied_auth_req = test_client .get (auth_req )
233+ assert proxied_auth_req .status == "303 See Other"
234+
235+ # config test IdP
236+ backend_metadata_str = str (backend_metadata [saml_backend_config ["name" ]][0 ])
237+ idp_conf ["metadata" ]["inline" ].append (backend_metadata_str )
238+ fakeidp = FakeIdP (USERS , config = IdPConfig ().load (idp_conf ))
239+
240+ # create auth resp
241+ req_params = dict (parse_qsl (urlparse (proxied_auth_req .data .decode ("utf-8" )).query ))
242+ url , authn_resp = fakeidp .handle_auth_req (
243+ req_params ["SAMLRequest" ],
244+ req_params ["RelayState" ],
245+ BINDING_HTTP_REDIRECT ,
246+ subject_id ,
247+ response_binding = BINDING_HTTP_REDIRECT )
248+
249+ # make auth resp to proxy
250+ authn_resp_req = urlparse (url ).path + "?" + urlencode (authn_resp )
251+ authn_resp = test_client .get (authn_resp_req )
252+ assert authn_resp .status == "303 See Other"
253+
254+ resp_dict = dict (parse_qsl (urlparse (authn_resp .data .decode ("utf-8" )).query ))
255+ code = resp_dict ["code" ]
256+ client_id_secret_str = CLIENT_ID + ":" + CLIENT_SECRET
257+ auth_header = "Basic %s" % base64 .b64encode (client_id_secret_str .encode ()).decode ()
258+
259+ authn_resp = test_client .post (provider_config ["token_endpoint" ],
260+ data = {
261+ "code" : code ,
262+ "grant_type" : "authorization_code" ,
263+ "redirect_uri" : CLIENT_REDIRECT_URI
264+ },
265+ headers = {'Authorization' : auth_header })
266+
267+ assert authn_resp .status == "200 OK"
268+
269+ # verify auth resp from proxy
270+ resp_dict = json .loads (authn_resp .data .decode ("utf-8" ))
271+ signing_key = RSAKey (key = rsa_load (oidc_stateless_frontend_config ["config" ]["signing_key_path" ]),
272+ use = "sig" , alg = "RS256" )
273+ id_token_claims = JWS ().verify_compact (resp_dict ["id_token" ], keys = [signing_key ])
274+
275+ assert all (
276+ (name , values ) in id_token_claims .items ()
277+ for name , values in OIDC_USERS [subject_id ].items ()
278+ )
0 commit comments