Skip to content
Snippets Groups Projects
Commit 059c9e12 authored by Dana-Maria CĂRUNTU's avatar Dana-Maria CĂRUNTU
Browse files

update consumer logic (90p)

parent b3129092
No related branches found
No related tags found
No related merge requests found
Pipeline #93110 passed
......@@ -3,72 +3,102 @@
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdatomic.h>
#include "consumer.h"
#include "ring_buffer.h"
#include "packet.h"
#include "utils.h"
pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
_Atomic unsigned long global_seq_num;
_Atomic unsigned long write_seq_num;
pthread_mutex_t write_mutex;
pthread_cond_t write_condition;
pthread_mutex_t global_seq_num_mutex;
void initialize_thread_resources(void)
{
pthread_mutex_init(&write_mutex, NULL);
pthread_cond_init(&write_condition, NULL);
}
void cleanup_thread_resources(void)
{
pthread_mutex_destroy(&write_mutex);
pthread_cond_destroy(&write_condition);
}
void write_to_file(int file_descriptor, unsigned long packet_sequence, char log_entry[], int length)
{
pthread_mutex_lock(&write_mutex);
while (packet_sequence != write_seq_num)
pthread_cond_wait(&write_condition, &write_mutex);
write(file_descriptor, log_entry, length);
atomic_fetch_add(&write_seq_num, 1);
pthread_cond_broadcast(&write_condition);
pthread_mutex_unlock(&write_mutex);
}
void consumer_thread(so_consumer_ctx_t *ctx)
{
/* TODO: implement consumer thread */
so_packet_t packet;
while (1) {
pthread_mutex_lock(&log_mutex);
ssize_t result = ring_buffer_dequeue(ctx->producer_rb, &packet, sizeof(packet));
// pthread_mutex_unlock(&log_mutex);
if (result <= 0 && ctx->producer_rb->stop) {
pthread_mutex_unlock(&log_mutex);
break;
}
pthread_mutex_unlock(&log_mutex);
char log[PKT_SZ];
so_action_t decision = process_packet(&packet);
unsigned long pkt_hash = packet_hash(&packet);
pthread_mutex_lock(&log_mutex);
snprintf(log, PKT_SZ, "%s %016lx %lu\n", RES_TO_STR(decision), pkt_hash, packet.hdr.timestamp);
//pthread_mutex_lock(&ctx->log_mutex);
fputs(log, ctx->log_file);
fflush(ctx->log_file);
pthread_mutex_unlock(&log_mutex);
//pthread_mutex_unlock(&ctx->log_mutex);
}
// (void) ctx;
char packet_buffer[256];
char log_entry[256];
unsigned long packet_sequence;
int file_descriptor = ctx->log_file;
while (true) {
pthread_mutex_lock(&write_mutex);
ssize_t received_size = ring_buffer_dequeue(ctx->producer_rb, packet_buffer, 256);
if (received_size <= 0 && ctx->producer_rb->stop) {
pthread_mutex_unlock(&write_mutex);
break;
}
packet_sequence = atomic_fetch_add(&global_seq_num, 1);
pthread_mutex_unlock(&write_mutex);
struct so_packet_t *packet = (struct so_packet_t *)packet_buffer;
int action_result = process_packet(packet);
unsigned long hash_value = packet_hash(packet);
unsigned long time_stamp = packet->hdr.timestamp;
int length = snprintf(log_entry, sizeof(log_entry), "%s %016lx %lu\n",
action_result == PASS ? "PASS" : "DROP", hash_value, time_stamp);
write_to_file(file_descriptor, packet_sequence, log_entry, length);
}
cleanup_thread_resources();
}
void *consumer_thread_wrapper(void *arg)
{
consumer_thread((so_consumer_ctx_t *)arg);
return NULL;
}
int create_consumers(pthread_t *tids,
int num_consumers,
struct so_ring_buffer_t *rb,
const char *out_filename)
int num_consumers,
struct so_ring_buffer_t *rb,
const char *out_filename)
{
// (void) tids;
// (void) num_consumers;
// (void) rb;
// (void) out_filename;
FILE *log_file = fopen(out_filename, "w");
if (!log_file) {
perror("Error opening file");
return -1;
}
pthread_mutex_t log_mutex;
pthread_mutex_init(&log_mutex, NULL);
for (int i = 0; i < num_consumers; i++) {
so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t));
ctx->producer_rb = rb;
ctx->log_file = log_file;
ctx->log_mutex = log_mutex;
pthread_create(&tids[i], NULL, consumer_thread, ctx);
}
return num_consumers;
int file_descriptor = open(out_filename, O_RDWR | O_CREAT | O_APPEND, 0666);
DIE(file_descriptor < 0, "Error opening output file");
for (int i = 0; i < num_consumers; i++) {
so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t));
if(!ctx){
DIE(1, "malloc failed");
}
ctx->producer_rb = rb;
ctx->log_file = file_descriptor;
ctx->log_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
pthread_create(&tids[i], NULL, consumer_thread_wrapper, ctx);
}
return num_consumers;
}
\ No newline at end of file
......@@ -12,10 +12,9 @@ typedef struct so_consumer_ctx_t {
/* TODO: add synchronization primitives for timestamp ordering */
pthread_mutex_t log_mutex;
FILE *log_file;
pthread_cond_t seq_cond;
int log_file;
pthread_cond_t seq_cond;
unsigned long next_seq_to_log;
unsigned long my_seq;
} so_consumer_ctx_t;
......
No preview for this file type
......@@ -8,225 +8,213 @@
#include "utils.h"
typedef struct {
size_t first_part;
size_t second_part;
size_t first_part;
size_t second_part;
} SplitSizes;
SplitSizes split_space(size_t size, size_t space_at_end) {
SplitSizes sizes;
if (size > space_at_end) {
sizes.first_part = space_at_end;
sizes.second_part = size - space_at_end;
} else {
sizes.first_part = size;
sizes.second_part = 0;
}
return sizes;
SplitSizes split_space(size_t size, size_t space_at_end)
{
SplitSizes sizes;
if (size > space_at_end) {
sizes.first_part = space_at_end;
sizes.second_part = size - space_at_end;
} else {
sizes.first_part = size;
sizes.second_part = 0;
}
return sizes;
}
static size_t get_free_space(const so_ring_buffer_t *ring)
{
return ring->cap - ring->len;
return ring->cap - ring->len;
}
static size_t adjust_position(size_t pos, size_t offset, size_t cap)
{
size_t new_pos = pos + offset;
if (new_pos >= cap) {
new_pos -= cap;
}
return new_pos;
size_t new_pos = pos + offset;
if (new_pos >= cap)
new_pos -= cap;
return new_pos;
}
static void copy_data_to_ring(so_ring_buffer_t *ring, const void *data, size_t size)
{
size_t space_at_end = ring->cap - ring->write_pos;
SplitSizes sizes = split_space(size, space_at_end);
memcpy(ring->data + ring->write_pos, data, sizes.first_part);
if (sizes.second_part > 0) {
memcpy(ring->data, (const char *)data + sizes.first_part, sizes.second_part);
}
size_t space_at_end = ring->cap - ring->write_pos;
SplitSizes sizes = split_space(size, space_at_end);
memcpy(ring->data + ring->write_pos, data, sizes.first_part);
if (sizes.second_part > 0)
memcpy(ring->data, (const char *)data + sizes.first_part, sizes.second_part);
}
static void copy_data_from_ring(so_ring_buffer_t *ring, void *data, size_t size)
{
size_t space_at_end = ring->cap - ring->read_pos;
SplitSizes sizes = split_space(size, space_at_end);
memcpy(data, ring->data + ring->read_pos, sizes.first_part);
if (sizes.second_part > 0) {
memcpy((char *)data + sizes.first_part, ring->data, sizes.second_part);
}
size_t space_at_end = ring->cap - ring->read_pos;
SplitSizes sizes = split_space(size, space_at_end);
memcpy(data, ring->data + ring->read_pos, sizes.first_part);
}
int ring_buffer_init(so_ring_buffer_t *ring, size_t cap)
{
ring->data = malloc(cap);
if (ring->data == NULL) {
DIE(1, "Memory allocation failed");
}
ring->cap = cap;
ring->len = 0;
ring->read_pos = 0;
ring->write_pos = 0;
ring->stop = 0;
int result = pthread_mutex_init(&ring->mutex, NULL);
if (result != 0) {
DIE(1, "Failed to initialize mutex");
}
result = pthread_cond_init(&ring->not_empty, NULL);
if (result != 0) {
DIE(1, "Failed to initialize not_empty condition");
}
result = pthread_cond_init(&ring->not_full, NULL);
if (result != 0) {
DIE(1, "Failed to initialize not_full condition");
}
return 0;
ring->data = malloc(cap);
if (ring->data == NULL)
DIE(1, "Memory allocation failed");
ring->cap = cap;
ring->len = 0;
ring->read_pos = 0;
ring->write_pos = 0;
ring->stop = 0;
int result = pthread_mutex_init(&ring->mutex, NULL);
if (result != 0)
DIE(1, "Failed to initialize mutex");
result = pthread_cond_init(&ring->not_empty, NULL);
if (result != 0)
DIE(1, "Failed to initialize not_empty condition");
result = pthread_cond_init(&ring->not_full, NULL);
if (result != 0)
DIE(1, "Failed to initialize not_full condition");
return 0;
}
ssize_t ring_buffer_enqueue(so_ring_buffer_t *ring, void *data, size_t size)
{
if (data == NULL) {
ERR(1, "Invalid data pointer");
return -1;
}
int result = pthread_mutex_lock(&ring->mutex);
if (result != 0) {
ERR(1, "Failed to lock mutex");
return -1;
}
while (get_free_space(ring) < size && !ring->stop) {
result = pthread_cond_wait(&ring->not_full, &ring->mutex);
if (result != 0) {
pthread_mutex_unlock(&ring->mutex);
ERR(result != 0, "Failed to wait on not_full condition");
return -1;
}
}
if (ring->stop) {
pthread_mutex_unlock(&ring->mutex);
return -1;
}
copy_data_to_ring(ring, data, size);
ring->write_pos = adjust_position(ring->write_pos, size, ring->cap);
ring->len += size;
result = pthread_cond_signal(&ring->not_empty);
if (result != 0) {
ERR(1, "Failed to signal not_empty condition");
}
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0) {
ERR(1, "Failed to unlock mutex");
}
return size;
if (data == NULL) {
ERR(1, "Invalid data pointer");
return -1;
}
int result = pthread_mutex_lock(&ring->mutex);
if (result != 0) {
ERR(result != 0, "Failed to lock mutex");
return -1;
}
while (get_free_space(ring) < size && !ring->stop) {
result = pthread_cond_wait(&ring->not_full, &ring->mutex);
if (result != 0) {
pthread_mutex_unlock(&ring->mutex);
ERR(result != 0, "Failed to wait on not_full condition");
return -1;
}
}
if (ring->stop) {
pthread_mutex_unlock(&ring->mutex);
return -1;
}
copy_data_to_ring(ring, data, size);
ring->write_pos = adjust_position(ring->write_pos, size, ring->cap);
ring->len += size;
result = pthread_cond_signal(&ring->not_empty);
if (result != 0)
ERR(result != 0, "Failed to signal not_empty condition");
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0)
ERR(result != 0, "Failed to unlock mutex");
return size;
}
ssize_t ring_buffer_dequeue(so_ring_buffer_t *ring, void *data, size_t size)
{
if (data == NULL) {
ERR(1, "Invalid data pointer");
return -1;
}
int result = pthread_mutex_lock(&ring->mutex);
if (result != 0) {
ERR(1, "Failed to lock mutex");
return -1;
}
while (ring->len < size && !ring->stop) {
result = pthread_cond_wait(&ring->not_empty, &ring->mutex);
if (result != 0) {
pthread_mutex_unlock(&ring->mutex);
ERR(1, "Failed to wait on not_empty condition");
return -1;
}
}
if (ring->stop && ring->len == 0) {
pthread_mutex_unlock(&ring->mutex);
return 0;
}
copy_data_from_ring(ring, data, size);
ring->read_pos = adjust_position(ring->read_pos, size, ring->cap);
ring->len -= size;
result = pthread_cond_signal(&ring->not_full);
if (result != 0) {
ERR(1, "Failed to signal not_full condition");
}
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0) {
ERR(1, "Failed to unlock mutex");
}
return size;
if (data == NULL) {
ERR(1, "Invalid data pointer");
return -1;
}
int result = pthread_mutex_lock(&ring->mutex);
if (result != 0) {
ERR(result != 0, "Failed to lock mutex");
return -1;
}
while (ring->len < size && !ring->stop) {
result = pthread_cond_wait(&ring->not_empty, &ring->mutex);
if (result != 0) {
pthread_mutex_unlock(&ring->mutex);
ERR(result != 0, "Failed to wait on not_empty condition");
return -1;
}
}
if (ring->stop && ring->len == 0) {
pthread_mutex_unlock(&ring->mutex);
return 0;
}
copy_data_from_ring(ring, data, size);
ring->read_pos = adjust_position(ring->read_pos, size, ring->cap);
ring->len -= size;
result = pthread_cond_signal(&ring->not_full);
if (result != 0)
ERR(result != 0, "Failed to signal not_full condition");
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0)
ERR(result != 0, "Failed to unlock mutex");
return size;
}
void ring_buffer_destroy(so_ring_buffer_t *ring)
{
if (ring == NULL || ring->data == NULL) {
DIE(1, "Attempting to destroy an uninitialized ring buffer");
}
free(ring->data);
int result = pthread_mutex_destroy(&ring->mutex);
if (result != 0) {
DIE(1, "Failed to destroy mutex");
}
result = pthread_cond_destroy(&ring->not_empty);
if (result != 0) {
DIE(1, "Failed to destroy not_empty condition");
}
result = pthread_cond_destroy(&ring->not_full);
if (result != 0) {
DIE(1, "Failed to destroy not_full condition");
}
if (ring == NULL || ring->data == NULL)
DIE(1, "Attempting to destroy an uninitialized ring buffer");
free(ring->data);
int result = pthread_mutex_destroy(&ring->mutex);
if (result != 0)
DIE(1, "Failed to destroy mutex");
result = pthread_cond_destroy(&ring->not_empty);
if (result != 0)
DIE(1, "Failed to destroy not_empty condition");
result = pthread_cond_destroy(&ring->not_full);
if (result != 0)
DIE(1, "Failed to destroy not_full condition");
}
void ring_buffer_stop(so_ring_buffer_t *ring) {
if (ring == NULL) {
DIE(1, "Attempting to stop an uninitialized ring buffer");
}
void ring_buffer_stop(so_ring_buffer_t *ring)
{
if (ring == NULL)
DIE(1, "Attempting to stop an uninitialized ring buffer");
int result = pthread_mutex_lock(&ring->mutex);
int result = pthread_mutex_lock(&ring->mutex);
if (result != 0) {
DIE(1, "Failed to lock mutex");
}
if (result != 0)
DIE(1, "Failed to lock mutex");
ring->stop = 1;
ring->stop = 1;
result = pthread_cond_broadcast(&ring->not_empty);
if (result != 0) {
DIE(1, "Failed to broadcast not_empty condition");
}
result = pthread_cond_broadcast(&ring->not_empty);
if (result != 0)
DIE(1, "Failed to broadcast not_empty condition");
result = pthread_cond_broadcast(&ring->not_full);
if (result != 0) {
DIE(1, "Failed to broadcast not_full condition");
}
result = pthread_cond_broadcast(&ring->not_full);
if (result != 0)
DIE(1, "Failed to broadcast not_full condition");
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0) {
DIE(1, "Failed to unlock mutex");
}
}
result = pthread_mutex_unlock(&ring->mutex);
if (result != 0)
DIE(1, "Failed to unlock mutex");
}
\ No newline at end of file
......@@ -6,22 +6,20 @@
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
typedef struct so_ring_buffer_t {
char *data;
size_t read_pos;
size_t write_pos;
size_t len;
size_t cap;
/* TODO: Add syncronization primitives */
pthread_mutex_t mutex;
pthread_cond_t not_empty;
pthread_cond_t not_full;
sem_t semaphore;
int stop;
} so_ring_buffer_t;
int ring_buffer_init(so_ring_buffer_t *rb, size_t cap);
......@@ -30,4 +28,4 @@ ssize_t ring_buffer_dequeue(so_ring_buffer_t *rb, void *data, size_t size);
void ring_buffer_destroy(so_ring_buffer_t *rb);
void ring_buffer_stop(so_ring_buffer_t *rb);
#endif /* __SO_RINGBUFFER_H__ */
#endif /* __SO_RINGBUFFER_H__ */
\ No newline at end of file
No preview for this file type
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment