From bf2bebcf559c05a6c469bc23aae0f9a9cc02c2b6 Mon Sep 17 00:00:00 2001 From: stefan <mihalachestefan2001@gmail.com> Date: Tue, 7 Jan 2025 17:51:57 +0200 Subject: [PATCH] modified logstash and publisher to map to elasticsearch types --- .docker/logstash/rabbitmq.conf | 38 +++++++++++++++++++++++ producer/utils/publisher.py | 55 +++++++++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/.docker/logstash/rabbitmq.conf b/.docker/logstash/rabbitmq.conf index 54f1ccd..6a58eab 100755 --- a/.docker/logstash/rabbitmq.conf +++ b/.docker/logstash/rabbitmq.conf @@ -12,6 +12,44 @@ input { } } +filter { + # Parse datetime fields + date { + match => ["ExpectedArrivalTime", "ISO8601"] + target => "ExpectedArrivalTime" + } + date { + match => ["ScheduledArrivalTime", "ISO8601"] + target => "ScheduledArrivalTime" + } + + # Convert geopoint fields + mutate { + convert => { + "[OriginLocation][lat]" => "float" + "[OriginLocation][lon]" => "float" + "[DestinationLocation][lat]" => "float" + "[DestinationLocation][lon]" => "float" + "[VehicleLocation][lat]" => "float" + "[VehicleLocation][lon]" => "float" + } + } + + # Ensure fields are in the correct geo_point format + mutate { + rename => { + "OriginLocation" => "[OriginLocation][coordinates]" + "DestinationLocation" => "[DestinationLocation][coordinates]" + "VehicleLocation" => "[VehicleLocation][coordinates]" + } + add_field => { + "[OriginLocation][type]" => "point" + "[DestinationLocation][type]" => "point" + "[VehicleLocation][type]" => "point" + } + } +} + output { elasticsearch { hosts => ["ibd-local-stack-elasticsearch:9200"] diff --git a/producer/utils/publisher.py b/producer/utils/publisher.py index 50b9f9e..d7da666 100644 --- a/producer/utils/publisher.py +++ b/producer/utils/publisher.py @@ -6,6 +6,7 @@ from typing import List, Dict from random import randint import asyncio from utils.logger import LOG +from datetime import datetime, timedelta class RabbitMQPublisher: def 
def publish_to_rabbitmq(self, message: Dict):
    """Reshape a message for Elasticsearch and publish it to the RabbitMQ queue.

    The flat coordinate keys (``OriginLat``/``OriginLong``,
    ``DestinationLat``/``DestinationLong``, ``VehicleLocation.Latitude``/
    ``VehicleLocation.Longitude``) are replaced by ``OriginLocation``,
    ``DestinationLocation`` and ``VehicleLocation`` objects of the form
    ``{"lat": float, "lon": float}``.

    ``ExpectedArrivalTime`` (an ISO-8601 string) is re-emitted as
    ``YYYY-MM-DDTHH:MM:SS``. ``ScheduledArrivalTime`` (an ``HH:MM:SS`` clock
    time, whose hour may be 24 or more) is expanded to a full timestamp
    anchored on the expected arrival date.

    The caller's ``message`` dict is left untouched; a shallow copy is
    reshaped and serialised.

    Args:
        message: The raw record. It must contain the six coordinate keys
            and the two arrival-time keys listed above.

    Raises:
        KeyError: If a required key is missing from ``message``.
        ValueError: If a coordinate is not convertible to ``float`` or a
            time field is malformed.

    Errors raised while publishing are logged and not re-raised.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S"
    half_day = timedelta(hours=12)
    one_day = timedelta(days=1)

    # Shallow copy: the original aliased `message` and deleted the caller's keys.
    parsed_message = dict(message)

    # Collapse each flat latitude/longitude pair into one {"lat", "lon"} object.
    location_fields = (
        ("OriginLocation", "OriginLat", "OriginLong"),
        ("DestinationLocation", "DestinationLat", "DestinationLong"),
        ("VehicleLocation",
         "VehicleLocation.Latitude",
         "VehicleLocation.Longitude"),
    )
    for target, lat_key, lon_key in location_fields:
        parsed_message[target] = {
            "lat": float(parsed_message.pop(lat_key)),
            "lon": float(parsed_message.pop(lon_key)),
        }

    expected_date = None
    if message["ExpectedArrivalTime"]:
        expected_date = datetime.fromisoformat(message["ExpectedArrivalTime"])
        parsed_message["ExpectedArrivalTime"] = expected_date.strftime(
            timestamp_format
        )

    scheduled_arrival = message["ScheduledArrivalTime"]
    # Without an expected arrival there is no calendar date to anchor the
    # clock time on, so the scheduled value is passed through unchanged
    # (previously this path crashed on an unbound `expected_date`).
    if scheduled_arrival and expected_date is not None:
        hour, minute, second = (
            int(part) for part in scheduled_arrival.split(":")
        )
        # Hours of 24 or more wrap around to the start of the clock.
        # NOTE(review): presumably these are service-day times that run
        # past midnight - confirm against the data source.
        scheduled_date = expected_date.replace(
            hour=hour % 24, minute=minute, second=second
        )

        # Choose the calendar day that puts the scheduled time closest to
        # the expected one. timedelta arithmetic handles month and year
        # boundaries, unlike replace(day=...), which kept the current month.
        gap = scheduled_date - expected_date
        if gap > half_day:
            scheduled_date -= one_day
        elif gap < -half_day:
            scheduled_date += one_day

        parsed_message["ScheduledArrivalTime"] = scheduled_date.strftime(
            timestamp_format
        )

    # Serialise outside the try block so encoding errors still propagate.
    payload = json.dumps(parsed_message)

    try:
        self.channel.basic_publish(exchange='',
                                   routing_key=self.queue_name,
                                   body=payload)
    except Exception as e:
        LOG.error(f"Error publishing to RabbitMQ: {str(e)}", exc_info=True)