From 37c4373dce60b31ccc0100d85d0013aef82809e7 Mon Sep 17 00:00:00 2001 From: Jan200101 Date: Tue, 30 Aug 2022 22:55:26 +0200 Subject: implement thread pools --- src/CMakeLists.txt | 1 + src/cli/CMakeLists.txt | 5 +-- src/cli/commands.c | 2 + src/cli/updater.c | 35 ++++------------ src/qt/CMakeLists.txt | 5 +-- src/qt/workers.cpp | 65 +++++++++++++----------------- src/qt/workers.hpp | 4 +- src/threading/CMakeLists.txt | 17 ++++++++ src/threading/cpu.c | 6 +++ src/threading/cpu.h | 14 +++++++ src/threading/pool.c | 95 ++++++++++++++++++++++++++++++++++++++++++++ src/threading/pool.h | 38 ++++++++++++++++++ src/vdf/vdf.c | 2 +- 13 files changed, 214 insertions(+), 75 deletions(-) create mode 100644 src/threading/CMakeLists.txt create mode 100644 src/threading/cpu.c create mode 100644 src/threading/cpu.h create mode 100644 src/threading/pool.c create mode 100644 src/threading/pool.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index af7cbe3..f3113e0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -3,6 +3,7 @@ find_package(Libcurl REQUIRED) find_package(JsonC REQUIRED) add_subdirectory(hash) add_subdirectory(vdf) +add_subdirectory(threading) set(CFLAGS -Wall -Wextra -pedantic diff --git a/src/cli/CMakeLists.txt b/src/cli/CMakeLists.txt index 2947a5b..fcb386b 100644 --- a/src/cli/CMakeLists.txt +++ b/src/cli/CMakeLists.txt @@ -8,11 +8,8 @@ SET(CLI_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/updater.h ) -set(THREADS_PREFER_PTHREAD_FLAG ON) -find_package(Threads REQUIRED) - add_executable(${FRONTEND_NAME} ${CLI_SOURCES}) target_compile_options(${FRONTEND_NAME} PUBLIC ${CFLAGS}) target_link_libraries(${FRONTEND_NAME} PRIVATE tvn) -target_link_libraries(${FRONTEND_NAME} PRIVATE Threads::Threads) +target_link_libraries(${FRONTEND_NAME} PRIVATE threading) install(TARGETS ${FRONTEND_NAME}) diff --git a/src/cli/commands.c b/src/cli/commands.c index 85b05c2..30d3bca 100644 --- a/src/cli/commands.c +++ b/src/cli/commands.c @@ -16,6 +16,8 @@ static int run(int, char**); static int version(int, char**); static int info(int, char**); +#include "pool.h" + const struct Command commands[] = { { .name = "install", .func = install, .description = "Install OpenFortress"}, { .name = "update", .func = update, .description = "Update an existing install"}, diff --git a/src/cli/updater.c b/src/cli/updater.c index 9dcd9da..2aaa8f1 100644 --- a/src/cli/updater.c +++ b/src/cli/updater.c @@ -1,20 +1,16 @@ #include #include #include -#include #include "fs.h" #include "toast.h" +#include "pool.h" #include "updater.h" #include -#define THREAD_COUNT 4 - struct thread_object_info { - int working; - char* of_dir; char* remote; struct revision_t* rev; @@ -43,10 +39,6 @@ static void* thread_download(void* pinfo) } } } - - info->working = 0; - pthread_exit(0); - return NULL; } @@ -70,35 +62,24 @@ void update_setup(char* of_dir, char* remote, int local_rev, int remote_rev) } } - pthread_t download_threads[THREAD_COUNT] = {0}; - struct thread_object_info thread_info[THREAD_COUNT] = {0}; - size_t tindex = 0; + struct thread_object_info* thread_info = malloc(sizeof(struct thread_object_info) * rev->file_count); + struct pool_t* pool = pool_init(); for (size_t i = 0; i < rev->file_count; ++i) { - while (thread_info[tindex].working) - { - tindex = (tindex+1) % THREAD_COUNT; - } - - pthread_t* thread = &download_threads[tindex]; - struct thread_object_info* info = &thread_info[tindex]; + struct thread_object_info* info = &thread_info[i]; - info->working = 1; info->of_dir = of_dir; info->remote = remote; info->rev = rev; info->index = i; - pthread_create(thread, NULL, thread_download, info); + pool_submit(pool, thread_download, info); } - for (size_t i = 0; i < THREAD_COUNT; ++i) - { - pthread_t* thread = &download_threads[i]; - if (*thread) - pthread_join(*thread, NULL); - } + pool_complete(pool); + pool_free(pool); + free(thread_info); for (size_t i = 0; i < rev->file_count; ++i) { diff --git a/src/qt/CMakeLists.txt b/src/qt/CMakeLists.txt index 3702ad5..4a4caf3 100644 --- a/src/qt/CMakeLists.txt +++ b/src/qt/CMakeLists.txt @@ -28,13 +28,10 @@ if(WIN32) list(APPEND QT_SOURCES ${CMAKE_CURRENT_BINARY_DIR}/version.rc) endif() -set(THREADS_PREFER_PTHREAD_FLAG ON) -find_package(Threads REQUIRED) - add_executable(${FRONTEND_NAME} WIN32 ${QT_SOURCES}) target_compile_options(${FRONTEND_NAME} PUBLIC ${CFLAGS}) target_link_libraries(${FRONTEND_NAME} PRIVATE tvn) -target_link_libraries(${FRONTEND_NAME} PRIVATE Threads::Threads) +target_link_libraries(${FRONTEND_NAME} PRIVATE threading) target_link_libraries(${FRONTEND_NAME} PRIVATE Qt${QT_VERSION_MAJOR}::Widgets) set_property(TARGET ${FRONTEND_NAME} PROPERTY CXX_STANDARD 11) install(TARGETS ${FRONTEND_NAME}) diff --git a/src/qt/workers.cpp b/src/qt/workers.cpp index 5c20f0e..e4faa74 100644 --- a/src/qt/workers.cpp +++ b/src/qt/workers.cpp @@ -5,20 +5,19 @@ #include "net.h" #include "steam.h" #include "toast.h" +#include "pool.h" #include "./ui_mainwindow.h" #include "workers.hpp" -#define THREAD_COUNT 4 - struct thread_object_info { - int working; - QString infoText; char* of_dir; char* remote; struct revision_t* rev; size_t index; + + Worker* worker; }; static void* thread_download(void* pinfo) @@ -41,11 +40,21 @@ static void* thread_download(void* pinfo) downloadObject(of_dir, remote, file); } } - } - info->working = 0; - pthread_exit(0); + QString* threadString = &info->infoText; + if (!threadString->isEmpty()) + { + pthread_mutex_lock(&info->worker->textMutex); + // allow the main thread to clear the string before we continue + while (!info->worker->infoText.isEmpty()) {}; + + info->worker->progress = (int)(((info->index * 100) + 1) / rev->file_count); + info->worker->infoText = *threadString; + emit info->worker->resultReady(Worker::RESULT_UPDATE_TEXT); + pthread_mutex_unlock(&info->worker->textMutex); + } + } return NULL; } @@ -54,6 +63,7 @@ Worker::Worker() net_init(); of_dir = NULL; remote = NULL; + textMutex = PTHREAD_MUTEX_INITIALIZER; } Worker::~Worker() @@ -143,47 +153,26 @@ int Worker::update_setup(int local_rev, int remote_rev) } } - pthread_t download_threads[THREAD_COUNT] = {0}; - struct thread_object_info thread_info[THREAD_COUNT] = {0, NULL, NULL, NULL, NULL, 0}; - size_t tindex = 0; - QString infoStrings[THREAD_COUNT]; + struct thread_object_info* thread_info = new struct thread_object_info[rev->file_count]; + struct pool_t* pool = pool_init(); + pool->condition = &do_work; - for (size_t i = 0; i < rev->file_count && do_work; ++i) + for (size_t i = 0; i < rev->file_count; ++i) { - while (thread_info[tindex].working) - { - tindex = (tindex+1) % THREAD_COUNT; - } - - pthread_t* thread = &download_threads[tindex]; - struct thread_object_info* info = &thread_info[tindex]; - - QString* threadString = &info->infoText; - if (!threadString->isEmpty()) - { - infoText = *threadString; - emit resultReady(RESULT_UPDATE_TEXT); + struct thread_object_info* info = &thread_info[i]; - // allow the main thread to clear the string before we continue - while (!infoText.isEmpty() && do_work) {}; - } - - info->working = 1; info->of_dir = of_dir; info->remote = remote; info->rev = rev; info->index = i; - progress = (int)(((i * 100) + 1) / rev->file_count); + info->worker = this; - pthread_create(thread, NULL, thread_download, info); + pool_submit(pool, thread_download, info); } - for (size_t i = 0; i < THREAD_COUNT; ++i) - { - pthread_t* thread = &download_threads[i]; - if (*thread) - pthread_join(*thread, NULL); - } + pool_complete(pool); + pool_free(pool); + delete[] thread_info; progress = 0; infoText = QString("Processing"); diff --git a/src/qt/workers.hpp b/src/qt/workers.hpp index 506d7ce..8e2a274 100644 --- a/src/qt/workers.hpp +++ b/src/qt/workers.hpp @@ -4,6 +4,7 @@ #include #include #include +#include QT_BEGIN_NAMESPACE namespace Ui { class MainWindow; } @@ -20,7 +21,7 @@ private: char* remote; size_t remote_len; - bool do_work = true; + int do_work = 1; bool update_in_progress = false; QSettings settings; @@ -28,6 +29,7 @@ private: public: int progress = -1; QString infoText; + pthread_mutex_t textMutex; Worker(); ~Worker(); diff --git a/src/threading/CMakeLists.txt b/src/threading/CMakeLists.txt new file mode 100644 index 0000000..b2bd203 --- /dev/null +++ b/src/threading/CMakeLists.txt @@ -0,0 +1,17 @@ +set(THREADS_PREFER_PTHREAD_FLAG ON) +find_package(Threads REQUIRED) + +list(APPEND + THREADING_SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/cpu.c + ${CMAKE_CURRENT_SOURCE_DIR}/cpu.h + ${CMAKE_CURRENT_SOURCE_DIR}/pool.c + ${CMAKE_CURRENT_SOURCE_DIR}/pool.h +) + + +add_library(threading STATIC ${THREADING_SOURCES}) +target_compile_options(threading PUBLIC ${CFLAGS}) +target_link_libraries(threading PRIVATE Threads::Threads) + +target_include_directories(threading PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) \ No newline at end of file diff --git a/src/threading/cpu.c b/src/threading/cpu.c new file mode 100644 index 0000000..45671df --- /dev/null +++ b/src/threading/cpu.c @@ -0,0 +1,6 @@ +#include + +int get_core_count() +{ + return get_nprocs(); +} diff --git a/src/threading/cpu.h b/src/threading/cpu.h new file mode 100644 index 0000000..98fc1d7 --- /dev/null +++ b/src/threading/cpu.h @@ -0,0 +1,14 @@ +#ifndef CPU_H +#define CPU_H + +#ifdef __cplusplus +extern "C" { +#endif + +int get_core_count(); + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/src/threading/pool.c b/src/threading/pool.c new file mode 100644 index 0000000..266254e --- /dev/null +++ b/src/threading/pool.c @@ -0,0 +1,95 @@ +#include +#include +#include +#include + +#include "cpu.h" +#include "pool.h" + +struct worker_t { + struct pool_t* pool; + int id; +}; + +struct pool_t* pool_init() +{ + struct pool_t* pool = malloc(sizeof(struct pool_t)); + + pool->workers = get_core_count(); + pool->tasks = NULL; + pool->task_next = NULL; + pool->pool_size = 0; + pool->condition = NULL; + + return pool; +} + +void pool_free(struct pool_t* pool) +{ + if (pool->tasks) + free(pool->tasks); + free(pool); +} + +void pool_submit(struct pool_t* pool, worker_func func, void* arg) +{ + pool->pool_size++; + pool->tasks = realloc(pool->tasks, sizeof(struct pool_task_t) * (pool->pool_size)); + + struct pool_task_t* task = &pool->tasks[pool->pool_size-1]; + task->func = func; + task->arg = arg; + task->done = 0; +} + +static void* task_executor(void* pinfo) +{ + struct worker_t* worker = (struct worker_t*)pinfo; + struct pool_t* pool = worker->pool; + + if (pool->task_next) + { + struct pool_task_t* pool_end = pool->tasks + pool->pool_size; + struct pool_task_t* task = pool->task_next++; + while (pool_end > task) + { + if (!task->done) + { + task->func(task->arg); + task->done = 1; + } + + task = pool->task_next++; + } + } + + return NULL; +} + +void pool_complete(struct pool_t* pool) +{ + pthread_t* threads = malloc(sizeof(pthread_t) * pool->workers); + struct worker_t* workers = malloc(sizeof(struct worker_t) * pool->workers); + + if (pool->pool_size) + pool->task_next = &pool->tasks[0]; + + for (int i = 0; i < pool->workers && (pool->condition == NULL || *pool->condition); ++i) + { + struct worker_t* worker = &workers[i]; + pthread_t* thread = &threads[i]; + + worker->pool = pool; + worker->id = i; + + pthread_create(thread, NULL, task_executor, worker); + } + + for (int i = 0; i < pool->workers; ++i) + { + pthread_join(threads[i], NULL); + } + + free(threads); + free(workers); +} \ No newline at end of file diff --git a/src/threading/pool.h b/src/threading/pool.h new file mode 100644 index 0000000..73655d0 --- /dev/null +++ b/src/threading/pool.h @@ -0,0 +1,38 @@ +#ifndef POOL_H +#define POOL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef void *(*worker_func)(void *); + +struct pool_task_t { + worker_func func; + void* arg; + int done; +}; + +struct pool_t { + int workers; + + struct pool_task_t* tasks; + struct pool_task_t* task_next; + size_t pool_size; + + int* condition; +}; + +struct pool_t* pool_init(); +void pool_free(struct pool_t*); + +void pool_submit(struct pool_t*, worker_func, void*); +void pool_complete(struct pool_t*); + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/src/vdf/vdf.c b/src/vdf/vdf.c index 3b90a02..bb27a55 100644 --- a/src/vdf/vdf.c +++ b/src/vdf/vdf.c @@ -326,7 +326,7 @@ struct vdf_object* vdf_object_index_array_str(const struct vdf_object* o, const if (!o || !str || o->type != VDF_TYPE_ARRAY) return NULL; - for (size_t i = 0; i < o->data.data_array.len; ++i) + for (size_t i = o->data.data_array.len; i >= 0; ++i) { struct vdf_object* k = o->data.data_array.data_value[i]; if (!strcmp(k->key, str)) -- cgit v1.2.3