You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
42 lines
1.4 KiB
42 lines
1.4 KiB
3 months ago
|
import asyncio
|
||
|
import json
|
||
|
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
|
||
|
|
||
|
class KafkaClient:
|
||
|
def __init__(self, topic: str, bootstrap_servers: str, group_id: str = None):
|
||
|
self.topic = topic
|
||
|
self.bootstrap_servers = bootstrap_servers
|
||
|
self.group_id = group_id
|
||
|
self.consumer = None
|
||
|
self.producer = None
|
||
|
|
||
|
async def start(self):
|
||
|
# init producer & consumer
|
||
|
self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
|
||
|
await self.producer.start()
|
||
|
|
||
|
if self.group_id:
|
||
|
self.consumer = AIOKafkaConsumer(
|
||
|
self.topic,
|
||
|
bootstrap_servers=self.bootstrap_servers,
|
||
|
group_id=self.group_id
|
||
|
)
|
||
|
await self.consumer.start()
|
||
|
|
||
|
async def stop(self):
|
||
|
if self.producer:
|
||
|
await self.producer.stop()
|
||
|
if self.consumer:
|
||
|
await self.consumer.stop()
|
||
|
|
||
|
async def send_message(self, message: dict):
|
||
|
message_json = json.dumps(message).encode('utf-8')
|
||
|
await self.producer.send_and_wait(self.topic, message_json)
|
||
|
|
||
|
async def consume_messages(self, callback):
|
||
|
if not self.consumer:
|
||
|
raise Exception("Consumer is not initialized. Ensure group_id is set and start is called.")
|
||
|
|
||
|
async for message in self.consumer:
|
||
|
message_value = json.loads(message.value.decode('utf-8'))
|
||
|
await callback(message_value)
|