c++-gtk-utils

async_queue.h

Go to the documentation of this file.
00001 /* Copyright (C) 2006 to 2012 Chris Vine
00002 
00003 The library comprised in this file or of which this file is part is
00004 distributed by Chris Vine under the GNU Lesser General Public
00005 License as follows:
00006 
00007    This library is free software; you can redistribute it and/or
00008    modify it under the terms of the GNU Lesser General Public License
00009    as published by the Free Software Foundation; either version 2.1 of
00010    the License, or (at your option) any later version.
00011 
00012    This library is distributed in the hope that it will be useful, but
00013    WITHOUT ANY WARRANTY; without even the implied warranty of
00014    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015    Lesser General Public License, version 2.1, for more details.
00016 
00017    You should have received a copy of the GNU Lesser General Public
00018    License, version 2.1, along with this library (see the file LGPL.TXT
00019    which came with this source code package in the c++-gtk-utils
00020    sub-directory); if not, write to the Free Software Foundation, Inc.,
00021    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00022 
00023 However, it is not intended that the object code of a program whose
00024 source code instantiates a template from this file or uses macros or
00025 inline functions (of any length) should by reason only of that
00026 instantiation or use be subject to the restrictions of use in the GNU
00027 Lesser General Public License.  With that in mind, the words "and
00028 macros, inline functions and instantiations of templates (of any
00029 length)" shall be treated as substituted for the words "and small
00030 macros and small inline functions (ten lines or less in length)" in
00031 the fourth paragraph of section 5 of that licence.  This does not
00032 affect any other reason why object code may be subject to the
00033 restrictions in that licence (nor for the avoidance of doubt does it
00034 affect the application of section 2 of that licence to modifications
00035 of the source code in this file).
00036 
00037 */
00038 
00039 /**
00040  * @file async_queue.h
00041  * @brief This file provides thread-safe asynchronous queue classes.
00042  *
00043  * AsyncQueue is a class which provides some of the functionality of a
00044  * std::queue object (but note that the AsyncQueue::pop(value_type&
00045  * obj) method provides the pop()ed element by reference - see the
00046  * comments on that method for the reason), except that it has mutex
00047  * locking of the data container so as to permit push()ing and
00048  * pop()ing from different threads.  It is therefore useful for
00049  * passing data between threads, perhaps in response to a signal being
00050  * emitted from a Notifier object.  Passing the data by means of a
00051  * SharedLockPtr object, or an IntrusivePtr object referencing data
00052  * derived from IntrusiveLockCounter, would be ideal.
00053  *
00054  * AsyncQueueDispatch is a class which has a blocking pop() method,
00055  * which allows it to be waited on by a dedicated event/message
00056  * dispatching thread for incoming work (represented by the data
00057  * pushed onto the queue).  In the same way, it can be used to
00058  * implement thread pools, by having threads in the pool waiting on
00059  * the queue.
00060  *
00061  * By default the queues use a std::list object as their container
00062  * because in the kind of use mentioned above they are unlikely to
00063  * hold many objects but they can be changed to, say, a std::deque
00064  * object by specifying it as the second template parameter.
00065  */
00066 
00067 #ifndef CGU_ASYNC_QUEUE_H
00068 #define CGU_ASYNC_QUEUE_H
00069 
00070 #include <queue>
00071 #include <list>
00072 #include <exception>
00073 #include <utility>    // for std::move and std::forward
00074 #include <algorithm>  // for std::swap
00075 #include <time.h>
00076 
00077 #include <c++-gtk-utils/mutex.h>
00078 #include <c++-gtk-utils/thread.h>
00079 #include <c++-gtk-utils/cgu_config.h>
00080 
00081 #ifdef CGU_USE_SCHED_YIELD
00082 #include <sched.h>
00083 #else
00084 #include <unistd.h>
00085 #endif
00086 
00087 namespace Cgu {
00088 
00089 /**
00090  * @class AsyncQueuePopError async_queue.h c++-gtk-utils/async_queue.h
00091  * @brief An exception thrown if calling pop() on a AsyncQueue or
00092  * AsyncQueueDispatch object fails because the queue is empty.
00093  * @sa AsyncQueue AsyncQueueDispatch
00094  */
00095 
00096 struct AsyncQueuePopError: public std::exception {
00097   virtual const char* what() const throw() {return "AsyncQueuePopError: popping from empty AsyncQueue object\n";}
00098 };
00099 
00100 
00101 /**
00102  * @class AsyncQueue async_queue.h c++-gtk-utils/async_queue.h
00103  * @brief A thread-safe asynchronous queue.
00104  * @sa AsyncQueueDispatch AsyncResult
00105  *
00106  * AsyncQueue is a class which provides some of the functionality of a
00107  * std::queue object (but note that the AsyncQueue::pop(value_type&
00108  * obj) method provides the pop()ed element by reference - see the
00109  * comments on that method for the reason), except that it has mutex
00110  * locking of the data container so as to permit push()ing and
00111  * pop()ing from different threads.  It is therefore useful for
00112  * passing data between threads, perhaps in response to a signal being
00113  * emitted from a Notifier object.  Passing the data by means of a
00114  * SharedLockPtr object, or an IntrusivePtr object referencing data
00115  * derived from IntrusiveLockCounter, would be ideal.
00116  *
00117  * By default the queue uses a std::list object as its container
00118  * because in the kind of use mentioned above it is unlikely to hold
00119  * many objects but it can be changed to, say, a std::deque object by
00120  * specifying it as the second template parameter.
00121  *
00122  * If the library is installed using the
00123  * --with-glib-memory-slices-compat or
00124  * --with-glib-memory-slices-no-compat configuration options, any
00125  * AsyncQueue objects constructed on free store will be constructed in
00126  * glib memory slices.  This does not affect the queue container
00127  * itself: to change the allocator of the C++ container, a custom
00128  * allocator type can be provided when the AsyncQueue object is
00129  * instantiated offering the standard allocator interface.  If glib
00130  * memory slices are not used or no AsyncQueue objects are constructed
00131  * on free store, it is not necessary to call g_thread_init() before
00132  * manipulating or using an AsyncQueue object in multiple threads, but
00133  * prior to glib version 2.32 glib itself (and thus glib memory
00134  * slices) are not thread safe unless that function has been called.
00135  */
00136 
00137 template <class T, class Container = std::list<T> > class AsyncQueue {
00138 public:
00139   typedef typename Container::value_type value_type;
00140   typedef typename Container::size_type size_type;
00141   typedef Container container_type;
00142 private:
00143 // TODO: put 'q' after 'mutex' at the next ABI break, so move
00144 // construction is strongly exception safe
00145   std::queue<T, Container> q;
00146   mutable Thread::Mutex mutex;
00147 
00148 // this won't throw: it is for the user to ensure the arguments do not
00149 // refer to the same mutex object
00150   void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
00151     m1.lock();
00152     for(;;) {
00153       if (!m2.trylock()) {
00154         return;
00155       }
00156       m1.unlock();
00157       // spin nicely
00158 #ifdef CGU_USE_SCHED_YIELD
00159       sched_yield();
00160 #else
00161       usleep(10);
00162 #endif
00163       m1.lock();
00164     }
00165   }
00166 public:
00167 /**
00168  * Pushes an item onto the queue.  This method has strong exception
00169  * safety if the container is a std::list or std::deque container (the
00170  * default is std::list), except that if std::deque is used as the
00171  * container and the copy constructor, move constructor, assignment
00172  * operator or move assignment operator of the queue item throws, it
00173  * only gives the basic exception guarantee (and the basic guarantee
00174  * is not given by std::deque if the queue item's move constructor
00175  * throws and it uses a non-default allocator which does not provide
00176  * for it to be CopyInsertable).  It is thread safe.
00177  * @param obj The item to be pushed onto the queue.
00178  * @exception std::bad_alloc The method might throw std::bad_alloc if
00179  * memory is exhausted and the system throws in that case.  It might
00180  * also throw if the copy constructor, move constructor, assignment
00181  * operator or move assignment operator of the queue item might throw.
00182  */
00183   void push(const value_type& obj) {
00184     Thread::Mutex::Lock lock{mutex};
00185     q.push(obj);
00186   }
00187 
00188 /**
00189  * Pushes an item onto the queue.  This method has strong exception
00190  * safety if the container is a std::list or std::deque container (the
00191  * default is std::list), except that if std::deque is used as the
00192  * container and the copy constructor, move constructor, assignment
00193  * operator or move assignment operator of the queue item throws, it
00194  * only gives the basic exception guarantee (and the basic guarantee
00195  * is not given by std::deque if the queue item's move constructor
00196  * throws and it uses a non-default allocator which does not provide
00197  * for it to be CopyInsertable).  It is thread safe.
00198  * @param obj The item to be pushed onto the queue.
00199  * @exception std::bad_alloc The method might throw std::bad_alloc if
00200  * memory is exhausted and the system throws in that case.  It might
00201  * also throw if the copy constructor, move constructor, assignment
00202  * operator or move assignment operator of the queue item might throw.
00203  *
00204  * Since 2.0.0-rc5
00205  */
00206   void push(value_type&& obj) {
00207     Thread::Mutex::Lock lock{mutex};
00208     q.push(std::move(obj));
00209   }
00210 
00211 /**
00212  * Pushes an item onto the queue by constructing it in place: that is,
00213  * by passing to this method the item's constructor's arguments,
00214  * rather than the item itself.  This method has strong exception
00215  * safety if the container is a std::list or std::deque container (the
00216  * default is std::list).  (Technically, for a std::deque container,
00217  * emplace() only offers the same exception guarantees as does push(),
00218  * namely only the basic guarantee where a copy or move of the queue
00219  * item throws during the call, but the purpose of emplace is to
00220  * construct in place and any reasonable implementation will not copy
00221  * or move the queue item.)  It is thread safe.
00222  * @param args The constructor arguments for the item to be pushed
00223  * onto the queue.
00224  * @exception std::bad_alloc The method might throw std::bad_alloc if
00225  * memory is exhausted and the system throws in that case.  It might
00226  * also throw if the item's constructor (including any of its
00227  * constructor arguments) might throw when constructing the item.
00228  * @note The constructor of the item pushed onto the queue must not
00229  * access any of the methods of the same queue object, or a deadlock
00230  * might occur.
00231  *
00232  * Since 2.0.0-rc5
00233  */
00234   template<class... Args>
00235   void emplace(Args&&... args) {
00236     Thread::Mutex::Lock lock{mutex};
00237     q.emplace(std::forward<Args>(args)...);
00238   }
00239 
00240 /**
00241  * Pops an item from the queue.  This method has strong exception
00242  * safety if the container is a std::deque or std::list container (the
00243  * default is std::list), provided the destructor of a contained item
00244  * does not throw.  It is thread safe.
00245  * @param obj A value type reference to which the item at the front of
00246  * the queue will be assigned.
00247  * @exception AsyncQueuePopError If the queue is empty when a pop is
00248  * attempted, this method will throw AsyncQueuePopError.  It might
00249  * also throw if the assignment operator of the queue item might
00250  * throw.  In order to complete pop() operations atomically under a
00251  * single lock and to retain strong exception safety, the object into
00252  * which the pop()ed data is to be placed is passed as an argument by
00253  * reference (this avoids a copy from a temporary object after the
00254  * data has been extracted from the queue, which would occur if the
00255  * item extracted were returned by value).  It might also throw if the
00256  * destructor of the queue item might throw (but that should never
00257  * happen), or if the empty() method of the container type throws
00258  * (which would not happen on any sane implementation).
00259  */
00260   void pop(value_type& obj) {
00261     Thread::Mutex::Lock lock{mutex};
00262     if (q.empty()) throw AsyncQueuePopError();
00263     obj = q.front();
00264     q.pop();
00265   }
00266 
00267 /**
00268  * Discards the item at the front of the queue.  This method has
00269  * strong exception safety if the container is a std::deque or
00270  * std::list container (the default is std::list), provided the
00271  * destructor of a contained item does not throw.  It is thread safe.
00272  * @exception AsyncQueuePopError If the queue is empty when a pop is
00273  * attempted, this method will throw AsyncQueuePopError.  It might
00274  * also throw if the destructor of the queue item might throw (but
00275  * that should never happen), or if the empty() method of the
00276  * container type throws (which would not happen on any sane
00277  * implementation).
00278  */
00279   void pop() {
00280     Thread::Mutex::Lock lock{mutex};
00281     if (q.empty()) throw AsyncQueuePopError();
00282     q.pop();
00283   }
00284 
00285 /**
00286  * @return Whether the queue is empty.  It will not throw assuming
00287  * that the empty() method of the container type does not throw, as it
00288  * will not on any sane implementation.
00289  * @note This method is thread safe, but the return value may not be
00290  * valid if another thread has pushed to or popped from the queue
00291  * before the value returned by the method is acted on.  It is
00292  * provided as a utility, but may not be meaningful, depending on the
00293  * intended usage.
00294  */
00295   bool empty() const {
00296     Thread::Mutex::Lock lock{mutex};
00297     return q.empty();
00298   }
00299 
00300 /**
00301  * @return The number of items currently in the queue.  It will not
00302  * throw assuming that the size() method of the container type does
00303  * not throw, as it will not on any sane implementation.
00304  * @note This method is thread safe, but the return value may not be
00305  * valid if another thread has pushed to or popped from the queue
00306  * before the value returned by the method is acted on.  It is
00307  * provided as a utility, but may not be meaningful, depending on the
00308  * intended usage.
00309  *
00310  * Since 2.0.8
00311  */
00312   size_type size() const {
00313     Thread::Mutex::Lock lock{mutex};
00314     return q.size();
00315   }
00316 
00317 /**
00318  * Swaps the contents of 'this' and 'other'.  It will not throw
00319  * assuming that the swap method of the container type does not throw
00320  * (which the C++11 standard requires not to happen with the standard
00321  * sequence containers).  It is thread safe and the swap is
00322  * thread-wise atomic.  A non-class function
00323  * Cgu::swap(Cgu::AsyncQueue&, Cgu::AsyncQueue&) method is also
00324  * provided which will call this method.
00325  * @param other The object to be swapped with this one.
00326  *
00327  * Since 2.0.8
00328  */
00329   void swap(AsyncQueue& other) {
00330     if (this != &other) {
00331       lock2(mutex, other.mutex); // doesn't throw
00332       Thread::Mutex::Lock l1{mutex, Thread::locked};
00333       Thread::Mutex::Lock l2{other.mutex, Thread::locked};
00334       q.swap(other.q);
00335     }
00336   }
00337 
00338 /**
00339  * The assignment operator is strongly exception safe with the
00340  * standard sequence containers (it uses copy and swap).  It is also
00341  * thread safe, as it safely locks both the assignor's and assignee's
00342  * mutex to provide a thread-wise atomic assignment.
00343  * @param rhs The assignor.
00344  * @return The AsyncQueue object after assignment.
00345  * @exception std::bad_alloc The copy constructor of the queue's
00346  * container type, and so this assignment operator, might throw
00347  * std::bad_alloc if memory is exhausted and the system throws in that
00348  * case.  This assignment operator will also throw if the copy
00349  * constructor of the queue's container type throws any other
00350  * exceptions, including if any copy or move constructor or copy or
00351  * move assignment operator of a contained item throws.
00352  * @exception Thread::MutexError The assignment operator might throw
00353  * Thread::MutexError if initialization of a transitional object's
00354  * contained mutex fails.  (It is often not worth checking for this,
00355  * as it means either memory is exhausted or pthread has run out of
00356  * other resources to create new mutexes.)
00357  *
00358  * Since 2.0.8
00359  */
00360   AsyncQueue& operator=(const AsyncQueue& rhs) {
00361     if (this != &rhs) {
00362       lock2(mutex, rhs.mutex); // doesn't throw
00363       Thread::Mutex::Lock l1{mutex, Thread::locked};
00364       Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
00365       std::queue<T, Container> temp{rhs.q};
00366       q.swap(temp);
00367     }
00368     return *this;
00369   }
00370 
00371 /**
00372  * This move assignment operator is thread safe as regards the
00373  * assignee (the object moved to), but no synchronization is carried
00374  * out with respect to the rvalue assignor/movant.  This is because
00375  * temporaries are only visible and accessible in the thread carrying
00376  * out the move operation and synchronization for them would represent
00377  * pointless overhead.  In a case where the user uses std::move to
00378  * force a move from a named object, and that named object's lifetime
00379  * is managed by (or the object is otherwise accessed by) a different
00380  * thread than the one making the move, the user must carry out her
00381  * own synchronization with respect to that different thread, both to
00382  * ensure that a consistent view of the the named object is obtained
00383  * and because that object will be mutated by the move.  This method
00384  * invokes std::queue's move assignment operator, and therefore has
00385  * the same exception safety as the standard library's implementation
00386  * of that operator.  It will not normally throw unless a custom
00387  * allocator is used which throws on move assignment, or the
00388  * destructor of a contained item throws.
00389  * @param rhs The assignor/movant.
00390  * @return The AsyncQueue object after move assignment.
00391  *
00392  * Since 2.0.8
00393  */
00394   AsyncQueue& operator=(AsyncQueue&& rhs) {
00395     Thread::Mutex::Lock lock{mutex};
00396     q = std::move(rhs.q);
00397     return *this;
00398   }
00399 
00400 /**
00401  * @exception std::bad_alloc The default constructor might throw
00402  * std::bad_alloc if memory is exhausted and the system throws in that
00403  * case.
00404  * @exception Thread::MutexError The default constructor might throw
00405  * Thread::MutexError if initialization of the contained mutex fails.
00406  * (It is often not worth checking for this, as it means either memory
00407  * is exhausted or pthread has run out of other resources to create
00408  * new mutexes.)
00409  */
00410   AsyncQueue() = default;
00411 
00412 /**
00413  * As regards thread safety, the move constructor does not synchronize
00414  * with respect to the initializing rvalue.  This is because
00415  * temporaries are only visible and accessible in the thread carrying
00416  * out the move operation and synchronization for them would represent
00417  * pointless overhead.  In a case where a user uses std::move to force
00418  * a move from a named object, and that named object's lifetime is
00419  * managed by (or the object is otherwise accessed by) a different
00420  * thread than the one making the move, the user must carry out her
00421  * own synchronization with respect to that different thread, both to
00422  * ensure that a consistent view of the the named object is obtained
00423  * and because that object will be mutated by the move.
00424  * @param rhs The AsyncQueue object to be moved.
00425  * @exception Thread::MutexError The move constructor might throw
00426  * Thread::MutexError if initialization of the contained mutex fails.
00427  * (It is often not worth checking for this, as it means either memory
00428  * is exhausted or pthread has run out of other resources to create
00429  * new mutexes.)  It might also throw if the queue's container type's
00430  * move constructor might throw, but it should not do that unless a
00431  * custom allocator is in use.
00432  * @note If this constructor throws Thread::MutexError, and a named
00433  * object is moved using std::move, this constructor is not strongly
00434  * exception safe (items in the moved queue will be lost).  Fixing
00435  * this efficiently requires changing the order of construction of
00436  * data members of this class, which cannot be done until the next ABI
00437  * break for this library as it would alter object layout.  As noted
00438  * above, in most cases the possibility of Thread::MutexError throwing
00439  * can be ignored, but where that is not the case and strong exception
00440  * safety is wanted, the user should either not employ std::move with
00441  * named objects when invoking this class's constructors, or should
00442  * construct an AsyncQueue object using the default constructor and
00443  * then move assign to it.
00444  *
00445  * Since 2.0.8
00446  */
00447   AsyncQueue(AsyncQueue&& rhs): q(std::move(rhs.q)) {}
00448 
00449 /**
00450  * The copy constructor is thread safe, as it locks the initializing
00451  * object's mutex to obtain a consistent view of it.
00452  * @param rhs The AsyncQueue object to be copied.
00453  * @exception std::bad_alloc The copy constructor of the queue's
00454  * container type, and so this constructor, might throw std::bad_alloc
00455  * if memory is exhausted and the system throws in that case.  It will
00456  * also throw if the copy constructor of the queue's container type
00457  * throws any other exceptions, including if any copy or move
00458  * constructor or copy or move assignment operator of a contained item
00459  * throws.
00460  * @exception Thread::MutexError The copy constructor might throw
00461  * Thread::MutexError if initialization of the contained mutex fails.
00462  * (It is often not worth checking for this, as it means either memory
00463  * is exhausted or pthread has run out of other resources to create
00464  * new mutexes.)
00465  *
00466  * Since 2.0.8
00467  */
00468   // we use the comma operator here to lock the mutex and call the
00469   // copy constructor: the lock will be retained until the end of the
00470   // full expression in which it is lexically situated, namely until
00471   // the end of q's constructor - see C++11 1.9/10 and 12.2/3
00472   AsyncQueue(const AsyncQueue& rhs): q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
00473 
00474 /**
00475  * The destructor does not throw unless the destructor of a contained
00476  * item throws.  It is thread safe (any thread may delete the
00477  * AsyncQueue object).
00478  */
00479   ~AsyncQueue() {
00480     // lock and unlock the mutex in the destructor so that we have an
00481     // acquire operation to ensure that when the std::queue object is
00482     // destroyed memory is synchronised, so any thread may destroy the
00483     // AsyncQueue object
00484     Thread::Mutex::Lock lock{mutex};
00485   }
00486 
00487 /* Only has effect if --with-glib-memory-slices-compat or
00488  * --with-glib-memory-slices-no-compat option picked */
00489   CGU_GLIB_MEMORY_SLICES_FUNCS
00490 };
00491 
00492 /**
00493  * @class AsyncQueueDispatch async_queue.h c++-gtk-utils/async_queue.h
00494  * @brief A thread-safe asynchronous queue with a blocking pop()
00495  * method.
00496  * @sa AsyncQueue AsyncResult
00497  *
00498  * AsyncQueueDispatch is similar to the AsyncQueue class, except that
00499  * it has a blocking pop_dispatch() method, which allows it to be
00500  * waited on by a dedicated event/message dispatching thread for
00501  * incoming work (represented by the data pushed onto the queue).  In
00502  * the same way, it can be used to implement thread pools, by having
00503  * threads in the pool waiting on the queue.  The AsyncResult class
00504  * can be useful for passing results between threads in conjunction
00505  * with AsyncQueueDispatch (the documentation on AsyncResult gives an
00506  * example).
00507  *
00508  * By default the queue uses a std::list object as its container
00509  * because in the kind of use mentioned above it is unlikely to hold
00510  * many objects but it can be changed to, say, a std::deque object by
00511  * specifying it as the second template parameter.
00512  *
00513  * If the library is installed using the
00514  * --with-glib-memory-slices-compat or
00515  * --with-glib-memory-slices-no-compat configuration options, any
00516  * AsyncQueueDispatch objects constructed on free store will be
00517  * constructed in glib memory slices.  This does not affect the queue
00518  * container itself: to change the allocator of the C++ container, a
00519  * custom allocator type can be provided when the AsyncQueueDispatch
00520  * object is instantiated offering the standard allocator interface.
00521  * If glib memory slices are not used or no AsyncQueueDispatch objects
00522  * are constructed on free store, it is not necessary to call
00523  * g_thread_init() before manipulating or using an AsyncQueueDispatch
00524  * object in multiple threads, but prior to glib version 2.32 glib
00525  * itself (and thus glib memory slices) are not thread safe unless
00526  * that function has been called.
00527  */
00528 
00529 template <class T, class Container = std::list<T> > class AsyncQueueDispatch {
00530 public:
00531   typedef typename Container::value_type value_type;
00532   typedef typename Container::size_type size_type;
00533   typedef Container container_type;
00534 private:
00535 // TODO: put 'q' after 'mutex' and 'cond' at the next ABI break, so
00536 // move construction is strongly exception safe
00537   std::queue<T, Container> q;
00538   mutable Thread::Mutex mutex;
00539   Thread::Cond cond;
00540 
00541 // this won't throw: it is for the user to ensure the arguments do not
00542 // refer to the same mutex object
00543   void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
00544     m1.lock();
00545     for(;;) {
00546       if (!m2.trylock()) {
00547         return;
00548       }
00549       m1.unlock();
00550       // spin nicely
00551 #ifdef CGU_USE_SCHED_YIELD
00552       sched_yield();
00553 #else
00554       usleep(10);
00555 #endif
00556       m1.lock();
00557     }
00558   }
00559 public:
00560 /**
00561  * Pushes an item onto the queue.  This method has strong exception
00562  * safety if the container is a std::list or std::deque container (the
00563  * default is std::list), except that if std::deque is used as the
00564  * container and the copy constructor, move constructor, assignment
00565  * operator or move assignment operator of the queue item throws, it
00566  * only gives the basic exception guarantee (and the basic guarantee
00567  * is not given by std::deque if the queue item's move constructor
00568  * throws and it uses a non-default allocator which does not provide
00569  * for it to be CopyInsertable).  It is thread safe.
00570  * @param obj The item to be pushed onto the queue.
00571  * @exception std::bad_alloc The method might throw std::bad_alloc if
00572  * memory is exhausted and the system throws in that case.  It might
00573  * also throw if the copy constructor, move constructor, assignment
00574  * operator or move assignment operator of the queue item might throw.
00575  */
00576   void push(const value_type& obj) {
00577     Thread::Mutex::Lock lock{mutex};
00578     q.push(obj);
00579     cond.signal();
00580   }
00581 
00582 /**
00583  * Pushes an item onto the queue.  This method has strong exception
00584  * safety if the container is a std::list or std::deque container (the
00585  * default is std::list), except that if std::deque is used as the
00586  * container and the copy constructor, move constructor, assignment
00587  * operator or move assignment operator of the queue item throws, it
00588  * only gives the basic exception guarantee (and the basic guarantee
00589  * is not given by std::deque if the queue item's move constructor
00590  * throws and it uses a non-default allocator which does not provide
00591  * for it to be CopyInsertable).  It is thread safe.
00592  * @param obj The item to be pushed onto the queue.
00593  * @exception std::bad_alloc The method might throw std::bad_alloc if
00594  * memory is exhausted and the system throws in that case.  It might
00595  * also throw if the copy constructor, move constructor, assignment
00596  * operator or move assignment operator of the queue item might throw.
00597  *
00598  * Since 2.0.0-rc5
00599  */
00600   void push(value_type&& obj) {
00601     Thread::Mutex::Lock lock{mutex};
00602     q.push(std::move(obj));
00603     cond.signal();
00604   }
00605 
00606 /**
00607  * Pushes an item onto the queue by constructing it in place: that is,
00608  * by passing to this method the item's constructor's arguments,
00609  * rather than the item itself.  This method has strong exception
00610  * safety if the container is a std::list or std::deque container (the
00611  * default is std::list).  (Technically, for a std::deque container,
00612  * emplace() only offers the same exception guarantees as does push(),
00613  * namely only the basic guarantee where a copy or move of the queue
00614  * item throws during the call, but the purpose of emplace is to
00615  * construct in place and any reasonable implementation will not copy
00616  * or move the queue item.)  It is thread safe.
00617  * @param args The constructor arguments for the item to be pushed
00618  * onto the queue.
00619  * @exception std::bad_alloc The method might throw std::bad_alloc if
00620  * memory is exhausted and the system throws in that case.  It might
00621  * also throw if the item's constructor (including any of its
00622  * constructor arguments) might throw when constructing the item.
00623  * @note The constructor of the item pushed onto the queue must not
00624  * access any of the methods of the same queue object, or a deadlock
00625  * might occur.
00626  *
00627  * Since 2.0.0-rc5
00628  */
00629   template<class... Args>
00630   void emplace(Args&&... args) {
00631     Thread::Mutex::Lock lock{mutex};
00632     q.emplace(std::forward<Args>(args)...);
00633     cond.signal();
00634   }
00635 
00636 /**
00637  * Pops an item from the queue.  This method has strong exception
00638  * safety if the container is a std::deque or std::list container (the
00639  * default is std::list), provided the destructor of a contained item
00640  * does not throw.  It is thread safe.
00641  * @param obj A value type reference to which the item at the front of
00642  * the queue will be assigned.
00643  * @exception AsyncQueuePopError If the queue is empty when a pop is
00644  * attempted, this method will throw AsyncQueuePopError.  It might
00645  * also throw if the assignment operator of the queue item might
00646  * throw.  In order to complete pop() operations atomically under a
00647  * single lock and to retain strong exception safety, the object into
00648  * which the pop()ed data is to be placed is passed as an argument by
00649  * reference (this avoids a copy from a temporary object after the
00650  * data has been extracted from the queue, which would occur if the
00651  * item extracted were returned by value).  It might also throw if the
00652  * destructor of the queue item might throw (but that should never
00653  * happen), or if the empty() method of the container type throws
00654  * (which would not happen on any sane implementation).
00655  */
00656   void pop(value_type& obj) {
00657     Thread::Mutex::Lock lock{mutex};
00658     if (q.empty()) throw AsyncQueuePopError();
00659     obj = q.front();
00660     q.pop();
00661   }
00662 
00663 /**
00664  * Pops an item from the queue.  If the queue is empty, it will block
00665  * until an item becomes available.  If it blocks, the wait comprises
00666  * a cancellation point.  This method is cancellation safe if the
00667  * stack unwinds on cancellation, as cancellation is blocked while the
00668  * queue is being operated on after coming out of a wait.  This method
00669  * has strong exception safety if the container is a std::deque or
00670  * std::list container (the default is std::list), provided the
00671  * destructor of a contained item does not throw.  It is thread safe.
00672  * @param obj A value type reference to which the item at the front of
00673  * the queue will be assigned.  This method might throw if the
00674  * assignment operator of the queue item might throw.  In order to
00675  * complete pop() operations atomically under a single lock and to
00676  * retain strong exception safety, the object into which the pop()ed
00677  * data is to be placed is passed as an argument by reference (this
00678  * avoids a copy from a temporary object after the data has been
00679  * extracted from the queue, which would occur if the item extracted
00680  * were returned by value).  It might also throw if the destructor of
00681  * the queue item might throw (but that should never happen), or if
00682  * the empty() method of the container type throws (which would not
00683  * happen on any sane implementation).
00684  */
00685   void pop_dispatch(value_type& obj) {
00686     Thread::Mutex::Lock lock{mutex};
00687     while (q.empty()) cond.wait(mutex);
00688     Thread::CancelBlock b;
00689     obj = q.front();
00690     q.pop();
00691   }    
00692 
00693 /**
00694  * Pops an item from the queue.  If the queue is empty, it will block
00695  * until an item becomes available or until the timeout expires.  If
00696  * it blocks, the wait comprises a cancellation point.  This method is
00697  * cancellation safe if the stack unwinds on cancellation, as
00698  * cancellation is blocked while the queue is being operated on after
00699  * coming out of a wait.  This method has strong exception safety if
00700  * the container is a std::deque or std::list container (the default
00701  * is std::list), provided the destructor of a contained item does not
00702  * throw.  It is thread safe.
00703  * @param obj A value type reference to which the item at the front of
00704  * the queue will be assigned.  This method might throw if the
00705  * assignment operator of the queue item might throw.  In order to
00706  * complete pop() operations atomically under a single lock and to
00707  * retain strong exception safety, the object into which the pop()ed
00708  * data is to be placed is passed as an argument by reference (this
00709  * avoids a copy from a temporary object after the data has been
00710  * extracted from the queue, which would occur if the item extracted
00711  * were returned by value).  It might also throw if the destructor of
00712  * the queue item might throw (but that should never happen), or if
00713  * the empty() method of the container type throws (which would not
00714  * happen on any sane implementation).
00715  * @param millisec The timeout interval, in milliseconds.
00716  * @return If the timeout expires without an item becoming available,
00717  * the method will return true.  If an item from the queue is
00718  * extracted, it returns false.
00719  */
00720   bool pop_timed_dispatch(value_type& obj, unsigned int millisec) {
00721     timespec ts;
00722     Thread::Cond::get_abs_time(ts, millisec);
00723     Thread::Mutex::Lock lock{mutex};
00724     while (q.empty()) {
00725       if (cond.timed_wait(mutex, ts)) return true;
00726     }
00727     Thread::CancelBlock b;
00728     obj = q.front();
00729     q.pop();
00730     return false;
00731   }
00732 
00733 /**
00734  * Discards the item at the front of the queue.  This method has
00735  * strong exception safety if the container is a std::deque or
00736  * std::list container (the default is std::list), provided the
00737  * destructor of a contained item does not throw.  It is thread safe.
00738  * @exception AsyncQueuePopError If the queue is empty when a pop is
00739  * attempted, this method will throw AsyncQueuePopError.  It might
00740  * also throw if the destructor of the queue item might throw (but
00741  * that should never happen), or if the empty() method of the
00742  * container type throws (which would not happen on any sane
00743  * implementation).
00744  */
00745   void pop() {
00746     Thread::Mutex::Lock lock{mutex};
00747     if (q.empty()) throw AsyncQueuePopError();
00748     q.pop();
00749   }
00750 
00751 /**
00752  * @return Whether the queue is empty.  It will not throw assuming
00753  * that the empty() method of the container type does not throw, as it
00754  * will not on any sane implementation.
00755  * @note This method is thread safe, but the return value may not be
00756  * valid if another thread has pushed to or popped from the queue
00757  * before the value returned by the method is acted on.  It is
00758  * provided as a utility, but may not be meaningful, depending on the
00759  * intended usage.
00760  */
00761   bool empty() const {
00762     Thread::Mutex::Lock lock{mutex};
00763     return q.empty();
00764   }
00765 
00766 /**
00767  * @return The number of items currently in the queue.  It will not
00768  * throw assuming that the size() method of the container type does
00769  * not throw, as it will not on any sane implementation.
00770  * @note This method is thread safe, but the return value may not be
00771  * valid if another thread has pushed to or popped from the queue
00772  * before the value returned by the method is acted on.  It is
00773  * provided as a utility, but may not be meaningful, depending on the
00774  * intended usage.
00775  *
00776  * Since 2.0.8
00777  */
00778   size_type size() const {
00779     Thread::Mutex::Lock lock{mutex};
00780     return q.size();
00781   }
00782 
00783 /**
00784  * Swaps the contents of 'this' and 'other'.  It will not throw
00785  * assuming that the swap method of the container type does not throw
00786  * (which the C++11 standard requires not to happen with the standard
00787  * sequence containers).  It is thread safe and the swap is
00788  * thread-wise atomic.  A non-class function
00789  * Cgu::swap(Cgu::AsyncQueue&, Cgu::AsyncQueue&) method is also
00790  * provided which will call this method.
00791  * @param other The object to be swapped with this one.
00792  * @note An object swapped does not, by virtue of the swap, inherit
00793  * any threads waiting on the other one.  However if threads were
00794  * waiting on a swapped object prior to the swap, and it acquires
00795  * items by virtue of the swap, the waiting threads will unblock and
00796  * extract those items.
00797  *
00798  * Since 2.0.8
00799  */
00800   void swap(AsyncQueueDispatch& other) {
00801     if (this != &other) {
00802       lock2(mutex, other.mutex); // doesn't throw
00803       Thread::Mutex::Lock l1{mutex, Thread::locked};
00804       Thread::Mutex::Lock l2{other.mutex, Thread::locked};
00805       q.swap(other.q);
00806       if (!q.empty()) cond.broadcast();
00807       if (!other.q.empty()) other.cond.broadcast();
00808     }
00809   }
00810 
00811 /**
00812  * The assignment operator is strongly exception safe with the
00813  * standard sequence containers (it uses copy and swap).  It is also
00814  * thread safe, as it safely locks both the assignor's and assignee's
00815  * mutex to provide a thread-wise atomic assignment.
00816  * @param rhs The assignor.
00817  * @return The AsyncQueueDispatch object after assignment.
00818  * @exception std::bad_alloc The copy constructor of the queue's
00819  * container type, and so this assignment operator, might throw
00820  * std::bad_alloc if memory is exhausted and the system throws in that
00821  * case.  This assignment operator will also throw if the copy
00822  * constructor of the queue's container type throws any other
00823  * exceptions, including if any copy or move constructor or copy or
00824  * move assignment operator of a contained item throws.
00825  * @exception Thread::MutexError The assignment operator might throw
00826  * Thread::MutexError if initialization of a transitional object's
00827  * contained mutex fails.  (It is often not worth checking for this,
00828  * as it means either memory is exhausted or pthread has run out of
00829  * other resources to create new mutexes.)
00830  * @exception Thread::CondError The assignment operator might throw
00831  * this exception if initialisation of a transitional object's
00832  * contained condition variable fails.  (It is often not worth
00833  * checking for this, as it means either memory is exhausted or
00834  * pthread has run out of other resources to create new condition
00835  * variables.)
00836  * @note The assignee does not, by virtue of the assignment, inherit
00837  * any threads waiting on the assignor.  However, if prior to the
00838  * assignment threads were waiting on the assignee and the assignee
00839  * acquires items from the assignor as a result of the assignment, the
00840  * waiting threads will unblock and extract those items.
00841  *
00842  * Since 2.0.8
00843  */
00844   AsyncQueueDispatch& operator=(const AsyncQueueDispatch& rhs) {
00845     if (this != &rhs) {
00846       lock2(mutex, rhs.mutex); // doesn't throw
00847       Thread::Mutex::Lock l1{mutex, Thread::locked};
00848       Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
00849       std::queue<T, Container> temp{rhs.q};
00850       q.swap(temp);
00851       if (!q.empty()) cond.broadcast();
00852     }
00853     return *this;
00854   }
00855 
00856 /**
00857  * This move assignment operator is thread safe as regards the
00858  * assignee (the object moved to), but no synchronization is carried
00859  * out with respect to the rvalue assignor/movant.  This is because
00860  * temporaries are only visible and accessible in the thread carrying
00861  * out the move operation and synchronization for them would represent
00862  * pointless overhead.  In a case where the user uses std::move to
00863  * force a move from a named object, and that named object's lifetime
00864  * is managed by (or the object is otherwise accessed by) a different
00865  * thread than the one making the move, the user must carry out her
00866  * own synchronization with respect to that different thread, both to
00867  * ensure that a consistent view of the the named object is obtained
00868  * and because that object will be mutated by the move.  This method
00869  * invokes std::queue's move assignment operator, and therefore has
00870  * the same exception safety as the standard library's implementation
00871  * of that operator.  It will not normally throw unless a custom
00872  * allocator is used which throws on move assignment, or the
00873  * destructor of a contained item throws.
00874  * @param rhs The assignor/movant.
00875  * @return The AsyncQueueDispatch object after move assignment.
00876  * @note The assignee does not, by virtue of the move, inherit any
00877  * threads waiting on the assignor/movant.  However, if prior to the
00878  * move threads were waiting on the assignee and the assignee acquires
00879  * items from the assignor/movant as a result of the move, from
00880  * version 2.0.9 the waiting threads will unblock and extract those
00881  * items (such unblocking on move assignment did not happen with
00882  * version 2.0.8, which was a bug).
00883  *
00884  * Since 2.0.8
00885  */
00886   AsyncQueueDispatch& operator=(AsyncQueueDispatch&& rhs) {
00887     Thread::Mutex::Lock lock{mutex};
00888     q = std::move(rhs.q);
00889     if (!q.empty()) cond.broadcast();
00890     return *this;
00891   }
00892 
00893 /**
00894  * @exception std::bad_alloc The default constructor might throw this
00895  * exception if memory is exhausted and the system throws in that
00896  * case.
00897  * @exception Thread::MutexError The default constructor might throw
00898  * this exception if initialisation of the contained mutex fails.  (It
00899  * is often not worth checking for this, as it means either memory is
00900  * exhausted or pthread has run out of other resources to create new
00901  * mutexes.)
00902  * @exception Thread::CondError The default constructor might throw
00903  * this exception if initialisation of the contained condition
00904  * variable fails.  (It is often not worth checking for this, as it
00905  * means either memory is exhausted or pthread has run out of other
00906  * resources to create new condition variables.)
00907  */
00908   AsyncQueueDispatch() = default;
00909 
00910 /**
00911  * As regards thread safety, the move constructor does not synchronize
00912  * with respect to the initializing rvalue.  This is because
00913  * temporaries are only visible and accessible in the thread carrying
00914  * out the move operation and synchronization for them would represent
00915  * pointless overhead.  In a case where a user uses std::move to force
00916  * a move from a named object, and that named object's lifetime is
00917  * managed by (or the object is otherwise accessed by) a different
00918  * thread than the one making the move, the user must carry out her
00919  * own synchronization with respect to that different thread, both to
00920  * ensure that a consistent view of the the named object is obtained
00921  * and because that object will be mutated by the move.
00922  * @param rhs The AsyncQueueDispatch object to be moved.
00923  * @exception Thread::MutexError The move constructor might throw
00924  * Thread::MutexError if initialization of the contained mutex fails.
00925  * (It is often not worth checking for this, as it means either memory
00926  * is exhausted or pthread has run out of other resources to create
00927  * new mutexes.)  It might also throw if the queue's container type's
00928  * move constructor might throw, but it should not do that unless a
00929  * custom allocator is in use.
00930  * @exception Thread::CondError The move constructor might throw this
00931  * exception if initialisation of the contained condition variable
00932  * fails.  (It is often not worth checking for this, as it means
00933  * either memory is exhausted or pthread has run out of other
00934  * resources to create new condition variables.)  It might also throw
00935  * if the queue's container type's move constructor might throw, but
00936  * it should not do that unless a custom allocator is in use.
00937  * @note If this constructor throws Thread::MutexError or
00938  * Thread::CondError, and a named object is moved using std::move,
00939  * this constructor is not strongly exception safe (items in the moved
00940  * queue will be lost).  Fixing this efficiently requires changing the
00941  * order of construction of data members of this class, which cannot
00942  * be done until the next ABI break for this library as it would alter
00943  * object layout.  As noted above, in most cases the possibility of
00944  * Thread::MutexError or Thread::CondError throwing can be ignored,
00945  * but where that is not the case and strong exception safety is
00946  * wanted, the user should either not employ std::move with named
00947  * objects when invoking this class's constructors, or should
00948  * construct an AsyncQueueDispatch object using the default
00949  * constructor and then move assign to it.
00950  *
00951  * Since 2.0.8
00952  */
00953   AsyncQueueDispatch(AsyncQueueDispatch&& rhs): q(std::move(rhs.q)) {}
00954 
00955 /**
00956  * The copy constructor is thread safe, as it locks the initializing
00957  * object's mutex to obtain a consistent view of it.
00958  * @param rhs The AsyncQueueDispatch object to be copied.
00959  * @exception std::bad_alloc The copy constructor of the queue's
00960  * container type, and so this constructor, might throw std::bad_alloc
00961  * if memory is exhausted and the system throws in that case.  It will
00962  * also throw if the copy constructor of the queue's container type
00963  * throws any other exceptions, including if any copy or move
00964  * constructor or copy or move assignment operator of a contained item
00965  * throws.
00966  * @exception Thread::MutexError The copy constructor might throw
00967  * Thread::MutexError if initialization of the contained mutex fails.
00968  * (It is often not worth checking for this, as it means either memory
00969  * is exhausted or pthread has run out of other resources to create
00970  * new mutexes.)
00971  * @exception Thread::CondError The copy constructor might throw this
00972  * exception if initialisation of the contained condition variable
00973  * fails.  (It is often not worth checking for this, as it means
00974  * either memory is exhausted or pthread has run out of other
00975  * resources to create new condition variables.)
00976  *
00977  * Since 2.0.8
00978  */
00979   // we use the comma operator here to lock the mutex and call the
00980   // copy constructor: the lock will be retained until the end of the
00981   // full expression in which it is lexically situated, namely until
00982   // the end of q's constructor - see C++11 1.9/10 and 12.2/3
00983   AsyncQueueDispatch(const AsyncQueueDispatch& rhs):
00984                         q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
00985 
00986 /**
00987  * The destructor does not throw unless the destructor of a contained
00988  * item throws.  It is thread safe (any thread may delete the
00989  * AsyncQueueDispatch object).  Destroying an AsyncQueueDispatch
00990  * object on which another thread is currently blocked results in
00991  * undefined behavior.
00992  */
00993   ~AsyncQueueDispatch() {
00994     // lock and unlock the mutex in the destructor so that we have an
00995     // acquire operation to ensure that when the std::queue object is
00996     // destroyed memory is synchronised, so any thread may destroy the
00997     // AsyncQueueDispatch object
00998     Thread::Mutex::Lock lock{mutex};
00999   }
01000 
01001 /* Only has effect if --with-glib-memory-slices-compat or
01002  * --with-glib-memory-slices-no-compat option picked */
01003   CGU_GLIB_MEMORY_SLICES_FUNCS
01004 };
01005 
01006 /**
01007  * Swaps the contents of two AsyncQueue objects.  It will not throw
01008  * assuming that the swap method of the container type does not throw
01009  * (which the C++11 standard requires not to happen with the standard
01010  * sequence containers).  It is thread safe and the swap is
01011  * thread-wise atomic.
01012  * @param q1 An object to be swapped with the other.
01013  * @param q2 An object to be swapped with the other.
01014  * @note Calling std::swap on AsyncQueue objects is thread safe but
01015  * does not provide a thread-wise atomic swap (the swapped objects may
01016  * not be mirror images if during the execution of std::swap's default
01017  * algorithm one of them has been modified), although in many cases
01018  * that doesn't matter.  If swap() is called without a namespace
01019  * qualifier, argument dependent look-up will pick this one correctly.
01020  *
01021  * Since 2.0.8
01022  */
01023 template <class T, class Container>
01024 void swap(Cgu::AsyncQueue<T, Container>& q1,
01025           Cgu::AsyncQueue<T, Container>& q2) {
01026   q1.swap(q2);
01027 }
01028 
01029 /**
01030  * Swaps the contents of two AsyncQueue objects.  It will not throw
01031  * assuming that the swap method of the container type does not throw
01032  * (which the C++11 standard requires not to happen with the standard
01033  * sequence containers).  It is thread safe and the swap is
01034  * thread-wise atomic.
01035  * @param q1 An object to be swapped with the other.
01036  * @param q2 An object to be swapped with the other.
01037  * @note 1. An object swapped does not, by virtue of the swap, inherit
01038  * any threads waiting on the other one.  However if threads were
01039  * waiting on a swapped object prior to the swap, and it acquires
01040  * items by virtue of the swap, the waiting threads will unblock and
01041  * extract those items.
01042  * @note 2. Calling std::swap on AsyncQueueDispatch objects is thread
01043  * safe but does not provide a thread-wise atomic swap (the swapped
01044  * objects may not be mirror images if during the execution of
01045  * std::swap's default algorithm one of them has been modified),
01046  * although in many cases that doesn't matter.  If swap() is called
01047  * without a namespace qualifier, argument dependent look-up will pick
01048  * this one correctly.
01049  *
01050  * Since 2.0.8
01051  */
01052 template <class T, class Container>
01053 void swap(Cgu::AsyncQueueDispatch<T, Container>& q1,
01054           Cgu::AsyncQueueDispatch<T, Container>& q2) {
01055   q1.swap(q2);
01056 }
01057 
01058 } // namespace Cgu
01059 
01060 #endif