Skip to content
Snippets Groups Projects
Commit f868070c authored by Mihail NECULA's avatar Mihail NECULA
Browse files

Resolve conflicts

parent a6cd1be5
Branches solution
No related tags found
No related merge requests found
Pipeline #86791 passed
......@@ -9,7 +9,6 @@
#include "utils.h"
#include "consumer.h"
<<<<<<< HEAD
void so_consumer_ctx_destroy(so_consumer_ctx_t *ctx) {
if (!ctx)
return;
......@@ -30,33 +29,19 @@ void so_consumer_ctx_destroy(so_consumer_ctx_t *ctx) {
free(ctx);
}
=======
>>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd
void *consumer_thread(void *context)
{
so_consumer_ctx_t *ctx = (so_consumer_ctx_t *)context;
char buffer[PKT_SZ], out_buf[PKT_SZ];
do {
<<<<<<< HEAD
pthread_mutex_lock(ctx->buffer_mutex);
=======
pthread_mutex_lock(&ctx->buffer_mutex);
>>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd
/* while the producer can send packets, we
* wait until we have data in buffer
*/
<<<<<<< HEAD
while (ctx->producer_rb->can_enqueue && !ctx->producer_rb->len)
pthread_cond_wait(&ctx->producer_rb->has_packets, ctx->buffer_mutex);
=======
while (ctx->producer_rb->can_enqueue && !ctx->producer_rb->len) {
pthread_mutex_lock(&ctx->producer_rb->mutex);
pthread_cond_wait(&ctx->producer_rb->has_packets, &ctx->producer_rb->mutex);
pthread_mutex_unlock(&ctx->producer_rb->mutex);
}
>>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd
/* verify if we have a packet to process */
if (!ctx->producer_rb->len)
......@@ -65,19 +50,12 @@ void *consumer_thread(void *context)
/* take a packet */
ring_buffer_dequeue(ctx->producer_rb, (void *)buffer, PKT_SZ);
<<<<<<< HEAD
ctx->idx_curr_packet = *ctx->num_packets;
(*ctx->num_packets)++;
pthread_mutex_unlock(ctx->buffer_mutex);
/* create the log */
=======
pthread_mutex_lock(&ctx->output_mutex);
pthread_mutex_unlock(&ctx->buffer_mutex);
>>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd
struct so_packet_t *pkt = (struct so_packet_t *)buffer;
int action = process_packet(pkt);
......@@ -87,7 +65,6 @@ void *consumer_thread(void *context)
int len = snprintf(out_buf, 256, "%s %016lx %lu\n",
RES_TO_STR(action), hash, timestamp);
<<<<<<< HEAD
/* write the log */
pthread_mutex_lock(ctx->output_mutex);
while (ctx->idx_curr_packet != *ctx->idx_curr_log)
......@@ -134,31 +111,6 @@ pthread_cond_t *create_thread_cond() {
pthread_cond_init(cond, NULL);
return cond;
=======
write(ctx->out_fd, out_buf, len);
pthread_mutex_unlock(&ctx->output_mutex);
} while (1);
pthread_mutex_unlock(&ctx->buffer_mutex);
/* after last conusmer thread finishes his work,
* we free the memory allocated for context
*/
pthread_mutex_lock(&ctx->mutex);
if (ctx->num_consumers > 1) {
ctx->num_consumers--;
pthread_mutex_unlock(&ctx->mutex);
} else {
close(ctx->out_fd);
pthread_mutex_unlock(&ctx->mutex);
pthread_mutex_destroy(&ctx->mutex);
pthread_mutex_destroy(&ctx->output_mutex);
pthread_mutex_destroy(&ctx->buffer_mutex);
}
return NULL;
>>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd
}
int create_consumers(pthread_t *tids,
......@@ -166,7 +118,6 @@ int create_consumers(pthread_t *tids,
struct so_ring_buffer_t *rb,
const char *out_filename)
{
<<<<<<< HEAD
int out_fd = open(out_filename, O_CREAT | O_WRONLY, 0644);
size_t *num_packets = malloc(1 * sizeof(size_t));
......@@ -205,23 +156,6 @@ int create_consumers(pthread_t *tids,
ctx->producer_rb = rb;
pthread_create(&tids[i], NULL, consumer_thread, (void *)(ctx));
}
=======
/* create the context for threads */
so_consumer_ctx_t *ctx = malloc(sizeof(so_consumer_ctx_t));
DIE(!ctx, "malloc() failed\n");
ctx->producer_rb = rb;
ctx->num_consumers = num_consumers;
ctx->out_fd = open(out_filename, O_CREAT | O_WRONLY, 0644);
pthread_mutex_init(&ctx->buffer_mutex, NULL);
pthread_mutex_init(&ctx->output_mutex, NULL);
pthread_mutex_init(&ctx->mutex, NULL);
/* create the threads */
for (int i = 0; i < num_consumers; i++)
pthread_create(&tids[i], NULL, consumer_thread, (void *)(ctx));
>>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd
return num_consumers;
}
......@@ -11,7 +11,6 @@
typedef struct so_consumer_ctx_t {
struct so_ring_buffer_t *producer_rb;
size_t num_consumers;
<<<<<<< HEAD
/* idx start from 0*/
size_t idx_curr_packet;
size_t *idx_curr_log;
......@@ -23,14 +22,6 @@ typedef struct so_consumer_ctx_t {
pthread_mutex_t *output_mutex;
pthread_mutex_t *mutex;
pthread_cond_t *can_write;
=======
int out_fd;
pthread_mutex_t buffer_mutex;
pthread_mutex_t output_mutex;
pthread_mutex_t mutex;
>>>>>>> 99c65a3089e66ff604e0fccc0f9cf65d2786dffd
} so_consumer_ctx_t;
void* consumer_thread(void *ctx);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment