From bd7571656021139d5f4be535a9a6e95b21db4b4f Mon Sep 17 00:00:00 2001 From: Artem Darius Weber Date: Sat, 9 Nov 2024 13:29:50 +0300 Subject: [PATCH] =?UTF-8?q?test:=20=D0=9F=D1=80=D0=BE=D0=B2=D0=B5=D1=80?= =?UTF-8?q?=D0=BA=D0=B0=20=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=BE=D1=81=D0=BF?= =?UTF-8?q?=D0=BE=D1=81=D0=BE=D0=B1=D0=BD=D0=BE=D1=81=D1=82=D0=B8=20=D1=81?= =?UTF-8?q?=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8=D0=B9=20=D0=BE=D0=B1?= =?UTF-8?q?=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=BA=D0=B8=20=D0=B7=D0=B0=D0=BF?= =?UTF-8?q?=D1=80=D0=BE=D1=81=D0=BE=D0=B2=20=D0=BA=20LLM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/robot-mors-object-track.iml | 5 +- .idea/vcs.xml | 1 + .idea/webResources.xml | 17 +++++++ agentic/llm/AgenticInterfaces/KafkaClient.py | 42 +++++++++++++++++ .../llm/AgenticInterfaces/__init__.py | 0 agentic/llm/consumer.Dockerfile | 7 +++ agentic/llm/docker-compose.yaml | 46 +++++++++++++++++++ agentic/llm/llm_worker.py | 25 ++++++++++ agentic/llm/processor.Dockerfile | 7 +++ agentic/llm/producer.Dockerfile | 7 +++ agentic/llm/requirements.txt | 1 + agentic/llm/test_send_llm_requests.py | 19 ++++++++ agentic/llm/test_show_responses_from_llm.py | 19 ++++++++ 13 files changed, 195 insertions(+), 1 deletion(-) create mode 100644 .idea/webResources.xml create mode 100644 agentic/llm/AgenticInterfaces/KafkaClient.py rename 101_llama_inference_kafka_WORKER.py => agentic/llm/AgenticInterfaces/__init__.py (100%) create mode 100644 agentic/llm/consumer.Dockerfile create mode 100644 agentic/llm/docker-compose.yaml create mode 100644 agentic/llm/llm_worker.py create mode 100644 agentic/llm/processor.Dockerfile create mode 100644 agentic/llm/producer.Dockerfile create mode 100644 agentic/llm/requirements.txt create mode 100644 agentic/llm/test_send_llm_requests.py create mode 100644 agentic/llm/test_show_responses_from_llm.py diff --git a/.idea/robot-mors-object-track.iml b/.idea/robot-mors-object-track.iml index 
d0876a7..bc1f7a0 100644 --- a/.idea/robot-mors-object-track.iml +++ b/.idea/robot-mors-object-track.iml @@ -1,7 +1,10 @@ - + + + + diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 2a0680a..9f8ebbb 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,6 +1,7 @@ + \ No newline at end of file diff --git a/.idea/webResources.xml b/.idea/webResources.xml new file mode 100644 index 0000000..c75b706 --- /dev/null +++ b/.idea/webResources.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/agentic/llm/AgenticInterfaces/KafkaClient.py b/agentic/llm/AgenticInterfaces/KafkaClient.py new file mode 100644 index 0000000..5ec3422 --- /dev/null +++ b/agentic/llm/AgenticInterfaces/KafkaClient.py @@ -0,0 +1,42 @@ +import asyncio +import json +from aiokafka import AIOKafkaConsumer, AIOKafkaProducer + +class KafkaClient: + def __init__(self, topic: str, bootstrap_servers: str, group_id: str = None): + self.topic = topic + self.bootstrap_servers = bootstrap_servers + self.group_id = group_id + self.consumer = None + self.producer = None + + async def start(self): + # init producer & consumer + self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers) + await self.producer.start() + + if self.group_id: + self.consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=self.bootstrap_servers, + group_id=self.group_id + ) + await self.consumer.start() + + async def stop(self): + if self.producer: + await self.producer.stop() + if self.consumer: + await self.consumer.stop() + + async def send_message(self, message: dict): + message_json = json.dumps(message).encode('utf-8') + await self.producer.send_and_wait(self.topic, message_json) + + async def consume_messages(self, callback): + if not self.consumer: + raise Exception("Consumer is not initialized. 
Ensure group_id is set and start is called.") + + async for message in self.consumer: + message_value = json.loads(message.value.decode('utf-8')) + await callback(message_value) \ No newline at end of file diff --git a/101_llama_inference_kafka_WORKER.py b/agentic/llm/AgenticInterfaces/__init__.py similarity index 100% rename from 101_llama_inference_kafka_WORKER.py rename to agentic/llm/AgenticInterfaces/__init__.py diff --git a/agentic/llm/consumer.Dockerfile b/agentic/llm/consumer.Dockerfile new file mode 100644 index 0000000..905ead4 --- /dev/null +++ b/agentic/llm/consumer.Dockerfile @@ -0,0 +1,7 @@ +# consumer.Dockerfile +FROM python:3.9 +WORKDIR /app +COPY test_show_responses_from_llm.py . +COPY requirements.txt . +RUN pip install -r requirements.txt +CMD ["python", "test_show_responses_from_llm.py"] diff --git a/agentic/llm/docker-compose.yaml b/agentic/llm/docker-compose.yaml new file mode 100644 index 0000000..e6e5aab --- /dev/null +++ b/agentic/llm/docker-compose.yaml @@ -0,0 +1,46 @@ +version: '3.8' + +services: + kafka: + image: 'bitnami/kafka:latest' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT + - KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:29092 + - KAFKA_ADVERTISED_LISTENERS=INSIDE://kafka:9092,OUTSIDE://localhost:29092 + - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + depends_on: + - zookeeper + + zookeeper: + image: 'bitnami/zookeeper:latest' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + + producer: + build: + context: . + dockerfile: producer.Dockerfile + environment: + - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + depends_on: + - kafka + + consumer: + build: + context: . + dockerfile: consumer.Dockerfile + environment: + - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + depends_on: + - kafka + + processor: + build: + context: . 
+ dockerfile: processor.Dockerfile + environment: + - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + depends_on: + - kafka \ No newline at end of file diff --git a/agentic/llm/llm_worker.py b/agentic/llm/llm_worker.py new file mode 100644 index 0000000..9910d2a --- /dev/null +++ b/agentic/llm/llm_worker.py @@ -0,0 +1,25 @@ +import asyncio +from AgenticInterfaces.KafkaClient import KafkaClient + +request_topic = "request_llm_topic" +response_topic = "response_llm_topic" +bootstrap_servers = "kafka:9092" + +def modify_text(text): + return text[::-1] + +async def process_message(message): + modified_text = modify_text(message["text"]) + response_message = {"text": modified_text} + await kafka_client.send_message(response_message) + +async def main(): + global kafka_client + kafka_client = KafkaClient(topic=request_topic, bootstrap_servers=bootstrap_servers, group_id="my-group") + await kafka_client.start() + try: + await kafka_client.consume_messages(process_message) + finally: + await kafka_client.stop() + +asyncio.run(main()) \ No newline at end of file diff --git a/agentic/llm/processor.Dockerfile b/agentic/llm/processor.Dockerfile new file mode 100644 index 0000000..174cee8 --- /dev/null +++ b/agentic/llm/processor.Dockerfile @@ -0,0 +1,7 @@ +# processor.Dockerfile +FROM python:3.9 +WORKDIR /app +COPY llm_worker.py . +COPY requirements.txt . +RUN pip install -r requirements.txt +CMD ["python", "llm_worker.py"] \ No newline at end of file diff --git a/agentic/llm/producer.Dockerfile b/agentic/llm/producer.Dockerfile new file mode 100644 index 0000000..d125afe --- /dev/null +++ b/agentic/llm/producer.Dockerfile @@ -0,0 +1,7 @@ +# producer.Dockerfile +FROM python:3.9 +WORKDIR /app +COPY test_send_llm_requests.py . +COPY requirements.txt . 
+RUN pip install -r requirements.txt +CMD ["python", "test_send_llm_requests.py"] diff --git a/agentic/llm/requirements.txt b/agentic/llm/requirements.txt new file mode 100644 index 0000000..aa3a931 --- /dev/null +++ b/agentic/llm/requirements.txt @@ -0,0 +1 @@ +aiokafka diff --git a/agentic/llm/test_send_llm_requests.py b/agentic/llm/test_send_llm_requests.py new file mode 100644 index 0000000..660ba43 --- /dev/null +++ b/agentic/llm/test_send_llm_requests.py @@ -0,0 +1,19 @@ +import asyncio +from AgenticInterfaces.KafkaClient import KafkaClient + +bootstrap_servers = "kafka:9092" +request_topic = "request_llm_topic" + +async def send_periodic_messages(): + kafka_client = KafkaClient(topic=request_topic, bootstrap_servers=bootstrap_servers) + await kafka_client.start() + + try: + while True: + message = {"text": "Пример сообщения"} + await kafka_client.send_message(message) + await asyncio.sleep(5) + finally: + await kafka_client.stop() + +asyncio.run(send_periodic_messages()) \ No newline at end of file diff --git a/agentic/llm/test_show_responses_from_llm.py b/agentic/llm/test_show_responses_from_llm.py new file mode 100644 index 0000000..ba8c732 --- /dev/null +++ b/agentic/llm/test_show_responses_from_llm.py @@ -0,0 +1,19 @@ +import asyncio +from AgenticInterfaces.KafkaClient import KafkaClient + +bootstrap_servers = "kafka:9092" +response_topic = "response_llm_topic" + +async def print_received_messages(message): + print("Получено сообщение:", message["text"]) + +async def receive_messages(): + kafka_client = KafkaClient(topic=response_topic, bootstrap_servers=bootstrap_servers, group_id="response-group") + await kafka_client.start() + + try: + await kafka_client.consume_messages(print_received_messages) + finally: + await kafka_client.stop() + +asyncio.run(receive_messages()) \ No newline at end of file