use crate::runtime::scheduler::multi_thread::{queue, Stats};
use crate::runtime::task::{self, Schedule, Task, TaskHarnessScheduleHooks};

use std::cell::RefCell;
use std::thread;
use std::time::Duration;

#[allow(unused)]
macro_rules! assert_metrics {
    ($stats:ident, $field:ident == $v:expr) => {
        #[cfg(target_has_atomic = "64")]
        {
            use crate::runtime::WorkerMetrics;
            use std::sync::atomic::Ordering::Relaxed;

            let worker = WorkerMetrics::new();
            $stats.submit(&worker);

            let expect = $v;
            let actual = worker.$field.load(Relaxed);

            assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
        }
    };
}

fn new_stats() -> Stats {
    use crate::runtime::WorkerMetrics;
    Stats::new(&WorkerMetrics::new())
}

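// The local queue holds up to 256 tasks; pushing 256 one at a time must never
// overflow into the inject queue.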
#[test]
fn fits_256_one_at_a_time() {
    let (_, mut local) = queue::local();
    let inject = RefCell::new(vec![]);
    let mut stats = new_stats();

    for _ in 0..256 {
        let (task, _) = super::unowned(async {});
        local.push_back_or_overflow(task, &inject, &mut stats);
    }

    cfg_unstable_metrics! {
        assert_metrics!(stats, overflow_count == 0);
    }

    assert!(inject.borrow_mut().pop().is_none());

    while local.pop().is_some() {}
}

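// A single batch push of 256 tasks also fits, and every task comes back out via `pop`.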
#[test]
fn fits_256_all_at_once() {
    let (_, mut local) = queue::local();

    let mut tasks = (0..256)
        .map(|_| super::unowned(async {}).0)
        .collect::<Vec<_>>();
    local.push_back(tasks.drain(..));

    let mut i = 0;
    while local.pop().is_some() {
        i += 1;
    }

    assert_eq!(i, 256);
}

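// Pushing the same 256 tasks in uneven chunks (10 + 100 + 46 + 100) should behave the same way.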
#[test]
fn fits_256_all_in_chunks() {
    let (_, mut local) = queue::local();

    let mut tasks = (0..256)
        .map(|_| super::unowned(async {}).0)
        .collect::<Vec<_>>();

    local.push_back(tasks.drain(..10));
    local.push_back(tasks.drain(..100));
    local.push_back(tasks.drain(..46));
    local.push_back(tasks.drain(..100));

    let mut i = 0;
    while local.pop().is_some() {
        i += 1;
    }

    assert_eq!(i, 256);
}

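// Pushing a 257th task exceeds the local queue's capacity, so tasks overflow into
// the inject queue. Every task must still be accounted for between the two queues.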
#[test]
fn overflow() {
    let (_, mut local) = queue::local();
    let inject = RefCell::new(vec![]);
    let mut stats = new_stats();

    for _ in 0..257 {
        let (task, _) = super::unowned(async {});
        local.push_back_or_overflow(task, &inject, &mut stats);
    }

    cfg_unstable_metrics! {
        assert_metrics!(stats, overflow_count == 1);
    }

    let mut n = 0;

    n += inject.borrow_mut().drain(..).count();

    while local.pop().is_some() {
        n += 1;
    }

    assert_eq!(n, 257);
}

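// Stealing from a queue of four tasks takes half of them: one is returned to the
// stealer directly and one lands in its local queue, leaving two behind.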
#[test]
fn steal_batch() {
    let mut stats = new_stats();

    let (steal1, mut local1) = queue::local();
    let (_, mut local2) = queue::local();
    let inject = RefCell::new(vec![]);

    for _ in 0..4 {
        let (task, _) = super::unowned(async {});
        local1.push_back_or_overflow(task, &inject, &mut stats);
    }

    assert!(steal1.steal_into(&mut local2, &mut stats).is_some());

    cfg_unstable_metrics! {
        assert_metrics!(stats, steal_count == 2);
    }

    for _ in 0..1 {
        assert!(local2.pop().is_some());
    }

    assert!(local2.pop().is_none());

    for _ in 0..2 {
        assert!(local1.pop().is_some());
    }

    assert!(local1.pop().is_none());
}

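// Scale the stress-test iteration counts down when running under Miri, which is much slower.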
const fn normal_or_miri(normal: usize, miri: usize) -> usize {
    if cfg!(miri) {
        miri
    } else {
        normal
    }
}

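// Stress test: a spawned thread repeatedly steals into its own queue and drains it
// while the main thread pushes and pops. Every pushed task must be popped exactly
// once across the local queue, the inject queue, and the stealer.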
#[test]
fn stress1() {
    const NUM_ITER: usize = 5;
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);
    const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
    const NUM_PUSH: usize = normal_or_miri(500, 10);
    const NUM_POP: usize = normal_or_miri(250, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = RefCell::new(vec![]);

        let th = thread::spawn(move || {
            let mut stats = new_stats();
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                if steal.steal_into(&mut local, &mut stats).is_some() {
                    n += 1;
                }

                while local.pop().is_some() {
                    n += 1;
                }

                thread::yield_now();
            }

            cfg_unstable_metrics! {
                assert_metrics!(stats, steal_count == n as _);
            }

            n
        });

        let mut n = 0;

        for _ in 0..NUM_LOCAL {
            for _ in 0..NUM_PUSH {
                let (task, _) = super::unowned(async {});
                local.push_back_or_overflow(task, &inject, &mut stats);
            }

            for _ in 0..NUM_POP {
                if local.pop().is_some() {
                    n += 1;
                } else {
                    break;
                }
            }
        }

        n += inject.borrow_mut().drain(..).count();

        n += th.join().unwrap();

        assert_eq!(n, NUM_LOCAL * NUM_PUSH);
    }
}

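// Stress test: push a large number of tasks (scaled down under Miri) while another
// thread steals concurrently, then verify the total pop count matches the pushes.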
#[test]
fn stress2() {
    const NUM_ITER: usize = 1;
    const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = RefCell::new(vec![]);

        let th = thread::spawn(move || {
            let mut stats = new_stats();
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                if steal.steal_into(&mut local, &mut stats).is_some() {
                    n += 1;
                }

                while local.pop().is_some() {
                    n += 1;
                }

                thread::sleep(Duration::from_micros(10));
            }

            n
        });

        let mut num_pop = 0;

        for i in 0..NUM_TASKS {
            let (task, _) = super::unowned(async {});
            local.push_back_or_overflow(task, &inject, &mut stats);

            if i % 128 == 0 && local.pop().is_some() {
                num_pop += 1;
            }

            num_pop += inject.borrow_mut().drain(..).count();
        }

        num_pop += th.join().unwrap();

        while local.pop().is_some() {
            num_pop += 1;
        }

        num_pop += inject.borrow_mut().drain(..).count();

        assert_eq!(num_pop, NUM_TASKS);
    }
}

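// Stub `Schedule` implementation backing the unowned test tasks; the tasks are
// never polled in these tests, so `schedule` should never be called.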
#[allow(dead_code)]
struct Runtime;

impl Schedule for Runtime {
    fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
        None
    }

    fn schedule(&self, _task: task::Notified<Self>) {
        unreachable!();
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: None,
        }
    }
}