Source code for morango.api.permissions

import json

from django.contrib.auth import authenticate
from django.contrib.auth import get_user_model
from rest_framework import authentication
from rest_framework import exceptions
from rest_framework import permissions

from morango.models.core import TransferSession
from morango.utils import SETTINGS

[docs] class BasicMultiArgumentAuthentication(authentication.BasicAuthentication): """ HTTP Basic authentication against username (plus any other optional arguments) and password. """
[docs] def authenticate_credentials(self, userargs, password, request=None): """ Authenticate the userargs and password against Django auth backends. The "userargs" string may be just the username, or a querystring-encoded set of params. """ credentials = {"password": password} if "=" not in userargs: # if it doesn't seem to be in querystring format, just use it as the username credentials[get_user_model().USERNAME_FIELD] = userargs else: # parse out the user args from querystring format into the credentials dict for arg in userargs.split("&"): key, val = arg.split("=") credentials[key] = val # authenticate the user via Django's auth backends user = authenticate(request=request, **credentials) if user is None: raise exceptions.AuthenticationFailed("Invalid credentials.") if not user.is_active: raise exceptions.AuthenticationFailed("User inactive or deleted.") return (user, None)
[docs] class CertificatePermissions(permissions.BasePermission):
[docs] def has_permission(self, request, view): # the Django REST Framework browseable API calls this to see what buttons to show if not return True # we allow anyone to read certificates if request.method in permissions.SAFE_METHODS: return True # other than read (or other safe) operations, we only allow POST if request.method == "POST": # check that the authenticated user has the appropriate permissions to create the certificate if hasattr(request.user, "has_morango_certificate_scope_permission"): scope_definition_id ="scope_definition") scope_params = json.loads("scope_params")) if ( scope_definition_id and scope_params and isinstance(scope_params, dict) ): return request.user.has_morango_certificate_scope_permission( scope_definition_id, scope_params ) return False return False
[docs] class CertificatePushPermissions(permissions.BasePermission): message = "Server does not allow certificate pushing."
[docs] def has_permission(self, request, view): if SETTINGS.ALLOW_CERTIFICATE_PUSHING: return True return False
[docs] class BufferPermissions(permissions.BasePermission):
[docs] def has_permission(self, request, view): if request.method == "GET": sesh_id = request.query_params.get("transfer_session_id") if not sesh_id: return False if not TransferSession.objects.filter( id=sesh_id, active=True, push=False ).exists(): return False return True return request.method in ("POST",)