diff --git a/src/consumer.c b/src/consumer.c index d0363e3eca572474e278d382d2ce17ef043c0fea..634ec56d9bfaa2513a4195fd2b3d6e64fd8a6f1b 100644 --- a/src/consumer.c +++ b/src/consumer.c @@ -9,21 +9,17 @@ #include "packet.h" #include "utils.h" -_Atomic unsigned long global_seq_num; -_Atomic unsigned long write_seq_num; +// _Atomic type variables to avoid race conditions for multiple threads +_Atomic unsigned long global_seq_num; // assign a number to each packet processed by any consumer +_Atomic unsigned long write_seq_num; // assign a number to each packet written to the log file -pthread_mutex_t write_mutex; -pthread_cond_t write_condition; -pthread_mutex_t global_seq_num_mutex; - -void initialize_thread_resources(void) -{ - pthread_mutex_init(&write_mutex, NULL); - pthread_cond_init(&write_condition, NULL); -} +//mutex and condition variables for thread synchronization +pthread_mutex_t write_mutex; // protects acces to shared resources during write +pthread_cond_t write_condition; // used to signal between threads void cleanup_thread_resources(void) { + // destroy mutex and condition variables pthread_mutex_destroy(&write_mutex); pthread_cond_destroy(&write_condition); } @@ -31,33 +27,43 @@ void cleanup_thread_resources(void) void write_to_file(int file_descriptor, unsigned long packet_sequence, char log_entry[], int length) { - pthread_mutex_lock(&write_mutex); + pthread_mutex_lock(&write_mutex); // ensures only one thread writes at a time + // wait until it's the thread's turn to write while (packet_sequence != write_seq_num) pthread_cond_wait(&write_condition, &write_mutex); + + // write to log_entry file and increment sequence write(file_descriptor, log_entry, length); + // using atomic_fetch-add to make sure incrementing isn't interrupted by other threads atomic_fetch_add(&write_seq_num, 1); + + // signal the waiting threads pthread_cond_broadcast(&write_condition); pthread_mutex_unlock(&write_mutex); } -void consumer_thread(so_consumer_ctx_t *ctx) +void *consumer_thread(void *arg) { - char packet_buffer[256]; - char log_entry[256]; + so_consumer_ctx_t *ctx = (so_consumer_ctx_t *)arg; // for coding style purposes + char packet_buffer[256]; // buffer for storing recieved packet data + char log_entry[256]; // buffer for storing log entry unsigned long packet_sequence; int file_descriptor = ctx->log_file; while (true) { - pthread_mutex_lock(&write_mutex); + // dequeue packet from ring buffer + pthread_mutex_lock(&write_mutex); // get exclusive access for a thread ssize_t received_size = ring_buffer_dequeue(ctx->producer_rb, packet_buffer, 256); if (received_size <= 0 && ctx->producer_rb->stop) { - pthread_mutex_unlock(&write_mutex); + pthread_mutex_unlock(&write_mutex); // release mutex before exiting break; } - packet_sequence = atomic_fetch_add(&global_seq_num, 1); + // increment the global sequence number and release the mutex + packet_sequence = atomic_fetch_add(&global_seq_num, 1); // makes sure incrementing isn't interrupted by other threads pthread_mutex_unlock(&write_mutex); + // process packet and write to log file struct so_packet_t *packet = (struct so_packet_t *)packet_buffer; int action_result = process_packet(packet); unsigned long hash_value = packet_hash(packet); @@ -68,13 +74,14 @@ void consumer_thread(so_consumer_ctx_t *ctx) write_to_file(file_descriptor, packet_sequence, log_entry, length); } cleanup_thread_resources(); + return NULL; } - -void *consumer_thread_wrapper(void *arg) +void initialize_consumer_context(so_consumer_ctx_t *ctx, int file_descriptor, struct so_ring_buffer_t *rb) { - consumer_thread((so_consumer_ctx_t *)arg); - return NULL; + ctx->producer_rb = rb; + ctx->log_file = file_descriptor; + ctx->log_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; } @@ -84,21 +91,20 @@ int create_consumers(pthread_t *tids, const char *out_filename) { int file_descriptor = open(out_filename, O_RDWR | O_CREAT | O_APPEND, 0666); - + DIE(file_descriptor < 0, "Error opening output file"); for (int i = 0; i < num_consumers; i++) { - so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t)); - if(!ctx){ + so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t));// allocate memory for consumer context + + if (!ctx) DIE(1, "malloc failed"); - } - ctx->producer_rb = rb; - ctx->log_file = file_descriptor; - ctx->log_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; + initialize_consumer_context(ctx, file_descriptor, rb); - pthread_create(&tids[i], NULL, consumer_thread_wrapper, ctx); + // create consumer threads + pthread_create(&tids[i], NULL, consumer_thread, ctx); } return num_consumers; -} \ No newline at end of file +} diff --git a/src/consumer.h b/src/consumer.h index 5654068635ba132d45ac291c545a6b4ce12d7cb8..f11eadb41936360e4d965fb0c8e7e9f2c62c2e6f 100644 --- a/src/consumer.h +++ b/src/consumer.h @@ -3,19 +3,18 @@ #ifndef __SO_CONSUMER_H__ #define __SO_CONSUMER_H__ -#include "ring_buffer.h" -#include "packet.h" #include <stdio.h> +#include "ring_buffer.h" +#include "packet.h" typedef struct so_consumer_ctx_t { struct so_ring_buffer_t *producer_rb; /* TODO: add synchronization primitives for timestamp ordering */ - pthread_mutex_t log_mutex; - int log_file; - pthread_cond_t seq_cond; - unsigned long next_seq_to_log; - + pthread_mutex_t log_mutex; // mutex for synchronization writing to the log file + int log_file; // file descriptor for the log file + pthread_cond_t seq_cond; // condition variable for signaling when the sequence number is reached(maintaing log orde) + unsigned long next_seq_to_log;// next sequence number to be written to the log file } so_consumer_ctx_t; int create_consumers(pthread_t *tids, diff --git a/src/firewall b/src/firewall deleted file mode 100755 index 3837e4800c35b40ee0a28c121b57ddd6a796c79c..0000000000000000000000000000000000000000 Binary files a/src/firewall and /dev/null differ diff --git a/src/ring_buffer.c b/src/ring_buffer.c index 106932ddbe074b6c7b03e50d6ec46c92b4383f36..afebef11bb9522e27e76cf1387883502a5ad7595 100644 --- a/src/ring_buffer.c +++ b/src/ring_buffer.c @@ -2,11 +2,11 @@ #include <stdlib.h> #include <pthread.h> -#include <string.h> #include <stdio.h> #include "ring_buffer.h" #include "utils.h" +// struct to store the sizes of the two parts of the buffer when splitting typedef struct { size_t first_part; size_t second_part; @@ -15,11 +15,12 @@ typedef struct { SplitSizes split_space(size_t size, size_t space_at_end) { SplitSizes sizes; - + // split the space into two parts if the size is bigger than the space at the end if (size > space_at_end) { sizes.first_part = space_at_end; sizes.second_part = size - space_at_end; } else { + // otherwise it's all in the first part sizes.first_part = size; sizes.second_part = 0; } @@ -28,147 +29,171 @@ SplitSizes split_space(size_t size, size_t space_at_end) static size_t get_free_space(const so_ring_buffer_t *ring) { + // calculate the free space in the ring buffer by subtracting the length from the capacity return ring->cap - ring->len; } static size_t adjust_position(size_t pos, size_t offset, size_t cap) { + // adjust the position by adding the offset size_t new_pos = pos + offset; - + // if the new position is greater than the capacity, wrap around if (new_pos >= cap) new_pos -= cap; return new_pos; } -static void copy_data_to_ring(so_ring_buffer_t *ring, const void *data, size_t size) +static void copy_to_ring(so_ring_buffer_t *ring, const void *data, size_t size) { + // calculate the space at the end of the buffer size_t space_at_end = ring->cap - ring->write_pos; + // split the space into two parts SplitSizes sizes = split_space(size, space_at_end); + // copy the data to the buffer memcpy(ring->data + ring->write_pos, data, sizes.first_part); + // if there is a second part, copy it to the beginning of the buffer if (sizes.second_part > 0) memcpy(ring->data, (const char *)data + sizes.first_part, sizes.second_part); } -static void copy_data_from_ring(so_ring_buffer_t *ring, void *data, size_t size) +static void copy_from_ring(so_ring_buffer_t *ring, void *data, size_t size) { + // calculate the space at the end of the buffer size_t space_at_end = ring->cap - ring->read_pos; + // split the space into two parts SplitSizes sizes = split_space(size, space_at_end); + // copy the data from the buffer , no need to check for second part memcpy(data, ring->data + ring->read_pos, sizes.first_part); } int ring_buffer_init(so_ring_buffer_t *ring, size_t cap) { - ring->data = malloc(cap); + ring->data = malloc(cap); // allocate memory for the ring buffer if (ring->data == NULL) - DIE(1, "Memory allocation failed"); + DIE(1, "malloc failed"); + // initialize the ring buffer ring->cap = cap; ring->len = 0; ring->read_pos = 0; ring->write_pos = 0; ring->stop = 0; + // initialize the synchronization primitives for synchronization int result = pthread_mutex_init(&ring->mutex, NULL); if (result != 0) - DIE(1, "Failed to initialize mutex"); + DIE(1, "pthread_mutex_init failed"); result = pthread_cond_init(&ring->not_empty, NULL); if (result != 0) - DIE(1, "Failed to initialize not_empty condition"); + DIE(1, "failed to initialize not_empty"); result = pthread_cond_init(&ring->not_full, NULL); if (result != 0) - DIE(1, "Failed to initialize not_full condition"); + DIE(1, "failed to initialize not_full"); return 0; } ssize_t ring_buffer_enqueue(so_ring_buffer_t *ring, void *data, size_t size) { if (data == NULL) { - ERR(1, "Invalid data pointer"); + ERR(1, "invalid data pointer"); return -1; } + // lock to ensure exclusive access to the ring buffer int result = pthread_mutex_lock(&ring->mutex); if (result != 0) { - ERR(result != 0, "Failed to lock mutex"); + ERR(result != 0, "pthread_mutex_lock failed"); return -1; } + // wait until there is enough space in the buffer while (get_free_space(ring) < size && !ring->stop) { result = pthread_cond_wait(&ring->not_full, &ring->mutex); if (result != 0) { pthread_mutex_unlock(&ring->mutex); - ERR(result != 0, "Failed to wait on not_full condition"); + ERR(result != 0, "failed to wait on not_full condition"); return -1; } } + // if the buffer is stopped and there is no space, return if (ring->stop) { pthread_mutex_unlock(&ring->mutex); return -1; } + // copy the data to the buffer + copy_to_ring(ring, data, size); - copy_data_to_ring(ring, data, size); - + // update the write position and length ring->write_pos = adjust_position(ring->write_pos, size, ring->cap); ring->len += size; + // signal that the buffer is not empty result = pthread_cond_signal(&ring->not_empty); if (result != 0) - ERR(result != 0, "Failed to signal not_empty condition"); + ERR(result != 0, "failed to signal not_empty condition"); + // unlock the mutex and allow other threads to access the buffer result = pthread_mutex_unlock(&ring->mutex); if (result != 0) - ERR(result != 0, "Failed to unlock mutex"); + ERR(result != 0, "pthread_mutex_unlock failed"); return size; } ssize_t ring_buffer_dequeue(so_ring_buffer_t *ring, void *data, size_t size) { if (data == NULL) { - ERR(1, "Invalid data pointer"); + ERR(1, "invalid data"); return -1; } + // lock mutex to ensure exclusive access to the ring buffer int result = pthread_mutex_lock(&ring->mutex); if (result != 0) { - ERR(result != 0, "Failed to lock mutex"); + ERR(result != 0, "pthread_mutex_lock failed"); return -1; } + // wait until there is enough data in the buffer while (ring->len < size && !ring->stop) { result = pthread_cond_wait(&ring->not_empty, &ring->mutex); if (result != 0) { pthread_mutex_unlock(&ring->mutex); - ERR(result != 0, "Failed to wait on not_empty condition"); + ERR(result != 0, "failed to wait on not_empty condition"); return -1; } } + // if the buffer is stopped and there is no data, return if (ring->stop && ring->len == 0) { pthread_mutex_unlock(&ring->mutex); return 0; } - copy_data_from_ring(ring, data, size); + // copy the data from the buffer + copy_from_ring(ring, data, size); + // update the read position and length ring->read_pos = adjust_position(ring->read_pos, size, ring->cap); ring->len -= size; + // signal that the buffer is not full result = pthread_cond_signal(&ring->not_full); if (result != 0) - ERR(result != 0, "Failed to signal not_full condition"); + ERR(result != 0, "failed to signal not_full condition"); + // unlock the mutex and allow other threads to access the buffer result = pthread_mutex_unlock(&ring->mutex); if (result != 0) - ERR(result != 0, "Failed to unlock mutex"); + ERR(result != 0, "pthread_mutex_unlock failed"); return size; } @@ -176,45 +201,51 @@ ssize_t ring_buffer_dequeue(so_ring_buffer_t *ring, void *data, size_t size) void ring_buffer_destroy(so_ring_buffer_t *ring) { if (ring == NULL || ring->data == NULL) - DIE(1, "Attempting to destroy an uninitialized ring buffer"); + DIE(1, "invalid ring buffer"); + // free the data allocated for the ring buffer free(ring->data); + // destroy the synchronization primitives and condition variables int result = pthread_mutex_destroy(&ring->mutex); if (result != 0) - DIE(1, "Failed to destroy mutex"); + DIE(1, "failed to destroy mutex"); result = pthread_cond_destroy(&ring->not_empty); if (result != 0) - DIE(1, "Failed to destroy not_empty condition"); + DIE(1, "failed to destroy not_empty"); result = pthread_cond_destroy(&ring->not_full); if (result != 0) - DIE(1, "Failed to destroy not_full condition"); + DIE(1, "failed to destroy not_full"); } void ring_buffer_stop(so_ring_buffer_t *ring) { if (ring == NULL) - DIE(1, "Attempting to stop an uninitialized ring buffer"); + DIE(1, "invalid ring buffer"); + // lock the motex to modify the stop flag int result = pthread_mutex_lock(&ring->mutex); if (result != 0) - DIE(1, "Failed to lock mutex"); + DIE(1, "pthread_mutex_lock failed"); + // set the stop flag to 1 to indicate that the buffer is stopped ring->stop = 1; + // wake up all threads waiting on the condition variables and consumers result = pthread_cond_broadcast(&ring->not_empty); if (result != 0) - DIE(1, "Failed to broadcast not_empty condition"); + DIE(1, "failed to broadcast not_empty condition"); result = pthread_cond_broadcast(&ring->not_full); if (result != 0) - DIE(1, "Failed to broadcast not_full condition"); + DIE(1, "failed to broadcast not_full condition"); + // unlock the mutex to allow other threads to access the buffer result = pthread_mutex_unlock(&ring->mutex); if (result != 0) - DIE(1, "Failed to unlock mutex"); -} \ No newline at end of file + DIE(1, "pthread_mutex_unlock failed"); +} diff --git a/src/ring_buffer.h b/src/ring_buffer.h index 4f8aac3833b70b15e8ff14045bd763e8a0a1131d..1fa880d76bab57cdd1bde706a844f0c2d7f2b53e 100644 --- a/src/ring_buffer.h +++ b/src/ring_buffer.h @@ -6,7 +6,6 @@ #include <sys/types.h> #include <string.h> #include <pthread.h> -#include <semaphore.h> typedef struct so_ring_buffer_t { char *data; @@ -15,11 +14,10 @@ typedef struct so_ring_buffer_t { size_t len; size_t cap; /* TODO: Add syncronization primitives */ - pthread_mutex_t mutex; - pthread_cond_t not_empty; - pthread_cond_t not_full; - sem_t semaphore; - int stop; + pthread_mutex_t mutex; // mutex for synchronization access to the buffer + pthread_cond_t not_empty; // condition variable for signaling when the buffer is not empty + pthread_cond_t not_full;// condition variable for signaling when the buffer is not full + int stop; // flag to signal that the buffer is stopped } so_ring_buffer_t; int ring_buffer_init(so_ring_buffer_t *rb, size_t cap); @@ -28,4 +26,4 @@ ssize_t ring_buffer_dequeue(so_ring_buffer_t *rb, void *data, size_t size); void ring_buffer_destroy(so_ring_buffer_t *rb); void ring_buffer_stop(so_ring_buffer_t *rb); -#endif /* __SO_RINGBUFFER_H__ */ \ No newline at end of file +#endif /* __SO_RINGBUFFER_H__ */ diff --git a/src/serial b/src/serial deleted file mode 100755 index b24d42de3c45cf68a28f90244647f218752aba10..0000000000000000000000000000000000000000 Binary files a/src/serial and /dev/null differ