1 #![allow(unknown_lints, unexpected_cfgs)]
2 #![warn(rust_2018_idioms)]
3 // Too slow on miri.
4 #![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))]
5 
6 use tokio::io::{AsyncReadExt, AsyncWriteExt};
7 use tokio::net::{TcpListener, TcpStream};
8 use tokio::runtime;
9 use tokio::sync::oneshot;
10 use tokio_test::{assert_err, assert_ok};
11 
12 use std::future::{poll_fn, Future};
13 use std::pin::Pin;
14 use std::sync::atomic::Ordering::Relaxed;
15 use std::sync::atomic::{AtomicUsize, Ordering};
16 use std::sync::{mpsc, Arc, Mutex};
17 use std::task::{Context, Poll, Waker};
18 
19 macro_rules! cfg_metrics {
20     ($($t:tt)*) => {
21         #[cfg(all(tokio_unstable, target_has_atomic = "64"))]
22         {
23             $( $t )*
24         }
25     }
26 }
27 
28 #[test]
single_thread()29 fn single_thread() {
30     // No panic when starting a runtime w/ a single thread
31     let _ = runtime::Builder::new_multi_thread()
32         .enable_all()
33         .worker_threads(1)
34         .build()
35         .unwrap();
36 }
37 
38 #[test]
many_oneshot_futures()39 fn many_oneshot_futures() {
40     // used for notifying the main thread
41     const NUM: usize = 1_000;
42 
43     for _ in 0..5 {
44         let (tx, rx) = mpsc::channel();
45 
46         let rt = rt();
47         let cnt = Arc::new(AtomicUsize::new(0));
48 
49         for _ in 0..NUM {
50             let cnt = cnt.clone();
51             let tx = tx.clone();
52 
53             rt.spawn(async move {
54                 let num = cnt.fetch_add(1, Relaxed) + 1;
55 
56                 if num == NUM {
57                     tx.send(()).unwrap();
58                 }
59             });
60         }
61 
62         rx.recv().unwrap();
63 
64         // Wait for the pool to shutdown
65         drop(rt);
66     }
67 }
68 
69 #[test]
spawn_two()70 fn spawn_two() {
71     let rt = rt();
72 
73     let out = rt.block_on(async {
74         let (tx, rx) = oneshot::channel();
75 
76         tokio::spawn(async move {
77             tokio::spawn(async move {
78                 tx.send("ZOMG").unwrap();
79             });
80         });
81 
82         assert_ok!(rx.await)
83     });
84 
85     assert_eq!(out, "ZOMG");
86 
87     cfg_metrics! {
88         let metrics = rt.metrics();
89         drop(rt);
90         assert_eq!(1, metrics.remote_schedule_count());
91 
92         let mut local = 0;
93         for i in 0..metrics.num_workers() {
94             local += metrics.worker_local_schedule_count(i);
95         }
96 
97         assert_eq!(1, local);
98     }
99 }
100 
101 #[test]
many_multishot_futures()102 fn many_multishot_futures() {
103     const CHAIN: usize = 200;
104     const CYCLES: usize = 5;
105     const TRACKS: usize = 50;
106 
107     for _ in 0..50 {
108         let rt = rt();
109         let mut start_txs = Vec::with_capacity(TRACKS);
110         let mut final_rxs = Vec::with_capacity(TRACKS);
111 
112         for _ in 0..TRACKS {
113             let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10);
114 
115             for _ in 0..CHAIN {
116                 let (next_tx, next_rx) = tokio::sync::mpsc::channel(10);
117 
118                 // Forward all the messages
119                 rt.spawn(async move {
120                     while let Some(v) = chain_rx.recv().await {
121                         next_tx.send(v).await.unwrap();
122                     }
123                 });
124 
125                 chain_rx = next_rx;
126             }
127 
128             // This final task cycles if needed
129             let (final_tx, final_rx) = tokio::sync::mpsc::channel(10);
130             let cycle_tx = start_tx.clone();
131             let mut rem = CYCLES;
132 
133             rt.spawn(async move {
134                 for _ in 0..CYCLES {
135                     let msg = chain_rx.recv().await.unwrap();
136 
137                     rem -= 1;
138 
139                     if rem == 0 {
140                         final_tx.send(msg).await.unwrap();
141                     } else {
142                         cycle_tx.send(msg).await.unwrap();
143                     }
144                 }
145             });
146 
147             start_txs.push(start_tx);
148             final_rxs.push(final_rx);
149         }
150 
151         {
152             rt.block_on(async move {
153                 for start_tx in start_txs {
154                     start_tx.send("ping").await.unwrap();
155                 }
156 
157                 for mut final_rx in final_rxs {
158                     final_rx.recv().await.unwrap();
159                 }
160             });
161         }
162     }
163 }
164 
165 #[test]
lifo_slot_budget()166 fn lifo_slot_budget() {
167     async fn my_fn() {
168         spawn_another();
169     }
170 
171     fn spawn_another() {
172         tokio::spawn(my_fn());
173     }
174 
175     let rt = runtime::Builder::new_multi_thread()
176         .enable_all()
177         .worker_threads(1)
178         .build()
179         .unwrap();
180 
181     let (send, recv) = oneshot::channel();
182 
183     rt.spawn(async move {
184         tokio::spawn(my_fn());
185         let _ = send.send(());
186     });
187 
188     let _ = rt.block_on(recv);
189 }
190 
191 #[test]
192 #[cfg_attr(miri, ignore)] // No `socket` in miri.
spawn_shutdown()193 fn spawn_shutdown() {
194     let rt = rt();
195     let (tx, rx) = mpsc::channel();
196 
197     rt.block_on(async {
198         tokio::spawn(client_server(tx.clone()));
199     });
200 
201     // Use spawner
202     rt.spawn(client_server(tx));
203 
204     assert_ok!(rx.recv());
205     assert_ok!(rx.recv());
206 
207     drop(rt);
208     assert_err!(rx.try_recv());
209 }
210 
client_server(tx: mpsc::Sender<()>)211 async fn client_server(tx: mpsc::Sender<()>) {
212     let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
213 
214     // Get the assigned address
215     let addr = assert_ok!(server.local_addr());
216 
217     // Spawn the server
218     tokio::spawn(async move {
219         // Accept a socket
220         let (mut socket, _) = server.accept().await.unwrap();
221 
222         // Write some data
223         socket.write_all(b"hello").await.unwrap();
224     });
225 
226     let mut client = TcpStream::connect(&addr).await.unwrap();
227 
228     let mut buf = vec![];
229     client.read_to_end(&mut buf).await.unwrap();
230 
231     assert_eq!(buf, b"hello");
232     tx.send(()).unwrap();
233 }
234 
235 #[test]
drop_threadpool_drops_futures()236 fn drop_threadpool_drops_futures() {
237     for _ in 0..1_000 {
238         let num_inc = Arc::new(AtomicUsize::new(0));
239         let num_dec = Arc::new(AtomicUsize::new(0));
240         let num_drop = Arc::new(AtomicUsize::new(0));
241 
242         struct Never(Arc<AtomicUsize>);
243 
244         impl Future for Never {
245             type Output = ();
246 
247             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
248                 Poll::Pending
249             }
250         }
251 
252         impl Drop for Never {
253             fn drop(&mut self) {
254                 self.0.fetch_add(1, Relaxed);
255             }
256         }
257 
258         let a = num_inc.clone();
259         let b = num_dec.clone();
260 
261         let rt = runtime::Builder::new_multi_thread()
262             .enable_all()
263             .on_thread_start(move || {
264                 a.fetch_add(1, Relaxed);
265             })
266             .on_thread_stop(move || {
267                 b.fetch_add(1, Relaxed);
268             })
269             .build()
270             .unwrap();
271 
272         rt.spawn(Never(num_drop.clone()));
273 
274         // Wait for the pool to shutdown
275         drop(rt);
276 
277         // Assert that only a single thread was spawned.
278         let a = num_inc.load(Relaxed);
279         assert!(a >= 1);
280 
281         // Assert that all threads shutdown
282         let b = num_dec.load(Relaxed);
283         assert_eq!(a, b);
284 
285         // Assert that the future was dropped
286         let c = num_drop.load(Relaxed);
287         assert_eq!(c, 1);
288     }
289 }
290 
291 #[test]
start_stop_callbacks_called()292 fn start_stop_callbacks_called() {
293     use std::sync::atomic::{AtomicUsize, Ordering};
294 
295     let after_start = Arc::new(AtomicUsize::new(0));
296     let before_stop = Arc::new(AtomicUsize::new(0));
297 
298     let after_inner = after_start.clone();
299     let before_inner = before_stop.clone();
300     let rt = tokio::runtime::Builder::new_multi_thread()
301         .enable_all()
302         .on_thread_start(move || {
303             after_inner.clone().fetch_add(1, Ordering::Relaxed);
304         })
305         .on_thread_stop(move || {
306             before_inner.clone().fetch_add(1, Ordering::Relaxed);
307         })
308         .build()
309         .unwrap();
310 
311     let (tx, rx) = oneshot::channel();
312 
313     rt.spawn(async move {
314         assert_ok!(tx.send(()));
315     });
316 
317     assert_ok!(rt.block_on(rx));
318 
319     drop(rt);
320 
321     assert!(after_start.load(Ordering::Relaxed) > 0);
322     assert!(before_stop.load(Ordering::Relaxed) > 0);
323 }
324 
325 #[test]
326 // too slow on miri
327 #[cfg_attr(miri, ignore)]
blocking()328 fn blocking() {
329     // used for notifying the main thread
330     const NUM: usize = 1_000;
331 
332     for _ in 0..10 {
333         let (tx, rx) = mpsc::channel();
334 
335         let rt = rt();
336         let cnt = Arc::new(AtomicUsize::new(0));
337 
338         // there are four workers in the pool
339         // so, if we run 4 blocking tasks, we know that handoff must have happened
340         let block = Arc::new(std::sync::Barrier::new(5));
341         for _ in 0..4 {
342             let block = block.clone();
343             rt.spawn(async move {
344                 tokio::task::block_in_place(move || {
345                     block.wait();
346                     block.wait();
347                 })
348             });
349         }
350         block.wait();
351 
352         for _ in 0..NUM {
353             let cnt = cnt.clone();
354             let tx = tx.clone();
355 
356             rt.spawn(async move {
357                 let num = cnt.fetch_add(1, Relaxed) + 1;
358 
359                 if num == NUM {
360                     tx.send(()).unwrap();
361                 }
362             });
363         }
364 
365         rx.recv().unwrap();
366 
367         // Wait for the pool to shutdown
368         block.wait();
369     }
370 }
371 
372 #[test]
multi_threadpool()373 fn multi_threadpool() {
374     use tokio::sync::oneshot;
375 
376     let rt1 = rt();
377     let rt2 = rt();
378 
379     let (tx, rx) = oneshot::channel();
380     let (done_tx, done_rx) = mpsc::channel();
381 
382     rt2.spawn(async move {
383         rx.await.unwrap();
384         done_tx.send(()).unwrap();
385     });
386 
387     rt1.spawn(async move {
388         tx.send(()).unwrap();
389     });
390 
391     done_rx.recv().unwrap();
392 }
393 
394 // When `block_in_place` returns, it attempts to reclaim the yielded runtime
395 // worker. In this case, the remainder of the task is on the runtime worker and
396 // must take part in the cooperative task budgeting system.
397 //
398 // The test ensures that, when this happens, attempting to consume from a
399 // channel yields occasionally even if there are values ready to receive.
400 #[test]
coop_and_block_in_place()401 fn coop_and_block_in_place() {
402     let rt = tokio::runtime::Builder::new_multi_thread()
403         // Setting max threads to 1 prevents another thread from claiming the
404         // runtime worker yielded as part of `block_in_place` and guarantees the
405         // same thread will reclaim the worker at the end of the
406         // `block_in_place` call.
407         .max_blocking_threads(1)
408         .build()
409         .unwrap();
410 
411     rt.block_on(async move {
412         let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
413 
414         // Fill the channel
415         for _ in 0..1024 {
416             tx.send(()).await.unwrap();
417         }
418 
419         drop(tx);
420 
421         tokio::spawn(async move {
422             // Block in place without doing anything
423             tokio::task::block_in_place(|| {});
424 
425             // Receive all the values, this should trigger a `Pending` as the
426             // coop limit will be reached.
427             poll_fn(|cx| {
428                 while let Poll::Ready(v) = {
429                     tokio::pin! {
430                         let fut = rx.recv();
431                     }
432 
433                     Pin::new(&mut fut).poll(cx)
434                 } {
435                     if v.is_none() {
436                         panic!("did not yield");
437                     }
438                 }
439 
440                 Poll::Ready(())
441             })
442             .await
443         })
444         .await
445         .unwrap();
446     });
447 }
448 
449 #[test]
yield_after_block_in_place()450 fn yield_after_block_in_place() {
451     let rt = tokio::runtime::Builder::new_multi_thread()
452         .worker_threads(1)
453         .build()
454         .unwrap();
455 
456     rt.block_on(async {
457         tokio::spawn(async move {
458             // Block in place then enter a new runtime
459             tokio::task::block_in_place(|| {
460                 let rt = tokio::runtime::Builder::new_current_thread()
461                     .build()
462                     .unwrap();
463 
464                 rt.block_on(async {});
465             });
466 
467             // Yield, then complete
468             tokio::task::yield_now().await;
469         })
470         .await
471         .unwrap()
472     });
473 }
474 
475 // Testing this does not panic
476 #[test]
max_blocking_threads()477 fn max_blocking_threads() {
478     let _rt = tokio::runtime::Builder::new_multi_thread()
479         .max_blocking_threads(1)
480         .build()
481         .unwrap();
482 }
483 
484 #[test]
485 #[should_panic]
max_blocking_threads_set_to_zero()486 fn max_blocking_threads_set_to_zero() {
487     let _rt = tokio::runtime::Builder::new_multi_thread()
488         .max_blocking_threads(0)
489         .build()
490         .unwrap();
491 }
492 
493 /// Regression test for #6445.
494 ///
495 /// After #6445, setting `global_queue_interval` to 1 is now technically valid.
496 /// This test confirms that there is no regression in `multi_thread_runtime`
497 /// when global_queue_interval is set to 1.
498 #[test]
global_queue_interval_set_to_one()499 fn global_queue_interval_set_to_one() {
500     let rt = tokio::runtime::Builder::new_multi_thread()
501         .global_queue_interval(1)
502         .build()
503         .unwrap();
504 
505     // Perform a simple work.
506     let cnt = Arc::new(AtomicUsize::new(0));
507     rt.block_on(async {
508         let mut set = tokio::task::JoinSet::new();
509         for _ in 0..10 {
510             let cnt = cnt.clone();
511             set.spawn(async move { cnt.fetch_add(1, Ordering::Relaxed) });
512         }
513 
514         while let Some(res) = set.join_next().await {
515             res.unwrap();
516         }
517     });
518     assert_eq!(cnt.load(Relaxed), 10);
519 }
520 
521 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
hang_on_shutdown()522 async fn hang_on_shutdown() {
523     let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
524     tokio::spawn(async move {
525         tokio::task::block_in_place(|| sync_rx.recv().ok());
526     });
527 
528     tokio::spawn(async {
529         tokio::time::sleep(std::time::Duration::from_secs(2)).await;
530         drop(sync_tx);
531     });
532     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
533 }
534 
535 /// Demonstrates tokio-rs/tokio#3869
536 #[test]
wake_during_shutdown()537 fn wake_during_shutdown() {
538     struct Shared {
539         waker: Option<Waker>,
540     }
541 
542     struct MyFuture {
543         shared: Arc<Mutex<Shared>>,
544         put_waker: bool,
545     }
546 
547     impl MyFuture {
548         fn new() -> (Self, Self) {
549             let shared = Arc::new(Mutex::new(Shared { waker: None }));
550             let f1 = MyFuture {
551                 shared: shared.clone(),
552                 put_waker: true,
553             };
554             let f2 = MyFuture {
555                 shared,
556                 put_waker: false,
557             };
558             (f1, f2)
559         }
560     }
561 
562     impl Future for MyFuture {
563         type Output = ();
564 
565         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
566             let me = Pin::into_inner(self);
567             let mut lock = me.shared.lock().unwrap();
568             if me.put_waker {
569                 lock.waker = Some(cx.waker().clone());
570             }
571             Poll::Pending
572         }
573     }
574 
575     impl Drop for MyFuture {
576         fn drop(&mut self) {
577             let mut lock = self.shared.lock().unwrap();
578             if !self.put_waker {
579                 lock.waker.take().unwrap().wake();
580             }
581             drop(lock);
582         }
583     }
584 
585     let rt = tokio::runtime::Builder::new_multi_thread()
586         .worker_threads(1)
587         .enable_all()
588         .build()
589         .unwrap();
590 
591     let (f1, f2) = MyFuture::new();
592 
593     rt.spawn(f1);
594     rt.spawn(f2);
595 
596     rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
597 }
598 
599 #[should_panic]
600 #[tokio::test]
test_block_in_place1()601 async fn test_block_in_place1() {
602     tokio::task::block_in_place(|| {});
603 }
604 
605 #[tokio::test(flavor = "multi_thread")]
test_block_in_place2()606 async fn test_block_in_place2() {
607     tokio::task::block_in_place(|| {});
608 }
609 
610 #[should_panic]
611 #[tokio::main(flavor = "current_thread")]
612 #[test]
test_block_in_place3()613 async fn test_block_in_place3() {
614     tokio::task::block_in_place(|| {});
615 }
616 
617 #[tokio::main]
618 #[test]
test_block_in_place4()619 async fn test_block_in_place4() {
620     tokio::task::block_in_place(|| {});
621 }
622 
623 // Repro for tokio-rs/tokio#5239
624 #[test]
test_nested_block_in_place_with_block_on_between()625 fn test_nested_block_in_place_with_block_on_between() {
626     let rt = runtime::Builder::new_multi_thread()
627         .worker_threads(1)
628         // Needs to be more than 0
629         .max_blocking_threads(1)
630         .build()
631         .unwrap();
632 
633     // Triggered by a race condition, so run a few times to make sure it is OK.
634     for _ in 0..100 {
635         let h = rt.handle().clone();
636 
637         rt.block_on(async move {
638             tokio::spawn(async move {
639                 tokio::task::block_in_place(|| {
640                     h.block_on(async {
641                         tokio::task::block_in_place(|| {});
642                     });
643                 })
644             })
645             .await
646             .unwrap()
647         });
648     }
649 }
650 
651 // Testing the tuning logic is tricky as it is inherently timing based, and more
652 // of a heuristic than an exact behavior. This test checks that the interval
653 // changes over time based on load factors. There are no assertions, completion
654 // is sufficient. If there is a regression, this test will hang. In theory, we
655 // could add limits, but that would be likely to fail on CI.
656 #[test]
657 #[cfg(not(tokio_no_tuning_tests))]
test_tuning()658 fn test_tuning() {
659     use std::sync::atomic::AtomicBool;
660     use std::time::Duration;
661 
662     let rt = runtime::Builder::new_multi_thread()
663         .worker_threads(1)
664         .build()
665         .unwrap();
666 
667     fn iter(flag: Arc<AtomicBool>, counter: Arc<AtomicUsize>, stall: bool) {
668         if flag.load(Relaxed) {
669             if stall {
670                 std::thread::sleep(Duration::from_micros(5));
671             }
672 
673             counter.fetch_add(1, Relaxed);
674             tokio::spawn(async move { iter(flag, counter, stall) });
675         }
676     }
677 
678     let flag = Arc::new(AtomicBool::new(true));
679     let counter = Arc::new(AtomicUsize::new(61));
680     let interval = Arc::new(AtomicUsize::new(61));
681 
682     {
683         let flag = flag.clone();
684         let counter = counter.clone();
685         rt.spawn(async move { iter(flag, counter, true) });
686     }
687 
688     // Now, hammer the injection queue until the interval drops.
689     let mut n = 0;
690     loop {
691         let curr = interval.load(Relaxed);
692 
693         if curr <= 8 {
694             n += 1;
695         } else {
696             n = 0;
697         }
698 
699         // Make sure we get a few good rounds. Jitter in the tuning could result
700         // in one "good" value without being representative of reaching a good
701         // state.
702         if n == 3 {
703             break;
704         }
705 
706         if Arc::strong_count(&interval) < 5_000 {
707             let counter = counter.clone();
708             let interval = interval.clone();
709 
710             rt.spawn(async move {
711                 let prev = counter.swap(0, Relaxed);
712                 interval.store(prev, Relaxed);
713             });
714 
715             std::thread::yield_now();
716         }
717     }
718 
719     flag.store(false, Relaxed);
720 
721     let w = Arc::downgrade(&interval);
722     drop(interval);
723 
724     while w.strong_count() > 0 {
725         std::thread::sleep(Duration::from_micros(500));
726     }
727 
728     // Now, run it again with a faster task
729     let flag = Arc::new(AtomicBool::new(true));
730     // Set it high, we know it shouldn't ever really be this high
731     let counter = Arc::new(AtomicUsize::new(10_000));
732     let interval = Arc::new(AtomicUsize::new(10_000));
733 
734     {
735         let flag = flag.clone();
736         let counter = counter.clone();
737         rt.spawn(async move { iter(flag, counter, false) });
738     }
739 
740     // Now, hammer the injection queue until the interval reaches the expected range.
741     let mut n = 0;
742     loop {
743         let curr = interval.load(Relaxed);
744 
745         if curr <= 1_000 && curr > 32 {
746             n += 1;
747         } else {
748             n = 0;
749         }
750 
751         if n == 3 {
752             break;
753         }
754 
755         if Arc::strong_count(&interval) <= 5_000 {
756             let counter = counter.clone();
757             let interval = interval.clone();
758 
759             rt.spawn(async move {
760                 let prev = counter.swap(0, Relaxed);
761                 interval.store(prev, Relaxed);
762             });
763         }
764 
765         std::thread::yield_now();
766     }
767 
768     flag.store(false, Relaxed);
769 }
770 
rt() -> runtime::Runtime771 fn rt() -> runtime::Runtime {
772     runtime::Runtime::new().unwrap()
773 }
774 
775 #[cfg(tokio_unstable)]
776 mod unstable {
777     use super::*;
778 
779     #[test]
test_disable_lifo_slot()780     fn test_disable_lifo_slot() {
781         use std::sync::mpsc::{channel, RecvTimeoutError};
782 
783         let rt = runtime::Builder::new_multi_thread()
784             .disable_lifo_slot()
785             .worker_threads(2)
786             .build()
787             .unwrap();
788 
789         // Spawn a background thread to poke the runtime periodically.
790         //
791         // This is necessary because we may end up triggering the issue in:
792         // <https://github.com/tokio-rs/tokio/issues/4730>
793         //
794         // Spawning a task will wake up the second worker, which will then steal
795         // the task. However, the steal will fail if the task is in the LIFO
796         // slot, because the LIFO slot cannot be stolen.
797         //
798         // Note that this only happens rarely. Most of the time, this thread is
799         // not necessary.
800         let (kill_bg_thread, recv) = channel::<()>();
801         let handle = rt.handle().clone();
802         let bg_thread = std::thread::spawn(move || {
803             let one_sec = std::time::Duration::from_secs(1);
804             while recv.recv_timeout(one_sec) == Err(RecvTimeoutError::Timeout) {
805                 handle.spawn(async {});
806             }
807         });
808 
809         rt.block_on(async {
810             tokio::spawn(async {
811                 // Spawn another task and block the thread until completion. If the LIFO slot
812                 // is used then the test doesn't complete.
813                 futures::executor::block_on(tokio::spawn(async {})).unwrap();
814             })
815             .await
816             .unwrap();
817         });
818 
819         drop(kill_bg_thread);
820         bg_thread.join().unwrap();
821     }
822 
823     #[test]
runtime_id_is_same()824     fn runtime_id_is_same() {
825         let rt = rt();
826 
827         let handle1 = rt.handle();
828         let handle2 = rt.handle();
829 
830         assert_eq!(handle1.id(), handle2.id());
831     }
832 
833     #[test]
runtime_ids_different()834     fn runtime_ids_different() {
835         let rt1 = rt();
836         let rt2 = rt();
837 
838         assert_ne!(rt1.handle().id(), rt2.handle().id());
839     }
840 }
841