diff --git a/src/consumer.c b/src/consumer.c index 356e13a1b5eee2a804f30bb66a60d8958a6cfc72..d0363e3eca572474e278d382d2ce17ef043c0fea 100644 --- a/src/consumer.c +++ b/src/consumer.c @@ -3,72 +3,102 @@ #include <pthread.h> #include <fcntl.h> #include <unistd.h> - +#include <stdatomic.h> #include "consumer.h" #include "ring_buffer.h" #include "packet.h" #include "utils.h" -pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; +_Atomic unsigned long global_seq_num; +_Atomic unsigned long write_seq_num; + +pthread_mutex_t write_mutex; +pthread_cond_t write_condition; +pthread_mutex_t global_seq_num_mutex; + +void initialize_thread_resources(void) +{ + pthread_mutex_init(&write_mutex, NULL); + pthread_cond_init(&write_condition, NULL); +} + +void cleanup_thread_resources(void) +{ + pthread_mutex_destroy(&write_mutex); + pthread_cond_destroy(&write_condition); +} + + +void write_to_file(int file_descriptor, unsigned long packet_sequence, char log_entry[], int length) +{ + pthread_mutex_lock(&write_mutex); + while (packet_sequence != write_seq_num) + pthread_cond_wait(&write_condition, &write_mutex); + write(file_descriptor, log_entry, length); + atomic_fetch_add(&write_seq_num, 1); + pthread_cond_broadcast(&write_condition); + pthread_mutex_unlock(&write_mutex); +} void consumer_thread(so_consumer_ctx_t *ctx) { - /* TODO: implement consumer thread */ - so_packet_t packet; - - - while (1) { - pthread_mutex_lock(&log_mutex); - ssize_t result = ring_buffer_dequeue(ctx->producer_rb, &packet, sizeof(packet)); - // pthread_mutex_unlock(&log_mutex); - if (result <= 0 && ctx->producer_rb->stop) { - pthread_mutex_unlock(&log_mutex); - break; - } - pthread_mutex_unlock(&log_mutex); - char log[PKT_SZ]; - so_action_t decision = process_packet(&packet); - unsigned long pkt_hash = packet_hash(&packet); - pthread_mutex_lock(&log_mutex); - snprintf(log, PKT_SZ, "%s %016lx %lu\n", RES_TO_STR(decision), pkt_hash, packet.hdr.timestamp); - - //pthread_mutex_lock(&ctx->log_mutex); - fputs(log, ctx->log_file); - fflush(ctx->log_file); - pthread_mutex_unlock(&log_mutex); - //pthread_mutex_unlock(&ctx->log_mutex); - } - // (void) ctx; + char packet_buffer[256]; + char log_entry[256]; + unsigned long packet_sequence; + int file_descriptor = ctx->log_file; + + while (true) { + pthread_mutex_lock(&write_mutex); + ssize_t received_size = ring_buffer_dequeue(ctx->producer_rb, packet_buffer, 256); + + if (received_size <= 0 && ctx->producer_rb->stop) { + pthread_mutex_unlock(&write_mutex); + break; + } + packet_sequence = atomic_fetch_add(&global_seq_num, 1); + pthread_mutex_unlock(&write_mutex); + + struct so_packet_t *packet = (struct so_packet_t *)packet_buffer; + int action_result = process_packet(packet); + unsigned long hash_value = packet_hash(packet); + unsigned long time_stamp = packet->hdr.timestamp; + + int length = snprintf(log_entry, sizeof(log_entry), "%s %016lx %lu\n", + action_result == PASS ? "PASS" : "DROP", hash_value, time_stamp); + write_to_file(file_descriptor, packet_sequence, log_entry, length); + } + cleanup_thread_resources(); +} + + +void *consumer_thread_wrapper(void *arg) +{ + consumer_thread((so_consumer_ctx_t *)arg); return NULL; } + int create_consumers(pthread_t *tids, - int num_consumers, - struct so_ring_buffer_t *rb, - const char *out_filename) + int num_consumers, + struct so_ring_buffer_t *rb, + const char *out_filename) { - // (void) tids; - // (void) num_consumers; - // (void) rb; - // (void) out_filename; - - - FILE *log_file = fopen(out_filename, "w"); - if (!log_file) { - perror("Error opening file"); - return -1; - } - - pthread_mutex_t log_mutex; - pthread_mutex_init(&log_mutex, NULL); - for (int i = 0; i < num_consumers; i++) { - so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t)); - ctx->producer_rb = rb; - ctx->log_file = log_file; - ctx->log_mutex = log_mutex; - - pthread_create(&tids[i], NULL, consumer_thread, ctx); - } - - return num_consumers; + int file_descriptor = open(out_filename, O_RDWR | O_CREAT | O_APPEND, 0666); + + DIE(file_descriptor < 0, "Error opening output file"); + + for (int i = 0; i < num_consumers; i++) { + so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t)); + if(!ctx){ + DIE(1, "malloc failed"); + } + + ctx->producer_rb = rb; + ctx->log_file = file_descriptor; + ctx->log_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; + + pthread_create(&tids[i], NULL, consumer_thread_wrapper, ctx); + } + + return num_consumers; } \ No newline at end of file diff --git a/src/consumer.h b/src/consumer.h index b19b7b26934135c391b0df378a6c2843c31878c2..5654068635ba132d45ac291c545a6b4ce12d7cb8 100644 --- a/src/consumer.h +++ b/src/consumer.h @@ -12,10 +12,9 @@ typedef struct so_consumer_ctx_t { /* TODO: add synchronization primitives for timestamp ordering */ pthread_mutex_t log_mutex; - FILE *log_file; - pthread_cond_t seq_cond; + int log_file; + pthread_cond_t seq_cond; unsigned long next_seq_to_log; - unsigned long my_seq; } so_consumer_ctx_t; diff --git a/src/firewall b/src/firewall index 4a3aac26a41192ea6925dea0c818607aeb4b1d1a..3837e4800c35b40ee0a28c121b57ddd6a796c79c 100755 Binary files a/src/firewall and b/src/firewall differ diff --git a/src/ring_buffer.c b/src/ring_buffer.c index e059e7de5f075fc385c154e6c2ad6ba0e2409920..106932ddbe074b6c7b03e50d6ec46c92b4383f36 100644 --- a/src/ring_buffer.c +++ b/src/ring_buffer.c @@ -8,225 +8,213 @@ #include "utils.h" typedef struct { - size_t first_part; - size_t second_part; + size_t first_part; + size_t second_part; } SplitSizes; -SplitSizes split_space(size_t size, size_t space_at_end) { - SplitSizes sizes; - if (size > space_at_end) { - sizes.first_part = space_at_end; - sizes.second_part = size - space_at_end; - } else { - sizes.first_part = size; - sizes.second_part = 0; - } - return sizes; +SplitSizes split_space(size_t size, size_t space_at_end) +{ + SplitSizes sizes; + + if (size > space_at_end) { + sizes.first_part = space_at_end; + sizes.second_part = size - space_at_end; + } else { + sizes.first_part = size; + sizes.second_part = 0; + } + return sizes; } static size_t get_free_space(const so_ring_buffer_t *ring) { - return ring->cap - ring->len; + return ring->cap - ring->len; } static size_t adjust_position(size_t pos, size_t offset, size_t cap) { - size_t new_pos = pos + offset; - if (new_pos >= cap) { - new_pos -= cap; - } - return new_pos; + size_t new_pos = pos + offset; + + if (new_pos >= cap) + new_pos -= cap; + return new_pos; } static void copy_data_to_ring(so_ring_buffer_t *ring, const void *data, size_t size) { - size_t space_at_end = ring->cap - ring->write_pos; - SplitSizes sizes = split_space(size, space_at_end); - memcpy(ring->data + ring->write_pos, data, sizes.first_part); - if (sizes.second_part > 0) { - memcpy(ring->data, (const char *)data + sizes.first_part, sizes.second_part); - } + size_t space_at_end = ring->cap - ring->write_pos; + SplitSizes sizes = split_space(size, space_at_end); + + memcpy(ring->data + ring->write_pos, data, sizes.first_part); + if (sizes.second_part > 0) + memcpy(ring->data, (const char *)data + sizes.first_part, sizes.second_part); } static void copy_data_from_ring(so_ring_buffer_t *ring, void *data, size_t size) { - size_t space_at_end = ring->cap - ring->read_pos; - SplitSizes sizes = split_space(size, space_at_end); - memcpy(data, ring->data + ring->read_pos, sizes.first_part); - if (sizes.second_part > 0) { - memcpy((char *)data + sizes.first_part, ring->data, sizes.second_part); - } + size_t space_at_end = ring->cap - ring->read_pos; + SplitSizes sizes = split_space(size, space_at_end); + + memcpy(data, ring->data + ring->read_pos, sizes.first_part); } int ring_buffer_init(so_ring_buffer_t *ring, size_t cap) { - ring->data = malloc(cap); - if (ring->data == NULL) { - DIE(1, "Memory allocation failed"); - } - - ring->cap = cap; - ring->len = 0; - ring->read_pos = 0; - ring->write_pos = 0; - ring->stop = 0; - - int result = pthread_mutex_init(&ring->mutex, NULL); - if (result != 0) { - DIE(1, "Failed to initialize mutex"); - } - - result = pthread_cond_init(&ring->not_empty, NULL); - if (result != 0) { - DIE(1, "Failed to initialize not_empty condition"); - } - - result = pthread_cond_init(&ring->not_full, NULL); - if (result != 0) { - DIE(1, "Failed to initialize not_full condition"); - } - - return 0; + ring->data = malloc(cap); + if (ring->data == NULL) + DIE(1, "Memory allocation failed"); + + ring->cap = cap; + ring->len = 0; + ring->read_pos = 0; + ring->write_pos = 0; + ring->stop = 0; + + int result = pthread_mutex_init(&ring->mutex, NULL); + + if (result != 0) + DIE(1, "Failed to initialize mutex"); + + result = pthread_cond_init(&ring->not_empty, NULL); + if (result != 0) + DIE(1, "Failed to initialize not_empty condition"); + + result = pthread_cond_init(&ring->not_full, NULL); + if (result != 0) + DIE(1, "Failed to initialize not_full condition"); + return 0; } ssize_t ring_buffer_enqueue(so_ring_buffer_t *ring, void *data, size_t size) { - if (data == NULL) { - ERR(1, "Invalid data pointer"); - return -1; - } - - int result = pthread_mutex_lock(&ring->mutex); - if (result != 0) { - ERR(1, "Failed to lock mutex"); - return -1; - } - - while (get_free_space(ring) < size && !ring->stop) { - result = pthread_cond_wait(&ring->not_full, &ring->mutex); - if (result != 0) { - pthread_mutex_unlock(&ring->mutex); - ERR(result != 0, "Failed to wait on not_full condition"); - return -1; - } - } - - if (ring->stop) { - pthread_mutex_unlock(&ring->mutex); - return -1; - } - - copy_data_to_ring(ring, data, size); - - ring->write_pos = adjust_position(ring->write_pos, size, ring->cap); - ring->len += size; - - result = pthread_cond_signal(&ring->not_empty); - if (result != 0) { - ERR(1, "Failed to signal not_empty condition"); - } - - result = pthread_mutex_unlock(&ring->mutex); - if (result != 0) { - ERR(1, "Failed to unlock mutex"); - } - - return size; + if (data == NULL) { + ERR(1, "Invalid data pointer"); + return -1; + } + + int result = pthread_mutex_lock(&ring->mutex); + + if (result != 0) { + ERR(result != 0, "Failed to lock mutex"); + return -1; + } + + while (get_free_space(ring) < size && !ring->stop) { + result = pthread_cond_wait(&ring->not_full, &ring->mutex); + if (result != 0) { + pthread_mutex_unlock(&ring->mutex); + ERR(result != 0, "Failed to wait on not_full condition"); + return -1; + } + } + + if (ring->stop) { + pthread_mutex_unlock(&ring->mutex); + return -1; + } + + + copy_data_to_ring(ring, data, size); + + ring->write_pos = adjust_position(ring->write_pos, size, ring->cap); + ring->len += size; + + result = pthread_cond_signal(&ring->not_empty); + if (result != 0) + ERR(result != 0, "Failed to signal not_empty condition"); + + result = pthread_mutex_unlock(&ring->mutex); + if (result != 0) + ERR(result != 0, "Failed to unlock mutex"); + return size; } ssize_t ring_buffer_dequeue(so_ring_buffer_t *ring, void *data, size_t size) { - if (data == NULL) { - ERR(1, "Invalid data pointer"); - return -1; - } - - int result = pthread_mutex_lock(&ring->mutex); - if (result != 0) { - ERR(1, "Failed to lock mutex"); - return -1; - } - - while (ring->len < size && !ring->stop) { - result = pthread_cond_wait(&ring->not_empty, &ring->mutex); - if (result != 0) { - pthread_mutex_unlock(&ring->mutex); - ERR(1, "Failed to wait on not_empty condition"); - return -1; - } - } - - if (ring->stop && ring->len == 0) { - pthread_mutex_unlock(&ring->mutex); - return 0; - } - - copy_data_from_ring(ring, data, size); - - ring->read_pos = adjust_position(ring->read_pos, size, ring->cap); - ring->len -= size; - - result = pthread_cond_signal(&ring->not_full); - if (result != 0) { - ERR(1, "Failed to signal not_full condition"); - } - - result = pthread_mutex_unlock(&ring->mutex); - if (result != 0) { - ERR(1, "Failed to unlock mutex"); - } - - return size; + if (data == NULL) { + ERR(1, "Invalid data pointer"); + return -1; + } + + int result = pthread_mutex_lock(&ring->mutex); + + if (result != 0) { + ERR(result != 0, "Failed to lock mutex"); + return -1; + } + + while (ring->len < size && !ring->stop) { + result = pthread_cond_wait(&ring->not_empty, &ring->mutex); + if (result != 0) { + pthread_mutex_unlock(&ring->mutex); + ERR(result != 0, "Failed to wait on not_empty condition"); + return -1; + } + } + + if (ring->stop && ring->len == 0) { + pthread_mutex_unlock(&ring->mutex); + return 0; + } + + copy_data_from_ring(ring, data, size); + + ring->read_pos = adjust_position(ring->read_pos, size, ring->cap); + ring->len -= size; + + result = pthread_cond_signal(&ring->not_full); + if (result != 0) + ERR(result != 0, "Failed to signal not_full condition"); + + result = pthread_mutex_unlock(&ring->mutex); + if (result != 0) + ERR(result != 0, "Failed to unlock mutex"); + + return size; } void ring_buffer_destroy(so_ring_buffer_t *ring) { - if (ring == NULL || ring->data == NULL) { - DIE(1, "Attempting to destroy an uninitialized ring buffer"); - } - - free(ring->data); - - int result = pthread_mutex_destroy(&ring->mutex); - if (result != 0) { - DIE(1, "Failed to destroy mutex"); - } - - result = pthread_cond_destroy(&ring->not_empty); - if (result != 0) { - DIE(1, "Failed to destroy not_empty condition"); - } - - result = pthread_cond_destroy(&ring->not_full); - if (result != 0) { - DIE(1, "Failed to destroy not_full condition"); - } + if (ring == NULL || ring->data == NULL) + DIE(1, "Attempting to destroy an uninitialized ring buffer"); + + free(ring->data); + + int result = pthread_mutex_destroy(&ring->mutex); + + if (result != 0) + DIE(1, "Failed to destroy mutex"); + + result = pthread_cond_destroy(&ring->not_empty); + if (result != 0) + DIE(1, "Failed to destroy not_empty condition"); + + result = pthread_cond_destroy(&ring->not_full); + if (result != 0) + DIE(1, "Failed to destroy not_full condition"); } -void ring_buffer_stop(so_ring_buffer_t *ring) { - if (ring == NULL) { - DIE(1, "Attempting to stop an uninitialized ring buffer"); - } +void ring_buffer_stop(so_ring_buffer_t *ring) +{ + if (ring == NULL) + DIE(1, "Attempting to stop an uninitialized ring buffer"); + + int result = pthread_mutex_lock(&ring->mutex); - int result = pthread_mutex_lock(&ring->mutex); - if (result != 0) { - DIE(1, "Failed to lock mutex"); - } + if (result != 0) + DIE(1, "Failed to lock mutex"); - ring->stop = 1; + ring->stop = 1; - result = pthread_cond_broadcast(&ring->not_empty); - if (result != 0) { - DIE(1, "Failed to broadcast not_empty condition"); - } + result = pthread_cond_broadcast(&ring->not_empty); + if (result != 0) + DIE(1, "Failed to broadcast not_empty condition"); - result = pthread_cond_broadcast(&ring->not_full); - if (result != 0) { - DIE(1, "Failed to broadcast not_full condition"); - } + result = pthread_cond_broadcast(&ring->not_full); + if (result != 0) + DIE(1, "Failed to broadcast not_full condition"); - result = pthread_mutex_unlock(&ring->mutex); - if (result != 0) { - DIE(1, "Failed to unlock mutex"); - } -} + result = pthread_mutex_unlock(&ring->mutex); + if (result != 0) + DIE(1, "Failed to unlock mutex"); +} \ No newline at end of file diff --git a/src/ring_buffer.h b/src/ring_buffer.h index ccb911911d715738064fdbb42ef21178bf5bc241..4f8aac3833b70b15e8ff14045bd763e8a0a1131d 100644 --- a/src/ring_buffer.h +++ b/src/ring_buffer.h @@ -6,22 +6,20 @@ #include <sys/types.h> #include <string.h> #include <pthread.h> +#include <semaphore.h> typedef struct so_ring_buffer_t { char *data; - size_t read_pos; size_t write_pos; - size_t len; size_t cap; - /* TODO: Add syncronization primitives */ pthread_mutex_t mutex; pthread_cond_t not_empty; pthread_cond_t not_full; + sem_t semaphore; int stop; - } so_ring_buffer_t; int ring_buffer_init(so_ring_buffer_t *rb, size_t cap); @@ -30,4 +28,4 @@ ssize_t ring_buffer_dequeue(so_ring_buffer_t *rb, void *data, size_t size); void ring_buffer_destroy(so_ring_buffer_t *rb); void ring_buffer_stop(so_ring_buffer_t *rb); -#endif /* __SO_RINGBUFFER_H__ */ +#endif /* __SO_RINGBUFFER_H__ */ \ No newline at end of file diff --git a/src/serial b/src/serial index 265816e01681ccfeebe22341d05d9207747a9c33..b24d42de3c45cf68a28f90244647f218752aba10 100755 Binary files a/src/serial and b/src/serial differ