aboutsummaryrefslogtreecommitdiff
path: root/src/threading/pool.c
blob: 15988521d2cdbd421f48255e883f6e7dc8aa92af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include <stdlib.h>
#include <stddef.h>
#include <stdio.h>
#include <pthread.h>

#include "cpu.h"
#include "pool.h"

/*
 * Per-thread bookkeeping handed to task_executor() as its start argument.
 * pool_complete() allocates one of these per worker thread.
 */
struct worker_t {
    struct pool_t* pool;   /* pool whose tasks this worker drains */
    int id;                /* loop index assigned by pool_complete() */
    int done;              /* 0 before the thread starts; task_executor() sets it to 1 on exit */
};

/*
 * Allocate a new, empty pool.
 *
 * The worker count is taken from get_core_count(). The task list starts
 * empty and no stop condition is installed (condition == NULL).
 *
 * Returns the new pool, or NULL if the allocation failed. The caller owns
 * the result and releases it with pool_free().
 */
struct pool_t* pool_init(void)
{
    struct pool_t* pool = malloc(sizeof *pool);
    if (pool == NULL)
        return NULL;

    pool->workers = get_core_count();
    pool->tasks = NULL;
    pool->task_next = NULL;
    pool->pool_size = 0;
    pool->condition = NULL;

    return pool;
}

/*
 * Release a pool and its task array.
 *
 * Passing NULL is a no-op, mirroring free(). Only the array of task records
 * is released; the `arg` pointers stored in the tasks are not touched and
 * remain the caller's responsibility.
 */
void pool_free(struct pool_t* pool)
{
    if (pool == NULL)
        return;

    /* free(NULL) is well defined, so an empty task list needs no guard. */
    free(pool->tasks);
    free(pool);
}

/*
 * Append a task to the pool's task list.
 *
 * pool: pool to extend.
 * func: function the task will call.
 * arg:  opaque pointer passed to func; stored as-is, never copied or freed.
 *
 * The task array is grown by one element per call. If growing fails, the
 * pool is left exactly as it was, a message is written to stderr, and the
 * task is not queued (the function has no return value to report it).
 *
 * No locking is done here and the array may move when it grows, so this
 * must not be called while pool_complete() is running on the same pool.
 */
void pool_submit(struct pool_t* pool, worker_func func, void* arg)
{
    /* Grow into a temporary so the existing array survives a failed realloc. */
    struct pool_task_t* grown =
        realloc(pool->tasks, sizeof *grown * (pool->pool_size + 1));
    if (grown == NULL)
    {
        fprintf(stderr, "pool_submit: out of memory, task not queued\n");
        return;
    }
    pool->tasks = grown;

    struct pool_task_t* task = &pool->tasks[pool->pool_size];
    task->func = func;
    task->arg = arg;
    task->done = 0;

    /* Publish the new element only once it is fully initialised. */
    pool->pool_size++;
}

/*
 * Serialises updates of pool->task_next between worker threads.
 * File-wide, i.e. shared by every pool; it is only held for the duration
 * of one pointer bump, never while a task runs.
 */
static pthread_mutex_t task_claim_lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Claim the next unclaimed task of `pool`.
 *
 * Returns the claimed task, or NULL when task_next is unset or has reached
 * the end of the task array. The cursor is never advanced past the end.
 */
static struct pool_task_t* claim_next_task(struct pool_t* pool)
{
    struct pool_task_t* claimed = NULL;

    pthread_mutex_lock(&task_claim_lock);
    if (pool->task_next != NULL && pool->task_next < pool->tasks + pool->pool_size)
        claimed = pool->task_next++;
    pthread_mutex_unlock(&task_claim_lock);

    return claimed;
}

/*
 * Thread entry point: drain tasks from the worker's pool.
 *
 * pinfo: pointer to the struct worker_t describing this worker.
 *
 * Keeps claiming and running tasks until none are left or the pool's stop
 * condition (if one is installed) reads as false. Tasks already marked done
 * are skipped, so a pool can be completed again after an interrupted run.
 * The stop condition is checked before a task is claimed, so a claimed task
 * is always run.
 *
 * Sets worker->done to 1 before exiting. Always returns NULL.
 */
static void* task_executor(void* pinfo)
{
    struct worker_t* worker = pinfo;
    struct pool_t* pool = worker->pool;

    while (pool->condition == NULL || *pool->condition)
    {
        struct pool_task_t* task = claim_next_task(pool);
        if (task == NULL)
            break;

        if (!task->done)
        {
            task->func(task->arg);
            task->done = 1;
        }
    }

    worker->done = 1;
    return NULL;
}

/*
 * Run the pool's tasks on worker threads and wait for them to finish.
 *
 * Resets the task cursor to the first task, starts up to pool->workers
 * threads running task_executor(), then joins every thread that was
 * actually started. Thread creation stops early if the pool's stop
 * condition (when installed) reads as false.
 *
 * Failures are reported on stderr because the function has no return value:
 *  - if the bookkeeping arrays cannot be allocated, no task is run;
 *  - if a thread cannot be created, that worker is skipped.
 */
void pool_complete(struct pool_t* pool)
{
    if (pool->pool_size)
        pool->task_next = &pool->tasks[0];

    if (pool->workers < 1)
        return;

    pthread_t* threads = calloc((size_t)pool->workers, sizeof *threads);
    struct worker_t* workers = calloc((size_t)pool->workers, sizeof *workers);
    if (threads == NULL || workers == NULL)
    {
        fprintf(stderr, "pool_complete: out of memory, no tasks were run\n");
        free(threads);
        free(workers);
        return;
    }

    /*
     * Slots [0, started) hold live threads. Only those are joined, so an
     * early exit from this loop or a failed pthread_create() never leads to
     * joining an uninitialised handle.
     */
    int started = 0;
    for (int i = 0; i < pool->workers && (pool->condition == NULL || *pool->condition); ++i)
    {
        struct worker_t* worker = &workers[started];

        worker->pool = pool;
        worker->id = i;
        worker->done = 0;

        if (pthread_create(&threads[started], NULL, task_executor, worker) != 0)
        {
            fprintf(stderr, "pool_complete: could not start worker %d\n", i);
            continue;
        }
        ++started;
    }

    /*
     * Join unconditionally: a joinable thread must be joined to release its
     * resources, and joining is what makes it safe to free `workers`.
     */
    for (int i = 0; i < started; ++i)
        pthread_join(threads[i], NULL);

    free(threads);
    free(workers);
}