Skip to content
Snippets Groups Projects
Commit a1d5738f authored by Radu-Cristian POPESCU's avatar Radu-Cristian POPESCU
Browse files

tema3

parent 02edde74
No related branches found
No related tags found
No related merge requests found
Pipeline #46373 passed
......@@ -17,9 +17,9 @@ os_task_t *create_task(void (*action)(void *), void *arg, void (*destroy_arg)(vo
t = malloc(sizeof(*t));
DIE(t == NULL, "malloc");
t->action = action; // the function
t->argument = arg; // arguments for the function
t->destroy_arg = destroy_arg; // destroy argument function
t->action = action; // the function
t->argument = arg; // arguments for the function
t->destroy_arg = destroy_arg; // destroy argument function
return t;
}
......@@ -38,7 +38,10 @@ void enqueue_task(os_threadpool_t *tp, os_task_t *t)
assert(tp != NULL);
assert(t != NULL);
/* TODO: Enqueue task to the shared task queue. Use synchronization. */
pthread_mutex_lock(&tp->queue_mutex);
list_add_tail(&tp->head, &t->list);
pthread_cond_signal(&tp->queue_cond);
pthread_mutex_unlock(&tp->queue_mutex);
}
/*
......@@ -59,16 +62,24 @@ static int queue_is_empty(os_threadpool_t *tp)
os_task_t *dequeue_task(os_threadpool_t *tp)
{
os_task_t *t;
os_task_t *task = NULL;
/* TODO: Dequeue task from the shared task queue. Use synchronization. */
return NULL;
pthread_mutex_lock(&tp->queue_mutex);
if (!queue_is_empty(tp)) {
task = list_entry(tp->head.next, os_task_t, list);
list_del(tp->head.next);
}
pthread_mutex_unlock(&tp->queue_mutex);
return task;
}
/* Loop function for threads */
static void *thread_loop_function(void *arg)
{
os_threadpool_t *tp = (os_threadpool_t *) arg;
os_threadpool_t *tp = (os_threadpool_t *)arg;
while (1) {
os_task_t *t;
......@@ -76,7 +87,8 @@ static void *thread_loop_function(void *arg)
t = dequeue_task(tp);
if (t == NULL)
break;
t->action(t->argument);
dataArgs *args = (dataArgs *)t->argument;
t->action(args->nodeIdx);
destroy_task(t);
}
......@@ -86,11 +98,16 @@ static void *thread_loop_function(void *arg)
/* Wait completion of all threads. This is to be called by the main thread. */
void wait_for_completion(os_threadpool_t *tp)
{
/* TODO: Wait for all worker threads. Use synchronization. */
pthread_mutex_lock(&tp->queue_mutex);
/* Join all worker threads. */
for (unsigned int i = 0; i < tp->num_threads; i++)
pthread_cond_broadcast(&tp->queue_cond);
pthread_mutex_unlock(&tp->queue_mutex);
// Join all worker threads
for (unsigned int i = 0; i < tp->num_threads; i++) {
pthread_join(tp->threads[i], NULL);
}
}
/* Create a new threadpool. */
......@@ -104,13 +121,13 @@ os_threadpool_t *create_threadpool(unsigned int num_threads)
list_init(&tp->head);
/* TODO: Initialize synchronization data. */
pthread_mutex_init(&tp->queue_mutex, NULL);
pthread_cond_init(&tp->queue_cond, NULL);
tp->num_threads = num_threads;
tp->threads = malloc(num_threads * sizeof(*tp->threads));
DIE(tp->threads == NULL, "malloc");
for (unsigned int i = 0; i < num_threads; ++i) {
rc = pthread_create(&tp->threads[i], NULL, &thread_loop_function, (void *) tp);
rc = pthread_create(&tp->threads[i], NULL, &thread_loop_function, (void *)tp);
DIE(rc < 0, "pthread_create");
}
......@@ -122,8 +139,7 @@ void destroy_threadpool(os_threadpool_t *tp)
{
os_list_node_t *n, *p;
/* TODO: Cleanup synchronization data. */
pthread_mutex_destroy(&tp->queue_mutex);
list_for_each_safe(n, p, &tp->head) {
list_del(n);
destroy_task(list_entry(n, os_task_t, list));
......
......@@ -13,6 +13,7 @@ typedef struct {
os_list_node_t list;
} os_task_t;
typedef struct os_threadpool {
unsigned int num_threads;
pthread_t *threads;
......@@ -27,8 +28,16 @@ typedef struct os_threadpool {
os_list_node_t head;
/* TODO: Define threapool / queue synchronization data. */
pthread_mutex_t queue_mutex; // Mutex for protecting the task queue
pthread_cond_t queue_cond; // Condition variable for task availability
} os_threadpool_t;
typedef struct dataArgs
{
unsigned int nodeIdx;
} dataArgs;
os_task_t *create_task(void (*f)(void *), void *arg, void (*destroy_arg)(void *));
void destroy_task(os_task_t *t);
......
......@@ -11,18 +11,52 @@
#include "log/log.h"
#include "utils.h"
#define NUM_THREADS 4
#define NUM_THREADS 4
static int sum;
static os_graph_t *graph;
static os_threadpool_t *tp;
/* TODO: Define graph synchronization mechanisms. */
pthread_mutex_t mutex;
pthread_mutex_t mutex2;
pthread_mutex_t mutex3;
/* TODO: Define graph task argument. */
typedef struct {
os_graph_t *graph;
unsigned int node_idx;
} graph_task_t;
static void process_node(unsigned int idx)
{
/* TODO: Implement thread-pool based processing of graph. */
os_node_t *node = graph->nodes[idx];
pthread_mutex_lock(&mutex);
if (graph->visited[idx] != DONE) {
sum += node->info;
graph->visited[idx] = DONE;
}
pthread_mutex_unlock(&mutex);
for (unsigned int i = 0; i < node->num_neighbours; i++) {
pthread_mutex_lock(&mutex);
if (graph->visited[node->neighbours[i]] == NOT_VISITED) {
struct dataArgs data;
data.nodeIdx = node->neighbours[idx];
struct dataArgs *data_ptr = malloc(sizeof(struct dataArgs));
*data_ptr = data;
graph->visited[node->neighbours[i]] = PROCESSING;
os_task_t *task = create_task((void *)process_node, data_ptr, free);
enqueue_task(tp, task);
pthread_mutex_unlock(&mutex);
break;
}
pthread_mutex_unlock(&mutex);
}
}
int main(int argc, char *argv[])
......@@ -39,9 +73,13 @@ int main(int argc, char *argv[])
graph = create_graph_from_file(input_file);
/* TODO: Initialize graph synchronization mechanisms. */
pthread_mutex_init(&mutex, NULL);
tp = create_threadpool(NUM_THREADS);
process_node(0);
for (int i = 0; i < graph->num_nodes; i++) {
if (graph->visited[i] != DONE)
process_node(i);
}
wait_for_completion(tp);
destroy_threadpool(tp);
......
......@@ -10,7 +10,7 @@ src:
check: clean
make -i SRC_PATH=$(SRC_PATH)
SRC_PATH=$(SRC_PATH) python checker.py
SRC_PATH=$(SRC_PATH) python3 checker.py
lint:
-cd $(SRC_PATH)/.. && checkpatch.pl -f src/*.c
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment