1 // Copyright (C) 2013 Vicente J. Botet Escriba
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6 // <boost/thread/sync_bounded_queue.hpp>
7
// class sync_bounded_queue<T>
9
10 // push || pull;
11
12 #include <boost/config.hpp>
13 #if ! defined BOOST_NO_CXX11_DECLTYPE
14 #define BOOST_RESULT_OF_USE_DECLTYPE
15 #endif
16
17 #define BOOST_THREAD_VERSION 4
18
19 #include <boost/thread/sync_bounded_queue.hpp>
20 #include <boost/thread/future.hpp>
21 #include <boost/thread/barrier.hpp>
22
23 #include <boost/detail/lightweight_test.hpp>
24
25 struct call_push
26 {
27 boost::sync_bounded_queue<int> &q_;
28 boost::barrier& go_;
29
call_pushcall_push30 call_push(boost::sync_bounded_queue<int> &q, boost::barrier &go) :
31 q_(q), go_(go)
32 {
33 }
34 typedef void result_type;
operator ()call_push35 void operator()()
36 {
37 go_.count_down_and_wait();
38 q_.push(42);
39
40 }
41 };
42
43 struct call_push_2
44 {
45 boost::sync_bounded_queue<int> &q_;
46 boost::barrier& go_;
47 boost::barrier& end_;
48
call_push_2call_push_249 call_push_2(boost::sync_bounded_queue<int> &q, boost::barrier &go, boost::barrier &end) :
50 q_(q), go_(go), end_(end)
51 {
52 }
53 typedef void result_type;
operator ()call_push_254 void operator()()
55 {
56 go_.count_down_and_wait();
57 q_.push(42);
58 end_.count_down_and_wait();
59
60 }
61 };
62
63 struct call_pull
64 {
65 boost::sync_bounded_queue<int> &q_;
66 boost::barrier& go_;
67
call_pullcall_pull68 call_pull(boost::sync_bounded_queue<int> &q, boost::barrier &go) :
69 q_(q), go_(go)
70 {
71 }
72 typedef int result_type;
operator ()call_pull73 int operator()()
74 {
75 go_.count_down_and_wait();
76 return q_.pull();
77 }
78 };
79
test_concurrent_push_and_pull_on_empty_queue()80 void test_concurrent_push_and_pull_on_empty_queue()
81 {
82 boost::sync_bounded_queue<int> q(4);
83
84 boost::barrier go(2);
85
86 boost::future<void> push_done;
87 boost::future<int> pull_done;
88
89 try
90 {
91 push_done=boost::async(boost::launch::async,
92 #if ! defined BOOST_NO_CXX11_LAMBDAS
93 [&q,&go]()
94 {
95 go.wait();
96 q.push(42);
97 }
98 #else
99 call_push(q,go)
100 #endif
101 );
102 pull_done=boost::async(boost::launch::async,
103 #if ! defined BOOST_NO_CXX11_LAMBDAS
104 [&q,&go]() -> int
105 {
106 go.wait();
107 return q.pull();
108 }
109 #else
110 call_pull(q,go)
111 #endif
112 );
113
114 push_done.get();
115 BOOST_TEST_EQ(pull_done.get(), 42);
116 BOOST_TEST(q.empty());
117 }
118 catch (...)
119 {
120 BOOST_TEST(false);
121 }
122 }
123
test_concurrent_push_on_empty_queue()124 void test_concurrent_push_on_empty_queue()
125 {
126 boost::sync_bounded_queue<int> q(4);
127 const unsigned int n = 3;
128 boost::barrier go(n);
129 boost::future<void> push_done[n];
130
131 try
132 {
133 for (unsigned int i =0; i< n; ++i)
134 push_done[i]=boost::async(boost::launch::async,
135 #if ! defined BOOST_NO_CXX11_LAMBDAS
136 [&q,&go]()
137 {
138 go.wait();
139 q.push(42);
140 }
141 #else
142 call_push(q,go)
143 #endif
144 );
145
146 for (unsigned int i = 0; i < n; ++i)
147 push_done[i].get();
148
149 BOOST_TEST(!q.empty());
150 for (unsigned int i =0; i< n; ++i)
151 BOOST_TEST_EQ(q.pull(), 42);
152 BOOST_TEST(q.empty());
153
154 }
155 catch (...)
156 {
157 BOOST_TEST(false);
158 }
159 }
160
test_concurrent_push_on_full_queue()161 void test_concurrent_push_on_full_queue()
162 {
163 const unsigned int size = 2;
164 boost::sync_bounded_queue<int> q(size);
165 const unsigned int n = 2*size;
166 boost::barrier go(n);
167 boost::barrier end(size+1);
168 boost::future<void> push_done[n];
169
170 try
171 {
172 for (unsigned int i =0; i< n; ++i)
173 push_done[i]=boost::async(boost::launch::async,
174 #if ! defined BOOST_NO_CXX11_LAMBDAS
175 [&q,&go,&end]()
176 {
177 go.wait();
178 q.push(42);
179 end.wait();
180 }
181 #else
182 call_push_2(q,go,end)
183 #endif
184 );
185
186 end.wait();
187 BOOST_TEST(!q.empty());
188 BOOST_TEST(q.full());
189 for (unsigned int i =0; i< size; ++i)
190 BOOST_TEST_EQ(q.pull(), 42);
191 end.wait();
192
193 for (unsigned int i = 0; i < n; ++i)
194 push_done[i].get();
195
196 BOOST_TEST(!q.empty());
197 for (unsigned int i =0; i< size; ++i)
198 BOOST_TEST_EQ(q.pull(), 42);
199 BOOST_TEST(q.empty());
200
201 }
202 catch (...)
203 {
204 BOOST_TEST(false);
205 }
206 }
test_concurrent_pull_on_queue()207 void test_concurrent_pull_on_queue()
208 {
209 boost::sync_bounded_queue<int> q(4);
210 const unsigned int n = 3;
211 boost::barrier go(n);
212
213 boost::future<int> pull_done[n];
214
215 try
216 {
217 for (unsigned int i =0; i< n; ++i)
218 q.push(42);
219
220 for (unsigned int i =0; i< n; ++i)
221 pull_done[i]=boost::async(boost::launch::async,
222 #if ! defined BOOST_NO_CXX11_LAMBDAS
223 [&q,&go]() -> int
224 {
225 go.wait();
226 return q.pull();
227 }
228 #else
229 call_pull(q,go)
230 #endif
231 );
232
233 for (unsigned int i = 0; i < n; ++i)
234 BOOST_TEST_EQ(pull_done[i].get(), 42);
235 BOOST_TEST(q.empty());
236 }
237 catch (...)
238 {
239 BOOST_TEST(false);
240 }
241 }
242
main()243 int main()
244 {
245 test_concurrent_push_and_pull_on_empty_queue();
246 test_concurrent_push_on_empty_queue();
247 test_concurrent_push_on_full_queue();
248 test_concurrent_pull_on_queue();
249
250 return boost::report_errors();
251 }
252
253