diff --git a/agentic/llm/AgenticInterfaces/KafkaClient.py b/agentic/llm/AgenticInterfaces/KafkaClient.py
index 5ec3422..e355361 100644
--- a/agentic/llm/AgenticInterfaces/KafkaClient.py
+++ b/agentic/llm/AgenticInterfaces/KafkaClient.py
@@ -1,6 +1,7 @@
 import asyncio
 import json
 from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+from aiokafka.errors import KafkaConnectionError
 
 class KafkaClient:
     def __init__(self, topic: str, bootstrap_servers: str, group_id: str = None):
@@ -11,7 +12,7 @@ class KafkaClient:
         self.producer = None
 
     async def start(self):
-        # init producer & consumer
+        # initialize producer & consumer
         self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
         await self.producer.start()
 
@@ -36,7 +37,6 @@ class KafkaClient:
     async def consume_messages(self, callback):
         if not self.consumer:
             raise Exception("Consumer is not initialized. Ensure group_id is set and start is called.")
-
         async for message in self.consumer:
            message_value = json.loads(message.value.decode('utf-8'))
            await callback(message_value)
\ No newline at end of file
diff --git a/agentic/llm/consumer.Dockerfile b/agentic/llm/consumer.Dockerfile
index 810a3b7..bc8167b 100644
--- a/agentic/llm/consumer.Dockerfile
+++ b/agentic/llm/consumer.Dockerfile
@@ -3,4 +3,5 @@ FROM python:3.9
 WORKDIR /app
 COPY . .
 RUN pip install -r requirements.txt
-CMD ["python", "test_show_responses_from_llm.py"]
+EXPOSE 8000
+CMD ["python", "test_consumer.py"]
diff --git a/agentic/llm/docker-compose.yaml b/agentic/llm/docker-compose.yaml
index e6e5aab..8b415fa 100644
--- a/agentic/llm/docker-compose.yaml
+++ b/agentic/llm/docker-compose.yaml
@@ -2,45 +2,53 @@ version: '3.8'
 services:
   kafka:
-    image: 'bitnami/kafka:latest'
+    image: confluentinc/cp-kafka:latest
     environment:
-      - KAFKA_BROKER_ID=1
-      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
-      - KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:29092
-      - KAFKA_ADVERTISED_LISTENERS=INSIDE://kafka:9092,OUTSIDE://localhost:29092
-      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
-      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_CREATE_TOPICS: "request_llm_topic:3:1,response_llm_topic:3:1,add_point_cloud_topic:3:1,add_classified_objects_topic:3:1" # Format: topic:partitions:replication_factor
+    ports:
+      - "9092:9092"
     depends_on:
       - zookeeper
 
   zookeeper:
-    image: 'bitnami/zookeeper:latest'
+    image: confluentinc/cp-zookeeper:latest
     environment:
-      - ALLOW_ANONYMOUS_LOGIN=yes
+      ZOOKEEPER_CLIENT_PORT: 2181
+    ports:
+      - "2181:2181"
 
   producer:
     build:
       context: .
       dockerfile: producer.Dockerfile
     environment:
-      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - KAFKA_BOOTSTRAP_SERVERS=localhost:9092
     depends_on:
-      - kafka
+      kafka:
+        condition: service_healthy
 
   consumer:
     build:
       context: .
       dockerfile: consumer.Dockerfile
     environment:
-      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - KAFKA_BOOTSTRAP_SERVERS=localhost:9092
+    ports:
+      - "7781:8000"
     depends_on:
-      - kafka
+      kafka:
+        condition: service_healthy
 
   processor:
     build:
       context: .
       dockerfile: processor.Dockerfile
     environment:
-      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - KAFKA_BOOTSTRAP_SERVERS=localhost:9092
     depends_on:
-      - kafka
\ No newline at end of file
+      kafka:
+        condition: service_healthy
\ No newline at end of file
diff --git a/agentic/llm/llm_worker.py b/agentic/llm/llm_worker.py
index 9910d2a..619ea9a 100644
--- a/agentic/llm/llm_worker.py
+++ b/agentic/llm/llm_worker.py
@@ -1,25 +1,82 @@
+#
+# LLM Worker #1
+# REQUEST --> RESPONSE
+#
+
+
 import asyncio
-from AgenticInterfaces.KafkaClient import KafkaClient
+import json
+from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+from datetime import datetime
+import torch
+from transformers import AutoTokenizer, AutoModelForCausalLM
+
+model_id = "NousResearch/Meta-Llama-3.1-8B"
 
-request_topic = "request_llm_topic"
-response_topic = "response_llm_topic"
-bootstrap_servers = "kafka:9092"
+tokenizer = AutoTokenizer.from_pretrained(model_id)
+model = AutoModelForCausalLM.from_pretrained(
+    model_id,
+    torch_dtype=torch.float32,  # use float32 on CPU
+    device_map=None  # explicitly keep the model off the GPU
+)
 
-def modify_text(text):
-    return text[::-1]
+model.to("cpu")
 
 async def process_message(message):
-    modified_text = modify_text(message["text"])
-    response_message = {"text": modified_text}
-    await kafka_client.send_message(response_message)
+    inputs = tokenizer(message['text'], return_tensors="pt").to("cpu")
+
+    with torch.no_grad():
+        outputs = model.generate(
+            **inputs,
+            max_new_tokens=50,  # maximum number of tokens to generate
+            do_sample=True,  # enable sampling
+            top_p=0.95,  # nucleus sampling parameter
+            top_k=50  # top-k sampling parameter
+        )
+    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
+
+    processed_message = {
+        "answer": generated_text,
+        "track_uuid": message["track_uuid"],
+        "processed_timestamp": datetime.utcnow().isoformat()
+    }
+    return processed_message
+
+
+async def start_consumer_producer():
+    consumer = AIOKafkaConsumer(
+        'request_llm_topic',
+        bootstrap_servers='kafka:9092',
+        group_id="processing_group"
+    )
+
+    producer = AIOKafkaProducer(bootstrap_servers='kafka:9092')
+
+    await consumer.start()
+    await producer.start()
 
-async def main():
-    global kafka_client
-    kafka_client = KafkaClient(topic=request_topic, bootstrap_servers=bootstrap_servers, group_id="my-group")
-    await kafka_client.start()
     try:
-        await kafka_client.consume_messages(process_message)
+        async for msg in consumer:
+            message = json.loads(msg.value.decode('utf-8'))
+
+            # process the message
+            processed_message = await process_message(message)
+
+            # serialize the processed message
+            processed_message_json = json.dumps(processed_message).encode('utf-8')
+
+            # send the processed message to the response_llm_topic
+            await producer.send_and_wait("response_llm_topic", processed_message_json)
+            print(f"Processed and sent message with UUID: {processed_message['track_uuid']}")
+
     finally:
-        await kafka_client.stop()
+        await consumer.stop()
+        await producer.stop()
+
+
+async def main():
+    await start_consumer_producer()
+
 
-asyncio.run(main())
\ No newline at end of file
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
diff --git a/agentic/llm/producer.Dockerfile b/agentic/llm/producer.Dockerfile
index 2f40db7..860b40e 100644
--- a/agentic/llm/producer.Dockerfile
+++ b/agentic/llm/producer.Dockerfile
@@ -3,4 +3,4 @@ FROM python:3.9
 WORKDIR /app
 COPY . .
 RUN pip install -r requirements.txt
-CMD ["python", "test_send_llm_requests.py"]
+CMD ["python", "test_producer.py"]
diff --git a/agentic/llm/requirements.txt b/agentic/llm/requirements.txt
index aa3a931..420fb69 100644
--- a/agentic/llm/requirements.txt
+++ b/agentic/llm/requirements.txt
@@ -1 +1,6 @@
+fastapi
+uvicorn
 aiokafka
+websockets
+torch
+transformers
diff --git a/agentic/llm/static/websocket.js b/agentic/llm/static/websocket.js
new file mode 100644
index 0000000..ccb47ec
--- /dev/null
+++ b/agentic/llm/static/websocket.js
@@ -0,0 +1,8 @@
+// const ws = new WebSocket("ws://localhost:7781/ws");
+const ws = new WebSocket("ws://192.168.0.100:7781/ws");
+ws.onmessage = function(event) {
+    const messages = document.getElementById('messages');
+    const message = document.createElement('div');
+    message.textContent = event.data;
+    messages.appendChild(message);
+};
\ No newline at end of file
diff --git a/agentic/llm/test_consumer.py b/agentic/llm/test_consumer.py
new file mode 100644
index 0000000..77801fd
--- /dev/null
+++ b/agentic/llm/test_consumer.py
@@ -0,0 +1,101 @@
+#
+# SHOW RESPONSE
+#
+
+import asyncio
+import json
+import random
+from fastapi import FastAPI, WebSocket, WebSocketDisconnect
+from fastapi.responses import HTMLResponse
+from aiokafka import AIOKafkaConsumer
+from fastapi.staticfiles import StaticFiles
+
+app = FastAPI()
+app.mount("/static", StaticFiles(directory="static"), name="static")
+
+class ConnectionManager:
+    def __init__(self):
+        self.active_connections: list[WebSocket] = []
+
+    async def connect(self, websocket: WebSocket):
+        await websocket.accept()
+        self.active_connections.append(websocket)
+
+    def disconnect(self, websocket: WebSocket):
+        if websocket in self.active_connections:
+            self.active_connections.remove(websocket)
+
+    async def send_message(self, message: str):
+        for connection in self.active_connections:
+            try:
+                await connection.send_text(message)
+            except WebSocketDisconnect:
+                self.disconnect(connection)
+
+
+manager = ConnectionManager()
+
+# HTML page for testing the WebSocket
+html = """
+<!DOCTYPE html>
+<html>
+<head>
+    <title>WebSocket Test</title>
+</head>
+<body>
+    <h1>WebSocket Test</h1>
+    <div id="messages"></div>
+    <script src="/static/websocket.js"></script>
+</body>
+</html>
+"""
+
+
+@app.get("/")
+async def get():
+    return HTMLResponse(html)
+
+
+@app.websocket("/ws")
+async def websocket_endpoint(websocket: WebSocket):
+    await manager.connect(websocket)
+    try:
+        while True:
+            await asyncio.sleep(1)  # idle loop to keep the connection open
+    except WebSocketDisconnect:
+        manager.disconnect(websocket)
+
+
+async def consume_kafka_and_forward():
+    consumer = AIOKafkaConsumer(
+        'response_llm_topic',
+        bootstrap_servers='kafka:9092',
+        group_id="websocket_group"
+    )
+    await consumer.start()
+    try:
+        async for message in consumer:
+            message_value = json.loads(message.value.decode('utf-8'))
+
+            answer = message_value["answer"]
+
+            json_message = {"answer": answer}
+
+            # serialize the data to JSON
+            message_json = json.dumps(json_message).encode('utf-8')
+
+            # forward the message to the connected WebSocket clients
+            await manager.send_message(message_json.decode('utf-8'))
+    finally:
+        await consumer.stop()
+
+
+@app.on_event("startup")
+async def startup_event():
+    asyncio.create_task(consume_kafka_and_forward())
+
+
+if __name__ == "__main__":
+    import uvicorn
+
+    uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/agentic/llm/test_producer.py b/agentic/llm/test_producer.py
new file mode 100644
index 0000000..3acc505
--- /dev/null
+++ b/agentic/llm/test_producer.py
@@ -0,0 +1,63 @@
+#
+# SEND REQUEST
+#
+
+
+import asyncio
+import uuid
+import json
+import random
+import base64
+from aiokafka import AIOKafkaProducer
+from datetime import datetime
+
+producer = None
+
+async def init_kafka_producer():
+    global producer
+    if producer is None:
+        producer = AIOKafkaProducer(bootstrap_servers='kafka:9092', max_request_size=10485760)
+        for _ in range(5):
+            try:
+                print(f"Connecting to Kafka (attempt {_ + 1})")
+                await producer.start()
+                break
+            except Exception as e:
+                print(f"Kafka connection error, retrying in 5 seconds: {e}")
+                await asyncio.sleep(5)
+
+async def stop_kafka_producer():
+    global producer
+    if producer is not None:
+        await producer.stop()
+        producer = None
+
+async def send_random_data():
+    try:
+        timestamp = datetime.utcnow().isoformat()
+        frame_uuid = str(uuid.uuid4())
+
+        MESSAGE = "Hello World!"
+
+        message = {
+            "text": MESSAGE,
+            "track_uuid": frame_uuid,
+            "timestamp": timestamp,
+        }
+
+        message_json = json.dumps(message).encode('utf-8')
+        await producer.send_and_wait("request_llm_topic", message_json)
+        print(f"Sent message with UUID: {frame_uuid}")
+    except Exception as e:
+        print(f"Error sending message to Kafka: {e}")
+
+async def main():
+    await init_kafka_producer()
+    try:
+        while True:
+            await send_random_data()
+            await asyncio.sleep(1)
+    finally:
+        await stop_kafka_producer()
+
+asyncio.run(main())
diff --git a/agentic/llm/test_send_llm_requests.py b/agentic/llm/test_send_llm_requests.py
deleted file mode 100644
index 660ba43..0000000
--- a/agentic/llm/test_send_llm_requests.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import asyncio
-from AgenticInterfaces.KafkaClient import KafkaClient
-
-bootstrap_servers = "kafka:9092"
-request_topic = "request_llm_topic"
-
-async def send_periodic_messages():
-    kafka_client = KafkaClient(topic=request_topic, bootstrap_servers=bootstrap_servers)
-    await kafka_client.start()
-
-    try:
-        while True:
-            message = {"text": "Example message"}
-            await kafka_client.send_message(message)
-            await asyncio.sleep(5)
-    finally:
-        await kafka_client.stop()
-
-asyncio.run(send_periodic_messages())
\ No newline at end of file
diff --git a/agentic/llm/test_show_responses_from_llm.py b/agentic/llm/test_show_responses_from_llm.py
deleted file mode 100644
index ba8c732..0000000
--- a/agentic/llm/test_show_responses_from_llm.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import asyncio
-from AgenticInterfaces.KafkaClient import KafkaClient
-
-bootstrap_servers = "kafka:9092"
-response_topic = "response_llm_topic"
-
-async def print_received_messages(message):
-    print("Received message:", message["text"])
-
-async def receive_messages():
-    kafka_client = KafkaClient(topic=response_topic, bootstrap_servers=bootstrap_servers, group_id="response-group")
-    await kafka_client.start()
-
-    try:
-        await kafka_client.consume_messages(print_received_messages)
-    finally:
-        await kafka_client.stop()
-
-asyncio.run(receive_messages())
\ No newline at end of file
diff --git a/infrastructure/kafka/docker-compose.yaml b/infrastructure/kafka/docker-compose.yaml
deleted file mode 100644
index 176c5e2..0000000
--- a/infrastructure/kafka/docker-compose.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-version: '3.8'
-services:
-  kafka:
-    image: confluentinc/cp-kafka:latest
-    environment:
-      KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-      KAFKA_CREATE_TOPICS: "request_llm_topic:3:1,response_llm_topic:3:1" # Format: topic:partitions:replication_factor
-    ports:
-      - "9092:9092"
-    networks:
-      - my_network
-    depends_on:
-      - zookeeper
-
-  zookeeper:
-    image: confluentinc/cp-zookeeper:latest
-    environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
-    ports:
-      - "2181:2181"
-    networks:
-      - my_network
-
-networks:
-  my_network:
-    driver: bridge
\ No newline at end of file
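A caveat on the docker-compose.yaml changes above: depends_on with condition: service_healthy only takes effect if the kafka service declares a healthcheck, which this patch does not add. Inside the compose network, KAFKA_BOOTSTRAP_SERVERS=localhost:9092 also points each service at its own container rather than at the broker (the scripts hardcode kafka:9092, so the variable is currently unused), and KAFKA_CREATE_TOPICS is honored by the wurstmeister image but, as far as I know, ignored by confluentinc/cp-kafka, so topic creation likely relies on auto-creation. Below is a minimal sketch of the missing healthcheck, assuming the kafka-topics wrapper script that the Confluent images normally put on the PATH; treat the exact command and timings as assumptions to verify.

  kafka:
    image: confluentinc/cp-kafka:latest
    # ... environment and ports as in the patch above ...
    healthcheck:
      # consider the broker healthy once the topic-metadata API responds
      test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"]
      interval: 10s
      timeout: 10s
      retries: 10

With this block in place, the service_healthy conditions on producer, consumer, and processor will actually gate their startup on a reachable broker instead of failing compose validation.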