notify/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8    collections::HashMap,
9    path::{Path, PathBuf},
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc, Mutex,
13    },
14    thread,
15    time::Duration,
16};
17
18/// Event sent for registered handlers on initial directory scans
19pub type ScanEvent = crate::Result<PathBuf>;
20
21/// Handler trait for receivers of [`ScanEvent`].
22/// Very much the same as [`EventHandler`], but including the Result.
23///
24/// See the full example for more information.
25pub trait ScanEventHandler: Send + 'static {
26    /// Handles an event.
27    fn handle_event(&mut self, event: ScanEvent);
28}
29
30impl<F> ScanEventHandler for F
31where
32    F: FnMut(ScanEvent) + Send + 'static,
33{
34    fn handle_event(&mut self, event: ScanEvent) {
35        (self)(event);
36    }
37}
38
/// Forwards scan events into a crossbeam channel.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // Best effort: a failed send is deliberately ignored.
        let _ = self.send(event);
    }
}
45
/// Forwards scan events into a flume channel.
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // Best effort: a failed send is deliberately ignored.
        let _ = self.send(event);
    }
}
52
53impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
54    fn handle_event(&mut self, event: ScanEvent) {
55        let _ = self.send(event);
56    }
57}
58
59impl ScanEventHandler for () {
60    fn handle_event(&mut self, _event: ScanEvent) {}
61}
62
63use data::{DataBuilder, WatchData};
64mod data {
65    use crate::{
66        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
67        EventHandler,
68    };
69    use std::{
70        cell::RefCell,
71        collections::{hash_map::RandomState, HashMap},
72        fmt::{self, Debug},
73        fs::{self, File, Metadata},
74        hash::{BuildHasher, Hasher},
75        io::{self, Read},
76        path::{Path, PathBuf},
77        time::Instant,
78    };
79    use walkdir::WalkDir;
80
81    use super::ScanEventHandler;
82
83    fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
84        match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
85            Ok(d) => d.as_secs() as i64,
86            Err(e) => -(e.duration().as_secs() as i64),
87        }
88    }
89
90    /// Builder for [`WatchData`] & [`PathData`].
91    pub(super) struct DataBuilder {
92        emitter: EventEmitter,
93        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
94
95        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
96        // in future.
97        build_hasher: Option<RandomState>,
98
99        // current timestamp for building Data.
100        now: Instant,
101    }
102
103    impl DataBuilder {
104        pub(super) fn new<F, G>(
105            event_handler: F,
106            compare_content: bool,
107            scan_emitter: Option<G>,
108        ) -> Self
109        where
110            F: EventHandler,
111            G: ScanEventHandler,
112        {
113            let scan_emitter = match scan_emitter {
114                None => None,
115                Some(v) => {
116                    // workaround for a weird type resolution bug when directly going to dyn Trait
117                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
118                        Box::new(RefCell::new(v));
119                    Some(intermediate)
120                }
121            };
122            Self {
123                emitter: EventEmitter::new(event_handler),
124                scan_emitter,
125                build_hasher: compare_content.then(RandomState::default),
126                now: Instant::now(),
127            }
128        }
129
130        /// Update internal timestamp.
131        pub(super) fn update_timestamp(&mut self) {
132            self.now = Instant::now();
133        }
134
135        /// Create [`WatchData`].
136        ///
137        /// This function will return `Err(_)` if can not retrieve metadata from
138        /// the path location. (e.g., not found).
139        pub(super) fn build_watch_data(
140            &self,
141            root: PathBuf,
142            is_recursive: bool,
143            follow_symlinks: bool,
144        ) -> Option<WatchData> {
145            WatchData::new(self, root, is_recursive, follow_symlinks)
146        }
147
148        /// Create [`PathData`].
149        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
150            PathData::new(self, meta_path)
151        }
152    }
153
154    impl Debug for DataBuilder {
155        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
156            f.debug_struct("DataBuilder")
157                .field("build_hasher", &self.build_hasher)
158                .field("now", &self.now)
159                .finish()
160        }
161    }
162
163    #[derive(Debug)]
164    pub(super) struct WatchData {
165        // config part, won't change.
166        root: PathBuf,
167        is_recursive: bool,
168        follow_symlinks: bool,
169
170        // current status part.
171        all_path_data: HashMap<PathBuf, PathData>,
172    }
173
174    impl WatchData {
175        /// Scan filesystem and create a new `WatchData`.
176        ///
177        /// # Side effect
178        ///
179        /// This function may send event by `data_builder.emitter`.
180        fn new(
181            data_builder: &DataBuilder,
182            root: PathBuf,
183            is_recursive: bool,
184            follow_symlinks: bool,
185        ) -> Option<Self> {
186            // If metadata read error at `root` path, it will emit
187            // a error event and stop to create the whole `WatchData`.
188            //
189            // QUESTION: inconsistent?
190            //
191            // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
192            // if `root` path hit an io error, then watcher will reject to
193            // create this new watch.
194            //
195            // This may inconsistent with *POLLING* a watch. When watcher
196            // continue polling, io error at root path will not delete
197            // a existing watch. polling still working.
198            //
199            // So, consider a config file may not exists at first time but may
200            // create after a while, developer cannot watch it.
201            //
202            // FIXME: Can we always allow to watch a path, even file not
203            // found at this path?
204            if let Err(e) = fs::metadata(&root) {
205                data_builder.emitter.emit_io_err(e, Some(&root));
206                return None;
207            }
208
209            let all_path_data = Self::scan_all_path_data(
210                data_builder,
211                root.clone(),
212                is_recursive,
213                follow_symlinks,
214                true,
215            )
216            .collect();
217
218            Some(Self {
219                root,
220                is_recursive,
221                follow_symlinks,
222                all_path_data,
223            })
224        }
225
226        /// Rescan filesystem and update this `WatchData`.
227        ///
228        /// # Side effect
229        ///
230        /// This function may emit event by `data_builder.emitter`.
231        pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
232            // scan current filesystem.
233            for (path, new_path_data) in Self::scan_all_path_data(
234                data_builder,
235                self.root.clone(),
236                self.is_recursive,
237                self.follow_symlinks,
238                false,
239            ) {
240                let old_path_data = self
241                    .all_path_data
242                    .insert(path.clone(), new_path_data.clone());
243
244                // emit event
245                let event =
246                    PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
247                if let Some(event) = event {
248                    data_builder.emitter.emit_ok(event);
249                }
250            }
251
252            // scan for disappeared paths.
253            let mut disappeared_paths = Vec::new();
254            for (path, path_data) in self.all_path_data.iter() {
255                if path_data.last_check < data_builder.now {
256                    disappeared_paths.push(path.clone());
257                }
258            }
259
260            // remove disappeared paths
261            for path in disappeared_paths {
262                let old_path_data = self.all_path_data.remove(&path);
263
264                // emit event
265                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
266                if let Some(event) = event {
267                    data_builder.emitter.emit_ok(event);
268                }
269            }
270        }
271
272        /// Get all `PathData` by given configuration.
273        ///
274        /// # Side Effect
275        ///
276        /// This function may emit some IO Error events by `data_builder.emitter`.
277        fn scan_all_path_data(
278            data_builder: &'_ DataBuilder,
279            root: PathBuf,
280            is_recursive: bool,
281            follow_symlinks: bool,
282            // whether this is an initial scan, used only for events
283            is_initial: bool,
284        ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
285            log::trace!("rescanning {root:?}");
286            // WalkDir return only one entry if root is a file (not a folder),
287            // so we can use single logic to do the both file & dir's jobs.
288            //
289            // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
290            WalkDir::new(root)
291                .follow_links(follow_symlinks)
292                .max_depth(Self::dir_scan_depth(is_recursive))
293                .into_iter()
294                .filter_map(|entry_res| match entry_res {
295                    Ok(entry) => Some(entry),
296                    Err(err) => {
297                        log::warn!("walkdir error scanning {err:?}");
298                        if let Some(io_error) = err.io_error() {
299                            // clone an io::Error, so we have to create a new one.
300                            let new_io_error = io::Error::new(io_error.kind(), err.to_string());
301                            data_builder.emitter.emit_io_err(new_io_error, err.path());
302                        } else {
303                            let crate_err =
304                                crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
305                            data_builder.emitter.emit(Err(crate_err));
306                        }
307                        None
308                    }
309                })
310                .filter_map(move |entry| match entry.metadata() {
311                    Ok(metadata) => {
312                        let path = entry.into_path();
313                        if is_initial {
314                            // emit initial scans
315                            if let Some(ref emitter) = data_builder.scan_emitter {
316                                emitter.borrow_mut().handle_event(Ok(path.clone()));
317                            }
318                        }
319                        let meta_path = MetaPath::from_parts_unchecked(path, metadata);
320                        let data_path = data_builder.build_path_data(&meta_path);
321
322                        Some((meta_path.into_path(), data_path))
323                    }
324                    Err(e) => {
325                        // emit event.
326                        let path = entry.into_path();
327                        data_builder.emitter.emit_io_err(e, Some(path));
328
329                        None
330                    }
331                })
332        }
333
334        fn dir_scan_depth(is_recursive: bool) -> usize {
335            if is_recursive {
336                usize::MAX
337            } else {
338                1
339            }
340        }
341    }
342
343    /// Stored data for a one path locations.
344    ///
345    /// See [`WatchData`] for more detail.
346    #[derive(Debug, Clone)]
347    struct PathData {
348        /// File updated time.
349        mtime: i64,
350
351        /// Content's hash value, only available if user request compare file
352        /// contents and read successful.
353        hash: Option<u64>,
354
355        /// Checked time.
356        last_check: Instant,
357    }
358
359    impl PathData {
360        /// Create a new `PathData`.
361        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
362            let metadata = meta_path.metadata();
363
364            PathData {
365                mtime: metadata.modified().map_or(0, system_time_to_seconds),
366                hash: data_builder
367                    .build_hasher
368                    .as_ref()
369                    .filter(|_| metadata.is_file())
370                    .and_then(|build_hasher| {
371                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
372                    }),
373
374                last_check: data_builder.now,
375            }
376        }
377
378        /// Get hash value for the data content in given file `path`.
379        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
380            let mut hasher = build_hasher.build_hasher();
381            let mut file = File::open(path)?;
382            let mut buf = [0; 512];
383
384            loop {
385                let n = match file.read(&mut buf) {
386                    Ok(0) => break,
387                    Ok(len) => len,
388                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
389                    Err(e) => return Err(e),
390                };
391
392                hasher.write(&buf[..n]);
393            }
394
395            Ok(hasher.finish())
396        }
397
398        /// Get [`Event`] by compare two optional [`PathData`].
399        fn compare_to_event<P>(
400            path: P,
401            old: Option<&PathData>,
402            new: Option<&PathData>,
403        ) -> Option<Event>
404        where
405            P: Into<PathBuf>,
406        {
407            match (old, new) {
408                (Some(old), Some(new)) => {
409                    if new.mtime > old.mtime {
410                        Some(EventKind::Modify(ModifyKind::Metadata(
411                            MetadataKind::WriteTime,
412                        )))
413                    } else if new.hash != old.hash {
414                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
415                    } else {
416                        None
417                    }
418                }
419                (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
420                (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
421                (None, None) => None,
422            }
423            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
424        }
425    }
426
427    /// Compose path and its metadata.
428    ///
429    /// This data structure designed for make sure path and its metadata can be
430    /// transferred in consistent way, and may avoid some duplicated
431    /// `fs::metadata()` function call in some situations.
432    #[derive(Debug)]
433    pub(super) struct MetaPath {
434        path: PathBuf,
435        metadata: Metadata,
436    }
437
438    impl MetaPath {
439        /// Create `MetaPath` by given parts.
440        ///
441        /// # Invariant
442        ///
443        /// User must make sure the input `metadata` are associated with `path`.
444        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
445            Self { path, metadata }
446        }
447
448        fn path(&self) -> &Path {
449            &self.path
450        }
451
452        fn metadata(&self) -> &Metadata {
453            &self.metadata
454        }
455
456        fn into_path(self) -> PathBuf {
457            self.path
458        }
459    }
460
461    /// Thin wrapper for outer event handler, for easy to use.
462    struct EventEmitter(
463        // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
464        // Use `Box` to make sure EventEmitter is Sized.
465        Box<RefCell<dyn EventHandler>>,
466    );
467
468    impl EventEmitter {
469        fn new<F: EventHandler>(event_handler: F) -> Self {
470            Self(Box::new(RefCell::new(event_handler)))
471        }
472
473        /// Emit single event.
474        fn emit(&self, event: crate::Result<Event>) {
475            self.0.borrow_mut().handle_event(event);
476        }
477
478        /// Emit event.
479        fn emit_ok(&self, event: Event) {
480            self.emit(Ok(event))
481        }
482
483        /// Emit io error event.
484        fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
485        where
486            E: Into<io::Error>,
487            P: Into<PathBuf>,
488        {
489            let e = crate::Error::io(err.into());
490            if let Some(path) = path {
491                self.emit(Err(e.add_path(path.into())));
492            } else {
493                self.emit(Err(e));
494            }
495        }
496    }
497}
498
499/// Polling based `Watcher` implementation.
500///
501/// By default scans through all files and checks for changed entries based on their change date.
502/// Can also be changed to perform file content change checks.
503///
504/// See [Config] for more details.
505#[derive(Debug)]
506pub struct PollWatcher {
507    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
508    data_builder: Arc<Mutex<DataBuilder>>,
509    want_to_stop: Arc<AtomicBool>,
510    /// channel to the poll loop
511    /// currently used only for manual polling
512    message_channel: Sender<()>,
513    delay: Option<Duration>,
514    follow_sylinks: bool,
515}
516
517impl PollWatcher {
518    /// Create a new [`PollWatcher`], configured as needed.
519    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
520        Self::with_opt::<_, ()>(event_handler, config, None)
521    }
522
523    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
524    pub fn poll(&self) -> crate::Result<()> {
525        self.message_channel
526            .send(())
527            .map_err(|_| Error::generic("failed to send poll message"))?;
528        Ok(())
529    }
530
531    /// Create a new [`PollWatcher`] with an scan event handler.
532    ///
533    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
534    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
535        event_handler: F,
536        config: Config,
537        scan_callback: G,
538    ) -> crate::Result<PollWatcher> {
539        Self::with_opt(event_handler, config, Some(scan_callback))
540    }
541
542    /// create a new [`PollWatcher`] with all options.
543    fn with_opt<F: EventHandler, G: ScanEventHandler>(
544        event_handler: F,
545        config: Config,
546        scan_callback: Option<G>,
547    ) -> crate::Result<PollWatcher> {
548        let data_builder =
549            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
550
551        let (tx, rx) = unbounded();
552
553        let poll_watcher = PollWatcher {
554            watches: Default::default(),
555            data_builder: Arc::new(Mutex::new(data_builder)),
556            want_to_stop: Arc::new(AtomicBool::new(false)),
557            delay: config.poll_interval(),
558            follow_sylinks: config.follow_symlinks(),
559            message_channel: tx,
560        };
561
562        poll_watcher.run(rx);
563
564        Ok(poll_watcher)
565    }
566
567    fn run(&self, rx: Receiver<()>) {
568        let watches = Arc::clone(&self.watches);
569        let data_builder = Arc::clone(&self.data_builder);
570        let want_to_stop = Arc::clone(&self.want_to_stop);
571        let delay = self.delay;
572
573        let _ = thread::Builder::new()
574            .name("notify-rs poll loop".to_string())
575            .spawn(move || {
576                loop {
577                    if want_to_stop.load(Ordering::SeqCst) {
578                        break;
579                    }
580
581                    // HINT: Make sure always lock in the same order to avoid deadlock.
582                    //
583                    // FIXME: inconsistent: some place mutex poison cause panic,
584                    // some place just ignore.
585                    if let (Ok(mut watches), Ok(mut data_builder)) =
586                        (watches.lock(), data_builder.lock())
587                    {
588                        data_builder.update_timestamp();
589
590                        let vals = watches.values_mut();
591                        for watch_data in vals {
592                            watch_data.rescan(&mut data_builder);
593                        }
594                    }
595                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
596                    if let Some(delay) = delay {
597                        let _ = rx.recv_timeout(delay);
598                    } else {
599                        let _ = rx.recv();
600                    }
601                }
602            });
603    }
604
605    /// Watch a path location.
606    ///
607    /// QUESTION: this function never return an Error, is it as intend?
608    /// Please also consider the IO Error event problem.
609    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
610        // HINT: Make sure always lock in the same order to avoid deadlock.
611        //
612        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
613        if let (Ok(mut watches), Ok(mut data_builder)) =
614            (self.watches.lock(), self.data_builder.lock())
615        {
616            data_builder.update_timestamp();
617
618            let watch_data = data_builder.build_watch_data(
619                path.to_path_buf(),
620                recursive_mode.is_recursive(),
621                self.follow_sylinks,
622            );
623
624            // if create watch_data successful, add it to watching list.
625            if let Some(watch_data) = watch_data {
626                watches.insert(path.to_path_buf(), watch_data);
627            }
628        }
629    }
630
631    /// Unwatch a path.
632    ///
633    /// Return `Err(_)` if given path has't be monitored.
634    fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
635        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
636        self.watches
637            .lock()
638            .unwrap()
639            .remove(path)
640            .map(|_| ())
641            .ok_or_else(crate::Error::watch_not_found)
642    }
643}
644
645impl Watcher for PollWatcher {
646    /// Create a new [`PollWatcher`].
647    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
648        Self::new(event_handler, config)
649    }
650
651    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
652        self.watch_inner(path, recursive_mode);
653
654        Ok(())
655    }
656
657    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
658        self.unwatch_inner(path)
659    }
660
661    fn kind() -> crate::WatcherKind {
662        crate::WatcherKind::PollWatcher
663    }
664}
665
666impl Drop for PollWatcher {
667    fn drop(&mut self) {
668        self.want_to_stop.store(true, Ordering::Relaxed);
669    }
670}
671
/// Compile-time check that `PollWatcher` is both `Send` and `Sync`.
#[test]
fn poll_watcher_is_send_and_sync() {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<PollWatcher>();
}