c++-gtk-utils
async_queue.h
Go to the documentation of this file.
00001 /* Copyright (C) 2006 to 2012 Chris Vine
00002 
00003 The library comprised in this file or of which this file is part is
00004 distributed by Chris Vine under the GNU Lesser General Public
00005 License as follows:
00006 
00007    This library is free software; you can redistribute it and/or
00008    modify it under the terms of the GNU Lesser General Public License
00009    as published by the Free Software Foundation; either version 2.1 of
00010    the License, or (at your option) any later version.
00011 
00012    This library is distributed in the hope that it will be useful, but
00013    WITHOUT ANY WARRANTY; without even the implied warranty of
00014    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015    Lesser General Public License, version 2.1, for more details.
00016 
00017    You should have received a copy of the GNU Lesser General Public
00018    License, version 2.1, along with this library (see the file LGPL.TXT
00019    which came with this source code package in the c++-gtk-utils
00020    sub-directory); if not, write to the Free Software Foundation, Inc.,
00021    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00022 
00023 However, it is not intended that the object code of a program whose
00024 source code instantiates a template from this file or uses macros or
00025 inline functions (of any length) should by reason only of that
00026 instantiation or use be subject to the restrictions of use in the GNU
00027 Lesser General Public License.  With that in mind, the words "and
00028 macros, inline functions and instantiations of templates (of any
00029 length)" shall be treated as substituted for the words "and small
00030 macros and small inline functions (ten lines or less in length)" in
00031 the fourth paragraph of section 5 of that licence.  This does not
00032 affect any other reason why object code may be subject to the
00033 restrictions in that licence (nor for the avoidance of doubt does it
00034 affect the application of section 2 of that licence to modifications
00035 of the source code in this file).
00036 
00037 */
00038 
00039 /**
00040  * @file async_queue.h
00041  * @brief This file provides thread-safe asynchronous queue classes.
00042  *
00043  * AsyncQueue is a class which provides some of the functionality of a
00044  * std::queue object (but note that the AsyncQueue::pop(value_type&
00045  * obj) method provides the pop()ed element by reference - see the
00046  * comments on that method for the reason), except that it has mutex
00047  * locking of the data container so as to permit push()ing and
00048  * pop()ing from different threads.  It is therefore useful for
00049  * passing data between threads, perhaps in response to a signal being
00050  * emitted from a Notifier object.  Passing the data by means of a
00051  * SharedLockPtr object, or an IntrusivePtr object referencing data
00052  * derived from IntrusiveLockCounter, would be ideal.
00053  *
00054  * AsyncQueueDispatch is a class which has a blocking pop_dispatch()
00055  * method, which allows it to be waited on by a dedicated event/message
00056  * dispatching thread for incoming work (represented by the data
00057  * pushed onto the queue).  In the same way, it can be used to
00058  * implement thread pools, by having threads in the pool waiting on
00059  * the queue.
00060  *
00061  * By default the queues use a std::list object as their container
00062  * because in the kind of use mentioned above they are unlikely to
00063  * hold many objects but they can be changed to, say, a std::deque
00064  * object by specifying it as the second template parameter.
00065  */
00066 
00067 #ifndef CGU_ASYNC_QUEUE_H
00068 #define CGU_ASYNC_QUEUE_H
00069 
00070 #include <queue>
00071 #include <list>
00072 #include <exception>
00073 #include <utility>    // for std::move and std::forward
00074 #include <algorithm>  // for std::swap
00075 #include <time.h>
00076 
00077 #include <c++-gtk-utils/mutex.h>
00078 #include <c++-gtk-utils/thread.h>
00079 #include <c++-gtk-utils/cgu_config.h>
00080 
00081 #ifdef CGU_USE_SCHED_YIELD
00082 #include <sched.h>
00083 #else
00084 #include <unistd.h>
00085 #endif
00086 
00087 namespace Cgu {
00088 
00089 /**
00090  * @class AsyncQueuePopError async_queue.h c++-gtk-utils/async_queue.h
00091  * @brief An exception thrown if calling pop() on an AsyncQueue or
00092  * AsyncQueueDispatch object fails because the queue is empty.
00093  * @sa AsyncQueue AsyncQueueDispatch
00094  */
00095 
struct AsyncQueuePopError: public std::exception {
  // Returns a fixed diagnostic message (a string literal, so the
  // pointer remains valid for the life of the program).  'noexcept'
  // replaces the dynamic exception specification 'throw()', which was
  // deprecated in C++11 and removed in C++20; 'override' has the
  // compiler check the signature against std::exception::what().
  const char* what() const noexcept override {return "AsyncQueuePopError: popping from empty AsyncQueue object\n";}
};
00099 
00100 
00101 /**
00102  * @class AsyncQueue async_queue.h c++-gtk-utils/async_queue.h
00103  * @brief A thread-safe asynchronous queue.
00104  * @sa AsyncQueueDispatch AsyncResult
00105  *
00106  * AsyncQueue is a class which provides some of the functionality of a
00107  * std::queue object (but note that the AsyncQueue::pop(value_type&
00108  * obj) method provides the pop()ed element by reference - see the
00109  * comments on that method for the reason), except that it has mutex
00110  * locking of the data container so as to permit push()ing and
00111  * pop()ing from different threads.  It is therefore useful for
00112  * passing data between threads, perhaps in response to a signal being
00113  * emitted from a Notifier object.  Passing the data by means of a
00114  * SharedLockPtr object, or an IntrusivePtr object referencing data
00115  * derived from IntrusiveLockCounter, would be ideal.
00116  *
00117  * By default the queue uses a std::list object as its container
00118  * because in the kind of use mentioned above it is unlikely to hold
00119  * many objects but it can be changed to, say, a std::deque object by
00120  * specifying it as the second template parameter.
00121  *
00122  * If the library is installed using the
00123  * --with-glib-memory-slices-compat or
00124  * --with-glib-memory-slices-no-compat configuration options, any
00125  * AsyncQueue objects constructed on free store will be constructed in
00126  * glib memory slices.  This does not affect the queue container
00127  * itself: to change the allocator of the C++ container, a custom
00128  * allocator type can be provided when the AsyncQueue object is
00129  * instantiated offering the standard allocator interface.  If glib
00130  * memory slices are not used or no AsyncQueue objects are constructed
00131  * on free store, it is not necessary to call g_thread_init() before
00132  * manipulating or using an AsyncQueue object in multiple threads, but
00133  * prior to glib version 2.32 glib itself (and thus glib memory
00134  * slices) are not thread safe unless that function has been called.
00135  */
00136 
00137 template <class T, class Container = std::list<T> > class AsyncQueue {
00138 public:
00139   typedef typename Container::value_type value_type;
00140   typedef typename Container::size_type size_type;
00141   typedef Container container_type;
00142 private:
00143 // TODO: put 'q' after 'mutex' at the next ABI break, so move
00144 // construction is strongly exception safe
00145   std::queue<T, Container> q;
00146   mutable Thread::Mutex mutex;
00147 
00148 // this won't throw: it is for the user to ensure the arguments do not
00149 // refer to the same mutex object
00150   void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
00151     m1.lock();
00152     for(;;) {
00153       if (!m2.trylock()) { // a zero return is treated as m2 acquired: both mutexes are now held
00154         return;
00155       }
00156       m1.unlock(); // m2 is busy: release m1 before retrying rather than waiting on m2 while holding m1
00157       // spin nicely
00158 #ifdef CGU_USE_SCHED_YIELD
00159       sched_yield();
00160 #else
00161       usleep(10);
00162 #endif
00163       m1.lock();
00164     }
00165   }
00166 public:
00167 /**
00168  * Pushes an item onto the queue.  This method has strong exception
00169  * safety if the container is a std::list or std::deque container (the
00170  * default is std::list), except that if std::deque is used as the
00171  * container and the copy constructor, move constructor, assignment
00172  * operator or move assignment operator of the queue item throws, it
00173  * only gives the basic exception guarantee (and the basic guarantee
00174  * is not given by std::deque if the queue item's move constructor
00175  * throws and it uses a non-default allocator which does not provide
00176  * for it to be CopyInsertable).  It is thread safe.
00177  * @param obj The item to be pushed onto the queue.
00178  * @exception std::bad_alloc The method might throw std::bad_alloc if
00179  * memory is exhausted and the system throws in that case.  It might
00180  * also throw if the copy constructor, move constructor, assignment
00181  * operator or move assignment operator of the queue item might throw.
00182  */
00183   void push(const value_type& obj) {
00184     Thread::Mutex::Lock lock{mutex};
00185     q.push(obj);
00186   }
00187 
00188 /**
00189  * Pushes an item onto the queue.  This method has strong exception
00190  * safety if the container is a std::list or std::deque container (the
00191  * default is std::list), except that if std::deque is used as the
00192  * container and the copy constructor, move constructor, assignment
00193  * operator or move assignment operator of the queue item throws, it
00194  * only gives the basic exception guarantee (and the basic guarantee
00195  * is not given by std::deque if the queue item's move constructor
00196  * throws and it uses a non-default allocator which does not provide
00197  * for it to be CopyInsertable).  It is thread safe.
00198  * @param obj The item to be pushed onto the queue.
00199  * @exception std::bad_alloc The method might throw std::bad_alloc if
00200  * memory is exhausted and the system throws in that case.  It might
00201  * also throw if the copy constructor, move constructor, assignment
00202  * operator or move assignment operator of the queue item might throw.
00203  *
00204  * Since 2.0.0-rc5
00205  */
00206   void push(value_type&& obj) {
00207     Thread::Mutex::Lock lock{mutex};
00208     q.push(std::move(obj));
00209   }
00210 
00211 /**
00212  * Pushes an item onto the queue by constructing it in place: that is,
00213  * by passing to this method the item's constructor's arguments,
00214  * rather than the item itself.  This method has strong exception
00215  * safety if the container is a std::list or std::deque container (the
00216  * default is std::list).  (Technically, for a std::deque container,
00217  * emplace() only offers the same exception guarantees as does push(),
00218  * namely only the basic guarantee where a copy or move of the queue
00219  * item throws during the call, but the purpose of emplace is to
00220  * construct in place and any reasonable implementation will not copy
00221  * or move the queue item.)  It is thread safe.
00222  * @param args The constructor arguments for the item to be pushed
00223  * onto the queue.
00224  * @exception std::bad_alloc The method might throw std::bad_alloc if
00225  * memory is exhausted and the system throws in that case.  It might
00226  * also throw if the item's constructor (including any of its
00227  * constructor arguments) might throw when constructing the item.
00228  * @note The constructor of the item pushed onto the queue must not
00229  * access any of the methods of the same queue object, or a deadlock
00230  * might occur.
00231  *
00232  * Since 2.0.0-rc5
00233  */
00234   template<class... Args>
00235   void emplace(Args&&... args) {
00236     Thread::Mutex::Lock lock{mutex};
00237     q.emplace(std::forward<Args>(args)...);
00238   }
00239 
00240 /**
00241  * Pops an item from the queue.  This method has strong exception
00242  * safety if the container is a std::deque or std::list container (the
00243  * default is std::list), provided the destructor of a contained item
00244  * does not throw.  It is thread safe.
00245  * @param obj A value type reference to which the item at the front of
00246  * the queue will be assigned.
00247  * @exception AsyncQueuePopError If the queue is empty when a pop is
00248  * attempted, this method will throw AsyncQueuePopError.  It might
00249  * also throw if the assignment operator of the queue item might
00250  * throw.  In order to complete pop() operations atomically under a
00251  * single lock and to retain strong exception safety, the object into
00252  * which the pop()ed data is to be placed is passed as an argument by
00253  * reference (this avoids a copy from a temporary object after the
00254  * data has been extracted from the queue, which would occur if the
00255  * item extracted were returned by value).  It might also throw if the
00256  * destructor of the queue item might throw (but that should never
00257  * happen), or if the empty() method of the container type throws
00258  * (which would not happen on any sane implementation).
00259  */
00260   void pop(value_type& obj) {
00261     Thread::Mutex::Lock lock{mutex};
00262     if (q.empty()) throw AsyncQueuePopError();
00263     obj = q.front(); // copy-assign before removal: if this throws, the item is still on the queue
00264     q.pop();
00265   }
00266 
00267 /**
00268  * Discards the item at the front of the queue.  This method has
00269  * strong exception safety if the container is a std::deque or
00270  * std::list container (the default is std::list), provided the
00271  * destructor of a contained item does not throw.  It is thread safe.
00272  * @exception AsyncQueuePopError If the queue is empty when a pop is
00273  * attempted, this method will throw AsyncQueuePopError.  It might
00274  * also throw if the destructor of the queue item might throw (but
00275  * that should never happen), or if the empty() method of the
00276  * container type throws (which would not happen on any sane
00277  * implementation).
00278  */
00279   void pop() {
00280     Thread::Mutex::Lock lock{mutex};
00281     if (q.empty()) throw AsyncQueuePopError();
00282     q.pop();
00283   }
00284 
00285 /**
00286  * @return Whether the queue is empty.  It will not throw assuming
00287  * that the empty() method of the container type does not throw, as it
00288  * will not on any sane implementation.
00289  * @note This method is thread safe, but the return value may not be
00290  * valid if another thread has pushed to or popped from the queue
00291  * before the value returned by the method is acted on.  It is
00292  * provided as a utility, but may not be meaningful, depending on the
00293  * intended usage.
00294  */
00295   bool empty() const {
00296     Thread::Mutex::Lock lock{mutex};
00297     return q.empty();
00298   }
00299 
00300 /**
00301  * @return The number of items currently in the queue.  It will not
00302  * throw assuming that the size() method of the container type does
00303  * not throw, as it will not on any sane implementation.
00304  * @note This method is thread safe, but the return value may not be
00305  * valid if another thread has pushed to or popped from the queue
00306  * before the value returned by the method is acted on.  It is
00307  * provided as a utility, but may not be meaningful, depending on the
00308  * intended usage.
00309  *
00310  * Since 2.0.8
00311  */
00312   size_type size() const {
00313     Thread::Mutex::Lock lock{mutex};
00314     return q.size();
00315   }
00316 
00317 /**
00318  * Swaps the contents of 'this' and 'other'.  It will not throw
00319  * assuming that the swap method of the container type does not throw
00320  * (which the C++11 standard requires not to happen with the standard
00321  * sequence containers).  It is thread safe and the swap is
00322  * thread-wise atomic.  A non-member function
00323  * Cgu::swap(Cgu::AsyncQueue&, Cgu::AsyncQueue&) is also
00324  * provided which will call this method.
00325  * @param other The object to be swapped with this one.
00326  *
00327  * Since 2.0.8
00328  */
00329   void swap(AsyncQueue& other) {
00330     if (this != &other) {
00331       lock2(mutex, other.mutex); // doesn't throw
00332       Thread::Mutex::Lock l1{mutex, Thread::locked}; // l1/l2: presumably adopt the mutexes already locked by lock2() (Thread::locked tag) - confirm in mutex.h
00333       Thread::Mutex::Lock l2{other.mutex, Thread::locked};
00334       q.swap(other.q);
00335     }
00336   }
00337 
00338 /**
00339  * The assignment operator is strongly exception safe with the
00340  * standard sequence containers (it uses copy and swap).  It is also
00341  * thread safe, as it safely locks both the assignor's and assignee's
00342  * mutex to provide a thread-wise atomic assignment.
00343  * @param rhs The assignor.
00344  * @exception std::bad_alloc The copy constructor of the queue's
00345  * container type, and so this assignment operator, might throw
00346  * std::bad_alloc if memory is exhausted and the system throws in that
00347  * case.  This assignment operator will also throw if the copy
00348  * constructor of the queue's container type throws any other
00349  * exceptions, including if any copy or move constructor or copy or
00350  * move assignment operator of a contained item throws.
00351  * @exception Thread::MutexError The assignment operator might throw
00352  * Thread::MutexError if initialization of a transitional object's
00353  * contained mutex fails.  (It is often not worth checking for this,
00354  * as it means either memory is exhausted or pthread has run out of
00355  * other resources to create new mutexes.)
00356  *
00357  * Since 2.0.8
00358  */
00359   AsyncQueue& operator=(const AsyncQueue& rhs) {
00360     if (this != &rhs) {
00361       lock2(mutex, rhs.mutex); // doesn't throw (rhs.mutex is declared mutable, so a const rhs can be locked)
00362       Thread::Mutex::Lock l1{mutex, Thread::locked};
00363       Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
00364       std::queue<T, Container> temp(rhs.q); // copy first: if this throws, q has not been touched
00365       q.swap(temp); // temp now holds the old contents; being declared last it is destroyed before l2 and l1
00366     }
00367     return *this;
00368   }
00369 
00370 /**
00371  * This move assignment operator is thread safe as regards the
00372  * assignee (the object moved to), but no synchronization is carried
00373  * out with respect to the rvalue assignor/movant.  This is because
00374  * temporaries are only visible and accessible in the thread carrying
00375  * out the move operation and synchronization for them would represent
00376  * pointless overhead.  In a case where the user uses std::move to
00377  * force a move from a named object, and that named object's lifetime
00378  * is managed by (or the object is otherwise accessed by) a different
00379  * thread than the one making the move, the user must carry out her
00380  * own synchronization with respect to that different thread, both to
00381  * ensure that a consistent view of the named object is obtained
00382  * and because that object will be mutated by the move.  This method
00383  * invokes std::queue's move assignment operator, and therefore has
00384  * the same exception safety as the standard library's implementation
00385  * of that operator.  It will not normally throw unless a custom
00386  * allocator is used which throws on move assignment, or the
00387  * destructor of a contained item throws.
00388  * @param rhs The assignor/movant.
00389  *
00390  * Since 2.0.8
00391  */
00392   AsyncQueue& operator=(AsyncQueue&& rhs) {
00393     Thread::Mutex::Lock lock{mutex}; // only the assignee's mutex is taken; rhs.mutex is deliberately left unlocked
00394     q = std::move(rhs.q);
00395     return *this;
00396   }
00397 
00398 /**
00399  * @exception std::bad_alloc The default constructor might throw
00400  * std::bad_alloc if memory is exhausted and the system throws in that
00401  * case.
00402  * @exception Thread::MutexError The default constructor might throw
00403  * Thread::MutexError if initialization of the contained mutex fails.
00404  * (It is often not worth checking for this, as it means either memory
00405  * is exhausted or pthread has run out of other resources to create
00406  * new mutexes.)
00407  */
00408   AsyncQueue() = default;
00409 
00410 /**
00411  * As regards thread safety, the move constructor does not synchronize
00412  * with respect to the initializing rvalue.  This is because
00413  * temporaries are only visible and accessible in the thread carrying
00414  * out the move operation and synchronization for them would represent
00415  * pointless overhead.  In a case where a user uses std::move to force
00416  * a move from a named object, and that named object's lifetime is
00417  * managed by (or the object is otherwise accessed by) a different
00418  * thread than the one making the move, the user must carry out her
00419  * own synchronization with respect to that different thread, both to
00420  * ensure that a consistent view of the named object is obtained
00421  * and because that object will be mutated by the move.
00422  * @param rhs The AsyncQueue object to be moved.
00423  * @exception Thread::MutexError The move constructor might throw
00424  * Thread::MutexError if initialization of the contained mutex fails.
00425  * (It is often not worth checking for this, as it means either memory
00426  * is exhausted or pthread has run out of other resources to create
00427  * new mutexes.)  It might also throw if the queue's container type's
00428  * move constructor might throw, but it should not do that unless a
00429  * custom allocator is in use.
00430  * @note If this constructor throws Thread::MutexError, and a named
00431  * object is moved using std::move, this constructor is not strongly
00432  * exception safe (items in the moved queue will be lost).  Fixing
00433  * this efficiently requires changing the order of construction of
00434  * data members of this class, which cannot be done until the next ABI
00435  * break for this library as it would alter object layout.  As noted
00436  * above, in most cases the possibility of Thread::MutexError throwing
00437  * can be ignored, but where that is not the case and strong exception
00438  * safety is wanted, the user should either not employ std::move with
00439  * named objects when invoking this class's constructors, or should
00440  * construct an AsyncQueue object using the default constructor and
00441  * then move assign to it.
00442  *
00443  * Since 2.0.8
00444  */
00445   AsyncQueue(AsyncQueue&& rhs): q(std::move(rhs.q)) {}
00446 
00447 /**
00448  * The copy constructor is thread safe, as it locks the initializing
00449  * object's mutex to obtain a consistent view of it.
00450  * @param rhs The AsyncQueue object to be copied.
00451  * @exception std::bad_alloc The copy constructor of the queue's
00452  * container type, and so this constructor, might throw std::bad_alloc
00453  * if memory is exhausted and the system throws in that case.  It will
00454  * also throw if the copy constructor of the queue's container type
00455  * throws any other exceptions, including if any copy or move
00456  * constructor or copy or move assignment operator of a contained item
00457  * throws.
00458  * @exception Thread::MutexError The copy constructor might throw
00459  * Thread::MutexError if initialization of the contained mutex fails.
00460  * (It is often not worth checking for this, as it means either memory
00461  * is exhausted or pthread has run out of other resources to create
00462  * new mutexes.)
00463  *
00464  * Since 2.0.8
00465  */
00466   // we use the comma operator here to lock the mutex and call the
00467   // copy constructor: the lock will be retained until the end of the
00468   // full expression in which it is lexically situated, namely until
00469   // the end of q's constructor - see C++11 1.9/10 and 12.2/3
00470   AsyncQueue(const AsyncQueue& rhs): q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
00471 
00472 /**
00473  * The destructor does not throw unless the destructor of a contained
00474  * item throws.  It is thread safe (any thread may delete the
00475  * AsyncQueue object).
00476  */
00477   ~AsyncQueue() {
00478     // lock and unlock the mutex in the destructor so that we have an
00479     // acquire operation to ensure that when the std::queue object is
00480     // destroyed memory is synchronised, so any thread may destroy the
00481     // AsyncQueue object
00482     Thread::Mutex::Lock lock{mutex};
00483   }
00484 
00485 /* Only has effect if --with-glib-memory-slices-compat or
00486  * --with-glib-memory-slices-no-compat option picked */
00487   CGU_GLIB_MEMORY_SLICES_FUNCS
00488 };
00489 
00490 /**
00491  * @class AsyncQueueDispatch async_queue.h c++-gtk-utils/async_queue.h
00492  * @brief A thread-safe asynchronous queue with a blocking
00493  * pop_dispatch() method.
00494  * @sa AsyncQueue AsyncResult
00495  *
00496  * AsyncQueueDispatch is similar to the AsyncQueue class, except that
00497  * it has a blocking pop_dispatch() method, which allows it to be
00498  * waited on by a dedicated event/message dispatching thread for
00499  * incoming work (represented by the data pushed onto the queue).  In
00500  * the same way, it can be used to implement thread pools, by having
00501  * threads in the pool waiting on the queue.  The AsyncResult class
00502  * can be useful for passing results between threads in conjunction
00503  * with AsyncQueueDispatch (the documentation on AsyncResult gives an
00504  * example).
00505  *
00506  * By default the queue uses a std::list object as its container
00507  * because in the kind of use mentioned above it is unlikely to hold
00508  * many objects but it can be changed to, say, a std::deque object by
00509  * specifying it as the second template parameter.
00510  *
00511  * If the library is installed using the
00512  * --with-glib-memory-slices-compat or
00513  * --with-glib-memory-slices-no-compat configuration options, any
00514  * AsyncQueueDispatch objects constructed on free store will be
00515  * constructed in glib memory slices.  This does not affect the queue
00516  * container itself: to change the allocator of the C++ container, a
00517  * custom allocator type can be provided when the AsyncQueueDispatch
00518  * object is instantiated offering the standard allocator interface.
00519  * If glib memory slices are not used or no AsyncQueueDispatch objects
00520  * are constructed on free store, it is not necessary to call
00521  * g_thread_init() before manipulating or using an AsyncQueueDispatch
00522  * object in multiple threads, but prior to glib version 2.32 glib
00523  * itself (and thus glib memory slices) are not thread safe unless
00524  * that function has been called.
00525  */
00526 
00527 template <class T, class Container = std::list<T> > class AsyncQueueDispatch {
00528 public:
00529   typedef typename Container::value_type value_type;
00530   typedef typename Container::size_type size_type;
00531   typedef Container container_type;
00532 private:
00533 // TODO: put 'q' after 'mutex' and 'cond' at the next ABI break, so
00534 // move construction is strongly exception safe
00535   std::queue<T, Container> q;
00536   mutable Thread::Mutex mutex;
00537   Thread::Cond cond;
00538 
00539 // this won't throw: it is for the user to ensure the arguments do not
00540 // refer to the same mutex object
00541   void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
00542     m1.lock();
00543     for(;;) {
00544       if (!m2.trylock()) {
00545         return;
00546       }
00547       m1.unlock();
00548       // spin nicely
00549 #ifdef CGU_USE_SCHED_YIELD
00550       sched_yield();
00551 #else
00552       usleep(10);
00553 #endif
00554       m1.lock();
00555     }
00556   }
00557 public:
00558 /**
00559  * Pushes an item onto the queue.  This method has strong exception
00560  * safety if the container is a std::list or std::deque container (the
00561  * default is std::list), except that if std::deque is used as the
00562  * container and the copy constructor, move constructor, assignment
00563  * operator or move assignment operator of the queue item throws, it
00564  * only gives the basic exception guarantee (and the basic guarantee
00565  * is not given by std::deque if the queue item's move constructor
00566  * throws and it uses a non-default allocator which does not provide
00567  * for it to be CopyInsertable).  It is thread safe.
00568  * @param obj The item to be pushed onto the queue.
00569  * @exception std::bad_alloc The method might throw std::bad_alloc if
00570  * memory is exhausted and the system throws in that case.  It might
00571  * also throw if the copy constructor, move constructor, assignment
00572  * operator or move assignment operator of the queue item might throw.
00573  */
00574   void push(const value_type& obj) {
00575     Thread::Mutex::Lock lock{mutex};
00576     q.push(obj);
00577     cond.signal();
00578   }
00579 
00580 /**
00581  * Pushes an item onto the queue.  This method has strong exception
00582  * safety if the container is a std::list or std::deque container (the
00583  * default is std::list), except that if std::deque is used as the
00584  * container and the copy constructor, move constructor, assignment
00585  * operator or move assignment operator of the queue item throws, it
00586  * only gives the basic exception guarantee (and the basic guarantee
00587  * is not given by std::deque if the queue item's move constructor
00588  * throws and it uses a non-default allocator which does not provide
00589  * for it to be CopyInsertable).  It is thread safe.
00590  * @param obj The item to be pushed onto the queue.
00591  * @exception std::bad_alloc The method might throw std::bad_alloc if
00592  * memory is exhausted and the system throws in that case.  It might
00593  * also throw if the copy constructor, move constructor, assignment
00594  * operator or move assignment operator of the queue item might throw.
00595  *
00596  * Since 2.0.0-rc5
00597  */
00598   void push(value_type&& obj) {
00599     Thread::Mutex::Lock lock{mutex};
00600     q.push(std::move(obj));
00601     cond.signal();
00602   }
00603 
00604 /**
00605  * Pushes an item onto the queue by constructing it in place: that is,
00606  * by passing to this method the item's constructor's arguments,
00607  * rather than the item itself.  This method has strong exception
00608  * safety if the container is a std::list or std::deque container (the
00609  * default is std::list).  (Technically, for a std::deque container,
00610  * emplace() only offers the same exception guarantees as does push(),
00611  * namely only the basic guarantee where a copy or move of the queue
00612  * item throws during the call, but the purpose of emplace is to
00613  * construct in place and any reasonable implementation will not copy
00614  * or move the queue item.)  It is thread safe.
00615  * @param args The constructor arguments for the item to be pushed
00616  * onto the queue.
00617  * @exception std::bad_alloc The method might throw std::bad_alloc if
00618  * memory is exhausted and the system throws in that case.  It might
00619  * also throw if the item's constructor (including any of its
00620  * constructor arguments) might throw when constructing the item.
00621  * @note The constructor of the item pushed onto the queue must not
00622  * access any of the methods of the same queue object, or a deadlock
00623  * might occur.
00624  *
00625  * Since 2.0.0-rc5
00626  */
00627   template<class... Args>
00628   void emplace(Args&&... args) {
00629     Thread::Mutex::Lock lock{mutex};
00630     q.emplace(std::forward<Args>(args)...);
00631     cond.signal();
00632   }
00633 
00634 /**
00635  * Pops an item from the queue.  This method has strong exception
00636  * safety if the container is a std::deque or std::list container (the
00637  * default is std::list), provided the destructor of a contained item
00638  * does not throw.  It is thread safe.
00639  * @param obj A value type reference to which the item at the front of
00640  * the queue will be assigned.
00641  * @exception AsyncQueuePopError If the queue is empty when a pop is
00642  * attempted, this method will throw AsyncQueuePopError.  It might
00643  * also throw if the assignment operator of the queue item might
00644  * throw.  In order to complete pop() operations atomically under a
00645  * single lock and to retain strong exception safety, the object into
00646  * which the pop()ed data is to be placed is passed as an argument by
00647  * reference (this avoids a copy from a temporary object after the
00648  * data has been extracted from the queue, which would occur if the
00649  * item extracted were returned by value).  It might also throw if the
00650  * destructor of the queue item might throw (but that should never
00651  * happen), or if the empty() method of the container type throws
00652  * (which would not happen on any sane implementation).
00653  */
00654   void pop(value_type& obj) {
00655     Thread::Mutex::Lock guard{mutex};
00656     if (q.empty()) { throw AsyncQueuePopError(); }   // nothing to extract
00657     obj = q.front();   // assign first: if this throws, the item stays queued
00658     q.pop();           // only now discard the front item
00659   }
00660 
00661 /**
00662  * Pops an item from the queue.  If the queue is empty, it will block
00663  * until an item becomes available.  If it blocks, the wait comprises
00664  * a cancellation point.  This method is cancellation safe if the
00665  * stack unwinds on cancellation, as cancellation is blocked while the
00666  * queue is being operated on after coming out of a wait.  This method
00667  * has strong exception safety if the container is a std::deque or
00668  * std::list container (the default is std::list), provided the
00669  * destructor of a contained item does not throw.  It is thread safe.
00670  * @param obj A value type reference to which the item at the front of
00671  * the queue will be assigned.  This method might throw if the
00672  * assignment operator of the queue item might throw.  In order to
00673  * complete pop() operations atomically under a single lock and to
00674  * retain strong exception safety, the object into which the pop()ed
00675  * data is to be placed is passed as an argument by reference (this
00676  * avoids a copy from a temporary object after the data has been
00677  * extracted from the queue, which would occur if the item extracted
00678  * were returned by value).  It might also throw if the destructor of
00679  * the queue item might throw (but that should never happen), or if
00680  * the empty() method of the container type throws (which would not
00681  * happen on any sane implementation).
00682  */
00683   void pop_dispatch(value_type& obj) {
00684     Thread::Mutex::Lock guard{mutex};
00685     while (q.empty()) { cond.wait(mutex); }   // re-test after every wake-up
00686     Thread::CancelBlock no_cancel;   // in scope while the queue is operated on
00687     obj = q.front();   // assign first: if this throws, the item stays queued
00688     q.pop();
00689   }
00690 
00691 /**
00692  * Pops an item from the queue.  If the queue is empty, it will block
00693  * until an item becomes available or until the timeout expires.  If
00694  * it blocks, the wait comprises a cancellation point.  This method is
00695  * cancellation safe if the stack unwinds on cancellation, as
00696  * cancellation is blocked while the queue is being operated on after
00697  * coming out of a wait.  This method has strong exception safety if
00698  * the container is a std::deque or std::list container (the default
00699  * is std::list), provided the destructor of a contained item does not
00700  * throw.  It is thread safe.
00701  * @param obj A value type reference to which the item at the front of
00702  * the queue will be assigned.  This method might throw if the
00703  * assignment operator of the queue item might throw.  In order to
00704  * complete pop() operations atomically under a single lock and to
00705  * retain strong exception safety, the object into which the pop()ed
00706  * data is to be placed is passed as an argument by reference (this
00707  * avoids a copy from a temporary object after the data has been
00708  * extracted from the queue, which would occur if the item extracted
00709  * were returned by value).  It might also throw if the destructor of
00710  * the queue item might throw (but that should never happen), or if
00711  * the empty() method of the container type throws (which would not
00712  * happen on any sane implementation).
00713  * @param millisec The timeout interval, in milliseconds.
00714  * @return If the timeout expires without an item becoming available,
00715  * the method will return true.  If an item from the queue is
00716  * extracted, it returns false.
00717  */
00718   bool pop_timed_dispatch(value_type& obj, unsigned int millisec) {
00719     timespec abs_time;   // filled in by get_abs_time() before the lock is taken
00720     Thread::Cond::get_abs_time(abs_time, millisec);
00721     Thread::Mutex::Lock guard{mutex};
00722     while (q.empty()) {   // re-test after every wake-up
00723       if (cond.timed_wait(mutex, abs_time)) { return true; }   // timed out
00724     }
00725     Thread::CancelBlock no_cancel;   // in scope while the queue is operated on
00726     obj = q.front();   // assign first: if this throws, the item stays queued
00727     q.pop();
00728     return false;      // an item was extracted
00729   }
00730 
00731 /**
00732  * Discards the item at the front of the queue.  This method has
00733  * strong exception safety if the container is a std::deque or
00734  * std::list container (the default is std::list), provided the
00735  * destructor of a contained item does not throw.  It is thread safe.
00736  * @exception AsyncQueuePopError If the queue is empty when a pop is
00737  * attempted, this method will throw AsyncQueuePopError.  It might
00738  * also throw if the destructor of the queue item might throw (but
00739  * that should never happen), or if the empty() method of the
00740  * container type throws (which would not happen on any sane
00741  * implementation).
00742  */
00743   void pop() {
00744     Thread::Mutex::Lock guard{mutex};
00745     if (q.empty()) { throw AsyncQueuePopError(); }   // nothing to discard
00746     q.pop();   // drop the front item without handing it to the caller
00747   }
00748 
00749 /**
00750  * @return Whether the queue is empty.  It will not throw assuming
00751  * that the empty() method of the container type does not throw, as it
00752  * will not on any sane implementation.
00753  * @note This method is thread safe, but the return value may not be
00754  * valid if another thread has pushed to or popped from the queue
00755  * before the value returned by the method is acted on.  It is
00756  * provided as a utility, but may not be meaningful, depending on the
00757  * intended usage.
00758  */
00759   bool empty() const {
00760     Thread::Mutex::Lock guard{mutex};   // query the container under the lock
00761     return q.empty();
00762   }
00763 
00764 /**
00765  * @return The number of items currently in the queue.  It will not
00766  * throw assuming that the size() method of the container type does
00767  * not throw, as it will not on any sane implementation.
00768  * @note This method is thread safe, but the return value may not be
00769  * valid if another thread has pushed to or popped from the queue
00770  * before the value returned by the method is acted on.  It is
00771  * provided as a utility, but may not be meaningful, depending on the
00772  * intended usage.
00773  *
00774  * Since 2.0.8
00775  */
00776   size_type size() const {
00777     Thread::Mutex::Lock guard{mutex};   // query the container under the lock
00778     return q.size();
00779   }
00780 
00781 /**
00782  * Swaps the contents of 'this' and 'other'.  It will not throw
00783  * assuming that the swap method of the container type does not throw
00784  * (which the C++11 standard requires not to happen with the standard
00785  * sequence containers).  It is thread safe and the swap is
00786  * thread-wise atomic.  A non-member function
00787  * Cgu::swap(Cgu::AsyncQueueDispatch&, Cgu::AsyncQueueDispatch&) is
00788  * also provided which will call this method.
00789  * @param other The object to be swapped with this one.
00790  * @note An object swapped does not, by virtue of the swap, inherit
00791  * any threads waiting on the other one.  However if threads were
00792  * waiting on a swapped object prior to the swap, and it acquires
00793  * items by virtue of the swap, the waiting threads will unblock and
00794  * extract those items.
00795  *
00796  * Since 2.0.8
00797  */
00798   void swap(AsyncQueueDispatch& other) {
00799     if (this == &other) return;   // swapping with self is a no-op
00800     lock2(mutex, other.mutex);    // acquire both mutexes; doesn't throw
00801     // guards for the two mutexes locked above, so both are released on exit
00802     Thread::Mutex::Lock mine{mutex, Thread::locked};
00803     Thread::Mutex::Lock theirs{other.mutex, Thread::locked};
00804     q.swap(other.q);
00805     if (!q.empty()) { cond.broadcast(); }   // this queue may now hold items
00806     if (!other.q.empty()) { other.cond.broadcast(); }   // as may the other
00807   }
00808 
00809 /**
00810  * The assignment operator is strongly exception safe with the
00811  * standard sequence containers (it uses copy and swap).  It is also
00812  * thread safe, as it safely locks both the assignor's and assignee's
00813  * mutex to provide a thread-wise atomic assignment.
00814  * @param rhs The assignor.
00815  * @exception std::bad_alloc The copy constructor of the queue's
00816  * container type, and so this assignment operator, might throw
00817  * std::bad_alloc if memory is exhausted and the system throws in that
00818  * case.  This assignment operator will also throw if the copy
00819  * constructor of the queue's container type throws any other
00820  * exceptions, including if any copy or move constructor or copy or
00821  * move assignment operator of a contained item throws.
00822  * @exception Thread::MutexError The assignment operator might throw
00823  * Thread::MutexError if initialization of a transitional object's
00824  * contained mutex fails.  (It is often not worth checking for this,
00825  * as it means either memory is exhausted or pthread has run out of
00826  * other resources to create new mutexes.)
00827  * @exception Thread::CondError The assignment operator might throw
00828  * this exception if initialisation of a transitional object's
00829  * contained condition variable fails.  (It is often not worth
00830  * checking for this, as it means either memory is exhausted or
00831  * pthread has run out of other resources to create new condition
00832  * variables.)
00833  * @note The assignee does not, by virtue of the assignment, inherit
00834  * any threads waiting on the assignor.  However, if prior to the
00835  * assignment threads were waiting on the assignee and the assignee
00836  * acquires items from the assignor as a result of the assignment, the
00837  * waiting threads will unblock and extract those items.
00838  *
00839  * Since 2.0.8
00840  */
00841   AsyncQueueDispatch& operator=(const AsyncQueueDispatch& rhs) {
00842     if (this == &rhs) return *this;   // self-assignment is a no-op
00843     lock2(mutex, rhs.mutex);          // acquire both mutexes; doesn't throw
00844     Thread::Mutex::Lock mine{mutex, Thread::locked};
00845     Thread::Mutex::Lock theirs{rhs.mutex, Thread::locked};
00846     // copy first, then swap: if the copy throws, this queue is left untouched
00847     std::queue<T, Container> copy(rhs.q);
00848     q.swap(copy);
00849     if (!q.empty()) { cond.broadcast(); }   // this queue may now hold items
00850     return *this;
00851   }
00852 
00853 /**
00854  * This move assignment operator is thread safe as regards the
00855  * assignee (the object moved to), but no synchronization is carried
00856  * out with respect to the rvalue assignor/movant: temporaries are
00857  * only visible in the thread carrying out the move, so synchronizing
00858  * on them would be pointless overhead.  Where std::move is used to
00859  * force a move from a named object which is managed or accessed by
00860  * another thread, the user must carry out her own synchronization
00861  * with respect to that thread, both to obtain a consistent view of
00862  * the named object and because that object is mutated by the move.
00863  * This method has the same exception safety as std::queue's move
00864  * assignment operator, which it invokes: it will not normally throw
00865  * unless a custom allocator is used which throws on move assignment,
00866  * or the destructor of a contained item throws.
00867  * @param rhs The assignor/movant.
00868  * @note The assignee does not inherit threads waiting on the movant,
00869  * but threads already waiting on the assignee are woken if the move
00870  * leaves it holding items, so that they can extract them.
00871  *
00872  * Since 2.0.8
00873  */
00874   AsyncQueueDispatch& operator=(AsyncQueueDispatch&& rhs) {
00875     Thread::Mutex::Lock lock{mutex};   // only the assignee is locked
00876     q = std::move(rhs.q);
00877     if (!q.empty()) cond.broadcast();  // as in copy assignment and swap()
00878     return *this;
00879   }
00880 
00881 /**
00882  * Constructs an empty queue.
00883  * @exception std::bad_alloc The default constructor might throw this
00884  * exception if memory is exhausted and the system throws in that case.
00885  * @exception Thread::MutexError The default constructor might throw
00886  * this exception if initialisation of the contained mutex fails.  (It
00887  * is often not worth checking for this, as it means either memory is
00888  * exhausted or pthread has run out of other resources to create new
00889  * mutexes.)
00890  * @exception Thread::CondError The default constructor might throw
00891  * this exception if initialisation of the contained condition
00892  * variable fails.  (It is often not worth checking for this, as it
00893  * means either memory is exhausted or pthread has run out of other
00894  * resources to create new condition variables.)
00895  */
00896   AsyncQueueDispatch() = default;
00897 
00898 /**
00899  * As regards thread safety, the move constructor does not synchronize
00900  * with respect to the initializing rvalue.  This is because
00901  * temporaries are only visible and accessible in the thread carrying
00902  * out the move operation and synchronization for them would represent
00903  * pointless overhead.  In a case where a user uses std::move to force
00904  * a move from a named object, and that named object's lifetime is
00905  * managed by (or the object is otherwise accessed by) a different
00906  * thread than the one making the move, the user must carry out her
00907  * own synchronization with respect to that different thread, both to
00908  * ensure that a consistent view of the named object is obtained
00909  * and because that object will be mutated by the move.
00910  * @param rhs The AsyncQueueDispatch object to be moved.
00911  * @exception Thread::MutexError The move constructor might throw
00912  * Thread::MutexError if initialization of the contained mutex fails.
00913  * (It is often not worth checking for this, as it means either memory
00914  * is exhausted or pthread has run out of other resources to create
00915  * new mutexes.)  It might also throw if the queue's container type's
00916  * move constructor might throw, but it should not do that unless a
00917  * custom allocator is in use.
00918  * @exception Thread::CondError The move constructor might throw this
00919  * exception if initialisation of the contained condition variable
00920  * fails.  (It is often not worth checking for this, as it means
00921  * either memory is exhausted or pthread has run out of other
00922  * resources to create new condition variables.)  It might also throw
00923  * if the queue's container type's move constructor might throw, but
00924  * it should not do that unless a custom allocator is in use.
00925  * @note If this constructor throws Thread::MutexError or
00926  * Thread::CondError, and a named object is moved using std::move,
00927  * this constructor is not strongly exception safe (items in the moved
00928  * queue will be lost).  Fixing this efficiently requires changing the
00929  * order of construction of data members of this class, which cannot
00930  * be done until the next ABI break for this library as it would alter
00931  * object layout.  As noted above, in most cases the possibility of
00932  * Thread::MutexError or Thread::CondError throwing can be ignored,
00933  * but where that is not the case and strong exception safety is
00934  * wanted, the user should either not employ std::move with named
00935  * objects when invoking this class's constructors, or should
00936  * construct an AsyncQueueDispatch object using the default
00937  * constructor and then move assign to it.
00938  *
00939  * Since 2.0.8
00940  */
00941   AsyncQueueDispatch(AsyncQueueDispatch&& rhs): q(std::move(rhs.q)) {} // rhs's mutex is not locked
00942 
00943 /**
00944  * The copy constructor is thread safe, as it locks the initializing
00945  * object's mutex to obtain a consistent view of it.
00946  * @param rhs The AsyncQueueDispatch object to be copied.
00947  * @exception std::bad_alloc The copy constructor of the queue's
00948  * container type, and so this constructor, might throw std::bad_alloc
00949  * if memory is exhausted and the system throws in that case.  It will
00950  * also throw if the copy constructor of the queue's container type
00951  * throws any other exceptions, including if any copy or move
00952  * constructor or copy or move assignment operator of a contained item
00953  * throws.
00954  * @exception Thread::MutexError The copy constructor might throw
00955  * Thread::MutexError if initialization of the contained mutex fails.
00956  * (It is often not worth checking for this, as it means either memory
00957  * is exhausted or pthread has run out of other resources to create
00958  * new mutexes.)
00959  * @exception Thread::CondError The copy constructor might throw this
00960  * exception if initialisation of the contained condition variable
00961  * fails.  (It is often not worth checking for this, as it means
00962  * either memory is exhausted or pthread has run out of other
00963  * resources to create new condition variables.)
00964  *
00965  * Since 2.0.8
00966  */
00967   // The comma operator is used here to lock rhs's mutex and then copy
00968   // construct q from rhs.q: the temporary Lock object persists until
00969   // the end of the full expression in which it is lexically situated,
00970   // namely until the end of q's constructor - see C++11 1.9/10 and 12.2/3
00971   AsyncQueueDispatch(const AsyncQueueDispatch& rhs):
00972                         q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
00973 
00974 /**
00975  * The destructor does not throw unless the destructor of a contained
00976  * item throws.  It is thread safe (any thread may delete the
00977  * AsyncQueueDispatch object).  Destroying an AsyncQueueDispatch
00978  * object on which another thread is currently blocked results in
00979  * undefined behavior.
00980  */
00981   ~AsyncQueueDispatch() {
00982     // Taking and then releasing the mutex here gives the destroying
00983     // thread an acquire operation, so that memory is synchronised by
00984     // the time the std::queue member is destroyed.  This is what allows
00985     // any thread to destroy the AsyncQueueDispatch object.
00986     Thread::Mutex::Lock sync{mutex};
00987   }
00988 
00989 /* Only has effect if --with-glib-memory-slices-compat or
00990  * --with-glib-memory-slices-no-compat option picked */
00991   CGU_GLIB_MEMORY_SLICES_FUNCS
00992 };
00993 
00994 /**
00995  * Swaps the contents of two AsyncQueue objects.  It will not throw
00996  * assuming that the swap method of the container type does not throw
00997  * (which the C++11 standard requires not to happen with the standard
00998  * sequence containers).  It is thread safe and the swap is
00999  * thread-wise atomic.
01000  * @param q1 An object to be swapped with the other.
01001  * @param q2 An object to be swapped with the other.
01002  * @note Calling std::swap on AsyncQueue objects is thread safe but
01003  * does not provide a thread-wise atomic swap (the swapped objects may
01004  * not be mirror images if during the execution of std::swap's default
01005  * algorithm one of them has been modified), although in many cases
01006  * that doesn't matter.  If swap() is called without a namespace
01007  * qualifier, argument dependent look-up will pick this one correctly.
01008  *
01009  * Since 2.0.8
01010  */
01011 template <typename T, typename Container>
01012 void swap(Cgu::AsyncQueue<T, Container>& q1,
01013           Cgu::AsyncQueue<T, Container>& q2) {
01014   q1.swap(q2);   // forward to the member swap()
01015 }
01016 
01017 /**
01018  * Swaps the contents of two AsyncQueueDispatch objects.  It will
01019  * not throw assuming that the swap method of the container type does
01020  * not throw (which the C++11 standard requires not to happen with the
01021  * standard sequence containers).  It is thread safe and the swap is
01022  * thread-wise atomic.
01023  * @param q1 An object to be swapped with the other.
01024  * @param q2 An object to be swapped with the other.
01025  * @note 1. An object swapped does not, by virtue of the swap, inherit
01026  * any threads waiting on the other one.  However if threads were
01027  * waiting on a swapped object prior to the swap, and it acquires
01028  * items by virtue of the swap, the waiting threads will unblock and
01029  * extract those items.
01030  * @note 2. Calling std::swap on AsyncQueueDispatch objects is thread
01031  * safe but does not provide a thread-wise atomic swap (the swapped
01032  * objects may not be mirror images if during the execution of
01033  * std::swap's default algorithm one of them has been modified),
01034  * although in many cases that doesn't matter.  If swap() is called
01035  * without a namespace qualifier, argument dependent look-up will pick
01036  * this one correctly.
01037  *
01038  * Since 2.0.8
01039  */
01040 template <typename T, typename Container>
01041 void swap(Cgu::AsyncQueueDispatch<T, Container>& q1,
01042           Cgu::AsyncQueueDispatch<T, Container>& q2) {
01043   q1.swap(q2);   // forward to the member swap()
01044 }
01045 
01046 } // namespace Cgu
01047 
01048 #endif