main
Nikita 1 year ago
parent d17c94e67a
commit 96b4fed18c

@ -1,18 +1,83 @@
from datetime import datetime from datetime import datetime
from api.response.base import ResponseBase from api.response.base import ResponseBase
from db.models.conveer import DBConveer
from pydantic import Field
class AreaCharts: class AreaCharts(ResponseBase):
name: str name: str = Field(...)
value: int value: int = Field(...)
class TimelineCharts(AreaCharts): class TimelineCharts(AreaCharts):
time: datetime time: datetime = Field(...)
class RequestAnalytics(ResponseBase): class ResponseAnalytics(ResponseBase):
timeline_charts: list[TimelineCharts] timeline_charts: list[TimelineCharts] = Field(...)
area_charts: list[AreaCharts] area_charts: list[AreaCharts] = Field(...)
bar_chars: list[AreaCharts] total_numb: int = Field(...)
metal_numb: int = Field(...)
wood_numb: int = Field(...)
plastic_numb: int = Field(...)
glass_numb: int = Field(...)
class ResponseAnalyticsFactory:
@staticmethod
def get_from_model_area_charts(value, name) -> AreaCharts:
return AreaCharts(
name=name,
value=value
)
@staticmethod
def count_categories(models: list[DBConveer]) -> list[int]:
wood = 0
glass = 0
metal = 0
plastic = 0
for i in models:
wood += i.wood
glass += i.glass
metal += i.metal
plastic += i.plastic
return [wood, glass, metal, plastic]
@classmethod
def get_from_models_area_charts(cls, models: list[DBConveer]) -> list[AreaCharts]:
wood, glass, metal, plastic = cls.count_categories(models)
return [cls.get_from_model_area_charts(name='wood', value=wood),
cls.get_from_model_area_charts(name='glass', value=glass),
cls.get_from_model_area_charts(name='metal', value=metal),
cls.get_from_model_area_charts(name='plastic', value=plastic)]
@classmethod
def get_from_model_timeline_charts(cls, name, value, time):
return TimelineCharts(
**cls.get_from_model_area_charts(name=name, value=value).__dict__,
time=time)
@classmethod
def get_from_model_timeline_chart(cls, models: list[DBConveer]) -> list[TimelineCharts]:
response = []
for i in models:
response.append(cls.get_from_model_timeline_charts(name='wood', value=i.wood, time=i.created_at))
response.append(cls.get_from_model_timeline_charts(name='glass', value=i.glass, time=i.created_at))
response.append(cls.get_from_model_timeline_charts(name='plastic', value=i.plastic, time=i.created_at))
response.append(cls.get_from_model_timeline_charts(name='metal', value=i.metal, time=i.created_at))
return response
@classmethod
def get_from_models(cls, models: list[DBConveer]) -> ResponseAnalytics:
return ResponseAnalytics(
area_charts=cls.get_from_models_area_charts(models),
timeline_charts=cls.get_from_model_timeline_chart(models),
total_numb=sum(cls.count_categories(models)),
metal_numb=cls.count_categories(models)[2],
wood_numb=cls.count_categories(models)[0],
plastic_numb=cls.count_categories(models)[3],
glass_numb=cls.count_categories(models)[1]
)

@ -15,7 +15,7 @@ class AnalyticsRepository(BaseRepository):
async def get_analytic(self, async def get_analytic(self,
start_date: Optional[datetime], start_date: Optional[datetime],
end_date: Optional[datetime]): end_date: Optional[datetime]) -> list[DBConveer]:
query = ( query = (
select(DBConveer) select(DBConveer)
.select_from(DBConveer) .select_from(DBConveer)

@ -3,6 +3,7 @@ from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from db.models.conveer import DBConveer
from db.repository.analytics import AnalyticsRepository from db.repository.analytics import AnalyticsRepository
@ -12,7 +13,7 @@ class AnalyticsManager:
async def get_by_filters(cls, async def get_by_filters(cls,
session: AsyncSession, session: AsyncSession,
start_date: Optional[datetime], start_date: Optional[datetime],
end_date: Optional[datetime]): end_date: Optional[datetime]) -> list[DBConveer]:
datas = await AnalyticsRepository(session).get_analytic(start_date=start_date, datas: list[DBConveer] = await AnalyticsRepository(session).get_analytic(start_date=start_date,
end_date=end_date) end_date=end_date)
return datas return datas

@ -2,21 +2,23 @@ from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from api.request.analytics import AnalyticsFilters from api.request.analytics import AnalyticsFilters
from api.response.analytics import ResponseAnalyticsFactory, ResponseAnalytics
from db.models.conveer import DBConveer
from managers.analytics import AnalyticsManager from managers.analytics import AnalyticsManager
from server.depends import get_session, PagesPaginationParams from server.depends import get_session
router = APIRouter(prefix="/api/analytics", tags=['Ride']) router = APIRouter(prefix="/api/analytics", tags=['Ride'])
@router.post('/all') @router.post('/all', response_model=ResponseAnalytics)
async def get_all_analytics( async def get_all_analytics(
filters: AnalyticsFilters, filters: AnalyticsFilters,
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
): ):
data = await AnalyticsManager.get_by_filters( data: list[DBConveer] = await AnalyticsManager.get_by_filters(
session=session, session=session,
start_date=filters.start_time, start_date=filters.start_time,
end_date=filters.end_time end_date=filters.end_time
) )
return data return ResponseAnalyticsFactory.get_from_models(models=data)

Loading…
Cancel
Save