diff --git a/src/consumer.c b/src/consumer.c index 1b411f0a30c79ebc0ab527aaeda9b366582d9f68..155e59d2d1f5c0b78d69ae16b7c5d220e38daa20 100644 --- a/src/consumer.c +++ b/src/consumer.c @@ -9,7 +9,6 @@ #include "utils.h" #include "consumer.h" -<<<<<<< HEAD void so_consumer_ctx_destroy(so_consumer_ctx_t *ctx) { if (!ctx) return; @@ -30,33 +29,19 @@ void so_consumer_ctx_destroy(so_consumer_ctx_t *ctx) { free(ctx); } -======= ->>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd void *consumer_thread(void *context) { so_consumer_ctx_t *ctx = (so_consumer_ctx_t *)context; char buffer[PKT_SZ], out_buf[PKT_SZ]; do { -<<<<<<< HEAD pthread_mutex_lock(ctx->buffer_mutex); -======= - pthread_mutex_lock(&ctx->buffer_mutex); ->>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd /* while the producer can send packets, we * wait until we have data in buffer */ -<<<<<<< HEAD while (ctx->producer_rb->can_enqueue && !ctx->producer_rb->len) pthread_cond_wait(&ctx->producer_rb->has_packets, ctx->buffer_mutex); -======= - while (ctx->producer_rb->can_enqueue && !ctx->producer_rb->len) { - pthread_mutex_lock(&ctx->producer_rb->mutex); - pthread_cond_wait(&ctx->producer_rb->has_packets, &ctx->producer_rb->mutex); - pthread_mutex_unlock(&ctx->producer_rb->mutex); - } ->>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd /* verify if we have a packet to process */ if (!ctx->producer_rb->len) @@ -65,19 +50,12 @@ void *consumer_thread(void *context) /* take a packet */ ring_buffer_dequeue(ctx->producer_rb, (void *)buffer, PKT_SZ); -<<<<<<< HEAD ctx->idx_curr_packet = *ctx->num_packets; (*ctx->num_packets)++; pthread_mutex_unlock(ctx->buffer_mutex); /* create the log */ -======= - - pthread_mutex_lock(&ctx->output_mutex); - pthread_mutex_unlock(&ctx->buffer_mutex); - ->>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd struct so_packet_t *pkt = (struct so_packet_t *)buffer; int action = process_packet(pkt); @@ -87,7 +65,6 @@ void *consumer_thread(void *context) int len = snprintf(out_buf, 256, "%s %016lx %lu\n", RES_TO_STR(action), hash, timestamp); -<<<<<<< HEAD /* write the log */ pthread_mutex_lock(ctx->output_mutex); while (ctx->idx_curr_packet != *ctx->idx_curr_log) @@ -134,31 +111,6 @@ pthread_cond_t *create_thread_cond() { pthread_cond_init(cond, NULL); return cond; -======= - write(ctx->out_fd, out_buf, len); - - pthread_mutex_unlock(&ctx->output_mutex); - } while (1); - - pthread_mutex_unlock(&ctx->buffer_mutex); - - /* after last conusmer thread finishes his work, - * we free the memory allocated for context - */ - pthread_mutex_lock(&ctx->mutex); - if (ctx->num_consumers > 1) { - ctx->num_consumers--; - pthread_mutex_unlock(&ctx->mutex); - } else { - close(ctx->out_fd); - pthread_mutex_unlock(&ctx->mutex); - pthread_mutex_destroy(&ctx->mutex); - pthread_mutex_destroy(&ctx->output_mutex); - pthread_mutex_destroy(&ctx->buffer_mutex); - } - - return NULL; ->>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd } int create_consumers(pthread_t *tids, @@ -166,7 +118,6 @@ int create_consumers(pthread_t *tids, struct so_ring_buffer_t *rb, const char *out_filename) { -<<<<<<< HEAD int out_fd = open(out_filename, O_CREAT | O_WRONLY, 0644); size_t *num_packets = malloc(1 * sizeof(size_t)); @@ -205,23 +156,6 @@ int create_consumers(pthread_t *tids, ctx->producer_rb = rb; pthread_create(&tids[i], NULL, consumer_thread, (void *)(ctx)); } -======= - /* create the context for threads */ - so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t)); - - DIE(!ctx, "malloc() failed\n"); - - ctx->producer_rb = rb; - ctx->num_consumers = num_consumers; - ctx->out_fd = open(out_filename, O_CREAT | O_WRONLY, 0644); - pthread_mutex_init(&ctx->buffer_mutex, NULL); - pthread_mutex_init(&ctx->output_mutex, NULL); - pthread_mutex_init(&ctx->mutex, NULL); - - /* create the threads */ - for (int i = 0; i < num_consumers; i++) - pthread_create(&tids[i], NULL, consumer_thread, (void *)(ctx)); ->>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd return num_consumers; } diff --git a/src/consumer.h b/src/consumer.h index d7d800d47a497931951fe9dc3652c3d208ea1372..bea4283993775cd8322148c23e5ece9b5d77d7f4 100644 --- a/src/consumer.h +++ b/src/consumer.h @@ -11,7 +11,6 @@ typedef struct so_consumer_ctx_t { struct so_ring_buffer_t *producer_rb; size_t num_consumers; -<<<<<<< HEAD /* idx start from 0*/ size_t idx_curr_packet; size_t *idx_curr_log; @@ -23,14 +22,6 @@ typedef struct so_consumer_ctx_t { pthread_mutex_t *output_mutex; pthread_mutex_t *mutex; pthread_cond_t *can_write; -======= - - int out_fd; - - pthread_mutex_t buffer_mutex; - pthread_mutex_t output_mutex; - pthread_mutex_t mutex; ->>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd } so_consumer_ctx_t; void* consumer_thread(void *ctx);