#
# SHOW RESPONSE
#
import asyncio
import json
import logging
import random

from aiokafka import AIOKafkaConsumer
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
# ASGI application instance; the routes and event handlers below register on it.
app = FastAPI()
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
class ConnectionManager:
    """Registry of accepted WebSocket connections with broadcast support."""

    def __init__(self):
        # Sockets that have been accepted and not yet removed.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking the socket."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; a no-op if it is not registered."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_message(self, message: str):
        """Send ``message`` as a text frame to every tracked connection.

        A connection whose send fails is removed from the registry and the
        broadcast continues with the remaining connections.
        """
        # Iterate over a snapshot: disconnect() mutates the live list, and
        # removing an element while iterating it would skip the next socket.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except (WebSocketDisconnect, RuntimeError):
                # Starlette raises RuntimeError when sending on a socket that
                # has already been closed; treat it like a disconnect.
                self.disconnect(connection)
# Single shared registry used by the WebSocket endpoint and the Kafka forwarder.
manager = ConnectionManager()
# HTML page for testing the WebSocket
html = """
WebSocket Test
WebSocket Test
"""
@app.get("/")
async def get():
    """Serve the WebSocket test page at the site root."""
    page = HTMLResponse(content=html)
    return page
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a client socket and keep it open until the client leaves.

    Outgoing traffic is pushed through ``manager`` by other code; anything
    the client sends is read and discarded.
    """
    await manager.connect(websocket)
    try:
        while True:
            # Block on a receive instead of sleeping: WebSocketDisconnect is
            # only raised by receive calls, so a sleep-only loop never
            # notices that the client has gone away.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    finally:
        # Always deregister, including on cancellation at shutdown.
        manager.disconnect(websocket)
async def consume_kafka_and_forward():
    """Forward answers from Kafka to all connected WebSocket clients.

    Consumes ``response_llm_topic``, extracts the ``"answer"`` field from each
    UTF-8 JSON message and broadcasts ``{"answer": ...}`` as JSON text through
    ``manager``. Messages that cannot be decoded are logged and skipped so a
    single bad record does not stop the forwarder. The consumer is always
    stopped when the loop exits.
    """
    logger = logging.getLogger(__name__)
    consumer = AIOKafkaConsumer(
        'response_llm_topic',
        bootstrap_servers='kafka:9092',
        group_id="websocket_group",
    )
    await consumer.start()
    try:
        async for message in consumer:
            try:
                answer = json.loads(message.value.decode('utf-8'))["answer"]
            except (
                AttributeError,        # message.value is None
                UnicodeDecodeError,    # payload is not valid UTF-8
                json.JSONDecodeError,  # payload is not valid JSON
                KeyError,              # JSON object has no "answer" key
                TypeError,             # JSON payload is not an object
            ):
                logger.warning(
                    "Skipping malformed Kafka message: %r",
                    message.value,
                    exc_info=True,
                )
                continue
            # Re-serialize only the field the clients need.
            await manager.send_message(json.dumps({"answer": answer}))
    finally:
        await consumer.stop()
@app.on_event("startup")
async def startup_event():
    """Launch the Kafka-to-WebSocket forwarder as a background task."""
    # The event loop keeps only a weak reference to tasks, so hold a strong
    # one on the application state to keep the forwarder from being
    # garbage-collected while it is still running.
    app.state.kafka_forwarder_task = asyncio.create_task(
        consume_kafka_and_forward()
    )
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)