/*
 * Copyright (c) 2024 Google Inc. All rights reserved
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files
 * (the "Software"), to deal in the Software without restriction,
 * including without limitation the rights to use, copy, modify, merge,
 * publish, distribute, sublicense, and/or sell copies of the Software,
 * and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#![deny(unsafe_op_in_unsafe_fn)]
use core::ffi::c_void;
use core::ops::Deref;
use core::ops::DerefMut;
use core::ptr::eq;
use core::ptr::null_mut;
use core::time::Duration;

use alloc::boxed::Box;
use alloc::ffi::CString;
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;

use log::debug;
use log::error;
use log::info;
use log::warn;

use rust_support::handle::IPC_HANDLE_POLL_HUP;
use rust_support::handle::IPC_HANDLE_POLL_MSG;
use rust_support::handle::IPC_HANDLE_POLL_READY;
use rust_support::handle::IPC_HANDLE_POLL_SEND_UNBLOCKED;
use rust_support::ipc::iovec_kern;
use rust_support::ipc::ipc_get_msg;
use rust_support::ipc::ipc_msg_info;
use rust_support::ipc::ipc_msg_kern;
use rust_support::ipc::ipc_port_connect_async;
use rust_support::ipc::ipc_put_msg;
use rust_support::ipc::ipc_read_msg;
use rust_support::ipc::ipc_send_msg;
use rust_support::ipc::zero_uuid;
use rust_support::ipc::IPC_CONNECT_WAIT_FOR_PORT;
use rust_support::ipc::IPC_PORT_PATH_MAX;
use rust_support::sync::Mutex;
use rust_support::thread;
use rust_support::thread::sleep;
use virtio_drivers::device::socket::SocketError;
use virtio_drivers::device::socket::VsockAddr;
use virtio_drivers::device::socket::VsockConnectionManager;
use virtio_drivers::device::socket::VsockEvent;
use virtio_drivers::device::socket::VsockEventType;
use virtio_drivers::transport::Transport;
use virtio_drivers::Error as VirtioError;
use virtio_drivers::Hal;
use virtio_drivers::PAGE_SIZE;

use rust_support::handle::HandleRef;
use rust_support::handle_set::HandleSet;

use rust_support::Error as LkError;

use crate::err::Error;

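/// How long the tx loop waits after the last activity before logging that it
/// is inactive and printing per-connection statistics.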
const ACTIVE_TIMEOUT: Duration = Duration::from_secs(5);

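/// Lifecycle of a bridged vsock/TIPC connection.
///
/// A connection starts out as `VsockOnly` when the peer connects over vsock,
/// moves through `TipcConnecting` to `Active` once the TIPC channel is up,
/// and ends in `Closed` after both sides have been torn down.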
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Default, PartialEq)]
enum VsockConnectionState {
    #[default]
    Invalid = 0,
    VsockOnly,
    TipcOnly,
    TipcConnecting,
    TipcSendBlocked,
    Active,
    TipcClosed,
    Closed,
}

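/// State for a single vsock connection that is bridged to a TIPC channel,
/// including forwarding statistics and the buffer used when TIPC blocks.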
#[derive(Default)]
struct VsockConnection {
    peer: VsockAddr,
    local_port: u32,
    state: VsockConnectionState,
    tipc_port_name: Option<CString>,
    href: HandleRef,
    tx_count: u64,
    tx_since_rx: u64,
    rx_count: u64,
    rx_since_tx: u64,
    rx_buffer: Box<[u8]>, // buffers data if the tipc connection blocks
    rx_pending: usize,    // how many bytes to send when tipc unblocks
}

impl VsockConnection {
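    /// Creates a new connection to `peer` in the `VsockOnly` state.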
    fn new(peer: VsockAddr, local_port: u32) -> Self {
        // Make rx_buffer twice as large as the vsock connection rx buffer such
        // that we can buffer pending messages if TIPC blocks.
        //
        // TODO: the ideal rx_buffer size depends on the connection so it might
        // be worthwhile to dynamically re-size the buffer in response to tipc
        // blocking or unblocking.
        let rx_buffer_len = 2 * PAGE_SIZE;
        Self {
            peer,
            local_port,
            state: VsockConnectionState::VsockOnly,
            tipc_port_name: None,
            rx_buffer: vec![0u8; rx_buffer_len].into_boxed_slice(),
            ..Default::default()
        }
    }

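    /// Returns the TIPC port name for logging, or a placeholder if it is not
    /// known yet.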
    fn tipc_port_name(&self) -> &str {
        self.tipc_port_name
            .as_ref()
            .map(|s| s.to_str().expect("invalid port name"))
            .unwrap_or("(no port name)")
    }

    fn print_stats(&self) {
        info!(
            "vsock: tx {:?} ({:>5?}) rx {:?} ({:>5?}) port: {}, remote {}, state {:?}",
            self.tx_since_rx,
            self.tx_count,
            self.rx_since_tx,
            self.rx_count,
            self.tipc_port_name(),
            self.peer.port,
            self.state
        );
    }

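    /// Forwards the `rx_pending` bytes buffered in `rx_buffer` to the TIPC
    /// channel. If the channel has no room, the connection transitions to
    /// `TipcSendBlocked` and the data stays buffered until the channel is
    /// unblocked.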
    fn tipc_try_send(&mut self) -> Result<(), Error> {
        debug_assert!(self.rx_pending > 0 && self.rx_pending < PAGE_SIZE);
        debug_assert!(
            self.state == VsockConnectionState::Active
                || self.state == VsockConnectionState::TipcSendBlocked
        );

        let length = self.rx_pending;
        let mut iov = iovec_kern { iov_base: self.rx_buffer.as_mut_ptr() as _, iov_len: length };
        let mut msg = ipc_msg_kern::new(&mut iov);

        // Safety:
        // `self.href.handle()` is a handle attached to a tipc channel.
        // `msg` contains an `iov` which points to a buffer from which
        // the kernel can read `iov_len` bytes.
        let ret = unsafe { ipc_send_msg(self.href.handle(), &mut msg) };
        if ret == LkError::ERR_NOT_ENOUGH_BUFFER.into() {
            self.state = VsockConnectionState::TipcSendBlocked;
            return Ok(());
        } else if ret < 0 {
            error!("failed to send {length} bytes to {}: {ret} ", self.tipc_port_name());
            LkError::from_lk(ret)?;
        } else if ret as usize != length {
            // TODO: in streaming mode, this should not be an error. Instead, consume
            // the data that was sent and try sending the rest in the next message.
            error!("sent {ret} bytes but expected to send {length} bytes");
            return Err(LkError::ERR_BAD_LEN.into());
        }

        self.state = VsockConnectionState::Active;
        self.tx_since_rx = 0;
        self.rx_pending = 0;

        debug!("sent {length} bytes to {}", self.tipc_port_name());

        Ok(())
    }
}

/// The action to take after running the `f` closure in [`vsock_connection_lookup`].
#[derive(PartialEq, Eq)]
enum ConnectionStateAction {
    /// No action needs to be taken, so the connection stays open.
    None,

    /// TIPC has requested that the connection be closed.
    /// This closes the connection and waits for the peer to acknowledge before removing it.
    Close,

    /// We want to close the connection and remove it
    /// without waiting for the peer to acknowledge it,
    /// such as when there is an error (but also potentially other reasons).
    Remove,
}

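/// Looks up the connection whose vsock peer port matches `remote_port`, runs
/// `f` on it, and applies the returned [`ConnectionStateAction`]. Returns
/// `Err(())` if no connection matches.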
fn vsock_connection_lookup(
    connections: &mut Vec<VsockConnection>,
    remote_port: u32,
    f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction,
) -> Result<(), ()> {
    let (index, connection) = connections
        .iter_mut()
        .enumerate()
        .find(|(_idx, connection)| connection.peer.port == remote_port)
        .ok_or(())?;
    let action = f(connection);
    if action == ConnectionStateAction::None {
        return Ok(());
    }

    if vsock_connection_close(connection, action) {
        connections.swap_remove(index);
    }

    Ok(())
}

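/// Tears down the TIPC side of the connection and advances its state towards
/// `Closed`. Returns `true` once the connection can be removed from the
/// connection list.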
fn vsock_connection_close(c: &mut VsockConnection, action: ConnectionStateAction) -> bool {
    info!(
        "remote_port {}, tipc_port_name {}, state {:?}",
        c.peer.port,
        c.tipc_port_name(),
        c.state
    );

    if c.state == VsockConnectionState::VsockOnly {
        info!("tipc vsock only connection closed");
        c.state = VsockConnectionState::TipcClosed;
    }

    if c.state == VsockConnectionState::Active
        || c.state == VsockConnectionState::TipcConnecting
        || c.state == VsockConnectionState::TipcSendBlocked
    {
        // The handle set owns the only reference we have to the handle, and
        // handle_set_wait might have already returned a pointer to `c`.
        c.href.detach();
        c.href.handle_close();
        c.href.set_cookie(null_mut());
        info!("tipc handle closed");
        c.state = VsockConnectionState::TipcClosed;
    }
    if action == ConnectionStateAction::Remove && c.state == VsockConnectionState::TipcClosed {
        info!("vsock closed");
        c.state = VsockConnectionState::Closed;
    }
    if c.state == VsockConnectionState::Closed && c.href.cookie().is_null() {
        info!("remove connection");
        c.print_stats();
        return true; // remove connection
    }
    false // keep connection
}

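/// A virtio vsock device that bridges incoming vsock connections to TIPC
/// ports, forwarding data in both directions.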
pub struct VsockDevice<H, T>
where
    H: Hal,
    T: Transport,
{
    connections: Mutex<Vec<VsockConnection>>,
    handle_set: HandleSet,
    connection_manager: Mutex<VsockConnectionManager<H, T, 4096>>,
}

impl<H, T> VsockDevice<H, T>
where
    H: Hal,
    T: Transport,
{
    pub(crate) fn new(manager: VsockConnectionManager<H, T, 4096>) -> Self {
        Self {
            connections: Mutex::new(Vec::new()),
            handle_set: HandleSet::new(),
            connection_manager: Mutex::new(manager),
        }
    }

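    /// Handles a vsock connection request from `peer` by registering a new
    /// connection in the `VsockOnly` state. Panics if a connection for the
    /// same peer and local port already exists.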
    fn vsock_rx_op_request(&self, peer: VsockAddr, local: VsockAddr) {
        debug!("dst_port {}, src_port {}", local.port, peer.port);

        // do we already have a connection?
        let mut guard = self.connections.lock();
        if guard
            .deref()
            .iter()
            .any(|connection| connection.peer == peer && connection.local_port == local.port)
        {
            panic!("connection already exists");
        };

        guard.deref_mut().push(VsockConnection::new(peer, local.port));
    }

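    /// Handles the first data packet on a new vsock connection, which carries
    /// the name of the TIPC port to connect to. Starts an asynchronous TIPC
    /// connection and attaches its handle to the device's handle set so the
    /// tx loop can service it.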
    fn vsock_connect_tipc(
        &self,
        c: &mut VsockConnection,
        length: usize,
        source: VsockAddr,
        destination: VsockAddr,
    ) -> Result<(), Error> {
        let mut buffer = [0; IPC_PORT_PATH_MAX as usize];
        assert!(length < buffer.len());
        let mut data_len = self
            .connection_manager
            .lock()
            .deref_mut()
            .recv(source, destination.port, &mut buffer)
            .unwrap();
        assert!(data_len == length);
        // allow manual connect from nc in line mode
        if buffer[data_len - 1] == b'\n' as _ {
            data_len -= 1;
        }
        let port_name = &buffer[0..data_len];
        // should not contain any null bytes
        c.tipc_port_name = CString::new(port_name).ok();

        // Safety:
        // - `cid` is a valid uuid because we use a bindgen'd constant
        // - `path` points to a null-terminated C-string. The null byte was appended by
        //   `CString::new`.
        // - `max_path` is the length of `path` in bytes including the null terminator.
        //   It is always less than or equal to IPC_PORT_PATH_MAX.
        // - `flags` contains a flag value accepted by the callee
        // - `chandle_ptr` points to memory that the kernel can store a pointer into
        //   after the callee returns.
        let ret = unsafe {
            ipc_port_connect_async(
                &zero_uuid,
                c.tipc_port_name.as_ref().unwrap().as_ptr(),
                data_len + /* null byte added by CString::new */ 1,
                IPC_CONNECT_WAIT_FOR_PORT,
                &mut (*c.href.as_mut_ptr()).handle,
            )
        };
        if ret != 0 {
            warn!(
                "failed to connect to {}, remote {}, connect err {ret}",
                c.tipc_port_name(),
                c.peer.port
            )
        }

        debug!("wait for connection to {}, remote {}", c.tipc_port_name(), c.peer.port);

        c.state = VsockConnectionState::TipcConnecting;

        // We cannot use the address of the connection as the cookie as it may move.
        // Use the heap address of the `handle_ref` instead as it will not get moved.
        let cookie = c.href.as_mut_ptr() as *mut c_void;
        c.href.set_cookie(cookie);
        c.href.set_emask(!0);
        c.href.set_id(c.peer.port);

        self.handle_set.attach(&mut c.href).map_err(|e| {
            c.href.handle_close();
            Error::Lk(e)
        })
    }

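    /// Marks the connection active once the TIPC channel reports it is ready
    /// and sends a one-byte status message back to the vsock peer to signal
    /// that the connection is established.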
    fn vsock_tx_tipc_ready(&self, c: &mut VsockConnection) {
        if c.state != VsockConnectionState::TipcConnecting {
            panic!("warning, got poll ready in unexpected state: {:?}", c.state);
        }
        info!("connected to {}, remote {:?}", c.tipc_port_name(), c.peer.port);
        c.state = VsockConnectionState::Active;

        let buffer = [0u8];
        let res = self.connection_manager.lock().send(c.peer, c.local_port, &buffer);
        if res.is_err() {
            warn!("failed to send connected status message");
        }
    }

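    /// Receives one vsock packet for an active connection into `rx_buffer`,
    /// forwards it to the TIPC channel, and then returns the consumed credit
    /// to the peer.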
    fn vsock_rx_channel(
        &self,
        c: &mut VsockConnection,
        length: usize,
        source: VsockAddr,
        destination: VsockAddr,
    ) -> Result<(), Error> {
        assert_eq!(c.state, VsockConnectionState::Active);

        // multiple messages may be available when we call recv but we want to forward
        // them on the tipc connection one by one. Pass a slice of the rx_buffer so
        // we only drain the number of bytes that correspond to a single vsock event.
        c.rx_pending = self
            .connection_manager
            .lock()
            .deref_mut()
            .recv(source, destination.port, &mut c.rx_buffer[..length])
            .unwrap();

        // TODO: handle large messages properly
        assert_eq!(c.rx_pending, length);

        c.rx_count += 1;
        c.rx_since_tx += 1;

        c.tipc_try_send()?;

        self.connection_manager.lock().deref_mut().update_credit(c.peer, c.local_port).unwrap();

        Ok(())
    }

    fn print_stats(&self) {
        let guard = self.connections.lock();
        let connections = guard.deref();
        for connection in connections {
            connection.print_stats();
        }
    }
}

// Safety: each field of a `VsockDevice` is safe to transfer across thread boundaries
// TODO: remove this once https://github.com/rcore-os/virtio-drivers/pull/146 lands
unsafe impl<H, T> Send for VsockDevice<H, T>
where
    H: Hal,
    T: Transport,
{
}

// Safety: each field of a `VsockDevice` is safe to share between threads
// TODO: remove this once https://github.com/rcore-os/virtio-drivers/pull/146 lands
unsafe impl<H, T> Sync for VsockDevice<H, T>
where
    H: Hal,
    T: Transport,
{
}

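/// Receive loop: listens on local port 1, polls the vsock device for events,
/// and dispatches them to the matching connection, establishing the TIPC
/// connection on the first data packet and forwarding subsequent packets to
/// TIPC.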
pub(crate) fn vsock_rx_loop<H, T>(device: Arc<VsockDevice<H, T>>) -> Result<(), Error>
where
    H: Hal,
    T: Transport,
{
    let local_port = 1;
    let ten_ms = Duration::from_millis(10);
    let mut pending: Vec<VsockEvent> = vec![];

    debug!("starting vsock_rx_loop");
    device.connection_manager.lock().deref_mut().listen(local_port);

    loop {
        // TODO: use interrupts instead of polling
        // TODO: handle case where poll returns SocketError::OutputBufferTooShort
        let event = pending
            .pop()
            .or_else(|| device.connection_manager.lock().deref_mut().poll().expect("poll failed"));

        if event.is_none() {
            sleep(ten_ms);
            continue;
        }

        let VsockEvent { source, destination, event_type, buffer_status } = event.unwrap();

        match event_type {
            VsockEventType::ConnectionRequest => {
                device.vsock_rx_op_request(source, destination);
            }
            VsockEventType::Connected => {
                panic!("outbound connections not supported");
            }
            VsockEventType::Received { length } => {
                debug!("recv destination: {destination:?}");

                let connections = &mut *device.connections.lock();
                let _ = vsock_connection_lookup(connections, source.port, |mut connection| {
                    if let Err(e) = match connection {
                        ref mut c @ VsockConnection {
                            state: VsockConnectionState::VsockOnly, ..
                        } => device.vsock_connect_tipc(c, length, source, destination),
                        ref mut c @ VsockConnection {
                            state: VsockConnectionState::Active, ..
                        } => device.vsock_rx_channel(c, length, source, destination),
                        VsockConnection {
                            state: VsockConnectionState::TipcSendBlocked, ..
                        } => {
                            // requeue pending event.
                            pending.push(VsockEvent {
                                source,
                                destination,
                                event_type,
                                buffer_status,
                            });
                            // TODO: on one hand, we want to wait for the tipc connection to unblock
                            // on the other, we want to pick up incoming events as soon as we can...
                            // NOTE: Adding support for interrupts means we no longer have to sleep.
                            sleep(ten_ms);
                            Ok(())
                        }
                        VsockConnection { state: VsockConnectionState::TipcConnecting, .. } => {
                            warn!("got data while still waiting for tipc connection");
                            Err(LkError::ERR_BAD_STATE.into())
                        }
                        VsockConnection { state: s, .. } => {
                            error!("got data for connection in state {s:?}");
                            Err(LkError::ERR_BAD_STATE.into())
                        }
                    } {
                        error!("failed to receive data from vsock connection: {e:?}");
                        // TODO: add reset function to device or connection?
                        let _ = device
                            .connection_manager
                            .lock()
                            .deref_mut()
                            .force_close(connection.peer, connection.local_port);

                        return ConnectionStateAction::Remove;
                    }
                    ConnectionStateAction::None
                })
                .inspect_err(|_| {
                    warn!("got packet for unknown connection");
                });
            }
            VsockEventType::Disconnected { reason } => {
                debug!("disconnected from peer. reason: {reason:?}");
                let connections = &mut *device.connections.lock();
                let _ = vsock_connection_lookup(connections, source.port, |_connection| {
                    ConnectionStateAction::Remove
                })
                .inspect_err(|_| {
                    warn!("got disconnect ({reason:?}) for unknown connection");
                });
            }
            VsockEventType::CreditUpdate => { /* nothing to do */ }
            VsockEventType::CreditRequest => {
                // Polling the VsockConnectionManager won't return this event type
                panic!("don't know how to handle credit requests");
            }
        }
    }
}

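/// Transmit loop: waits on the handle set for TIPC events and forwards
/// messages from the TIPC channels to the corresponding vsock connections.
/// Also handles connection-ready, send-unblocked, and hang-up events, and
/// prints per-connection statistics after a period of inactivity.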
pub(crate) fn vsock_tx_loop<H, T>(device: Arc<VsockDevice<H, T>>) -> Result<(), Error>
where
    H: Hal,
    T: Transport,
{
    let mut timeout = Duration::MAX;
    let ten_secs = Duration::from_secs(10);
    let mut tx_buffer = vec![0u8; PAGE_SIZE].into_boxed_slice();
    loop {
        let mut href = HandleRef::default();
        let mut ret = device.handle_set.handle_set_wait(&mut href, timeout);
        if ret == Err(LkError::ERR_NOT_FOUND) {
            // handle_set_wait returns ERR_NOT_FOUND if the handle_set is empty
            // but we can wait for it to become non-empty using handle_wait.
            // Once that returns we have to call handle_set_wait again to
            // get the event we care about.
            ret = device.handle_set.handle_wait(&mut href.emask(), timeout);
            if ret != Err(LkError::ERR_TIMED_OUT) {
                info!("handle_wait on handle set returned: {ret:?}");
                continue;
            }
            // fall through to ret == ERR_TIMED_OUT case, then continue
        }
        if ret == Err(LkError::ERR_TIMED_OUT) {
            info!("tx inactive for {timeout:?}");
            timeout = Duration::MAX;
            device.print_stats();
            continue;
        }
        if ret.is_err() {
            warn!("handle_set_wait failed: {}", ret.unwrap_err());
            thread::sleep(ten_secs);
            continue;
        }

        let _ = vsock_connection_lookup(&mut device.connections.lock(), href.id(), |c| {
            if !eq(c.href.as_mut_ptr() as *mut c_void, href.cookie()) {
                panic!(
                    "unexpected cookie {:?} != {:?} for connection {}",
                    href.cookie(),
                    c.href.as_mut_ptr(),
                    c.tipc_port_name()
                );
            }

            if href.emask() & IPC_HANDLE_POLL_READY != 0 {
                device.vsock_tx_tipc_ready(c);
            }
            if href.emask() & IPC_HANDLE_POLL_MSG != 0 {
                // Print stats if we don't send any more packets for a while
                timeout = ACTIVE_TIMEOUT;
                // TODO: loop and read all messages?
                let mut msg_info = ipc_msg_info::default();

                // TODO: add more idiomatic Rust interface
                // Safety:
                // `c.href.handle` is a valid handle to a tipc channel.
                // `ipc_get_msg` can store a message descriptor in `msg_info`.
                let ret = unsafe { ipc_get_msg(c.href.handle(), &mut msg_info) };
                if ret == rust_support::Error::NO_ERROR.into() {
                    let mut iov: iovec_kern = tx_buffer.as_mut().into();
                    let mut msg = ipc_msg_kern::new(&mut iov);

                    // Safety:
                    // `c.href.handle` is a valid handle to a tipc channel.
                    // `msg_info` holds the results of a successful call to `ipc_get_msg`
                    // using the same handle.
                    let ret = unsafe { ipc_read_msg(c.href.handle(), msg_info.id, 0, &mut msg) };

                    // Safety:
                    // `ipc_get_msg` was called with the same handle and msg_info arguments.
                    unsafe { ipc_put_msg(c.href.handle(), msg_info.id) };
                    if ret >= 0 && ret as usize == msg_info.len {
                        c.tx_count += 1;
                        c.tx_since_rx += 1;
                        c.rx_since_tx = 0;
                        match device.connection_manager.lock().send(
                            c.peer,
                            c.local_port,
                            &tx_buffer[..msg_info.len],
                        ) {
                            Err(err) => {
                                if err == VirtioError::SocketDeviceError(SocketError::NotConnected)
                                {
                                    debug!(
                                        "failed to send {} bytes from {}. Connection closed",
                                        msg_info.len,
                                        c.tipc_port_name()
                                    );
                                } else {
                                    // TODO: close connection instead
                                    panic!(
                                        "failed to send {} bytes from {}: {:?}",
                                        msg_info.len,
                                        c.tipc_port_name(),
                                        err
                                    );
                                }
                            }
                            Ok(_) => {
                                debug!("sent {} bytes from {}", msg_info.len, c.tipc_port_name());
                            }
                        }
                    } else {
                        error!("ipc_read_msg failed: {ret}");
                    }
                }
            }
            if href.emask() & IPC_HANDLE_POLL_SEND_UNBLOCKED != 0 {
                assert_eq!(c.state, VsockConnectionState::TipcSendBlocked);
                assert_ne!(c.rx_pending, 0);

                debug!("tipc connection unblocked {}", c.tipc_port_name());

                if let Err(e) = c.tipc_try_send() {
                    error!("failed to send pending message to {}: {e:?}", c.tipc_port_name());
                }
            }
            if href.emask() & IPC_HANDLE_POLL_HUP != 0 {
                // Print stats if we don't send any more packets for a while
                timeout = ACTIVE_TIMEOUT;
                info!("got hup");
                debug!(
                    "shut down connection {}, {:?}, {:?}",
                    c.tipc_port_name(),
                    c.peer,
                    c.local_port
                );
                let res = device.connection_manager.lock().shutdown(c.peer, c.local_port);
                if res.is_ok() {
                    return ConnectionStateAction::Close;
                } else {
                    warn!(
                        "failed to send shutdown command, connection removed? {}",
                        res.unwrap_err()
                    );
                }
            }
            ConnectionStateAction::None
        })
        .inspect_err(|_| {
            warn!("got event for non-existent remote {}, was it closed?", href.id());
        });
        href.handle_decref();
    }
}