1 use crate::runtime::scheduler::multi_thread::{queue, Stats};
2 use crate::runtime::task::{self, Schedule, Task, TaskHarnessScheduleHooks};
3
4 use std::cell::RefCell;
5 use std::thread;
6 use std::time::Duration;
7
/// Asserts that one metric recorded in a stats batch has the expected value.
///
/// Usage: `assert_metrics!(stats, field == expected)`.
///
/// The batch is submitted into a freshly created `WorkerMetrics`, after which
/// `field` is read back with a relaxed atomic load and compared to `expected`.
/// The whole check is compiled out on targets without 64-bit atomics.
// Every call site in this file sits inside `cfg_unstable_metrics!`, so the
// macro is presumably unused in some build configurations.
#[allow(unused)]
macro_rules! assert_metrics {
    ($stats:ident, $field:ident == $v:expr) => {
        #[cfg(target_has_atomic = "64")]
        {
            let metrics = crate::runtime::WorkerMetrics::new();
            $stats.submit(&metrics);

            // The expected expression is evaluated only after the submit.
            let expected = $v;
            let observed = metrics
                .$field
                .load(std::sync::atomic::Ordering::Relaxed);

            assert!(
                observed == expected,
                "expect = {}; actual = {}",
                expected,
                observed
            );
        }
    };
}
26
new_stats() -> Stats27 fn new_stats() -> Stats {
28 use crate::runtime::WorkerMetrics;
29 Stats::new(&WorkerMetrics::new())
30 }
31
/// Pushes 256 tasks into a local queue one at a time and expects that none of
/// them is diverted to the overflow (`inject`) list.
#[test]
fn fits_256_one_at_a_time() {
    let (_, mut local) = queue::local();
    let inject = RefCell::new(vec![]);
    let mut stats = new_stats();

    (0..256).for_each(|_| {
        // Only the task half is needed; the other tuple element is discarded.
        let task = super::unowned(async {}).0;
        local.push_back_or_overflow(task, &inject, &mut stats);
    });

    cfg_unstable_metrics! {
        assert_metrics!(stats, overflow_count == 0);
    }

    // The overflow list must still be empty.
    assert!(inject.borrow_mut().pop().is_none());

    // Drain whatever is left in the queue.
    while let Some(task) = local.pop() {
        drop(task);
    }
}
51
/// Pushes 256 tasks into a local queue with a single batched `push_back` and
/// checks that exactly 256 tasks can be popped back out.
#[test]
fn fits_256_all_at_once() {
    let (_, mut local) = queue::local();

    let mut batch: Vec<_> = (0..256).map(|_| super::unowned(async {}).0).collect();
    local.push_back(batch.drain(..));

    // Count every task handed back until the queue reports empty.
    let popped = std::iter::from_fn(|| local.pop()).count();

    assert_eq!(popped, 256);
}
68
/// Pushes 256 tasks into a local queue in four batches of differing sizes and
/// checks that exactly 256 tasks can be popped back out.
#[test]
fn fits_256_all_in_chunks() {
    let (_, mut local) = queue::local();

    let mut pending: Vec<_> = (0..256).map(|_| super::unowned(async {}).0).collect();

    // 10 + 100 + 46 + 100 == 256, so the four batches consume every task.
    for &chunk_len in &[10usize, 100, 46, 100] {
        local.push_back(pending.drain(..chunk_len));
    }

    let mut popped = 0;
    while let Some(task) = local.pop() {
        drop(task);
        popped += 1;
    }

    assert_eq!(popped, 256);
}
89
90 #[test]
overflow()91 fn overflow() {
92 let (_, mut local) = queue::local();
93 let inject = RefCell::new(vec![]);
94 let mut stats = new_stats();
95
96 for _ in 0..257 {
97 let (task, _) = super::unowned(async {});
98 local.push_back_or_overflow(task, &inject, &mut stats);
99 }
100
101 cfg_unstable_metrics! {
102 assert_metrics!(stats, overflow_count == 1);
103 }
104
105 let mut n = 0;
106
107 n += inject.borrow_mut().drain(..).count();
108
109 while local.pop().is_some() {
110 n += 1;
111 }
112
113 assert_eq!(n, 257);
114 }
115
/// Fills one local queue with four tasks, steals from it into a second queue,
/// and checks how the tasks end up split between the two.
#[test]
fn steal_batch() {
    let mut stats = new_stats();

    let (steal1, mut local1) = queue::local();
    let (_, mut local2) = queue::local();
    let inject = RefCell::new(vec![]);

    (0..4).for_each(|_| {
        let task = super::unowned(async {}).0;
        local1.push_back_or_overflow(task, &inject, &mut stats);
    });

    // The steal itself must hand back a task.
    assert!(steal1.steal_into(&mut local2, &mut stats).is_some());

    cfg_unstable_metrics! {
        assert_metrics!(stats, steal_count == 2);
    }

    // Exactly one task is expected in the destination queue.
    assert!(local2.pop().is_some());
    assert!(local2.pop().is_none());

    // Exactly two tasks are expected to remain in the source queue.
    for _ in 0..2 {
        assert!(local1.pop().is_some());
    }
    assert!(local1.pop().is_none());
}
147
/// Selects a test parameter depending on whether the build targets Miri.
///
/// Returns `miri` when compiled with `cfg(miri)` and `normal` otherwise.
/// Being a `const fn`, it can initialise the `const` sizes used by the tests.
const fn normal_or_miri(normal: usize, miri: usize) -> usize {
    if !cfg!(miri) {
        return normal;
    }
    miri
}
155
/// Stress test: one thread pushes and pops on its local queue while a second
/// thread concurrently steals from it.
///
/// Each of the `NUM_ITER` rounds verifies that every pushed task is accounted
/// for exactly once: popped locally, diverted to the overflow list, or taken
/// by the stealer thread.
#[test]
fn stress1() {
    const NUM_ITER: usize = 5;
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);
    const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
    const NUM_PUSH: usize = normal_or_miri(500, 10);
    const NUM_POP: usize = normal_or_miri(250, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = RefCell::new(vec![]);

        // Stealer thread: repeatedly steals into its own queue and drains it,
        // returning the number of tasks it obtained.
        let th = thread::spawn(move || {
            let mut stats = new_stats();
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                // The task returned directly by `steal_into` counts as one.
                if steal.steal_into(&mut local, &mut stats).is_some() {
                    n += 1;
                }

                // Any further stolen tasks are found in the stealer's queue.
                while local.pop().is_some() {
                    n += 1;
                }

                thread::yield_now();
            }

            cfg_unstable_metrics! {
                assert_metrics!(stats, steal_count == n as _);
            }

            n
        });

        let mut n = 0;

        for _ in 0..NUM_LOCAL {
            for _ in 0..NUM_PUSH {
                let (task, _) = super::unowned(async {});
                local.push_back_or_overflow(task, &inject, &mut stats);
            }

            // Pop at most `NUM_POP` tasks, stopping early once the queue is empty.
            for _ in 0..NUM_POP {
                if local.pop().is_some() {
                    n += 1;
                } else {
                    break;
                }
            }
        }

        n += inject.borrow_mut().drain(..).count();

        n += th.join().unwrap();

        // Each round pushes `NUM_PUSH` tasks but pops at most `NUM_POP`, so
        // depending on how the stealer interleaves, tasks may still be queued
        // locally here. Count them too (as `stress2` does) so the total below
        // does not depend on thread timing.
        while local.pop().is_some() {
            n += 1;
        }

        assert_eq!(n, NUM_LOCAL * NUM_PUSH);
    }
}
218
/// Stress test: one thread pushes `NUM_TASKS` tasks (popping one on every
/// 128th push) while a second thread concurrently steals.
///
/// At the end, the tasks popped locally, drained from the overflow list and
/// obtained by the stealer must add up to `NUM_TASKS`.
#[test]
fn stress2() {
    const NUM_ITER: usize = 1;
    const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = RefCell::new(vec![]);

        // Stealer thread: steal, drain its own queue, sleep briefly, repeat.
        let stealer = thread::spawn(move || {
            let mut stats = new_stats();
            let (_, mut local) = queue::local();
            let mut taken = 0;

            for _ in 0..NUM_STEAL {
                // The task returned directly by `steal_into` counts as one.
                if let Some(task) = steal.steal_into(&mut local, &mut stats) {
                    drop(task);
                    taken += 1;
                }

                // Any further stolen tasks are found in the stealer's queue.
                taken += std::iter::from_fn(|| local.pop()).count();

                thread::sleep(Duration::from_micros(10));
            }

            taken
        });

        let mut accounted = 0;

        for i in 0..NUM_TASKS {
            let task = super::unowned(async {}).0;
            local.push_back_or_overflow(task, &inject, &mut stats);

            // On every 128th push, also try to pop a single task.
            if i % 128 == 0 {
                if let Some(task) = local.pop() {
                    drop(task);
                    accounted += 1;
                }
            }

            // Count and discard anything diverted to the overflow list.
            accounted += inject.borrow_mut().drain(..).count();
        }

        accounted += stealer.join().unwrap();

        // With the stealer finished, whatever remains is counted here.
        accounted += std::iter::from_fn(|| local.pop()).count();
        accounted += inject.borrow_mut().drain(..).count();

        assert_eq!(accounted, NUM_TASKS);
    }
}
275
276 #[allow(dead_code)]
277 struct Runtime;
278
279 impl Schedule for Runtime {
release(&self, _task: &Task<Self>) -> Option<Task<Self>>280 fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
281 None
282 }
283
schedule(&self, _task: task::Notified<Self>)284 fn schedule(&self, _task: task::Notified<Self>) {
285 unreachable!();
286 }
287
hooks(&self) -> TaskHarnessScheduleHooks288 fn hooks(&self) -> TaskHarnessScheduleHooks {
289 TaskHarnessScheduleHooks {
290 task_terminate_callback: None,
291 }
292 }
293 }
294