notify/
inotify.rs

1//! Watcher implementation for the inotify Linux API
2//!
3//! The inotify API provides a mechanism for monitoring filesystem events.  Inotify can be used to
4//! monitor individual files, or to monitor directories.  When a directory is monitored, inotify
5//! will return events for the directory itself, and for files inside the directory.
6
7use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
/// Poll token under which the inotify file descriptor is registered; readiness on it is
/// dispatched to `EventLoop::handle_inotify`.
const INOTIFY: mio::Token = mio::Token(0);
/// Poll token used by the `mio::Waker`; readiness on it is dispatched to
/// `EventLoop::handle_messages`.
const MESSAGE: mio::Token = mio::Token(1);
24
25// The EventLoop will set up a mio::Poll and use it to wait for the following:
26//
27// -  messages telling it what to do
28//
29// -  events telling it that something has happened on one of the watched files.
30
/// State owned by the background thread that services inotify and control messages.
struct EventLoop {
    /// Cleared when a `Shutdown` message is handled; the loop exits once it is `false`.
    running: bool,
    /// Poll instance the loop thread waits on.
    poll: mio::Poll,
    /// Waker registered on `poll` under the `MESSAGE` token; cloned into `INotifyWatcher`.
    event_loop_waker: Arc<mio::Waker>,
    /// Sending half of the control channel; cloned into `INotifyWatcher`.
    event_loop_tx: Sender<EventLoopMsg>,
    /// Receiving half of the control channel, drained by `handle_messages`.
    event_loop_rx: Receiver<EventLoopMsg>,
    /// The inotify instance; taken and closed when `Shutdown` is handled.
    inotify: Option<Inotify>,
    /// Receives every translated event and every read error.
    event_handler: Box<dyn EventHandler>,
    /// Watched path -> (WatchDescriptor, WatchMask, is_recursive, is_dir)
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
    /// Reverse lookup from watch descriptor to the watched path, used to rebuild event paths.
    paths: HashMap<WatchDescriptor, PathBuf>,
    /// Most recent `MOVED_FROM` event, kept so a `MOVED_TO` with the same cookie can be paired
    /// with it.
    rename_event: Option<Event>,
    /// Passed to `WalkDir::follow_links` when a recursive watch walks a directory tree.
    follow_links: bool,
}
45
/// Watcher implementation based on inotify
///
/// The value itself only holds the handles needed to talk to the event loop thread; the
/// inotify instance lives in that thread.
#[derive(Debug)]
pub struct INotifyWatcher {
    /// Control channel used to send `EventLoopMsg` requests to the event loop.
    channel: Sender<EventLoopMsg>,
    /// Wakes the event loop's poll so it notices a newly sent message.
    waker: Arc<mio::Waker>,
}
52
/// Requests sent from `INotifyWatcher` to the event loop thread.
enum EventLoopMsg {
    /// Watch the path with the given recursion mode; the result is sent back on the sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    /// Stop watching the path; the result is sent back on the sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Remove all watches, close inotify and stop the loop.
    Shutdown,
    /// Runtime configuration request; the event loop currently always answers `Ok(false)`.
    Configure(Config, BoundSender<Result<bool>>),
}
59
60#[inline]
61fn add_watch_by_event(
62    path: &Option<PathBuf>,
63    event: &inotify_sys::Event<&OsStr>,
64    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
65    add_watches: &mut Vec<PathBuf>,
66) {
67    if let Some(ref path) = *path {
68        if event.mask.contains(EventMask::ISDIR) {
69            if let Some(parent_path) = path.parent() {
70                if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
71                    if is_recursive {
72                        add_watches.push(path.to_owned());
73                    }
74                }
75            }
76        }
77    }
78}
79
80#[inline]
81fn remove_watch_by_event(
82    path: &Option<PathBuf>,
83    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
84    remove_watches: &mut Vec<PathBuf>,
85) {
86    if let Some(ref path) = *path {
87        if watches.contains_key(path) {
88            remove_watches.push(path.to_owned());
89        }
90    }
91}
92
93impl EventLoop {
94    pub fn new(
95        inotify: Inotify,
96        event_handler: Box<dyn EventHandler>,
97        follow_links: bool,
98    ) -> Result<Self> {
99        let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
100        let poll = mio::Poll::new()?;
101
102        let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
103
104        let inotify_fd = inotify.as_raw_fd();
105        let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
106        poll.registry()
107            .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
108
109        let event_loop = EventLoop {
110            running: true,
111            poll,
112            event_loop_waker,
113            event_loop_tx,
114            event_loop_rx,
115            inotify: Some(inotify),
116            event_handler,
117            watches: HashMap::new(),
118            paths: HashMap::new(),
119            rename_event: None,
120            follow_links,
121        };
122        Ok(event_loop)
123    }
124
125    // Run the event loop.
126    pub fn run(self) {
127        let _ = thread::Builder::new()
128            .name("notify-rs inotify loop".to_string())
129            .spawn(|| self.event_loop_thread());
130    }
131
132    fn event_loop_thread(mut self) {
133        let mut events = mio::Events::with_capacity(16);
134        loop {
135            // Wait for something to happen.
136            match self.poll.poll(&mut events, None) {
137                Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
138                    // System call was interrupted, we will retry
139                    // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
140                }
141                Err(e) => panic!("poll failed: {}", e),
142                Ok(()) => {}
143            }
144
145            // Process whatever happened.
146            for event in &events {
147                self.handle_event(event);
148            }
149
150            // Stop, if we're done.
151            if !self.running {
152                break;
153            }
154        }
155    }
156
157    // Handle a single event.
158    fn handle_event(&mut self, event: &mio::event::Event) {
159        match event.token() {
160            MESSAGE => {
161                // The channel is readable - handle messages.
162                self.handle_messages()
163            }
164            INOTIFY => {
165                // inotify has something to tell us.
166                self.handle_inotify()
167            }
168            _ => unreachable!(),
169        }
170    }
171
172    fn handle_messages(&mut self) {
173        while let Ok(msg) = self.event_loop_rx.try_recv() {
174            match msg {
175                EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
176                    let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
177                }
178                EventLoopMsg::RemoveWatch(path, tx) => {
179                    let _ = tx.send(self.remove_watch(path, false));
180                }
181                EventLoopMsg::Shutdown => {
182                    let _ = self.remove_all_watches();
183                    if let Some(inotify) = self.inotify.take() {
184                        let _ = inotify.close();
185                    }
186                    self.running = false;
187                    break;
188                }
189                EventLoopMsg::Configure(config, tx) => {
190                    self.configure_raw_mode(config, tx);
191                }
192            }
193        }
194    }
195
196    fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
197        tx.send(Ok(false))
198            .expect("configuration channel disconnected");
199    }
200
201    fn handle_inotify(&mut self) {
202        let mut add_watches = Vec::new();
203        let mut remove_watches = Vec::new();
204
205        if let Some(ref mut inotify) = self.inotify {
206            let mut buffer = [0; 1024];
207            // Read all buffers available.
208            loop {
209                match inotify.read_events(&mut buffer) {
210                    Ok(events) => {
211                        let mut num_events = 0;
212                        for event in events {
213                            log::trace!("inotify event: {event:?}");
214
215                            num_events += 1;
216                            if event.mask.contains(EventMask::Q_OVERFLOW) {
217                                let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
218                                self.event_handler.handle_event(ev);
219                            }
220
221                            let path = match event.name {
222                                Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)),
223                                None => self.paths.get(&event.wd).cloned(),
224                            };
225
226                            let mut evs = Vec::new();
227
228                            if event.mask.contains(EventMask::MOVED_FROM) {
229                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
230
231                                let event = Event::new(EventKind::Modify(ModifyKind::Name(
232                                    RenameMode::From,
233                                )))
234                                .add_some_path(path.clone())
235                                .set_tracker(event.cookie as usize);
236
237                                self.rename_event = Some(event.clone());
238
239                                evs.push(event);
240                            } else if event.mask.contains(EventMask::MOVED_TO) {
241                                evs.push(
242                                    Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
243                                        .set_tracker(event.cookie as usize)
244                                        .add_some_path(path.clone()),
245                                );
246
247                                let trackers_match =
248                                    self.rename_event.as_ref().and_then(|e| e.tracker())
249                                        == Some(event.cookie as usize);
250
251                                if trackers_match {
252                                    let rename_event = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
253                                    evs.push(
254                                        Event::new(EventKind::Modify(ModifyKind::Name(
255                                            RenameMode::Both,
256                                        )))
257                                        .set_tracker(event.cookie as usize)
258                                        .add_some_path(rename_event.paths.first().cloned())
259                                        .add_some_path(path.clone()),
260                                    );
261                                }
262                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
263                            }
264                            if event.mask.contains(EventMask::MOVE_SELF) {
265                                evs.push(
266                                    Event::new(EventKind::Modify(ModifyKind::Name(
267                                        RenameMode::From,
268                                    )))
269                                    .add_some_path(path.clone()),
270                                );
271                                // TODO stat the path and get to new path
272                                // - emit To and Both events
273                                // - change prefix for further events
274                            }
275                            if event.mask.contains(EventMask::CREATE) {
276                                evs.push(
277                                    Event::new(EventKind::Create(
278                                        if event.mask.contains(EventMask::ISDIR) {
279                                            CreateKind::Folder
280                                        } else {
281                                            CreateKind::File
282                                        },
283                                    ))
284                                    .add_some_path(path.clone()),
285                                );
286                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
287                            }
288                            if event.mask.contains(EventMask::DELETE) {
289                                evs.push(
290                                    Event::new(EventKind::Remove(
291                                        if event.mask.contains(EventMask::ISDIR) {
292                                            RemoveKind::Folder
293                                        } else {
294                                            RemoveKind::File
295                                        },
296                                    ))
297                                    .add_some_path(path.clone()),
298                                );
299                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
300                            }
301                            if event.mask.contains(EventMask::DELETE_SELF) {
302                                let remove_kind = match &path {
303                                    Some(watched_path) => {
304                                        let current_watch = self.watches.get(watched_path);
305                                        match current_watch {
306                                            Some(&(_, _, _, true)) => RemoveKind::Folder,
307                                            Some(&(_, _, _, false)) => RemoveKind::File,
308                                            None => RemoveKind::Other,
309                                        }
310                                    }
311                                    None => {
312                                        log::trace!(
313                                            "No patch for DELETE_SELF event, may be a bug?"
314                                        );
315                                        RemoveKind::Other
316                                    }
317                                };
318                                evs.push(
319                                    Event::new(EventKind::Remove(remove_kind))
320                                        .add_some_path(path.clone()),
321                                );
322                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
323                            }
324                            if event.mask.contains(EventMask::MODIFY) {
325                                evs.push(
326                                    Event::new(EventKind::Modify(ModifyKind::Data(
327                                        DataChange::Any,
328                                    )))
329                                    .add_some_path(path.clone()),
330                                );
331                            }
332                            if event.mask.contains(EventMask::CLOSE_WRITE) {
333                                evs.push(
334                                    Event::new(EventKind::Access(AccessKind::Close(
335                                        AccessMode::Write,
336                                    )))
337                                    .add_some_path(path.clone()),
338                                );
339                            }
340                            if event.mask.contains(EventMask::CLOSE_NOWRITE) {
341                                evs.push(
342                                    Event::new(EventKind::Access(AccessKind::Close(
343                                        AccessMode::Read,
344                                    )))
345                                    .add_some_path(path.clone()),
346                                );
347                            }
348                            if event.mask.contains(EventMask::ATTRIB) {
349                                evs.push(
350                                    Event::new(EventKind::Modify(ModifyKind::Metadata(
351                                        MetadataKind::Any,
352                                    )))
353                                    .add_some_path(path.clone()),
354                                );
355                            }
356                            if event.mask.contains(EventMask::OPEN) {
357                                evs.push(
358                                    Event::new(EventKind::Access(AccessKind::Open(
359                                        AccessMode::Any,
360                                    )))
361                                    .add_some_path(path.clone()),
362                                );
363                            }
364
365                            for ev in evs {
366                                self.event_handler.handle_event(Ok(ev));
367                            }
368                        }
369
370                        // All events read. Break out.
371                        if num_events == 0 {
372                            break;
373                        }
374                    }
375                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
376                        // No events read. Break out.
377                        break;
378                    }
379                    Err(e) => {
380                        self.event_handler.handle_event(Err(Error::io(e)));
381                    }
382                }
383            }
384        }
385
386        for path in remove_watches {
387            self.remove_watch(path, true).ok();
388        }
389
390        for path in add_watches {
391            self.add_watch(path, true, false).ok();
392        }
393    }
394
395    fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
396        // If the watch is not recursive, or if we determine (by stat'ing the path to get its
397        // metadata) that the watched path is not a directory, add a single path watch.
398        if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() {
399            return self.add_single_watch(path, false, true);
400        }
401
402        for entry in WalkDir::new(path)
403            .follow_links(self.follow_links)
404            .into_iter()
405            .filter_map(filter_dir)
406        {
407            self.add_single_watch(entry.into_path(), is_recursive, watch_self)?;
408            watch_self = false;
409        }
410
411        Ok(())
412    }
413
414    fn add_single_watch(
415        &mut self,
416        path: PathBuf,
417        is_recursive: bool,
418        watch_self: bool,
419    ) -> Result<()> {
420        let mut watchmask = WatchMask::ATTRIB
421            | WatchMask::CREATE
422            | WatchMask::OPEN
423            | WatchMask::DELETE
424            | WatchMask::CLOSE_WRITE
425            | WatchMask::MODIFY
426            | WatchMask::MOVED_FROM
427            | WatchMask::MOVED_TO;
428
429        if watch_self {
430            watchmask.insert(WatchMask::DELETE_SELF);
431            watchmask.insert(WatchMask::MOVE_SELF);
432        }
433
434        if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
435            watchmask.insert(old_watchmask);
436            watchmask.insert(WatchMask::MASK_ADD);
437        }
438
439        if let Some(ref mut inotify) = self.inotify {
440            log::trace!("adding inotify watch: {}", path.display());
441
442            match inotify.watches().add(&path, watchmask) {
443                Err(e) => {
444                    Err(if e.raw_os_error() == Some(libc::ENOSPC) {
445                        // do not report inotify limits as "no more space" on linux #266
446                        Error::new(ErrorKind::MaxFilesWatch)
447                    } else if e.kind() == std::io::ErrorKind::NotFound {
448                        Error::new(ErrorKind::PathNotFound)
449                    } else {
450                        Error::io(e)
451                    }
452                    .add_path(path))
453                }
454                Ok(w) => {
455                    watchmask.remove(WatchMask::MASK_ADD);
456                    let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
457                    self.watches
458                        .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
459                    self.paths.insert(w, path);
460                    Ok(())
461                }
462            }
463        } else {
464            Ok(())
465        }
466    }
467
468    fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
469        match self.watches.remove(&path) {
470            None => return Err(Error::watch_not_found().add_path(path)),
471            Some((w, _, is_recursive, _)) => {
472                if let Some(ref mut inotify) = self.inotify {
473                    let mut inotify_watches = inotify.watches();
474                    log::trace!("removing inotify watch: {}", path.display());
475
476                    inotify_watches
477                        .remove(w.clone())
478                        .map_err(|e| Error::io(e).add_path(path.clone()))?;
479                    self.paths.remove(&w);
480
481                    if is_recursive || remove_recursive {
482                        let mut remove_list = Vec::new();
483                        for (w, p) in &self.paths {
484                            if p.starts_with(&path) {
485                                inotify_watches
486                                    .remove(w.clone())
487                                    .map_err(|e| Error::io(e).add_path(p.into()))?;
488                                self.watches.remove(p);
489                                remove_list.push(w.clone());
490                            }
491                        }
492                        for w in remove_list {
493                            self.paths.remove(&w);
494                        }
495                    }
496                }
497            }
498        }
499        Ok(())
500    }
501
502    fn remove_all_watches(&mut self) -> Result<()> {
503        if let Some(ref mut inotify) = self.inotify {
504            let mut inotify_watches = inotify.watches();
505            for (w, p) in &self.paths {
506                inotify_watches
507                    .remove(w.clone())
508                    .map_err(|e| Error::io(e).add_path(p.into()))?;
509            }
510            self.watches.clear();
511            self.paths.clear();
512        }
513        Ok(())
514    }
515}
516
517/// return `DirEntry` when it is a directory
518fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
519    if let Ok(e) = e {
520        if let Ok(metadata) = e.metadata() {
521            if metadata.is_dir() {
522                return Some(e);
523            }
524        }
525    }
526    None
527}
528
529impl INotifyWatcher {
530    fn from_event_handler(
531        event_handler: Box<dyn EventHandler>,
532        follow_links: bool,
533    ) -> Result<Self> {
534        let inotify = Inotify::init()?;
535        let event_loop = EventLoop::new(inotify, event_handler, follow_links)?;
536        let channel = event_loop.event_loop_tx.clone();
537        let waker = event_loop.event_loop_waker.clone();
538        event_loop.run();
539        Ok(INotifyWatcher { channel, waker })
540    }
541
542    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
543        let pb = if path.is_absolute() {
544            path.to_owned()
545        } else {
546            let p = env::current_dir().map_err(Error::io)?;
547            p.join(path)
548        };
549        let (tx, rx) = unbounded();
550        let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
551
552        // we expect the event loop to live and reply => unwraps must not panic
553        self.channel.send(msg).unwrap();
554        self.waker.wake().unwrap();
555        rx.recv().unwrap()
556    }
557
558    fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
559        let pb = if path.is_absolute() {
560            path.to_owned()
561        } else {
562            let p = env::current_dir().map_err(Error::io)?;
563            p.join(path)
564        };
565        let (tx, rx) = unbounded();
566        let msg = EventLoopMsg::RemoveWatch(pb, tx);
567
568        // we expect the event loop to live and reply => unwraps must not panic
569        self.channel.send(msg).unwrap();
570        self.waker.wake().unwrap();
571        rx.recv().unwrap()
572    }
573}
574
575impl Watcher for INotifyWatcher {
576    /// Create a new watcher.
577    fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
578        Self::from_event_handler(Box::new(event_handler), config.follow_symlinks())
579    }
580
581    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
582        self.watch_inner(path, recursive_mode)
583    }
584
585    fn unwatch(&mut self, path: &Path) -> Result<()> {
586        self.unwatch_inner(path)
587    }
588
589    fn configure(&mut self, config: Config) -> Result<bool> {
590        let (tx, rx) = bounded(1);
591        self.channel.send(EventLoopMsg::Configure(config, tx))?;
592        self.waker.wake()?;
593        rx.recv()?
594    }
595
596    fn kind() -> crate::WatcherKind {
597        crate::WatcherKind::Inotify
598    }
599}
600
601impl Drop for INotifyWatcher {
602    fn drop(&mut self) {
603        // we expect the event loop to live => unwrap must not panic
604        self.channel.send(EventLoopMsg::Shutdown).unwrap();
605        self.waker.wake().unwrap();
606    }
607}
608
/// Compile-time check that the watcher can be moved to and shared between threads.
#[test]
fn inotify_watcher_is_send_and_sync() {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<INotifyWatcher>();
}
614
/// Watching a path that does not exist must surface `ErrorKind::PathNotFound` rather than a
/// generic I/O error.
#[test]
fn native_error_type_on_missing_path() {
    let mut watcher = INotifyWatcher::new(|_| {}, Config::default()).unwrap();

    let missing = Path::new("/some/non/existant/path");
    let result = watcher.watch(missing, RecursiveMode::NonRecursive);

    assert!(matches!(
        result,
        Err(Error {
            kind: ErrorKind::PathNotFound,
            ..
        })
    ))
}