summaryrefslogtreecommitdiff
path: root/libs/libmdbx/src/test/osal-unix.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libs/libmdbx/src/test/osal-unix.cc')
-rw-r--r--libs/libmdbx/src/test/osal-unix.cc434
1 files changed, 326 insertions, 108 deletions
diff --git a/libs/libmdbx/src/test/osal-unix.cc b/libs/libmdbx/src/test/osal-unix.cc
index 0157bace23..4b8694c708 100644
--- a/libs/libmdbx/src/test/osal-unix.cc
+++ b/libs/libmdbx/src/test/osal-unix.cc
@@ -1,4 +1,4 @@
-/*
+/*
* Copyright 2017-2019 Leonid Yuriev <leo@yuriev.ru>
* and other libmdbx authors: please see AUTHORS file.
* All rights reserved.
@@ -21,112 +21,266 @@
#include <sys/wait.h>
#include <unistd.h>
-#ifdef __APPLE__
+#ifndef MDBX_LOCKING
+#error "Opps, MDBX_LOCKING is undefined!"
+#endif
+
+#if defined(__APPLE__) && (MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \
+ MDBX_LOCKING == MDBX_LOCKING_POSIX2008)
#include "darwin/pthread_barrier.c"
+#endif /* __APPLE__ && MDBX_LOCKING >= MDBX_LOCKING_POSIX2001 */
+
+#if MDBX_LOCKING == MDBX_LOCKING_SYSV
+#include <sys/ipc.h>
+#include <sys/sem.h>
+#endif /* MDBX_LOCKING == MDBX_LOCKING_SYSV */
+
+#if MDBX_LOCKING == MDBX_LOCKING_POSIX1988
+#include <semaphore.h>
+
+#if __cplusplus >= 201103L
+#include <atomic>
+static __inline __maybe_unused int atomic_decrement(std::atomic_int *p) {
+ return std::atomic_fetch_sub(p, 1) - 1;
+}
+#else
+static __inline __maybe_unused int atomic_decrement(volatile int *p) {
+#if defined(__GNUC__) || defined(__clang__)
+ return __sync_sub_and_fetch(p, 1);
+#elif defined(_MSC_VER)
+ STATIC_ASSERT(sizeof(volatile long) == sizeof(volatile int));
+ return _InterlockedDecrement((volatile long *)p);
+#elif defined(__APPLE__)
+ return OSAtomicDecrement32Barrier((volatile int *)p);
+#else
+#error FIXME: Unsupported compiler
#endif
+}
+#endif /* C++11 */
+#endif /* MDBX_LOCKING == MDBX_LOCKING_POSIX1988 */
+
+#if MDBX_LOCKING == MDBX_LOCKING_SYSV
+static int ipc;
+static pid_t ipc_overlord_pid;
+static void ipc_remove(void) {
+ if (ipc_overlord_pid == getpid())
+ semctl(ipc, 0, IPC_RMID, nullptr);
+}
+#else
struct shared_t {
+#if MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \
+ MDBX_LOCKING == MDBX_LOCKING_POSIX2008
pthread_barrier_t barrier;
pthread_mutex_t mutex;
- size_t conds_size;
- pthread_cond_t conds[1];
+ size_t count;
+ pthread_cond_t events[1];
+#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988
+ struct {
+#if __cplusplus >= 201103L
+ std::atomic_int countdown;
+#else
+ volatile int countdown;
+#endif /* C++11 */
+ sem_t sema;
+ } barrier;
+ size_t count;
+ sem_t events[1];
+#else
+#error "FIXME"
+#endif /* MDBX_LOCKING */
};
-
static shared_t *shared;
+#endif /* MDBX_LOCKING != MDBX_LOCKING_SYSV */
void osal_wait4barrier(void) {
+#if MDBX_LOCKING == MDBX_LOCKING_SYSV
+ struct sembuf op;
+ op.sem_num = 0;
+ op.sem_op = -1;
+ op.sem_flg = IPC_NOWAIT;
+ if (semop(ipc, &op, 1))
+ failure_perror("semop(dec)", errno);
+ op.sem_op = 0;
+ op.sem_flg = 0;
+ if (semop(ipc, &op, 1))
+ failure_perror("semop(wait)", errno);
+#elif MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \
+ MDBX_LOCKING == MDBX_LOCKING_POSIX2008
assert(shared != nullptr && shared != MAP_FAILED);
- int rc = pthread_barrier_wait(&shared->barrier);
- if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
- failure_perror("pthread_barrier_wait(shared)", rc);
- }
+ int err = pthread_barrier_wait(&shared->barrier);
+ if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD)
+ failure_perror("pthread_barrier_wait(shared)", err);
+#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988
+ assert(shared != nullptr && shared != MAP_FAILED);
+ int err = (atomic_decrement(&shared->barrier.countdown) > 0 &&
+ sem_wait(&shared->barrier.sema))
+ ? errno
+ : 0;
+ if (err != 0)
+ failure_perror("sem_wait(shared)", err);
+ if (sem_post(&shared->barrier.sema))
+ failure_perror("sem_post(shared)", errno);
+#else
+#error "FIXME"
+#endif /* MDBX_LOCKING */
}
void osal_setup(const std::vector<actor_config> &actors) {
+#if MDBX_LOCKING == MDBX_LOCKING_SYSV
+ if (ipc_overlord_pid)
+ failure("ipc already created by %ld pid", (long)ipc_overlord_pid);
+ ipc_overlord_pid = getpid();
+#ifndef SEM_A
+#define SEM_A S_IRUSR
+#endif
+#ifndef SEM_R
+#define SEM_R S_IWUSR
+#endif
+ ipc = semget(IPC_PRIVATE, actors.size() + 2, IPC_CREAT | SEM_A | SEM_R);
+ if (ipc < 0)
+ failure_perror("semget(IPC_PRIVATE, shared_sems)", errno);
+ if (atexit(ipc_remove))
+ failure_perror("atexit(ipc_remove)", errno);
+ if (semctl(ipc, 0, SETVAL, (int)(actors.size() + 1)))
+ failure_perror("semctl(SETVAL.0, shared_sems)", errno);
+ for (size_t i = 1; i < actors.size() + 2; ++i)
+ if (semctl(ipc, i, SETVAL, 1))
+ failure_perror("semctl(SETVAL.N, shared_sems)", errno);
+#else
assert(shared == nullptr);
-
- pthread_mutexattr_t mutexattr;
- int rc = pthread_mutexattr_init(&mutexattr);
- if (rc)
- failure_perror("pthread_mutexattr_init()", rc);
- rc = pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
- if (rc)
- failure_perror("pthread_mutexattr_setpshared()", rc);
-
- pthread_barrierattr_t barrierattr;
- rc = pthread_barrierattr_init(&barrierattr);
- if (rc)
- failure_perror("pthread_barrierattr_init()", rc);
- rc = pthread_barrierattr_setpshared(&barrierattr, PTHREAD_PROCESS_SHARED);
- if (rc)
- failure_perror("pthread_barrierattr_setpshared()", rc);
-
- pthread_condattr_t condattr;
- rc = pthread_condattr_init(&condattr);
- if (rc)
- failure_perror("pthread_condattr_init()", rc);
- rc = pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
- if (rc)
- failure_perror("pthread_condattr_setpshared()", rc);
-
shared = (shared_t *)mmap(
- nullptr, sizeof(shared_t) + actors.size() * sizeof(pthread_cond_t),
- PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+ nullptr, sizeof(shared_t) + actors.size() * sizeof(shared->events[0]),
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_ANONYMOUS
+#ifdef MAP_HASSEMAPHORE
+ | MAP_HASSEMAPHORE
+#endif
+ ,
+ -1, 0);
if (MAP_FAILED == (void *)shared)
failure_perror("mmap(shared_conds)", errno);
- rc = pthread_mutex_init(&shared->mutex, &mutexattr);
- if (rc)
- failure_perror("pthread_mutex_init(shared)", rc);
+ shared->count = actors.size() + 1;
- rc = pthread_barrier_init(&shared->barrier, &barrierattr, actors.size() + 1);
- if (rc)
- failure_perror("pthread_barrier_init(shared)", rc);
+#if MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \
+ MDBX_LOCKING == MDBX_LOCKING_POSIX2008
+ pthread_barrierattr_t barrierattr;
+ int err = pthread_barrierattr_init(&barrierattr);
+ if (err)
+ failure_perror("pthread_barrierattr_init()", err);
+ err = pthread_barrierattr_setpshared(&barrierattr, PTHREAD_PROCESS_SHARED);
+ if (err)
+ failure_perror("pthread_barrierattr_setpshared()", err);
+
+ err = pthread_barrier_init(&shared->barrier, &barrierattr, shared->count);
+ if (err)
+ failure_perror("pthread_barrier_init(shared)", err);
+ pthread_barrierattr_destroy(&barrierattr);
- const size_t n = actors.size() + 1;
- for (size_t i = 0; i < n; ++i) {
- pthread_cond_t *event = &shared->conds[i];
- rc = pthread_cond_init(event, &condattr);
- if (rc)
- failure_perror("pthread_cond_init(shared)", rc);
+ pthread_mutexattr_t mutexattr;
+ err = pthread_mutexattr_init(&mutexattr);
+ if (err)
+ failure_perror("pthread_mutexattr_init()", err);
+ err = pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
+ if (err)
+ failure_perror("pthread_mutexattr_setpshared()", err);
+
+ pthread_condattr_t condattr;
+ err = pthread_condattr_init(&condattr);
+ if (err)
+ failure_perror("pthread_condattr_init()", err);
+ err = pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
+ if (err)
+ failure_perror("pthread_condattr_setpshared()", err);
+
+ err = pthread_mutex_init(&shared->mutex, &mutexattr);
+ if (err)
+ failure_perror("pthread_mutex_init(shared)", err);
+
+ for (size_t i = 0; i < shared->count; ++i) {
+ pthread_cond_t *event = &shared->events[i];
+ err = pthread_cond_init(event, &condattr);
+ if (err)
+ failure_perror("pthread_cond_init(shared)", err);
log_trace("osal_setup: event(shared pthread_cond) %" PRIuPTR " -> %p", i,
- event);
+ __Wpedantic_format_voidptr(event));
}
- shared->conds_size = actors.size() + 1;
-
- pthread_barrierattr_destroy(&barrierattr);
pthread_condattr_destroy(&condattr);
pthread_mutexattr_destroy(&mutexattr);
+#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988
+ shared->barrier.countdown = shared->count;
+ if (sem_init(&shared->barrier.sema, true, 1))
+ failure_perror("sem_init(shared.barrier)", errno);
+ for (size_t i = 0; i < shared->count; ++i) {
+ sem_t *event = &shared->events[i];
+ if (sem_init(event, true, 0))
+ failure_perror("sem_init(shared.event)", errno);
+ log_trace("osal_setup: event(shared sem_init) %" PRIuPTR " -> %p", i,
+ __Wpedantic_format_voidptr(event));
+ }
+#else
+#error "FIXME"
+#endif /* MDBX_LOCKING */
+#endif /* MDBX_LOCKING != MDBX_LOCKING_SYSV */
}
void osal_broadcast(unsigned id) {
- assert(shared != nullptr && shared != MAP_FAILED);
log_trace("osal_broadcast: event %u", id);
- if (id >= shared->conds_size)
+#if MDBX_LOCKING == MDBX_LOCKING_SYSV
+ if (semctl(ipc, id + 1, SETVAL, 0))
+ failure_perror("semctl(SETVAL)", errno);
+#else
+ assert(shared != nullptr && shared != MAP_FAILED);
+ if (id >= shared->count)
failure("osal_broadcast: id > limit");
- int rc = pthread_cond_broadcast(shared->conds + id);
- if (rc)
- failure_perror("sem_post(shared)", rc);
+#if MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \
+ MDBX_LOCKING == MDBX_LOCKING_POSIX2008
+ int err = pthread_cond_broadcast(shared->events + id);
+ if (err)
+ failure_perror("pthread_cond_broadcast(shared)", err);
+#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988
+ if (sem_post(shared->events + id))
+ failure_perror("sem_post(shared)", errno);
+#else
+#error "FIXME"
+#endif /* MDBX_LOCKING */
+#endif /* MDBX_LOCKING != MDBX_LOCKING_SYSV */
}
int osal_waitfor(unsigned id) {
- assert(shared != nullptr && shared != MAP_FAILED);
-
log_trace("osal_waitfor: event %u", id);
- if (id >= shared->conds_size)
+#if MDBX_LOCKING == MDBX_LOCKING_SYSV
+ struct sembuf op;
+ memset(&op, 0, sizeof(op));
+ op.sem_num = (short)(id + 1);
+ int rc = semop(ipc, &op, 1) ? errno : MDBX_SUCCESS;
+#else
+ assert(shared != nullptr && shared != MAP_FAILED);
+ if (id >= shared->count)
failure("osal_waitfor: id > limit");
+#if MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \
+ MDBX_LOCKING == MDBX_LOCKING_POSIX2008
int rc = pthread_mutex_lock(&shared->mutex);
if (rc != 0)
failure_perror("pthread_mutex_lock(shared)", rc);
- rc = pthread_cond_wait(shared->conds + id, &shared->mutex);
+ rc = pthread_cond_wait(shared->events + id, &shared->mutex);
if (rc && rc != EINTR)
failure_perror("pthread_cond_wait(shared)", rc);
rc = pthread_mutex_unlock(&shared->mutex);
if (rc != 0)
failure_perror("pthread_mutex_unlock(shared)", rc);
+#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988
+ int rc = sem_wait(shared->events + id) ? errno : 0;
+ if (rc == 0 && sem_post(shared->events + id))
+ failure_perror("sem_post(shared)", errno);
+#else
+#error "FIXME"
+#endif /* MDBX_LOCKING */
+#endif /* MDBX_LOCKING != MDBX_LOCKING_SYSV */
return (rc == 0) ? true : false;
}
@@ -149,21 +303,69 @@ bool actor_config::osal_deserialize(const char *str, const char *end,
//-----------------------------------------------------------------------------
+static pid_t overlord_pid;
+
+static volatile sig_atomic_t sigusr1_head, sigusr2_head;
+static void handler_SIGUSR(int signum) {
+ switch (signum) {
+ case SIGUSR1:
+ sigusr1_head += 1;
+ return;
+ case SIGUSR2:
+ sigusr2_head += 1;
+ return;
+ default:
+ abort();
+ }
+}
+
+bool osal_progress_push(bool active) {
+ if (overlord_pid) {
+ if (kill(overlord_pid, active ? SIGUSR1 : SIGUSR2))
+ failure_perror("osal_progress_push: kill(overload)", errno);
+ return true;
+ }
+
+ return false;
+}
+
+//-----------------------------------------------------------------------------
+
static std::unordered_map<pid_t, actor_status> childs;
-static void handler_SIGCHLD(int unused) { (void)unused; }
+static volatile sig_atomic_t sigalarm_head;
+static void handler_SIGCHLD(int signum) {
+ if (signum == SIGALRM)
+ sigalarm_head += 1;
+}
mdbx_pid_t osal_getpid(void) { return getpid(); }
int osal_delay(unsigned seconds) { return sleep(seconds) ? errno : 0; }
int osal_actor_start(const actor_config &config, mdbx_pid_t &pid) {
- if (childs.empty())
- signal(SIGCHLD, handler_SIGCHLD);
+ if (childs.empty()) {
+ struct sigaction act;
+ memset(&act, 0, sizeof(act));
+ act.sa_handler = handler_SIGCHLD;
+ sigaction(SIGCHLD, &act, nullptr);
+ sigaction(SIGALRM, &act, nullptr);
+ act.sa_handler = handler_SIGUSR;
+ sigaction(SIGUSR1, &act, nullptr);
+ sigaction(SIGUSR2, &act, nullptr);
+
+ sigset_t mask;
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGCHLD);
+ sigaddset(&mask, SIGUSR1);
+ sigaddset(&mask, SIGUSR2);
+ sigprocmask(SIG_UNBLOCK, &mask, nullptr);
+ }
pid = fork();
if (pid == 0) {
+ overlord_pid = getppid();
const bool result = test_execute(config);
exit(result ? EXIT_SUCCESS : EXIT_FAILURE);
}
@@ -171,7 +373,8 @@ int osal_actor_start(const actor_config &config, mdbx_pid_t &pid) {
if (pid < 0)
return errno;
- log_trace("osal_actor_start: fork pid %i for %u", pid, config.actor_id);
+ log_trace("osal_actor_start: fork pid %ld for %u", (long)pid,
+ config.actor_id);
childs[pid] = as_running;
return 0;
}
@@ -186,60 +389,67 @@ void osal_killall_actors(void) {
}
int osal_actor_poll(mdbx_pid_t &pid, unsigned timeout) {
- struct timespec ts;
- ts.tv_nsec = 0;
- ts.tv_sec = (timeout > INT_MAX) ? INT_MAX : timeout;
-retry:
- int status, options = WNOHANG;
+ static sig_atomic_t sigalarm_tail;
+ alarm(0) /* cancel prev timeout */;
+ sigalarm_tail = sigalarm_head /* reset timeout flag */;
+
+ int options = WNOHANG;
+ if (timeout) {
+ alarm((timeout > INT_MAX) ? INT_MAX : timeout);
+ options = 0;
+ }
+
#ifdef WUNTRACED
options |= WUNTRACED;
#endif
#ifdef WCONTINUED
options |= WCONTINUED;
#endif
- pid = waitpid(0, &status, options);
-
- if (pid > 0) {
- if (WIFEXITED(status))
- childs[pid] =
- (WEXITSTATUS(status) == EXIT_SUCCESS) ? as_successful : as_failed;
- else if (WCOREDUMP(status))
- childs[pid] = as_coredump;
- else if (WIFSIGNALED(status))
- childs[pid] = as_killed;
- else if (WIFSTOPPED(status))
- childs[pid] = as_debuging;
- else if (WIFCONTINUED(status))
- childs[pid] = as_running;
- else {
- assert(false);
- }
- return 0;
- }
- if (pid == 0) {
- /* child still running */
- if (ts.tv_sec == 0 && ts.tv_nsec == 0)
- ts.tv_nsec = 1;
- if (nanosleep(&ts, &ts) == 0) {
- /* timeout and no signal from child */
- pid = 0;
+ while (sigalarm_tail == sigalarm_head) {
+ int status;
+ pid = waitpid(0, &status, options);
+
+ if (pid > 0) {
+ if (WIFEXITED(status))
+ childs[pid] =
+ (WEXITSTATUS(status) == EXIT_SUCCESS) ? as_successful : as_failed;
+ else if (WCOREDUMP(status))
+ childs[pid] = as_coredump;
+ else if (WIFSIGNALED(status))
+ childs[pid] = as_killed;
+ else if (WIFSTOPPED(status))
+ childs[pid] = as_debugging;
+ else if (WIFCONTINUED(status))
+ childs[pid] = as_running;
+ else {
+ assert(false);
+ }
return 0;
}
- if (errno == EINTR)
- goto retry;
- }
- switch (errno) {
- case EINTR:
- pid = 0;
- return 0;
+ static sig_atomic_t sigusr1_tail, sigusr2_tail;
+ if (sigusr1_tail != sigusr1_head) {
+ sigusr1_tail = sigusr1_head;
+ logging::progress_canary(true);
+ if (pid < 0 && errno == EINTR)
+ continue;
+ }
+ if (sigusr2_tail != sigusr2_head) {
+ sigusr2_tail = sigusr2_head;
+ logging::progress_canary(false);
+ if (pid < 0 && errno == EINTR)
+ continue;
+ }
- case ECHILD:
- default:
- pid = 0;
- return errno;
+ if (pid == 0)
+ break;
+
+ int err = errno;
+ if (err != EINTR)
+ return err;
}
+ return 0 /* timeout */;
}
void osal_yield(void) {
@@ -254,9 +464,17 @@ void osal_udelay(unsigned us) {
static unsigned threshold_us;
if (threshold_us == 0) {
+#if defined(_POSIX_CPUTIME) && _POSIX_CPUTIME > -1 && \
+ defined(CLOCK_PROCESS_CPUTIME_ID)
if (clock_getres(CLOCK_PROCESS_CPUTIME_ID, &ts)) {
int rc = errno;
- failure_perror("clock_getres(CLOCK_PROCESS_CPUTIME_ID)", rc);
+ log_warning("clock_getres(CLOCK_PROCESS_CPUTIME_ID), failed errno %d",
+ rc);
+ }
+#endif /* CLOCK_PROCESS_CPUTIME_ID */
+ if (threshold_us == 0 && clock_getres(CLOCK_MONOTONIC, &ts)) {
+ int rc = errno;
+ failure_perror("clock_getres(CLOCK_MONOTONIC)", rc);
}
chrono::time threshold = chrono::from_timespec(ts);
assert(threshold.seconds() == 0);