You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
42 lines
1.5 KiB
42 lines
1.5 KiB
import asyncio
|
|
import json
|
|
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
|
|
from aiokafka.errors import KafkaConnectionError
|
|
|
|
class KafkaClient:
|
|
def __init__(self, topic: str, bootstrap_servers: str, group_id: str = None):
|
|
self.topic = topic
|
|
self.bootstrap_servers = bootstrap_servers
|
|
self.group_id = group_id
|
|
self.consumer = None
|
|
self.producer = None
|
|
|
|
async def start(self):
|
|
# initialize producer & consumer
|
|
self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
|
|
await self.producer.start()
|
|
|
|
if self.group_id:
|
|
self.consumer = AIOKafkaConsumer(
|
|
self.topic,
|
|
bootstrap_servers=self.bootstrap_servers,
|
|
group_id=self.group_id
|
|
)
|
|
await self.consumer.start()
|
|
|
|
async def stop(self):
|
|
if self.producer:
|
|
await self.producer.stop()
|
|
if self.consumer:
|
|
await self.consumer.stop()
|
|
|
|
async def send_message(self, message: dict):
|
|
message_json = json.dumps(message).encode('utf-8')
|
|
await self.producer.send_and_wait(self.topic, message_json)
|
|
|
|
async def consume_messages(self, callback):
|
|
if not self.consumer:
|
|
raise Exception("Consumer is not initialized. Ensure group_id is set and start is called.")
|
|
async for message in self.consumer:
|
|
message_value = json.loads(message.value.decode('utf-8'))
|
|
await callback(message_value) |