feat: добавлены сервисы через kafka для обработки текстовых запросов к llama 3.1 8b

main
Artem-Darius Weber 2 months ago
parent ca7aaaf850
commit f77c99a6f1

@ -1,6 +1,7 @@
import asyncio
import json
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaConnectionError
class KafkaClient:
def __init__(self, topic: str, bootstrap_servers: str, group_id: str = None):
@ -11,7 +12,7 @@ class KafkaClient:
self.producer = None
async def start(self):
# init producer & consumer
# initialize producer & consumer
self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
await self.producer.start()
@ -36,7 +37,6 @@ class KafkaClient:
async def consume_messages(self, callback):
if not self.consumer:
raise Exception("Consumer is not initialized. Ensure group_id is set and start is called.")
async for message in self.consumer:
message_value = json.loads(message.value.decode('utf-8'))
await callback(message_value)

@ -3,4 +3,5 @@ FROM python:3.9
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["python", "test_show_responses_from_llm.py"]
EXPOSE 8000
CMD ["python", "test_consumer.py"]

@ -2,45 +2,53 @@ version: '3.8'
services:
kafka:
image: 'bitnami/kafka:latest'
image: confluentinc/cp-kafka:latest
environment:
- KAFKA_BROKER_ID=1
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:29092
- KAFKA_ADVERTISED_LISTENERS=INSIDE://kafka:9092,OUTSIDE://localhost:29092
- KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "request_llm_topic:3:1,response_llm_topic:3:1,add_point_cloud_topic:3:1,add_classified_objects_topic:3:1" # Format: topic:partitions:replication_factor — NOTE(review): cp-kafka images ignore KAFKA_CREATE_TOPICS; create topics explicitly or rely on auto-create
ports:
- "9092:9092"
depends_on:
- zookeeper
zookeeper:
image: 'bitnami/zookeeper:latest'
image: confluentinc/cp-zookeeper:latest
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
producer:
build:
context: .
dockerfile: producer.Dockerfile
environment:
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092  # must be the compose service name; "localhost" inside a container points at the container itself
depends_on:
- kafka
kafka:
condition: service_healthy
consumer:
build:
context: .
dockerfile: consumer.Dockerfile
environment:
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092  # must be the compose service name; "localhost" inside a container points at the container itself
ports:
- "7781:8000"
depends_on:
- kafka
kafka:
condition: service_healthy
processor:
build:
context: .
dockerfile: processor.Dockerfile
environment:
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092  # must be the compose service name; "localhost" inside a container points at the container itself
depends_on:
- kafka
kafka:
condition: service_healthy

@ -1,25 +1,82 @@
#
# LLM Worker #1
# REQUEST --> RESPONSE
#
import asyncio
from AgenticInterfaces.KafkaClient import KafkaClient
import json
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from datetime import datetime
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
# Worker configuration: model checkpoint and Kafka topology.
model_id = "NousResearch/Meta-Llama-3.1-8B"
request_topic = "request_llm_topic"
response_topic = "response_llm_topic"
bootstrap_servers = "kafka:9092"  # in-cluster broker address (compose service name)

# Load tokenizer and model once at import time — heavyweight, CPU-only setup.
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.float32,  # use float32 on CPU
    device_map=None  # explicitly state the model will not use a GPU
)
def modify_text(text):
    """Return *text* with its characters in reverse order."""
    return "".join(reversed(text))
model.to("cpu")
async def process_message(message):
    """Generate an LLM answer for a single request message.

    The diff had left the legacy reverse-and-echo lines (modify_text /
    kafka_client.send_message) fused into this body; they are removed so the
    function only runs the model and returns the result.

    Parameters:
        message: dict with "text" (the prompt) and "track_uuid"
                 (correlation id) — assumed from the producer side; verify.

    Returns:
        dict with "answer", the original "track_uuid" and a
        "processed_timestamp" (ISO-8601).
    """
    inputs = tokenizer(message['text'], return_tensors="pt").to("cpu")
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=50,  # cap on generated tokens
            do_sample=True,     # enable sampling
            top_p=0.95,         # nucleus-sampling threshold
            top_k=50            # top-k sampling
        )
    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    processed_message = {
        "answer": generated_text,
        "track_uuid": message["track_uuid"],
        # NOTE(review): utcnow() is naive and deprecated in 3.12; switching to
        # datetime.now(timezone.utc) would change the timestamp format — confirm first.
        "processed_timestamp": datetime.utcnow().isoformat()
    }
    return processed_message
async def start_consumer_producer():
    """Run the worker loop: consume requests, run the LLM, publish responses.

    Blocks until cancelled; the consumer and producer are always stopped on
    exit. (The diff paste had two `main` definitions and ran
    `asyncio.run(main())` both at module level and under the __main__ guard,
    with stale KafkaClient-based lines interleaved — consolidated here.)
    """
    consumer = AIOKafkaConsumer(
        'request_llm_topic',
        bootstrap_servers='kafka:9092',
        group_id="processing_group"
    )
    producer = AIOKafkaProducer(bootstrap_servers='kafka:9092')
    await consumer.start()
    await producer.start()
    try:
        async for msg in consumer:
            message = json.loads(msg.value.decode('utf-8'))
            # Run the model on this request.
            processed_message = await process_message(message)
            # Serialize the processed message.
            processed_message_json = json.dumps(processed_message).encode('utf-8')
            # Forward the result to the response topic.
            await producer.send_and_wait("response_llm_topic", processed_message_json)
            print(f"Processed and sent message with UUID: {processed_message['track_uuid']}")
    finally:
        await consumer.stop()
        await producer.stop()


async def main():
    """Entry point: run the consume/process/produce loop."""
    await start_consumer_producer()


if __name__ == "__main__":
    asyncio.run(main())

@ -3,4 +3,4 @@ FROM python:3.9
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["python", "test_send_llm_requests.py"]
CMD ["python", "test_producer.py"]

@ -1 +1,6 @@
fastapi
uvicorn
aiokafka
websockets
torch
transformers

@ -0,0 +1,8 @@
// WebSocket client: append every server message to the #messages container.
// const ws = new WebSocket("ws://localhost:7781/ws");
const ws = new WebSocket("ws://192.168.0.100:7781/ws"); // NOTE(review): hard-coded LAN address — make configurable
ws.onmessage = function(event) {
    const messages = document.getElementById('messages');
    const message = document.createElement('div');
    message.textContent = event.data; // textContent avoids HTML injection
    messages.appendChild(message);
};

@ -0,0 +1,101 @@
#
# SHOW RESPONSE
#
import asyncio
import json
import random
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from aiokafka import AIOKafkaConsumer
from fastapi.staticfiles import StaticFiles
# FastAPI app: serves the test page and fans out LLM responses over WebSocket.
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")  # serves websocket.js
class ConnectionManager:
    """Registry of accepted WebSocket connections with broadcast support."""

    def __init__(self):
        # All sockets that have completed the accept() handshake.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking *websocket*."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*; a no-op when it is not tracked."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_message(self, message: str):
        """Broadcast *message* to every client, dropping dead connections.

        Iterates over a copy: disconnect() mutates the list, and removing
        from the list being iterated would silently skip the next client.
        """
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except WebSocketDisconnect:
                self.disconnect(connection)
manager = ConnectionManager()
# HTML для тестирования WebSocket
html = """
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Test</title>
</head>
<body>
<h1>WebSocket Test</h1>
<div id="messages"></div>
<script src="/static/websocket.js"></script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register the client and idle until it disconnects."""
    await manager.connect(websocket)
    try:
        # Keep this coroutine alive; actual pushes come from the Kafka task.
        while True:
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
async def consume_kafka_and_forward():
    """Relay answers from response_llm_topic to all connected WebSocket clients.

    Fix: the original serialized the outgoing dict to UTF-8 bytes and then
    immediately decoded it back to str — a pointless round trip; the JSON
    string is now sent directly (identical payload).
    """
    consumer = AIOKafkaConsumer(
        'response_llm_topic',
        bootstrap_servers='kafka:9092',
        group_id="websocket_group"
    )
    await consumer.start()
    try:
        async for message in consumer:
            message_value = json.loads(message.value.decode('utf-8'))
            answer = message_value["answer"]
            # Forward only the answer field as a JSON text frame.
            await manager.send_message(json.dumps({"answer": answer}))
    finally:
        await consumer.stop()
@app.on_event("startup")
async def startup_event():
    # Launch the Kafka→WebSocket relay as a background task.
    # NOTE(review): on_event is deprecated in newer FastAPI — consider lifespan handlers.
    asyncio.create_task(consume_kafka_and_forward())
if __name__ == "__main__":
    import uvicorn
    # Run the ASGI app directly when launched as a script (port 8000 in-container).
    uvicorn.run(app, host="0.0.0.0", port=8000)

@ -0,0 +1,63 @@
#
# SEND REQUEST
#
import asyncio
import uuid
import json
import random
import base64
from aiokafka import AIOKafkaProducer
from datetime import datetime
# Module-level producer, created lazily by init_kafka_producer().
producer = None

async def init_kafka_producer():
    """Create and start the module-level Kafka producer (idempotent).

    Retries the broker connection up to 5 times, 5 s apart. Fixes: the retry
    log claimed "Producing message" while it was actually connecting, and
    exhausting all retries previously failed silently, leaving a
    half-initialized producer; it now resets the global and raises.

    Raises:
        ConnectionError: when all connection attempts fail.
    """
    global producer
    if producer is not None:
        return  # already initialized
    producer = AIOKafkaProducer(bootstrap_servers='kafka:9092', max_request_size=10485760)
    last_error = None
    for attempt in range(5):
        try:
            print(f"Connecting to Kafka (attempt {attempt})")
            await producer.start()
            return
        except Exception as e:
            last_error = e
            print(f"Kafka connection error, retrying in 5 seconds: {e}")
            await asyncio.sleep(5)
    producer = None
    raise ConnectionError("Could not connect to Kafka after 5 attempts") from last_error
async def stop_kafka_producer():
    """Stop and forget the module-level producer, if one exists."""
    global producer
    if producer is None:
        return
    await producer.stop()
    producer = None
async def send_random_data():
    """Build a test request and publish it to request_llm_topic (best-effort:
    any send error is logged, never raised)."""
    try:
        payload = {
            "text": "Hello World!",
            "track_uuid": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
        }
        frame_uuid = payload["track_uuid"]
        encoded = json.dumps(payload).encode('utf-8')
        await producer.send_and_wait("request_llm_topic", encoded)
        print(f"Sent message with UUID: {frame_uuid}")
    except Exception as e:
        print(f"Error sending message to Kafka: {e}")
async def main():
    # Connect the producer, then emit one test message per second forever.
    await init_kafka_producer()
    try:
        while True:
            await send_random_data()
            await asyncio.sleep(1)
    finally:
        # Always release the producer, even on cancellation / Ctrl-C.
        await stop_kafka_producer()

asyncio.run(main())

@ -1,19 +0,0 @@
import asyncio
from AgenticInterfaces.KafkaClient import KafkaClient
# Legacy test producer: periodically publishes a sample request via the
# shared KafkaClient wrapper.
bootstrap_servers = "kafka:9092"
request_topic = "request_llm_topic"

async def send_periodic_messages():
    # No group_id — presumably producer-only, so no consumer is created; verify
    # against KafkaClient.start().
    kafka_client = KafkaClient(topic=request_topic, bootstrap_servers=bootstrap_servers)
    await kafka_client.start()
    try:
        while True:
            message = {"text": "Пример сообщения"}
            await kafka_client.send_message(message)
            await asyncio.sleep(5)
    finally:
        # Always close the client, even on cancellation.
        await kafka_client.stop()

asyncio.run(send_periodic_messages())

@ -1,19 +0,0 @@
import asyncio
from AgenticInterfaces.KafkaClient import KafkaClient
# Legacy test consumer: prints the "text" field of every message on the
# response topic via the shared KafkaClient wrapper.
bootstrap_servers = "kafka:9092"
response_topic = "response_llm_topic"

async def print_received_messages(message):
    # Callback invoked by KafkaClient.consume_messages with the decoded dict.
    print("Получено сообщение:", message["text"])

async def receive_messages():
    kafka_client = KafkaClient(topic=response_topic, bootstrap_servers=bootstrap_servers, group_id="response-group")
    await kafka_client.start()
    try:
        await kafka_client.consume_messages(print_received_messages)
    finally:
        # Always close the client, even on cancellation.
        await kafka_client.stop()

asyncio.run(receive_messages())

@ -1,29 +0,0 @@
version: '3.8'
services:
kafka:
image: confluentinc/cp-kafka:latest
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "request_llm_topic:3:1,response_llm_topic:3:1" # Format: topic:partitions:replication_factor
ports:
- "9092:9092"
networks:
- my_network
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
networks:
- my_network
networks:
my_network:
driver: bridge
Loading…
Cancel
Save