Skip to content
Snippets Groups Projects
Commit d638b4b6 authored by Dana-Maria CĂRUNTU's avatar Dana-Maria CĂRUNTU
Browse files

format code and add comments + coding style done

parent 059c9e12
No related branches found
No related tags found
No related merge requests found
Pipeline #93288 passed
......@@ -9,21 +9,17 @@
#include "packet.h"
#include "utils.h"
_Atomic unsigned long global_seq_num;
_Atomic unsigned long write_seq_num;
// _Atomic type variables to avoid race conditions for multiple threads
_Atomic unsigned long global_seq_num; // assign a number to each packet processed by any consumer
_Atomic unsigned long write_seq_num; // assign a number to each packet written to the log file
pthread_mutex_t write_mutex;
pthread_cond_t write_condition;
pthread_mutex_t global_seq_num_mutex;
void initialize_thread_resources(void)
{
pthread_mutex_init(&write_mutex, NULL);
pthread_cond_init(&write_condition, NULL);
}
//mutex and condition variables for thread synchronization
pthread_mutex_t write_mutex; // protects acces to shared resources during write
pthread_cond_t write_condition; // used to signal between threads
void cleanup_thread_resources(void)
{
// destroy mutex and condition variables
pthread_mutex_destroy(&write_mutex);
pthread_cond_destroy(&write_condition);
}
......@@ -31,33 +27,43 @@ void cleanup_thread_resources(void)
void write_to_file(int file_descriptor, unsigned long packet_sequence, char log_entry[], int length)
{
pthread_mutex_lock(&write_mutex);
pthread_mutex_lock(&write_mutex); // ensures only one thread writes at a time
// wait until it's the thread's turn to write
while (packet_sequence != write_seq_num)
pthread_cond_wait(&write_condition, &write_mutex);
// write to log_entry file and increment sequence
write(file_descriptor, log_entry, length);
// using atomic_fetch-add to make sure incrementing isn't interrupted by other threads
atomic_fetch_add(&write_seq_num, 1);
// signal the waiting threads
pthread_cond_broadcast(&write_condition);
pthread_mutex_unlock(&write_mutex);
}
void consumer_thread(so_consumer_ctx_t *ctx)
void *consumer_thread(void *arg)
{
char packet_buffer[256];
char log_entry[256];
so_consumer_ctx_t *ctx = (so_consumer_ctx_t *)arg; // for coding style purposes
char packet_buffer[256]; // buffer for storing recieved packet data
char log_entry[256]; // buffer for storing log entry
unsigned long packet_sequence;
int file_descriptor = ctx->log_file;
while (true) {
pthread_mutex_lock(&write_mutex);
// dequeue packet from ring buffer
pthread_mutex_lock(&write_mutex); // get exclusive access for a thread
ssize_t received_size = ring_buffer_dequeue(ctx->producer_rb, packet_buffer, 256);
if (received_size <= 0 && ctx->producer_rb->stop) {
pthread_mutex_unlock(&write_mutex);
pthread_mutex_unlock(&write_mutex); // release mutex before exiting
break;
}
packet_sequence = atomic_fetch_add(&global_seq_num, 1);
// increment the global sequence number and release the mutex
packet_sequence = atomic_fetch_add(&global_seq_num, 1); // makes sure incrementing isn't interrupted by other threads
pthread_mutex_unlock(&write_mutex);
// process packet and write to log file
struct so_packet_t *packet = (struct so_packet_t *)packet_buffer;
int action_result = process_packet(packet);
unsigned long hash_value = packet_hash(packet);
......@@ -68,13 +74,14 @@ void consumer_thread(so_consumer_ctx_t *ctx)
write_to_file(file_descriptor, packet_sequence, log_entry, length);
}
cleanup_thread_resources();
return NULL;
}
void *consumer_thread_wrapper(void *arg)
void initialize_consumer_context(so_consumer_ctx_t *ctx, int file_descriptor, struct so_ring_buffer_t *rb)
{
consumer_thread((so_consumer_ctx_t *)arg);
return NULL;
ctx->producer_rb = rb;
ctx->log_file = file_descriptor;
ctx->log_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
}
......@@ -84,21 +91,20 @@ int create_consumers(pthread_t *tids,
const char *out_filename)
{
int file_descriptor = open(out_filename, O_RDWR | O_CREAT | O_APPEND, 0666);
DIE(file_descriptor < 0, "Error opening output file");
for (int i = 0; i < num_consumers; i++) {
so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t));
if(!ctx){
so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t));// allocate memory for consumer context
if (!ctx)
DIE(1, "malloc failed");
}
ctx->producer_rb = rb;
ctx->log_file = file_descriptor;
ctx->log_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
initialize_consumer_context(ctx, file_descriptor, rb);
pthread_create(&tids[i], NULL, consumer_thread_wrapper, ctx);
// create consumer threads
pthread_create(&tids[i], NULL, consumer_thread, ctx);
}
return num_consumers;
}
\ No newline at end of file
}
......@@ -3,19 +3,18 @@
#ifndef __SO_CONSUMER_H__
#define __SO_CONSUMER_H__
#include "ring_buffer.h"
#include "packet.h"
#include <stdio.h>
#include "ring_buffer.h"
#include "packet.h"
typedef struct so_consumer_ctx_t {
struct so_ring_buffer_t *producer_rb;
/* TODO: add synchronization primitives for timestamp ordering */
pthread_mutex_t log_mutex;
int log_file;
pthread_cond_t seq_cond;
unsigned long next_seq_to_log;
pthread_mutex_t log_mutex; // mutex for synchronization writing to the log file
int log_file; // file descriptor for the log file
pthread_cond_t seq_cond; // condition variable for signaling when the sequence number is reached(maintaing log orde)
unsigned long next_seq_to_log;// next sequence number to be written to the log file
} so_consumer_ctx_t;
int create_consumers(pthread_t *tids,
......
File deleted
......@@ -2,11 +2,11 @@
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include "ring_buffer.h"
#include "utils.h"
// struct to store the sizes of the two parts of the buffer when splitting
typedef struct {
size_t first_part;
size_t second_part;
......@@ -15,11 +15,12 @@ typedef struct {
SplitSizes split_space(size_t size, size_t space_at_end)
{
SplitSizes sizes;
// split the space into two parts if the size is bigger than the space at the end
if (size > space_at_end) {
sizes.first_part = space_at_end;
sizes.second_part = size - space_at_end;
} else {
// otherwise it's all in the first part
sizes.first_part = size;
sizes.second_part = 0;
}
......@@ -28,147 +29,171 @@ SplitSizes split_space(size_t size, size_t space_at_end)
static size_t get_free_space(const so_ring_buffer_t *ring)
{
// calculate the free space in the ring buffer by subtracting the length from the capacity
return ring->cap - ring->len;
}
static size_t adjust_position(size_t pos, size_t offset, size_t cap)
{
// adjust the position by adding the offset
size_t new_pos = pos + offset;
// if the new position is greater than the capacity, wrap around
if (new_pos >= cap)
new_pos -= cap;
return new_pos;
}
static void copy_data_to_ring(so_ring_buffer_t *ring, const void *data, size_t size)
static void copy_to_ring(so_ring_buffer_t *ring, const void *data, size_t size)
{
// calculate the space at the end of the buffer
size_t space_at_end = ring->cap - ring->write_pos;
// split the space into two parts
SplitSizes sizes = split_space(size, space_at_end);
// copy the data to the buffer
memcpy(ring->data + ring->write_pos, data, sizes.first_part);
// if there is a second part, copy it to the beginning of the buffer
if (sizes.second_part > 0)
memcpy(ring->data, (const char *)data + sizes.first_part, sizes.second_part);
}
static void copy_data_from_ring(so_ring_buffer_t *ring, void *data, size_t size)
static void copy_from_ring(so_ring_buffer_t *ring, void *data, size_t size)
{
// calculate the space at the end of the buffer
size_t space_at_end = ring->cap - ring->read_pos;
// split the space into two parts
SplitSizes sizes = split_space(size, space_at_end);
// copy the data from the buffer , no need to check for second part
memcpy(data, ring->data + ring->read_pos, sizes.first_part);
}
int ring_buffer_init(so_ring_buffer_t *ring, size_t cap)
{
ring->data = malloc(cap);
ring->data = malloc(cap); // allocate memory for the ring buffer
if (ring->data == NULL)
DIE(1, "Memory allocation failed");
DIE(1, "malloc failed");
// initialize the ring buffer
ring->cap = cap;
ring->len = 0;
ring->read_pos = 0;
ring->write_pos = 0;
ring->stop = 0;
// initialize the synchronization primitives for synchronization
int result = pthread_mutex_init(&ring->mutex, NULL);
if (result != 0)
DIE(1, "Failed to initialize mutex");
DIE(1, "pthread_mutex_init failed");
result = pthread_cond_init(&ring->not_empty, NULL);
if (result != 0)
DIE(1, "Failed to initialize not_empty condition");
DIE(1, "failed to initialize not_empty");
result = pthread_cond_init(&ring->not_full, NULL);
if (result != 0)
DIE(1, "Failed to initialize not_full condition");
DIE(1, "failed to initialize not_full");
return 0;
}
ssize_t ring_buffer_enqueue(so_ring_buffer_t *ring, void *data, size_t size)
{
if (data == NULL) {
ERR(1, "Invalid data pointer");
ERR(1, "invalid data pointer");
return -1;
}
// lock to ensure exclusive access to the ring buffer
int result = pthread_mutex_lock(&ring->mutex);
if (result != 0) {
ERR(result != 0, "Failed to lock mutex");
ERR(result != 0, "pthread_mutex_lock failed");
return -1;
}
// wait until there is enough space in the buffer
while (get_free_space(ring) < size && !ring->stop) {
result = pthread_cond_wait(&ring->not_full, &ring->mutex);
if (result != 0) {
pthread_mutex_unlock(&ring->mutex);
ERR(result != 0, "Failed to wait on not_full condition");
ERR(result != 0, "failed to wait on not_full condition");
return -1;
}
}
// if the buffer is stopped and there is no space, return
if (ring->stop) {
pthread_mutex_unlock(&ring->mutex);
return -1;
}
// copy the data to the buffer
copy_to_ring(ring, data, size);
copy_data_to_ring(ring, data, size);
// update the write position and length
ring->write_pos = adjust_position(ring->write_pos, size, ring->cap);
ring->len += size;
// signal that the buffer is not empty
result = pthread_cond_signal(&ring->not_empty);
if (result != 0)
ERR(result != 0, "Failed to signal not_empty condition");
ERR(result != 0, "failed to signal not_empty condition");
// unlock the mutex and allow other threads to access the buffer
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0)
ERR(result != 0, "Failed to unlock mutex");
ERR(result != 0, "pthread_mutex_unlock failed");
return size;
}
ssize_t ring_buffer_dequeue(so_ring_buffer_t *ring, void *data, size_t size)
{
if (data == NULL) {
ERR(1, "Invalid data pointer");
ERR(1, "invalid data");
return -1;
}
// lock mutex to ensure exclusive access to the ring buffer
int result = pthread_mutex_lock(&ring->mutex);
if (result != 0) {
ERR(result != 0, "Failed to lock mutex");
ERR(result != 0, "pthread_mutex_lock failed");
return -1;
}
// wait until there is enough data in the buffer
while (ring->len < size && !ring->stop) {
result = pthread_cond_wait(&ring->not_empty, &ring->mutex);
if (result != 0) {
pthread_mutex_unlock(&ring->mutex);
ERR(result != 0, "Failed to wait on not_empty condition");
ERR(result != 0, "failed to wait on not_empty condition");
return -1;
}
}
// if the buffer is stopped and there is no data, return
if (ring->stop && ring->len == 0) {
pthread_mutex_unlock(&ring->mutex);
return 0;
}
copy_data_from_ring(ring, data, size);
// copy the data from the buffer
copy_from_ring(ring, data, size);
// update the read position and length
ring->read_pos = adjust_position(ring->read_pos, size, ring->cap);
ring->len -= size;
// signal that the buffer is not full
result = pthread_cond_signal(&ring->not_full);
if (result != 0)
ERR(result != 0, "Failed to signal not_full condition");
ERR(result != 0, "failed to signal not_full condition");
// unlock the mutex and allow other threads to access the buffer
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0)
ERR(result != 0, "Failed to unlock mutex");
ERR(result != 0, "pthread_mutex_unlock failed");
return size;
}
......@@ -176,45 +201,51 @@ ssize_t ring_buffer_dequeue(so_ring_buffer_t *ring, void *data, size_t size)
void ring_buffer_destroy(so_ring_buffer_t *ring)
{
if (ring == NULL || ring->data == NULL)
DIE(1, "Attempting to destroy an uninitialized ring buffer");
DIE(1, "invalid ring buffer");
// free the data allocated for the ring buffer
free(ring->data);
// destroy the synchronization primitives and condition variables
int result = pthread_mutex_destroy(&ring->mutex);
if (result != 0)
DIE(1, "Failed to destroy mutex");
DIE(1, "failed to destroy mutex");
result = pthread_cond_destroy(&ring->not_empty);
if (result != 0)
DIE(1, "Failed to destroy not_empty condition");
DIE(1, "failed to destroy not_empty");
result = pthread_cond_destroy(&ring->not_full);
if (result != 0)
DIE(1, "Failed to destroy not_full condition");
DIE(1, "failed to destroy not_full");
}
void ring_buffer_stop(so_ring_buffer_t *ring)
{
if (ring == NULL)
DIE(1, "Attempting to stop an uninitialized ring buffer");
DIE(1, "invalid ring buffer");
// lock the motex to modify the stop flag
int result = pthread_mutex_lock(&ring->mutex);
if (result != 0)
DIE(1, "Failed to lock mutex");
DIE(1, "pthread_mutex_lock failed");
// set the stop flag to 1 to indicate that the buffer is stopped
ring->stop = 1;
// wake up all threads waiting on the condition variables and consumers
result = pthread_cond_broadcast(&ring->not_empty);
if (result != 0)
DIE(1, "Failed to broadcast not_empty condition");
DIE(1, "failed to broadcast not_empty condition");
result = pthread_cond_broadcast(&ring->not_full);
if (result != 0)
DIE(1, "Failed to broadcast not_full condition");
DIE(1, "failed to broadcast not_full condition");
// unlock the mutex to allow other threads to access the buffer
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0)
DIE(1, "Failed to unlock mutex");
}
\ No newline at end of file
DIE(1, "pthread_mutex_unlock failed");
}
......@@ -6,7 +6,6 @@
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
typedef struct so_ring_buffer_t {
char *data;
......@@ -15,11 +14,10 @@ typedef struct so_ring_buffer_t {
size_t len;
size_t cap;
/* TODO: Add syncronization primitives */
pthread_mutex_t mutex;
pthread_cond_t not_empty;
pthread_cond_t not_full;
sem_t semaphore;
int stop;
pthread_mutex_t mutex; // mutex for synchronization access to the buffer
pthread_cond_t not_empty; // condition variable for signaling when the buffer is not empty
pthread_cond_t not_full;// condition variable for signaling when the buffer is not full
int stop; // flag to signal that the buffer is stopped
} so_ring_buffer_t;
int ring_buffer_init(so_ring_buffer_t *rb, size_t cap);
......@@ -28,4 +26,4 @@ ssize_t ring_buffer_dequeue(so_ring_buffer_t *rb, void *data, size_t size);
void ring_buffer_destroy(so_ring_buffer_t *rb);
void ring_buffer_stop(so_ring_buffer_t *rb);
#endif /* __SO_RINGBUFFER_H__ */
\ No newline at end of file
#endif /* __SO_RINGBUFFER_H__ */
File deleted
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment