From eee9c72ad192062e536d943282060708d92699df Mon Sep 17 00:00:00 2001 From: Antonis Angelakis Date: Wed, 2 Sep 2020 18:50:05 +0300 Subject: [PATCH 1/2] Implement token exchange For a first implementation, we would like a simple functionality to create new access tokens with altered scopes. Through grant_types_supported a customized class can be set to handle token exchange so an app can implement any missing functionality it requires. --- src/oidcendpoint/oidc/token.py | 180 ++++++++++++++++++++++++++++++++- 1 file changed, 177 insertions(+), 3 deletions(-) diff --git a/src/oidcendpoint/oidc/token.py b/src/oidcendpoint/oidc/token.py index 0e51713..bead868 100755 --- a/src/oidcendpoint/oidc/token.py +++ b/src/oidcendpoint/oidc/token.py @@ -1,13 +1,16 @@ import logging from typing import Optional from typing import Union +from urllib.parse import urlparse +from cryptojwt.exception import JWKESTException from cryptojwt.jwe.exception import JWEException from cryptojwt.jws.exception import NoSuitableSigningKeys from cryptojwt.jwt import utc_time_sans_frac from oidcmsg import oidc from oidcmsg.message import Message -from oidcmsg.oauth2 import ResponseMessage +from oidcmsg.exception import MissingRequiredValue, MissingRequiredAttribute +from oidcmsg.oauth2 import TokenExchangeRequest, ResponseMessage, TokenExchangeResponse from oidcmsg.oidc import RefreshAccessTokenRequest from oidcmsg.oidc import TokenErrorResponse from oidcmsg.time_util import time_sans_frac @@ -16,11 +19,12 @@ from oidcendpoint.cookie import new_cookie from oidcendpoint.endpoint import Endpoint from oidcendpoint.exception import ProcessError -from oidcendpoint.session import unpack_session_key +from oidcendpoint.session import unpack_session_key, MintingNotAllowed from oidcendpoint.session.grant import AuthorizationCode from oidcendpoint.session.grant import Grant from oidcendpoint.session.grant import RefreshToken -from oidcendpoint.session.token import Token as sessionToken +from oidcendpoint.session.token import AccessToken, Token as sessionToken +from oidcendpoint.token.exception import UnknownToken from oidcendpoint.util import importer logger = logging.getLogger(__name__) @@ -300,9 +304,179 @@ def post_parse_request(self, request: Union[Message, dict], return request +class TokenExchangeHelper(TokenEndpointHelper): + """Implements Token Exchange a.k.a. RFC8693""" + + def __init__(self, endpoint, config=None): + TokenEndpointHelper.__init__(self, endpoint=endpoint, config=config) + + # TODO: should we even have a policy for the simple use cases? + if config is None: + self.policy = {} + else: + self.policy = config.get('policy', {}) + + # TODO: Make this a part of the policy. Note the distinction between + # requested_token_type, subject_token_type, actor_token_type, issued_token_type + self.token_types_allowed = [ + "urn:ietf:params:oauth:token-type:access_token", + "urn:ietf:params:oauth:token-type:jwt", + # "urn:ietf:params:oauth:token-type:id_token", + # "urn:ietf:params:oauth:token-type:refresh_token", + ] + + def post_parse_request(self, request, client_id="", **kwargs): + request = TokenExchangeRequest(**request.to_dict()) + + # if "client_id" not in request: + # request["client_id"] = client_id + + keyjar = getattr(self.endpoint_context, "keyjar", "") + + try: + request.verify(keyjar=keyjar, opponent_id=client_id) + except ( + MissingRequiredAttribute, + ValueError, + MissingRequiredValue, + JWKESTException, + ) as err: + return self.endpoint.error_cls( + error="invalid_request", error_description="%s" % err + ) + + + error = self.check_for_errors(request=request) + if error is not None: + return error + + _mngr = self.endpoint_context.session_manager + try: + _session_info = _mngr.get_session_info_by_token( + request["subject_token"], grant=True + ) + except (KeyError, UnknownToken): + logger.error("Subject token invalid.") + return self.error_cls( + error="invalid_request", + error_description="Subject token invalid" + ) + + token = _mngr.find_token(_session_info["session_id"], request["subject_token"]) + + if not isinstance(token, AccessToken): + return self.error_cls( + error="invalid_request", error_description="Wrong token type" + ) + + if token.is_active() is False: + return self.error_cls( + error="invalid_request", error_description="Subject token inactive" + ) + + return request + + def check_for_errors(self, request): + context = self.endpoint.endpoint_context + if "resource" in request: + iss = urlparse(context.issuer) + if any( + urlparse(res).netloc != iss.netloc for res in request["resource"] + ): + return TokenErrorResponse( + error="invalid_target", error_description="Unknown resource" + ) + + if "audience" in request: + if any( + aud != context.issuer for aud in request["audience"] + ): + return TokenErrorResponse( + error="invalid_target", error_description="Unknown audience" + ) + + # TODO: if requested type is jwt make sure our tokens are jwt + if ( + "requested_token_type" in request + and request["requested_token_type"] not in self.token_types_allowed + ): + return TokenErrorResponse( + error="invalid_target", + error_description="Unsupported requested token type" + ) + + if "actor_token" in request or "actor_token_type" in request: + return TokenErrorResponse( + error="invalid_request", error_description="Actor token not supported" + ) + + # TODO: also check if the (valid) subject_token matches subject_token_type + if request["subject_token_type"] not in self.token_types_allowed: + return TokenErrorResponse( + error="invalid_request", + error_description="Unsupported subject token type", + ) + + def token_exchange_response(self, token): + response_args = { + "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", + "token_type": token.type, + "access_token": token.value, + "scope": token.scope, + "expires_in": token.usage_rules["expires_in"] + } + return TokenExchangeResponse(**response_args) + + def process_request(self, request, **kwargs): + # TODO: should we even have a policy for the simple use cases? + # client_policy = self.policy.get(req["client_id"]) or self.policy.get("default") + # if not client_policy: + # logger.error( + # "TokenExchange policy for client {req['client_id']} or default missing." + # ) + # return TokenErrorResponse( + # error="invalid_request", error_description="Not allowed" + # ) + _mngr = self.endpoint_context.session_manager + try: + _session_info = _mngr.get_session_info_by_token( + request["subject_token"], + grant=True, + ) + except KeyError: + logger.error("Subject token invalid.") + return self.error_cls( + error="invalid_grant", + error_description="Subject token invalid", + ) + + token = _mngr.find_token(_session_info["session_id"], request["subject_token"]) + grant = _session_info["grant"] + + try: + new_token = grant.mint_token( + session_id=_session_info["session_id"], + endpoint_context=self.endpoint_context, + token_type='access_token', + token_handler=_mngr.token_handler["access_token"], + based_on=token, + resources=request.get("resource"), + scope=request.get("scope"), + ) + except MintingNotAllowed: + logger.error("Minting not allowed for 'access_token'") + return self.error_cls( + error="invalid_grant", + error_description="Token Exchange not allowed with that token", + ) + + return self.token_exchange_response(token=new_token) + + HELPER_BY_GRANT_TYPE = { "authorization_code": AccessTokenHelper, "refresh_token": RefreshTokenHelper, + "urn:ietf:params:oauth:grant-type:token-exchange": TokenExchangeHelper, } From 3c41571df4fffb85f97beaedb5ed6fb55f74251c Mon Sep 17 00:00:00 2001 From: Antonis Angelakis Date: Thu, 18 Mar 2021 03:21:04 +0200 Subject: [PATCH 2/2] Add proper token exchange tests --- tests/test_36_token_exchange.py | 503 +++++++++++++++++++++++++++----- 1 file changed, 434 insertions(+), 69 deletions(-) diff --git a/tests/test_36_token_exchange.py b/tests/test_36_token_exchange.py index 3931f5c..05be26c 100644 --- a/tests/test_36_token_exchange.py +++ b/tests/test_36_token_exchange.py @@ -39,12 +39,6 @@ CAPABILITIES = { "subject_types_supported": ["public", "pairwise", "ephemeral"], - "grant_types_supported": [ - "authorization_code", - "implicit", - "urn:ietf:params:oauth:grant-type:jwt-bearer", - "refresh_token", - ], } AUTH_REQ = AuthorizationRequest( @@ -99,7 +93,11 @@ def create_endpoint(self): "client_secret_post", "client_secret_jwt", "private_key_jwt", - ] + ], + "grant_types_supported": { + "authorization_code": True, + "urn:ietf:params:oauth:grant-type:token-exchange": True, + }, }, }, "introspection": { @@ -128,7 +126,10 @@ def create_endpoint(self): 'supports_minting': ["access_token", "refresh_token", "id_token"], "max_usage": 1 }, - "access_token": {}, + "access_token": { + "supports_minting": ["access_token", "refresh_token", "id_token"], + "expires_in": 600, + }, "refresh_token": { 'supports_minting': ["access_token", "refresh_token"], } @@ -145,10 +146,10 @@ def create_endpoint(self): "client_salt": "salted", "token_endpoint_auth_method": "client_secret_post", "response_types": ["code", "token", "code id_token", "id_token"], + "allowed_scopes": ["openid", "profile"], } endpoint_context.keyjar.import_jwks(CLIENT_KEYJAR.export_jwks(), "client_1") self.endpoint = endpoint_context.endpoint["token"] - self.introspection_endpoint = endpoint_context.endpoint["introspection"] self.session_manager = endpoint_context.session_manager self.user_id = "diana" @@ -164,93 +165,457 @@ def _create_session(self, auth_req, sub_type="public", sector_identifier=''): client_id=client_id, sub_type=sub_type) - def _mint_code(self, grant, session_id): - return grant.mint_token( + def _mint_code(self, grant, client_id): + session_id = session_key(self.user_id, client_id, grant.id) + usage_rules = grant.usage_rules.get("authorization_code", {}) + _exp_in = usage_rules.get("expires_in") + + # Constructing an authorization code is now done + _code = grant.mint_token( session_id=session_id, endpoint_context=self.endpoint.endpoint_context, token_type='authorization_code', - token_handler=self.session_manager.token_handler["code"] + token_handler=self.session_manager.token_handler["code"], + usage_rules=usage_rules ) - def _mint_access_token(self, grant, session_id, token_ref=None, resources=None): - return grant.mint_token( - session_id=session_id, - endpoint_context=self.endpoint.endpoint_context, - token_type='access_token', - token_handler=self.session_manager.token_handler["access_token"], - based_on=token_ref, - resources=resources + if _exp_in: + if isinstance(_exp_in, str): + _exp_in = int(_exp_in) + if _exp_in: + _code.expires_at = utc_time_sans_frac() + _exp_in + return _code + + def test_token_exchange(self): + """ + Test that token exchange requests work correctly, removing a scope. + """ + areq = AUTH_REQ.copy() + areq["scope"] = ["openid", "profile"] + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) + + _cntx = self.endpoint.endpoint_context + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) + + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token=_token_value, + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + resource=["https://example.com/api"], + scope=["openid"], ) - def exchange_grant(self, session_id, users, targets, scope): - session_info = self.session_manager.get_session_info(session_id) - exchange_grant = ExchangeGrant(scope=scope, resources=targets, users=users) + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + _resp = self.endpoint.process_request(request=_req) - # the grant is assigned to a session (user_id, client_id) - self.session_manager.set( - [self.user_id, session_info["client_id"], exchange_grant.id], - exchange_grant) - return exchange_grant + assert set(_resp.keys()) == {"response_args", "http_headers"} + assert set(_resp["response_args"].keys()) == { + 'access_token', 'token_type', 'scope', 'expires_in', 'issued_token_type' + } + assert _resp["response_args"]["scope"] == ["openid"] + + def test_additional_parameters(self): + """ + Test that a token exchange with additional parameters including + audience and subject_token_type works. + """ + areq = AUTH_REQ.copy() - def test_do_response(self): - session_id = self._create_session(AUTH_REQ) - grant = self.endpoint.endpoint_context.authz(session_id, AUTH_REQ) - grant.usage_rules["access_token"] = {"supports_minting":["access_token"]} - self.session_manager[session_id] = grant + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) - grant_user_id = "https://frontend.example.com/resource" - backend = "https://backend.example.com" - _ = self.exchange_grant(session_id, [grant_user_id], [backend], scope=["api"]) - code = self._mint_code(grant, session_id) + _cntx = self.endpoint.endpoint_context _token_request = TOKEN_REQ_DICT.copy() _token_request["code"] = code.value _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) + + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token=_token_value, + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + resource=["https://example.com/api"], + requested_token_type="urn:ietf:params:oauth:token-type:access_token", + audience=["https://example.com/"], + ) + + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) _resp = self.endpoint.process_request(request=_req) + + assert set(_resp.keys()) == {"response_args", "http_headers"} + assert set(_resp["response_args"].keys()) == { + 'access_token', 'token_type', 'expires_in', 'issued_token_type' + } msg = self.endpoint.do_response(request=_req, **_resp) assert isinstance(msg, dict) - token_response = json.loads(msg["response"]) - print(token_response["access_token"]) - # resource server sends a token exchange request with - # access token as subject_token + def test_token_exchange_fails_if_disabled(self): + """ + Test that token exchange fails if it's not included in Token's + grant_types_supported (that are set in its helper attribute). + """ + del self.endpoint.helper[ + "urn:ietf:params:oauth:grant-type:token-exchange" + ] + + areq = AUTH_REQ.copy() + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) + + _cntx = self.endpoint.endpoint_context + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) - ter = TokenExchangeRequest( - subject_token=token_response["access_token"], + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token=_token_value, subject_token_type="urn:ietf:params:oauth:token-type:access_token", + resource=["https://example.com/api"] + ) + + _resp = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + assert set(_resp.keys()) == {"error", "error_description"} + assert _resp["error"] == "invalid_request" + assert( + _resp["error_description"] + == "Unsupported grant_type: urn:ietf:params:oauth:grant-type:token-exchange" + ) + + def test_wrong_resource(self): + """ + Test that requesting a token for an unknown resource fails. + + We currently only allow resources that match the issuer's host part. + TODO: Should we do this? + """ + areq = AUTH_REQ.copy() + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) + + _cntx = self.endpoint.endpoint_context + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) + + token_exchange_req = TokenExchangeRequest( grant_type="urn:ietf:params:oauth:grant-type:token-exchange", - resource="https://backend.example.com/api" + subject_token=_token_value, + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + resource=["https://unknown-resource.com/api"] ) - exch_grants = [] - for grant in self.session_manager.grants(session_id=session_id): - if isinstance(grant, ExchangeGrant): - if grant_user_id in grant.users: - exch_grants.append(grant) + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + _resp = self.endpoint.process_request(request=_req) + assert set(_resp.keys()) == {"error", "error_description"} + assert _resp["error"] == "invalid_target" + assert _resp["error_description"] == "Unknown resource" + + def test_wrong_audience(self): + """ + Test that requesting a token for an unknown audience fails. + + We currently only allow audience that match the issuer. + TODO: Should we do this? + """ + areq = AUTH_REQ.copy() + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) - assert exch_grants - exch_grant = exch_grants[0] + _cntx = self.endpoint.endpoint_context + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) - session_info = self.session_manager.get_session_info_by_token(ter["subject_token"]) - _token = self.session_manager.find_token(session_info["session_id"], - ter["subject_token"]) + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) - session_id = session_key(session_info['user_id'], session_info["client_id"], exch_grant.id) + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token=_token_value, + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + audience=["https://unknown-audience.com/"], + resource=["https://example.com/api"] + ) - _token = self._mint_access_token(exch_grant, session_id, token_ref=_token, - resources=["https://backend.example.com"]) + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + _resp = self.endpoint.process_request(request=_req) + assert set(_resp.keys()) == {"error", "error_description"} + assert _resp["error"] == "invalid_target" + assert _resp["error_description"] == "Unknown audience" + + @pytest.mark.parametrize("missing_attribute", [ + "subject_token_type", + "subject_token", + ]) + def test_missing_parameters(self, missing_attribute): + """ + Test that omitting the subject_token_type fails. + """ + areq = AUTH_REQ.copy() + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) + + _cntx = self.endpoint.endpoint_context - print(_token.value) - _req = self.introspection_endpoint.parse_request( - { - "token": _token.value, - "client_id": "client_1", - "client_secret": self.introspection_endpoint.endpoint_context.cdb[ - "client_1"]["client_secret"], - } + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) + + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token=_token_value, + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + audience=["https://example.com/"], + resource=["https://example.com/api"] + ) + + del token_exchange_req[missing_attribute] + + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + _resp = self.endpoint.process_request(request=_req) + assert set(_resp.keys()) == {"error", "error_description"} + assert _resp["error"] == "invalid_request" + assert ( + _resp["error_description"] + == f"Missing required attribute '{missing_attribute}'" + ) + + @pytest.mark.parametrize("unsupported_type", [ + "unknown", + "urn:ietf:params:oauth:token-type:refresh_token", + "urn:ietf:params:oauth:token-type:id_token", + "urn:ietf:params:oauth:token-type:saml2", + "urn:ietf:params:oauth:token-type:saml1", + ]) + def test_unsupported_requested_token_type(self, unsupported_type): + """ + Test that requesting a token type that is unknown or unsupported fails. + """ + areq = AUTH_REQ.copy() + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) + + _cntx = self.endpoint.endpoint_context + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) + + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token=_token_value, + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + requested_token_type=unsupported_type, + audience=["https://example.com/"], + resource=["https://example.com/api"] + ) + + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + _resp = self.endpoint.process_request(request=_req) + assert set(_resp.keys()) == {"error", "error_description"} + assert _resp["error"] == "invalid_target" + assert ( + _resp["error_description"] + == "Unsupported requested token type" + ) + + @pytest.mark.parametrize("unsupported_type", [ + "unknown", + "urn:ietf:params:oauth:token-type:refresh_token", + "urn:ietf:params:oauth:token-type:id_token", + "urn:ietf:params:oauth:token-type:saml2", + "urn:ietf:params:oauth:token-type:saml1", + ]) + def test_unsupported_subject_token_type(self, unsupported_type): + """ + Test that providing an unsupported subject token type fails. + """ + areq = AUTH_REQ.copy() + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) + + _cntx = self.endpoint.endpoint_context + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) + + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token=_token_value, + subject_token_type=unsupported_type, + audience=["https://example.com/"], + resource=["https://example.com/api"] + ) + + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + _resp = self.endpoint.process_request(request=_req) + assert set(_resp.keys()) == {"error", "error_description"} + assert _resp["error"] == "invalid_request" + assert ( + _resp["error_description"] + == "Unsupported subject token type" + ) + + def test_unsupported_actor_token(self): + """ + Test that providing an actor token fails as it's unsupported. + """ + areq = AUTH_REQ.copy() + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) + + _cntx = self.endpoint.endpoint_context + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) + + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token=_token_value, + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + actor_token=_resp['response_args']['access_token'] + ) + + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + _resp = self.endpoint.process_request(request=_req) + assert set(_resp.keys()) == {"error", "error_description"} + assert _resp["error"] == "invalid_request" + assert ( + _resp["error_description"] + == "Actor token not supported" + ) + + def test_invalid_token(self): + """ + Test that providing an invalid token fails. + """ + areq = AUTH_REQ.copy() + + session_id = self._create_session(areq) + grant = self.endpoint.endpoint_context.authz(session_id, areq) + code = self._mint_code(grant, areq['client_id']) + + _cntx = self.endpoint.endpoint_context + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["code"] = code.value + _req = self.endpoint.parse_request(_token_request) + _resp = self.endpoint.process_request(request=_req) + + _token_value = _resp["response_args"]["access_token"] + _session_info = self.session_manager.get_session_info_by_token(_token_value) + _token = self.session_manager.find_token(_session_info["session_id"], _token_value) + + token_exchange_req = TokenExchangeRequest( + grant_type="urn:ietf:params:oauth:grant-type:token-exchange", + subject_token="invalid_token", + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + ) + + _req = self.endpoint.parse_request( + token_exchange_req.to_json(), + auth="Basic {}".format("Y2xpZW50XzE6aGVtbGlndA=="), + ) + _resp = self.endpoint.process_request(request=_req) + assert set(_resp.keys()) == {"error", "error_description"} + assert _resp["error"] == "invalid_request" + assert ( + _resp["error_description"] + == "Subject token invalid" ) - _resp = self.introspection_endpoint.process_request(_req) - msg_info = self.introspection_endpoint.do_response(request=_req, **_resp) - assert msg_info - print(json.loads(msg_info["response"]))