1use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
/// Poll token under which the inotify file descriptor is registered.
const INOTIFY: mio::Token = mio::Token(0);
/// Poll token under which the `mio::Waker` is registered; readiness on it
/// means a control message may be waiting on the channel.
const MESSAGE: mio::Token = mio::Token(1);
24
/// State owned by the background thread that services one inotify instance.
///
/// The thread waits on `poll` for two sources: readability of the inotify
/// file descriptor and wake-ups announcing that a control message was queued.
struct EventLoop {
    /// Set to `false` when a `Shutdown` message is handled; the thread leaves
    /// its loop once the current batch of poll events has been processed.
    running: bool,
    /// Poll instance with which both the waker and the inotify fd are registered.
    poll: mio::Poll,
    /// Waker shared with `INotifyWatcher`, which calls it after sending a message.
    event_loop_waker: Arc<mio::Waker>,
    /// Sending half of the control channel; cloned into `INotifyWatcher`.
    event_loop_tx: Sender<EventLoopMsg>,
    /// Receiving half of the control channel, drained by `handle_messages`.
    event_loop_rx: Receiver<EventLoopMsg>,
    /// The inotify instance; taken out and closed when shutting down.
    inotify: Option<Inotify>,
    /// Receives every translated event and every error raised by the loop.
    event_handler: Box<dyn EventHandler>,
    /// Watched path -> (watch descriptor, watch mask, is_recursive, is_dir).
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
    /// Reverse lookup: watch descriptor -> watched path.
    paths: HashMap<WatchDescriptor, PathBuf>,
    /// Most recent `MOVED_FROM` event, kept so a following `MOVED_TO` with the
    /// same cookie can be reported as a combined rename.
    rename_event: Option<Event>,
    /// Passed to `WalkDir::follow_links` when adding recursive watches.
    follow_links: bool,
}
45
/// Watcher implementation backed by inotify.
///
/// The value itself only holds the means to talk to the event-loop thread:
/// every operation is sent as an `EventLoopMsg` and the thread is then woken.
#[derive(Debug)]
pub struct INotifyWatcher {
    /// Control channel to the event-loop thread.
    channel: Sender<EventLoopMsg>,
    /// Wakes the event-loop thread out of its poll so it notices new messages.
    waker: Arc<mio::Waker>,
}
52
/// Control messages sent from `INotifyWatcher` to the event-loop thread.
enum EventLoopMsg {
    /// Watch the path with the given recursion mode; the outcome is sent back
    /// on the enclosed sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    /// Stop watching the path; the outcome is sent back on the enclosed sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Remove all watches, close the inotify instance and stop the loop.
    Shutdown,
    /// Configuration request; the loop currently always replies `Ok(false)`.
    Configure(Config, BoundSender<Result<bool>>),
}
59
60#[inline]
61fn add_watch_by_event(
62 path: &Option<PathBuf>,
63 event: &inotify_sys::Event<&OsStr>,
64 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
65 add_watches: &mut Vec<PathBuf>,
66) {
67 if let Some(ref path) = *path {
68 if event.mask.contains(EventMask::ISDIR) {
69 if let Some(parent_path) = path.parent() {
70 if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
71 if is_recursive {
72 add_watches.push(path.to_owned());
73 }
74 }
75 }
76 }
77 }
78}
79
80#[inline]
81fn remove_watch_by_event(
82 path: &Option<PathBuf>,
83 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
84 remove_watches: &mut Vec<PathBuf>,
85) {
86 if let Some(ref path) = *path {
87 if watches.contains_key(path) {
88 remove_watches.push(path.to_owned());
89 }
90 }
91}
92
93impl EventLoop {
94 pub fn new(
95 inotify: Inotify,
96 event_handler: Box<dyn EventHandler>,
97 follow_links: bool,
98 ) -> Result<Self> {
99 let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
100 let poll = mio::Poll::new()?;
101
102 let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
103
104 let inotify_fd = inotify.as_raw_fd();
105 let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
106 poll.registry()
107 .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
108
109 let event_loop = EventLoop {
110 running: true,
111 poll,
112 event_loop_waker,
113 event_loop_tx,
114 event_loop_rx,
115 inotify: Some(inotify),
116 event_handler,
117 watches: HashMap::new(),
118 paths: HashMap::new(),
119 rename_event: None,
120 follow_links,
121 };
122 Ok(event_loop)
123 }
124
125 pub fn run(self) {
127 let _ = thread::Builder::new()
128 .name("notify-rs inotify loop".to_string())
129 .spawn(|| self.event_loop_thread());
130 }
131
132 fn event_loop_thread(mut self) {
133 let mut events = mio::Events::with_capacity(16);
134 loop {
135 match self.poll.poll(&mut events, None) {
137 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
138 }
141 Err(e) => panic!("poll failed: {}", e),
142 Ok(()) => {}
143 }
144
145 for event in &events {
147 self.handle_event(event);
148 }
149
150 if !self.running {
152 break;
153 }
154 }
155 }
156
157 fn handle_event(&mut self, event: &mio::event::Event) {
159 match event.token() {
160 MESSAGE => {
161 self.handle_messages()
163 }
164 INOTIFY => {
165 self.handle_inotify()
167 }
168 _ => unreachable!(),
169 }
170 }
171
172 fn handle_messages(&mut self) {
173 while let Ok(msg) = self.event_loop_rx.try_recv() {
174 match msg {
175 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
176 let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
177 }
178 EventLoopMsg::RemoveWatch(path, tx) => {
179 let _ = tx.send(self.remove_watch(path, false));
180 }
181 EventLoopMsg::Shutdown => {
182 let _ = self.remove_all_watches();
183 if let Some(inotify) = self.inotify.take() {
184 let _ = inotify.close();
185 }
186 self.running = false;
187 break;
188 }
189 EventLoopMsg::Configure(config, tx) => {
190 self.configure_raw_mode(config, tx);
191 }
192 }
193 }
194 }
195
196 fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
197 tx.send(Ok(false))
198 .expect("configuration channel disconnected");
199 }
200
201 fn handle_inotify(&mut self) {
202 let mut add_watches = Vec::new();
203 let mut remove_watches = Vec::new();
204
205 if let Some(ref mut inotify) = self.inotify {
206 let mut buffer = [0; 1024];
207 loop {
209 match inotify.read_events(&mut buffer) {
210 Ok(events) => {
211 let mut num_events = 0;
212 for event in events {
213 log::trace!("inotify event: {event:?}");
214
215 num_events += 1;
216 if event.mask.contains(EventMask::Q_OVERFLOW) {
217 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
218 self.event_handler.handle_event(ev);
219 }
220
221 let path = match event.name {
222 Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)),
223 None => self.paths.get(&event.wd).cloned(),
224 };
225
226 let mut evs = Vec::new();
227
228 if event.mask.contains(EventMask::MOVED_FROM) {
229 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
230
231 let event = Event::new(EventKind::Modify(ModifyKind::Name(
232 RenameMode::From,
233 )))
234 .add_some_path(path.clone())
235 .set_tracker(event.cookie as usize);
236
237 self.rename_event = Some(event.clone());
238
239 evs.push(event);
240 } else if event.mask.contains(EventMask::MOVED_TO) {
241 evs.push(
242 Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
243 .set_tracker(event.cookie as usize)
244 .add_some_path(path.clone()),
245 );
246
247 let trackers_match =
248 self.rename_event.as_ref().and_then(|e| e.tracker())
249 == Some(event.cookie as usize);
250
251 if trackers_match {
252 let rename_event = self.rename_event.take().unwrap(); evs.push(
254 Event::new(EventKind::Modify(ModifyKind::Name(
255 RenameMode::Both,
256 )))
257 .set_tracker(event.cookie as usize)
258 .add_some_path(rename_event.paths.first().cloned())
259 .add_some_path(path.clone()),
260 );
261 }
262 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
263 }
264 if event.mask.contains(EventMask::MOVE_SELF) {
265 evs.push(
266 Event::new(EventKind::Modify(ModifyKind::Name(
267 RenameMode::From,
268 )))
269 .add_some_path(path.clone()),
270 );
271 }
275 if event.mask.contains(EventMask::CREATE) {
276 evs.push(
277 Event::new(EventKind::Create(
278 if event.mask.contains(EventMask::ISDIR) {
279 CreateKind::Folder
280 } else {
281 CreateKind::File
282 },
283 ))
284 .add_some_path(path.clone()),
285 );
286 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
287 }
288 if event.mask.contains(EventMask::DELETE) {
289 evs.push(
290 Event::new(EventKind::Remove(
291 if event.mask.contains(EventMask::ISDIR) {
292 RemoveKind::Folder
293 } else {
294 RemoveKind::File
295 },
296 ))
297 .add_some_path(path.clone()),
298 );
299 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
300 }
301 if event.mask.contains(EventMask::DELETE_SELF) {
302 let remove_kind = match &path {
303 Some(watched_path) => {
304 let current_watch = self.watches.get(watched_path);
305 match current_watch {
306 Some(&(_, _, _, true)) => RemoveKind::Folder,
307 Some(&(_, _, _, false)) => RemoveKind::File,
308 None => RemoveKind::Other,
309 }
310 }
311 None => {
312 log::trace!(
313 "No patch for DELETE_SELF event, may be a bug?"
314 );
315 RemoveKind::Other
316 }
317 };
318 evs.push(
319 Event::new(EventKind::Remove(remove_kind))
320 .add_some_path(path.clone()),
321 );
322 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
323 }
324 if event.mask.contains(EventMask::MODIFY) {
325 evs.push(
326 Event::new(EventKind::Modify(ModifyKind::Data(
327 DataChange::Any,
328 )))
329 .add_some_path(path.clone()),
330 );
331 }
332 if event.mask.contains(EventMask::CLOSE_WRITE) {
333 evs.push(
334 Event::new(EventKind::Access(AccessKind::Close(
335 AccessMode::Write,
336 )))
337 .add_some_path(path.clone()),
338 );
339 }
340 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
341 evs.push(
342 Event::new(EventKind::Access(AccessKind::Close(
343 AccessMode::Read,
344 )))
345 .add_some_path(path.clone()),
346 );
347 }
348 if event.mask.contains(EventMask::ATTRIB) {
349 evs.push(
350 Event::new(EventKind::Modify(ModifyKind::Metadata(
351 MetadataKind::Any,
352 )))
353 .add_some_path(path.clone()),
354 );
355 }
356 if event.mask.contains(EventMask::OPEN) {
357 evs.push(
358 Event::new(EventKind::Access(AccessKind::Open(
359 AccessMode::Any,
360 )))
361 .add_some_path(path.clone()),
362 );
363 }
364
365 for ev in evs {
366 self.event_handler.handle_event(Ok(ev));
367 }
368 }
369
370 if num_events == 0 {
372 break;
373 }
374 }
375 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
376 break;
378 }
379 Err(e) => {
380 self.event_handler.handle_event(Err(Error::io(e)));
381 }
382 }
383 }
384 }
385
386 for path in remove_watches {
387 self.remove_watch(path, true).ok();
388 }
389
390 for path in add_watches {
391 self.add_watch(path, true, false).ok();
392 }
393 }
394
395 fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
396 if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() {
399 return self.add_single_watch(path, false, true);
400 }
401
402 for entry in WalkDir::new(path)
403 .follow_links(self.follow_links)
404 .into_iter()
405 .filter_map(filter_dir)
406 {
407 self.add_single_watch(entry.into_path(), is_recursive, watch_self)?;
408 watch_self = false;
409 }
410
411 Ok(())
412 }
413
414 fn add_single_watch(
415 &mut self,
416 path: PathBuf,
417 is_recursive: bool,
418 watch_self: bool,
419 ) -> Result<()> {
420 let mut watchmask = WatchMask::ATTRIB
421 | WatchMask::CREATE
422 | WatchMask::OPEN
423 | WatchMask::DELETE
424 | WatchMask::CLOSE_WRITE
425 | WatchMask::MODIFY
426 | WatchMask::MOVED_FROM
427 | WatchMask::MOVED_TO;
428
429 if watch_self {
430 watchmask.insert(WatchMask::DELETE_SELF);
431 watchmask.insert(WatchMask::MOVE_SELF);
432 }
433
434 if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
435 watchmask.insert(old_watchmask);
436 watchmask.insert(WatchMask::MASK_ADD);
437 }
438
439 if let Some(ref mut inotify) = self.inotify {
440 log::trace!("adding inotify watch: {}", path.display());
441
442 match inotify.watches().add(&path, watchmask) {
443 Err(e) => {
444 Err(if e.raw_os_error() == Some(libc::ENOSPC) {
445 Error::new(ErrorKind::MaxFilesWatch)
447 } else if e.kind() == std::io::ErrorKind::NotFound {
448 Error::new(ErrorKind::PathNotFound)
449 } else {
450 Error::io(e)
451 }
452 .add_path(path))
453 }
454 Ok(w) => {
455 watchmask.remove(WatchMask::MASK_ADD);
456 let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
457 self.watches
458 .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
459 self.paths.insert(w, path);
460 Ok(())
461 }
462 }
463 } else {
464 Ok(())
465 }
466 }
467
468 fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
469 match self.watches.remove(&path) {
470 None => return Err(Error::watch_not_found().add_path(path)),
471 Some((w, _, is_recursive, _)) => {
472 if let Some(ref mut inotify) = self.inotify {
473 let mut inotify_watches = inotify.watches();
474 log::trace!("removing inotify watch: {}", path.display());
475
476 inotify_watches
477 .remove(w.clone())
478 .map_err(|e| Error::io(e).add_path(path.clone()))?;
479 self.paths.remove(&w);
480
481 if is_recursive || remove_recursive {
482 let mut remove_list = Vec::new();
483 for (w, p) in &self.paths {
484 if p.starts_with(&path) {
485 inotify_watches
486 .remove(w.clone())
487 .map_err(|e| Error::io(e).add_path(p.into()))?;
488 self.watches.remove(p);
489 remove_list.push(w.clone());
490 }
491 }
492 for w in remove_list {
493 self.paths.remove(&w);
494 }
495 }
496 }
497 }
498 }
499 Ok(())
500 }
501
502 fn remove_all_watches(&mut self) -> Result<()> {
503 if let Some(ref mut inotify) = self.inotify {
504 let mut inotify_watches = inotify.watches();
505 for (w, p) in &self.paths {
506 inotify_watches
507 .remove(w.clone())
508 .map_err(|e| Error::io(e).add_path(p.into()))?;
509 }
510 self.watches.clear();
511 self.paths.clear();
512 }
513 Ok(())
514 }
515}
516
517fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
519 if let Ok(e) = e {
520 if let Ok(metadata) = e.metadata() {
521 if metadata.is_dir() {
522 return Some(e);
523 }
524 }
525 }
526 None
527}
528
529impl INotifyWatcher {
530 fn from_event_handler(
531 event_handler: Box<dyn EventHandler>,
532 follow_links: bool,
533 ) -> Result<Self> {
534 let inotify = Inotify::init()?;
535 let event_loop = EventLoop::new(inotify, event_handler, follow_links)?;
536 let channel = event_loop.event_loop_tx.clone();
537 let waker = event_loop.event_loop_waker.clone();
538 event_loop.run();
539 Ok(INotifyWatcher { channel, waker })
540 }
541
542 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
543 let pb = if path.is_absolute() {
544 path.to_owned()
545 } else {
546 let p = env::current_dir().map_err(Error::io)?;
547 p.join(path)
548 };
549 let (tx, rx) = unbounded();
550 let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
551
552 self.channel.send(msg).unwrap();
554 self.waker.wake().unwrap();
555 rx.recv().unwrap()
556 }
557
558 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
559 let pb = if path.is_absolute() {
560 path.to_owned()
561 } else {
562 let p = env::current_dir().map_err(Error::io)?;
563 p.join(path)
564 };
565 let (tx, rx) = unbounded();
566 let msg = EventLoopMsg::RemoveWatch(pb, tx);
567
568 self.channel.send(msg).unwrap();
570 self.waker.wake().unwrap();
571 rx.recv().unwrap()
572 }
573}
574
575impl Watcher for INotifyWatcher {
576 fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
578 Self::from_event_handler(Box::new(event_handler), config.follow_symlinks())
579 }
580
581 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
582 self.watch_inner(path, recursive_mode)
583 }
584
585 fn unwatch(&mut self, path: &Path) -> Result<()> {
586 self.unwatch_inner(path)
587 }
588
589 fn configure(&mut self, config: Config) -> Result<bool> {
590 let (tx, rx) = bounded(1);
591 self.channel.send(EventLoopMsg::Configure(config, tx))?;
592 self.waker.wake()?;
593 rx.recv()?
594 }
595
596 fn kind() -> crate::WatcherKind {
597 crate::WatcherKind::Inotify
598 }
599}
600
601impl Drop for INotifyWatcher {
602 fn drop(&mut self) {
603 self.channel.send(EventLoopMsg::Shutdown).unwrap();
605 self.waker.wake().unwrap();
606 }
607}
608
/// Compile-time check: the watcher handle can be moved to and shared between
/// threads.
#[test]
fn inotify_watcher_is_send_and_sync() {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<INotifyWatcher>();
}
614
/// Watching a path that does not exist must be reported as `PathNotFound`,
/// not as a generic I/O error.
#[test]
fn native_error_type_on_missing_path() {
    let mut watcher = INotifyWatcher::new(|_| {}, Config::default()).unwrap();

    let missing = PathBuf::from("/some/non/existant/path");
    let outcome = watcher.watch(&missing, RecursiveMode::NonRecursive);

    let is_path_not_found = matches!(
        outcome,
        Err(Error {
            kind: ErrorKind::PathNotFound,
            ..
        })
    );
    assert!(is_path_not_found)
}