Anton Zhuravlev 1 year ago
commit 985481b01a

@ -0,0 +1,18 @@
import asyncio
from websockets.server import serve
async def handler(websocket):
async for message in websocket:
print('recv msg', message)
async def main():
async with serve(handler, '0.0.0.0', 8765):
await asyncio.Future() # run forever
asyncio.run(main())

@ -1,18 +1,84 @@
from datetime import datetime
from api.response.base import ResponseBase
from db.models.conveer import DBConveer
from pydantic import Field
class AreaCharts:
name: str
value: int
class AreaCharts(ResponseBase):
name: str = Field(...)
value: int = Field(...)
class TimelineCharts(AreaCharts):
time: datetime
time: datetime = Field(...)
class RequestAnalytics(ResponseBase):
timeline_charts: list[TimelineCharts]
area_charts: list[AreaCharts]
bar_chars: list[AreaCharts]
class ResponseAnalytics(ResponseBase):
timeline_charts: list[TimelineCharts] = Field(...)
area_charts: list[AreaCharts] = Field(...)
total_numb: int = Field(...)
metal_numb: int = Field(...)
wood_numb: int = Field(...)
plastic_numb: int = Field(...)
glass_numb: int = Field(...)
class ResponseAnalyticsFactory:
@staticmethod
def get_from_model_area_charts(value, name) -> AreaCharts:
return AreaCharts(
name=name,
value=value
)
@staticmethod
def count_categories(models: list[DBConveer]) -> list[int]:
wood = 0
glass = 0
metal = 0
plastic = 0
for i in models:
wood += i.wood
glass += i.glass
metal += i.metal
plastic += i.plastic
return [wood, glass, metal, plastic]
@classmethod
def get_from_models_area_charts(cls, models: list[DBConveer]) -> list[AreaCharts]:
wood, glass, metal, plastic = cls.count_categories(models)
return [cls.get_from_model_area_charts(name='wood', value=wood),
cls.get_from_model_area_charts(name='glass', value=glass),
cls.get_from_model_area_charts(name='metal', value=metal),
cls.get_from_model_area_charts(name='plastic', value=plastic)]
@classmethod
def get_from_model_timeline_charts(cls, name, value, time):
return TimelineCharts(
**cls.get_from_model_area_charts(name=name, value=value).__dict__,
time=time)
@classmethod
def get_from_model_timeline_chart(cls, models: list[DBConveer]) -> list[TimelineCharts]:
response = []
print(len(models))
for i in models:
response.append(cls.get_from_model_timeline_charts(name='wood', value=i.wood, time=i.created_at))
response.append(cls.get_from_model_timeline_charts(name='glass', value=i.glass, time=i.created_at))
response.append(cls.get_from_model_timeline_charts(name='plastic', value=i.plastic, time=i.created_at))
response.append(cls.get_from_model_timeline_charts(name='metal', value=i.metal, time=i.created_at))
return response
@classmethod
def get_from_models(cls, models: list[DBConveer]) -> ResponseAnalytics:
return ResponseAnalytics(
area_charts=cls.get_from_models_area_charts(models),
timeline_charts=cls.get_from_model_timeline_chart(models),
total_numb=sum(cls.count_categories(models)),
metal_numb=cls.count_categories(models)[2],
wood_numb=cls.count_categories(models)[0],
plastic_numb=cls.count_categories(models)[3],
glass_numb=cls.count_categories(models)[1]
)

@ -5,44 +5,79 @@ Revises:
Create Date: 2023-10-27 18:19:47.402501
"""
import random
import time
from datetime import datetime, timedelta
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3c651a0d1fe0'
down_revision = None
branch_labels = None
depends_on = None
def generate_normal_random(mean, std_dev):
return int(random.gauss(mean, std_dev))
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('camera',
sa.Column('camera_type', sa.String(), nullable=False),
sa.Column('order_numb', sa.Integer(), nullable=True),
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('updated_at', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_camera')),
sa.UniqueConstraint('id', name=op.f('uq_camera_id')),
sa.UniqueConstraint('order_numb', name=op.f('uq_camera_order_numb'))
)
sa.Column('camera_type', sa.String(), nullable=False),
sa.Column('order_numb', sa.Integer(), nullable=True),
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False),
sa.Column('updated_at', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_camera')),
sa.UniqueConstraint('id', name=op.f('uq_camera_id')),
sa.UniqueConstraint('order_numb', name=op.f('uq_camera_order_numb'))
)
op.create_table('conveer',
sa.Column('wood', sa.Integer(), nullable=True),
sa.Column('metal', sa.Integer(), nullable=True),
sa.Column('glass', sa.Integer(), nullable=True),
sa.Column('plastic', sa.Integer(), nullable=True),
sa.Column('camera_id', sa.Integer(), nullable=False),
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('updated_at', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.ForeignKeyConstraint(['camera_id'], ['camera.id'], name=op.f('fk_conveer_camera_id_camera')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_conveer')),
sa.UniqueConstraint('id', name=op.f('uq_conveer_id'))
)
sa.Column('wood', sa.Integer(), nullable=True),
sa.Column('metal', sa.Integer(), nullable=True),
sa.Column('glass', sa.Integer(), nullable=True),
sa.Column('plastic', sa.Integer(), nullable=True),
sa.Column('camera_id', sa.Integer(), nullable=False),
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False),
sa.Column('updated_at', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False),
sa.ForeignKeyConstraint(['camera_id'], ['camera.id'], name=op.f('fk_conveer_camera_id_camera')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_conveer')),
sa.UniqueConstraint('id', name=op.f('uq_conveer_id'))
)
op.execute('''INSERT INTO camera(id,order_numb, camera_type) VALUES (1,1, 'По умолчанию')''')
# ### end Alembic commands ###
import random
from datetime import datetime, timedelta
# Устанавливаем параметры нормального распределения
mean_metal = 10 # Среднее значение для металла
std_dev_metal = 3 # Стандартное отклонение для металла
mean_glass = 5 # Среднее значение для стекла
std_dev_glass = 2 # Стандартное отклонение для стекла
mean_plastic = 7.5 # Среднее значение для пластика
std_dev_plastic = 2.5 # Стандартное отклонение для пластика
mean_wood = 15 # Среднее значение для дерева
std_dev_wood = 5 # Стандартное отклонение для дерева
for _ in range(0, 10):
random_date = datetime.now() - timedelta(days=random.randint(1, 365))
# Генерируем случайные значения, следующие нормальному распределению
metal = generate_normal_random(mean_metal, std_dev_metal)
glass = generate_normal_random(mean_glass, std_dev_glass)
plastic = generate_normal_random(mean_plastic, std_dev_plastic)
wood = generate_normal_random(mean_wood, std_dev_wood)
# Далее вы можете выполнить ваш SQL-запрос с этими случайными значениями
op.execute(f'''INSERT INTO conveer (metal, glass, plastic, wood, camera_id, created_at)
VALUES ({metal}, {glass}, {plastic}, {wood}, 1, '{random_date}');''')
def downgrade():

@ -15,7 +15,7 @@ class AnalyticsRepository(BaseRepository):
async def get_analytic(self,
start_date: Optional[datetime],
end_date: Optional[datetime]):
end_date: Optional[datetime]) -> list[DBConveer]:
query = (
select(DBConveer)
.select_from(DBConveer)

@ -3,6 +3,7 @@ from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from db.models.conveer import DBConveer
from db.repository.analytics import AnalyticsRepository
@ -12,7 +13,7 @@ class AnalyticsManager:
async def get_by_filters(cls,
session: AsyncSession,
start_date: Optional[datetime],
end_date: Optional[datetime]):
datas = await AnalyticsRepository(session).get_analytic(start_date=start_date,
end_date=end_date)
end_date: Optional[datetime]) -> list[DBConveer]:
datas: list[DBConveer] = await AnalyticsRepository(session).get_analytic(start_date=start_date,
end_date=end_date)
return datas

@ -2,21 +2,23 @@ from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from api.request.analytics import AnalyticsFilters
from api.response.analytics import ResponseAnalyticsFactory, ResponseAnalytics
from db.models.conveer import DBConveer
from managers.analytics import AnalyticsManager
from server.depends import get_session, PagesPaginationParams
from server.depends import get_session
router = APIRouter(prefix="/analytics", tags=['Ride'])
router = APIRouter(prefix="/api/analytics", tags=['Ride'])
@router.post('/all')
@router.post('/all', response_model=ResponseAnalytics)
async def get_all_analytics(
filters: AnalyticsFilters,
session: AsyncSession = Depends(get_session),
):
data = await AnalyticsManager.get_by_filters(
data: list[DBConveer] = await AnalyticsManager.get_by_filters(
session=session,
start_date=filters.start_time,
end_date=filters.end_time
)
return data
return ResponseAnalyticsFactory.get_from_models(models=data)

Loading…
Cancel
Save