diff --git a/src/os_threadpool.c b/src/os_threadpool.c index f75e78ed74232ba7710fc71e551d8478d650c7c0..de33d1696954362f57befac3698c8fcae83679ff 100644 --- a/src/os_threadpool.c +++ b/src/os_threadpool.c @@ -9,6 +9,8 @@ #include "log/log.h" #include "utils.h" +int nr; + /* Create a task that would be executed by a thread. */ os_task_t *create_task(void (*action)(void *), void *arg, void (*destroy_arg)(void *)) { @@ -39,6 +41,9 @@ void enqueue_task(os_threadpool_t *tp, os_task_t *t) assert(t != NULL); /* TODO: Enqueue task to the shared task queue. Use synchronization. */ + pthread_mutex_lock(&(tp->mutex)); + list_add_tail(&(tp->head), &(t->list)); + pthread_mutex_unlock(&(tp->mutex)); } /* @@ -61,7 +66,19 @@ os_task_t *dequeue_task(os_threadpool_t *tp) { os_task_t *t; + while (tp->x == 0) + continue; + /* TODO: Dequeue task from the shared task queue. Use synchronization. */ + if (!queue_is_empty(tp)) { + pthread_mutex_lock(&(tp->mutex)); + os_list_node_t *node = tp->head.next; + + t = list_entry(node, os_task_t, list); + list_del(tp->head.next); + pthread_mutex_unlock(&(tp->mutex)); + return t; + } return NULL; } @@ -80,6 +97,7 @@ static void *thread_loop_function(void *arg) destroy_task(t); } + nr++; return NULL; } @@ -87,6 +105,8 @@ static void *thread_loop_function(void *arg) void wait_for_completion(os_threadpool_t *tp) { /* TODO: Wait for all worker threads. Use synchronization. */ + while (nr != 4) + continue; /* Join all worker threads. */ for (unsigned int i = 0; i < tp->num_threads; i++) @@ -105,6 +125,8 @@ os_threadpool_t *create_threadpool(unsigned int num_threads) list_init(&tp->head); /* TODO: Initialize synchronization data. */ + pthread_mutex_init(&(tp->mutex), NULL); + tp->x = 0; tp->num_threads = num_threads; tp->threads = malloc(num_threads * sizeof(*tp->threads)); @@ -123,6 +145,7 @@ void destroy_threadpool(os_threadpool_t *tp) os_list_node_t *n, *p; /* TODO: Cleanup synchronization data. */ + pthread_mutex_destroy(&(tp->mutex)); list_for_each_safe(n, p, &tp->head) { list_del(n); diff --git a/src/os_threadpool.h b/src/os_threadpool.h index f20ef5b766765876087a3f92b049a7969d338e10..792d05cba1b7e546fbec6809778180efca2479bf 100644 --- a/src/os_threadpool.h +++ b/src/os_threadpool.h @@ -4,6 +4,7 @@ #define __OS_THREADPOOL_H__ 1 #include <pthread.h> +#include <semaphore.h> #include "os_list.h" typedef struct { @@ -27,6 +28,8 @@ typedef struct os_threadpool { os_list_node_t head; /* TODO: Define threapool / queue synchronization data. */ + pthread_mutex_t mutex; + int x; } os_threadpool_t; os_task_t *create_task(void (*f)(void *), void *arg, void (*destroy_arg)(void *)); diff --git a/src/parallel.c b/src/parallel.c index 262b8cf17014d1b27e6fc8ba07e96476ae872f35..57cd6eaf1aff78f7775c4f49980e87b70b9f54a1 100644 --- a/src/parallel.c +++ b/src/parallel.c @@ -17,14 +17,37 @@ static int sum; static os_graph_t *graph; static os_threadpool_t *tp; /* TODO: Define graph synchronization mechanisms. */ +pthread_mutex_t mutex; /* TODO: Define graph task argument. */ +void action(void *idx) +{ + os_node_t *node; + + pthread_mutex_lock(&mutex); + node = graph->nodes[*(int *)idx]; + sum += node->info; + graph->visited[*(int *)idx] = DONE; + + for (unsigned int i = 0; i < node->num_neighbours; i++) + if (graph->visited[node->neighbours[i]] == NOT_VISITED) { + graph->visited[node->neighbours[i]] = PROCESSING; + os_task_t *t = create_task(action, &(node->neighbours[i]), NULL); + + enqueue_task(tp, t); + tp->x = 1; + } + pthread_mutex_unlock(&mutex); + tp->x = 1; +} static void process_node(unsigned int idx) { /* TODO: Implement thread-pool based processing of graph. */ + action(&idx); } + int main(int argc, char *argv[]) { FILE *input_file; @@ -40,6 +63,8 @@ int main(int argc, char *argv[]) graph = create_graph_from_file(input_file); /* TODO: Initialize graph synchronization mechanisms. */ + pthread_mutex_init(&mutex, NULL); + tp = create_threadpool(NUM_THREADS); process_node(0); wait_for_completion(tp);