from django.utils.translation import gettext_lazy as _
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from durin.models import AuthToken
from durin.settings import durin_settings
from durin.signals import token_expired
# try to import memoize
memoize = None
try:
from cache_memoize import cache_memoize as memoize
except ImportError:
pass
[docs]class TokenAuthentication(BaseAuthentication):
"""
This authentication scheme uses Durin's
:class:`durin.models.AuthToken` for authentication.
Similar to `DRF's authentication system
<http://www.django-rest-framework.org/api-guide/authentication/>`__,
it overrides it a bit to
accomodate that tokens can be expired.
If successful,
- ``request.user`` will be a django ``User`` instance
- ``request.auth`` will be an ``AuthToken`` instance
"""
model = AuthToken
def authenticate(self, request):
auth = get_authorization_header(request).split()
prefix = durin_settings.AUTH_HEADER_PREFIX.encode()
if not auth or auth[0].lower() != prefix.lower():
return None
if len(auth) == 1:
msg = _("Invalid token header. No credentials provided.")
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _("Invalid token header. " "Token string should not contain spaces.")
raise exceptions.AuthenticationFailed(msg)
return self.authenticate_credentials(auth[1])
@classmethod
def authenticate_credentials(cls, token):
"""
Verify that the given token exists in the database
"""
token_str = token.decode("utf-8")
try:
# read settings
to_select = durin_settings.AUTHTOKEN_SELECT_RELATED_LIST
# get AuthToken object
if isinstance(to_select, list):
auth_token = AuthToken.objects.select_related(*to_select).get(
token=token_str
)
else:
auth_token = AuthToken.objects.get(token=token_str)
# validate token
if cls._cleanup_token(auth_token):
e = _("The given token has expired.")
raise exceptions.AuthenticationFailed(e)
return cls.validate_user(auth_token)
except exceptions.AuthenticationFailed as e:
raise exceptions.AuthenticationFailed(e)
except Exception:
msg = _("Invalid token.")
raise exceptions.AuthenticationFailed(msg)
@staticmethod
def validate_user(auth_token: AuthToken):
if not auth_token.user.is_active:
raise exceptions.AuthenticationFailed(_("User inactive or deleted."))
return (auth_token.user, auth_token)
def authenticate_header(self, request):
return durin_settings.AUTH_HEADER_PREFIX
@classmethod
def _cleanup_token(cls, auth_token: AuthToken):
if auth_token.expiry is not None:
if auth_token.has_expired:
username = auth_token.user.get_username()
auth_token.delete()
token_expired.send(sender=cls, username=username, source="auth_token")
return True
return False
# if memoize is available, create another token authentication class
# which uses django-memoize for caching
if memoize:
[docs] class CachedTokenAuthentication(TokenAuthentication):
"""
Similar to ``TokenAuthentication`` but uses
`django-cache-memoize <https://github.com/peterbe/django-cache-memoize>`__
as cache backend for faster lookups.
The cache timeout is configurable by setting the
``REST_DURIN["TOKEN_CACHE_TIMEOUT"]`` under your app's ``settings.py``.
**How To Enable:**
1. Install django-cache-memoize
.. parsed-literal::
pip install django-cache-memoize
2. Then you need to use ``CachedTokenAuthentication``
instead of ``TokenAuthentication``.
"""
@classmethod
@memoize(timeout=int(durin_settings.TOKEN_CACHE_TIMEOUT))
def authenticate_credentials(cls, token):
return super().authenticate_credentials(token)
def __repr__(self):
return self.__class__.__name__