Skip to content

Commit

Permalink
net: Allow creating vsocks in listen mode.
Browse files Browse the repository at this point in the history
Previously all vsocks were expected to be connected from the guest.
This patch adds support for sockets that are listened to by the guest
and are connected from the host.

Signed-off-by: Sasha Finkelstein <[email protected]>
  • Loading branch information
WhatAmISupposedToPutHere committed Dec 31, 2024
1 parent b67a6a0 commit 09c6753
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 97 deletions.
15 changes: 15 additions & 0 deletions include/libkrun.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,21 @@ int32_t krun_set_tee_config_file(uint32_t ctx_id, const char *filepath);
int32_t krun_add_vsock_port(uint32_t ctx_id,
uint32_t port,
const char *c_filepath);

/**
* Adds a port-path pairing for guest IPC with a process in the host.
*
* Arguments:
* "ctx_id" - the configuration context ID.
* "port" - a vsock port that the guest will connect to for IPC.
* "filepath" - a null-terminated string representing the path of the UNIX
* socket in the host.
* "listen" - true if guest expects connections to be initiated from host side
*/
int32_t krun_add_vsock_port2(uint32_t ctx_id,
uint32_t port,
const char *c_filepath,
bool listen);
/**
* Returns the eventfd file descriptor to signal the guest to shut down orderly. This must be
* called before starting the microVM with "krun_start_event". Only available in libkrun-efi.
Expand Down
4 changes: 2 additions & 2 deletions src/devices/src/virtio/vsock/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Vsock {
cid: u64,
host_port_map: Option<HashMap<u16, u16>>,
queues: Vec<VirtQueue>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> super::Result<Vsock> {
let mut queue_events = Vec::new();
for _ in 0..queues.len() {
Expand Down Expand Up @@ -103,7 +103,7 @@ impl Vsock {
pub fn new(
cid: u64,
host_port_map: Option<HashMap<u16, u16>>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> super::Result<Vsock> {
let queues: Vec<VirtQueue> = defs::QUEUE_SIZES
.iter()
Expand Down
16 changes: 13 additions & 3 deletions src/devices/src/virtio/vsock/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub struct VsockMuxer {
irq_line: Option<u32>,
proxy_map: ProxyMap,
reaper_sender: Option<Sender<u64>>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
}

impl VsockMuxer {
Expand All @@ -120,7 +120,7 @@ impl VsockMuxer {
host_port_map: Option<HashMap<u16, u16>>,
interrupt_evt: EventFd,
interrupt_status: Arc<AtomicUsize>,
unix_ipc_port_map: Option<HashMap<u32, PathBuf>>,
unix_ipc_port_map: Option<HashMap<u32, (PathBuf, bool)>>,
) -> Self {
VsockMuxer {
cid,
Expand Down Expand Up @@ -179,6 +179,7 @@ impl VsockMuxer {
intc,
irq_line,
sender.clone(),
self.unix_ipc_port_map.clone().unwrap_or_default(),
);
thread.run();

Expand Down Expand Up @@ -499,9 +500,18 @@ impl VsockMuxer {
if let Some(proxy) = proxy_map.get(&id) {
proxy.lock().unwrap().confirm_connect(pkt)
} else if let Some(ref mut ipc_map) = &mut self.unix_ipc_port_map {
if let Some(path) = ipc_map.get(&pkt.dst_port()) {
if let Some((path, listen)) = ipc_map.get(&pkt.dst_port()) {
let mem = self.mem.as_ref().unwrap();
let queue = self.queue.as_ref().unwrap();
if *listen {
warn!("vsock: Attempting to connect a socket that is listening, sending rst");
let rx = MuxerRx::Reset {
local_port: pkt.dst_port(),
peer_port: pkt.src_port(),
};
push_packet(self.cid, rx, &self.rxq, queue, mem);
return;
}
let rxq = self.rxq.clone();

let mut unix = UnixProxy::new(
Expand Down
71 changes: 57 additions & 14 deletions src/devices/src/virtio/vsock/muxer_thread.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
Expand All @@ -8,9 +10,11 @@ use super::super::Queue as VirtQueue;
use super::super::VIRTIO_MMIO_INT_VRING;
use super::muxer::{push_packet, MuxerRx, ProxyMap};
use super::muxer_rxq::MuxerRxQ;
use super::proxy::{ProxyRemoval, ProxyUpdate};
use super::proxy::{NewProxyType, Proxy, ProxyRemoval, ProxyUpdate};
use super::tcp::TcpProxy;

use crate::virtio::vsock::defs;
use crate::virtio::vsock::unix::{UnixAcceptorProxy, UnixProxy};
use crossbeam_channel::Sender;
use rand::{rngs::ThreadRng, thread_rng, Rng};
use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
Expand All @@ -29,6 +33,7 @@ pub struct MuxerThread {
intc: Option<GicV3>,
irq_line: Option<u32>,
reaper_sender: Sender<u64>,
unix_ipc_port_map: HashMap<u32, (PathBuf, bool)>,
}

impl MuxerThread {
Expand All @@ -45,6 +50,7 @@ impl MuxerThread {
intc: Option<GicV3>,
irq_line: Option<u32>,
reaper_sender: Sender<u64>,
unix_ipc_port_map: HashMap<u32, (PathBuf, bool)>,
) -> Self {
MuxerThread {
cid,
Expand All @@ -58,6 +64,7 @@ impl MuxerThread {
intc,
irq_line,
reaper_sender,
unix_ipc_port_map,
}
}

Expand Down Expand Up @@ -111,24 +118,36 @@ impl MuxerThread {

let mut should_signal = update.signal_queue;

if let Some((peer_port, accept_fd)) = update.new_proxy {
if let Some((peer_port, accept_fd, proxy_type)) = update.new_proxy {
let local_port: u32 = thread_rng.gen_range(1024..u32::MAX);
let new_id: u64 = (peer_port as u64) << 32 | local_port as u64;
let new_proxy = TcpProxy::new_reverse(
new_id,
self.cid,
id,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
);
let new_proxy: Box<dyn Proxy> = match proxy_type {
NewProxyType::Tcp => Box::new(TcpProxy::new_reverse(
new_id,
self.cid,
id,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
)),
NewProxyType::Unix => Box::new(UnixProxy::new_reverse(
new_id,
self.cid,
local_port,
peer_port,
accept_fd,
self.mem.clone(),
self.queue.clone(),
self.rxq.clone(),
)),
};
self.proxy_map
.write()
.unwrap()
.insert(new_id, Mutex::new(Box::new(new_proxy)));
.insert(new_id, Mutex::new(new_proxy));
if let Some(proxy) = self.proxy_map.read().unwrap().get(&new_id) {
proxy.lock().unwrap().push_op_request();
};
Expand All @@ -147,8 +166,32 @@ impl MuxerThread {
}
}

fn create_lisening_ipc_sockets(&self) {
for (port, (path, do_listen)) in &self.unix_ipc_port_map {
if !do_listen {
continue;
}
let id = (*port as u64) << 32 | defs::TSI_PROXY_PORT as u64;
let proxy = match UnixAcceptorProxy::new(id, path, *port) {
Ok(proxy) => proxy,
Err(e) => {
warn!("Failed to create listening proxy at {:?}: {:?}", path, e);
continue;
}
};
self.proxy_map
.write()
.unwrap()
.insert(id, Mutex::new(Box::new(proxy)));
if let Some(proxy) = self.proxy_map.read().unwrap().get(&id) {
self.update_polling(id, proxy.lock().unwrap().as_raw_fd(), EventSet::IN);
};
}
}

fn work(self) {
let mut thread_rng = thread_rng();
self.create_lisening_ipc_sockets();
loop {
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
match self
Expand Down
9 changes: 8 additions & 1 deletion src/devices/src/virtio/vsock/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,19 @@ pub enum ProxyRemoval {
Deferred,
}

#[derive(Default)]
pub enum NewProxyType {
#[default]
Tcp,
Unix,
}

#[derive(Default)]
pub struct ProxyUpdate {
pub signal_queue: bool,
pub remove_proxy: ProxyRemoval,
pub polling: Option<(u64, RawFd, EventSet)>,
pub new_proxy: Option<(u32, RawFd)>,
pub new_proxy: Option<(u32, RawFd, NewProxyType)>,
pub push_accept: Option<(u64, u64)>,
pub push_credit_req: Option<MuxerRx>,
}
Expand Down
6 changes: 4 additions & 2 deletions src/devices/src/virtio/vsock/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use super::muxer_rxq::MuxerRxQ;
use super::packet::{
TsiAcceptReq, TsiConnectReq, TsiGetnameRsp, TsiListenReq, TsiSendtoAddr, VsockPacket,
};
use super::proxy::{Proxy, ProxyError, ProxyRemoval, ProxyStatus, ProxyUpdate, RecvPkt};
use super::proxy::{
NewProxyType, Proxy, ProxyError, ProxyRemoval, ProxyStatus, ProxyUpdate, RecvPkt,
};
use utils::epoll::EventSet;

use vm_memory::GuestMemoryMmap;
Expand Down Expand Up @@ -729,7 +731,7 @@ impl Proxy for TcpProxy {
{
match accept(self.fd) {
Ok(accept_fd) => {
update.new_proxy = Some((self.peer_port, accept_fd));
update.new_proxy = Some((self.peer_port, accept_fd, NewProxyType::Tcp));
}
Err(e) => warn!("error accepting connection: id={}, err={}", self.id, e),
};
Expand Down
Loading

0 comments on commit 09c6753

Please sign in to comment.