Commit a827a230 authored by Stavros Korokithakis's avatar Stavros Korokithakis

Add native debouncing

parent d7bd2d3d
......@@ -26,11 +26,14 @@ use std::process::Command;
use std::sync::mpsc::channel;
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, SystemTime};
use std::time::{Duration, SystemTime, Instant};
use views::start_api;
/// The time of the last filesystem event in the Hearth folder, or None if the files were already
/// added to IPFS.
lazy_static! {
static ref DEBOUNCING_DONE: Mutex<bool> = Mutex::new(true);
static ref LAST_EVENT_TIME: Mutex<Option<Instant>> = Mutex::new(None);
}
lazy_static! {
......@@ -84,7 +87,6 @@ fn kill_ipfs_and_exit(exit_code: i32) {
/// Add some files to the IPFS daemon and the hashmap.
fn add_files_to_ipfs() {
info!("Adding files to IPFS...");
let output = Command::new(IPFS_EXECUTABLE)
.env("IPFS_PATH", &*IPFS_PATH)
.args(&["add", "-r", HEARTH_DIR.to_str().unwrap()])
......@@ -107,25 +109,42 @@ fn add_files_to_ipfs() {
}));
}
/// Emit messages to the console and add files to IPFS.
/// Receive a filesystem event and act on it.
///
/// This does the proper debouncing (on top of the inotify debouncer) to emit the necessary
/// messages to the console when we are expecting changes/are done with changes.
fn notify_ipfs(add_to_ipfs: bool) {
let mut debouncing_done = DEBOUNCING_DONE.lock().unwrap();
debug!(
"debouncing_done={:?}, add_to_ipfs={:?}",
*debouncing_done, add_to_ipfs
);
if add_to_ipfs {
fn notify_ipfs(fs_event: bool) {
// If this is a filesystem event, just reset the timer.
let mut last_event_time = LAST_EVENT_TIME.lock().unwrap();
debug!("Got event, fs_event={:?}", fs_event);
if last_event_time.is_none() {
debug!("Last event time is None");
} else {
debug!("Last event time is {:?}", last_event_time.unwrap());
}
if fs_event {
if last_event_time.is_none() {
// This is the first event for now, so emit the log line.
info!("Change detected in filesystem.");
}
// Reset the timestamp to now.
*last_event_time = Some(Instant::now());
return;
} else {
// This is a timer event, so we should check when the last timer event was.
if last_event_time.is_none() || Instant::now().duration_since(last_event_time.unwrap()).as_secs() < 5 {
// It there was no event, or there hasn't been long enough since the last one, return.
return;
}
info!("Adding files to IPFS...");
add_files_to_ipfs();
info!("Finished adding files to IPFS.\n");
*debouncing_done = true;
} else if *debouncing_done == true {
info!("Change detected in filesystem.");
*debouncing_done = false;
// We added files to IPFS, so we'll set the last_event_time to None.
*last_event_time = None;
}
}
......@@ -133,20 +152,19 @@ fn notify_ipfs(add_to_ipfs: bool) {
///
/// add_to_ipfs will be true when we actually need to add the files to IPFS (ie when enough time
/// has passed after debouncing).
fn files_changed(event: notify::DebouncedEvent) {
debug!("Got new event: {:?}", event);
match event {
notify::DebouncedEvent::NoticeWrite(_) => notify_ipfs(false),
notify::DebouncedEvent::NoticeRemove(_) => notify_ipfs(false),
notify::DebouncedEvent::Create(_) => notify_ipfs(true),
notify::DebouncedEvent::Remove(_) => notify_ipfs(true),
notify::DebouncedEvent::Rename(_, _) => notify_ipfs(true),
notify::DebouncedEvent::Write(_) => notify_ipfs(true),
_ => return,
fn files_changed(op: notify::Op, path: std::path::PathBuf) {
debug!("Got new event: {:?}, {:?}", op, path);
if op.contains(notify::op::CREATE) ||
op.contains(notify::op::REMOVE) ||
op.contains(notify::op::RENAME) ||
op.contains(notify::op::WRITE) ||
op.contains(notify::op::RESCAN) ||
op.contains(notify::op::CLOSE_WRITE) {
notify_ipfs(true);
}
}
/// Start the IPFS daemon and wait. If it ever dies, quit the entire program.
fn start_ipfs() {
info!("Initializing IPFS directory...");
......@@ -162,7 +180,7 @@ fn start_ipfs() {
info!("Starting IPFS daemon...");
// Print the correct message to the console because we'll add some files.
notify_ipfs(false);
notify_ipfs(true);
// Add everything, because files might have been changed while we weren't looking.
// This is done in a new thread, as the command to run the IPFS daemon farther down will block.
......@@ -186,7 +204,9 @@ fn start_ipfs() {
.env("IPFS_PATH", &*IPFS_PATH)
.args(&["swarm", "connect", "/ip4/195.201.40.251/tcp/4001/ipfs/QmXRxDBWU7zA6yBDtLNM3HMU2W8BRVjRhew42bKHE63yfe"]);
notify_ipfs(true);
// After all initialization has been done, spawn the cron thread (so we can add
// files to IPFS at startup as well).
thread::spawn(cron_thread);
return;
}
}
......@@ -218,17 +238,37 @@ fn start_ipfs() {
process::exit(1)
}
fn create_debounced_watcher() {
/// Run notify_ipfs every second, for debouncing.
///
/// notify_ipfs stores the last time an event occurred, and only adds files to IPFS if there's no
/// event after a while. To do that, however, it needs to also run "after a while". This is where
/// this function comes in. It runs notify_ipfs every second, to give it the opportunity to check
/// its timer and add files to IPFS if it needs to.
fn cron_thread() {
loop {
thread::sleep(Duration::from_secs(1));
notify_ipfs(false);
}
}
fn create_watcher() {
let (tx, rx) = channel();
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(5)).unwrap();
let mut watcher: RecommendedWatcher = Watcher::new_raw(tx).unwrap();
watcher
.watch(&*HEARTH_DIR, RecursiveMode::Recursive)
.expect("There was an error when trying to watch the Hearth directory for changes.");
loop {
match rx.recv() {
Ok(e) => files_changed(e),
Err(e) => warn!("watch error: {:?}", e),
Ok(notify::RawEvent {
path: Some(path),
op: Ok(op),
cookie: _cookie
}) => files_changed(op, path),
Ok(event) => debug!("Broken event: {:?}", event),
Err(event) => debug!("Watch error: {:?}", event),
}
}
}
......@@ -266,5 +306,5 @@ fn main() {
_ => {}
}
create_debounced_watcher();
create_watcher();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment