From 1f77e14f9d22280517a5a5d76ddb60f42a3caa07 Mon Sep 17 00:00:00 2001 From: stefan <mihalachestefan2001@gmail.com> Date: Mon, 6 Jan 2025 16:54:26 +0200 Subject: [PATCH] modified message delay --- producer/main.py | 3 +++ producer/utils/publisher.py | 17 ++++++++--------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/producer/main.py b/producer/main.py index 9ba74f5..e35b2fa 100644 --- a/producer/main.py +++ b/producer/main.py @@ -7,6 +7,7 @@ from utils.publisher import RabbitMQPublisher app = FastAPI( title="NY BUS Producer API", description="IBD Producer API to publish NY BUS data.",) + publisher = RabbitMQPublisher() @app.get("/", summary="Root Endpoint") @@ -27,6 +28,8 @@ async def start_publishing(background_tasks: BackgroundTasks): status_code=400, content={"message": "Publisher is already running"} ) + else: + publisher.connect() background_tasks.add_task(publisher.start_publishing) return {"message": "Started publishing messages"} diff --git a/producer/utils/publisher.py b/producer/utils/publisher.py index 8256bfd..50b9f9e 100644 --- a/producer/utils/publisher.py +++ b/producer/utils/publisher.py @@ -19,7 +19,6 @@ class RabbitMQPublisher: self.current_position = 0 self.connection = None self.channel = None - self.connect() def connect(self): credentials = pika.PlainCredentials(self.rabbitmq_user, self.rabbitmq_password) @@ -55,13 +54,12 @@ class RabbitMQPublisher: return chunk - def publish_to_rabbitmq(self, messages: List[Dict]): + def publish_to_rabbitmq(self, message: Dict): """Publish messages to RabbitMQ queue""" try: - for message in messages: - self.channel.basic_publish(exchange='', - routing_key=self.queue_name, - body=json.dumps(message)) + self.channel.basic_publish(exchange='', + routing_key=self.queue_name, + body=json.dumps(message)) except Exception as e: LOG.error(f"Error publishing to RabbitMQ: {str(e)}", exc_info=True) @@ -74,11 +72,12 @@ class RabbitMQPublisher: try: messages = self.read_csv_chunk() if messages: - self.publish_to_rabbitmq(messages) - await asyncio.sleep(randint(5,30)) + for message in messages: + self.publish_to_rabbitmq(message) + await asyncio.sleep(randint(5, 30)) except Exception as e: LOG.error(f"Error in publishing loop: {str(e)}", exc_info=True) - await asyncio.sleep(randint(5,30)) # Still sleep on error to prevent tight loop + await asyncio.sleep(5) def stop_publishing(self): """Stop publishing messages""" -- GitLab