diff options
Diffstat (limited to 'libdimension/concurrency')
-rw-r--r-- | libdimension/concurrency/future.c | 281 | ||||
-rw-r--r-- | libdimension/concurrency/threads.c | 326 |
2 files changed, 607 insertions, 0 deletions
diff --git a/libdimension/concurrency/future.c b/libdimension/concurrency/future.c new file mode 100644 index 0000000..90ffa24 --- /dev/null +++ b/libdimension/concurrency/future.c @@ -0,0 +1,281 @@ +/************************************************************************* + * Copyright (C) 2009-2014 Tavian Barnes <tavianator@tavianator.com> * + * * + * This file is part of The Dimension Library. * + * * + * The Dimension Library is free software; you can redistribute it and/ * + * or modify it under the terms of the GNU Lesser General Public License * + * as published by the Free Software Foundation; either version 3 of the * + * License, or (at your option) any later version. * + * * + * The Dimension Library is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * + * Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public * + * License along with this program. If not, see * + * <http://www.gnu.org/licenses/>. * + *************************************************************************/ + +/** + * @file + * Future objects. + */ + +#include "internal.h" +#include "internal/concurrency.h" +#include "internal/future.h" +#include <pthread.h> + +/** + * Since C doesn't support anything like C++'s mutable, we fake it by casting + * away the constness. This is okay since all valid dmnsn_futures live on the + * heap, so cannot be const. + */ +#define MUTATE(future) ((dmnsn_future *)(future)) + +// Allocate a new dmnsn_future* +dmnsn_future * +dmnsn_new_future(void) +{ + dmnsn_future *future = DMNSN_MALLOC(dmnsn_future); + future->progress = 0; + future->total = 1; + + dmnsn_initialize_mutex(&future->mutex); + dmnsn_initialize_cond(&future->cond); + + future->min_wait = 1.0; + + future->nthreads = future->nrunning = 1; + future->npaused = 0; + dmnsn_initialize_cond(&future->none_running_cond); + dmnsn_initialize_cond(&future->all_running_cond); + dmnsn_initialize_cond(&future->resume_cond); + + return future; +} + +static void +dmnsn_delete_future(dmnsn_future *future) +{ + if (future) { + dmnsn_destroy_cond(&future->resume_cond); + dmnsn_destroy_cond(&future->all_running_cond); + dmnsn_destroy_cond(&future->none_running_cond); + dmnsn_destroy_cond(&future->cond); + dmnsn_destroy_mutex(&future->mutex); + dmnsn_free(future); + } +} + +// Join the worker thread and delete `future'. +int +dmnsn_future_join(dmnsn_future *future) +{ + void *ptr; + int retval = -1; + + if (future) { + dmnsn_assert(future->npaused == 0, "Attempt to join future while paused"); + + // Get the thread's return value + dmnsn_join_thread(future->thread, &ptr); + if (ptr && ptr != PTHREAD_CANCELED) { + retval = *(int *)ptr; + dmnsn_free(ptr); + } + + // Free the future object + dmnsn_delete_future(future); + } + + return retval; +} + +// Cancel a background thread +void +dmnsn_future_cancel(dmnsn_future *future) +{ + pthread_cancel(future->thread); +} + +/** + * Get the current progress, without locking anything. + * + * future->mutex must be locked for this call to be safe. + */ +static inline double +dmnsn_future_progress_unlocked(const dmnsn_future *future) +{ + return (double)future->progress/future->total; +} + +// Get the current progress of the worker thread, in [0.0, 1.0] +double +dmnsn_future_progress(const dmnsn_future *future) +{ + dmnsn_future *mfuture = MUTATE(future); + double progress; + + dmnsn_lock_mutex(&mfuture->mutex); + progress = dmnsn_future_progress_unlocked(mfuture); + dmnsn_unlock_mutex(&mfuture->mutex); + + return progress; +} + +// Find out whether the task is complete. +bool +dmnsn_future_is_done(const dmnsn_future *future) +{ + dmnsn_future *mfuture = MUTATE(future); + bool result; + + dmnsn_lock_mutex(&mfuture->mutex); + result = future->progress == future->total; + dmnsn_unlock_mutex(&mfuture->mutex); + + return result; +} + +// Wait until dmnsn_future_progress(future) >= progress +void +dmnsn_future_wait(const dmnsn_future *future, double progress) +{ + dmnsn_future *mfuture = MUTATE(future); + + dmnsn_lock_mutex(&mfuture->mutex); + while (dmnsn_future_progress_unlocked(mfuture) < progress) { + // Set the minimum waited-on value + if (progress < mfuture->min_wait) { + mfuture->min_wait = progress; + } + + dmnsn_cond_wait_safely(&mfuture->cond, &mfuture->mutex); + } + dmnsn_unlock_mutex(&mfuture->mutex); +} + +// Pause all threads working on a future. +void +dmnsn_future_pause(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + while (future->nrunning < future->nthreads) { + dmnsn_cond_wait_safely(&future->all_running_cond, &future->mutex); + } + ++future->npaused; + while (future->nrunning > 0) { + dmnsn_cond_wait_safely(&future->none_running_cond, &future->mutex); + } + dmnsn_unlock_mutex(&future->mutex); +} + +// Resume all threads working on a future. +void +dmnsn_future_resume(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->npaused > 0, "dmnsn_future_resume() without matching dmnsn_future_pause()"); + if (--future->npaused == 0) { + dmnsn_cond_broadcast(&future->resume_cond); + } + dmnsn_unlock_mutex(&future->mutex); +} + +// Set the total number of loop iterations +void +dmnsn_future_set_total(dmnsn_future *future, size_t total) +{ + dmnsn_lock_mutex(&future->mutex); + future->total = total; + dmnsn_unlock_mutex(&future->mutex); +} + +static void +dmnsn_future_increment_cleanup(void *ptr) +{ + dmnsn_future *future = ptr; + ++future->nrunning; + dmnsn_unlock_mutex_impl(&future->mutex); +} + +// Increment the number of completed loop iterations +void +dmnsn_future_increment(dmnsn_future *future) +{ + // Allow a thread to be canceled whenever it increments a future object -- + // this is close to PTHREAD_CANCEL_ASYNCHRONOUS but allows consistent state + // on cancellation + pthread_testcancel(); + + dmnsn_lock_mutex(&future->mutex); + ++future->progress; + + if (dmnsn_future_progress_unlocked(future) >= future->min_wait) { + future->min_wait = 1.0; + dmnsn_cond_broadcast(&future->cond); + } + + if (future->npaused > 0) { + dmnsn_assert(future->nrunning > 0, "More worker threads than expected"); + + if (--future->nrunning == 0) { + dmnsn_cond_broadcast(&future->none_running_cond); + } + + pthread_cleanup_push(dmnsn_future_increment_cleanup, future); + do { + dmnsn_cond_wait(&future->resume_cond, &future->mutex); + } while (future->npaused > 0); + pthread_cleanup_pop(false); + + if (++future->nrunning == future->nthreads) { + dmnsn_cond_broadcast(&future->all_running_cond); + } + } + dmnsn_unlock_mutex(&future->mutex); +} + +// Immediately set to 100% completion +void +dmnsn_future_finish(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + future->progress = future->total; + future->nthreads = future->nrunning = 0; + dmnsn_cond_broadcast(&future->cond); + dmnsn_cond_broadcast(&future->none_running_cond); + dmnsn_cond_broadcast(&future->all_running_cond); + dmnsn_unlock_mutex(&future->mutex); +} + +// Set the number of threads +void +dmnsn_future_set_nthreads(dmnsn_future *future, unsigned int nthreads) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->nrunning == future->nthreads, + "dmnsn_future_set_nthreads() called with paused threads"); + future->nthreads = future->nrunning = nthreads; + dmnsn_unlock_mutex(&future->mutex); +} + +// Notify completion of a worker thread +void +dmnsn_future_finish_thread(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->nthreads > 0, + "dmnsn_future_finish_thread() called with no threads"); + --future->nthreads; + + dmnsn_assert(future->nrunning > 0, + "dmnsn_future_finish_thread() called with no running threads"); + if (--future->nrunning == 0) { + dmnsn_cond_broadcast(&future->none_running_cond); + } + dmnsn_unlock_mutex(&future->mutex); +} diff --git a/libdimension/concurrency/threads.c b/libdimension/concurrency/threads.c new file mode 100644 index 0000000..93d2ea9 --- /dev/null +++ b/libdimension/concurrency/threads.c @@ -0,0 +1,326 @@ +/************************************************************************* + * Copyright (C) 2010-2014 Tavian Barnes <tavianator@tavianator.com> * + * * + * This file is part of The Dimension Library. * + * * + * The Dimension Library is free software; you can redistribute it and/ * + * or modify it under the terms of the GNU Lesser General Public License * + * as published by the Free Software Foundation; either version 3 of the * + * License, or (at your option) any later version. * + * * + * The Dimension Library is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * + * Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public * + * License along with this program. If not, see * + * <http://www.gnu.org/licenses/>. * + *************************************************************************/ + +/** + * @file + * Background threading. + */ + +#include "internal.h" +#include "internal/concurrency.h" +#include "internal/future.h" +#include <pthread.h> + +/// The payload to pass to the pthread callback. +typedef struct dmnsn_thread_payload { + dmnsn_thread_fn *thread_fn; + void *arg; + dmnsn_future *future; +} dmnsn_thread_payload; + +/// Clean up after a thread. +static void +dmnsn_thread_cleanup(void *arg) +{ + dmnsn_thread_payload *payload = arg; + dmnsn_future *future = payload->future; + dmnsn_free(payload); + + dmnsn_future_finish(future); +} + +/// pthread callback -- call the real thread callback. +static void * +dmnsn_thread(void *arg) +{ + dmnsn_thread_payload *payload = arg; + int *ret; + + pthread_cleanup_push(dmnsn_thread_cleanup, payload); + ret = DMNSN_MALLOC(int); + *ret = payload->thread_fn(payload->arg); + pthread_cleanup_pop(true); + return ret; +} + +void +dmnsn_new_thread(dmnsn_future *future, dmnsn_thread_fn *thread_fn, void *arg) +{ + dmnsn_thread_payload *payload = DMNSN_MALLOC(dmnsn_thread_payload); + payload->thread_fn = thread_fn; + payload->arg = arg; + payload->future = future; + + if (pthread_create(&future->thread, NULL, dmnsn_thread, payload) != 0) { + dmnsn_error("Couldn't start thread."); + } +} + +/// Payload for threads executed by dmnsn_execute_concurrently(). +typedef struct dmnsn_ccthread_payload { + dmnsn_future *future; + dmnsn_ccthread_fn *ccthread_fn; + void *arg; + unsigned int thread, nthreads; + int ret; + bool running; +} dmnsn_ccthread_payload; + +static void * +dmnsn_concurrent_thread(void *ptr) +{ + dmnsn_ccthread_payload *payload = ptr; + payload->ret = payload->ccthread_fn(payload->arg, payload->thread, + payload->nthreads); + if (payload->future) { + dmnsn_future_finish_thread(payload->future); + } + return NULL; +} + +typedef struct dmnsn_ccthread_cleanup_payload { + dmnsn_future *future; + pthread_t *threads; + dmnsn_ccthread_payload *payloads; + unsigned int nthreads; +} dmnsn_ccthread_cleanup_payload; + +static void +dmnsn_ccthread_cleanup(void *ptr) +{ + dmnsn_ccthread_cleanup_payload *payload = ptr; + + for (unsigned int i = 0; i < payload->nthreads; ++i) { + if (payload->payloads[i].running) { + pthread_cancel(payload->threads[i]); + } + } + + for (unsigned int i = 0; i < payload->nthreads; ++i) { + if (payload->payloads[i].running) { + dmnsn_join_thread(payload->threads[i], NULL); + } + } + + if (payload->future) { + dmnsn_future_set_nthreads(payload->future, 1); + } +} + +int +dmnsn_execute_concurrently(dmnsn_future *future, dmnsn_ccthread_fn *ccthread_fn, + void *arg, unsigned int nthreads) +{ + dmnsn_assert(nthreads > 0, "Attempt to execute using 0 concurrent threads."); + + if (future) { + dmnsn_future_set_nthreads(future, nthreads); + } + + pthread_t threads[nthreads]; + dmnsn_ccthread_payload payloads[nthreads]; + for (unsigned int i = 0; i < nthreads; ++i) { + payloads[i].running = false; + } + + int ret = 0; + dmnsn_ccthread_cleanup_payload cleanup_payload = { + .future = future, + .threads = threads, + .payloads = payloads, + .nthreads = nthreads, + }; + pthread_cleanup_push(dmnsn_ccthread_cleanup, &cleanup_payload); + for (unsigned int i = 0; i < nthreads; ++i) { + payloads[i].future = future; + payloads[i].ccthread_fn = ccthread_fn; + payloads[i].arg = arg; + payloads[i].thread = i; + payloads[i].nthreads = nthreads; + payloads[i].ret = -1; + if (pthread_create(&threads[i], NULL, dmnsn_concurrent_thread, + &payloads[i]) != 0) + { + dmnsn_error("Couldn't start worker thread."); + } + payloads[i].running = true; + } + + for (unsigned int i = 0; i < nthreads; ++i) { + dmnsn_join_thread(threads[i], NULL); + payloads[i].running = false; + if (payloads[i].ret != 0) { + ret = payloads[i].ret; + } + } + pthread_cleanup_pop(false); + + if (future) { + dmnsn_future_set_nthreads(future, 1); + } + + return ret; +} + +// pthread wrappers + +void +dmnsn_initialize_mutex(pthread_mutex_t *mutex) +{ + if (pthread_mutex_init(mutex, NULL) != 0) { + dmnsn_error("Couldn't initialize mutex."); + } +} + +void +dmnsn_lock_mutex_impl(pthread_mutex_t *mutex) +{ + if (pthread_mutex_lock(mutex) != 0) { + dmnsn_error("Couldn't lock mutex."); + } +} + +void +dmnsn_unlock_mutex_impl(void *mutex) +{ + if (pthread_mutex_unlock(mutex) != 0) { + dmnsn_error("Couldn't unlock mutex."); + } +} + +void +dmnsn_destroy_mutex(pthread_mutex_t *mutex) +{ + if (pthread_mutex_destroy(mutex) != 0) { + dmnsn_warning("Couldn't destroy mutex."); + } +} + +void +dmnsn_initialize_rwlock(pthread_rwlock_t *rwlock) +{ + if (pthread_rwlock_init(rwlock, NULL) != 0) { + dmnsn_error("Couldn't initialize read-write lock."); + } +} + +void +dmnsn_read_lock_impl(pthread_rwlock_t *rwlock) +{ + if (pthread_rwlock_rdlock(rwlock) != 0) { + dmnsn_error("Couldn't acquire read lock."); + } +} + +void +dmnsn_write_lock_impl(pthread_rwlock_t *rwlock) +{ + if (pthread_rwlock_wrlock(rwlock) != 0) { + dmnsn_error("Couldn't acquire write lock."); + } +} + +void +dmnsn_unlock_rwlock_impl(pthread_rwlock_t *rwlock) +{ + if (pthread_rwlock_unlock(rwlock) != 0) { + dmnsn_error("Couldn't unlock read-write lock."); + } +} + +void +dmnsn_destroy_rwlock(pthread_rwlock_t *rwlock) +{ + if (pthread_rwlock_destroy(rwlock) != 0) { + dmnsn_warning("Couldn't destroy read-write lock."); + } +} + +void +dmnsn_initialize_cond(pthread_cond_t *cond) +{ + if (pthread_cond_init(cond, NULL) != 0) { + dmnsn_error("Couldn't initialize condition variable."); + } +} + +void +dmnsn_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) +{ + if (pthread_cond_wait(cond, mutex) != 0) { + dmnsn_error("Couldn't wait on condition variable."); + } +} + +void +dmnsn_cond_broadcast(pthread_cond_t *cond) +{ + if (pthread_cond_broadcast(cond) != 0) { + dmnsn_error("Couldn't signal condition variable."); + } +} + +void +dmnsn_destroy_cond(pthread_cond_t *cond) +{ + if (pthread_cond_destroy(cond) != 0) { + dmnsn_warning("Couldn't destroy condition variable."); + } +} + +void +dmnsn_once(pthread_once_t *once, dmnsn_once_fn *once_fn) +{ + if (pthread_once(once, once_fn) != 0) { + dmnsn_error("Couldn't call one-shot function."); + } +} + +void +dmnsn_key_create(pthread_key_t *key, dmnsn_callback_fn *destructor) +{ + if (pthread_key_create(key, destructor) != 0) { + dmnsn_error("Couldn't initialize thread-specific pointer."); + } +} + +void +dmnsn_setspecific(pthread_key_t key, const void *value) +{ + if (pthread_setspecific(key, value) != 0) { + dmnsn_error("Couldn't set thread-specific pointer."); + } +} + +void +dmnsn_key_delete(pthread_key_t key) +{ + if (pthread_key_delete(key) != 0) { + dmnsn_warning("Couldn't destroy thread-specific pointer."); + } +} + +void +dmnsn_join_thread(pthread_t thread, void **retval) +{ + if (pthread_join(thread, retval) != 0) { + dmnsn_error("Couldn't join thread."); + } +} |