Wait-free Monitors for Concurrent Batching Workloads

Monitors are hard to use. Per Wikipedia’s entry on the topic, “a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become true.” The use case for such a construct is often to signal other threads that new work may be completed. Unfortunately, they’re really easy to use incorrectly, largely because many APIs for them are terrible.

In POSIX, monitors are provided through the API for condition variables. A new condition variable is created with pthread_cond_init(3); conditions are waited on and signalled using pthread_cond_wait(3) / pthread_cond_timedwait(3) and pthread_cond_signal(3) / pthread_cond_broadcast(3), respectively. To use the pthread_cond_wait(3) interface, you also need a mutex (because what you really have is a monitor); this mutex must be held before calling pthread_cond_wait(3), which will unlock the mutex before waiting on the condition, and return with the mutex reacquired. If this doesn’t sound subtle enough, consider the really crappy bit: the pthread_cond_signal(3) API does not accept a mutex as an argument at all! What you get from this is a ton of confusion about lost signals when people call pthread_cond_signal(3) without holding the mutex while signalling the condition.

And what happens in this scenario? You lose messages: notifications on a condition are not persistent. This API assumes that the only time the mutex will be unlocked is when a thread is waiting on a condition, and that signals will only occur with that mutex held. The problem is that, although the API had an opportunity to enforce this, it did not do so.
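
To make the hazard concrete, here’s a minimal sketch of the canonical pattern – the work_available predicate is my own illustrative stand-in, not part of any API. The waiter re-checks the predicate in a loop (which also handles spurious wakeups), and the signaller updates the predicate and signals with the mutex held, which is exactly what prevents the lost wakeup:

#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static bool work_available = false; /* the predicate; protected by mtx */

void
consume_one(void)
{
    pthread_mutex_lock(&mtx);
    while (!work_available) {
        /* Atomically releases mtx and sleeps; reacquires mtx on return. */
        pthread_cond_wait(&cond, &mtx);
    }
    work_available = false;
    pthread_mutex_unlock(&mtx);
    /* ... do the work ... */
}

void
produce_one(void)
{
    pthread_mutex_lock(&mtx);
    work_available = true;      /* update the predicate under the mutex... */
    pthread_cond_signal(&cond); /* ...so this signal cannot be lost */
    pthread_mutex_unlock(&mtx);
}

Drop either the mutex acquisition in produce_one or the predicate loop in consume_one and you get exactly the lost-signal behavior described above.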

Apologists for the API say that one should “just know” how to use it correctly: these requirements are common knowledge. But misuse appears even in popular software, written by experienced individuals. One such example can be found in the source code of the widely used Varnish Cache: the condvar is waited on with the lock held, but notifications are sent without the lock held. In fact, even though the condvar is waited on with the lock held, that lock is also released for portions of the loop that waits on the condition, so it’s still possible for the mutex to be acquired when the condition isn’t being waited upon. And it’s done this way because that mutex does double duty, protecting both the condition and shared mutable state in the structure it lives in. This misuse of condvars has existed in Varnish for over a decade.

Let’s take a step back and consider a couple of use-cases for monitors.

First, let’s consider a log producer where many worker threads are producing log messages, and a consumer thread is persisting those to some kind of storage in batches. When no work is being done, we don’t want the consumer thread to spin waiting for logs to write. So our producers acquire a lock and signal a condition that the consumer waits on. This is problematic in high-volume services, because there’s significant contention on the monitor’s mutex. Additionally, if we have misused the API, it’s possible that a low-volume service with a burst of traffic will have logs delayed until the next log item occurs – which may be a while, since the service doesn’t have much volume.
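
A rough sketch of that setup, assuming a hypothetical log_entry list and flush_batch routine – note that every producer serializes on log_mtx just to append a single entry:

#include <pthread.h>
#include <stddef.h>

struct log_entry {
    struct log_entry *next;
    char msg[256];
};

void flush_batch(struct log_entry *batch); /* hypothetical persistence routine */

static pthread_mutex_t log_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t log_cond = PTHREAD_COND_INITIALIZER;
static struct log_entry *log_head; /* stack of unflushed entries */

void
log_produce(struct log_entry *e)
{
    pthread_mutex_lock(&log_mtx); /* the contention point */
    e->next = log_head;
    log_head = e;
    pthread_cond_signal(&log_cond);
    pthread_mutex_unlock(&log_mtx);
}

void *
log_consume(void *unused)
{
    (void)unused;
    for (;;) {
        pthread_mutex_lock(&log_mtx);
        while (log_head == NULL) {
            pthread_cond_wait(&log_cond, &log_mtx);
        }
        struct log_entry *batch = log_head; /* steal the whole batch */
        log_head = NULL;
        pthread_mutex_unlock(&log_mtx);
        flush_batch(batch); /* persist outside the lock */
    }
}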

Next, consider a sharded worker pool where one or more producers may generate work items, and a pool of workers must each do something different with these work items. In this scenario, every worker thread must see every message. You might be tempted to use pthread_cond_broadcast(3), but this turns out to be an anti-pattern. First of all, each waiter on the condvar must have reacquired the mutex to return from pthread_cond_wait(3), so your worker jobs actually execute serially. And if you’ve decided to solve this by unlocking the mutex while you perform your work, another producer may decide to broadcast while no workers are waiting. In reality, you have two options if you want to maintain correctness while also parallelizing your work: either you use N condition variables and N mutexes to signal N workers (sketched below), or you need to wait on a semaphore (that is also protected by the monitor’s lock!) to ensure all your workers are waiting before you broadcast. Neither of these solutions is particularly elegant.
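
For concreteness, here’s a minimal sketch of the first option – the struct worker layout and its per-worker predicate are my own illustration:

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct worker {
    pthread_mutex_t mtx;
    pthread_cond_t cond;
    bool work_ready; /* per-worker predicate, protected by mtx */
};

/* Deliver a round of work to every worker, each under its own monitor,
 * so workers wake and run in parallel instead of serializing on one
 * shared mutex. */
void
broadcast_work(struct worker *workers, size_t nworkers)
{
    for (size_t i = 0; i < nworkers; i++) {
        pthread_mutex_lock(&workers[i].mtx);
        workers[i].work_ready = true;
        pthread_cond_signal(&workers[i].cond);
        pthread_mutex_unlock(&workers[i].mtx);
    }
}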

My final gripe with monitors is that they require lock-based synchronization. Lock-based synchronization is nearly the canonical way to protect shared mutable state from concurrent access, but this approach has several drawbacks.

Locks are not composable. When we talk about composability with regard to an API, we really mean a couple of things. First, a composable interface is self-contained: it does not have dependencies on external state for its correctness. Mutexes are not self-contained: their correct use depends on consuming the mutex API at every appropriate place in code, and the API is also sensitive to ordering. Ordering issues don’t sound bad when all you have to consider are lock and unlock operations, but sometimes mutable state must be protected by multiple mutexes. At this point, the order in which the mutexes are locked is important, as is the order in which they are unlocked – again, at every point where these mutexes are used. Apologists will say that it’s easier to use locks than anything else; I have yet to see or work on a large, multi-threaded codebase that hasn’t run into issues directly related to this lack of composability.
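
The classic failure mode, as a minimal sketch: two threads taking the same two mutexes in opposite orders. Each acquisition is locally correct; the deadlock only exists in the composition, and nothing in the API surfaces it:

#include <pthread.h>

static pthread_mutex_t a = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t b = PTHREAD_MUTEX_INITIALIZER;

void *
thread_one(void *unused)
{
    pthread_mutex_lock(&a);
    pthread_mutex_lock(&b); /* blocks if thread_two already holds b... */
    pthread_mutex_unlock(&b);
    pthread_mutex_unlock(&a);
    return unused;
}

void *
thread_two(void *unused)
{
    pthread_mutex_lock(&b);
    pthread_mutex_lock(&a); /* ...which is now waiting for a: deadlock */
    pthread_mutex_unlock(&a);
    pthread_mutex_unlock(&b);
    return unused;
}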

Another issue with locks is how they actually work. If you ask folks what a lock does, they’ll typically say something like, “A lock protects concurrent access to shared memory within its critical section.” But this isn’t really what locks do. A lock is a construct that serializes execution through its critical section such that only a single thread of execution may run that code at a time. This has the side-effect of achieving the goal of protecting concurrent access to shared mutable state, but unless you consider how it achieves that, you’re missing numerous downsides. While a lock is held, no other concurrent task may acquire the lock. This means that resources dedicated to running code are effectively stalled waiting for the critical section to finish. Maybe this doesn’t seem so bad; critical sections are typically pretty short. The problem is that modern schedulers only allow tasks to run for so long. Once you’ve used your timeslice, the scheduler is going to want to run something else. If you’ve been preempted in the middle of a critical section, literally every other process waiting on that lock is also not runnable, and your system makes no progress.

These issues are fundamental to the monitor API provided by pthreads. (Other monitor implementations may address the composability problem a bit better, but the issue with progress guarantees remains by virtue of mutual exclusion.) The pthread monitor interface introduces additional dependencies between a mutex and data, imposes additional requirements on both lock ordering and critical section size, and does nothing to solve the problem of system progress (which is ostensibly one reason you’d want to use threads at all).

Can we do better? At my previous job, I had a few situations similar to the log producer situation I mentioned above: multiple threads producing work items, and a single thread acting upon that work in batches. I wanted to use some lock-free data structures from Concurrency Kit to create a queue of work that the consumer could easily batch, but there were some problems. With monitors, I’d be blocking work producers while the batch work was being completed, and in some cases that could have taken a non-trivial amount of time. In other cases, the consumer was able to operate quickly, but the production was high-volume, and I didn’t want to have producers blocking each other in their fast path. So I made a wait-free monitor for multi-producer, single-consumer (MPSC) workloads.

I had wanted to write about this monitor while I worked there, but never got around to it. Now that I don’t work there anymore, I don’t have the code. And I didn’t remember what clever tricks I had used. So I decided to reïnvent it. My new version is portable to any POSIX system with support for Concurrency Kit’s atomic primitives; I seem to recall that what I’d done in the original code was Linux-specific. I may implement system-specific versions later; POSIX’s lack of an interface like eventfd(2) in Linux, and its lack of ability to report time slept in poll(2) on EINTR make the POSIX implementation a bit more heavyweight than it really needs to be. The remainder of this article will be a discussion of my new implementation.

The notification system can exist in one of three states.

enum monitor_state {
    STATE_WORKING, /* consumer is working; no waiter is listening */
    STATE_PENDING, /* work has been produced since the consumer last looked */
    STATE_BLOCKED, /* consumer is blocked, waiting for work */
};

STATE_WORKING is the initial state of the system, and is the state of the monitor when it is created. This state indicates that no waiters are currently listening to the monitor. The second state of the monitor is STATE_PENDING, which indicates that work is pending. The final state is STATE_BLOCKED, which indicates that the consumer wanted to do some work, but none was available.

The structure itself is relatively simple: it contains some file descriptors for a pipe (to block when no work is pending), the state of the monitor, and an unfortunate flag as to whether the pipe needs draining (this is a side-effect of POSIX lacking anything eventfd(2)-like). It also contains a generation counter that is used to avoid having multiple producers write to the pipe to unblock a consumer. This turns out to be rather important, for reasons discussed below.

struct monitor {
    union {
        struct {
            int r; /* read end of the pipe */
            int w; /* write end of the pipe */
        } pipe;
        int fds[2]; /* the same descriptors, in the layout pipe(2) wants */
    };
    uint8_t state; /* one of enum monitor_state */
    uint8_t drain; /* set when a producer has written to the pipe */
    uint64_t gen;  /* generation counter; elects a single pipe writer */
};

Creating a monitor is as you would expect:

struct monitor *
monitor_create(void)
{
    struct monitor *m = malloc(sizeof *m);

    if (m == NULL) {
        return NULL;
    }

    m->state = STATE_WORKING;
    m->drain = 0;
    m->gen = 0;
    if (pipe(m->fds) == -1) {
        perror("pipe");
        free(m);
        return NULL;
    }

    return m;
}

Destroying a monitor is a bit of a hassle because I wanted to actually handle errors with close(2). I had hoped this would make things idempotent such that you could retry destroying the thing when close(2) failed, but I’m not sure what the caller would do here anyway. I might just end up ignoring these errors; I’m not sure whether it’s really feasible to get EIO, and I’m not sure I care about any realistic ways to get EBADF.

bool
monitor_destroy(struct monitor *m)
{
    int r;

    if (m->pipe.r != -1) {
        do {
            r = close(m->pipe.r);
        } while (r == -1 && errno == EINTR);

        if (r == -1) {
            return false;
        }
    }

    m->pipe.r = -1;

    if (m->pipe.w != -1) {
        do {
            r = close(m->pipe.w);
        } while (r == -1 && errno == EINTR);

        if (r == -1) {
            return false;
        }
    }

    m->pipe.w = -1;

    free(m);
    return true;
}

The actual cool stuff happens when we start thinking about how a waiter should behave when there’s no mutex involved. First, some function boilerplate. Note that the poll(2) setup lives up here: the drain path at the bottom of the function is reached via goto, and jumping over an initialization would leave it uninitialized on that path.

bool
monitor_wait(struct monitor *m, int timeout_ms)
{
    bool rv = true;

    assert(m != NULL);

    struct pollfd pfd[1] = {{
        .fd = m->pipe.r,
        .events = POLLIN|POLLPRI,
    }};
    int p;
When we enter this function, the first question we’re trying to answer is whether any work was produced while the worker wasn’t listening. Since we start out in STATE_WORKING, this means that if we enter the function in that state, no work is pending for us to do, and so we should block. Alternatively, if we enter in STATE_PENDING, that means work does exist, and we shouldn’t bother blocking. If we guarantee that this function is the only function that can move the state to STATE_BLOCKED and that it always exits in STATE_WORKING, we can achieve this by doing a simple atomic compare-and-swap to move our state from STATE_WORKING to STATE_BLOCKED. If the CAS operation fails, we know that work is pending.

    if (!ck_pr_cas_8(&m->state, STATE_WORKING, STATE_BLOCKED)) {
        ck_pr_store_8(&m->state, STATE_WORKING);
        ck_pr_fence_store();
        goto drain;
    }

Alternatively, when the CAS operation succeeds, that means no work was available for us to complete. In this case, we need to wait for a notification; we’ll receive this over the read side of our pipe(2). Because we may not want to wait forever, we allow the user to specify a timeout that we pass to poll(2).

    errno = 0;
    p = poll(pfd, 1, timeout_ms);
    if (p == -1 && errno == EINTR) {
        errno = EAGAIN;
        rv = false;
        goto drain;
    } else if (p == -1) {
        perror("poll");
        rv = false;
        goto drain;
    } else if (p == 0 || (pfd[0].revents & (POLLHUP|POLLERR)) != 0) {
        rv = false;
        goto drain;
    }

You’ve noticed that we’re doing goto drain; instead of ever returning anything so far. Some syscall errors can’t easily be dealt with inside this code, so it’s possible that we exit without any work to do anyway. However, this does mean that it’s possible that someone had to write to the pipe to wake us, and there’s data stuck there. We detect this condition and try to drain the pipe when it has been written to, even when we didn’t need to block for work. This does mean that there are cases where we might do syscalls when we didn’t need to, but this is only really likely to occur when there’s a timeout. If poll(2) is told to block forever, this is only likely to happen on EINTR, or for (probably catastrophic) errors. If everything goes well here, we clear our drain flag, which will only be set again if we block.

drain:
    if (ck_pr_load_8(&m->drain) == 1) {
        ssize_t r;
        uint8_t buf;
        do {
            p = poll(pfd, 1, 0);
            if (p == 0) {
                break;
            } else if (p == -1 && errno == EINTR) {
                continue;
            } else if (p == -1) {
                perror("drain poll");
                rv = false;
                goto ret;
            } else if ((pfd[0].revents & (POLLHUP|POLLERR)) != 0) {
                rv = false;
                goto ret;
            }

            do {
                r = read(m->pipe.r, &buf, sizeof buf);
            } while (r == -1 && errno == EINTR);

            if (r == -1) {
                perror("read");
                rv = false;
                goto ret;
            }
        } while (p != 0); /* continue on EINTR lands here; p != 0 retries */
    }

    ck_pr_store_8(&m->drain, 0);

Whenever we leave this function, we have to leave in STATE_WORKING. However, we do tell the caller whether we left because work was available (in which case rv is true) or because of a timeout or error (in which case it’s false). At some point, I’d like to add an error API so the caller can figure out exactly what failed when this function returns false – so that’ll probably happen before this makes it to GitHub. In any case, transitioning to STATE_WORKING whenever we return is really the big trick to making this wait-free.

ret:
    ck_pr_store_8(&m->state, STATE_WORKING);
    ck_pr_fence_store();

    return rv;
}

Waking up a waiter is pretty simple. First, our function prologue…

bool
monitor_wake(struct monitor *m)
{
    enum monitor_state cur;

    assert(m != NULL);

Whenever we enter this function, it is always OK to set the state to STATE_PENDING. We set this state using an atomic fetch-and-store operation, which allows us to move the state forward while also learning what the previous state was. If the consumer is working, the state will either have already been set to STATE_PENDING by another producer, or it will still be in STATE_WORKING. In either case, we can be assured that when the consumer is ready, it will notice that we’ve produced work.

    cur = ck_pr_fas_8(&m->state, STATE_PENDING);
    if (cur == STATE_PENDING || cur == STATE_WORKING) {
        ck_pr_fence_store();
        return true;
    }

When the consumer is blocked, however, the state we read will be STATE_BLOCKED, and we will have set that to STATE_PENDING. This turns out to be just fine. If the consumer is blocked indefinitely, it doesn’t care about what the value is until it’s woken up again via its pipe. If it’s blocked on a timeout, one of a few benign scenarios can happen:

  1. The timeout occurs, and the consumer re-enters without any new work having been produced. The state will remain STATE_WORKING, and the consumer will block again.

  2. The timeout occurs, and a producer sets the state to STATE_PENDING above after the consumer sets the state to STATE_WORKING. The consumer re-enters and notices the new work. Nothing is written to the pipe, and nothing is read from the pipe.

  3. The timeout occurs, and several producers set the state to STATE_PENDING above before the consumer sets the state to STATE_WORKING. The producers race on updating the generation count below; the winner will write to the pipe. The consumer sets the state to STATE_WORKING and returns. No new work is produced. When it re-enters, it will set its state to STATE_BLOCKED and immediately read the value written by the producer, picking up that work.

  4. The same as above, but new work is produced before the consumer re-enters. The consumer enters and sees the state as STATE_PENDING. It also notices that the drain flag is set, and drains the pipe. It will have picked up the work that was produced by the subsequent producers as well as the one that wrote to the pipe. Even if there is a race on the drain flag being set, the consumer will eventually notice the drain and clean it up.

None of these cases are terrible: in all cases, the consumer is eventually notified of the work. When an error occurs in a syscall, the producer is free to try again.

    uint64_t gen = ck_pr_load_64(&m->gen);
    if (!ck_pr_cas_64(&m->gen, gen, gen + 1)) {
        ck_pr_fence_store();
        return true;
    }

    ssize_t r;
    do {
        uint8_t buf = 0;
        r = write(m->pipe.w, &buf, sizeof buf);
    } while (r == -1 && errno == EINTR);

    if (r == -1) {
        return false;
    }

    ck_pr_store_8(&m->drain, 1);
    ck_pr_fence_store();

    return true;
}
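
To tie the two halves together, here’s roughly how I’d expect this to be used – struct work, work_queue_push, work_queue_pop, and handle stand in for whatever lock-free MPSC queue and batch handler you pair the monitor with; they are not part of the monitor itself:

struct work; /* hypothetical work item */
void work_queue_push(struct work *w);  /* hypothetical lock-free MPSC enqueue */
struct work *work_queue_pop(void);     /* hypothetical dequeue; NULL when empty */
void handle(struct work *w);           /* hypothetical per-item handler */

/* Producer: enqueue first, then wake. The ordering matters – if we woke
 * the consumer first, it could check the queue before our item landed. */
void
produce(struct monitor *m, struct work *w)
{
    work_queue_push(w);
    monitor_wake(m);
}

/* Consumer: block until something is produced, then drain the whole
 * queue as a single batch. A timeout of -1 makes poll(2) wait forever. */
void
consume(struct monitor *m)
{
    for (;;) {
        if (!monitor_wait(m, -1)) {
            continue; /* timeout or benign error; just wait again */
        }

        struct work *w;
        while ((w = work_queue_pop()) != NULL) {
            handle(w);
        }
    }
}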

The system described above is wait-free when work is available, which is really the only situation in which non-blocking behavior is helpful for a monitor, since the goal is to block when no work can be done. This is ideal for concurrent batching workloads of nearly any volume. The composability of the interface means that, even for low volume systems, there’s no chance that forgetting about implementation subtleties will result in missed notifications. And the wait-free nature means that with high-volume workloads, your producer’s fast-path only takes the hit of a single atomic operation on average, and you will almost never incur the penalty of a syscall.

This is a stepping stone for additional work. The system presented here is really only appropriate for single-consumer workloads. In particular, multiple consumers sharing the same monitor would notice the monitor in STATE_BLOCKED upon entry to the monitor_wait function. This would cause them to erroneously think work was available, and result in spinning. Before I put this code on GitHub, I’d really like to solve the multiple-consumer problem. In particular, I’d like to have interfaces for signalling any consumer, signalling a specific consumer, and broadcasting to all consumers. I believe that this is all possible to do wait-free, assuming that the multiple-consumer versions simply slap an interface around managing N different monitors.

As a final note, I haven’t tested this code at all yet; it’s entirely possible that it’s broken. I’m not writing any C in my new job, which is a bit of a bummer. This also means that I don’t really have any use-cases for this code at the moment, making it a bit more difficult to test. I’ll of course spend a bit more time on this before publishing anything to GitHub.
