diff --git a/src/os_threadpool.c b/src/os_threadpool.c index f75e78ed74232ba7710fc71e551d8478d650c7c0..da1d9810f0c0f37bfec69a27ff994a48a2343abe 100644 --- a/src/os_threadpool.c +++ b/src/os_threadpool.c @@ -17,9 +17,9 @@ os_task_t *create_task(void (*action)(void *), void *arg, void (*destroy_arg)(vo t = malloc(sizeof(*t)); DIE(t == NULL, "malloc"); - t->action = action; // the function - t->argument = arg; // arguments for the function - t->destroy_arg = destroy_arg; // destroy argument function + t->action = action; // the function + t->argument = arg; // arguments for the function + t->destroy_arg = destroy_arg; // destroy argument function return t; } @@ -38,7 +38,10 @@ void enqueue_task(os_threadpool_t *tp, os_task_t *t) assert(tp != NULL); assert(t != NULL); - /* TODO: Enqueue task to the shared task queue. Use synchronization. */ + pthread_mutex_lock(&tp->queue_mutex); + list_add_tail(&tp->head, &t->list); + pthread_cond_signal(&tp->queue_cond); + pthread_mutex_unlock(&tp->queue_mutex); } /* @@ -59,16 +62,24 @@ static int queue_is_empty(os_threadpool_t *tp) os_task_t *dequeue_task(os_threadpool_t *tp) { - os_task_t *t; + os_task_t *task = NULL; - /* TODO: Dequeue task from the shared task queue. Use synchronization. */ - return NULL; + pthread_mutex_lock(&tp->queue_mutex); + + if (!queue_is_empty(tp)) { + task = list_entry(tp->head.next, os_task_t, list); + list_del(tp->head.next); + } + + pthread_mutex_unlock(&tp->queue_mutex); + + return task; } /* Loop function for threads */ static void *thread_loop_function(void *arg) { - os_threadpool_t *tp = (os_threadpool_t *) arg; + os_threadpool_t *tp = (os_threadpool_t *)arg; while (1) { os_task_t *t; @@ -76,7 +87,8 @@ static void *thread_loop_function(void *arg) t = dequeue_task(tp); if (t == NULL) break; - t->action(t->argument); + dataArgs *args = (dataArgs *)t->argument; + t->action(args->nodeIdx); destroy_task(t); } @@ -86,11 +98,16 @@ static void *thread_loop_function(void *arg) /* Wait completion of all threads. This is to be called by the main thread. */ void wait_for_completion(os_threadpool_t *tp) { - /* TODO: Wait for all worker threads. Use synchronization. */ + pthread_mutex_lock(&tp->queue_mutex); - /* Join all worker threads. */ - for (unsigned int i = 0; i < tp->num_threads; i++) + pthread_cond_broadcast(&tp->queue_cond); + + pthread_mutex_unlock(&tp->queue_mutex); + + // Join all worker threads + for (unsigned int i = 0; i < tp->num_threads; i++) { pthread_join(tp->threads[i], NULL); + } } /* Create a new threadpool. */ @@ -104,13 +121,13 @@ os_threadpool_t *create_threadpool(unsigned int num_threads) list_init(&tp->head); - /* TODO: Initialize synchronization data. */ - + pthread_mutex_init(&tp->queue_mutex, NULL); + pthread_cond_init(&tp->queue_cond, NULL); tp->num_threads = num_threads; tp->threads = malloc(num_threads * sizeof(*tp->threads)); DIE(tp->threads == NULL, "malloc"); for (unsigned int i = 0; i < num_threads; ++i) { - rc = pthread_create(&tp->threads[i], NULL, &thread_loop_function, (void *) tp); + rc = pthread_create(&tp->threads[i], NULL, &thread_loop_function, (void *)tp); DIE(rc < 0, "pthread_create"); } @@ -122,8 +139,7 @@ void destroy_threadpool(os_threadpool_t *tp) { os_list_node_t *n, *p; - /* TODO: Cleanup synchronization data. */ - + pthread_mutex_destroy(&tp->queue_mutex); list_for_each_safe(n, p, &tp->head) { list_del(n); destroy_task(list_entry(n, os_task_t, list)); diff --git a/src/os_threadpool.h b/src/os_threadpool.h index f20ef5b766765876087a3f92b049a7969d338e10..e6c6733da3d648fb534472ca8a9930bea566eee5 100644 --- a/src/os_threadpool.h +++ b/src/os_threadpool.h @@ -13,6 +13,7 @@ typedef struct { os_list_node_t list; } os_task_t; + typedef struct os_threadpool { unsigned int num_threads; pthread_t *threads; @@ -27,8 +28,16 @@ typedef struct os_threadpool { os_list_node_t head; /* TODO: Define threapool / queue synchronization data. */ + pthread_mutex_t queue_mutex; // Mutex for protecting the task queue + pthread_cond_t queue_cond; // Condition variable for task availability + } os_threadpool_t; +typedef struct dataArgs +{ + unsigned int nodeIdx; +} dataArgs; + os_task_t *create_task(void (*f)(void *), void *arg, void (*destroy_arg)(void *)); void destroy_task(os_task_t *t); diff --git a/src/parallel.c b/src/parallel.c index 262b8cf17014d1b27e6fc8ba07e96476ae872f35..4a6d89b9aeadb4aa376a2b6aea297ac263904e3b 100644 --- a/src/parallel.c +++ b/src/parallel.c @@ -11,18 +11,52 @@ #include "log/log.h" #include "utils.h" -#define NUM_THREADS 4 +#define NUM_THREADS 4 static int sum; static os_graph_t *graph; static os_threadpool_t *tp; /* TODO: Define graph synchronization mechanisms. */ +pthread_mutex_t mutex; +pthread_mutex_t mutex2; +pthread_mutex_t mutex3; + /* TODO: Define graph task argument. */ +typedef struct { + os_graph_t *graph; + unsigned int node_idx; +} graph_task_t; static void process_node(unsigned int idx) { - /* TODO: Implement thread-pool based processing of graph. */ + os_node_t *node = graph->nodes[idx]; + + pthread_mutex_lock(&mutex); + if (graph->visited[idx] != DONE) { + sum += node->info; + graph->visited[idx] = DONE; + } + pthread_mutex_unlock(&mutex); + + for (unsigned int i = 0; i < node->num_neighbours; i++) { + pthread_mutex_lock(&mutex); + if (graph->visited[node->neighbours[i]] == NOT_VISITED) { + struct dataArgs data; + + data.nodeIdx = node->neighbours[idx]; + struct dataArgs *data_ptr = malloc(sizeof(struct dataArgs)); + + *data_ptr = data; + graph->visited[node->neighbours[i]] = PROCESSING; + os_task_t *task = create_task((void *)process_node, data_ptr, free); + + enqueue_task(tp, task); + pthread_mutex_unlock(&mutex); + break; + } + pthread_mutex_unlock(&mutex); + } } int main(int argc, char *argv[]) @@ -39,9 +73,13 @@ int main(int argc, char *argv[]) graph = create_graph_from_file(input_file); - /* TODO: Initialize graph synchronization mechanisms. */ + pthread_mutex_init(&mutex, NULL); tp = create_threadpool(NUM_THREADS); - process_node(0); + for (int i = 0; i < graph->num_nodes; i++) { + if (graph->visited[i] != DONE) + process_node(i); + } + wait_for_completion(tp); destroy_threadpool(tp); diff --git a/tests/Makefile b/tests/Makefile index fc1b1b7c454a659f44a8199c28b3a0ab70c1e1b3..a53316562a845c1a6a22e125ebd99960bdb4f786 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -10,7 +10,7 @@ src: check: clean make -i SRC_PATH=$(SRC_PATH) - SRC_PATH=$(SRC_PATH) python checker.py + SRC_PATH=$(SRC_PATH) python3 checker.py lint: -cd $(SRC_PATH)/.. && checkpatch.pl -f src/*.c