Skip to content
Snippets Groups Projects
Commit 5fbf852a authored by albert_mark.stan's avatar albert_mark.stan
Browse files

M

parent 64b6e217
No related branches found
No related tags found
No related merge requests found
Pipeline #95482 passed
{
"files.associations": {
"ring_buffer.h": "c"
}
}
\ No newline at end of file
// SPDX-License-Identifier: BSD-3-Clause
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include "consumer.h"
#include "ring_buffer.h"
#include "packet.h"
#include "utils.h"
#include <stdio.h>
void consumer_thread(so_consumer_ctx_t *ctx)
{
/* TODO: implement consumer thread */
(void) ctx;
char buffer[PKT_SZ];
char log_entry[256];
ssize_t sz;
while (1) {
sz = ring_buffer_dequeue(ctx->producer_rb, buffer, PKT_SZ);
if (sz <= 0) break;
struct so_packet_t *pkt = (struct so_packet_t *)buffer;
int action = process_packet(pkt);
unsigned long hash = packet_hash(pkt);
unsigned long timestamp = pkt->hdr.timestamp;
pthread_mutex_lock(&ctx->lock);
while (timestamp < ctx->last_processed_timestamp) {
pthread_cond_wait(&ctx->cond, &ctx->lock);
}
int len = snprintf(log_entry, 256, "%s %016lx %lu",
RES_TO_STR(action), hash, timestamp);
write(ctx->output_fd, log_entry, len);
ctx->last_processed_timestamp = timestamp;
pthread_cond_broadcast(&ctx->cond);
pthread_mutex_unlock(&ctx->lock);
}
}
int create_consumers(pthread_t *tids,
int num_consumers,
struct so_ring_buffer_t *rb,
const char *out_filename)
int create_consumers(pthread_t *tids, int num_consumers, struct so_ring_buffer_t *rb, const char *out_filename)
{
(void) tids;
(void) num_consumers;
(void) rb;
(void) out_filename;
for (int i = 0; i < num_consumers; i++) {
/*
* TODO: Launch consumer threads
**/
}
return num_consumers;
}
for (int i = 0; i < num_consumers; i++) {
so_consumer_ctx_t *ctx = calloc(1, sizeof(so_consumer_ctx_t));
ctx->producer_rb = rb;
ctx->output_fd = open(out_filename, O_RDWR | O_CREAT | O_TRUNC, 0666);
DIE(ctx->output_fd < 0, "open");
pthread_mutex_init(&ctx->lock, NULL);
pthread_cond_init(&ctx->cond, NULL);
ctx->last_processed_timestamp = 0;
pthread_create(&tids[i], NULL, (void *(*)(void *))consumer_thread, ctx);
}
return num_consumers;
}
\ No newline at end of file
......@@ -9,7 +9,11 @@
typedef struct so_consumer_ctx_t {
struct so_ring_buffer_t *producer_rb;
/* TODO: add synchronization primitives for timestamp ordering */
int output_fd;
pthread_mutex_t lock;
pthread_cond_t cond;
unsigned long last_processed_timestamp;
} so_consumer_ctx_t;
int create_consumers(pthread_t *tids,
......@@ -17,4 +21,4 @@ int create_consumers(pthread_t *tids,
so_ring_buffer_t *rb,
const char *out_filename);
#endif /* __SO_CONSUMER_H__ */
#endif /* __SO_CONSUMER_H__ */
\ No newline at end of file
File added
......@@ -68,11 +68,14 @@ int main(int argc, char **argv)
/* start publishing data */
publish_data(&ring_buffer, argv[1]);
/* TODO: wait for child threads to finish execution*/
for (int i = 0; i < num_consumers; i++) {
pthread_join(thread_ids[i], NULL);
}
(void) threads;
free(thread_ids);
return 0;
}
// SPDX-License-Identifier: BSD-3-Clause
#include "ring_buffer.h"
int ring_buffer_init(so_ring_buffer_t *ring, size_t cap)
{
/* TODO: implement ring_buffer_init */
(void) ring;
(void) cap;
ring->data = malloc(cap);
if (!ring->data) return -1;
ring->read_pos = 0;
ring->write_pos = 0;
ring->len = 0;
ring->cap = cap;
return 1;
pthread_mutex_init(&ring->lock, NULL);
pthread_cond_init(&ring->not_empty, NULL);
pthread_cond_init(&ring->not_full, NULL);
return 0;
}
ssize_t ring_buffer_enqueue(so_ring_buffer_t *ring, void *data, size_t size)
{
/* TODO: implement ring_buffer_enqueue */
(void) ring;
(void) data;
(void) size;
pthread_mutex_lock(&ring->lock);
while (ring->len == ring->cap) {
pthread_cond_wait(&ring->not_full, &ring->lock);
}
memcpy(ring->data + ring->write_pos, data, size);
ring->write_pos = (ring->write_pos + size) % ring->cap;
ring->len += size;
return -1;
pthread_cond_signal(&ring->not_empty);
pthread_mutex_unlock(&ring->lock);
return size;
}
ssize_t ring_buffer_dequeue(so_ring_buffer_t *ring, void *data, size_t size)
{
/* TODO: Implement ring_buffer_dequeue */
(void) ring;
(void) data;
(void) size;
pthread_mutex_lock(&ring->lock);
while (ring->len == 0) {
pthread_cond_wait(&ring->not_empty, &ring->lock);
}
memcpy(data, ring->data + ring->read_pos, size);
ring->read_pos = (ring->read_pos + size) % ring->cap;
ring->len -= size;
return -1;
pthread_cond_signal(&ring->not_full);
pthread_mutex_unlock(&ring->lock);
return size;
}
void ring_buffer_destroy(so_ring_buffer_t *ring)
{
/* TODO: Implement ring_buffer_destroy */
(void) ring;
pthread_mutex_destroy(&ring->lock);
pthread_cond_destroy(&ring->not_empty);
pthread_cond_destroy(&ring->not_full);
free(ring->data);
}
void ring_buffer_stop(so_ring_buffer_t *ring)
{
/* TODO: Implement ring_buffer_stop */
(void) ring;
}
pthread_mutex_lock(&ring->lock);
pthread_cond_broadcast(&ring->not_empty);
pthread_cond_broadcast(&ring->not_full);
pthread_mutex_unlock(&ring->lock);
}
\ No newline at end of file
......@@ -5,6 +5,9 @@
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include "packet.h"
#include <stdio.h>
typedef struct so_ring_buffer_t {
char *data;
......@@ -15,7 +18,11 @@ typedef struct so_ring_buffer_t {
size_t len;
size_t cap;
/* TODO: Add syncronization primitives */
pthread_mutex_t lock;
pthread_cond_t not_empty;
pthread_cond_t not_full;
} so_ring_buffer_t;
int ring_buffer_init(so_ring_buffer_t *rb, size_t cap);
......@@ -24,4 +31,4 @@ ssize_t ring_buffer_dequeue(so_ring_buffer_t *rb, void *data, size_t size);
void ring_buffer_destroy(so_ring_buffer_t *rb);
void ring_buffer_stop(so_ring_buffer_t *rb);
#endif /* __SO_RINGBUFFER_H__ */
#endif /* __SO_RINGBUFFER_H__ */
\ No newline at end of file
File added
File added
File added
File added
File added
File added
PASS a308b556b0990e95 0
DROP b8e6bc6fe03bfb41 3
DROP 3cc0721d55e9cf43 10
DROP 3643f500ad347f15 18
PASS c7c047a21b145143 27
DROP b6ea7ae68c224529 36
PASS ba882b7fa9b7ead3 41
DROP 645af67562c0e479 46
DROP 8d44c96b809d8633 52
DROP 44213846ad3c0f3b 60
DROP 1fdffd846ea0b989 0
PASS 41c9c9d01fae8a0b 9
DROP f78402c3976b84c1 13
DROP 1ea0450cafde8ae3 21
DROP 35afc287f2169b47 26
PASS ae3da41adf1f7443 34
PASS 1a8ea389d5776b47 44
PASS dbda99fbbac8eaa9 49
DROP ee3cf656bafb7d29 58
DROP fe0eac6c31e4b50d 68
DROP 507d9ed7f944efeb 76
PASS c2c53ed41015c55f 80
PASS ae4659a18ecb7257 87
PASS b42d3bf250ff21e9 90
PASS cc42dde8327e8e2f 97
DROP 0d16a5b5c0ca618f 105
PASS dcc9f958be208107 115
DROP 3556d749cac3c793 124
PASS e4130d693c4df5c5 127
DROP 9f7811a2ed7f733f 137
DROP e300151d63639181 142
DROP 6cfc64f2bccaabbd 151
PASS d13ebbb9ea3cab65 161
DROP d4edb9644c6b9497 170
PASS 86b73f81b504f097 179
PASS c6428c4e16231097 185
PASS 08d16458b997659b 192
DROP 26eab7d21087e4b5 201
DROP e2e58d5b4a0d6051 205
DROP 7395ae5f1d495301 210
PASS 2f88aed6bd955aff 215
DROP f27fb88de4e08ef9 224
DROP e1efe1cd37ac38cd 234
DROP d77234fb4501a9b1 238
DROP 035db473cc0e9057 246
PASS 062805cbcc9543d3 249
PASS 101b0119a1ec5b79 253
PASS c22c355258814a93 258
PASS c86a0b6d36cb4735 266
DROP fe020c45541a11cb 274
PASS 2bc54b95d3623a99 280
PASS 48dcecf19d805c85 285
PASS 6074a826880ede51 293
DROP 1ce65292ee3c237d 296
PASS 5a45e005a29ff855 302
PASS 87b04f26014abfdf 310
PASS 1bed9b6cc2eb8a75 317
PASS d0f4e11cabe8c499 322
DROP 119933b58c5be2d5 331
DROP 603579f16bdfbb99 338
DROP 966008e10576b951 344
DROP 5bfdbfbdced44c93 352
PASS f939ca5f25361911 361
DROP 88898df98757faa9 370
PASS b6888beef9c8781b 376
DROP e3a420ab13420965 384
DROP 51da2c1349eae2b5 393
DROP 3fc8a075b2d6f559 401
PASS 88980074bdc22037 411
DROP 2f24a42be9b670f1 421
DROP 4ef61d4e9c056e73 431
PASS 5758b106e14fae1f 436
PASS de5fed0f8d9dbb89 445
DROP 99ddd3be959c84e1 449
DROP f0feb29ae051c771 457
PASS f572923c32608271 464
PASS bb18407436d0c24d 467
PASS 0a3e563b2e7ce773 474
PASS 79d83ae1ff4c407d 478
DROP e09dfc24a05cf007 486
PASS 3f544351335d27d5 495
PASS 975251407aaf88a5 505
DROP ed4fe35c61968baf 508
DROP 8090875aff8aad8d 518
PASS c03f50b059d96429 521
PASS 24edd193296aea43 528
PASS 8e0ccdf1e1a46e35 538
PASS 11c86399bf3d3441 541
DROP ebcdf80a8f043157 547
DROP cb44d8384e63c8ff 556
DROP b56b6d04efb63d01 564
DROP 959f7ec726e48685 568
DROP 839defa1aaf0c4eb 575
DROP f2b1e1d4cc127647 580
PASS 3f738679ac106149 589
DROP 04cb36aae7ded1cf 594
PASS c6c9903e47591099 600
DROP dc4129a718c09621 607
DROP f13fb4def6aa0e83 615
DROP 39400c9b7e9096db 621
DROP 951434fe7127897d 631
DROP 3ac219f8e9a78273 637
DROP eefccc8d69b461bf 644
PASS a9acfcadbd486001 653
PASS c990bf9ee852991f 662
DROP 7e58a20ddc62fc1b 670
PASS 62ab06ff907d1671 675
DROP 0656604b5f35c98b 680
PASS cfb074933757d421 690
DROP 40243823ecc307a5 694
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment