Source code for medmij_oauth.client.client

import urllib.parse
import secrets

from . import validation

from .data_store import DataStore

[docs]class Client: """ Class to assist in the OAuth clientside flow :type data_store: :class:`DataStore <medmij_oauth.client.DataStore>` :param data_store: Must be subclass of DataStore, handles data interaction with OAuthSessions see :class:`DataStore <medmij_oauth.client.DataStore>` for more info. :type get_zal: coroutine :param get_zal: Function that returns a `ZAL <https://github.com/GidsOpenStandaarden/OpenPGO-Medmij-ImplementatieBouwstenen-Python>`__ :type get_gnl: coroutine :param get_gnl: Function that returns a `GegevensdienstNamenlijst <https://github.com/GidsOpenStandaarden/OpenPGO-Medmij-ImplementatieBouwstenen-Python>`__ :type client_info: dict :param client_info: Dict containing info about the client application (client_id and redirect_url for authorization request responses) :type make_request: coroutine :param make_request: coroutine that makes a post request. Should have the signature :code:`(url:string, body:dict)->dict`. Used to make a authorization exchange request to the oauth server. """ def __init__(self, data_store=None, get_zal=None, get_gnl=None, client_info=None, make_request=None): assert get_zal is not None, "Can't instantiate Client without 'get_zal'" assert get_gnl is not None, "Can't instantiate Client without 'get_gnl'" assert make_request is not None, "Can't instantiate Client without 'make_request'" assert client_info is not None, "Can't instantiate Client without 'client_info'" if not issubclass(data_store.__class__, DataStore): raise ValueError( 'data_store argument should be a subclass of the DataStore abstract class' ) self.data_store = data_store self.client_info = client_info self.make_request = make_request self._get_zal = get_zal self._get_gnl = get_gnl
[docs] async def get_zal(self): """ Return a tuple of the ZAL and GNL (zal, gnl) returned by the get_zal and get_gnl function supplied in instantiation of Client object """ zal = await self._get_zal() gnl = await self._get_gnl() return (zal, gnl)
[docs] async def create_oauth_session(self, za_name, gegevensdienst_id, **kwargs): """ Create and return a new OAuthSession to start the oauth flow. Add the zorggebruikers choice of zorgaanbieder gegevensdienst. :ref:`(FLOW #2) <flow2>` :type za_name: string :param za_name: Name of zorgaanbieder chosen by the zorggebruiker. :type gegevensdienst_id: string :param gegevensdienst_id: Id of the gegevensdienst chosen by the zorggebruiker :type \*\*kwargs: various :param \*\*kwargs: Keyword arguments get passed on to the data_store.create_oauth_session function, e.g. db object :return: The created OAuthSession :rtype: :ref:`OAuthSession <client.oauthsession>` """ return await self.data_store.create_oauth_session( za_name=za_name, gegevensdienst_id=gegevensdienst_id, scope=gegevensdienst_id, state=secrets.token_hex(16), **kwargs )
[docs] async def create_auth_request_url(self, oauth_session): """ Build and return authorization request url :ref:`(FLOW #2) <flow2>` :type oauth_session: :ref:`OAuthSession <client.oauthsession>` :param oauth_session: OAuthSession for current zorggebruiker :return: The authorization request url :rtype: str """ request_dict = { 'state': oauth_session.state, 'scope': oauth_session.scope, 'response_type': 'code', 'client_id': self.client_id, 'redirect_uri': self.redirect_uri } zal, _ = await self.get_zal() za = zal[oauth_session.za_name] gegevensdienst = za.gegevensdiensten[oauth_session.gegevensdienst_id] query_parameters = urllib.parse.urlencode(request_dict) return f'{gegevensdienst.authorization_endpoint_uri}?{query_parameters}'
[docs] async def handle_auth_response(self, parameters, **kwargs): """ Handles the response to the authorization request. (:ref:`FLOW #10 <flow10>`, :ref:`FLOW #11 <flow11>`) :type parameters: dict :param parameters: The query params from the servers's response to the authorization request :type \*\*kwargs: various :param \*\*kwargs: Keyword arguments get passed on to the data_store.get_oauth_session_by_state function, e.g. db object :return: The updated OAuthSession no containing the authorization_code, and authorized set to True :rtype: :ref:`OAuthSession <client.oauthsession>` :raises OAuthException: If validation of the params fails :raises ValueError: If there is no session found linked to the state parameter in the provided query parameters """ validation.validate_auth_response(parameters) oauth_session = await self.data_store.get_oauth_session_by_state(parameters['state'], **kwargs) if oauth_session is None: raise ValueError('No oauth_session found!') oauth_session.authorization_code = parameters['code'] oauth_session.authorized = True oauth_session = await self.data_store.save_oauth_session(oauth_session, **kwargs) return oauth_session
[docs] async def exchange_authorization_code(self, oauth_session, **kwargs): """ Make a request to a oauth server with the supplied make_request function on instantiation of the Client, exchange the received authorization code for an access token and update the oauth_session. :ref:`(FLOW #12) <flow12>` :type oauth_session: :ref:`OAuthSession <client.oauthsession>` :param oauth_session: Authorized oauth session of which to exchange the authorization code :type \*\*kwargs: various :param \*\*kwargs: Keyword arguments get passed on to the data_store.save_oauth_session function, e.g. db object :return: The updated OAuthSession containing the access_token :rtype: :ref:`OAuthSession <client.oauthsession>` :raises OAuthException: If the server's response is invalid """ zal, _ = await self.get_zal() gegevensdienst = zal[oauth_session.za_name].gegevensdiensten[oauth_session.gegevensdienst_id] response = await self.make_request(url=gegevensdienst.token_endpoint_uri, body={ 'grant_type': 'authorization_code', 'code': oauth_session.authorization_code, 'redirect_uri': self.redirect_uri, 'client_id': self.client_id }) validation.validate_access_token_response(response, oauth_session) oauth_session.access_token = response['access_token'] oauth_session.authorization_code = None oauth_session = await self.data_store.save_oauth_session(oauth_session, **kwargs) return oauth_session
def __repr__(self): return f'Client(data_store={repr(self.data_store)}, get_zal={self._get_zal.__name__}, \ client_info={self.client_info}, make_request={self.make_request.__name__})' def __getattr__(self, attr): try: return self.client_info[attr] except AttributeError: pass raise AttributeError(f'Client has no attribute \'{attr}\'')