1 // Copyright (C) 2013 Vicente J. Botet Escriba
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 // <boost/thread/sync_bounded_queue.hpp>
7 
// class sync_bounded_queue<T>
9 
10 //    push || pull;
11 
12 #include <boost/config.hpp>
13 #if ! defined  BOOST_NO_CXX11_DECLTYPE
14 #define BOOST_RESULT_OF_USE_DECLTYPE
15 #endif
16 
17 #define BOOST_THREAD_VERSION 4
18 
19 #include <boost/thread/sync_bounded_queue.hpp>
20 #include <boost/thread/future.hpp>
21 #include <boost/thread/barrier.hpp>
22 
23 #include <boost/detail/lightweight_test.hpp>
24 
25 struct call_push
26 {
27   boost::sync_bounded_queue<int> &q_;
28   boost::barrier& go_;
29 
call_pushcall_push30   call_push(boost::sync_bounded_queue<int> &q, boost::barrier &go) :
31     q_(q), go_(go)
32   {
33   }
34   typedef void result_type;
operator ()call_push35   void operator()()
36   {
37     go_.count_down_and_wait();
38     q_.push(42);
39 
40   }
41 };
42 
43 struct call_push_2
44 {
45   boost::sync_bounded_queue<int> &q_;
46   boost::barrier& go_;
47   boost::barrier& end_;
48 
call_push_2call_push_249   call_push_2(boost::sync_bounded_queue<int> &q, boost::barrier &go, boost::barrier &end) :
50     q_(q), go_(go), end_(end)
51   {
52   }
53   typedef void result_type;
operator ()call_push_254   void operator()()
55   {
56     go_.count_down_and_wait();
57     q_.push(42);
58     end_.count_down_and_wait();
59 
60   }
61 };
62 
63 struct call_pull
64 {
65   boost::sync_bounded_queue<int> &q_;
66   boost::barrier& go_;
67 
call_pullcall_pull68   call_pull(boost::sync_bounded_queue<int> &q, boost::barrier &go) :
69     q_(q), go_(go)
70   {
71   }
72   typedef int result_type;
operator ()call_pull73   int operator()()
74   {
75     go_.count_down_and_wait();
76     return q_.pull();
77   }
78 };
79 
test_concurrent_push_and_pull_on_empty_queue()80 void test_concurrent_push_and_pull_on_empty_queue()
81 {
82   boost::sync_bounded_queue<int> q(4);
83 
84   boost::barrier go(2);
85 
86   boost::future<void> push_done;
87   boost::future<int> pull_done;
88 
89   try
90   {
91     push_done=boost::async(boost::launch::async,
92 #if ! defined BOOST_NO_CXX11_LAMBDAS
93         [&q,&go]()
94         {
95           go.wait();
96           q.push(42);
97         }
98 #else
99         call_push(q,go)
100 #endif
101     );
102     pull_done=boost::async(boost::launch::async,
103 #if ! defined BOOST_NO_CXX11_LAMBDAS
104         [&q,&go]() -> int
105         {
106           go.wait();
107           return q.pull();
108         }
109 #else
110         call_pull(q,go)
111 #endif
112     );
113 
114     push_done.get();
115     BOOST_TEST_EQ(pull_done.get(), 42);
116     BOOST_TEST(q.empty());
117   }
118   catch (...)
119   {
120     BOOST_TEST(false);
121   }
122 }
123 
test_concurrent_push_on_empty_queue()124 void test_concurrent_push_on_empty_queue()
125 {
126   boost::sync_bounded_queue<int> q(4);
127   const unsigned int n = 3;
128   boost::barrier go(n);
129   boost::future<void> push_done[n];
130 
131   try
132   {
133     for (unsigned int i =0; i< n; ++i)
134       push_done[i]=boost::async(boost::launch::async,
135 #if ! defined BOOST_NO_CXX11_LAMBDAS
136         [&q,&go]()
137         {
138           go.wait();
139           q.push(42);
140         }
141 #else
142         call_push(q,go)
143 #endif
144     );
145 
146     for (unsigned int i = 0; i < n; ++i)
147       push_done[i].get();
148 
149     BOOST_TEST(!q.empty());
150     for (unsigned int i =0; i< n; ++i)
151       BOOST_TEST_EQ(q.pull(), 42);
152     BOOST_TEST(q.empty());
153 
154   }
155   catch (...)
156   {
157     BOOST_TEST(false);
158   }
159 }
160 
test_concurrent_push_on_full_queue()161 void test_concurrent_push_on_full_queue()
162 {
163   const unsigned int size = 2;
164   boost::sync_bounded_queue<int> q(size);
165   const unsigned int n = 2*size;
166   boost::barrier go(n);
167   boost::barrier end(size+1);
168   boost::future<void> push_done[n];
169 
170   try
171   {
172     for (unsigned int i =0; i< n; ++i)
173       push_done[i]=boost::async(boost::launch::async,
174 #if ! defined BOOST_NO_CXX11_LAMBDAS
175         [&q,&go,&end]()
176         {
177           go.wait();
178           q.push(42);
179           end.wait();
180         }
181 #else
182         call_push_2(q,go,end)
183 #endif
184       );
185 
186     end.wait();
187     BOOST_TEST(!q.empty());
188     BOOST_TEST(q.full());
189     for (unsigned int i =0; i< size; ++i)
190       BOOST_TEST_EQ(q.pull(), 42);
191     end.wait();
192 
193     for (unsigned int i = 0; i < n; ++i)
194       push_done[i].get();
195 
196     BOOST_TEST(!q.empty());
197     for (unsigned int i =0; i< size; ++i)
198       BOOST_TEST_EQ(q.pull(), 42);
199     BOOST_TEST(q.empty());
200 
201   }
202   catch (...)
203   {
204     BOOST_TEST(false);
205   }
206 }
test_concurrent_pull_on_queue()207 void test_concurrent_pull_on_queue()
208 {
209   boost::sync_bounded_queue<int> q(4);
210   const unsigned int n = 3;
211   boost::barrier go(n);
212 
213   boost::future<int> pull_done[n];
214 
215   try
216   {
217     for (unsigned int i =0; i< n; ++i)
218       q.push(42);
219 
220     for (unsigned int i =0; i< n; ++i)
221       pull_done[i]=boost::async(boost::launch::async,
222 #if ! defined BOOST_NO_CXX11_LAMBDAS
223         [&q,&go]() -> int
224         {
225           go.wait();
226           return q.pull();
227         }
228 #else
229         call_pull(q,go)
230 #endif
231     );
232 
233     for (unsigned int i = 0; i < n; ++i)
234       BOOST_TEST_EQ(pull_done[i].get(), 42);
235     BOOST_TEST(q.empty());
236   }
237   catch (...)
238   {
239     BOOST_TEST(false);
240   }
241 }
242 
main()243 int main()
244 {
245   test_concurrent_push_and_pull_on_empty_queue();
246   test_concurrent_push_on_empty_queue();
247   test_concurrent_push_on_full_queue();
248   test_concurrent_pull_on_queue();
249 
250   return boost::report_errors();
251 }
252 
253