diff --git a/.idea/robot-mors-object-track.iml b/.idea/robot-mors-object-track.iml index d0876a7..bc1f7a0 100644 --- a/.idea/robot-mors-object-track.iml +++ b/.idea/robot-mors-object-track.iml @@ -1,7 +1,10 @@ - + + + + diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 2a0680a..9f8ebbb 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,6 +1,7 @@ + \ No newline at end of file diff --git a/.idea/webResources.xml b/.idea/webResources.xml new file mode 100644 index 0000000..c75b706 --- /dev/null +++ b/.idea/webResources.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/agentic/llm/AgenticInterfaces/KafkaClient.py b/agentic/llm/AgenticInterfaces/KafkaClient.py new file mode 100644 index 0000000..5ec3422 --- /dev/null +++ b/agentic/llm/AgenticInterfaces/KafkaClient.py @@ -0,0 +1,42 @@ +import asyncio +import json +from aiokafka import AIOKafkaConsumer, AIOKafkaProducer + +class KafkaClient: + def __init__(self, topic: str, bootstrap_servers: str, group_id: str = None): + self.topic = topic + self.bootstrap_servers = bootstrap_servers + self.group_id = group_id + self.consumer = None + self.producer = None + + async def start(self): + # init producer & consumer + self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers) + await self.producer.start() + + if self.group_id: + self.consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=self.bootstrap_servers, + group_id=self.group_id + ) + await self.consumer.start() + + async def stop(self): + if self.producer: + await self.producer.stop() + if self.consumer: + await self.consumer.stop() + + async def send_message(self, message: dict): + message_json = json.dumps(message).encode('utf-8') + await self.producer.send_and_wait(self.topic, message_json) + + async def consume_messages(self, callback): + if not self.consumer: + raise Exception("Consumer is not initialized. Ensure group_id is set and start is called.") + + async for message in self.consumer: + message_value = json.loads(message.value.decode('utf-8')) + await callback(message_value) \ No newline at end of file diff --git a/101_llama_inference_kafka_WORKER.py b/agentic/llm/AgenticInterfaces/__init__.py similarity index 100% rename from 101_llama_inference_kafka_WORKER.py rename to agentic/llm/AgenticInterfaces/__init__.py diff --git a/agentic/llm/consumer.Dockerfile b/agentic/llm/consumer.Dockerfile new file mode 100644 index 0000000..905ead4 --- /dev/null +++ b/agentic/llm/consumer.Dockerfile @@ -0,0 +1,7 @@ +# producer.Dockerfile +FROM python:3.9 +WORKDIR /app +COPY test_show_responses_from_llm.py . +COPY requirements.txt . +RUN pip install -r requirements.txt +CMD ["python", "test_show_responses_from_llm.py"] diff --git a/agentic/llm/docker-compose.yaml b/agentic/llm/docker-compose.yaml new file mode 100644 index 0000000..e6e5aab --- /dev/null +++ b/agentic/llm/docker-compose.yaml @@ -0,0 +1,46 @@ +version: '3.8' + +services: + kafka: + image: 'bitnami/kafka:latest' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT + - KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:29092 + - KAFKA_ADVERTISED_LISTENERS=INSIDE://kafka:9092,OUTSIDE://localhost:29092 + - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + depends_on: + - zookeeper + + zookeeper: + image: 'bitnami/zookeeper:latest' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + + producer: + build: + context: . + dockerfile: producer.Dockerfile + environment: + - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + depends_on: + - kafka + + consumer: + build: + context: . + dockerfile: consumer.Dockerfile + environment: + - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + depends_on: + - kafka + + processor: + build: + context: . + dockerfile: processor.Dockerfile + environment: + - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 + depends_on: + - kafka \ No newline at end of file diff --git a/agentic/llm/llm_worker.py b/agentic/llm/llm_worker.py new file mode 100644 index 0000000..9910d2a --- /dev/null +++ b/agentic/llm/llm_worker.py @@ -0,0 +1,25 @@ +import asyncio +from AgenticInterfaces.KafkaClient import KafkaClient + +request_topic = "request_llm_topic" +response_topic = "response_llm_topic" +bootstrap_servers = "kafka:9092" + +def modify_text(text): + return text[::-1] + +async def process_message(message): + modified_text = modify_text(message["text"]) + response_message = {"text": modified_text} + await kafka_client.send_message(response_message) + +async def main(): + global kafka_client + kafka_client = KafkaClient(topic=request_topic, bootstrap_servers=bootstrap_servers, group_id="my-group") + await kafka_client.start() + try: + await kafka_client.consume_messages(process_message) + finally: + await kafka_client.stop() + +asyncio.run(main()) \ No newline at end of file diff --git a/agentic/llm/processor.Dockerfile b/agentic/llm/processor.Dockerfile new file mode 100644 index 0000000..174cee8 --- /dev/null +++ b/agentic/llm/processor.Dockerfile @@ -0,0 +1,7 @@ +# processor.Dockerfile +FROM python:3.9 +WORKDIR /app +COPY llm_worker.py . +COPY requirements.txt . +RUN pip install -r requirements.txt +CMD ["python", "llm_worker.py"] \ No newline at end of file diff --git a/agentic/llm/producer.Dockerfile b/agentic/llm/producer.Dockerfile new file mode 100644 index 0000000..d125afe --- /dev/null +++ b/agentic/llm/producer.Dockerfile @@ -0,0 +1,7 @@ +# producer.Dockerfile +FROM python:3.9 +WORKDIR /app +COPY test_send_llm_requests.py . +COPY requirements.txt . +RUN pip install -r requirements.txt +CMD ["python", "test_send_llm_requests.py"] diff --git a/agentic/llm/requirements.txt b/agentic/llm/requirements.txt new file mode 100644 index 0000000..aa3a931 --- /dev/null +++ b/agentic/llm/requirements.txt @@ -0,0 +1 @@ +aiokafka diff --git a/agentic/llm/test_send_llm_requests.py b/agentic/llm/test_send_llm_requests.py new file mode 100644 index 0000000..660ba43 --- /dev/null +++ b/agentic/llm/test_send_llm_requests.py @@ -0,0 +1,19 @@ +import asyncio +from AgenticInterfaces.KafkaClient import KafkaClient + +bootstrap_servers = "kafka:9092" +request_topic = "request_llm_topic" + +async def send_periodic_messages(): + kafka_client = KafkaClient(topic=request_topic, bootstrap_servers=bootstrap_servers) + await kafka_client.start() + + try: + while True: + message = {"text": "Пример сообщения"} + await kafka_client.send_message(message) + await asyncio.sleep(5) + finally: + await kafka_client.stop() + +asyncio.run(send_periodic_messages()) \ No newline at end of file diff --git a/agentic/llm/test_show_responses_from_llm.py b/agentic/llm/test_show_responses_from_llm.py new file mode 100644 index 0000000..ba8c732 --- /dev/null +++ b/agentic/llm/test_show_responses_from_llm.py @@ -0,0 +1,19 @@ +import asyncio +from AgenticInterfaces.KafkaClient import KafkaClient + +bootstrap_servers = "kafka:9092" +response_topic = "response_llm_topic" + +async def print_received_messages(message): + print("Получено сообщение:", message["text"]) + +async def receive_messages(): + kafka_client = KafkaClient(topic=response_topic, bootstrap_servers=bootstrap_servers, group_id="response-group") + await kafka_client.start() + + try: + await kafka_client.consume_messages(print_received_messages) + finally: + await kafka_client.stop() + +asyncio.run(receive_messages()) \ No newline at end of file