How to achieve lock-free, but blocking behavior?

I'm implementing a lock-free single-producer, single-consumer queue for a network-intensive application. I have a bunch of worker threads receiving work in their own separate queues, which they then dequeue and process.

Removing the locks from these queues has greatly improved performance under high load, but the workers no longer block when their queues are empty, which in turn causes CPU usage to skyrocket.

How can I efficiently cause a thread to block until it can successfully dequeue something or is killed/interrupted?


If you're on Linux, look into using a futex. In the uncontended case it works entirely in user space with atomic operations, so it performs like a non-locking implementation instead of paying for a kernel call every time. Only when the thread has to idle because some condition is not true (e.g., lock contention) does it make the kernel calls that put the thread to sleep and wake it up again on a later event. It's basically like a very fast semaphore.
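
As a rough illustration of that idea, here is a minimal Linux-only sketch of a consumer that sleeps on a futex word when its queue is empty. Everything in it is an assumption made for the example: the small ring buffer merely stands in for your own lock-free SPSC queue, and the names `events`, `futex_call`, `blocking_pop`, `push_and_wake` and `consume_and_sum` are hypothetical. The producer bumps a counter after every push; the consumer reads the counter before checking the queue and passes that value to `FUTEX_WAIT`, so the kernel refuses to sleep if a push slipped in between.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <linux/futex.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Hypothetical stand-in for your own lock-free SPSC queue. */
#define CAP 16u
static int ring[CAP];
static atomic_uint head, tail; /* head: next slot to pop, tail: next slot to push */

static int try_push(int v) {
    unsigned t = atomic_load(&tail);
    if (t - atomic_load(&head) == CAP)
        return 0;                   /* full */
    ring[t % CAP] = v;
    atomic_store(&tail, t + 1);     /* publish the slot */
    return 1;
}

static int try_pop(int *out) {
    unsigned h = atomic_load(&head);
    if (h == atomic_load(&tail))
        return 0;                   /* empty */
    *out = ring[h % CAP];
    atomic_store(&head, h + 1);
    return 1;
}

/* Futex word: the producer increments it after every successful push. */
static _Atomic uint32_t events;

/* glibc has no futex() wrapper, so go through syscall(2). */
static long futex_call(_Atomic uint32_t *addr, int op, uint32_t val) {
    return syscall(SYS_futex, addr, op, val, NULL, NULL, 0);
}

/* Consumer side: lock-free while work is available, sleeps when empty. */
static int blocking_pop(void) {
    int item;
    for (;;) {
        uint32_t seen = atomic_load(&events);  /* snapshot BEFORE the check */
        if (try_pop(&item))
            return item;
        /* Sleeps only if events still equals seen; otherwise fails with
           EAGAIN and we simply re-check the queue. */
        long rc = futex_call(&events, FUTEX_WAIT_PRIVATE, seen);
        assert(rc == 0 || errno == EAGAIN || errno == EINTR);
        (void)rc;
    }
}

/* Producer side: push, advance the futex word, wake at most one sleeper. */
static int push_and_wake(int v) {
    if (!try_push(v))
        return 0;
    atomic_fetch_add(&events, 1);
    futex_call(&events, FUTEX_WAKE_PRIVATE, 1);
    return 1;
}

/* Demo worker: pops n items and returns their sum. */
static void *consume_and_sum(void *arg) {
    intptr_t n = (intptr_t)arg, sum = 0;
    while (n-- > 0)
        sum += blocking_pop();
    return (void *)sum;
}
```

Note that this sketch makes a `FUTEX_WAKE` system call on every push, even when nobody is sleeping. A tuned version would track whether the consumer is actually waiting and skip the call otherwise, which is exactly where the subtle races tend to appear.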


On Linux, futex can be used to block a thread. But be aware that Futexes Are Tricky!

UPDATE: condition variables are much safer to use than futexes and are more portable. A condition variable is used together with a mutex, though, so strictly speaking the result is no longer lock-free. Still, if your primary goal is performance (rather than a guarantee of global progress) and the locked portion (i.e., the condition checked after a thread wakes up) is small, you may get satisfactory results without having to work through the subtleties of integrating futexes into the algorithm.


If you're on Windows, you won't be able to use futexes, but Windows Vista has a similar mechanism called Keyed Events. Unfortunately, this isn't part of the published API (it's an NTDLL native API), but you can use it as long as you accept the caveat that it might change in future versions of Windows (and you don't need to run on pre-Vista kernels). Be sure to read the article I linked above. Here's an untested sketch of how it might work:

/* Interlocked SList queue using keyed event signaling */

struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function sets this to 1 (0 -> 1),
    // then rechecks the queue. If it finds an item, it attempts to cmpxchg
    // back to 0; if this fails, then it's racing with a push, and has to block
    LONG block_flag;
};

void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->block_flag = 0;
}

void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);

    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}

SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path

    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however, since NtReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);

    assert(oldv == 0);

    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }

    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push

    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);

    return entry;
}

You could also use a similar protocol built on Slim Reader/Writer (SRW) locks and condition variables, with a lockless fast path. These are wrappers over keyed events, so they may incur more overhead than using keyed events directly.


Have you tried conditional waiting? When the queue becomes empty, just start waiting for a new job. The thread putting jobs in the queue should fire the signal. This way you only use locks when the queue is empty.

https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables
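
One way this could look with pthreads is sketched below. It is only an illustration under stated assumptions: the ring buffer stands in for your own lock-free SPSC queue, and `waiting`, `blocking_pop`, `push_and_signal` and `consume_and_sum` are hypothetical names. The consumer takes the mutex only after a lock-free pop has failed, and the producer takes it only when the consumer has announced that it is going to sleep.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for your own lock-free SPSC queue. */
#define CAP 16u
static int ring[CAP];
static atomic_uint head, tail; /* head: next slot to pop, tail: next slot to push */

static int try_push(int v) {
    unsigned t = atomic_load(&tail);
    if (t - atomic_load(&head) == CAP)
        return 0;                   /* full */
    ring[t % CAP] = v;
    atomic_store(&tail, t + 1);     /* publish the slot */
    return 1;
}

static int try_pop(int *out) {
    unsigned h = atomic_load(&head);
    if (h == atomic_load(&tail))
        return 0;                   /* empty */
    *out = ring[h % CAP];
    atomic_store(&head, h + 1);
    return 1;
}

static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t nonempty = PTHREAD_COND_INITIALIZER;
static atomic_int waiting;          /* 1 while the consumer may be asleep */

/* Consumer: lock-free fast path, mutex + condition variable only when empty. */
static int blocking_pop(void) {
    int item;
    if (try_pop(&item))
        return item;                /* fast path, no lock taken */

    int rc = pthread_mutex_lock(&mu);
    assert(rc == 0);
    (void)rc;
    atomic_store(&waiting, 1);      /* announce first, then re-check the queue */
    while (!try_pop(&item))
        pthread_cond_wait(&nonempty, &mu);
    atomic_store(&waiting, 0);
    pthread_mutex_unlock(&mu);
    return item;
}

/* Producer: touches the mutex only if the consumer announced a wait. */
static int push_and_signal(int v) {
    if (!try_push(v))
        return 0;
    if (atomic_load(&waiting)) {
        /* Taking the mutex guarantees the consumer is either already inside
           pthread_cond_wait or has finished its re-check. */
        pthread_mutex_lock(&mu);
        pthread_cond_signal(&nonempty);
        pthread_mutex_unlock(&mu);
    }
    return 1;
}

/* Demo worker: pops n items and returns their sum. */
static void *consume_and_sum(void *arg) {
    intptr_t n = (intptr_t)arg, sum = 0;
    while (n-- > 0)
        sum += blocking_pop();
    return (void *)sum;
}
```

The ordering matters: the consumer sets `waiting` before re-checking the queue, and the producer pushes before reading `waiting`. With the default sequentially consistent atomics, at least one side always notices the other, so a wakeup is not lost.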


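You can put a thread to sleep with the sigwait() function and wake it with pthread_kill(). The signal you wait for has to be blocked in the waiting thread (for example with pthread_sigmask()), so that it stays pending until sigwait() collects it. This is much faster than condition variables.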
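
A minimal sketch of this approach follows. The choice of SIGUSR1, the ring buffer standing in for your own queue, and the names `block_wake_signal`, `blocking_pop`, `push_and_wake` and `consume_and_sum` are all assumptions made for the example. The signal is blocked before the worker thread is created, so the worker inherits the mask and a wakeup sent while it is busy simply stays pending until the next sigwait().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdint.h>

#define WAKE_SIG SIGUSR1 /* assumed to be unused by the rest of the program */

/* Hypothetical stand-in for your own lock-free SPSC queue. */
#define CAP 16u
static int ring[CAP];
static atomic_uint head, tail; /* head: next slot to pop, tail: next slot to push */

static int try_push(int v) {
    unsigned t = atomic_load(&tail);
    if (t - atomic_load(&head) == CAP)
        return 0;                   /* full */
    ring[t % CAP] = v;
    atomic_store(&tail, t + 1);     /* publish the slot */
    return 1;
}

static int try_pop(int *out) {
    unsigned h = atomic_load(&head);
    if (h == atomic_load(&tail))
        return 0;                   /* empty */
    *out = ring[h % CAP];
    atomic_store(&head, h + 1);
    return 1;
}

/* Call in the creating thread BEFORE pthread_create: new threads inherit the
   mask, and an unblocked SIGUSR1 would terminate the process by default. */
static void block_wake_signal(void) {
    sigset_t set;
    sigemptyset(&set);
    sigaddset(&set, WAKE_SIG);
    int rc = pthread_sigmask(SIG_BLOCK, &set, NULL);
    assert(rc == 0);
    (void)rc;
}

/* Consumer: pop without locks, sleep in sigwait() while the queue is empty. */
static int blocking_pop(void) {
    sigset_t set;
    int item, sig;
    sigemptyset(&set);
    sigaddset(&set, WAKE_SIG);
    while (!try_pop(&item))
        sigwait(&set, &sig);        /* returns at once if a wakeup is pending */
    return item;
}

/* Producer: push, then send the wake signal to the consumer thread. */
static int push_and_wake(pthread_t consumer, int v) {
    if (!try_push(v))
        return 0;
    pthread_kill(consumer, WAKE_SIG);
    return 1;
}

/* Demo worker: pops n items and returns their sum. */
static void *consume_and_sum(void *arg) {
    intptr_t n = (intptr_t)arg, sum = 0;
    while (n-- > 0)
        sum += blocking_pop();
    return (void *)sum;
}
```

Standard signals do not queue, so several pushes may collapse into a single pending wakeup. That is harmless here because the consumer always re-checks the queue before it waits again. The sketch also sends a signal on every push; whether that beats a condition variable is something to measure on your own workload.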


You could add sleeps while it's waiting. Just pick the biggest wait you're willing to have, then do something like this (a sketch; loop_condition, get_from_queue and handle stand in for your own code):

#include <unistd.h>

#define WAIT_TIME_US (100 * 1000) /* 100 ms; set this to whatever you're happy with */

while (loop_condition) {
    thing = get_from_queue();
    if (thing == NULL) {
        usleep(WAIT_TIME_US); /* queue empty: back off instead of spinning */
    } else {
        handle(thing);
    }
}

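Even something short like a 100 ms sleep should significantly lower the CPU usage. The trade-off is latency: work that arrives just after the thread goes to sleep waits for up to one full sleep interval. I'm not sure at what point the context switching will make it worse than busy waiting, though.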
