"""This module provides glue to make using python-raven with Flask easy"""
from __future__ import unicode_literals
import sys
import os
import base64
import logging
import functools
if sys.version_info[0] >= 3:
from urllib.parse import unquote_plus
else:
from urllib import unquote_plus
from calendar import timegm
from time import time as time_float
time = lambda: int(time_float())
from flask import request, session, redirect, abort
logger = logging.getLogger("ucam_webauth.flask_glue")
[docs]class AuthDecorator(object):
"""
An instance of this class decorates views to add authentication.
To use it, you'll need to subclass it and set response_class,
request_class and logout_url (see :class:`raven.flask_glue.AuthDecorator`).
Then::
auth_decorator = AuthDecorator() # settings, e.g., desc="..." go here
@app.route("/some_url")
@auth_decorator
def my_view():
return "You are " + auth_decorator.principal
Or to require users be authenticated for all views::
app.before_request(auth_decorator.before_request)
Note that since it uses flask.session, you'll need to set
:attr:`app.secret_key`.
We need to be able to reliably determine the hostname of the current
website. This is retrieved from :attr:`flask.Request.url`.
By default, Werkzeug will respect the value of a ``X-Forwarded-Host``
header, which means that a man-in-the-middle can have someone authenticate
to *their* website, and forward the response from the WLS on to you.
You must either set :attr:`flask.Request.trusted_hosts`, for example
like so::
class R(flask.Request):
trusted_hosts = {'www.danielrichman.co.uk'}
app.request_class = R
... or sanitise both the `Host` header *and* the `X-Forwarded-Host`
header in your web-server. If you choose the second option, set
`can_trust_request_host`.
This tries to emulate the feel of applying mod_ucam_webauth to a file.
The decorator wraps the view in a function that calls
:meth:`before_request` first, calling the original view function if it
does not return a redirect or abort.
You may wish to catch the 401 and 403 aborts with :attr:`app.errorhandler`.
The :attr:`principal`, their :attr:`ptags`, the :attr:`issue` and
:attr:`life` from the WLS are available as attributes of the
:class:`AuthDecorator` object
(magic properties that retrieve the current values from ``flask.session``).
Further, the attributes :attr:`expires` and :attr:`expires_all` give
information on when the ucam_webauth session will expire.
For the `desc`, `aauth`, `iact`, `msg` parameters, see
:class:`ucam_webauth.Request`.
Note that the `max_life`, `use_wls_life` and `inactive_timeout` parameters
deal with the ucam_webauth session `only`; they only affect
``flask.session["_ucam_webauth"]``. Flask's session expiry, cookie
lifetimes, etc. are independent.
:type max_life: :class:`int` (seconds) or ``None``
:param max_life: upper bound on how long a successful authentication can
last before it expires and the user must reauthenticate
:type use_wls_life: :class:`bool`
:param use_wls_life: should we lower the life of the session to the life
reported by the WLS, if it is less than `max_life`?
:type inactive_timeout: :class:`int` (seconds) or ``None``
:param inactive_timeout: expire the session if no request is processed
via this decorator in `inactive_timeout` seconds
:type issue_bounds: :class:`tuple`: (:class:`int`, :class:`int`) (seconds)
:param issue_bounds: a tuple, (lower, upper) - how close the `issue`
(datetime that the WLS says the authentication
happened at) must be to `now`
(i.e., require ``now - lower < issue < now + upper``;
this is a combination of two settings found in
mod_ucam_webauth: `clock skew` and `response timeout`,
``issue_bounds=(clock_skew + response_timeout,
clock_skew)`` is equivalent)
:type require_principal: :class:`set` of :class:`str`, or ``None``
:param require_principal: require the principal to be in the set
:type require_ptags: :class:`set` of :class:`str`, or ``None``
:param require_ptags: require the ptags to contain `any` string in
`require_ptags` (i.e., non empty intersection)
:type can_trust_request_host: :class:`bool`
:param can_trust_request_host: Can we trust the hostname in
``request.url``?
(see :ref:`checking-response-values`)
More complex customisation is possible:
* override :meth:`check_authorised` to do more complex
checking than `require_principal`, `require_ptags`
(note that this replaces checking `require_principal`, `require_ptags`)
* override :meth:`session_new`
The :class:`AuthDecorator` only touches
``flask.session["_ucam_webauth"]``.
If you've saved other (important) things to the session object, you
may want to clear them out when the state changes.
You can do this by subclassing and overriding session_new. It is called
whenever a response is received from the WLS, except if the response
is a successful re-authentication after session expiry, with the same
`principal` and `ptags` as before.
To log the user out, call :meth:`logout`, which will clear the session
state. Further, :meth:`logout` returns a :meth:`flask.redirect` to the
Raven logout page. Be aware that the default flask session handlers are
susceptible to replay attacks.
POST requests:
Since it will redirect to the WLS and back, the auth decorator will
discard any POST data in the process. You may wish to either work
around this (by subclassing and saving it somewhere before redirecting)
or ensure that when it returns (with a GET request) to the URL, a
sensible page is displayed (the form, or an error message).
.. automethod:: __call__
"""
request_class = None
response_class = None
logout_url = None
def __init__(self, desc=None, aauth=None, iact=None, msg=None,
max_life=7200, use_wls_life=False,
inactive_timeout=None, issue_bounds=(15,5),
require_principal=None,
require_ptags=frozenset(["current"]),
can_trust_request_host=False):
self.desc = desc
self.aauth = aauth
self.iact = iact
self.msg = msg
self.max_life = max_life
self.use_wls_life = use_wls_life
self.inactive_timeout = inactive_timeout
self.issue_bounds = issue_bounds
self.require_principal = require_principal
self.require_ptags = require_ptags
self.can_trust_request_host = can_trust_request_host
[docs] def __call__(self, view_function):
"""
Wraps `view_function` with the auth decorator
(:class:`AuthDecorator` objects are callable so that they can be used
as function decorators.)
Calling it returns a 'wrapper' view function that calls
:meth:`request` first.
"""
def wrapper(**view_args):
r = self.before_request()
if r is not None:
return r
return view_function(**view_args)
functools.update_wrapper(wrapper, view_function)
return wrapper
@property
def _state(self):
return session.get("_ucam_webauth", {}).get("state", {})
@_state.setter
def _state(self, new):
if "_ucam_webauth" not in session:
session["_ucam_webauth"] = {}
session["_ucam_webauth"]["state"] = new
@_state.deleter
def _state(self):
if "_ucam_webauth" in session:
d = session["_ucam_webauth"]
if "state" in d:
del d["state"]
@property
def _params_token(self):
return session.get("_ucam_webauth", {}).get("params_token", None)
@_params_token.setter
def _params_token(self, new):
if "_ucam_webauth" not in session:
session["_ucam_webauth"] = {}
session["_ucam_webauth"]["params_token"] = new
@property
def principal(self):
"""The current principal, or ``None``"""
return self._state.get("principal")
@property
def ptags(self):
"""The current ptags, or ``None``"""
ptags = self._state.get("ptags")
if ptags is not None:
ptags = frozenset(ptags)
return ptags
@property
def issue(self):
"""
When the last WLS response was issued
`issue` is converted to a unix timestamp (:class:`int`), rather than
the :class:`datetime` object used by :class:`ucam_webauth.Response`.
(`issue` is ``None`` if there is no current session.)
"""
return self._state.get("issue")
@property
def life(self):
"""life of the last WLS response (:class:`int` seconds), or ``None``"""
return self._state.get("life")
@property
def last(self):
"""Time (:class:`int` unix timestamp) of the last decorated request"""
return self._state.get("last")
@property
def expires(self):
"""When (:class:`int` unix timestamp) the current auth. will expire"""
if "principal" not in self._state:
return None
expires = self._get_expires(self._state)
if len(expires) == 0:
return None
else:
return min(when for reason, when in expires)
@property
def expires_all(self):
"""
A list of all things that could cause the current auth. to expire
A list of (:class:`str`, :class:`int` unix timestamp) tuples;
(`reason`, `when`).
`reason` will be one of "config max life", "wls life" or "inactive".
"""
if "principal" not in self._state:
return None
return self._get_expires(self._state)
[docs] def logout(self):
"""Clear the auth., and return a redirect to the WLS' logout page"""
session.modified = True
del self._state
return redirect(self.logout_url, code=303)
[docs] def before_request(self):
"""
The "main" method
* checks if there is a response from the WLS
* checks if the current URL matches that which the WLS said it
redirected to (avoid an evil admin of another site replaying
successful authentications)
* checks if ``flask.session`` is empty - if so, then we deduce that
the user has cookies disabled, and must abort immediately with
403 Forbidden, or we will start a redirect loop
* checks if `params` matches the token we set (and saved in
``flask.session``) when redirecting to Raven
* checks if the authentication method used is permitted by `aauth`
and user-interaction respected `iact` - if not, abort with
400 Bad Request
* updates the state with the response: updating the `principal`,
`ptags` and `issue` information if it was a success, or
clearing them (but setting a flag - see below: 401 Authentication
Required will be thrown after redirect) if it was a failure
* returns a redirect that removes ``WLS-Response`` from
``request.args``
* checks if the "response was an authentication failure" flag is set
in ``flask.session`` - if so, clears the flag and aborts with
401 Authentication Required
* checks to see if we are authenticated (and the session hasn't
expired)
* if not, returns a redirect that will sends the user to the
WLS to authenticate
* checks to see if the `principal` / `ptags` are permitted
* if not, aborts with a 403 Forbidden
* updates the 'last used' time in the state (to implement
`inactive_timeout`)
Returns ``None``, if the request should proceed to the actual
view function.
"""
# we always modify the session. Changes to mutable objects (our
# state dict) aren't automatically picked up
session.modified = True
if "WLS-Response" in request.args:
if request.method != "GET":
abort(405)
if hasattr(request.args, "getlist") and \
len(request.args.getlist("WLS-Response")) != 1:
abort(400)
if "_ucam_webauth" not in session:
# we set this before redirecting - so the user has cookies
# disabled. avoid a redirect loop
abort(403)
r = self.response_class(request.args["WLS-Response"])
return self._handle_response(r)
state = self._state
if state.get("response_failure", False):
del state["response_failure"]
abort(401)
if "principal" not in state:
logger.info("unauthenticated: redirecting to WLS")
return self._redirect_to_wls()
ok, reason = self._check_expires(state)
if not ok:
logger.info("session expired (%s): redirecting to WLS", reason)
return self._redirect_to_wls()
if not self.check_authorised(state["principal"],
frozenset(state["ptags"])):
logger.info("not authorised: bad principal (%s) or ptags (%r)",
state["principal"], state["ptags"])
abort(403)
state["last"] = time()
return None
def _handle_response(self, response):
"""
Deal with a response in the query string of this request
* checks `url`, `iact`, `aauth` and `issue`
* checks that the `params` token matches the one saved in
``flask.session``
* starts a new session if necessary (see :meth:`session_new`)
* sets the 'auth failed' flag if necessary
* redirects to remove ``WLS-Response`` from the url/query string
"""
url_without_response = self._check_url(response)
if url_without_response is None:
abort(400)
token = self._params_token
if token is None or response.params != token:
logger.warning("params token mismatch: session=%s response=%s",
token, response.params)
abort(400)
if response.success:
if not response.check_iact_aauth(self.iact, self.aauth):
logger.warning("response.check_iact failed: "
"auth=%s sso=%s iact=%s aauth=%s",
response.auth, response.sso,
self.iact, self.aauth)
abort(400)
issue = timegm(response.issue.timetuple())
if not self._check_issue(issue):
abort(400)
if not self._is_new_session(response):
logger.debug("new session")
self.session_new()
self._state = \
{"principal": response.principal,
"ptags": list(response.ptags),
"issue": issue, "life": response.life,
"last": time()}
else:
self._state = {"response_failure": True}
return redirect(url_without_response, code=303)
def _check_url(self, response):
"""
Check if the response from the WLS was intended for us
Checks that the current requested url (``flask.request.url``) matches
the URL in the WLS' response, which is equal to the URL specified
in the request to the WLS and is the URL to which the client was
redirected after authentication.
"""
if not self.can_trust_request_host and request.trusted_hosts is None:
raise RuntimeError("Either set request.trusted_hosts, or sanitise "
"Host/X-Forwarded-Host headers and pass "
"can_trust_request_host = True")
wls_response_url = response.url
actual_url = request.url
# See the docs (misc/Response URLs for Cancels) for comments on how
# the WLS constructs the URL to which the user is redirected after a
# request.
# Essentially, it either appends (?|&)WLS-Response=... to the URL
# in the request, or appends ?WLS-Response=... to the URL in the
# request with the query part removed.
start = max(actual_url.rfind("?WLS-Response="),
actual_url.rfind("&WLS-Response="))
if start == -1:
logger.warning("have args['WLS-Response'] but "
"(?|&)WLS-Response not in request.url?")
return None
# check that nothing funny is going on (that the dumb parsing done
# above is correct)
response_start = start + len(".WLS-Response=")
response_part = unquote_plus(actual_url[response_start:])
if response_part != request.args["WLS-Response"]:
logger.debug("WLS-Response removal failed "
"(removed: %r, request.args: %r)",
response_part, request.args["WLS-Response"])
return None
# chop off the WLS-Response
actual_url = actual_url[:start]
if response.ver == 1:
expected_response_url, _, _ = wls_response_url.partition("?")
else:
expected_response_url = wls_response_url
# finally check that they agree.
if expected_response_url != actual_url:
logger.debug("response.url did not match url visited "
"(replay of WLS-Response to other website?) "
"expected_response_url=%r actual_url=%r",
expected_response_url, actual_url)
return None
return wls_response_url
def _check_issue(self, issue):
"""Check that `issue` (from a response) is in an acceptable range"""
now = time()
lower = now - self.issue_bounds[0]
upper = now + self.issue_bounds[1]
result = lower <= issue <= upper
if not result:
logger.debug("response had bad issue: "
"now=%s issue=%s lower=%s upper=%s",
now, issue, lower, upper)
return result
def _is_new_session(self, response):
"""Is this a new session, or reauthentication (after expiry)?"""
state = self._state
if "principal" not in state:
return False
principal = state["principal"]
ptags = frozenset(state["ptags"])
return response.principal == principal and response.ptags == ptags
def _redirect_to_wls(self):
"""Create a request and return a redirect to the WLS"""
if self._params_token is None:
self._params_token = \
base64.b64encode(os.urandom(18)).decode("ascii")
req = self.request_class(url=request.url, desc=self.desc,
aauth=self.aauth, iact=self.iact,
msg=self.msg, params=self._params_token)
return redirect(str(req), code=303)
def _get_expires(self, state):
"""Get a list of (reason, when) tuples describing expiration times"""
expires = []
if self.max_life is not None:
expires.append(("config max life", state["issue"] + self.max_life))
if self.use_wls_life and state["life"] is not None:
expires.append(("wls life", state["issue"] + state["life"]))
if self.inactive_timeout is not None:
expires.append(("inactive", state["last"] + self.inactive_timeout))
return expires
def _check_expires(self, state):
"""Check whether the current authentication has expired"""
expires = self._get_expires(state)
now = time()
for reason, when in expires:
if when < now:
return False, reason
else:
return True, None
[docs] def check_authorised(self, principal, ptags):
"""
Check if an authenticated user is authorised.
The default implementation requires the principal to be in
the whitelist :attr:`require_principal` (if it is not ``None``, in
which case any principal is allowed) and the intersection of
:attr:`require_ptags` and `ptags` to be non-empty (unless
:attr:`require_ptags` is ``None``, in which case any ptags
(or no `ptags` at all) is permitted).
Note that the default value of :attr:`require_ptags` in
:class:`raven.flask_glue.AuthDecorator` is ``{"current"}``.
"""
if self.require_principal is not None:
if principal not in self.require_principal:
return False
if self.require_ptags is not None:
if self.require_ptags & ptags == set():
return False
return True
[docs] def session_new(self):
"""
Called when a new user authenticates
More specifically, when :attr:`principal` or :attr:`ptags` changes.
"""
pass