1 /*
2  * Copyright (c) 2024 Google Inc. All rights reserved
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining
5  * a copy of this software and associated documentation files
6  * (the "Software"), to deal in the Software without restriction,
7  * including without limitation the rights to use, copy, modify, merge,
8  * publish, distribute, sublicense, and/or sell copies of the Software,
9  * and to permit persons to whom the Software is furnished to do so,
10  * subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be
13  * included in all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22  */
23 
24 #![deny(unsafe_op_in_unsafe_fn)]
25 use core::ffi::c_void;
26 use core::ops::Deref;
27 use core::ops::DerefMut;
28 use core::ptr::eq;
29 use core::ptr::null_mut;
30 use core::time::Duration;
31 
32 use alloc::boxed::Box;
33 use alloc::ffi::CString;
34 use alloc::sync::Arc;
35 use alloc::vec;
36 use alloc::vec::Vec;
37 
38 use log::debug;
39 use log::error;
40 use log::info;
41 use log::warn;
42 
43 use rust_support::handle::IPC_HANDLE_POLL_HUP;
44 use rust_support::handle::IPC_HANDLE_POLL_MSG;
45 use rust_support::handle::IPC_HANDLE_POLL_READY;
46 use rust_support::handle::IPC_HANDLE_POLL_SEND_UNBLOCKED;
47 use rust_support::ipc::iovec_kern;
48 use rust_support::ipc::ipc_get_msg;
49 use rust_support::ipc::ipc_msg_info;
50 use rust_support::ipc::ipc_msg_kern;
51 use rust_support::ipc::ipc_port_connect_async;
52 use rust_support::ipc::ipc_put_msg;
53 use rust_support::ipc::ipc_read_msg;
54 use rust_support::ipc::ipc_send_msg;
55 use rust_support::ipc::zero_uuid;
56 use rust_support::ipc::IPC_CONNECT_WAIT_FOR_PORT;
57 use rust_support::ipc::IPC_PORT_PATH_MAX;
58 use rust_support::sync::Mutex;
59 use rust_support::thread;
60 use rust_support::thread::sleep;
61 use virtio_drivers::device::socket::SocketError;
62 use virtio_drivers::device::socket::VsockAddr;
63 use virtio_drivers::device::socket::VsockConnectionManager;
64 use virtio_drivers::device::socket::VsockEvent;
65 use virtio_drivers::device::socket::VsockEventType;
66 use virtio_drivers::transport::Transport;
67 use virtio_drivers::Error as VirtioError;
68 use virtio_drivers::Hal;
69 use virtio_drivers::PAGE_SIZE;
70 
71 use rust_support::handle::HandleRef;
72 use rust_support::handle_set::HandleSet;
73 
74 use rust_support::Error as LkError;
75 
76 use crate::err::Error;
77 
/// Wait timeout that `vsock_tx_loop` switches to after it has seen a tipc
/// message or hang-up. When a wait with this timeout expires, the loop logs
/// per-connection stats and goes back to waiting with `Duration::MAX`.
const ACTIVE_TIMEOUT: Duration = Duration::from_secs(5);
79 
/// State of a vsock connection and of the tipc channel it is bridged to.
// Not every variant is constructed in this file (`TipcOnly` is never assigned and
// `Invalid` only arises through `Default`), hence the lint allowance.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Default, PartialEq)]
enum VsockConnectionState {
    /// Value produced by `Default`; never assigned explicitly in this file.
    #[default]
    Invalid = 0,
    /// Initial state set by `VsockConnection::new`: the vsock side exists but no
    /// tipc connection has been started yet.
    VsockOnly,
    /// Not assigned anywhere in this file.
    /// NOTE(review): presumably reserved for a tipc-initiated connection — confirm.
    TipcOnly,
    /// `ipc_port_connect_async` has been issued; waiting for `IPC_HANDLE_POLL_READY`.
    TipcConnecting,
    /// `ipc_send_msg` returned `ERR_NOT_ENOUGH_BUFFER`; the received bytes stay in
    /// `rx_buffer` until `IPC_HANDLE_POLL_SEND_UNBLOCKED` triggers a retry.
    TipcSendBlocked,
    /// The tipc channel is connected and data is forwarded in both directions.
    Active,
    /// The tipc handle has been closed (or never existed); set by
    /// `vsock_connection_close`.
    TipcClosed,
    /// Set by `vsock_connection_close` when removal was requested on a connection
    /// whose tipc side is already closed.
    Closed,
}
93 
/// Book-keeping for one vsock connection and the tipc channel it forwards to.
#[derive(Default)]
struct VsockConnection {
    /// Address of the remote vsock peer.
    peer: VsockAddr,
    /// Local vsock port the peer connected to.
    local_port: u32,
    /// Current position in the connection lifecycle.
    state: VsockConnectionState,
    /// Name of the tipc port, taken from the first message received on the
    /// connection; `None` until `vsock_connect_tipc` has stored it.
    tipc_port_name: Option<CString>,
    /// Handle reference for the tipc channel; attached to the device's handle set.
    href: HandleRef,
    /// Number of tipc messages read for forwarding to vsock.
    tx_count: u64,
    /// Like `tx_count`, but reset to 0 whenever data is sent to tipc.
    tx_since_rx: u64,
    /// Number of vsock messages received for forwarding to tipc.
    rx_count: u64,
    /// Like `rx_count`, but reset to 0 whenever a tipc message is forwarded to vsock.
    rx_since_tx: u64,
    rx_buffer: Box<[u8]>, // buffers data if the tipc connection blocks
    rx_pending: usize,    // how many bytes to send when tipc unblocks
}
108 
109 impl VsockConnection {
new(peer: VsockAddr, local_port: u32) -> Self110     fn new(peer: VsockAddr, local_port: u32) -> Self {
111         // Make rx_buffer twice as large as the vsock connection rx buffer such
112         // that we can buffer pending messages if TIPC blocks.
113         //
114         // TODO: the ideal rx_buffer size depends on the connection so it might
115         // be worthwhile to dynamically re-size the buffer in response to tipc
116         // blocking or unblocking.
117         let rx_buffer_len = 2 * PAGE_SIZE;
118         Self {
119             peer,
120             local_port,
121             state: VsockConnectionState::VsockOnly,
122             tipc_port_name: None,
123             rx_buffer: vec![0u8; rx_buffer_len].into_boxed_slice(),
124             ..Default::default()
125         }
126     }
127 
tipc_port_name(&self) -> &str128     fn tipc_port_name(&self) -> &str {
129         self.tipc_port_name
130             .as_ref()
131             .map(|s| s.to_str().expect("invalid port name"))
132             .unwrap_or("(no port name)")
133     }
134 
print_stats(&self)135     fn print_stats(&self) {
136         info!(
137             "vsock: tx {:?} ({:>5?}) rx {:?} ({:>5?}) port: {}, remote {}, state {:?}",
138             self.tx_since_rx,
139             self.tx_count,
140             self.rx_since_tx,
141             self.rx_count,
142             self.tipc_port_name(),
143             self.peer.port,
144             self.state
145         );
146     }
147 
tipc_try_send(&mut self) -> Result<(), Error>148     fn tipc_try_send(&mut self) -> Result<(), Error> {
149         debug_assert!(self.rx_pending > 0 && self.rx_pending < PAGE_SIZE);
150         debug_assert!(
151             self.state == VsockConnectionState::Active
152                 || self.state == VsockConnectionState::TipcSendBlocked
153         );
154 
155         let length = self.rx_pending;
156         let mut iov = iovec_kern { iov_base: self.rx_buffer.as_mut_ptr() as _, iov_len: length };
157         let mut msg = ipc_msg_kern::new(&mut iov);
158 
159         // Safety:
160         // `c.href.handle` is a handle attached to a tipc channel.
161         // `msg` contains an `iov` which points to a buffer from which
162         // the kernel can read `iov_len` bytes.
163         let ret = unsafe { ipc_send_msg(self.href.handle(), &mut msg) };
164         if ret == LkError::ERR_NOT_ENOUGH_BUFFER.into() {
165             self.state = VsockConnectionState::TipcSendBlocked;
166             return Ok(());
167         } else if ret < 0 {
168             error!("failed to send {length} bytes to {}: {ret} ", self.tipc_port_name());
169             LkError::from_lk(ret)?;
170         } else if ret as usize != length {
171             // TODO: in streaming mode, this should not be an error. Instead, consume
172             // the data that was sent and try sending the rest in the next message.
173             error!("sent {ret} bytes but expected to send {length} bytes");
174             return Err(LkError::ERR_BAD_LEN.into());
175         }
176 
177         self.state = VsockConnectionState::Active;
178         self.tx_since_rx = 0;
179         self.rx_pending = 0;
180 
181         debug!("sent {length} bytes to {}", self.tipc_port_name());
182 
183         Ok(())
184     }
185 }
186 
/// The action to take after running the `f` closure in [`vsock_connection_lookup`].
///
/// Anything other than [`ConnectionStateAction::None`] is handed to
/// `vsock_connection_close`, which decides whether the connection entry can be
/// dropped from the list.
#[derive(PartialEq, Eq)]
enum ConnectionStateAction {
    /// No action needs to be taken, so the connection stays open.
    None,

    /// TIPC has requested that the connection be closed.
    /// This closes the tipc handle but keeps the connection entry around until a
    /// later [`ConnectionStateAction::Remove`], i.e. until the peer acknowledges.
    Close,

    /// We want to close the connection and remove it
    /// without waiting for the peer to acknowledge it,
    /// such as when there is an error (but also potentially other reasons,
    /// e.g. the peer itself disconnected).
    Remove,
}
202 
vsock_connection_lookup( connections: &mut Vec<VsockConnection>, remote_port: u32, f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction, ) -> Result<(), ()>203 fn vsock_connection_lookup(
204     connections: &mut Vec<VsockConnection>,
205     remote_port: u32,
206     f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction,
207 ) -> Result<(), ()> {
208     let (index, connection) = connections
209         .iter_mut()
210         .enumerate()
211         .find(|(_idx, connection)| connection.peer.port == remote_port)
212         .ok_or(())?;
213     let action = f(connection);
214     if action == ConnectionStateAction::None {
215         return Ok(());
216     }
217 
218     if vsock_connection_close(connection, action) {
219         connections.swap_remove(index);
220     }
221 
222     Ok(())
223 }
224 
vsock_connection_close(c: &mut VsockConnection, action: ConnectionStateAction) -> bool225 fn vsock_connection_close(c: &mut VsockConnection, action: ConnectionStateAction) -> bool {
226     info!(
227         "remote_port {}, tipc_port_name {}, state {:?}",
228         c.peer.port,
229         c.tipc_port_name(),
230         c.state
231     );
232 
233     if c.state == VsockConnectionState::VsockOnly {
234         info!("tipc vsock only connection closed");
235         c.state = VsockConnectionState::TipcClosed;
236     }
237 
238     if c.state == VsockConnectionState::Active
239         || c.state == VsockConnectionState::TipcConnecting
240         || c.state == VsockConnectionState::TipcSendBlocked
241     {
242         // The handle set owns the only reference we have to the handle and
243         // handle_set_wait might have already returned a pointer to c
244         c.href.detach();
245         c.href.handle_close();
246         c.href.set_cookie(null_mut());
247         info!("tipc handle closed");
248         c.state = VsockConnectionState::TipcClosed;
249     }
250     if action == ConnectionStateAction::Remove && c.state == VsockConnectionState::TipcClosed {
251         info!("vsock closed");
252         c.state = VsockConnectionState::Closed;
253     }
254     if c.state == VsockConnectionState::Closed && c.href.cookie().is_null() {
255         info!("remove connection");
256         c.print_stats();
257         return true; // remove connection
258     }
259     false // keep connection
260 }
261 
/// A vsock device together with the state needed to bridge its connections to tipc.
///
/// An `Arc<VsockDevice>` is handed to both `vsock_rx_loop` and `vsock_tx_loop`.
pub struct VsockDevice<H, T>
where
    H: Hal,
    T: Transport,
{
    /// All known connections, looked up by the peer's port.
    connections: Mutex<Vec<VsockConnection>>,
    /// Handle set that the tipc handles of the connections are attached to;
    /// `vsock_tx_loop` waits on it.
    handle_set: HandleSet,
    /// The virtio vsock connection manager used to send, receive and poll for events.
    connection_manager: Mutex<VsockConnectionManager<H, T, 4096>>,
}
271 
272 impl<H, T> VsockDevice<H, T>
273 where
274     H: Hal,
275     T: Transport,
276 {
new(manager: VsockConnectionManager<H, T, 4096>) -> Self277     pub(crate) fn new(manager: VsockConnectionManager<H, T, 4096>) -> Self {
278         Self {
279             connections: Mutex::new(Vec::new()),
280             handle_set: HandleSet::new(),
281             connection_manager: Mutex::new(manager),
282         }
283     }
284 
vsock_rx_op_request(&self, peer: VsockAddr, local: VsockAddr)285     fn vsock_rx_op_request(&self, peer: VsockAddr, local: VsockAddr) {
286         debug!("dst_port {}, src_port {}", local.port, peer.port);
287 
288         // do we already have a connection?
289         let mut guard = self.connections.lock();
290         if guard
291             .deref()
292             .iter()
293             .any(|connection| connection.peer == peer && connection.local_port == local.port)
294         {
295             panic!("connection already exists");
296         };
297 
298         guard.deref_mut().push(VsockConnection::new(peer, local.port));
299     }
300 
vsock_connect_tipc( &self, c: &mut VsockConnection, length: usize, source: VsockAddr, destination: VsockAddr, ) -> Result<(), Error>301     fn vsock_connect_tipc(
302         &self,
303         c: &mut VsockConnection,
304         length: usize,
305         source: VsockAddr,
306         destination: VsockAddr,
307     ) -> Result<(), Error> {
308         let mut buffer = [0; IPC_PORT_PATH_MAX as usize];
309         assert!(length < buffer.len());
310         let mut data_len = self
311             .connection_manager
312             .lock()
313             .deref_mut()
314             .recv(source, destination.port, &mut buffer)
315             .unwrap();
316         assert!(data_len == length);
317         // allow manual connect from nc in line mode
318         if buffer[data_len - 1] == b'\n' as _ {
319             data_len -= 1;
320         }
321         let port_name = &buffer[0..data_len];
322         // should not contain any null bytes
323         c.tipc_port_name = CString::new(port_name).ok();
324 
325         // Safety:
326         // - `cid`` is a valid uuid because we use a bindgen'd constant
327         // - `path` points to a null-terminated C-string. The null byte was appended by
328         //   `CString::new`.
329         // - `max_path` is the length of `path` in bytes including the null terminator.
330         //   It is always less than or equal to IPC_PORT_PATH_MAX.
331         // - `flags` contains a flag value accepted by the callee
332         // - `chandle_ptr` points to memory that the kernel can store a pointer into
333         //   after the callee returns.
334         let ret = unsafe {
335             ipc_port_connect_async(
336                 &zero_uuid,
337                 c.tipc_port_name.as_ref().unwrap().as_ptr(),
338                 data_len + /* null byte added by CString::new */ 1,
339                 IPC_CONNECT_WAIT_FOR_PORT,
340                 &mut (*c.href.as_mut_ptr()).handle,
341             )
342         };
343         if ret != 0 {
344             warn!(
345                 "failed to connect to {}, remote {}, connect err {ret}",
346                 c.tipc_port_name(),
347                 c.peer.port
348             )
349         }
350 
351         debug!("wait for connection to {}, remote {}", c.tipc_port_name(), c.peer.port);
352 
353         c.state = VsockConnectionState::TipcConnecting;
354 
355         // We cannot use the address of the connection as the cookie as it may move.
356         // Use the heap address of the `handle_ref` instead as it will not get moved.
357         let cookie = c.href.as_mut_ptr() as *mut c_void;
358         c.href.set_cookie(cookie);
359         c.href.set_emask(!0);
360         c.href.set_id(c.peer.port);
361 
362         self.handle_set.attach(&mut c.href).map_err(|e| {
363             c.href.handle_close();
364             Error::Lk(e)
365         })
366     }
367 
vsock_tx_tipc_ready(&self, c: &mut VsockConnection)368     fn vsock_tx_tipc_ready(&self, c: &mut VsockConnection) {
369         if c.state != VsockConnectionState::TipcConnecting {
370             panic!("warning, got poll ready in unexpected state: {:?}", c.state);
371         }
372         info!("connected to {}, remote {:?}", c.tipc_port_name(), c.peer.port);
373         c.state = VsockConnectionState::Active;
374 
375         let buffer = [0u8];
376         let res = self.connection_manager.lock().send(c.peer, c.local_port, &buffer);
377         if res.is_err() {
378             warn!("failed to send connected status message");
379         }
380     }
381 
vsock_rx_channel( &self, c: &mut VsockConnection, length: usize, source: VsockAddr, destination: VsockAddr, ) -> Result<(), Error>382     fn vsock_rx_channel(
383         &self,
384         c: &mut VsockConnection,
385         length: usize,
386         source: VsockAddr,
387         destination: VsockAddr,
388     ) -> Result<(), Error> {
389         assert_eq!(c.state, VsockConnectionState::Active);
390 
391         // multiple messages may be available when we call recv but we want to forward
392         // them on the tipc connection one by one. Pass a slice of the rx_buffer so
393         // we only drain the number of bytes that correspond to a single vsock event.
394         c.rx_pending = self
395             .connection_manager
396             .lock()
397             .deref_mut()
398             .recv(source, destination.port, &mut c.rx_buffer[..length])
399             .unwrap();
400 
401         // TODO: handle large messages properly
402         assert_eq!(c.rx_pending, length);
403 
404         c.rx_count += 1;
405         c.rx_since_tx += 1;
406 
407         c.tipc_try_send()?;
408 
409         self.connection_manager.lock().deref_mut().update_credit(c.peer, c.local_port).unwrap();
410 
411         Ok(())
412     }
413 
print_stats(&self)414     fn print_stats(&self) {
415         let guard = self.connections.lock();
416         let connections = guard.deref();
417         for connection in connections {
418             connection.print_stats();
419         }
420     }
421 }
422 
// SAFETY: each field of a `VsockDevice` is safe to transfer across thread boundaries
// NOTE(review): this cannot be checked from this file alone; it relies on the
// `Mutex`, `HandleSet` and `VsockConnectionManager` implementations.
// TODO: remove this once https://github.com/rcore-os/virtio-drivers/pull/146 lands
unsafe impl<H, T> Send for VsockDevice<H, T>
where
    H: Hal,
    T: Transport,
{
}
431 
// SAFETY: each field of a `VsockDevice` is safe to share between threads
// NOTE(review): this cannot be checked from this file alone; it relies on the
// `Mutex`, `HandleSet` and `VsockConnectionManager` implementations.
// TODO: remove this once https://github.com/rcore-os/virtio-drivers/pull/146 lands
unsafe impl<H, T> Sync for VsockDevice<H, T>
where
    H: Hal,
    T: Transport,
{
}
440 
vsock_rx_loop<H, T>(device: Arc<VsockDevice<H, T>>) -> Result<(), Error> where H: Hal, T: Transport,441 pub(crate) fn vsock_rx_loop<H, T>(device: Arc<VsockDevice<H, T>>) -> Result<(), Error>
442 where
443     H: Hal,
444     T: Transport,
445 {
446     let local_port = 1;
447     let ten_ms = Duration::from_millis(10);
448     let mut pending: Vec<VsockEvent> = vec![];
449 
450     debug!("starting vsock_rx_loop");
451     device.connection_manager.lock().deref_mut().listen(local_port);
452 
453     loop {
454         // TODO: use interrupts instead of polling
455         // TODO: handle case where poll returns SocketError::OutputBufferTooShort
456         let event = pending
457             .pop()
458             .or_else(|| device.connection_manager.lock().deref_mut().poll().expect("poll failed"));
459 
460         if event.is_none() {
461             sleep(ten_ms);
462             continue;
463         }
464 
465         let VsockEvent { source, destination, event_type, buffer_status } = event.unwrap();
466 
467         match event_type {
468             VsockEventType::ConnectionRequest => {
469                 device.vsock_rx_op_request(source, destination);
470             }
471             VsockEventType::Connected => {
472                 panic!("outbound connections not supported");
473             }
474             VsockEventType::Received { length } => {
475                 debug!("recv destination: {destination:?}");
476 
477                 let connections = &mut *device.connections.lock();
478                 let _ = vsock_connection_lookup(connections, source.port, |mut connection| {
479                     if let Err(e) = match connection {
480                         ref mut c @ VsockConnection {
481                             state: VsockConnectionState::VsockOnly, ..
482                         } => device.vsock_connect_tipc(c, length, source, destination),
483                         ref mut c @ VsockConnection {
484                             state: VsockConnectionState::Active, ..
485                         } => device.vsock_rx_channel(c, length, source, destination),
486                         VsockConnection {
487                             state: VsockConnectionState::TipcSendBlocked, ..
488                         } => {
489                             // requeue pending event.
490                             pending.push(VsockEvent {
491                                 source,
492                                 destination,
493                                 event_type,
494                                 buffer_status,
495                             });
496                             // TODO: on one hand, we want to wait for the tipc connection to unblock
497                             // on the other, we want to pick up incoming events as soon as we can...
498                             // NOTE: Adding support for interrupts means we no longer have to sleep.
499                             sleep(ten_ms);
500                             Ok(())
501                         }
502                         VsockConnection { state: VsockConnectionState::TipcConnecting, .. } => {
503                             warn!("got data while still waiting for tipc connection");
504                             Err(LkError::ERR_BAD_STATE.into())
505                         }
506                         VsockConnection { state: s, .. } => {
507                             error!("got data for connection in state {s:?}");
508                             Err(LkError::ERR_BAD_STATE.into())
509                         }
510                     } {
511                         error!("failed to receive data from vsock connection:  {e:?}");
512                         // TODO: add reset function to device or connection?
513                         let _ = device
514                             .connection_manager
515                             .lock()
516                             .deref_mut()
517                             .force_close(connection.peer, connection.local_port);
518 
519                         return ConnectionStateAction::Remove;
520                     }
521                     ConnectionStateAction::None
522                 })
523                 .inspect_err(|_| {
524                     warn!("got packet for unknown connection");
525                 });
526             }
527             VsockEventType::Disconnected { reason } => {
528                 debug!("disconnected from peer. reason: {reason:?}");
529                 let connections = &mut *device.connections.lock();
530                 let _ = vsock_connection_lookup(connections, source.port, |_connection| {
531                     ConnectionStateAction::Remove
532                 })
533                 .inspect_err(|_| {
534                     warn!("got disconnect ({reason:?}) for unknown connection");
535                 });
536             }
537             VsockEventType::CreditUpdate => { /* nothing to do */ }
538             VsockEventType::CreditRequest => {
539                 // Polling the VsockConnectionManager won't return this event type
540                 panic!("don't know how to handle credit requests");
541             }
542         }
543     }
544 }
545 
vsock_tx_loop<H, T>(device: Arc<VsockDevice<H, T>>) -> Result<(), Error> where H: Hal, T: Transport,546 pub(crate) fn vsock_tx_loop<H, T>(device: Arc<VsockDevice<H, T>>) -> Result<(), Error>
547 where
548     H: Hal,
549     T: Transport,
550 {
551     let mut timeout = Duration::MAX;
552     let ten_secs = Duration::from_secs(10);
553     let mut tx_buffer = vec![0u8; PAGE_SIZE].into_boxed_slice();
554     loop {
555         let mut href = HandleRef::default();
556         let mut ret = device.handle_set.handle_set_wait(&mut href, timeout);
557         if ret == Err(LkError::ERR_NOT_FOUND) {
558             // handle_set_wait returns ERR_NOT_FOUND if the handle_set is empty
559             // but we can wait for it to become non-empty using handle_wait.
560             // Once that that returns we have to call handle_set_wait again to
561             // get the event we care about.
562             ret = device.handle_set.handle_wait(&mut href.emask(), timeout);
563             if ret != Err(LkError::ERR_TIMED_OUT) {
564                 info!("handle_wait on handle set returned: {ret:?}");
565                 continue;
566             }
567             // fall through to ret == ERR_TIMED_OUT case, then continue
568         }
569         if ret == Err(LkError::ERR_TIMED_OUT) {
570             info!("tx inactive for {timeout:?} ms");
571             timeout = Duration::MAX;
572             device.print_stats();
573             continue;
574         }
575         if ret.is_err() {
576             warn!("handle_set_wait failed: {}", ret.unwrap_err());
577             thread::sleep(ten_secs);
578             continue;
579         }
580 
581         let _ = vsock_connection_lookup(&mut device.connections.lock(), href.id(), |c| {
582             if !eq(c.href.as_mut_ptr() as *mut c_void, href.cookie()) {
583                 panic!(
584                     "unexpected cookie {:?} != {:?} for connection {}",
585                     href.cookie(),
586                     c.href.as_mut_ptr(),
587                     c.tipc_port_name()
588                 );
589             }
590 
591             if href.emask() & IPC_HANDLE_POLL_READY != 0 {
592                 device.vsock_tx_tipc_ready(c);
593             }
594             if href.emask() & IPC_HANDLE_POLL_MSG != 0 {
595                 // Print stats if we don't send any more packets for a while
596                 timeout = ACTIVE_TIMEOUT;
597                 // TODO: loop and read all messages?
598                 let mut msg_info = ipc_msg_info::default();
599 
600                 // TODO: add more idiomatic Rust interface
601                 // Safety:
602                 // `c.href.handle` is a valid handle to a tipc channel.
603                 // `ipc_get_msg` can store a message descriptor in `msg_info`.
604                 let ret = unsafe { ipc_get_msg(c.href.handle(), &mut msg_info) };
605                 if ret == rust_support::Error::NO_ERROR.into() {
606                     let mut iov: iovec_kern = tx_buffer.as_mut().into();
607                     let mut msg = ipc_msg_kern::new(&mut iov);
608 
609                     // Safety:
610                     // `c.href.handle` is a valid handle to a tipc channel.
611                     // `msg_info` holds the results of a successful call to `ipc_get_msg`
612                     // using the same handle.
613                     let ret = unsafe { ipc_read_msg(c.href.handle(), msg_info.id, 0, &mut msg) };
614 
615                     // Safety:
616                     // `ipc_put_msg` was called with the same handle and msg_info arguments.
617                     unsafe { ipc_put_msg(c.href.handle(), msg_info.id) };
618                     if ret >= 0 && ret as usize == msg_info.len {
619                         c.tx_count += 1;
620                         c.tx_since_rx += 1;
621                         c.rx_since_tx = 0;
622                         match device.connection_manager.lock().send(
623                             c.peer,
624                             c.local_port,
625                             &tx_buffer[..msg_info.len],
626                         ) {
627                             Err(err) => {
628                                 if err == VirtioError::SocketDeviceError(SocketError::NotConnected)
629                                 {
630                                     debug!(
631                                         "failed to send {} bytes from {}. Connection closed",
632                                         msg_info.len,
633                                         c.tipc_port_name()
634                                     );
635                                 } else {
636                                     // TODO: close connection instead
637                                     panic!(
638                                         "failed to send {} bytes from {}: {:?}",
639                                         msg_info.len,
640                                         c.tipc_port_name(),
641                                         err
642                                     );
643                                 }
644                             }
645                             Ok(_) => {
646                                 debug!("sent {} bytes from {}", msg_info.len, c.tipc_port_name());
647                             }
648                         }
649                     } else {
650                         error!("ipc_read_msg failed: {ret}");
651                     }
652                 }
653             }
654             if href.emask() & IPC_HANDLE_POLL_SEND_UNBLOCKED != 0 {
655                 assert_eq!(c.state, VsockConnectionState::TipcSendBlocked);
656                 assert_ne!(c.rx_pending, 0);
657 
658                 debug!("tipc connection unblocked {}", c.tipc_port_name());
659 
660                 if let Err(e) = c.tipc_try_send() {
661                     error!("failed to send pending message to {}: {e:?}", c.tipc_port_name());
662                 }
663             }
664             if href.emask() & IPC_HANDLE_POLL_HUP != 0 {
665                 // Print stats if we don't send any more packets for a while
666                 timeout = ACTIVE_TIMEOUT;
667                 info!("got hup");
668                 debug!(
669                     "shut down connection {}, {:?}, {:?}",
670                     c.tipc_port_name(),
671                     c.peer,
672                     c.local_port
673                 );
674                 let res = device.connection_manager.lock().shutdown(c.peer, c.local_port);
675                 if res.is_ok() {
676                     return ConnectionStateAction::Close;
677                 } else {
678                     warn!(
679                         "failed to send shutdown command, connection removed? {}",
680                         res.unwrap_err()
681                     );
682                 }
683             }
684             ConnectionStateAction::None
685         })
686         .inspect_err(|_| {
687             warn!("got event for non-existent remote {}, was it closed?", href.id());
688         });
689         href.handle_decref();
690     }
691 }
692