Skip to content
Snippets Groups Projects
Commit 1f77e14f authored by stefan's avatar stefan
Browse files

modified message delay

parent 4cf96b81
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ from utils.publisher import RabbitMQPublisher
app = FastAPI( title="NY BUS Producer API",
description="IBD Producer API to publish NY BUS data.",)
publisher = RabbitMQPublisher()
@app.get("/", summary="Root Endpoint")
......@@ -27,6 +28,8 @@ async def start_publishing(background_tasks: BackgroundTasks):
status_code=400,
content={"message": "Publisher is already running"}
)
else:
publisher.connect()
background_tasks.add_task(publisher.start_publishing)
return {"message": "Started publishing messages"}
......
......@@ -19,7 +19,6 @@ class RabbitMQPublisher:
self.current_position = 0
self.connection = None
self.channel = None
self.connect()
def connect(self):
credentials = pika.PlainCredentials(self.rabbitmq_user, self.rabbitmq_password)
......@@ -55,13 +54,12 @@ class RabbitMQPublisher:
return chunk
def publish_to_rabbitmq(self, messages: List[Dict]):
def publish_to_rabbitmq(self, message: Dict):
"""Publish messages to RabbitMQ queue"""
try:
for message in messages:
self.channel.basic_publish(exchange='',
routing_key=self.queue_name,
body=json.dumps(message))
self.channel.basic_publish(exchange='',
routing_key=self.queue_name,
body=json.dumps(message))
except Exception as e:
LOG.error(f"Error publishing to RabbitMQ: {str(e)}", exc_info=True)
......@@ -74,11 +72,12 @@ class RabbitMQPublisher:
try:
messages = self.read_csv_chunk()
if messages:
self.publish_to_rabbitmq(messages)
await asyncio.sleep(randint(5,30))
for message in messages:
self.publish_to_rabbitmq(message)
await asyncio.sleep(randint(5, 30))
except Exception as e:
LOG.error(f"Error in publishing loop: {str(e)}", exc_info=True)
await asyncio.sleep(randint(5,30)) # Still sleep on error to prevent tight loop
await asyncio.sleep(5)
def stop_publishing(self):
"""Stop publishing messages"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment