Skip to content
Snippets Groups Projects
Commit 4202dd8b authored by Razvan Deaconescu's avatar Razvan Deaconescu
Browse files

Update / Improve implementation


Update skeleton and README to new assignment implementation: redesign of
thread pool, the use of generic lists.

Signed-off-by: default avatarRazvan Deaconescu <razvan.deaconescu@upb.ro>
parent 30d7f21a
No related branches found
No related tags found
No related merge requests found
# Parallel Graph # Parallel Graph
For this assignment we will implement a generic thread pool, which we will then use to traverse a graph and compute the sum of the elements contained by the nodes. ## Objectives
- Learn how to design and implement parallel programs
- Gain skills in using synchronization primitives for parallel programs
- Get a better understanding of the POSIX threading and synchronization API
- Gain insight on the differences between serial and parallel programs
## Statement
Implement a generic thread pool, then use it to traverse a graph and compute the sum of the elements contained by the nodes.
You will be provided with a serial implementation of the graph traversal and with most of the data structures needed to implement the thread pool. You will be provided with a serial implementation of the graph traversal and with most of the data structures needed to implement the thread pool.
Your job is to write the thread pool routines and then use the thread pool to traverse the graph. Your job is to write the thread pool routines and then use the thread pool to traverse the graph.
## Thread Pool Description ## Support Code
The support code consists of the directories:
- `src/` is the skeleton parallel graph implementation.
You will have to implement missing parts marked as `TODO` items.
- `utils/` utility files (used for debugging & logging)
- `tests/` are tests used to validate (and grade) the assignment.
## Implementation
### Thread Pool Description
A thread pool contains a given number of active threads that simply wait to be given specific tasks. A thread pool contains a given number of active threads that simply wait to be given specific tasks.
The threads are created when the thread pool is created they poll a task queue until a task is available. The threads are created when the thread pool is created.
Once a tasks are put in the task queue, the threads start running the task. Each thread continuously polls the task queue for available tasks.
A thread pool creates N threads when the thread pool is created and does not destroy (join) them through out the life time of the thread pool. Once tasks are put in the task queue, the threads poll tasks, and start running them.
That way, the penalty of creating and destroying threads ad hoc is avoided. A thread pool creates **N** threads upon its creation and does not destroy (join) them throughout its lifetime.
As such, you must implement the following functions (marked with `TODO` in the provided skeleton): That way, the penalty of creating and destroying threads ad-hoc is avoided.
As such, you must implement the following functions (marked with `TODO` in the provided skeleton, in `src/os_threadpool.c`):
- `task_create`: creates an `os_task_t` that will be put in the task queue - a task consists of a function pointer and an argument.
- `add_task_in_queue`: adds a given task in the thread pool's task queue. - `enqueue_task()`: Enqueue task to the shared task queue.
- `get_task`: get a task from the thread pool's task queue. Use synchronization.
- `threadpool_create`: allocate and initialize a new thread pool. - `dequeue_task()`: Dequeue task from the shared task queue.
- `thread_loop_function`: all the threads in the thread pool will execute this function - they all wait until a task is available in the task queue; once they grab a task they simply invoke the function that was provided to `task_create`. Use synchronization.
- `threadpool_stop`: stop all the threads from execution. - `wait_for_completion()`: Wait for all worker threads.
Use synchronization.
Notice that the thread pool is completely independent from any given application. - `create_threadpool()`: Create a new thread pool.
- `destroy_threadpool()`: Destroy a thread pool.
Assume all threads have been joined.
You must also update the `os_threadpool_t` structure in `src/os_threadpool.h` with the required bits for synchronizing the parallel implementation.
Notice that the thread pool is completely independent of any given application.
Any function can be registered in the task queue. Any function can be registered in the task queue.
## Graph Traversal Since the threads are polling the task queue indefinitely, you need to define a condition for them to stop once the graph has been traversed completely.
That is, the condition used by the `wait_for_completion()` function.
The recommended way is to note when no threads have any more work to do.
Since no thread is doing any work, no other task will be created.
Once you have implemented the thread pool, you need to test it by using it for computing the sum of all the nodes of a graph. ### Graph Traversal
A serial implementation for this algorithm is provided in `skep/serial.c`
Once you have implemented the thread pool, you need to test it by doing a parallel traversal of all connected nodes in a graph.
A serial implementation for this algorithm is provided in `src/serial.c`.
To make use of the thread pool, you will need to create tasks that will be put in the task queue. To make use of the thread pool, you will need to create tasks that will be put in the task queue.
A task consists of 2 steps: A task consists of 2 steps:
- adding the current node value to the overall sum. 1. Add the current node value to the overall sum.
- creating tasks and adding them to the task queue for the neighbouring nodes. 1. Create tasks and add them to the task queue for the neighbouring nodes.
Since the threads are polling the task queue indefinitely, you need to find a condition for the threads to stop once the graph has been traversed completely. Implement this in the `src/parallel.c` (see the `TODO` items).
This condition should be implemented in a function that is passed to `threadpool_stop`. You must implement the parallel and synchronized version of the `process_node()` function, also used in the serial implementation.
`threadpool_stop` then needs to wait for the condition to be satisfied and then joins all the threads.
## Synchronization ### Synchronization
For synchronization you can use mutexes, semaphores, spinlocks, condition variables - anything that grinds your gear. For synchronization you can use mutexes, semaphores, spinlocks, condition variables - anything that grinds your gear.
However, you are not allowed to use hacks such as `sleep`, `printf` synchronization or adding superfluous computation. However, you are not allowed to use hacks such as `sleep()`, `printf()` synchronization or adding superfluous computation.
## Input Files ### Input Files
Reading the graphs from the input files is being taken care of the functions implemented in `src/os_graph.c`. Reading the graphs from the input files is being taken care of the functions implemented in `src/os_graph.c`.
A graph is represented in input files as follows: A graph is represented in input files as follows:
- first line contains 2 integers N and M: N - number of nodes, M - numbed or edges - First line contains 2 integers `N` and `M`: `N` - number of nodes, `M` - numbed or edges
- second line contains N integer numbers - the values of the nodes - Second line contains `N` integer numbers - the values of the nodes.
- the next M lines contain each 2 integers that represent the source and the destination of an edge - The next `M` lines contain each 2 integers that represent the source and the destination of an edge.
## Data Structures ### Data Structures
### Graph #### Graph
A graph is represented internally as an `os_graph_t` (see `src/os_graph.h`). A graph is represented internally by the `os_graph_t` structure (see `src/os_graph.h`).
### List #### List
A list is represented internally as an `os_queue_t` (see `src/os_list.h`). A list is represented internally by the `os_queue_t` structure (see `src/os_list.h`).
You will use this list to implement the task queue. You will use this list to implement the task queue.
### Thread pool #### Thread Pool
A thread pool is represented internally as an `os_threadpool_t` (see `src/os_threadpool.h`) A thread pool is represented internally by the `os_threadpool_t` structure (see `src/os_threadpool.h`).
The thread pool contains information about the task queue and the threads. The thread pool contains information about the task queue and the threads.
You are not allowed to modify these data structures. ### Requirements
However, you can create other data structures that leverage these ones.
Your implementation needs to be contained in the `src/os_threadpool.c`, `src/os_threadpool.h` and `src/parallel.c` files.
Any other files that you are using will not be taken into account.
Any modifications that you are doing to the other files in the `src/` directory will not be taken into account.
## Infrastructure ## Operations
### Compilation ### Building
To compile both the serial and the parallel version, enter the `src/` directory and run: To build both the serial and the parallel versions, run `make` in the `src/` directory:
```console ```console
make student@so:~/.../content/assignments/parallel-graph$ cd src/
student@so:~/.../assignments/parallel-graph/src$ make
``` ```
That will create the `serial` and `parallel` binaries/ That will create the `serial` and `parallel` binaries.
### Testing ## Testing and Grading
Input tests cases are located in `tests/in/`. Testing is automated.
The parallel and the serial version should provide the same results for the same input test case. Tests are located in the `tests/` directory.
```console
student@so:~/.../assignments/parallel-graph/tests$ ls -F
Makefile checker.py grade.sh@ in/
```
If you want manually run a single test, use commands such as below while in the `src/` directory: To test and grade your assignment solution, enter the `tests/` directory and run `grade.sh`.
Note that this requires linters being available.
The easiest is to use a Docker-based setup with everything installed and configured.
When using `grade.sh` you will get grades for checking correctness (maximum `90` points) and for coding style (maxim `10` points).
A successful run will provide you an output ending with:
```console ```console
$./parallel ../tests/in/test5.in ### GRADE
-11
$ ./serial ../tests/in/test5.in
-11 Checker: 90/ 90
Style: 10/ 10
Total: 100/100
### STYLE SUMMARY
```
### Running the Checker
To run only the checker, use the `make check` command in the `tests/` directory:
```console
student@so:~/.../assignments/parallel-graph/tests$ make check
[...]
SRC_PATH=../src python checker.py
make[1]: Entering directory '...'
rm -f *~
[...]
TODO
test1.in ....................... failed ... 0.0
test2.in ....................... failed ... 0.0
test3.in ....................... failed ... 0.0
[...]
Total: 0/100
``` ```
### Checker Obviously, all tests will fail, as there is no implementation.
Each test is worth a number of points.
The maximum grade is `90`.
The testing is automated and performed with the `checker.py` script from the `tests/` directory. A successful run will show the output:
It's easiest to use the `Makefile` to run the tests:
```console ```console
$ make check student@so:~/.../assignments/parallel-graph/tests$ make check
[...] [...]
SRC_PATH=../src python checker.py SRC_PATH=../src python checker.py
test1.in ....................... passed ... 4.5 test1.in ....................... passed ... 4.5
...@@ -130,27 +204,35 @@ test20.in ....................... passed ... 4.5 ...@@ -130,27 +204,35 @@ test20.in ....................... passed ... 4.5
Total: 90/100 Total: 90/100
``` ```
It's recommended that you use the [local Docker-based checker](./README.checker.md#local-checker). ### Running the Linters
You would use the command:
To run the linters, use the `make lint` command in the `tests/` directory:
```console ```console
./local.sh checker student@so:~/.../assignments/parallel-graph/tests$ make lint
[...]
cd .. && checkpatch.pl -f checker/*.sh tests/*.sh
[...]
cd .. && cpplint --recursive src/ tests/ checker/
[...]
cd .. && shellcheck checker/*.sh tests/*.sh
``` ```
to run the checker in a Docker-based environment that is identical to the one used for the official assignment evaluation. Note that the linters have to be installed on your system: [`checkpatch.pl`](https://.com/torvalds/linux/blob/master/scripts/checkpatch.pl), [`cpplint`](https://github.com/cpplint/cpplint), [`shellcheck`](https://www.shellcheck.net/).
They also need to have certain configuration options.
It's easiest to run them in a Docker-based setup with everything configured.
## Grading ### Fine-Grained Testing
The grade that the checker outputs is not the final grade. Input tests cases are located in `tests/in/`.
Your homework will be manually inspected and may suffer from penalties ranging from 1 to 100 points depending on the severity of the hack, including, but not limited to: If you want to run a single test, use commands such as below while in the `src/` directory:
- using a single mutex at the beginning of the traversal ```console
- not using the thread pool to solve the homework $./parallel ../tests/in/test5.in
- inefficient usage of synchronization -38
- incorrect graph traversal
## Deployment $ ./serial ../tests/in/test5.in
-38
```
Your implementation needs to be contained in the `src/os_threadpool.c` and `src/os_parallel.c` files. Results provided by the serial and parallel implementation must be the same for the test to successfully pass.
Any other files that you are using will not be taken into account.
Any modifications that you are doing to the other files in the `src/` directory will not be taken into account.
BUILD_DIR := build BUILD_DIR := build
CC := gcc UTILS_PATH ?= ../utils
CFLAGS := -c -Wall -g CPPFLAGS := -I$(UTILS_PATH)
LD := ld CFLAGS := -Wall -Wextra
LDFLAGS := # Remove the line below to disable debugging support.
LDLIBS := -lpthread CFLAGS += -g -O0
PARALLEL_LDLIBS := -lpthread
SERIAL_SRCS := serial.c os_graph.c
PARALLEL_SRCS:= parallel.c os_graph.c os_list.c os_threadpool.c SERIAL_SRCS := serial.c os_graph.c $(UTILS_PATH)/log/log.c
SERIAL_OBJS := $(patsubst $(SRC)/%.c,$(BUILD_DIR)/%.o,$(SERIAL_SRCS)) PARALLEL_SRCS:= parallel.c os_graph.c os_threadpool.c $(UTILS_PATH)/log/log.c
PARALLEL_OBJS := $(patsubst $(SRC)/%.c,$(BUILD_DIR)/%.o,$(PARALLEL_SRCS)) SERIAL_OBJS := $(patsubst %.c,%.o,$(SERIAL_SRCS))
PARALLEL_OBJS := $(patsubst %.c,%.o,$(PARALLEL_SRCS))
.PHONY: all pack clean always .PHONY: all pack clean always
all: serial parallel all: serial parallel
always: serial: $(SERIAL_OBJS)
mkdir -p build $(CC) -o $@ $^
serial: always $(SERIAL_OBJS)
$(CC) $(LDFLAGS) -o serial $(SERIAL_OBJS)
parallel: always $(PARALLEL_OBJS) parallel: $(PARALLEL_OBJS)
$(CC) $(LDFLAGS) -o parallel $(PARALLEL_OBJS) $(LDLIBS) $(CC) -o $@ $^ $(PARALLEL_LDLIBS)
$(BUILD_DIR)/%.o: %.c $(UTILS_PATH)/log/log.o: $(UTILS_PATH)/log/log.c $(UTILS_PATH)/log/log.h
$(CC) $(CFLAGS) -o $@ $< $(CC) $(CPPFLAGS) $(CFLAGS) -c -o $@ $<
pack: clean pack: clean
-rm -f ../src.zip -rm -f ../src.zip
zip -r ../src.zip * zip -r ../src.zip *
clean: clean:
-rm -f ../src.zip -rm -f $(SERIAL_OBJS) $(PARALLEL_OBJS)
-rm -rf build
-rm -f serial parallel -rm -f serial parallel
-rm -f *~
// SPDX-License-Identifier: BSD-3-Clause // SPDX-License-Identifier: BSD-3-Clause
#include "os_graph.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "os_graph.h"
#include "log/log.h"
#include "utils.h"
/* Node functions */ /* Node functions */
os_node_t *os_create_node(unsigned int _nodeID, int _nodeInfo) os_node_t *os_create_node(unsigned int id, int info)
{ {
os_node_t *newNode; os_node_t *node;
newNode = calloc(1, sizeof(os_node_t)); node = malloc(sizeof(*node));
DIE(node == NULL, "mallloc");
newNode->nodeID = _nodeID; node->id = id;
newNode->nodeInfo = _nodeInfo; node->info = info;
node->num_neighbours = 0;
node->neighbours = NULL;
return newNode; return node;
} }
/* Graph functions */ /* Graph functions */
os_graph_t *create_graph_from_data(unsigned int nc, unsigned int ec, os_graph_t *create_graph_from_data(unsigned int num_nodes, unsigned int num_edges,
int *values, os_edge_t *edges) int *values, os_edge_t *edges)
{ {
int i, isrc, idst;
os_graph_t *graph; os_graph_t *graph;
graph = calloc(1, sizeof(os_graph_t)); graph = malloc(sizeof(*graph));
DIE(graph == NULL, "mallloc");
graph->nCount = nc; graph->num_nodes = num_nodes;
graph->eCount = ec; graph->num_edges = num_edges;
graph->nodes = calloc(nc, sizeof(os_node_t *)); graph->nodes = malloc(num_nodes * sizeof(os_node_t *));
DIE(graph->nodes == NULL, "malloc");
for (i = 0; i < nc; ++i) { for (unsigned int i = 0; i < graph->num_nodes; i++) {
graph->nodes[i] = os_create_node(i, values[i]); graph->nodes[i] = os_create_node(i, values[i]);
graph->nodes[i]->neighbours = calloc(nc, sizeof(unsigned int)); graph->nodes[i]->neighbours = malloc(graph->num_nodes * sizeof(unsigned int));
graph->nodes[i]->cNeighbours = 0; DIE(graph->nodes[i]->neighbours == NULL, "malloc");
graph->nodes[i]->num_neighbours = 0;
} }
for (i = 0; i < ec; ++i) { for (unsigned int i = 0; i < graph->num_edges; i++) {
isrc = edges[i].src; idst = edges[i].dst; unsigned int isrc, idst;
graph->nodes[isrc]->neighbours[graph->nodes[isrc]->cNeighbours++] = idst;
graph->nodes[idst]->neighbours[graph->nodes[idst]->cNeighbours++] = isrc; isrc = edges[i].src;
idst = edges[i].dst;
graph->nodes[isrc]->neighbours[graph->nodes[isrc]->num_neighbours++] = idst;
graph->nodes[idst]->neighbours[graph->nodes[idst]->num_neighbours++] = isrc;
} }
graph->visited = calloc(graph->nCount, sizeof(unsigned int)); graph->visited = malloc(graph->num_nodes * sizeof(*graph->visited));
DIE(graph->visited == NULL, "malloc");
for (unsigned int i = 0; i < graph->num_nodes; i++)
graph->visited[i] = NOT_VISITED;
return graph; return graph;
} }
os_graph_t *create_graph_from_file(FILE *file) os_graph_t *create_graph_from_file(FILE *file)
{ {
unsigned int nCount, eCount; unsigned int num_nodes, num_edges;
int i; unsigned int i;
int *values; int *nodes;
os_edge_t *edges; os_edge_t *edges;
os_graph_t *graph = NULL; os_graph_t *graph = NULL;
if (fscanf(file, "%d %d", &nCount, &eCount) == 0) { if (fscanf(file, "%d %d", &num_nodes, &num_edges) == 0) {
fprintf(stderr, "[ERROR] Can't read from file\n"); log_error("Can't read from file");
goto out; goto out;
} }
values = malloc(nCount * sizeof(int)); nodes = malloc(num_nodes * sizeof(int));
for (i = 0; i < nCount; ++i) { DIE(nodes == NULL, "malloc");
if (fscanf(file, "%d", &values[i]) == 0) { for (i = 0; i < num_nodes; i++) {
fprintf(stderr, "[ERROR] Can't read from file\n"); if (fscanf(file, "%d", &nodes[i]) == 0) {
goto free_values; log_error("Can't read from file");
goto free_nodes;
} }
} }
edges = malloc(eCount * sizeof(os_edge_t)); edges = malloc(num_edges * sizeof(os_edge_t));
for (i = 0; i < eCount; ++i) { DIE(edges == NULL, "malloc");
for (i = 0; i < num_edges; ++i) {
if (fscanf(file, "%d %d", &edges[i].src, &edges[i].dst) == 0) { if (fscanf(file, "%d %d", &edges[i].src, &edges[i].dst) == 0) {
fprintf(stderr, "[ERROR] Can't read from file\n"); log_error("Can't read from file");
goto free_edges; goto free_edges;
} }
} }
graph = create_graph_from_data(nCount, eCount, values, edges); graph = create_graph_from_data(num_nodes, num_edges, nodes, edges);
free_edges: free_edges:
free(edges); free(edges);
free_values: free_nodes:
free(values); free(nodes);
out: out:
return graph; return graph;
} }
void printGraph(os_graph_t *graph) void print_graph(os_graph_t *graph)
{ {
int i, j; for (unsigned int i = 0; i < graph->num_nodes; i++) {
for (i = 0; i < graph->nCount; ++i) {
printf("[%d]: ", i); printf("[%d]: ", i);
for (j = 0; j < graph->nodes[i]->cNeighbours; ++j) for (unsigned int j = 0; j < graph->nodes[i]->num_neighbours; j++)
printf("%d ", graph->nodes[i]->neighbours[j]); printf("%d ", graph->nodes[i]->neighbours[j]);
printf("\n"); printf("\n");
} }
......
...@@ -6,29 +6,33 @@ ...@@ -6,29 +6,33 @@
#include <stdio.h> #include <stdio.h>
typedef struct os_node_t { typedef struct os_node_t {
unsigned int nodeID; unsigned int id;
signed int nodeInfo; int info;
unsigned int cNeighbours; // Neighbours count unsigned int num_neighbours;
unsigned int *neighbours; unsigned int *neighbours;
} os_node_t; } os_node_t;
typedef struct os_graph_t { typedef struct os_graph_t {
unsigned int nCount; // Nodes count unsigned int num_nodes;
unsigned int eCount; // Edges count unsigned int num_edges;
os_node_t **nodes; os_node_t **nodes;
unsigned int *visited; enum {
NOT_VISITED = 0,
PROCESSING = 1,
DONE = 2
} *visited;
} os_graph_t; } os_graph_t;
typedef struct os_edge_t { typedef struct os_edge_t {
int src, dst; unsigned int src, dst;
} os_edge_t; } os_edge_t;
os_node_t *os_create_node(unsigned int _nodeID, int _nodeInfo); os_node_t *os_create_node(unsigned int id, int info);
os_graph_t *create_graph_from_data(unsigned int nc, unsigned int ec, os_graph_t *create_graph_from_data(unsigned int num_nodes, unsigned int num_edges,
int *values, os_edge_t *edges); int *values, os_edge_t *edges);
os_graph_t *create_graph_from_file(FILE *file); os_graph_t *create_graph_from_file(FILE *file);
void printGraph(os_graph_t *graph); void print_graph(os_graph_t *graph);
#endif #endif
...@@ -7,39 +7,39 @@ ...@@ -7,39 +7,39 @@
os_queue_t *queue_create(void) os_queue_t *queue_create(void)
{ {
os_queue_t *newQueue; os_queue_t *queue;
newQueue = malloc(sizeof(os_queue_t)); queue = malloc(sizeof(*queue));
if (newQueue == NULL) { if (queue == NULL) {
perror("malloc"); perror("malloc");
return NULL; return NULL;
} }
pthread_mutex_init(&newQueue->lock, NULL); pthread_mutex_init(&queue->lock, NULL);
newQueue->first = NULL; queue->first = NULL;
newQueue->last = NULL; queue->last = NULL;
return newQueue; return queue;
} }
void queue_add(os_queue_t *queue, void *info) void queue_add(os_queue_t *queue, void *info)
{ {
os_list_node_t *newNode; os_list_node_t *node;
newNode = malloc(sizeof(os_list_node_t)); node = malloc(sizeof(*node));
if (newNode == NULL) { if (node == NULL) {
fprintf(stderr, "[ERROR] %s: Not enough memory\n", __func__); fprintf(stderr, "[ERROR] %s: Not enough memory\n", __func__);
return; return;
} }
newNode->next = NULL; node->next = NULL;
newNode->info = info; node->info = info;
if (queue->last == NULL && queue->first == NULL) { if (queue->last == NULL && queue->first == NULL) {
queue->first = newNode; queue->first = node;
queue->last = newNode; queue->last = node;
} else { } else {
queue->last->next = newNode; queue->last->next = node;
queue->last = newNode; queue->last = node;
} }
} }
......
/* SPDX-License-Identifier: BSD-3-Clause */ /* SPDX-License-Identifier: BSD-3-Clause */
/*
* Heavily inspired for Linux kernel code:
* https://github.com/torvalds/linux/blob/master/include/linux/list.h
*/
#ifndef __OS_LIST_H__ #ifndef __OS_LIST_H__
#define __OS_LIST_H__ 1 #define __OS_LIST_H__ 1
#include <pthread.h> #include <stddef.h>
typedef struct os_list_node_t { typedef struct os_list_node_t {
void *info; struct os_list_node_t *prev, *next;
struct os_list_node_t *next;
} os_list_node_t; } os_list_node_t;
typedef struct { static inline void list_init(os_list_node_t *head)
struct os_list_node_t *first; {
struct os_list_node_t *last; head->prev = head;
pthread_mutex_t lock; head->next = head;
} os_queue_t; }
static inline void list_add(os_list_node_t *head, os_list_node_t *node)
{
node->next = head->next;
node->prev = head;
head->next->prev = node;
head->next = node;
}
static inline void list_add_tail(os_list_node_t *head, os_list_node_t *node)
{
node->prev = head->prev;
node->next = head;
head->prev->next = node;
head->prev = node;
}
static inline void list_del(os_list_node_t *node)
{
node->prev->next = node->next;
node->next->prev = node->prev;
node->next = node;
node->prev = node;
}
static inline int list_empty(os_list_node_t *head)
{
return (head->next == head);
}
#define list_entry(ptr, type, member) ({ \
void *tmp = (void *)(ptr); \
(type *) (tmp - offsetof(type, member)); \
})
#define list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)
os_queue_t *queue_create(void); #define list_for_each_safe(pos, tmp, head) \
void queue_add(os_queue_t *queue, void *info); for (pos = (head)->next, tmp = pos->next; pos != (head); \
os_list_node_t *queue_get(os_queue_t *queue); pos = tmp, tmp = pos->next)
#endif #endif
// SPDX-License-Identifier: BSD-3-Clause // SPDX-License-Identifier: BSD-3-Clause
#include "os_threadpool.h"
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <assert.h>
#include <unistd.h> #include <unistd.h>
/* Create a task that thread must execute */ #include "os_threadpool.h"
os_task_t *task_create(void *arg, void (*f)(void *)) #include "log/log.h"
#include "utils.h"
/* Create a task that would be executed by a thread. */
os_task_t *create_task(void (*action)(void *), void *arg, void (*destroy_arg)(void *))
{ {
/* TODO: Implement task creation. */ os_task_t *t;
return NULL;
t = malloc(sizeof(*t));
DIE(t == NULL, "malloc");
t->action = action; // the function
t->argument = arg; // arguments for the function
t->destroy_arg = destroy_arg; // destroy argument function
return t;
} }
/* Add a new task to threadpool task queue */ /* Destroy task. */
void add_task_in_queue(os_threadpool_t *tp, os_task_t *t) void destroy_task(os_task_t *t)
{ {
/* TODO: Implement adding new task in queue. */ if (t->destroy_arg != NULL)
t->destroy_arg(t->argument);
free(t);
} }
/* Get the head of task queue from threadpool */ /* Put a new task to threadpool task queue. */
os_task_t *get_task(os_threadpool_t *tp) void enqueue_task(os_threadpool_t *tp, os_task_t *t)
{ {
/* TODO: Implement getting head of task queue. */ assert(tp != NULL);
return NULL; assert(t != NULL);
/* TODO: Enqueue task to the shared task queue. Use synchronization. */
}
/*
* Check if queue is empty.
* This function should be called in a synchronized manner.
*/
static int queue_is_empty(os_threadpool_t *tp)
{
return list_empty(&tp->head);
} }
/* Initialize the new threadpool */ /*
os_threadpool_t *threadpool_create(unsigned int nTasks, unsigned int nThreads) * Get a task from threadpool task queue.
* Block if no task is available.
* Return NULL if work is complete, i.e. no task will become available,
* i.e. all threads are going to block.
*/
os_task_t *dequeue_task(os_threadpool_t *tp)
{ {
/* TODO: Implement thread pool creation. */ os_task_t *t;
/* TODO: Dequeue task from the shared task queue. Use synchronization. */
return NULL; return NULL;
} }
/* Loop function for threads */ /* Loop function for threads */
void *thread_loop_function(void *args) static void *thread_loop_function(void *arg)
{ {
/* TODO: Implement thread loop function. */ os_threadpool_t *tp = (os_threadpool_t *) arg;
while (1) {
os_task_t *t;
t = dequeue_task(tp);
if (t == NULL)
break;
t->action(t->argument);
destroy_task(t);
}
return NULL; return NULL;
} }
void threadpool_stop(os_threadpool_t *tp, int (*processingIsDone)(os_threadpool_t *)) /* Wait completion of all threads. This is to be called by the main thread. */
void wait_for_completion(os_threadpool_t *tp)
{
/* TODO: Wait for all worker threads. Use synchronization. */
/* Join all worker threads. */
for (unsigned int i = 0; i < tp->num_threads; i++)
pthread_join(tp->threads[i], NULL);
}
/* Create a new threadpool. */
os_threadpool_t *create_threadpool(unsigned int num_threads)
{
os_threadpool_t *tp = NULL;
int rc;
tp = malloc(sizeof(*tp));
DIE(tp == NULL, "malloc");
list_init(&tp->head);
/* TODO: Initialize synchronization data. */
tp->num_threads = num_threads;
tp->threads = malloc(num_threads * sizeof(*tp->threads));
DIE(tp->threads == NULL, "malloc");
for (unsigned int i = 0; i < num_threads; ++i) {
rc = pthread_create(&tp->threads[i], NULL, &thread_loop_function, (void *) tp);
DIE(rc < 0, "pthread_create");
}
return tp;
}
/* Destroy a threadpool. Assume all threads have been joined. */
void destroy_threadpool(os_threadpool_t *tp)
{ {
/* TODO: Implement thread pool stop. */ os_list_node_t *n, *p;
/* TODO: Cleanup synchronization data. */
list_for_each_safe(n, p, &tp->head) {
list_del(n);
destroy_task(list_entry(n, os_task_t, list));
}
free(tp->threads);
free(tp);
} }
/* SPDX-License-Identifier: BSD-3-Clause */ /* SPDX-License-Identifier: BSD-3-Clause */
#ifndef __SO_THREADPOOL_H__ #ifndef __OS_THREADPOOL_H__
#define __SO_THREADPOOL_H__ 1 #define __OS_THREADPOOL_H__ 1
#include <pthread.h> #include <pthread.h>
#include "os_list.h"
typedef struct { typedef struct {
void *argument; void *argument;
void (*task)(void *); void (*action)(void *arg);
void (*destroy_arg)(void *arg);
os_list_node_t list;
} os_task_t; } os_task_t;
typedef struct _node { typedef struct os_threadpool {
os_task_t *task;
struct _node *next;
} os_task_queue_t;
typedef struct {
unsigned int should_stop;
unsigned int num_threads; unsigned int num_threads;
pthread_t *threads; pthread_t *threads;
os_task_queue_t *tasks; /*
pthread_mutex_t taskLock; * Head of queue used to store tasks.
* First item is head.next, if head.next != head (i.e. if queue
* is not empty).
* Last item is head.prev, if head.prev != head (i.e. if queue
* is not empty).
*/
os_list_node_t head;
/* TODO: Define threapool / queue synchronization data. */
} os_threadpool_t; } os_threadpool_t;
os_task_t *task_create(void *arg, void (*f)(void *)); os_task_t *create_task(void (*f)(void *), void *arg, void (*destroy_arg)(void *));
void add_task_in_queue(os_threadpool_t *tp, os_task_t *t); void destroy_task(os_task_t *t);
os_task_t *get_task(os_threadpool_t *tp);
os_threadpool_t *threadpool_create(unsigned int nTasks, unsigned int nThreads); os_threadpool_t *create_threadpool(unsigned int num_threads);
void *thread_loop_function(void *args); void destroy_threadpool(os_threadpool_t *tp);
void threadpool_stop(os_threadpool_t *tp, int (*processingIsDone)(os_threadpool_t *));
void enqueue_task(os_threadpool_t *q, os_task_t *t);
os_task_t *dequeue_task(os_threadpool_t *tp);
void wait_for_completion(os_threadpool_t *tp);
#endif #endif
...@@ -8,13 +8,22 @@ ...@@ -8,13 +8,22 @@
#include "os_graph.h" #include "os_graph.h"
#include "os_threadpool.h" #include "os_threadpool.h"
#include "os_list.h" #include "log/log.h"
#include "utils.h"
#define MAX_TASK 100 #define NUM_THREADS 4
#define MAX_THREAD 4
static int sum; static int sum;
static os_graph_t *graph; static os_graph_t *graph;
static os_threadpool_t *tp;
/* TODO: Define graph synchronization mechanisms. */
/* TODO: Define graph task argument. */
static void process_node(unsigned int idx)
{
/* TODO: Implement thread-pool based processing of graph. */
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
...@@ -26,18 +35,15 @@ int main(int argc, char *argv[]) ...@@ -26,18 +35,15 @@ int main(int argc, char *argv[])
} }
input_file = fopen(argv[1], "r"); input_file = fopen(argv[1], "r");
if (input_file == NULL) { DIE(input_file == NULL, "fopen");
perror("fopen");
exit(EXIT_FAILURE);
}
graph = create_graph_from_file(input_file); graph = create_graph_from_file(input_file);
if (graph == NULL) {
fprintf(stderr, "[Error] Can't read the graph from file\n");
exit(EXIT_FAILURE);
}
/* TODO: Create thread pool and traverse the graph. */ /* TODO: Initialize graph synchronization mechanisms. */
tp = create_threadpool(NUM_THREADS);
process_node(0);
wait_for_completion(tp);
destroy_threadpool(tp);
printf("%d", sum); printf("%d", sum);
......
...@@ -2,32 +2,25 @@ ...@@ -2,32 +2,25 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "os_graph.h" #include "os_graph.h"
#include "log/log.h"
#include "utils.h"
static int sum; static int sum;
static os_graph_t *graph; static os_graph_t *graph;
static void processNode(unsigned int nodeIdx) static void process_node(unsigned int idx)
{ {
os_node_t *node; os_node_t *node;
node = graph->nodes[nodeIdx]; node = graph->nodes[idx];
sum += node->nodeInfo; sum += node->info;
for (int i = 0; i < node->cNeighbours; i++) graph->visited[idx] = DONE;
if (graph->visited[node->neighbours[i]] == 0) {
graph->visited[node->neighbours[i]] = 1;
processNode(node->neighbours[i]);
}
}
static void traverse_graph(void) for (unsigned int i = 0; i < node->num_neighbours; i++)
{ if (graph->visited[node->neighbours[i]] == NOT_VISITED)
for (int i = 0; i < graph->nCount; i++) { process_node(node->neighbours[i]);
if (graph->visited[i] == 0) {
graph->visited[i] = 1;
processNode(i);
}
}
} }
int main(int argc, char *argv[]) int main(int argc, char *argv[])
...@@ -40,18 +33,12 @@ int main(int argc, char *argv[]) ...@@ -40,18 +33,12 @@ int main(int argc, char *argv[])
} }
input_file = fopen(argv[1], "r"); input_file = fopen(argv[1], "r");
if (input_file == NULL) { DIE(input_file == NULL, "fopen");
perror("fopen");
exit(EXIT_FAILURE);
}
graph = create_graph_from_file(input_file); graph = create_graph_from_file(input_file);
if (graph == NULL) {
fprintf(stderr, "[Error] Can't read the graph from file\n");
exit(EXIT_FAILURE);
}
traverse_graph(); process_node(0);
printf("%d", sum); printf("%d", sum);
return 0; return 0;
......
SRC_PATH ?= ../src SRC_PATH ?= ../src
CC = gcc UTILS_PATH = $(realpath ../utils)
CPPFLAGS = -I../utils
CFLAGS = -fPIC -Wall -Wextra -g
LDFLAGS = -L$(SRC_PATH)
LDLIBS = -losmem
SOURCEDIR = src .PHONY: all src check lint clean
BUILDDIR = bin
SRCS = $(sort $(wildcard $(SOURCEDIR)/*.c))
BINS = $(patsubst $(SOURCEDIR)/%.c, $(BUILDDIR)/%, $(SRCS))
.PHONY: all clean src check lint all: src
all: src $(BUILDDIR) $(BINS)
$(BUILDDIR):
mkdir -p $(BUILDDIR)
$(BUILDDIR)/%: $(SOURCEDIR)/%.c
$(CC) $(CPPFLAGS) $(CFLAGS) -o $@ $^ $(LDFLAGS) $(LDLIBS)
src: src:
make -C $(SRC_PATH) make -C $(SRC_PATH) UTILS_PATH=$(UTILS_PATH)
check: check: clean
make -C $(SRC_PATH) clean
make clean
make -i SRC_PATH=$(SRC_PATH) make -i SRC_PATH=$(SRC_PATH)
SRC_PATH=$(SRC_PATH) python checker.py SRC_PATH=$(SRC_PATH) python checker.py
lint: lint:
-cd .. && checkpatch.pl -f src/*.c -cd $(SRC_PATH)/.. && checkpatch.pl -f src/*.c
-cd .. && cpplint --recursive src/ -cd $(SRC_PATH)/.. && cpplint --recursive src/
-cd .. && shellcheck checker/*.sh -cd $(SRC_PATH)/.. && shellcheck tests/*.sh
-cd .. && pylint tests/*.py -cd $(SRC_PATH)/.. && pylint tests/*.py
clean: clean:
make -C $(SRC_PATH) clean
-rm -f *~ -rm -f *~
-rm -f $(BINS)
#!/bin/bash
# SPDX-License-Identifier: BSD-3-Clause
# Grade style based on build warnings and linter warnings / errors.
# Points are subtracted from the maximum amount of style points (10).
# - For 15 or more build warnings, all points (10) are subtracted.
# - For [10,15) build warnings, 6 points are subtracted.
# - For [5,10) build warnings, 4 points are subtracted.
# - For [1,5) build warnings, 2 points are subtracted.
# - For 25 ore more linter warnings / errors, all points (10) are subtracted.
# - For [15,25) linter warnings / errors, 6 points are subtracted.
# - For [7,15) linter warnings / errors, 4 points are subtracted.
# - For [1,7) linter warnings / errors, 2 points are subtracted.
# Final style points are between 0 and 10. Results cannot be negative.
#
# Result (grade) is stored in style_grade.out file.
# Collect summary in style_summary.out file.
function grade_style()
{
compiler_warn=$(< checker.out grep -v 'unused parameter' | grep -v 'unused variable' | \
grep -v "discards 'const'" | grep -c '[0-9]\+:[0-9]\+: warning:')
compiler_down=0
if test "$compiler_warn" -ge 15; then
compiler_down=10
elif test "$compiler_warn" -ge 10; then
compiler_down=6
elif test "$compiler_warn" -ge 5; then
compiler_down=4
elif test "$compiler_warn" -ge 1; then
compiler_down=2
fi
cpplint=$(< linter.out grep "Total errors found:" | rev | cut -d ' ' -f 1 | rev)
checkpatch_err=$(< linter.out grep 'total: [0-9]* errors' | grep -o '[0-9]* errors,' | \
cut -d ' ' -f 1 | paste -s -d '+' | bc)
checkpatch_warn=$(< linter.out grep 'total: [0-9]* errors' | grep -o '[0-9]* warnings,' | \
cut -d ' ' -f 1 | paste -s -d '+' | bc)
if test -z "$checkpatch_err"; then
checkpatch_err=0
fi
if test -z "$checkpatch_warn"; then
checkpatch_warn=0
fi
checkpatch=$((checkpatch_err + checkpatch_warn))
checker_all=$((cpplint + checkpatch))
checker_down=0
if test "$checker_all" -ge 25; then
checker_down=10
elif test "$checker_all" -ge 15; then
checker_down=6
elif test "$checker_all" -ge 7; then
checker_down=4
elif test "$checker_all" -ge 1; then
checker_down=2
fi
full_down=$((compiler_down + checker_down))
if test "$full_down" -gt 10; then
full_down=10
fi
style_grade=$((10 - full_down))
echo "$style_grade" > style_grade.out
{
< linter.out grep -v 'unused parameter' | grep -v 'unused variable' | grep -v "discards 'const'" | \
grep '[0-9]\+:[0-9]\+: warning:'
< linter.out grep "Total errors found: [1-9]"
< linter.out grep 'total: [1-9]* errors'
< linter.out grep 'total: 0 errors' | grep '[1-9][0-9]* warnings'
} > style_summary.out
}
# Print grades: total, checker and style.
# Style grade is only awarded for assignments that have past 60 points
# of th checker grade.
print_results()
{
checker_grade=$(< checker.out sed -n '/^Checker:/s/^.*[ \t]\+\([0-9\.]\+\)\/.*$/\1/p')
if test "$(echo "$checker_grade > 60" | bc)" -eq 1; then
style_grade=$(cat style_grade.out)
else
style_grade=0
fi
final_grade=$(echo "scale=2; $checker_grade+$style_grade" | bc)
echo -e "\n\n### GRADE\n\n"
printf "Checker: %58s/ 90\n" "$checker_grade"
printf "Style: %60s/ 10\n" "$style_grade"
printf "Total: %60s/100\n" "$final_grade"
echo -e "\n\n### STYLE SUMMARY\n\n"
cat style_summary.out
}
run_interactive()
{
echo -e "\n\n### CHECKER\n\n"
stdbuf -oL make check 2>&1 | stdbuf -oL sed 's/^Total:/Checker:/g' | tee checker.out
echo -e "\n\n### LINTER\n\n"
stdbuf -oL make lint 2>&1 | tee linter.out
grade_style
print_results
}
run_non_interactive()
{
make check 2>&1 | sed 's/^Total:/Checker:/g' > checker.out
make lint > linter.out 2>&1
grade_style
print_results
echo -e "\n\n### CHECKER\n\n"
cat checker.out
echo -e "\n\n### LINTER\n\n"
cat linter.out
}
# In case of a command line argument disable interactive output.
# That is, do not show output as it generated.
# This is useful to collect all output and present final results at the
# beginning of the script output.
# This is because Moodle limits the output results, and the final results
# would otherwise not show up.
if test $# -eq 0; then
run_interactive
else
run_non_interactive
fi
exclude_files=log\.c
/*
* Copyright (c) 2020 rxi
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
/* Github link: https://github.com/rxi/log.c */
#include "log.h"
#define MAX_CALLBACKS 32
typedef struct {
log_LogFn fn;
void *udata;
int level;
} Callback;
static struct {
void *udata;
log_LockFn lock;
int level;
bool quiet;
Callback callbacks[MAX_CALLBACKS];
} L;
static const char *level_strings[] = {
"TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"
};
#ifdef LOG_USE_COLOR
static const char *level_colors[] = {
"\x1b[94m", "\x1b[36m", "\x1b[32m", "\x1b[33m", "\x1b[31m", "\x1b[35m"
};
#endif
static void stdout_callback(log_Event *ev) {
char buf[16];
buf[strftime(buf, sizeof(buf), "%H:%M:%S", ev->time)] = '\0';
#ifdef LOG_USE_COLOR
fprintf(
ev->udata, "%s %s%-5s\x1b[0m \x1b[90m%s:%d:\x1b[0m ",
buf, level_colors[ev->level], level_strings[ev->level],
ev->file, ev->line);
#else
fprintf(
ev->udata, "%s %-5s %s:%d: ",
buf, level_strings[ev->level], ev->file, ev->line);
#endif
vfprintf(ev->udata, ev->fmt, ev->ap);
fprintf(ev->udata, "\n");
fflush(ev->udata);
}
static void file_callback(log_Event *ev) {
char buf[64];
buf[strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", ev->time)] = '\0';
fprintf(
ev->udata, "%s %-5s %s:%d: ",
buf, level_strings[ev->level], ev->file, ev->line);
vfprintf(ev->udata, ev->fmt, ev->ap);
fprintf(ev->udata, "\n");
fflush(ev->udata);
}
static void lock(void) {
if (L.lock) { L.lock(true, L.udata); }
}
static void unlock(void) {
if (L.lock) { L.lock(false, L.udata); }
}
const char* log_level_string(int level) {
return level_strings[level];
}
void log_set_lock(log_LockFn fn, void *udata) {
L.lock = fn;
L.udata = udata;
}
void log_set_level(int level) {
L.level = level;
}
void log_set_quiet(bool enable) {
L.quiet = enable;
}
int log_add_callback(log_LogFn fn, void *udata, int level) {
for (int i = 0; i < MAX_CALLBACKS; i++) {
if (!L.callbacks[i].fn) {
L.callbacks[i] = (Callback) { fn, udata, level };
return 0;
}
}
return -1;
}
int log_add_fp(FILE *fp, int level) {
return log_add_callback(file_callback, fp, level);
}
static void init_event(log_Event *ev, void *udata) {
if (!ev->time) {
time_t t = time(NULL);
ev->time = localtime(&t);
}
ev->udata = udata;
}
void log_log(int level, const char *file, int line, const char *fmt, ...) {
log_Event ev = {
.fmt = fmt,
.file = file,
.line = line,
.level = level,
};
lock();
if (!L.quiet && level >= L.level) {
init_event(&ev, stderr);
va_start(ev.ap, fmt);
stdout_callback(&ev);
va_end(ev.ap);
}
for (int i = 0; i < MAX_CALLBACKS && L.callbacks[i].fn; i++) {
Callback *cb = &L.callbacks[i];
if (level >= cb->level) {
init_event(&ev, cb->udata);
va_start(ev.ap, fmt);
cb->fn(&ev);
va_end(ev.ap);
}
}
unlock();
}
/**
* Copyright (c) 2020 rxi
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the MIT license. See `log.c` for details.
*/
/* Github link: https://github.com/rxi/log.c */
#ifndef LOG_H
#define LOG_H
#include <stdio.h>
#include <stdarg.h>
#include <stdbool.h>
#include <time.h>
#ifdef __cplusplus
extern "C" {
#endif
#define LOG_VERSION "0.1.0"
typedef struct {
va_list ap;
const char *fmt;
const char *file;
struct tm *time;
void *udata;
int line;
int level;
} log_Event;
typedef void (*log_LogFn)(log_Event *ev);
typedef void (*log_LockFn)(bool lock, void *udata);
enum { LOG_TRACE, LOG_DEBUG, LOG_INFO, LOG_WARN, LOG_ERROR, LOG_FATAL };
#define log_trace(...) log_log(LOG_TRACE, __FILE__, __LINE__, __VA_ARGS__)
#define log_debug(...) log_log(LOG_DEBUG, __FILE__, __LINE__, __VA_ARGS__)
#define log_info(...) log_log(LOG_INFO, __FILE__, __LINE__, __VA_ARGS__)
#define log_warn(...) log_log(LOG_WARN, __FILE__, __LINE__, __VA_ARGS__)
#define log_error(...) log_log(LOG_ERROR, __FILE__, __LINE__, __VA_ARGS__)
#define log_fatal(...) log_log(LOG_FATAL, __FILE__, __LINE__, __VA_ARGS__)
const char* log_level_string(int level);
void log_set_lock(log_LockFn fn, void *udata);
void log_set_level(int level);
void log_set_quiet(bool enable);
int log_add_callback(log_LogFn fn, void *udata, int level);
int log_add_fp(FILE *fp, int level);
void log_log(int level, const char *file, int line, const char *fmt, ...);
#ifdef __cplusplus
}
#endif
#endif /* LOG_H */
/* SPDX-License-Identifier: BSD-3-Clause */
#ifndef UTILS_H_
#define UTILS_H_ 1
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "log/log.h"
#ifdef __cplusplus
extern "C" {
#endif
#define ERR(assertion, call_description) \
do { \
if (assertion) \
log_error("%s: %s", \
call_description, strerror(errno)); \
} while (0)
#define DIE(assertion, call_description) \
do { \
if (assertion) { \
log_fatal("%s: %s", \
call_description, strerror(errno)); \
exit(EXIT_FAILURE); \
} \
} while (0)
#ifdef __cplusplus
}
#endif
#endif /* UTILS_H_ */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment