1 // Copyright 2024 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! Launcher of forwarder_guest
16 
17 use anyhow::{anyhow, Context};
18 use clap::Parser;
19 use csv_async::AsyncReader;
20 use debian_service::debian_service_client::DebianServiceClient;
21 use debian_service::{QueueOpeningRequest, ReportVmActivePortsRequest};
22 use futures::stream::StreamExt;
23 use log::{debug, error};
24 use serde::Deserialize;
25 use std::collections::HashSet;
26 use std::process::Stdio;
27 use tokio::io::BufReader;
28 use tokio::process::Command;
29 use tokio::try_join;
30 use tonic::transport::{Channel, Endpoint};
31 use tonic::Request;
32 
33 mod debian_service {
34     tonic::include_proto!("com.android.virtualization.terminal.proto");
35 }
36 
37 const NON_PREVILEGED_PORT_RANGE_START: i32 = 1024;
38 const TTYD_PORT: i32 = 7681;
39 const TCPSTATES_IP_4: i8 = 4;
40 const TCPSTATES_STATE_CLOSE: &str = "CLOSE";
41 const TCPSTATES_STATE_LISTEN: &str = "LISTEN";
42 
43 #[derive(Debug, Deserialize)]
44 #[serde(rename_all = "UPPERCASE")]
45 struct TcpStateRow {
46     ip: i8,
47     lport: i32,
48     rport: i32,
49     newstate: String,
50 }
51 
52 #[derive(Parser)]
53 /// Flags for running command
54 pub struct Args {
55     /// Host IP address
56     #[arg(long)]
57     #[arg(alias = "host")]
58     host_addr: String,
59     /// grpc port number
60     #[arg(long)]
61     #[arg(alias = "grpc_port")]
62     grpc_port: String,
63 }
64 
process_forwarding_request_queue( mut client: DebianServiceClient<Channel>, ) -> Result<(), Box<dyn std::error::Error>>65 async fn process_forwarding_request_queue(
66     mut client: DebianServiceClient<Channel>,
67 ) -> Result<(), Box<dyn std::error::Error>> {
68     let cid = vsock::get_local_cid().context("Failed to get CID of VM")?;
69     let mut res_stream = client
70         .open_forwarding_request_queue(Request::new(QueueOpeningRequest { cid: cid as i32 }))
71         .await?
72         .into_inner();
73 
74     while let Some(response) = res_stream.message().await? {
75         let tcp_port = i16::try_from(response.guest_tcp_port)
76             .context("Failed to convert guest_tcp_port as i16")?;
77         let vsock_port = response.vsock_port as u32;
78 
79         debug!(
80             "executing forwarder_guest with guest_tcp_port: {:?}, vsock_port: {:?}",
81             &tcp_port, &vsock_port
82         );
83 
84         let _ = Command::new("forwarder_guest")
85             .arg("--local")
86             .arg(format!("127.0.0.1:{}", tcp_port))
87             .arg("--remote")
88             .arg(format!("vsock:2:{}", vsock_port))
89             .spawn();
90     }
91     Err(anyhow!("process_forwarding_request_queue is terminated").into())
92 }
93 
send_active_ports_report( listening_ports: HashSet<i32>, client: &mut DebianServiceClient<Channel>, ) -> Result<(), Box<dyn std::error::Error>>94 async fn send_active_ports_report(
95     listening_ports: HashSet<i32>,
96     client: &mut DebianServiceClient<Channel>,
97 ) -> Result<(), Box<dyn std::error::Error>> {
98     let res = client
99         .report_vm_active_ports(Request::new(ReportVmActivePortsRequest {
100             ports: listening_ports.into_iter().collect(),
101         }))
102         .await?
103         .into_inner();
104     if res.success {
105         debug!("Successfully reported active ports to the host");
106     } else {
107         error!("Failure response received from the host for reporting active ports");
108     }
109     Ok(())
110 }
111 
is_forwardable_port(port: i32) -> bool112 fn is_forwardable_port(port: i32) -> bool {
113     port >= NON_PREVILEGED_PORT_RANGE_START && port != TTYD_PORT
114 }
115 
report_active_ports( mut client: DebianServiceClient<Channel>, ) -> Result<(), Box<dyn std::error::Error>>116 async fn report_active_ports(
117     mut client: DebianServiceClient<Channel>,
118 ) -> Result<(), Box<dyn std::error::Error>> {
119     // TODO: we can remove python3 -u when https://github.com/iovisor/bcc/pull/5142 is deployed
120     let mut cmd = Command::new("python3")
121         .arg("-u")
122         .arg("/usr/sbin/tcpstates-bpfcc")
123         .arg("-s")
124         .stdout(Stdio::piped())
125         .spawn()?;
126     let stdout = cmd.stdout.take().context("Failed to get stdout of tcpstates")?;
127     let mut csv_reader = AsyncReader::from_reader(BufReader::new(stdout));
128     let header = csv_reader.headers().await?.clone();
129 
130     // TODO(b/340126051): Consider using NETLINK_SOCK_DIAG for the optimization.
131     let listeners = listeners::get_all()?;
132     // TODO(b/340126051): Support distinguished port forwarding for ipv6 as well.
133     let mut listening_ports: HashSet<_> = listeners
134         .iter()
135         .map(|x| x.socket)
136         .filter(|x| x.is_ipv4())
137         .map(|x| x.port().into())
138         .filter(|x| is_forwardable_port(*x))
139         .collect();
140     send_active_ports_report(listening_ports.clone(), &mut client).await?;
141 
142     let mut records = csv_reader.records();
143     while let Some(record) = records.next().await {
144         let row: TcpStateRow = record?.deserialize(Some(&header))?;
145         if row.ip != TCPSTATES_IP_4 {
146             continue;
147         }
148         if !is_forwardable_port(row.lport) {
149             continue;
150         }
151         if row.rport > 0 {
152             continue;
153         }
154         match row.newstate.as_str() {
155             TCPSTATES_STATE_LISTEN => {
156                 listening_ports.insert(row.lport);
157             }
158             TCPSTATES_STATE_CLOSE => {
159                 listening_ports.remove(&row.lport);
160             }
161             _ => continue,
162         }
163         send_active_ports_report(listening_ports.clone(), &mut client).await?;
164     }
165 
166     Err(anyhow!("report_active_ports is terminated").into())
167 }
168 
169 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>170 async fn main() -> Result<(), Box<dyn std::error::Error>> {
171     env_logger::init();
172     debug!("Starting forwarder_guest_launcher");
173     let args = Args::parse();
174     let addr = format!("https://{}:{}", args.host_addr, args.grpc_port);
175     let channel = Endpoint::from_shared(addr)?.connect().await?;
176     let client = DebianServiceClient::new(channel);
177 
178     try_join!(process_forwarding_request_queue(client.clone()), report_active_ports(client))?;
179     Ok(())
180 }
181