diff --git a/.docker/logstash/rabbitmq.conf b/.docker/logstash/rabbitmq.conf index 54f1ccdc2d50abc1d4d5cb1086c841e8010ffc5b..6a58eabf0b1aefd7ef8f0aa9a36880f550abbba1 100755 --- a/.docker/logstash/rabbitmq.conf +++ b/.docker/logstash/rabbitmq.conf @@ -12,6 +12,44 @@ input { } } +filter { + # Parse datetime fields + date { + match => ["ExpectedArrivalTime", "ISO8601"] + target => "ExpectedArrivalTime" + } + date { + match => ["ScheduledArrivalTime", "ISO8601"] + target => "ScheduledArrivalTime" + } + + # Convert geopoint fields + mutate { + convert => { + "[OriginLocation][lat]" => "float" + "[OriginLocation][lon]" => "float" + "[DestinationLocation][lat]" => "float" + "[DestinationLocation][lon]" => "float" + "[VehicleLocation][lat]" => "float" + "[VehicleLocation][lon]" => "float" + } + } + + # Ensure fields are in the correct geo_point format + mutate { + rename => { + "OriginLocation" => "[OriginLocation][coordinates]" + "DestinationLocation" => "[DestinationLocation][coordinates]" + "VehicleLocation" => "[VehicleLocation][coordinates]" + } + add_field => { + "[OriginLocation][type]" => "point" + "[DestinationLocation][type]" => "point" + "[VehicleLocation][type]" => "point" + } + } +} + output { elasticsearch { hosts => ["ibd-local-stack-elasticsearch:9200"] diff --git a/producer/utils/publisher.py b/producer/utils/publisher.py index 50b9f9efb5d7e8bc880d6b801b125b6cd61ff3d5..d7da666f66cf2bbc682f54fb054f416d65dafdd1 100644 --- a/producer/utils/publisher.py +++ b/producer/utils/publisher.py @@ -6,6 +6,7 @@ from typing import List, Dict from random import randint import asyncio from utils.logger import LOG +from datetime import datetime, timedelta class RabbitMQPublisher: def __init__(self): @@ -56,10 +57,62 @@ class RabbitMQPublisher: def publish_to_rabbitmq(self, message: Dict): """Publish messages to RabbitMQ queue""" + + # Format the message + parsed_message = message + parsed_message["OriginLocation"] = { + "lat":float(message["OriginLat"]), + "lon":float(message["OriginLong"]) + } + del parsed_message["OriginLat"] + del parsed_message["OriginLong"] + + parsed_message["DestinationLocation"] = { + "lat":float(message["DestinationLat"]), + "lon":float(message["DestinationLong"]) + } + + del parsed_message["DestinationLat"] + del parsed_message["DestinationLong"] + + parsed_message["VehicleLocation"] = { + "lat":float(message["VehicleLocation.Latitude"]), + "lon":float(message["VehicleLocation.Longitude"]) + } + + del parsed_message["VehicleLocation.Latitude"] + del parsed_message["VehicleLocation.Longitude"] + + if message["ExpectedArrivalTime"]: + + expected_date = datetime.fromisoformat(message["ExpectedArrivalTime"]) + parsed_message["ExpectedArrivalTime"] = expected_date.strftime("%Y-%m-%dT%H:%M:%S") + + if message["ScheduledArrivalTime"]: + scheduled_arrival = message["ScheduledArrivalTime"] + hour, minute, second = scheduled_arrival.split(":") + hour = int(hour) + minute = int(minute) + second = int(second) + + scheduled_day = expected_date.day + + if hour >= 24: + hour = hour % 24 + + if expected_date.hour == 00 and hour < 24: + scheduled_day = (expected_date - timedelta(days=1)).day + + scheduled_date = expected_date.replace(hour=int(hour), minute=int(minute), second=int(second), day=scheduled_day) + + parsed_message["ScheduledArrivalTime"] = scheduled_date.strftime("%Y-%m-%dT%H:%M:%S") + + parsed_message = json.dumps(parsed_message) + try: self.channel.basic_publish(exchange='', routing_key=self.queue_name, - body=json.dumps(message)) + body=parsed_message) except Exception as e: LOG.error(f"Error publishing to RabbitMQ: {str(e)}", exc_info=True)