Source code for flask_saml

import flask
import flask.signals
from flask import _app_ctx_stack as stack
import functools
import logging
import requests
import saml2
import saml2.client
import saml2.config
import saml2.metadata
try:
    import urllib.parse as urlparse
except ImportError:  # pragma: no cover
    import urlparse

__version__ = '0.4.3'

log = logging.getLogger(__name__)

signals = flask.signals.Namespace()
saml_authenticated = signals.signal('saml-authenticated')
saml_log_out = signals.signal('saml-log-out')
saml_error = signals.signal('saml-error')


def _get_metadata(metadata_url):  # pragma: no cover
    response = requests.get(metadata_url)
    if response.status_code != 200:
        exc = RuntimeError(
            'Unexpected Status Code: {0}'.format(response.status_code))
        exc.response = response
        raise exc
    return response.text


def _get_client(metadata, allow_unknown_attributes=True):
    acs_url = flask.url_for('login_acs', _external=True)
    metadata_url = flask.url_for('metadata', _external=True)
    settings = {
        'entityid': metadata_url,
        'metadata': {
            'inline': [metadata],
            },
        'service': {
            'sp': {
                'endpoints': {
                    'assertion_consumer_service': [
                        (acs_url, saml2.BINDING_HTTP_POST),
                    ],
                },
                # Don't verify that the incoming requests originate from us via
                # the built-in cache for authn request ids in pysaml2
                'allow_unsolicited': True,
                # Don't sign authn requests, since signed requests only make
                # sense in a situation where you control both the SP and IdP
                'authn_requests_signed': False,
                'logout_requests_signed': True,
                'want_assertions_signed': True,
                'want_response_signed': False,
            },
        },
    }
    config = saml2.config.Config()
    config.load(settings)
    config.allow_unknown_attributes = allow_unknown_attributes
    client = saml2.client.Saml2Client(config=config)
    return client


def _saml_prepare(wrapped_func):

    @functools.wraps(wrapped_func)
    def func():
        ext, config = flask.current_app.extensions['saml']
        client = _get_client(config['metadata'])
        return wrapped_func(client)
    return func


def _session_login(sender, subject, attributes, auth):
    flask.session['saml'] = {
        'subject': subject,
        'attributes': attributes,
    }


def _session_logout(sender):
    flask.session.clear()


[docs]class FlaskSAML(object): """ The extension class. Refer to the documentation on its usage. :param app: The :class:`flask.Flask` app. :param bool debug: Enable debug mode for the extension. """ def __init__( self, app=None, debug=False): self.app = app self._debug = debug if self.app is not None: self.init_app(app) def init_app(self, app): app.config.setdefault('SAML_PREFIX', '/saml') app.config.setdefault('SAML_DEFAULT_REDIRECT', '/') app.config.setdefault('SAML_USE_SESSIONS', True) config = { 'metadata': _get_metadata( metadata_url=app.config['SAML_METADATA_URL'], ), 'prefix': app.config['SAML_PREFIX'], 'default_redirect': app.config['SAML_DEFAULT_REDIRECT'], } saml_routes = { 'logout': logout, 'sso': login, 'acs': login_acs, 'metadata': metadata, } for route, func in saml_routes.items(): path = '%s/%s/' % (config['prefix'], route) app.add_url_rule(path, view_func=func, methods=['GET', 'POST']) # Register configuration on app so we can retrieve it later on if not hasattr(app, 'extensions'): # pragma: no cover app.extensions = {} app.extensions['saml'] = self, config if app.config['SAML_USE_SESSIONS']: saml_authenticated.connect(_session_login, app) saml_log_out.connect(_session_logout, app)
def _get_return_to(): ext, config = stack.top.app.extensions['saml'] return_to = flask.request.args.get('next', '') if not return_to.startswith(flask.request.url_root): return_to = config['default_redirect'] return return_to @_saml_prepare def logout(saml_client): log.debug('Received logout request') saml_log_out.send( flask.current_app._get_current_object(), ) ext, config = stack.top.app.extensions['saml'] url = flask.request.url_root[:-1] + config['default_redirect'] return flask.redirect(url) @_saml_prepare def login(saml_client): log.debug('Received login request') return_url = _get_return_to() reqid, info = saml_client.prepare_for_authenticate( relay_state=return_url, ) headers = dict(info['headers']) response = flask.redirect(headers.pop('Location'), code=302) for name, value in headers.items(): response.headers[name] = value response.headers['Cache-Control'] = 'no-cache, no-store' response.headers['Pragma'] = 'no-cache' return response @_saml_prepare def login_acs(saml_client): if 'SAMLResponse' in flask.request.form: log.debug('Received SAMLResponse for login') try: authn_response = saml_client.parse_authn_request_response( flask.request.form['SAMLResponse'], saml2.entity.BINDING_HTTP_POST, ) if authn_response is None: raise RuntimeError('Unknown SAML error, please check logs') except Exception as exc: saml_error.send( flask.current_app._get_current_object(), exception=exc, ) else: saml_authenticated.send( flask.current_app._get_current_object(), subject=authn_response.get_subject().text, attributes=authn_response.get_identity(), auth=authn_response, ) relay_state = flask.request.form.get('RelayState') ext, config = stack.top.app.extensions['saml'] if not relay_state: relay_state = config['default_redirect'] redirect_to = relay_state if not relay_state.startswith(flask.request.url_root): redirect_to = flask.request.url_root[:-1] + redirect_to return flask.redirect(redirect_to) return 'Missing SAMLResponse POST data', 500 @_saml_prepare def metadata(saml_client): metadata_str = saml2.metadata.create_metadata_string( configfile=None, config=saml_client.config, ) return metadata_str, {'Content-Type': 'text/xml'}