parent
46b9656e2f
commit
b4beefae8e
@ -0,0 +1,3 @@
|
||||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
@ -0,0 +1,6 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class BusinessConfig(AppConfig):
|
||||
default_auto_field = 'django.db.models.BigAutoField'
|
||||
name = 'business'
|
@ -0,0 +1,35 @@
|
||||
# Generated by Django 5.0.4 on 2024-04-07 02:02
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('organizations', '0001_initial'),
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='OrganizationAccount',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('organization', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='organizations.organization')),
|
||||
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='QRCodeUsing',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('used_dt', models.DateTimeField(auto_now=True)),
|
||||
('client', models.ForeignKey(default=None, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)),
|
||||
('organization', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='organizations.organization')),
|
||||
],
|
||||
),
|
||||
]
|
@ -0,0 +1,15 @@
|
||||
from django.conf import settings
|
||||
from django.db import models
|
||||
|
||||
# Create your models here.
|
||||
|
||||
|
||||
class OrganizationAccount(models.Model):
|
||||
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
|
||||
organization = models.ForeignKey('organizations.Organization', on_delete=models.CASCADE)
|
||||
|
||||
|
||||
class QRCodeUsing(models.Model):
|
||||
organization = models.ForeignKey('organizations.Organization', on_delete=models.CASCADE)
|
||||
used_dt = models.DateTimeField(auto_now=True)
|
||||
client = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, default=None)
|
@ -0,0 +1,49 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from django.db.models.functions import TruncDate
|
||||
|
||||
from core.repositories import BaseRepository, ObjectDoesNotExist
|
||||
from .models import OrganizationAccount, QRCodeUsing
|
||||
|
||||
|
||||
class OrganizationAccountRepository(BaseRepository):
|
||||
model = OrganizationAccount
|
||||
|
||||
@classmethod
|
||||
def get(cls, **kwargs):
|
||||
try:
|
||||
return cls.model.objects.get(**kwargs)
|
||||
except ObjectDoesNotExist:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def filter_by_organization(cls, organization_id):
|
||||
return cls.model.objects.filter(organization__id=organization_id)
|
||||
|
||||
@classmethod
|
||||
def none(cls):
|
||||
return cls.model.objects.none()
|
||||
|
||||
|
||||
class QRCodeUsingRepository(BaseRepository):
|
||||
model = QRCodeUsing
|
||||
|
||||
@staticmethod
|
||||
def __get_dates_from_timestamp(timestamp):
|
||||
return datetime.fromtimestamp(timestamp).date(), datetime.fromtimestamp(timestamp).date() + timedelta(days=1)
|
||||
|
||||
@classmethod
|
||||
def get_by_timestamp(cls, organization_id, timestamp):
|
||||
start_date, end_date = cls.__get_dates_from_timestamp(timestamp)
|
||||
return cls.model.objects.filter(organization__pk=organization_id, used_dt__gte=start_date,
|
||||
used_dt__lt=end_date)
|
||||
|
||||
@classmethod
|
||||
def get_with_dates(cls, organization_id):
|
||||
return cls.model.objects.filter(organization__pk=organization_id).values(date=TruncDate('used_dt')).distinct()
|
||||
|
||||
@classmethod
|
||||
def count(cls, organization_id, timestamp):
|
||||
start_date, end_date = cls.__get_dates_from_timestamp(timestamp)
|
||||
return cls.model.objects.filter(organization__pk=organization_id, used_dt__gte=start_date,
|
||||
used_dt__lt=end_date).count()
|
@ -0,0 +1,39 @@
|
||||
from datetime import datetime, time
|
||||
from django.conf import settings
|
||||
from rest_framework import serializers
|
||||
from pytz import timezone
|
||||
|
||||
from .models import QRCodeUsing, OrganizationAccount
|
||||
from .repositories import QRCodeUsingRepository
|
||||
|
||||
|
||||
class ListUsingQRCodeSerializer(serializers.Serializer):
|
||||
date = serializers.SerializerMethodField(method_name='get_timestamp', read_only=True)
|
||||
scansCount = serializers.SerializerMethodField(method_name='get_scans_count', read_only=True)
|
||||
|
||||
def get_timestamp(self, obj):
|
||||
return datetime.combine(obj['date'], time=time(), tzinfo=timezone(settings.TIME_ZONE)).timestamp()
|
||||
|
||||
def get_scans_count(self, obj):
|
||||
pk = int(''.join([i for i in self.context['request'].path if i.isdigit()]))
|
||||
return QRCodeUsingRepository.count(pk, self.get_timestamp(obj))
|
||||
|
||||
|
||||
class DetailedUsingQRCodeSerializer(serializers.ModelSerializer):
|
||||
|
||||
timestamp = serializers.SerializerMethodField(method_name='get_timestamp', read_only=True)
|
||||
user = serializers.StringRelatedField(source='client', read_only=True)
|
||||
discountPercent = serializers.IntegerField(source='discount')
|
||||
|
||||
class Meta:
|
||||
model = QRCodeUsing
|
||||
fields = ('id', 'user', 'organization_id', 'timestamp')
|
||||
|
||||
def get_timestamp(self, obj):
|
||||
return obj.used_dt.timestamp()
|
||||
|
||||
|
||||
class OrganizationAccountSerializer(serializers.ModelSerializer):
|
||||
class Meta:
|
||||
model = OrganizationAccount
|
||||
fields = ('user', 'organization')
|
@ -0,0 +1,14 @@
|
||||
from crypto.services import decrypt
|
||||
from jwtauth.repositories import UserRepository, ObjectDoesNotExist
|
||||
|
||||
|
||||
def qr_prove(cryptred_str):
|
||||
try:
|
||||
data = decrypt(cryptred_str)
|
||||
except Exception:
|
||||
return None
|
||||
try:
|
||||
email = data
|
||||
return UserRepository.get(email)
|
||||
except (KeyError, ObjectDoesNotExist):
|
||||
return None
|
@ -0,0 +1,3 @@
|
||||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
@ -0,0 +1,13 @@
|
||||
from django.urls import path
|
||||
|
||||
from .views import *
|
||||
|
||||
|
||||
urlpatterns = [
|
||||
path('organizations/accounts/<int:id>/', OrganizationAccountDestroyAPIView.as_view()),
|
||||
path('organization/<int:id>/accounts/', OrganizationAccountsListAPIView.as_view()),
|
||||
path('organization/<int:id>/accounts/', OrganizationAccountCreateAPIView.as_view()),
|
||||
path('organization/<int:id>/scans/<int:timestamp>/', QRCodeUsingDetailedListAPIView.as_view()),
|
||||
path('organization/<int:id>/scans/', QRCodeUsingListAPIView.as_view()),
|
||||
path('qr-code/check/', QrCodeProveAPIView.as_view()),
|
||||
]
|
@ -0,0 +1,97 @@
|
||||
from drf_yasg import openapi
|
||||
from drf_yasg.utils import swagger_auto_schema
|
||||
from rest_framework import status
|
||||
from rest_framework.generics import ListAPIView, CreateAPIView, DestroyAPIView
|
||||
from rest_framework.pagination import PageNumberPagination
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
|
||||
from core.permissions import IsOrganizationOwner
|
||||
from .repositories import QRCodeUsingRepository, OrganizationAccountRepository
|
||||
from .serializers import DetailedUsingQRCodeSerializer, ListUsingQRCodeSerializer, OrganizationAccount
|
||||
from .services import qr_prove
|
||||
|
||||
# Create your views here.
|
||||
|
||||
|
||||
class QRCodeUsingListAPIView(ListAPIView):
|
||||
pagination_class = PageNumberPagination
|
||||
serializer_class = ListUsingQRCodeSerializer
|
||||
lookup_field = 'id'
|
||||
|
||||
def get_queryset(self):
|
||||
return QRCodeUsingRepository.get_with_dates(self.kwargs.get('id'))
|
||||
|
||||
|
||||
class QRCodeUsingDetailedListAPIView(ListAPIView):
|
||||
pagination_class = PageNumberPagination
|
||||
serializer_class = DetailedUsingQRCodeSerializer
|
||||
lookup_field = 'id'
|
||||
|
||||
def get_queryset(self):
|
||||
return QRCodeUsingRepository.get_by_timestamp(self.kwargs.get('id'), int(self.kwargs.get('timestamp')))
|
||||
|
||||
|
||||
class QrCodeProveAPIView(APIView):
|
||||
permission_classes = (IsAuthenticated,)
|
||||
|
||||
@swagger_auto_schema(request_body=openapi.Schema(
|
||||
type=openapi.TYPE_OBJECT,
|
||||
properties={
|
||||
'qrCodeData': openapi.Schema(type=openapi.TYPE_STRING, description='access_token'),
|
||||
},
|
||||
required=['qrCodeData']
|
||||
),
|
||||
responses={
|
||||
status.HTTP_200_OK: openapi.Schema(
|
||||
type=openapi.TYPE_OBJECT
|
||||
),
|
||||
status.HTTP_400_BAD_REQUEST: openapi.Schema(
|
||||
type=openapi.TYPE_OBJECT
|
||||
),
|
||||
status.HTTP_403_FORBIDDEN: openapi.Schema(
|
||||
type=openapi.TYPE_OBJECT
|
||||
)
|
||||
}
|
||||
)
|
||||
def post(self, request):
|
||||
self.check_permissions(request)
|
||||
oa = OrganizationAccountRepository.get(user=self.request.user)
|
||||
if oa:
|
||||
try:
|
||||
client = qr_prove(self.request.data['qrCodeData'])
|
||||
except KeyError:
|
||||
return Response(status=status.HTTP_400_BAD_REQUEST)
|
||||
if client:
|
||||
QRCodeUsingRepository.create(
|
||||
organization_id=oa.organization.pk,
|
||||
client=client
|
||||
)
|
||||
return Response(status=status.HTTP_200_OK)
|
||||
return Response(status=status.HTTP_400_BAD_REQUEST)
|
||||
return Response(status=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
|
||||
class OrganizationAccountCreateAPIView(CreateAPIView):
|
||||
serializer_class = OrganizationAccount
|
||||
permission_classes = [IsOrganizationOwner]
|
||||
queryset = OrganizationAccountRepository.all()
|
||||
|
||||
|
||||
class OrganizationAccountDestroyAPIView(DestroyAPIView):
|
||||
serializer_class = OrganizationAccount
|
||||
permission_classes = [IsOrganizationOwner]
|
||||
queryset = OrganizationAccountRepository.all()
|
||||
|
||||
|
||||
class OrganizationAccountsListAPIView(ListAPIView):
|
||||
serializer_class = OrganizationAccount
|
||||
permission_classes = [IsOrganizationOwner]
|
||||
|
||||
def get_queryset(self):
|
||||
pk = self.kwargs.get('id')
|
||||
if pk:
|
||||
return OrganizationAccountRepository.filter_by_organization(int(pk))
|
||||
else:
|
||||
return OrganizationAccountRepository.none()
|
@ -0,0 +1,23 @@
|
||||
class BaseCryptoMSDU:
|
||||
"""
|
||||
class that describes any CryptoSystem
|
||||
encrypt_function is partial solution of MSDU using as encode function
|
||||
private_key is secret key for encode and decode
|
||||
decrypt_function is function to decrypt message
|
||||
for async systems
|
||||
U is module
|
||||
w is open key
|
||||
v is another private key
|
||||
"""
|
||||
|
||||
encrypt_function = None
|
||||
secret_key = None
|
||||
decrypt_function = None
|
||||
|
||||
@classmethod
|
||||
def encrypt(cls, msg: int) -> int:
|
||||
return cls.encrypt_function(msg, cls.secret_key)
|
||||
|
||||
@classmethod
|
||||
def decrypt(cls, msg: int) -> int:
|
||||
return cls.decrypt_function(msg, cls.secret_key)
|
@ -0,0 +1,39 @@
|
||||
class KGramCoder:
|
||||
"""
|
||||
class to describe any k-grams coding,
|
||||
step is ascii letter that start in ur alphabet(better user ord(letter)),
|
||||
alphabet power is length of your alphabet,
|
||||
alphabet - just for add or override codes,
|
||||
use_default_alphabet - if True using ansi coding(and adding your alphabet), else you need to describe your alphabet
|
||||
"""
|
||||
k: int = None
|
||||
start_symbol: int = 1
|
||||
alphabet_power: int = None
|
||||
alphabet: dict = {}
|
||||
use_default_alphabet: bool = True
|
||||
|
||||
@classmethod
|
||||
def encode(cls, k_gram: str) -> int:
|
||||
if cls.use_default_alphabet:
|
||||
return sum([cls.alphabet_power ** i * (ord(char) - cls.start_symbol) if char not in
|
||||
cls.alphabet else
|
||||
cls.alphabet_power ** i * cls.alphabet[char] for i, char in enumerate(list(k_gram[::-1]))])
|
||||
else:
|
||||
return sum([cls.alphabet_power ** i * cls.alphabet[char] for i, char in enumerate(list(k_gram[::-1]))])
|
||||
|
||||
@classmethod
|
||||
def decode(cls, encrypted_k_gram: int) -> str:
|
||||
alphabet_codes = dict((v, k) for k, v in cls.alphabet.items())
|
||||
k_gram_list = []
|
||||
for i in range(cls.k):
|
||||
if cls.use_default_alphabet:
|
||||
res = encrypted_k_gram % cls.alphabet_power
|
||||
if res in alphabet_codes.keys():
|
||||
k_gram_list.append(alphabet_codes[res])
|
||||
else:
|
||||
k_gram_list.append(str(chr(res + cls.start_symbol)))
|
||||
else:
|
||||
k_gram_list.append(str(alphabet_codes[encrypted_k_gram % cls.alphabet_power]))
|
||||
encrypted_k_gram //= cls.alphabet_power
|
||||
|
||||
return ''.join(k_gram_list)[::-1]
|
@ -0,0 +1,11 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Message:
|
||||
"""
|
||||
dataclass for Message
|
||||
"""
|
||||
string: str
|
||||
k: int
|
||||
delimiter: str = ' '
|
@ -0,0 +1,23 @@
|
||||
from typing import Type
|
||||
|
||||
from .crypto_system import BaseCryptoMSDU
|
||||
from .k_gram_coder import KGramCoder
|
||||
from .message import Message
|
||||
|
||||
|
||||
class MessageCoder:
|
||||
"""
|
||||
class for code or decode Message
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def encode(msg: Message, encrypter_class: Type[BaseCryptoMSDU], k_gram_coder: Type[KGramCoder]):
|
||||
k_gram_list = [msg.string[i:i+2] for i in range(0, len(msg.string), msg.k)]
|
||||
return msg.delimiter.join(map(lambda x: str(encrypter_class.encrypt(x)),
|
||||
[k_gram_coder.encode(k_gram) for k_gram in k_gram_list]))
|
||||
|
||||
@staticmethod
|
||||
def decode(msg: Message, decrypter_class: Type[BaseCryptoMSDU], k_gram_coder: Type[KGramCoder]):
|
||||
encrypted_k_gram_list = list(map(int, msg.string.split(msg.delimiter)))
|
||||
return ''.join([k_gram_coder.decode(decrypter_class.decrypt(int(encrypted_k_gram)))
|
||||
for encrypted_k_gram in encrypted_k_gram_list])
|
@ -0,0 +1,41 @@
|
||||
from ast import literal_eval
|
||||
|
||||
from crypto.crypto_system import BaseCryptoMSDU
|
||||
from crypto.k_gram_coder import KGramCoder
|
||||
from crypto.message import Message
|
||||
from crypto.message_coder import MessageCoder
|
||||
|
||||
|
||||
def encrypt_function(msg: int, key: int) -> int:
|
||||
power = 5
|
||||
return (msg + 3 * key) ** power + (9 * msg + 4 * key) ** power + (12 * msg + 19 * key) ** power + (
|
||||
28 * msg + 21 * key) ** power + (31 * msg + 36 * key) ** power + (39 * msg + 37 * key) ** power - (
|
||||
4 * msg + 9 * key) ** power - (19 * msg + 12 * key) ** power - (21 * msg + 28 * key) ** power - (
|
||||
36 * msg + 31 * key) ** power - (37 * msg + 39 * key) ** power
|
||||
|
||||
|
||||
def decrypt_function(coded_msg: int, key: int) -> int:
|
||||
return int(coded_msg ** (1 / 5) - key) // 3
|
||||
|
||||
|
||||
class SymmMSDUPowerFive(BaseCryptoMSDU):
|
||||
encrypt_function = encrypt_function
|
||||
secret_key = 9
|
||||
decrypt_function = decrypt_function
|
||||
|
||||
|
||||
class BiGramCoder(KGramCoder):
|
||||
k = 2
|
||||
alphabet_power = 128
|
||||
|
||||
|
||||
def encrypt(data):
|
||||
str_data = str(data)
|
||||
if len(str_data) % 2:
|
||||
str_data += ' '
|
||||
base_msg = Message(str_data, 2)
|
||||
return MessageCoder.encode(base_msg, SymmMSDUPowerFive, BiGramCoder)
|
||||
|
||||
|
||||
def decrypt(st):
|
||||
return literal_eval(MessageCoder.decode(Message(st, 2), SymmMSDUPowerFive, BiGramCoder))
|
@ -0,0 +1,23 @@
|
||||
# Generated by Django 5.0.4 on 2024-04-07 02:02
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('jwtauth', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='QRCode',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('qr_str', models.CharField(blank=True, default='', max_length=1024)),
|
||||
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
),
|
||||
]
|
Loading…
Reference in new issue