Skip to content

Commit

Permalink
perf: detach the watch registration from the main thread (#2224)
Browse files Browse the repository at this point in the history
  • Loading branch information
sxyazi authored Jan 19, 2025
1 parent 7bd7952 commit ed3c9c4
Showing 1 changed file with 35 additions and 17 deletions.
52 changes: 35 additions & 17 deletions yazi-core/src/manager/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,17 @@ impl Watcher {
});
}

async fn fan_in(mut rx: watch::Receiver<HashSet<Url>>, mut watcher: impl notify::Watcher) {
async fn fan_in(
mut rx: watch::Receiver<HashSet<Url>>,
mut watcher: impl notify::Watcher + Send + 'static,
) {
loop {
let (mut to_unwatch, mut to_watch): (HashSet<_>, HashSet<_>) = {
let (to_unwatch, to_watch): (HashSet<_>, HashSet<_>) = {
let (new, old) = (&*rx.borrow_and_update(), &*WATCHED.read());
(old.difference(new).cloned().collect(), new.difference(old).cloned().collect())
};

to_unwatch.retain(|u| match watcher.unwatch(u) {
Ok(_) => true,
Err(e) if matches!(e.kind, notify::ErrorKind::WatchNotFound) => true,
Err(e) => {
error!("Unwatch failed: {e:?}");
false
}
});
to_watch.retain(|u| watcher.watch(u, RecursiveMode::NonRecursive).is_ok());

{
let mut watched = WATCHED.write();
watched.retain(|u| !to_unwatch.contains(u));
watched.extend(to_watch);
}
watcher = Self::sync_watched(watcher, to_unwatch, to_watch).await;

if !rx.has_changed().unwrap_or(false) {
Self::sync_linked().await;
Expand Down Expand Up @@ -173,6 +162,35 @@ impl Watcher {
}
}

async fn sync_watched<W>(mut watcher: W, to_unwatch: HashSet<Url>, to_watch: HashSet<Url>) -> W
where
W: notify::Watcher + Send + 'static,
{
use notify::ErrorKind::WatchNotFound;

if to_unwatch.is_empty() && to_watch.is_empty() {
return watcher;
}

tokio::task::spawn_blocking(move || {
for u in to_unwatch {
match watcher.unwatch(&u) {
Ok(()) => _ = WATCHED.write().remove(&u),
Err(e) if matches!(e.kind, WatchNotFound) => _ = WATCHED.write().remove(&u),
Err(e) => error!("Unwatch failed: {e:?}"),
}
}
for u in to_watch {
if watcher.watch(&u, RecursiveMode::NonRecursive).is_ok() {
WATCHED.write().insert(u);
}
}
watcher
})
.await
.unwrap()
}

async fn sync_linked() {
let mut new = WATCHED.read().clone();

Expand Down

0 comments on commit ed3c9c4

Please sign in to comment.