diff options
Diffstat (limited to 'libs/libmdbx/src/test/osal-unix.cc')
-rw-r--r-- | libs/libmdbx/src/test/osal-unix.cc | 434 |
1 files changed, 326 insertions, 108 deletions
diff --git a/libs/libmdbx/src/test/osal-unix.cc b/libs/libmdbx/src/test/osal-unix.cc index 0157bace23..4b8694c708 100644 --- a/libs/libmdbx/src/test/osal-unix.cc +++ b/libs/libmdbx/src/test/osal-unix.cc @@ -1,4 +1,4 @@ -/* +/* * Copyright 2017-2019 Leonid Yuriev <leo@yuriev.ru> * and other libmdbx authors: please see AUTHORS file. * All rights reserved. @@ -21,112 +21,266 @@ #include <sys/wait.h> #include <unistd.h> -#ifdef __APPLE__ +#ifndef MDBX_LOCKING +#error "Opps, MDBX_LOCKING is undefined!" +#endif + +#if defined(__APPLE__) && (MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \ + MDBX_LOCKING == MDBX_LOCKING_POSIX2008) #include "darwin/pthread_barrier.c" +#endif /* __APPLE__ && MDBX_LOCKING >= MDBX_LOCKING_POSIX2001 */ + +#if MDBX_LOCKING == MDBX_LOCKING_SYSV +#include <sys/ipc.h> +#include <sys/sem.h> +#endif /* MDBX_LOCKING == MDBX_LOCKING_SYSV */ + +#if MDBX_LOCKING == MDBX_LOCKING_POSIX1988 +#include <semaphore.h> + +#if __cplusplus >= 201103L +#include <atomic> +static __inline __maybe_unused int atomic_decrement(std::atomic_int *p) { + return std::atomic_fetch_sub(p, 1) - 1; +} +#else +static __inline __maybe_unused int atomic_decrement(volatile int *p) { +#if defined(__GNUC__) || defined(__clang__) + return __sync_sub_and_fetch(p, 1); +#elif defined(_MSC_VER) + STATIC_ASSERT(sizeof(volatile long) == sizeof(volatile int)); + return _InterlockedDecrement((volatile long *)p); +#elif defined(__APPLE__) + return OSAtomicDecrement32Barrier((volatile int *)p); +#else +#error FIXME: Unsupported compiler #endif +} +#endif /* C++11 */ +#endif /* MDBX_LOCKING == MDBX_LOCKING_POSIX1988 */ + +#if MDBX_LOCKING == MDBX_LOCKING_SYSV +static int ipc; +static pid_t ipc_overlord_pid; +static void ipc_remove(void) { + if (ipc_overlord_pid == getpid()) + semctl(ipc, 0, IPC_RMID, nullptr); +} +#else struct shared_t { +#if MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \ + MDBX_LOCKING == MDBX_LOCKING_POSIX2008 pthread_barrier_t barrier; pthread_mutex_t mutex; - size_t conds_size; - pthread_cond_t conds[1]; + size_t count; + pthread_cond_t events[1]; +#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988 + struct { +#if __cplusplus >= 201103L + std::atomic_int countdown; +#else + volatile int countdown; +#endif /* C++11 */ + sem_t sema; + } barrier; + size_t count; + sem_t events[1]; +#else +#error "FIXME" +#endif /* MDBX_LOCKING */ }; - static shared_t *shared; +#endif /* MDBX_LOCKING != MDBX_LOCKING_SYSV */ void osal_wait4barrier(void) { +#if MDBX_LOCKING == MDBX_LOCKING_SYSV + struct sembuf op; + op.sem_num = 0; + op.sem_op = -1; + op.sem_flg = IPC_NOWAIT; + if (semop(ipc, &op, 1)) + failure_perror("semop(dec)", errno); + op.sem_op = 0; + op.sem_flg = 0; + if (semop(ipc, &op, 1)) + failure_perror("semop(wait)", errno); +#elif MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \ + MDBX_LOCKING == MDBX_LOCKING_POSIX2008 assert(shared != nullptr && shared != MAP_FAILED); - int rc = pthread_barrier_wait(&shared->barrier); - if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { - failure_perror("pthread_barrier_wait(shared)", rc); - } + int err = pthread_barrier_wait(&shared->barrier); + if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD) + failure_perror("pthread_barrier_wait(shared)", err); +#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988 + assert(shared != nullptr && shared != MAP_FAILED); + int err = (atomic_decrement(&shared->barrier.countdown) > 0 && + sem_wait(&shared->barrier.sema)) + ? errno + : 0; + if (err != 0) + failure_perror("sem_wait(shared)", err); + if (sem_post(&shared->barrier.sema)) + failure_perror("sem_post(shared)", errno); +#else +#error "FIXME" +#endif /* MDBX_LOCKING */ } void osal_setup(const std::vector<actor_config> &actors) { +#if MDBX_LOCKING == MDBX_LOCKING_SYSV + if (ipc_overlord_pid) + failure("ipc already created by %ld pid", (long)ipc_overlord_pid); + ipc_overlord_pid = getpid(); +#ifndef SEM_A +#define SEM_A S_IRUSR +#endif +#ifndef SEM_R +#define SEM_R S_IWUSR +#endif + ipc = semget(IPC_PRIVATE, actors.size() + 2, IPC_CREAT | SEM_A | SEM_R); + if (ipc < 0) + failure_perror("semget(IPC_PRIVATE, shared_sems)", errno); + if (atexit(ipc_remove)) + failure_perror("atexit(ipc_remove)", errno); + if (semctl(ipc, 0, SETVAL, (int)(actors.size() + 1))) + failure_perror("semctl(SETVAL.0, shared_sems)", errno); + for (size_t i = 1; i < actors.size() + 2; ++i) + if (semctl(ipc, i, SETVAL, 1)) + failure_perror("semctl(SETVAL.N, shared_sems)", errno); +#else assert(shared == nullptr); - - pthread_mutexattr_t mutexattr; - int rc = pthread_mutexattr_init(&mutexattr); - if (rc) - failure_perror("pthread_mutexattr_init()", rc); - rc = pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED); - if (rc) - failure_perror("pthread_mutexattr_setpshared()", rc); - - pthread_barrierattr_t barrierattr; - rc = pthread_barrierattr_init(&barrierattr); - if (rc) - failure_perror("pthread_barrierattr_init()", rc); - rc = pthread_barrierattr_setpshared(&barrierattr, PTHREAD_PROCESS_SHARED); - if (rc) - failure_perror("pthread_barrierattr_setpshared()", rc); - - pthread_condattr_t condattr; - rc = pthread_condattr_init(&condattr); - if (rc) - failure_perror("pthread_condattr_init()", rc); - rc = pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED); - if (rc) - failure_perror("pthread_condattr_setpshared()", rc); - shared = (shared_t *)mmap( - nullptr, sizeof(shared_t) + actors.size() * sizeof(pthread_cond_t), - PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + nullptr, sizeof(shared_t) + actors.size() * sizeof(shared->events[0]), + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_ANONYMOUS +#ifdef MAP_HASSEMAPHORE + | MAP_HASSEMAPHORE +#endif + , + -1, 0); if (MAP_FAILED == (void *)shared) failure_perror("mmap(shared_conds)", errno); - rc = pthread_mutex_init(&shared->mutex, &mutexattr); - if (rc) - failure_perror("pthread_mutex_init(shared)", rc); + shared->count = actors.size() + 1; - rc = pthread_barrier_init(&shared->barrier, &barrierattr, actors.size() + 1); - if (rc) - failure_perror("pthread_barrier_init(shared)", rc); +#if MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \ + MDBX_LOCKING == MDBX_LOCKING_POSIX2008 + pthread_barrierattr_t barrierattr; + int err = pthread_barrierattr_init(&barrierattr); + if (err) + failure_perror("pthread_barrierattr_init()", err); + err = pthread_barrierattr_setpshared(&barrierattr, PTHREAD_PROCESS_SHARED); + if (err) + failure_perror("pthread_barrierattr_setpshared()", err); + + err = pthread_barrier_init(&shared->barrier, &barrierattr, shared->count); + if (err) + failure_perror("pthread_barrier_init(shared)", err); + pthread_barrierattr_destroy(&barrierattr); - const size_t n = actors.size() + 1; - for (size_t i = 0; i < n; ++i) { - pthread_cond_t *event = &shared->conds[i]; - rc = pthread_cond_init(event, &condattr); - if (rc) - failure_perror("pthread_cond_init(shared)", rc); + pthread_mutexattr_t mutexattr; + err = pthread_mutexattr_init(&mutexattr); + if (err) + failure_perror("pthread_mutexattr_init()", err); + err = pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED); + if (err) + failure_perror("pthread_mutexattr_setpshared()", err); + + pthread_condattr_t condattr; + err = pthread_condattr_init(&condattr); + if (err) + failure_perror("pthread_condattr_init()", err); + err = pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED); + if (err) + failure_perror("pthread_condattr_setpshared()", err); + + err = pthread_mutex_init(&shared->mutex, &mutexattr); + if (err) + failure_perror("pthread_mutex_init(shared)", err); + + for (size_t i = 0; i < shared->count; ++i) { + pthread_cond_t *event = &shared->events[i]; + err = pthread_cond_init(event, &condattr); + if (err) + failure_perror("pthread_cond_init(shared)", err); log_trace("osal_setup: event(shared pthread_cond) %" PRIuPTR " -> %p", i, - event); + __Wpedantic_format_voidptr(event)); } - shared->conds_size = actors.size() + 1; - - pthread_barrierattr_destroy(&barrierattr); pthread_condattr_destroy(&condattr); pthread_mutexattr_destroy(&mutexattr); +#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988 + shared->barrier.countdown = shared->count; + if (sem_init(&shared->barrier.sema, true, 1)) + failure_perror("sem_init(shared.barrier)", errno); + for (size_t i = 0; i < shared->count; ++i) { + sem_t *event = &shared->events[i]; + if (sem_init(event, true, 0)) + failure_perror("sem_init(shared.event)", errno); + log_trace("osal_setup: event(shared sem_init) %" PRIuPTR " -> %p", i, + __Wpedantic_format_voidptr(event)); + } +#else +#error "FIXME" +#endif /* MDBX_LOCKING */ +#endif /* MDBX_LOCKING != MDBX_LOCKING_SYSV */ } void osal_broadcast(unsigned id) { - assert(shared != nullptr && shared != MAP_FAILED); log_trace("osal_broadcast: event %u", id); - if (id >= shared->conds_size) +#if MDBX_LOCKING == MDBX_LOCKING_SYSV + if (semctl(ipc, id + 1, SETVAL, 0)) + failure_perror("semctl(SETVAL)", errno); +#else + assert(shared != nullptr && shared != MAP_FAILED); + if (id >= shared->count) failure("osal_broadcast: id > limit"); - int rc = pthread_cond_broadcast(shared->conds + id); - if (rc) - failure_perror("sem_post(shared)", rc); +#if MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \ + MDBX_LOCKING == MDBX_LOCKING_POSIX2008 + int err = pthread_cond_broadcast(shared->events + id); + if (err) + failure_perror("pthread_cond_broadcast(shared)", err); +#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988 + if (sem_post(shared->events + id)) + failure_perror("sem_post(shared)", errno); +#else +#error "FIXME" +#endif /* MDBX_LOCKING */ +#endif /* MDBX_LOCKING != MDBX_LOCKING_SYSV */ } int osal_waitfor(unsigned id) { - assert(shared != nullptr && shared != MAP_FAILED); - log_trace("osal_waitfor: event %u", id); - if (id >= shared->conds_size) +#if MDBX_LOCKING == MDBX_LOCKING_SYSV + struct sembuf op; + memset(&op, 0, sizeof(op)); + op.sem_num = (short)(id + 1); + int rc = semop(ipc, &op, 1) ? errno : MDBX_SUCCESS; +#else + assert(shared != nullptr && shared != MAP_FAILED); + if (id >= shared->count) failure("osal_waitfor: id > limit"); +#if MDBX_LOCKING == MDBX_LOCKING_POSIX2001 || \ + MDBX_LOCKING == MDBX_LOCKING_POSIX2008 int rc = pthread_mutex_lock(&shared->mutex); if (rc != 0) failure_perror("pthread_mutex_lock(shared)", rc); - rc = pthread_cond_wait(shared->conds + id, &shared->mutex); + rc = pthread_cond_wait(shared->events + id, &shared->mutex); if (rc && rc != EINTR) failure_perror("pthread_cond_wait(shared)", rc); rc = pthread_mutex_unlock(&shared->mutex); if (rc != 0) failure_perror("pthread_mutex_unlock(shared)", rc); +#elif MDBX_LOCKING == MDBX_LOCKING_POSIX1988 + int rc = sem_wait(shared->events + id) ? errno : 0; + if (rc == 0 && sem_post(shared->events + id)) + failure_perror("sem_post(shared)", errno); +#else +#error "FIXME" +#endif /* MDBX_LOCKING */ +#endif /* MDBX_LOCKING != MDBX_LOCKING_SYSV */ return (rc == 0) ? true : false; } @@ -149,21 +303,69 @@ bool actor_config::osal_deserialize(const char *str, const char *end, //----------------------------------------------------------------------------- +static pid_t overlord_pid; + +static volatile sig_atomic_t sigusr1_head, sigusr2_head; +static void handler_SIGUSR(int signum) { + switch (signum) { + case SIGUSR1: + sigusr1_head += 1; + return; + case SIGUSR2: + sigusr2_head += 1; + return; + default: + abort(); + } +} + +bool osal_progress_push(bool active) { + if (overlord_pid) { + if (kill(overlord_pid, active ? SIGUSR1 : SIGUSR2)) + failure_perror("osal_progress_push: kill(overload)", errno); + return true; + } + + return false; +} + +//----------------------------------------------------------------------------- + static std::unordered_map<pid_t, actor_status> childs; -static void handler_SIGCHLD(int unused) { (void)unused; } +static volatile sig_atomic_t sigalarm_head; +static void handler_SIGCHLD(int signum) { + if (signum == SIGALRM) + sigalarm_head += 1; +} mdbx_pid_t osal_getpid(void) { return getpid(); } int osal_delay(unsigned seconds) { return sleep(seconds) ? errno : 0; } int osal_actor_start(const actor_config &config, mdbx_pid_t &pid) { - if (childs.empty()) - signal(SIGCHLD, handler_SIGCHLD); + if (childs.empty()) { + struct sigaction act; + memset(&act, 0, sizeof(act)); + act.sa_handler = handler_SIGCHLD; + sigaction(SIGCHLD, &act, nullptr); + sigaction(SIGALRM, &act, nullptr); + act.sa_handler = handler_SIGUSR; + sigaction(SIGUSR1, &act, nullptr); + sigaction(SIGUSR2, &act, nullptr); + + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGCHLD); + sigaddset(&mask, SIGUSR1); + sigaddset(&mask, SIGUSR2); + sigprocmask(SIG_UNBLOCK, &mask, nullptr); + } pid = fork(); if (pid == 0) { + overlord_pid = getppid(); const bool result = test_execute(config); exit(result ? EXIT_SUCCESS : EXIT_FAILURE); } @@ -171,7 +373,8 @@ int osal_actor_start(const actor_config &config, mdbx_pid_t &pid) { if (pid < 0) return errno; - log_trace("osal_actor_start: fork pid %i for %u", pid, config.actor_id); + log_trace("osal_actor_start: fork pid %ld for %u", (long)pid, + config.actor_id); childs[pid] = as_running; return 0; } @@ -186,60 +389,67 @@ void osal_killall_actors(void) { } int osal_actor_poll(mdbx_pid_t &pid, unsigned timeout) { - struct timespec ts; - ts.tv_nsec = 0; - ts.tv_sec = (timeout > INT_MAX) ? INT_MAX : timeout; -retry: - int status, options = WNOHANG; + static sig_atomic_t sigalarm_tail; + alarm(0) /* cancel prev timeout */; + sigalarm_tail = sigalarm_head /* reset timeout flag */; + + int options = WNOHANG; + if (timeout) { + alarm((timeout > INT_MAX) ? INT_MAX : timeout); + options = 0; + } + #ifdef WUNTRACED options |= WUNTRACED; #endif #ifdef WCONTINUED options |= WCONTINUED; #endif - pid = waitpid(0, &status, options); - - if (pid > 0) { - if (WIFEXITED(status)) - childs[pid] = - (WEXITSTATUS(status) == EXIT_SUCCESS) ? as_successful : as_failed; - else if (WCOREDUMP(status)) - childs[pid] = as_coredump; - else if (WIFSIGNALED(status)) - childs[pid] = as_killed; - else if (WIFSTOPPED(status)) - childs[pid] = as_debuging; - else if (WIFCONTINUED(status)) - childs[pid] = as_running; - else { - assert(false); - } - return 0; - } - if (pid == 0) { - /* child still running */ - if (ts.tv_sec == 0 && ts.tv_nsec == 0) - ts.tv_nsec = 1; - if (nanosleep(&ts, &ts) == 0) { - /* timeout and no signal from child */ - pid = 0; + while (sigalarm_tail == sigalarm_head) { + int status; + pid = waitpid(0, &status, options); + + if (pid > 0) { + if (WIFEXITED(status)) + childs[pid] = + (WEXITSTATUS(status) == EXIT_SUCCESS) ? as_successful : as_failed; + else if (WCOREDUMP(status)) + childs[pid] = as_coredump; + else if (WIFSIGNALED(status)) + childs[pid] = as_killed; + else if (WIFSTOPPED(status)) + childs[pid] = as_debugging; + else if (WIFCONTINUED(status)) + childs[pid] = as_running; + else { + assert(false); + } return 0; } - if (errno == EINTR) - goto retry; - } - switch (errno) { - case EINTR: - pid = 0; - return 0; + static sig_atomic_t sigusr1_tail, sigusr2_tail; + if (sigusr1_tail != sigusr1_head) { + sigusr1_tail = sigusr1_head; + logging::progress_canary(true); + if (pid < 0 && errno == EINTR) + continue; + } + if (sigusr2_tail != sigusr2_head) { + sigusr2_tail = sigusr2_head; + logging::progress_canary(false); + if (pid < 0 && errno == EINTR) + continue; + } - case ECHILD: - default: - pid = 0; - return errno; + if (pid == 0) + break; + + int err = errno; + if (err != EINTR) + return err; } + return 0 /* timeout */; } void osal_yield(void) { @@ -254,9 +464,17 @@ void osal_udelay(unsigned us) { static unsigned threshold_us; if (threshold_us == 0) { +#if defined(_POSIX_CPUTIME) && _POSIX_CPUTIME > -1 && \ + defined(CLOCK_PROCESS_CPUTIME_ID) if (clock_getres(CLOCK_PROCESS_CPUTIME_ID, &ts)) { int rc = errno; - failure_perror("clock_getres(CLOCK_PROCESS_CPUTIME_ID)", rc); + log_warning("clock_getres(CLOCK_PROCESS_CPUTIME_ID), failed errno %d", + rc); + } +#endif /* CLOCK_PROCESS_CPUTIME_ID */ + if (threshold_us == 0 && clock_getres(CLOCK_MONOTONIC, &ts)) { + int rc = errno; + failure_perror("clock_getres(CLOCK_MONOTONIC)", rc); } chrono::time threshold = chrono::from_timespec(ts); assert(threshold.seconds() == 0); |