diff --git a/docker-compose.yaml b/docker-compose.yaml index 47e3a042f5fd4907914791566a8690b111738ddf..d266ae524a6bec16ad835fd6bfe79ccf1d78236c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -65,6 +65,19 @@ services: volumes: - grafanadata:/var/lib/grafana + producer: + build: + context: ./producer + container_name: ibd-producer + depends_on: + - rabbitmq + env_file: + - ./producer/.env + volumes: + - ./producer:/code + ports: + - "8080:8007" + volumes: elasticdata: diff --git a/producer/.env.sample b/producer/.env.sample new file mode 100644 index 0000000000000000000000000000000000000000..0d8d8d63c192bf73ec19789813043a1afb8de6ba --- /dev/null +++ b/producer/.env.sample @@ -0,0 +1,7 @@ +ENVIRONMENT=dev # options: dev, prod (default dev) + +RABBITMQ_PORT=5672 +RABBITMQ_HOST=ibd-local-stack-rabbitmq +RABBITMQ_QUEUE=ibd-local-stack-data +RABBITMQ_USER=guest +RABBITMQ_PASSWORD=guest diff --git a/producer/Dockerfile b/producer/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..0e8ca68d586634154d7fcb0e037b39d7bde3f7ab --- /dev/null +++ b/producer/Dockerfile @@ -0,0 +1,25 @@ +FROM python:3.13.1-slim-bookworm + +ARG CODE=/code +# Make base folder for the source code +RUN mkdir ${CODE} + +# Copy local code to container location and change working dir to that directory +WORKDIR ${CODE} + +# Install the required libraries +RUN apt-get update -y \ + && apt-get upgrade -y \ + && apt-get install -y \ + python3-dev + +COPY . 
${CODE} + +# Install the python packages +RUN pip install -r ${CODE}/requirements.txt + +# Expose the port +EXPOSE 8007 + +# Default command for the image (this can be overwritten on compose) +CMD ["bash", "run_server.sh"] \ No newline at end of file diff --git a/producer/data/nyBUS_small.csv b/producer/data/nyBUS_small.csv new file mode 100644 index 0000000000000000000000000000000000000000..68e1fd40786cce91cccb7b490c74001914b2b89e Binary files /dev/null and b/producer/data/nyBUS_small.csv differ diff --git a/producer/main.py b/producer/main.py new file mode 100644 index 0000000000000000000000000000000000000000..9ba74f561d26bcf8c6d394208a0a4be5499edf90 --- /dev/null +++ b/producer/main.py @@ -0,0 +1,49 @@ +from fastapi import FastAPI, BackgroundTasks +from fastapi.responses import JSONResponse +from datetime import datetime + +from utils.publisher import RabbitMQPublisher + + +app = FastAPI( title="NY BUS Producer API", + description="IBD Producer API to publish NY BUS data.",) +publisher = RabbitMQPublisher() + +@app.get("/", summary="Root Endpoint") +async def root(): + """ + Root Endpoint + This endpoint provides a greeting message and directs users to the Swagger UI documentation for the API. + By navigating to `/docs`, users can access the interactive API documentation where they can see all + available endpoints, their expected parameters, and try out the API directly from the browser. + """ + return {"message": "Welcome to the API. 
Check /docs for Swagger documentation."} + +@app.post("/start") +async def start_publishing(background_tasks: BackgroundTasks): + """Start publishing messages to RabbitMQ""" + if publisher.is_publishing: + return JSONResponse( + status_code=400, + content={"message": "Publisher is already running"} + ) + + background_tasks.add_task(publisher.start_publishing) + return {"message": "Started publishing messages"} + + +@app.post("/stop") +async def stop_publishing(): + """Stop publishing messages to RabbitMQ""" + publisher.stop_publishing() + return {"message": "Stopped publishing messages"} + + +@app.get("/status") +async def get_status(): + """Get the current status of the publisher""" + return { + "is_publishing": publisher.is_publishing, + "current_position": publisher.current_position, + "timestamp": datetime.now().isoformat() + } \ No newline at end of file diff --git a/producer/requirements.txt b/producer/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..860a7e6082ac9a2130bb48e6a0d08083e61831fa --- /dev/null +++ b/producer/requirements.txt @@ -0,0 +1,3 @@ +fastapi==0.115.6 +pika==1.3.2 +uvicorn[standard]==0.21.1 diff --git a/producer/run_server.sh b/producer/run_server.sh new file mode 100644 index 0000000000000000000000000000000000000000..063fa52a733c9a4798f0ff37b61cb71f37031704 --- /dev/null +++ b/producer/run_server.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +if [[ ${ENVIRONMENT} == "prod" ]]; then + #gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8007 + uvicorn main:app --workers 8 --host 0.0.0.0 --port 8007 +else + uvicorn main:app --host 0.0.0.0 --port 8007 --reload +fi diff --git a/producer/utils/__init__.py b/producer/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/producer/utils/logger.py b/producer/utils/logger.py new file mode 100644 index 
0000000000000000000000000000000000000000..36ceb079ba15b32de31f43e7ed03a0593bc8d6ea --- /dev/null +++ b/producer/utils/logger.py @@ -0,0 +1,53 @@ +""" This module defines Log method for the project""" + +import logging +import traceback + + +class CustomLogger(logging.Logger): + def __init__(self, name, level=logging.NOTSET): + super().__init__(name, level) + + def exception(self, e: Exception, *args, **kwargs): + # extract and log exception info + self.error(e.__cause__) + # Get the current traceback stack + tb = traceback.extract_tb(e.__traceback__) + # Get the last traceback object (the one where the exception occurred) + last_tb = tb[-1] + # Extract the filename, line number, and line content + filename, line_number, function_name, text = last_tb + # Print the cause of the exception and the line where it occurred + self.debug("An exception occurred: %s", e) + self.debug(f"Cause: {e.__cause__}") + self.debug(f"File: {filename}, line {line_number}, in {function_name}") + self.debug(f"Code: {text}") + + def enableDebug(self): + self.setLevel(logging.DEBUG) + + def disableDebug(self): + self.setLevel(logging.INFO) + + +def setup_logger(): + logger_name = "ibd-producer" + logging.setLoggerClass(CustomLogger) + logger = logging.getLogger(logger_name) + logger.setLevel(logging.INFO) + + handler = logging.StreamHandler() + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + + # because a new handler was created and added to the logger, + # propagate needs to be set to false to log the message only once + logger.propagate = False + + return logger + + +LOG = setup_logger() diff --git a/producer/utils/publisher.py b/producer/utils/publisher.py new file mode 100644 index 0000000000000000000000000000000000000000..8256bfd8012ddc885f63bbaefbae57e9a9ce7192 --- /dev/null +++ b/producer/utils/publisher.py @@ -0,0 +1,86 @@ +import csv +import pika +import os +import json +from 
typing import List, Dict +from random import randint +import asyncio +from utils.logger import LOG + +class RabbitMQPublisher: + def __init__(self): + self.csv_path = "/code/data/nyBUS_small.csv" + self.rabbitmq_host = os.environ.get("RABBITMQ_HOST") + self.rabbitmq_port = int(os.environ.get("RABBITMQ_PORT", "5672")) + self.rabbitmq_user = os.environ.get("RABBITMQ_USER") + self.rabbitmq_password = os.environ.get("RABBITMQ_PASSWORD") + self.queue_name = os.environ.get("RABBITMQ_QUEUE") + self.is_publishing = False + self.current_position = 0 + self.connection = None + self.channel = None + self.connect() + + def connect(self): + credentials = pika.PlainCredentials(self.rabbitmq_user, self.rabbitmq_password) + parameters = pika.ConnectionParameters(host=self.rabbitmq_host, port=self.rabbitmq_port, credentials=credentials) + self.connection = pika.BlockingConnection(parameters) + self.channel = self.connection.channel() + + def close(self): + if self.connection and not self.connection.is_closed: + self.connection.close() + + def read_csv_chunk(self, chunk_size: int = 5) -> List[Dict]: + """Read a chunk of rows from the CSV file""" + chunk = [] + try: + with open(self.csv_path, 'r') as file: + reader = csv.DictReader(file) + # Skip to current position + for _ in range(self.current_position): + next(reader, None) + + # Read chunk_size rows + for _ in range(chunk_size): + row = next(reader, None) + if row is None: + self.current_position = 0 # Reset to beginning if EOF + break + chunk.append(row) + self.current_position += 1 + + except Exception as e: + LOG.error(f"Error reading CSV: {str(e)}", exc_info=True) + + return chunk + + def publish_to_rabbitmq(self, messages: List[Dict]): + """Publish messages to RabbitMQ queue""" + try: + for message in messages: + self.channel.basic_publish(exchange='', + routing_key=self.queue_name, + body=json.dumps(message)) + + except Exception as e: + LOG.error(f"Error publishing to RabbitMQ: {str(e)}", exc_info=True) + raise + + async def 
start_publishing(self): + """Start publishing messages""" + self.is_publishing = True + while self.is_publishing: + try: + messages = self.read_csv_chunk() + if messages: + self.publish_to_rabbitmq(messages) + await asyncio.sleep(randint(5,30)) + except Exception as e: + LOG.error(f"Error in publishing loop: {str(e)}", exc_info=True) + await asyncio.sleep(randint(5,30)) # Still sleep on error to prevent tight loop + + def stop_publishing(self): + """Stop publishing messages""" + self.is_publishing = False + # NOTE: connection intentionally left open so /start can resume publishing \ No newline at end of file