aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan200101 <sentrycraft123@gmail.com>2022-08-30 22:55:26 +0200
committerJan200101 <sentrycraft123@gmail.com>2022-08-30 22:55:26 +0200
commit37c4373dce60b31ccc0100d85d0013aef82809e7 (patch)
tree5779a0f437ea1a88b91a4a9e6a3c879d216c9d50
parent91f695797fac3e1c2d20bea70f8c877d7df03b68 (diff)
downloadOFQT-37c4373dce60b31ccc0100d85d0013aef82809e7.tar.gz
OFQT-37c4373dce60b31ccc0100d85d0013aef82809e7.zip
implement thread pools
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/cli/CMakeLists.txt5
-rw-r--r--src/cli/commands.c2
-rw-r--r--src/cli/updater.c35
-rw-r--r--src/qt/CMakeLists.txt5
-rw-r--r--src/qt/workers.cpp65
-rw-r--r--src/qt/workers.hpp4
-rw-r--r--src/threading/CMakeLists.txt17
-rw-r--r--src/threading/cpu.c6
-rw-r--r--src/threading/cpu.h14
-rw-r--r--src/threading/pool.c95
-rw-r--r--src/threading/pool.h38
-rw-r--r--src/vdf/vdf.c2
13 files changed, 214 insertions, 75 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index af7cbe3..f3113e0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -3,6 +3,7 @@ find_package(Libcurl REQUIRED)
find_package(JsonC REQUIRED)
add_subdirectory(hash)
add_subdirectory(vdf)
+add_subdirectory(threading)
set(CFLAGS
-Wall -Wextra -pedantic
diff --git a/src/cli/CMakeLists.txt b/src/cli/CMakeLists.txt
index 2947a5b..fcb386b 100644
--- a/src/cli/CMakeLists.txt
+++ b/src/cli/CMakeLists.txt
@@ -8,11 +8,8 @@ SET(CLI_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/updater.h
)
-set(THREADS_PREFER_PTHREAD_FLAG ON)
-find_package(Threads REQUIRED)
-
add_executable(${FRONTEND_NAME} ${CLI_SOURCES})
target_compile_options(${FRONTEND_NAME} PUBLIC ${CFLAGS})
target_link_libraries(${FRONTEND_NAME} PRIVATE tvn)
-target_link_libraries(${FRONTEND_NAME} PRIVATE Threads::Threads)
+target_link_libraries(${FRONTEND_NAME} PRIVATE threading)
install(TARGETS ${FRONTEND_NAME})
diff --git a/src/cli/commands.c b/src/cli/commands.c
index 85b05c2..30d3bca 100644
--- a/src/cli/commands.c
+++ b/src/cli/commands.c
@@ -16,6 +16,8 @@ static int run(int, char**);
static int version(int, char**);
static int info(int, char**);
+#include "pool.h"
+
const struct Command commands[] = {
{ .name = "install", .func = install, .description = "Install OpenFortress"},
{ .name = "update", .func = update, .description = "Update an existing install"},
diff --git a/src/cli/updater.c b/src/cli/updater.c
index 9dcd9da..2aaa8f1 100644
--- a/src/cli/updater.c
+++ b/src/cli/updater.c
@@ -1,20 +1,16 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
-#include <pthread.h>
#include "fs.h"
#include "toast.h"
+#include "pool.h"
#include "updater.h"
#include <assert.h>
-#define THREAD_COUNT 4
-
struct thread_object_info {
- int working;
-
char* of_dir;
char* remote;
struct revision_t* rev;
@@ -43,10 +39,6 @@ static void* thread_download(void* pinfo)
}
}
}
-
- info->working = 0;
- pthread_exit(0);
-
return NULL;
}
@@ -70,35 +62,24 @@ void update_setup(char* of_dir, char* remote, int local_rev, int remote_rev)
}
}
- pthread_t download_threads[THREAD_COUNT] = {0};
- struct thread_object_info thread_info[THREAD_COUNT] = {0};
- size_t tindex = 0;
+ struct thread_object_info* thread_info = malloc(sizeof(struct thread_object_info) * rev->file_count);
+ struct pool_t* pool = pool_init();
for (size_t i = 0; i < rev->file_count; ++i)
{
- while (thread_info[tindex].working)
- {
- tindex = (tindex+1) % THREAD_COUNT;
- }
-
- pthread_t* thread = &download_threads[tindex];
- struct thread_object_info* info = &thread_info[tindex];
+ struct thread_object_info* info = &thread_info[i];
- info->working = 1;
info->of_dir = of_dir;
info->remote = remote;
info->rev = rev;
info->index = i;
- pthread_create(thread, NULL, thread_download, info);
+ pool_submit(pool, thread_download, info);
}
- for (size_t i = 0; i < THREAD_COUNT; ++i)
- {
- pthread_t* thread = &download_threads[i];
- if (*thread)
- pthread_join(*thread, NULL);
- }
+ pool_complete(pool);
+ pool_free(pool);
+ free(thread_info);
for (size_t i = 0; i < rev->file_count; ++i)
{
diff --git a/src/qt/CMakeLists.txt b/src/qt/CMakeLists.txt
index 3702ad5..4a4caf3 100644
--- a/src/qt/CMakeLists.txt
+++ b/src/qt/CMakeLists.txt
@@ -28,13 +28,10 @@ if(WIN32)
list(APPEND QT_SOURCES ${CMAKE_CURRENT_BINARY_DIR}/version.rc)
endif()
-set(THREADS_PREFER_PTHREAD_FLAG ON)
-find_package(Threads REQUIRED)
-
add_executable(${FRONTEND_NAME} WIN32 ${QT_SOURCES})
target_compile_options(${FRONTEND_NAME} PUBLIC ${CFLAGS})
target_link_libraries(${FRONTEND_NAME} PRIVATE tvn)
-target_link_libraries(${FRONTEND_NAME} PRIVATE Threads::Threads)
+target_link_libraries(${FRONTEND_NAME} PRIVATE threading)
target_link_libraries(${FRONTEND_NAME} PRIVATE Qt${QT_VERSION_MAJOR}::Widgets)
set_property(TARGET ${FRONTEND_NAME} PROPERTY CXX_STANDARD 11)
install(TARGETS ${FRONTEND_NAME})
diff --git a/src/qt/workers.cpp b/src/qt/workers.cpp
index 5c20f0e..e4faa74 100644
--- a/src/qt/workers.cpp
+++ b/src/qt/workers.cpp
@@ -5,20 +5,19 @@
#include "net.h"
#include "steam.h"
#include "toast.h"
+#include "pool.h"
#include "./ui_mainwindow.h"
#include "workers.hpp"
-#define THREAD_COUNT 4
-
struct thread_object_info {
- int working;
-
QString infoText;
char* of_dir;
char* remote;
struct revision_t* rev;
size_t index;
+
+ Worker* worker;
};
static void* thread_download(void* pinfo)
@@ -41,11 +40,21 @@ static void* thread_download(void* pinfo)
downloadObject(of_dir, remote, file);
}
}
- }
- info->working = 0;
- pthread_exit(0);
+ QString* threadString = &info->infoText;
+ if (!threadString->isEmpty())
+ {
+ pthread_mutex_lock(&info->worker->textMutex);
+ // allow the main thread to clear the string before we continue
+ while (!info->worker->infoText.isEmpty()) {};
+
+ info->worker->progress = (int)(((info->index * 100) + 1) / rev->file_count);
+ info->worker->infoText = *threadString;
+ emit info->worker->resultReady(Worker::RESULT_UPDATE_TEXT);
+ pthread_mutex_unlock(&info->worker->textMutex);
+ }
+ }
return NULL;
}
@@ -54,6 +63,7 @@ Worker::Worker()
net_init();
of_dir = NULL;
remote = NULL;
+ textMutex = PTHREAD_MUTEX_INITIALIZER;
}
Worker::~Worker()
@@ -143,47 +153,26 @@ int Worker::update_setup(int local_rev, int remote_rev)
}
}
- pthread_t download_threads[THREAD_COUNT] = {0};
- struct thread_object_info thread_info[THREAD_COUNT] = {0, NULL, NULL, NULL, NULL, 0};
- size_t tindex = 0;
- QString infoStrings[THREAD_COUNT];
+ struct thread_object_info* thread_info = new struct thread_object_info[rev->file_count];
+ struct pool_t* pool = pool_init();
+ pool->condition = &do_work;
- for (size_t i = 0; i < rev->file_count && do_work; ++i)
+ for (size_t i = 0; i < rev->file_count; ++i)
{
- while (thread_info[tindex].working)
- {
- tindex = (tindex+1) % THREAD_COUNT;
- }
-
- pthread_t* thread = &download_threads[tindex];
- struct thread_object_info* info = &thread_info[tindex];
-
- QString* threadString = &info->infoText;
- if (!threadString->isEmpty())
- {
- infoText = *threadString;
- emit resultReady(RESULT_UPDATE_TEXT);
+ struct thread_object_info* info = &thread_info[i];
- // allow the main thread to clear the string before we continue
- while (!infoText.isEmpty() && do_work) {};
- }
-
- info->working = 1;
info->of_dir = of_dir;
info->remote = remote;
info->rev = rev;
info->index = i;
- progress = (int)(((i * 100) + 1) / rev->file_count);
+ info->worker = this;
- pthread_create(thread, NULL, thread_download, info);
+ pool_submit(pool, thread_download, info);
}
- for (size_t i = 0; i < THREAD_COUNT; ++i)
- {
- pthread_t* thread = &download_threads[i];
- if (*thread)
- pthread_join(*thread, NULL);
- }
+ pool_complete(pool);
+ pool_free(pool);
+ delete[] thread_info;
progress = 0;
infoText = QString("Processing");
diff --git a/src/qt/workers.hpp b/src/qt/workers.hpp
index 506d7ce..8e2a274 100644
--- a/src/qt/workers.hpp
+++ b/src/qt/workers.hpp
@@ -4,6 +4,7 @@
#include <QObject>
#include <QSettings>
#include <limits.h>
+#include <pthread.h>
QT_BEGIN_NAMESPACE
namespace Ui { class MainWindow; }
@@ -20,7 +21,7 @@ private:
char* remote;
size_t remote_len;
- bool do_work = true;
+ int do_work = 1;
bool update_in_progress = false;
QSettings settings;
@@ -28,6 +29,7 @@ private:
public:
int progress = -1;
QString infoText;
+ pthread_mutex_t textMutex;
Worker();
~Worker();
diff --git a/src/threading/CMakeLists.txt b/src/threading/CMakeLists.txt
new file mode 100644
index 0000000..b2bd203
--- /dev/null
+++ b/src/threading/CMakeLists.txt
@@ -0,0 +1,17 @@
+set(THREADS_PREFER_PTHREAD_FLAG ON)
+find_package(Threads REQUIRED)
+
+list(APPEND
+ THREADING_SOURCES
+ ${CMAKE_CURRENT_SOURCE_DIR}/cpu.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/cpu.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/pool.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/pool.h
+)
+
+
+add_library(threading STATIC ${THREADING_SOURCES})
+target_compile_options(threading PUBLIC ${CFLAGS})
+target_link_libraries(threading PRIVATE Threads::Threads)
+
+target_include_directories(threading PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) \ No newline at end of file
diff --git a/src/threading/cpu.c b/src/threading/cpu.c
new file mode 100644
index 0000000..45671df
--- /dev/null
+++ b/src/threading/cpu.c
@@ -0,0 +1,6 @@
+#include <sys/sysinfo.h>
+
+int get_core_count()
+{
+ return get_nprocs();
+}
diff --git a/src/threading/cpu.h b/src/threading/cpu.h
new file mode 100644
index 0000000..98fc1d7
--- /dev/null
+++ b/src/threading/cpu.h
@@ -0,0 +1,14 @@
+#ifndef CPU_H
+#define CPU_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int get_core_count();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif \ No newline at end of file
diff --git a/src/threading/pool.c b/src/threading/pool.c
new file mode 100644
index 0000000..266254e
--- /dev/null
+++ b/src/threading/pool.c
@@ -0,0 +1,95 @@
+#include <stdlib.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <pthread.h>
+
+#include "cpu.h"
+#include "pool.h"
+
+struct worker_t {
+ struct pool_t* pool;
+ int id;
+};
+
+struct pool_t* pool_init()
+{
+ struct pool_t* pool = malloc(sizeof(struct pool_t));
+
+ pool->workers = get_core_count();
+ pool->tasks = NULL;
+ pool->task_next = NULL;
+ pool->pool_size = 0;
+ pool->condition = NULL;
+
+ return pool;
+}
+
+void pool_free(struct pool_t* pool)
+{
+ if (pool->tasks)
+ free(pool->tasks);
+ free(pool);
+}
+
+void pool_submit(struct pool_t* pool, worker_func func, void* arg)
+{
+ pool->pool_size++;
+ pool->tasks = realloc(pool->tasks, sizeof(struct pool_task_t) * (pool->pool_size));
+
+ struct pool_task_t* task = &pool->tasks[pool->pool_size-1];
+ task->func = func;
+ task->arg = arg;
+ task->done = 0;
+}
+
+static void* task_executor(void* pinfo)
+{
+ struct worker_t* worker = (struct worker_t*)pinfo;
+ struct pool_t* pool = worker->pool;
+
+ if (pool->task_next)
+ {
+ struct pool_task_t* pool_end = pool->tasks + pool->pool_size;
+ struct pool_task_t* task = pool->task_next++;
+ while (pool_end > task)
+ {
+ if (!task->done)
+ {
+ task->func(task->arg);
+ task->done = 1;
+ }
+
+ task = pool->task_next++;
+ }
+ }
+
+ return NULL;
+}
+
+void pool_complete(struct pool_t* pool)
+{
+ pthread_t* threads = malloc(sizeof(pthread_t) * pool->workers);
+ struct worker_t* workers = malloc(sizeof(struct worker_t) * pool->workers);
+
+ if (pool->pool_size)
+ pool->task_next = &pool->tasks[0];
+
+ for (int i = 0; i < pool->workers && (pool->condition == NULL || *pool->condition); ++i)
+ {
+ struct worker_t* worker = &workers[i];
+ pthread_t* thread = &threads[i];
+
+ worker->pool = pool;
+ worker->id = i;
+
+ pthread_create(thread, NULL, task_executor, worker);
+ }
+
+ for (int i = 0; i < pool->workers; ++i)
+ {
+ pthread_join(threads[i], NULL);
+ }
+
+ free(threads);
+ free(workers);
+} \ No newline at end of file
diff --git a/src/threading/pool.h b/src/threading/pool.h
new file mode 100644
index 0000000..73655d0
--- /dev/null
+++ b/src/threading/pool.h
@@ -0,0 +1,38 @@
+#ifndef POOL_H
+#define POOL_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+
+typedef void *(*worker_func)(void *);
+
+struct pool_task_t {
+ worker_func func;
+ void* arg;
+ int done;
+};
+
+struct pool_t {
+ int workers;
+
+ struct pool_task_t* tasks;
+ struct pool_task_t* task_next;
+ size_t pool_size;
+
+ int* condition;
+};
+
+struct pool_t* pool_init();
+void pool_free(struct pool_t*);
+
+void pool_submit(struct pool_t*, worker_func, void*);
+void pool_complete(struct pool_t*);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif \ No newline at end of file
diff --git a/src/vdf/vdf.c b/src/vdf/vdf.c
index 3b90a02..bb27a55 100644
--- a/src/vdf/vdf.c
+++ b/src/vdf/vdf.c
@@ -326,7 +326,7 @@ struct vdf_object* vdf_object_index_array_str(const struct vdf_object* o, const
if (!o || !str || o->type != VDF_TYPE_ARRAY)
return NULL;
- for (size_t i = 0; i < o->data.data_array.len; ++i)
+ for (size_t i = o->data.data_array.len; i >= 0; ++i)
{
struct vdf_object* k = o->data.data_array.data_value[i];
if (!strcmp(k->key, str))