You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

64 lines
1.5 KiB

#
# SEND REQUEST
#
import asyncio
import uuid
import json
import random
import base64
from aiokafka import AIOKafkaProducer
from datetime import datetime
# Module-wide producer handle; stays None until a producer has been started.
producer = None


async def init_kafka_producer(*, max_attempts: int = 5, retry_delay: float = 5) -> None:
    """Create and start the shared Kafka producer, retrying on failure.

    Does nothing if the module-level ``producer`` is already set. The new
    producer is published to the global only after ``start()`` succeeds, so
    a failed initialisation never leaves an unstarted producer behind and a
    later call can try again.

    Args:
        max_attempts: How many times to call ``start()`` before giving up.
        retry_delay: Seconds to wait between failed attempts.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        ConnectionError: If every attempt to start the producer failed; the
            last underlying error is chained as ``__cause__``.
    """
    global producer
    if producer is not None:
        return
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    candidate = AIOKafkaProducer(
        bootstrap_servers='kafka:9092',
        max_request_size=10485760,  # 10 MiB (10 * 1024 * 1024)
    )
    for attempt in range(1, max_attempts + 1):
        try:
            print(f"Connecting to Kafka (attempt {attempt}/{max_attempts})")
            await candidate.start()
        except Exception as e:
            if attempt == max_attempts:
                # Out of retries: surface the failure instead of returning
                # as if the producer were usable.
                raise ConnectionError(
                    f"Could not connect to Kafka after {max_attempts} attempts"
                ) from e
            print(f"Kafka connection error, retrying in {retry_delay} seconds: {e}")
            await asyncio.sleep(retry_delay)
        else:
            # Only expose the producer once it has actually started.
            producer = candidate
            return
async def stop_kafka_producer():
    """Stop the shared Kafka producer and clear the module-level handle.

    A no-op when no producer is currently set. If ``stop()`` raises, the
    exception propagates and the handle is left in place.
    """
    global producer
    if producer is None:
        return
    await producer.stop()
    producer = None
async def send_random_data():
    """Publish one JSON message to the ``request_llm_topic`` topic.

    Despite the name, the text is the fixed string "Hello World!"; only the
    UUID (sent under the ``track_uuid`` key) and the naive-UTC ISO timestamp
    differ between calls. Any exception raised while building or sending the
    message is caught and printed rather than propagated.
    """
    try:
        sent_at = datetime.utcnow().isoformat()
        track_id = str(uuid.uuid4())
        # Key order is kept as text / track_uuid / timestamp so the encoded
        # JSON bytes have the same layout as before.
        payload = json.dumps(
            {"text": "Hello World!", "track_uuid": track_id, "timestamp": sent_at}
        ).encode('utf-8')
        await producer.send_and_wait("request_llm_topic", payload)
        print(f"Sent message with UUID: {track_id}")
    except Exception as exc:
        print(f"Error sending message to Kafka: {exc}")
async def main():
    """Start the producer, then publish one message per second indefinitely.

    The producer is stopped in a ``finally`` clause, so it is shut down when
    the publish loop exits for any reason (including cancellation). The
    initialisation call sits outside that ``try``; anything it raises
    propagates without the stop step running.
    """

    async def publish_forever():
        # Endless send / one-second pause cycle; only ends via an exception.
        while True:
            await send_random_data()
            await asyncio.sleep(1)

    await init_kafka_producer()
    try:
        await publish_forever()
    finally:
        await stop_kafka_producer()
# Run the publisher only when executed as a script, so importing this module
# does not start the endless send loop.
if __name__ == "__main__":
    asyncio.run(main())