initial commit

main
through-your-tears 9 months ago
parent 74a982a7d6
commit 932c6d36be

@ -0,0 +1,31 @@
version: '3.3'
services:
web:
restart: on-failure
build:
context: ./src
dockerfile: Dockerfile
command: gunicorn conf.wsgi:application --bind 0.0.0.0:8000
volumes:
- static_volume:/home/app/web/static
- media_volume:/home/app/web/media
ports:
- 8000:8000
depends_on:
- db
db:
restart: always
image: kartoza/postgis:13.0
environment:
- POSTGRES_USER=my_user
- POSTGRES_PASSWORD=mysecretpassword
- POSTGRES_DB=db
ports:
- 5432:5432
redis:
image: redis:5-alpine
volumes:
postgres_data:
static_volume:
media_volume:

@ -0,0 +1,69 @@
###########
# BUILDER #
###########
# pull official base image
FROM python:3.10-slim as builder
# set work directory
WORKDIR /usr/src/app
# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
RUN apt-get update -y \
&& apt-get install gcc python3-dev musl-dev binutils libproj-dev gdal-bin g++ -y
# lint
RUN pip install --upgrade pip
COPY . .
# install dependencies
COPY ./requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /usr/src/app/wheels -r requirements.txt
#########
# FINAL #
#########
# pull official base image
FROM python:3.10-slim
# create directory for the app user
RUN mkdir -p /home/app
# create the app user
RUN addgroup --system app && adduser --system app --ingroup app
# create the appropriate directories
ENV HOME=/home/app
ENV APP_HOME=/home/app/web
RUN mkdir $APP_HOME
RUN mkdir $APP_HOME/static
RUN mkdir $APP_HOME/media
WORKDIR $APP_HOME
# install dependencies
RUN apt-get update -y && apt-get install gcc python3-dev musl-dev binutils libproj-dev gdal-bin g++ libpq-dev -y
COPY --from=builder /usr/src/app/wheels /wheels
COPY --from=builder /usr/src/app/requirements.txt .
RUN pip install --no-cache /wheels/*
# copy entrypoint.prod.sh
COPY ./entrypoint.prod.sh .
RUN sed -i 's/\r$//g' $APP_HOME/entrypoint.prod.sh
RUN chmod +x $APP_HOME/entrypoint.prod.sh
# copy project
COPY . $APP_HOME
# chown all the files to the app user
RUN chown -R app:app $APP_HOME
# change to the app user
USER app
# run entrypoint.prod.sh
ENTRYPOINT ["/home/app/web/entrypoint.prod.sh"]

@ -0,0 +1,16 @@
"""
ASGI config for core project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.0/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'conf.settings')
application = get_asgi_application()

@ -0,0 +1,28 @@
from rest_framework.views import exception_handler
def core_exception_handler(exc, context):
response = exception_handler(exc, context)
handlers = {
'ValidationError': _handle_generic_error
}
exception_class = exc.__class__.__name__
if exception_class in handlers:
return handlers[exception_class](exc, context, response)
if response:
if response.data.get('detail') == 'Токен испорчен' or response.data.get('detail') == (
'Пользователь соответствующий данному токену не найден.') or response.data.get('detail') == (
'Учетные данные не были предоставлены.'
):
response.status_code = 401
return response
def _handle_generic_error(exc, context, response):
response.data = {
'errors': response.data
}
return response

@ -0,0 +1,170 @@
"""
Django settings for core project.
Generated by 'django-admin startproject' using Django 5.0.4.
For more information on this file, see
https://docs.djangoproject.com/en/5.0/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/5.0/ref/settings/
"""
from pathlib import Path
import dj_database_url
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/5.0/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-*48y!hsr!tup+5z2r64o6c6mw%axkh@#bx62pes8=r7@_+7qtv'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = ['*']
# Application definition
INSTALLED_APPS = [
'jwtauth',
'organizations',
'jwt',
'corsheaders',
'rest_framework',
'drf_yasg',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'corsheaders.middleware.CorsMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
CORS_ALLOW_ALL_ORIGINS = True
CORS_ALLOW_CREDENTIALS = True
ROOT_URLCONF = 'conf.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'conf.wsgi.application'
# Database
# https://docs.djangoproject.com/en/5.0/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.contrib.gis.db.backends.postgis',
'NAME': 'db',
'USER': 'my_user',
'PASSWORD': 'mysecretpassword',
'HOST': 'db',
'PORT': '5432'
}
}
AUTH_USER_MODEL = 'jwtauth.CustomUser'
# Password validation
# https://docs.djangoproject.com/en/5.0/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/5.0/topics/i18n/
LANGUAGE_CODE = 'ru-ru'
TIME_ZONE = 'Europe/Moscow'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/5.0/howto/static-files/
STATIC_URL = 'static/'
STATIC_ROOT = BASE_DIR / 'static/'
MEDIA_URL = 'media/'
MEDIA_ROOT = BASE_DIR / 'media'
# Default primary key field type
# https://docs.djangoproject.com/en/5.0/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
REST_FRAMEWORK = {
'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.coreapi.AutoSchema',
'EXCEPTION_HANDLER': 'conf.exceptions.core_exception_handler',
# 'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 30,
'NON_FIELD_ERRORS_KEY': 'error',
'DEFAULT_AUTHENTICATION_CLASSES': (
'jwtauth.backends.JWTAuthentication',
),
}
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://redis:6379/0",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
}
}
}
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
SESSION_CACHE_ALIAS = "default"
CACHE_TTL = 60 * 1

@ -0,0 +1,35 @@
"""
URL configuration for core project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/5.0/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.conf import settings
from django.conf.urls.static import static
from django.contrib import admin
from django.urls import path, include, re_path
from core.openapi import schema_view
api_urls = [
path('organizations/', include('organizations.urls')),
path('user/', include('jwtauth.urls'))
]
urlpatterns = [
re_path(r'^docs/$', schema_view.with_ui('swagger', cache_timeout=0), name='schema-swagger-ui'),
re_path(r'^redoc/$', schema_view.with_ui('redoc', cache_timeout=0), name='schema-redoc'),
path('admin/', admin.site.urls),
path('api/', include(api_urls))
] + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)

@ -0,0 +1,16 @@
"""
WSGI config for core project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.0/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'conf.settings')
application = get_wsgi_application()

@ -0,0 +1,7 @@
from rest_framework.exceptions import APIException
class CodeExpired(APIException):
status_code = 400
default_detail = 'Email code has been expired'
default_code = 'code_expired'

@ -0,0 +1,19 @@
from django.contrib.gis.geos.point import Point
from rest_framework import serializers
class LocationField(serializers.Field):
default_error_messages = {
"invalid": "Value must be valid list with lang and lat values."
}
def to_representation(self, value):
return value.coords
def to_internal_value(self, data):
try:
point = Point((data[0], data[1]))
except Exception:
self.fail("invalid")
return point

@ -0,0 +1,24 @@
from drf_yasg import openapi
from drf_yasg.generators import OpenAPISchemaGenerator
from drf_yasg.views import get_schema_view
from rest_framework import permissions
class BothHttpAndHttpsSchemaGenerator(OpenAPISchemaGenerator):
def get_schema(self, request=None, public=False):
schema = super().get_schema(request, public)
schema.schemes = ["http", "https"]
return schema
schema_view = get_schema_view(
openapi.Info(
title="",
default_version='v1',
description=" API",
license=openapi.License(name="BSD License"),
),
public=True,
generator_class=BothHttpAndHttpsSchemaGenerator,
permission_classes=[permissions.AllowAny],
)

@ -0,0 +1,10 @@
from rest_framework.permissions import BasePermission, SAFE_METHODS
class IsAuthorOrReadOnly(BasePermission):
def has_object_permission(self, request, view, obj):
if request.method in SAFE_METHODS:
return True
return obj.user == request.user

@ -0,0 +1,31 @@
from django.core.exceptions import ObjectDoesNotExist
class BaseRepository:
model = None
@classmethod
def get(cls, pk):
try:
return cls.model.objects.get(pk=pk)
except ObjectDoesNotExist:
return None
@classmethod
def create(cls, **kwargs):
return cls.model.objects.create(**kwargs)
@classmethod
def update(cls, instance, **kwargs):
for key, value in kwargs.items():
setattr(instance, key, value)
instance.save()
return instance
@classmethod
def delete(cls, instance):
instance.delete()
@classmethod
def all(cls):
return cls.model.objects.all()

@ -0,0 +1,14 @@
#!/bin/sh
if [ "$DATABASE" = "postgres" ]
then
echo "Waiting for postgres..."
while ! nc -z $SQL_HOST $SQL_PORT; do
sleep 0.1
done
echo "PostgreSQL started"
fi
exec "$@"

@ -0,0 +1,42 @@
from django.contrib import admin
from django.contrib.auth.models import Group
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from .forms import UserCreationForm, UserChangeForm
from .models import CustomUser, Profile, RefreshToken
class UserAdmin(BaseUserAdmin):
# The forms to add and change user instances
form = UserChangeForm
add_form = UserCreationForm
# The fields to be used in displaying the User model.
# These override the definitions on the base UserAdmin
# that reference specific fields on custom_auth.User.
list_display = ('email', 'is_staff')
list_filter = ('is_staff',)
fieldsets = (
(None, {'fields': ('email', 'password', 'is_staff')}),
)
# add_fieldsets is not a standard ModelAdmin attribute. UserAdmin
# overrides get_fieldsets to use this attribute when creating a user.
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('email', 'password1', 'password2', 'is_staff', 'is_superuser'),
}),
)
search_fields = ('email', 'is_staff')
ordering = ('email',)
filter_horizontal = ()
# Now register the new UserAdmin...
admin.site.register(CustomUser, UserAdmin)
# ... and, since we're not using Django's built-in permissions,
# unregister the Group model from admin.
admin.site.unregister(Group)
admin.site.register(RefreshToken)
admin.site.register(Profile)

@ -0,0 +1,6 @@
from django.apps import AppConfig
class AuthConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'jwtauth'

@ -0,0 +1,48 @@
import datetime
from django.conf import settings
import jwt
from rest_framework import authentication, exceptions
from .models import CustomUser
class JWTAuthentication(authentication.BaseAuthentication):
authentication_header_prefix = 'Token'
def authenticate(self, request):
request.user = None
auth_header = authentication.get_authorization_header(request).split()
auth_header_prefix = self.authentication_header_prefix.lower()
if not auth_header:
return
if len(auth_header) == 1:
return
elif len(auth_header) > 2:
return
prefix = auth_header[0].decode('utf-8')
token = auth_header[1].decode('utf-8')
if prefix.lower() != auth_header_prefix:
return
return self.authenticate_credentials(token)
def authenticate_credentials(self, token):
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
except Exception:
return None
try:
user = CustomUser.objects.get(pk=payload['id'])
except CustomUser.DoesNotExist:
msg = 'Пользователь соответствующий данному токену не найден.'
raise exceptions.AuthenticationFailed(msg)
return user, token

@ -0,0 +1,50 @@
from django import forms
from django.forms.models import ModelForm
from django.contrib.auth.forms import ReadOnlyPasswordHashField
from django.core.exceptions import ValidationError
from.models import CustomUser
class UserCreationForm(ModelForm):
"""A form for creating new users. Includes all the required
fields, plus a repeated password."""
password1 = forms.CharField(label='Пароль', widget=forms.PasswordInput)
password2 = forms.CharField(label='Подтверждение пароля', widget=forms.PasswordInput)
class Meta:
model = CustomUser
fields = ('email', 'is_staff', 'is_superuser')
def clean_password2(self):
# Check that the two password entries match
password1 = self.cleaned_data.get("password1")
password2 = self.cleaned_data.get("password2")
if password1 and password2 and password1 != password2:
raise ValidationError("Пароли не совпадают")
return password2
def save(self, commit=True):
# Save the provided password in hashed format
user = super().save(commit=False)
user.set_password(self.cleaned_data["password1"])
if commit:
user.save()
return user
class UserChangeForm(ModelForm):
"""A form for updating users. Includes all the fields on
the user, but replaces the password field with admin's
password hash display field.
"""
password = ReadOnlyPasswordHashField()
class Meta:
model = CustomUser
fields = ('email', 'password', 'is_staff')
def clean_password(self):
# Regardless of what the user provides, return the initial value.
# This is done here, rather than on the field, because the
# field does not have access to the initial value
return self.initial["password"]

@ -0,0 +1,28 @@
from django.contrib.auth.base_user import BaseUserManager
class CustomUserManager(BaseUserManager):
use_in_migrations = True
def create_user(self, email, password=None):
if not email:
raise ValueError(_('The email must be set'))
email = self.normalize_email(email)
user = self.model(
email=email,
)
user.set_password(password)
user.save(using=self._db)
return user
def create_superuser(self, email, password=None):
user = self.create_user(
email,
password=password
)
user.is_staff = True
user.is_superuser = True
user.save(using=self._db)
return user

@ -0,0 +1,65 @@
from datetime import datetime, timedelta
from django.conf import settings
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.models import PermissionsMixin
from django.db import models
from pytz import timezone
from .managers import CustomUserManager
from .token_generators import generate_jwt
# Create your models here.
class CustomUser(AbstractBaseUser, PermissionsMixin):
class Meta:
verbose_name = 'User'
verbose_name_plural = 'Users'
email = models.EmailField(verbose_name='email', unique=True)
is_staff = models.BooleanField(default=False, verbose_name='Administrator')
USERNAME_FIELD = 'email'
objects = CustomUserManager()
def __str__(self):
return self.email
@property
def access_token(self):
return generate_jwt(self.pk)
class RefreshToken(models.Model):
token = models.CharField(max_length=255)
expires = models.DateTimeField(default=datetime.now)
user = models.ForeignKey(CustomUser, on_delete=models.CASCADE)
used = models.BooleanField(default=False)
def save(
self, force_insert=False, force_update=False, using=None, update_fields=None
):
self.expires = datetime.now(timezone(settings.TIME_ZONE)) + timedelta(**settings.REFRESH_TOKEN_LIFETIME)
super().save(force_insert, force_update, using, update_fields)
def __str__(self):
return self.token
class Profile(models.Model):
user = models.OneToOneField(CustomUser, on_delete=models.CASCADE)
first_name = models.CharField(max_length=32, default='', blank=True)
last_name = models.CharField(max_length=32, default='', blank=True)
title = models.CharField(max_length=32, default='', blank=True)
avatar = models.ImageField(upload_to='avatar/', null=True, blank=True, default=None)
friends = models.ManyToManyField('self', blank=True, through='Friend')
class Friend(models.Model):
first = models.ForeignKey(Profile, on_delete=models.CASCADE)
second = models.ForeignKey(Profile, on_delete=models.CASCADE)
approved = models.BooleanField(default=False)

@ -0,0 +1,24 @@
import json
from rest_framework.renderers import JSONRenderer
class CustomUserJSONRenderer(JSONRenderer):
charset = 'utf-8'
def render(self, data, accepted_media_type=None, renderer_context=None):
try:
errors = data.get('errors', None)
access_token = data.get('access_token', None)
if errors is not None:
return super(CustomUserJSONRenderer, self).render(data)
if access_token is not None and isinstance(access_token, bytes):
data['access_token'] = access_token.decode('utf-8')
return json.dumps(
data
)
except:
return None

@ -0,0 +1,39 @@
from core.repositories import BaseRepository, ObjectDoesNotExist
from .models import CustomUser, Profile, RefreshToken
class UserRepository(BaseRepository):
model = CustomUser
@classmethod
def get(cls, email):
try:
return cls.model.objects.get(email=email)
except ObjectDoesNotExist:
return None
@classmethod
def update(cls, instance, **kwargs):
password = kwargs.pop('password', None)
for key, value in kwargs.items():
setattr(instance, key, value)
if password:
instance.set_password(password)
instance.save()
return instance
class ProfileRepository(BaseRepository):
model = Profile
@classmethod
def get_by_user(cls, user):
return cls.model.objects.get(user=user)
class RefreshTokenRepository(BaseRepository):
model = RefreshToken
@classmethod
def get(cls, token):
return cls.model.objects.get(token=token)

@ -0,0 +1,118 @@
from django.contrib.auth import authenticate
from rest_framework import serializers
from .models import CustomUser, Profile, RefreshToken
from .repositories import UserRepository, ProfileRepository, RefreshTokenRepository
from .token_generators import generate_rt
class RegistrationSerializer(serializers.ModelSerializer):
password = serializers.CharField(
max_length=128,
min_length=8,
write_only=True
)
access_token = serializers.CharField(max_length=255, read_only=True)
refresh_token = serializers.CharField(max_length=255, read_only=True)
class Meta:
model = CustomUser
fields = ['email', 'password', 'access_token', 'refresh_token']
def create(self, validated_data):
user = UserRepository.create(**validated_data)
token = RefreshTokenRepository.create(
token=generate_rt(),
user=user
)
return {
'username': user.username,
'access_token': user.access_token,
'refresh_token': token.token
}
class LoginSerializer(serializers.Serializer):
email = serializers.CharField(max_length=255)
password = serializers.CharField(max_length=128, write_only=True)
access_token = serializers.CharField(max_length=255, read_only=True)
refresh_token = serializers.StringRelatedField(read_only=True)
def validate(self, attrs):
email = attrs.get('email', None)
password = attrs.get('password', None)
if email is None:
raise serializers.ValidationError(
str(attrs)
)
if password is None:
raise serializers.ValidationError(
'A password is required to log in'
)
user = authenticate(username=email, password=password)
if user is None:
raise serializers.ValidationError(
'A user with this username and password was not found'
)
token = RefreshTokenRepository.create(
token=generate_rt(),
user=user
)
return {
"email": email,
'access_token': user.access_token,
'refresh_token': token.token
}
class CustomUserSerializer(serializers.ModelSerializer):
password = serializers.CharField(
max_length=128,
min_length=8,
write_only=True
)
class Meta:
model = CustomUser
fields = ('email', 'is_staff', 'password')
read_only_fields = ('is_staff',)
def update(self, instance, validated_data):
password = validated_data.pop('password', None)
for key, value in validated_data.items():
setattr(instance, key, value)
if password is not None:
instance.set_password(password)
instance.save()
return instance
class ProfileSerializer(serializers.ModelSerializer):
user = serializers.StringRelatedField()
friends = serializers.SerializerMethodField(read_only=True)
subscribers = serializers.SerializerMethodField(read_only=True)
def get_friends(self, obj):
return obj.friends.filter(approved=True).count()
def get_subscribers(self, obj):
return obj.friends.filter(approved=False).count()
class Meta:
model = Profile
fields = ('user', 'first_name', 'last_name', 'title', 'avatar', 'friends', 'subscribers')

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

@ -0,0 +1,23 @@
import datetime, random, string
import pytz
from django.conf import settings
import jwt
def generate_jwt(pk):
dt = datetime.datetime.now(tz=pytz.timezone('Europe/Moscow')) + datetime.timedelta(
seconds=settings.TOKEN_LIFETIME['seconds'],
minutes=settings.TOKEN_LIFETIME['minutes'],
hours=settings.TOKEN_LIFETIME['hours'],
days=settings.TOKEN_LIFETIME['days']
)
token = jwt.api_jwt.encode({
'id': pk,
'exp': dt,
}, settings.SECRET_KEY, algorithm='HS256')
return token
def generate_rt():
return ''.join([random.choice(string.ascii_letters) for i in range(128)])

@ -0,0 +1,16 @@
from django.urls import path
from rest_framework.routers import DefaultRouter
from .views import RegistrationAPIView, LoginAPIView, CustomUserRetrieveUpdateAPIView, RefreshAPIView, ProfileRetrieveUpdateDestroyAPIView, ProfileRetrieveAPIView, ProfileListAPIView
app_name = 'jwtauth'
urlpatterns = [
path('registration/', RegistrationAPIView.as_view()),
path('login/', LoginAPIView.as_view()),
path('refresh/', RefreshAPIView.as_view()),
path('profile/<int:id>', ProfileRetrieveAPIView.as_view()),
path('profiles/', ProfileListAPIView.as_view()),
path('profile/', ProfileRetrieveUpdateDestroyAPIView.as_view()),
path('', CustomUserRetrieveUpdateAPIView.as_view()),
]

@ -0,0 +1,217 @@
from datetime import datetime
from django.conf import settings
from django.shortcuts import get_object_or_404
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from pytz import timezone
from rest_framework import status
from rest_framework.generics import RetrieveUpdateAPIView, RetrieveUpdateDestroyAPIView, RetrieveAPIView, CreateAPIView, ListAPIView
from rest_framework.pagination import PageNumberPagination
from rest_framework.permissions import AllowAny, IsAuthenticated, IsAdminUser
from rest_framework.response import Response
from rest_framework.views import APIView
from core.permissions import IsAuthorOrReadOnly
from .models import CustomUser, RefreshToken
from .repositories import RefreshTokenRepository, ProfileRepository
from .serializers import RegistrationSerializer, LoginSerializer, CustomUserSerializer, ProfileSerializer
from .renderers import CustomUserJSONRenderer
from .token_generators import generate_rt, generate_jwt
# Create your views here.
class RefreshAPIView(APIView):
"""
Refreshing old access token
"""
permission_classes = (AllowAny,)
@swagger_auto_schema(request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'refresh_token': openapi.Schema(type=openapi.TYPE_STRING, description='refresh_token'),
},
required=['refresh_token']
),
responses={
status.HTTP_200_OK: openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'access_token': openapi.Schema(type=openapi.TYPE_STRING),
'refresh_token': openapi.Schema(type=openapi.TYPE_STRING)
}
),
}
)
def post(self, request):
try:
refresh_token = request.data.get('refresh_token')
old_token = RefreshTokenRepository.get(token=refresh_token)
user = old_token.user
except RefreshToken.DoesNotExist:
return Response({
'error': 'Token is not valid'
}, status=status.HTTP_400_BAD_REQUEST)
if not old_token.used and old_token.expires > datetime.now(tz=timezone(settings.TIME_ZONE)):
token = RefreshToken.objects.create(
token=generate_rt(),
user=user
)
old_token.delete()
data = {
'access_token': generate_jwt(user.pk),
'refresh_token': token.token
}
return Response(data, status=status.HTTP_200_OK)
else:
return Response(
data={
'error': 'Token expired',
},
status=status.HTTP_400_BAD_REQUEST
)
class RegistrationAPIView(APIView):
permission_classes = (AllowAny,)
serializer_class = RegistrationSerializer
renderer_classes = (CustomUserJSONRenderer,)
@swagger_auto_schema(request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'email': openapi.Schema(type=openapi.TYPE_STRING, description='email'),
'password': openapi.Schema(type=openapi.TYPE_STRING, description='user password')
}
),
responses={
status.HTTP_200_OK: openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'email': openapi.Schema(type=openapi.TYPE_STRING),
'access_token': openapi.Schema(type=openapi.TYPE_STRING),
'refresh_token': openapi.Schema(type=openapi.TYPE_STRING)
}
),
}
)
def post(self, request):
user = request.data
serializer = self.serializer_class(data=user)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
class LoginAPIView(APIView):
permission_classes = (AllowAny,)
serializer_class = LoginSerializer
renderer_classes = (CustomUserJSONRenderer,)
@swagger_auto_schema(request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'email': openapi.Schema(type=openapi.TYPE_STRING, description='email'),
'password': openapi.Schema(type=openapi.TYPE_STRING, description='user password')
},
),
responses={
status.HTTP_200_OK: openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'email': openapi.Schema(type=openapi.TYPE_STRING),
'access_token': openapi.Schema(type=openapi.TYPE_STRING),
'refresh_token': openapi.Schema(type=openapi.TYPE_STRING)
}
),
}
)
def post(self, request):
user = request.data
serializer = self.serializer_class(data=user)
serializer.is_valid(raise_exception=True)
return Response(serializer.data, status=status.HTTP_200_OK)
class CustomUserRetrieveUpdateAPIView(RetrieveUpdateAPIView):
permission_classes = (IsAuthenticated,)
renderer_classes = (CustomUserJSONRenderer,)
serializer_class = CustomUserSerializer
def retrieve(self, request, *args, **kwargs):
serializer = self.serializer_class(request.user)
return Response(serializer.data, status=status.HTTP_200_OK)
def update(self, request, *args, **kwargs):
serializer_data = request.data
serializer = self.serializer_class(
request.user, data=serializer_data, partial=True
)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
# class ChangePasswordAPIView(APIView):
#
# permission_classes = (IsAuthenticated,)
#
# @swagger_auto_schema(request_body=openapi.Schema(
# type=openapi.TYPE_OBJECT,
# properties={
# 'new_password': openapi.Schema(type=openapi.TYPE_STRING, description='refresh_token'),
# },
# required=['new_password']
# ),
# responses={
# status.HTTP_200_OK: openapi.Schema(
# type=openapi.TYPE_OBJECT,
# properties={
# 'message': 'password changed',
# }
# ),
# status.HTTP_401_UN
# }
# )
# def post(self, request):
# pass
class ProfileRetrieveUpdateDestroyAPIView(CreateAPIView, RetrieveUpdateDestroyAPIView):
permission_classes = (IsAuthenticated,)
serializer_class = ProfileSerializer
queryset = ProfileRepository.all()
def get_queryset(self):
return self.queryset
def get_object(self):
queryset = self.get_queryset()
obj = get_object_or_404(queryset, user=self.request.user)
return obj
def perform_create(self, serializer):
serializer.save(user=self.request.user)
def perform_update(self, serializer):
serializer.save(user=self.request.user)
class ProfileRetrieveAPIView(RetrieveAPIView):
permission_classes = (IsAuthorOrReadOnly,)
serializer_class = ProfileSerializer
queryset = ProfileRepository.all()
class ProfileListAPIView(ListAPIView):
queryset = ProfileRepository.all()
serializer_class = ProfileSerializer
permission_classes = [IsAdminUser]
pagination_class = PageNumberPagination

@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'conf.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

@ -0,0 +1,6 @@
from django.apps import AppConfig
class OrganizationConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'organizations'

@ -0,0 +1,34 @@
from django.db import models
from django.contrib.gis.db.models import PointField
# Create your models here.
class Region(models.Model):
code = models.PositiveIntegerField()
name = models.CharField(max_length=64)
class Location(models.Model):
coords = PointField(db_index=True)
region = models.ForeignKey(Region, on_delete=models.SET_NULL, null=True)
address = models.CharField(max_length=512, blank=True, null=True)
class Category(models.Model):
name = models.CharField(max_length=64)
icon = models.ImageField(upload_to='icons/category/')
class Organization(models.Model):
location = models.ForeignKey(Location, on_delete=models.SET_NULL, null=True)
name = models.CharField(max_length=256)
phone = models.CharField(max_length=20)
website = models.URLField()
description = models.TextField()
owner = models.ForeignKey('jwtauth.CustomUser', on_delete=models.SET_NULL, null=True)
class OrganizationImage(models.Model):
image = models.ImageField(upload_to='images/organizations')
organization = models.ForeignKey(Organization, on_delete=models.CASCADE, related_name='organization_images')

@ -0,0 +1,30 @@
from .models import Category, Location, Organization, OrganizationImage, Region
from core.repositories import BaseRepository
class CategoryRepository(BaseRepository):
model = Category
class LocationRepository(BaseRepository):
model = Location
class OrganizationRepository(BaseRepository):
model = Organization
@classmethod
def search(cls, name):
return cls.model.objects.filter(name__icontains=name)
@classmethod
def get_by_category(cls, category_id):
return cls.model.objects.filter(category__pk=category_id)
class OrganizationImagesRepository(BaseRepository):
model = OrganizationImage
class RegionRepository(BaseRepository):
model = Region

@ -0,0 +1,41 @@
from rest_framework import serializers
from core.fields import LocationField
from jwtauth.serializers import CustomUserSerializer
from .models import Category, Location, Organization, OrganizationImage
from .repositories import OrganizationImagesRepository
class CategorySerializer(serializers.ModelSerializer):
class Meta:
model = Category
fields = '__all__'
class LocationSerializer(serializers.ModelSerializer):
coords = LocationField()
region = serializers.StringRelatedField()
class Meta:
model = Location
fields = ('coords', 'region', 'address')
class OrganizationListSerializer(serializers.ModelSerializer):
location = LocationSerializer()
images = serializers.StringRelatedField(many=True)
class Meta:
model = Organization
fields = ('location', 'images', 'name', 'phone', 'description', 'website')
class OrganizationCreateSerializer(serializers.ModelSerializer):
location = LocationSerializer()
images = serializers.PrimaryKeyRelatedField(many=True, queryset=OrganizationImagesRepository)
owner = serializers.StringRelatedField()
class Meta:
model = Organization
fields = ('location', 'images', 'name', 'phone', 'website', 'description', 'owner')

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

@ -0,0 +1,13 @@
from django.urls import path
from rest_framework.routers import DefaultRouter
from .views import OrganizationViewSet, OrganizationSearchListAPIView, OrganizationByCategoryListAPIView
app_name = "organizations"
router = DefaultRouter()
router.register('', OrganizationViewSet, 'organizations')
urlpatterns = [
path('category/<int:id>/organizations/', OrganizationByCategoryListAPIView.as_view()),
path('search/<str:name>/', OrganizationSearchListAPIView.as_view()),
] + router.urls

@ -0,0 +1,79 @@
from django.conf import settings
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_page
from django.views.decorators.vary import vary_on_cookie
from rest_framework.generics import ListAPIView
from rest_framework.pagination import PageNumberPagination
from rest_framework.viewsets import ModelViewSet
from core.permissions import IsAuthorOrReadOnly
from .repositories import CategoryRepository, OrganizationRepository
from .serializers import CategorySerializer, OrganizationListSerializer, OrganizationCreateSerializer
# Create your views here.
class OrganizationViewSet(ModelViewSet):
pagination_class = PageNumberPagination
permission_classes = [IsAuthorOrReadOnly]
@method_decorator(vary_on_cookie)
@method_decorator(cache_page(settings.CACHE_TTL))
def dispatch(self, request, *args, **kwargs):
return super(OrganizationViewSet, self).dispatch(*args, **kwargs)
def get_serializer_class(self):
if self.action == 'list' or self.action == 'retrieve':
return OrganizationListSerializer
else:
return OrganizationCreateSerializer
def perform_create(self, serializer):
serializer.save(owner=self.request.user)
def perform_update(self, serializer):
serializer.save(owner=self.request.user)
class OrganizationSearchListAPIView(ListAPIView):
pagination_class = PageNumberPagination
serializer_class = OrganizationListSerializer
@method_decorator(vary_on_cookie)
@method_decorator(cache_page(settings.CACHE_TTL))
def dispatch(self, request, *args, **kwargs):
return super(OrganizationSearchListAPIView, self).dispatch(*args, **kwargs)
def get_queryset(self):
name = self.kwargs.get('name')
if name:
return OrganizationRepository.search(name)
else:
return OrganizationRepository.all()
def list(self, request, *args, **kwargs):
return super().list(request, args, kwargs)
class CategoryListAPIView(ListAPIView):
pagination_class = PageNumberPagination
queryset = CategoryRepository.all()
serializer_class = CategorySerializer
class OrganizationByCategoryListAPIView(ListAPIView):
pagination_class = PageNumberPagination
serializer_class = OrganizationListSerializer
@method_decorator(vary_on_cookie)
@method_decorator(cache_page(settings.CACHE_TTL))
def dispatch(self, request, *args, **kwargs):
return super(OrganizationByCategoryListAPIView, self).dispatch(*args, **kwargs)
def get_queryset(self):
category = self.kwargs.get('id')
if category:
return OrganizationRepository.get_by_category(int(category))
else:
return OrganizationRepository.all()

Binary file not shown.
Loading…
Cancel
Save