diff --git a/include/libkrun.h b/include/libkrun.h index 7f5ed647..efd3127a 100644 --- a/include/libkrun.h +++ b/include/libkrun.h @@ -441,6 +441,21 @@ int32_t krun_set_tee_config_file(uint32_t ctx_id, const char *filepath); int32_t krun_add_vsock_port(uint32_t ctx_id, uint32_t port, const char *c_filepath); + +/** + * Adds a port-path pairing for guest IPC with a process in the host. + * + * Arguments: + * "ctx_id" - the configuration context ID. + * "port" - a vsock port that the guest will connect to for IPC. + * "filepath" - a null-terminated string representing the path of the UNIX + * socket in the host. + * "listen" - true if guest expects connections to be initiated from host side + */ +int32_t krun_add_vsock_port2(uint32_t ctx_id, + uint32_t port, + const char *c_filepath, + bool listen); /** * Returns the eventfd file descriptor to signal the guest to shut down orderly. This must be * called before starting the microVM with "krun_start_event". Only available in libkrun-efi. diff --git a/src/devices/src/virtio/vsock/device.rs b/src/devices/src/virtio/vsock/device.rs index 127ded48..01df1317 100644 --- a/src/devices/src/virtio/vsock/device.rs +++ b/src/devices/src/virtio/vsock/device.rs @@ -59,7 +59,7 @@ impl Vsock { cid: u64, host_port_map: Option>, queues: Vec, - unix_ipc_port_map: Option>, + unix_ipc_port_map: Option>, ) -> super::Result { let mut queue_events = Vec::new(); for _ in 0..queues.len() { @@ -103,7 +103,7 @@ impl Vsock { pub fn new( cid: u64, host_port_map: Option>, - unix_ipc_port_map: Option>, + unix_ipc_port_map: Option>, ) -> super::Result { let queues: Vec = defs::QUEUE_SIZES .iter() diff --git a/src/devices/src/virtio/vsock/muxer.rs b/src/devices/src/virtio/vsock/muxer.rs index 0ba9f93f..6d027dcd 100644 --- a/src/devices/src/virtio/vsock/muxer.rs +++ b/src/devices/src/virtio/vsock/muxer.rs @@ -111,7 +111,7 @@ pub struct VsockMuxer { irq_line: Option, proxy_map: ProxyMap, reaper_sender: Option>, - unix_ipc_port_map: Option>, + unix_ipc_port_map: Option>, } impl VsockMuxer { @@ -120,7 +120,7 @@ impl VsockMuxer { host_port_map: Option>, interrupt_evt: EventFd, interrupt_status: Arc, - unix_ipc_port_map: Option>, + unix_ipc_port_map: Option>, ) -> Self { VsockMuxer { cid, @@ -179,6 +179,7 @@ impl VsockMuxer { intc, irq_line, sender.clone(), + self.unix_ipc_port_map.clone().unwrap_or_default(), ); thread.run(); @@ -499,9 +500,18 @@ impl VsockMuxer { if let Some(proxy) = proxy_map.get(&id) { proxy.lock().unwrap().confirm_connect(pkt) } else if let Some(ref mut ipc_map) = &mut self.unix_ipc_port_map { - if let Some(path) = ipc_map.get(&pkt.dst_port()) { + if let Some((path, listen)) = ipc_map.get(&pkt.dst_port()) { let mem = self.mem.as_ref().unwrap(); let queue = self.queue.as_ref().unwrap(); + if *listen { + warn!("vsock: Attempting to connect a socket that is listening, sending rst"); + let rx = MuxerRx::Reset { + local_port: pkt.dst_port(), + peer_port: pkt.src_port(), + }; + push_packet(self.cid, rx, &self.rxq, queue, mem); + return; + } let rxq = self.rxq.clone(); let mut unix = UnixProxy::new( diff --git a/src/devices/src/virtio/vsock/muxer_thread.rs b/src/devices/src/virtio/vsock/muxer_thread.rs index d6286ce5..a0aa8e40 100644 --- a/src/devices/src/virtio/vsock/muxer_thread.rs +++ b/src/devices/src/virtio/vsock/muxer_thread.rs @@ -1,4 +1,6 @@ +use std::collections::HashMap; use std::os::unix::io::RawFd; +use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; @@ -8,9 +10,11 @@ use super::super::Queue as VirtQueue; use super::super::VIRTIO_MMIO_INT_VRING; use super::muxer::{push_packet, MuxerRx, ProxyMap}; use super::muxer_rxq::MuxerRxQ; -use super::proxy::{ProxyRemoval, ProxyUpdate}; +use super::proxy::{NewProxyType, Proxy, ProxyRemoval, ProxyUpdate}; use super::tcp::TcpProxy; +use crate::virtio::vsock::defs; +use crate::virtio::vsock::unix::{UnixAcceptorProxy, UnixProxy}; use crossbeam_channel::Sender; use rand::{rngs::ThreadRng, thread_rng, Rng}; use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet}; @@ -29,6 +33,7 @@ pub struct MuxerThread { intc: Option, irq_line: Option, reaper_sender: Sender, + unix_ipc_port_map: HashMap, } impl MuxerThread { @@ -45,6 +50,7 @@ impl MuxerThread { intc: Option, irq_line: Option, reaper_sender: Sender, + unix_ipc_port_map: HashMap, ) -> Self { MuxerThread { cid, @@ -58,6 +64,7 @@ impl MuxerThread { intc, irq_line, reaper_sender, + unix_ipc_port_map, } } @@ -111,24 +118,36 @@ impl MuxerThread { let mut should_signal = update.signal_queue; - if let Some((peer_port, accept_fd)) = update.new_proxy { + if let Some((peer_port, accept_fd, proxy_type)) = update.new_proxy { let local_port: u32 = thread_rng.gen_range(1024..u32::MAX); let new_id: u64 = (peer_port as u64) << 32 | local_port as u64; - let new_proxy = TcpProxy::new_reverse( - new_id, - self.cid, - id, - local_port, - peer_port, - accept_fd, - self.mem.clone(), - self.queue.clone(), - self.rxq.clone(), - ); + let new_proxy: Box = match proxy_type { + NewProxyType::Tcp => Box::new(TcpProxy::new_reverse( + new_id, + self.cid, + id, + local_port, + peer_port, + accept_fd, + self.mem.clone(), + self.queue.clone(), + self.rxq.clone(), + )), + NewProxyType::Unix => Box::new(UnixProxy::new_reverse( + new_id, + self.cid, + local_port, + peer_port, + accept_fd, + self.mem.clone(), + self.queue.clone(), + self.rxq.clone(), + )), + }; self.proxy_map .write() .unwrap() - .insert(new_id, Mutex::new(Box::new(new_proxy))); + .insert(new_id, Mutex::new(new_proxy)); if let Some(proxy) = self.proxy_map.read().unwrap().get(&new_id) { proxy.lock().unwrap().push_op_request(); }; @@ -147,8 +166,32 @@ impl MuxerThread { } } + fn create_lisening_ipc_sockets(&self) { + for (port, (path, do_listen)) in &self.unix_ipc_port_map { + if !do_listen { + continue; + } + let id = (*port as u64) << 32 | defs::TSI_PROXY_PORT as u64; + let proxy = match UnixAcceptorProxy::new(id, path, *port) { + Ok(proxy) => proxy, + Err(e) => { + warn!("Failed to create listening proxy at {:?}: {:?}", path, e); + continue; + } + }; + self.proxy_map + .write() + .unwrap() + .insert(id, Mutex::new(Box::new(proxy))); + if let Some(proxy) = self.proxy_map.read().unwrap().get(&id) { + self.update_polling(id, proxy.lock().unwrap().as_raw_fd(), EventSet::IN); + }; + } + } + fn work(self) { let mut thread_rng = thread_rng(); + self.create_lisening_ipc_sockets(); loop { let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32]; match self diff --git a/src/devices/src/virtio/vsock/proxy.rs b/src/devices/src/virtio/vsock/proxy.rs index e7e19f15..bc244b17 100644 --- a/src/devices/src/virtio/vsock/proxy.rs +++ b/src/devices/src/virtio/vsock/proxy.rs @@ -41,12 +41,19 @@ pub enum ProxyRemoval { Deferred, } +#[derive(Default)] +pub enum NewProxyType { + #[default] + Tcp, + Unix, +} + #[derive(Default)] pub struct ProxyUpdate { pub signal_queue: bool, pub remove_proxy: ProxyRemoval, pub polling: Option<(u64, RawFd, EventSet)>, - pub new_proxy: Option<(u32, RawFd)>, + pub new_proxy: Option<(u32, RawFd, NewProxyType)>, pub push_accept: Option<(u64, u64)>, pub push_credit_req: Option, } diff --git a/src/devices/src/virtio/vsock/tcp.rs b/src/devices/src/virtio/vsock/tcp.rs index 10fb7ec4..2f319ebd 100644 --- a/src/devices/src/virtio/vsock/tcp.rs +++ b/src/devices/src/virtio/vsock/tcp.rs @@ -21,7 +21,9 @@ use super::muxer_rxq::MuxerRxQ; use super::packet::{ TsiAcceptReq, TsiConnectReq, TsiGetnameRsp, TsiListenReq, TsiSendtoAddr, VsockPacket, }; -use super::proxy::{Proxy, ProxyError, ProxyRemoval, ProxyStatus, ProxyUpdate, RecvPkt}; +use super::proxy::{ + NewProxyType, Proxy, ProxyError, ProxyRemoval, ProxyStatus, ProxyUpdate, RecvPkt, +}; use utils::epoll::EventSet; use vm_memory::GuestMemoryMmap; @@ -729,7 +731,7 @@ impl Proxy for TcpProxy { { match accept(self.fd) { Ok(accept_fd) => { - update.new_proxy = Some((self.peer_port, accept_fd)); + update.new_proxy = Some((self.peer_port, accept_fd, NewProxyType::Tcp)); } Err(e) => warn!("error accepting connection: id={}, err={}", self.id, e), }; diff --git a/src/devices/src/virtio/vsock/unix.rs b/src/devices/src/virtio/vsock/unix.rs index 5076ef5c..e06ad2f0 100644 --- a/src/devices/src/virtio/vsock/unix.rs +++ b/src/devices/src/virtio/vsock/unix.rs @@ -3,26 +3,25 @@ use super::{ proxy::{ProxyRemoval, RecvPkt}, }; +use nix::fcntl::{fcntl, FcntlArg, OFlag}; +use nix::sys::socket::{ + accept, bind, connect, listen, recv, send, setsockopt, shutdown, socket, sockopt, + AddressFamily, MsgFlags, Shutdown, SockFlag, SockType, UnixAddr, +}; +use nix::unistd::close; use std::collections::HashMap; use std::num::Wrapping; use std::os::unix::io::{AsRawFd, RawFd}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use nix::fcntl::{fcntl, FcntlArg, OFlag}; -use nix::sys::socket::{ - accept, connect, recv, send, setsockopt, shutdown, socket, sockopt, AddressFamily, MsgFlags, - Shutdown, SockFlag, SockType, UnixAddr, -}; -use nix::unistd::close; - #[cfg(target_os = "macos")] use super::super::linux_errno::linux_errno_raw; use super::super::Queue as VirtQueue; use super::muxer::{push_packet, MuxerRx}; use super::muxer_rxq::MuxerRxQ; use super::packet::{TsiAcceptReq, TsiConnectReq, TsiListenReq, TsiSendtoAddr, VsockPacket}; -use super::proxy::{Proxy, ProxyError, ProxyStatus, ProxyUpdate}; +use super::proxy::{NewProxyType, Proxy, ProxyError, ProxyStatus, ProxyUpdate}; use utils::epoll::EventSet; use vm_memory::GuestMemoryMmap; @@ -47,6 +46,47 @@ pub struct UnixProxy { rx_cnt: Wrapping, } +fn proxy_fd_create(id: u64) -> Result { + let fd = socket( + AddressFamily::Unix, + SockType::Stream, + SockFlag::empty(), + None, + ) + .map_err(ProxyError::CreatingSocket)?; + + // macOS forces us to do this here instead of just using SockFlag::SOCK_NONBLOCK above. + match fcntl(fd, FcntlArg::F_GETFL) { + Ok(flags) => match OFlag::from_bits(flags) { + Some(flags) => { + if let Err(e) = fcntl(fd, FcntlArg::F_SETFL(flags | OFlag::O_NONBLOCK)) { + warn!("error switching to non-blocking: id={}, err={}", id, e); + } + } + None => error!("invalid fd flags id={}", id), + }, + Err(e) => error!("couldn't obtain fd flags id={}, err={}", id, e), + }; + + setsockopt(fd, sockopt::ReusePort, &true).map_err(ProxyError::SettingReusePort)?; + #[cfg(target_os = "macos")] + { + // nix doesn't provide an abstraction for SO_NOSIGPIPE, fall back to libc. + let option_value: libc::c_int = 1; + unsafe { + libc::setsockopt( + fd, + libc::SOL_SOCKET, + libc::SO_NOSIGPIPE, + &option_value as *const _ as *const libc::c_void, + std::mem::size_of_val(&option_value) as libc::socklen_t, + ) + }; + } + + Ok(fd) +} + impl UnixProxy { #[allow(clippy::too_many_arguments)] pub fn new( @@ -59,42 +99,7 @@ impl UnixProxy { rxq: Arc>, path: PathBuf, ) -> Result { - let fd = socket( - AddressFamily::Unix, - SockType::Stream, - SockFlag::empty(), - None, - ) - .map_err(ProxyError::CreatingSocket)?; - - // macOS forces us to do this here instead of just using SockFlag::SOCK_NONBLOCK above. - match fcntl(fd, FcntlArg::F_GETFL) { - Ok(flags) => match OFlag::from_bits(flags) { - Some(flags) => { - if let Err(e) = fcntl(fd, FcntlArg::F_SETFL(flags | OFlag::O_NONBLOCK)) { - warn!("error switching to non-blocking: id={}, err={}", id, e); - } - } - None => error!("invalid fd flags id={}", id), - }, - Err(e) => error!("couldn't obtain fd flags id={}, err={}", id, e), - }; - - setsockopt(fd, sockopt::ReusePort, &true).map_err(ProxyError::SettingReusePort)?; - #[cfg(target_os = "macos")] - { - // nix doesn't provide an abstraction for SO_NOSIGPIPE, fall back to libc. - let option_value: libc::c_int = 1; - unsafe { - libc::setsockopt( - fd, - libc::SOL_SOCKET, - libc::SO_NOSIGPIPE, - &option_value as *const _ as *const libc::c_void, - std::mem::size_of_val(&option_value) as libc::socklen_t, - ) - }; - } + let fd = proxy_fd_create(id)?; Ok(UnixProxy { id, @@ -117,6 +122,42 @@ impl UnixProxy { }) } + #[allow(clippy::too_many_arguments)] + pub fn new_reverse( + id: u64, + cid: u64, + local_port: u32, + peer_port: u32, + fd: RawFd, + mem: GuestMemoryMmap, + queue: Arc>, + rxq: Arc>, + ) -> Self { + debug!( + "new_reverse: id={} local_port={} peer_port={}", + id, local_port, peer_port + ); + UnixProxy { + id, + cid, + local_port, + peer_port, + control_port: 0, + fd, + status: ProxyStatus::ReverseInit, + mem, + queue, + rxq, + rx_cnt: Wrapping(0), + tx_cnt: Wrapping(0), + last_tx_cnt_sent: Wrapping(0), + peer_buf_alloc: 0, + peer_fwd_cnt: Wrapping(0), + push_cnt: Wrapping(0), + path: Default::default(), + } + } + fn switch_to_connected(&mut self) { self.status = ProxyStatus::Connected; match fcntl(self.fd, FcntlArg::F_GETFL) { @@ -434,11 +475,36 @@ impl Proxy for UnixProxy { } fn push_op_request(&self) { - todo!(); + debug!( + "push_op_request: id={}, local_port={} peer_port={}", + self.id, self.local_port, self.peer_port + ); + + // This packet goes to the connection. + let rx = MuxerRx::OpRequest { + local_port: self.local_port, + peer_port: self.peer_port, + }; + push_packet(self.cid, rx, &self.rxq, &self.queue, &self.mem); } - fn process_op_response(&mut self, _pkt: &VsockPacket) -> ProxyUpdate { - todo!(); + fn process_op_response(&mut self, pkt: &VsockPacket) -> ProxyUpdate { + debug!( + "process_op_response: id={} src_port={} dst_port={}", + self.id, + pkt.src_port(), + pkt.dst_port() + ); + + self.peer_buf_alloc = pkt.buf_alloc(); + self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt()); + + self.switch_to_connected(); + + ProxyUpdate { + polling: Some((self.id, self.fd, EventSet::IN)), + ..Default::default() + } } fn enqueue_accept(&mut self) { @@ -467,11 +533,7 @@ impl Proxy for UnixProxy { "release: id={}, tx_cnt={}, last_tx_cnt={}", self.id, self.tx_cnt, self.last_tx_cnt_sent ); - let remove_proxy = if self.status == ProxyStatus::Listening { - ProxyRemoval::Immediate - } else { - ProxyRemoval::Deferred - }; + let remove_proxy = ProxyRemoval::Deferred; ProxyUpdate { remove_proxy, @@ -494,11 +556,7 @@ impl Proxy for UnixProxy { self.status = ProxyStatus::Closed; update.polling = Some((self.id, self.fd, EventSet::empty())); update.signal_queue = true; - update.remove_proxy = if self.status == ProxyStatus::Listening { - ProxyRemoval::Immediate - } else { - ProxyRemoval::Deferred - }; + update.remove_proxy = ProxyRemoval::Deferred; return update; } @@ -533,17 +591,6 @@ impl Proxy for UnixProxy { debug!("process_event: WaitingCreditUpdate"); update.polling = Some((self.id(), self.fd, EventSet::empty())); } - } else if self.status == ProxyStatus::Listening - || self.status == ProxyStatus::WaitingOnAccept - { - match accept(self.fd) { - Ok(accept_fd) => { - update.new_proxy = Some((self.peer_port, accept_fd)); - } - Err(e) => warn!("error accepting connection: id={}, err={}", self.id, e), - }; - update.signal_queue = true; - return update; } else { debug!( "vsock::tcp: EventSet::IN while not connected: {:?}", @@ -581,3 +628,104 @@ impl Drop for UnixProxy { } } } + +pub struct UnixAcceptorProxy { + id: u64, + fd: RawFd, + peer_port: u32, +} + +impl UnixAcceptorProxy { + pub fn new(id: u64, path: &PathBuf, peer_port: u32) -> Result { + let fd = socket( + AddressFamily::Unix, + SockType::Stream, + SockFlag::empty(), + None, + ) + .map_err(ProxyError::CreatingSocket)?; + bind( + fd, + &UnixAddr::new(path).map_err(ProxyError::CreatingSocket)?, + ) + .map_err(ProxyError::CreatingSocket)?; + listen(fd, 5).map_err(ProxyError::CreatingSocket)?; + Ok(UnixAcceptorProxy { id, fd, peer_port }) + } +} + +impl Proxy for UnixAcceptorProxy { + fn id(&self) -> u64 { + self.id + } + fn status(&self) -> ProxyStatus { + ProxyStatus::WaitingOnAccept + } + fn connect(&mut self, _: &VsockPacket, _: TsiConnectReq) -> ProxyUpdate { + unreachable!() + } + fn getpeername(&mut self, _: &VsockPacket) { + unreachable!() + } + fn sendmsg(&mut self, _: &VsockPacket) -> ProxyUpdate { + unreachable!() + } + fn sendto_addr(&mut self, _: TsiSendtoAddr) -> ProxyUpdate { + unreachable!() + } + fn listen( + &mut self, + _: &VsockPacket, + _: TsiListenReq, + _: &Option>, + ) -> ProxyUpdate { + unreachable!() + } + fn accept(&mut self, _: TsiAcceptReq) -> ProxyUpdate { + unreachable!() + } + fn update_peer_credit(&mut self, _: &VsockPacket) -> ProxyUpdate { + unreachable!() + } + fn process_op_response(&mut self, _: &VsockPacket) -> ProxyUpdate { + unreachable!() + } + fn release(&mut self) -> ProxyUpdate { + unreachable!() + } + fn process_event(&mut self, evset: EventSet) -> ProxyUpdate { + let mut update = ProxyUpdate::default(); + + if evset.contains(EventSet::HANG_UP) { + debug!("process_event: HANG_UP"); + update.polling = Some((self.id, self.fd, EventSet::empty())); + update.signal_queue = true; + update.remove_proxy = ProxyRemoval::Deferred; + return update; + } + if evset.contains(EventSet::IN) { + match accept(self.fd) { + Ok(accept_fd) => { + update.new_proxy = Some((self.peer_port, accept_fd, NewProxyType::Unix)); + } + Err(e) => warn!("error accepting connection: id={}, err={}", self.id, e), + }; + update.signal_queue = true; + } + update + } +} + +impl AsRawFd for UnixAcceptorProxy { + fn as_raw_fd(&self) -> RawFd { + self.fd + } +} + +impl Drop for UnixAcceptorProxy { + fn drop(&mut self) { + if let Err(e) = close(self.fd) { + warn!("error closing proxy fd: {}", e); + } + } +} diff --git a/src/libkrun/src/lib.rs b/src/libkrun/src/lib.rs index 0f864f17..acc17396 100644 --- a/src/libkrun/src/lib.rs +++ b/src/libkrun/src/lib.rs @@ -95,7 +95,7 @@ struct ContextConfig { data_block_cfg: Option, #[cfg(feature = "tee")] tee_config_file: Option, - unix_ipc_port_map: Option>, + unix_ipc_port_map: Option>, shutdown_efd: Option, gpu_virgl_flags: Option, gpu_shm_size: Option, @@ -220,12 +220,12 @@ impl ContextConfig { self.tee_config_file.clone() } - fn add_vsock_port(&mut self, port: u32, filepath: PathBuf) { + fn add_vsock_port(&mut self, port: u32, filepath: PathBuf, listen: bool) { if let Some(ref mut map) = &mut self.unix_ipc_port_map { - map.insert(port, filepath); + map.insert(port, (filepath, listen)); } else { - let mut map: HashMap = HashMap::new(); - map.insert(port, filepath); + let mut map: HashMap = HashMap::new(); + map.insert(port, (filepath, listen)); self.unix_ipc_port_map = Some(map); } } @@ -933,6 +933,17 @@ pub unsafe extern "C" fn krun_add_vsock_port( ctx_id: u32, port: u32, c_filepath: *const c_char, +) -> i32 { + krun_add_vsock_port2(ctx_id, port, c_filepath, false) +} + +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub unsafe extern "C" fn krun_add_vsock_port2( + ctx_id: u32, + port: u32, + c_filepath: *const c_char, + listen: bool, ) -> i32 { let filepath = match CStr::from_ptr(c_filepath).to_str() { Ok(f) => f, @@ -942,7 +953,7 @@ pub unsafe extern "C" fn krun_add_vsock_port( match CTX_MAP.lock().unwrap().entry(ctx_id) { Entry::Occupied(mut ctx_cfg) => { let cfg = ctx_cfg.get_mut(); - cfg.add_vsock_port(port, PathBuf::from(filepath.to_string())); + cfg.add_vsock_port(port, PathBuf::from(filepath.to_string()), listen); } Entry::Vacant(_) => return -libc::ENOENT, } diff --git a/src/vmm/src/vmm_config/vsock.rs b/src/vmm/src/vmm_config/vsock.rs index e8f7999c..5aafe858 100644 --- a/src/vmm/src/vmm_config/vsock.rs +++ b/src/vmm/src/vmm_config/vsock.rs @@ -39,7 +39,7 @@ pub struct VsockDeviceConfig { /// An optional map of host to guest port mappings. pub host_port_map: Option>, /// An optional map of guest port to host UNIX domain sockets for IPC. - pub unix_ipc_port_map: Option>, + pub unix_ipc_port_map: Option>, } struct VsockWrapper {