1use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, Mutex,
13 },
14 thread,
15 time::Duration,
16};
17
/// Value handed to a [`ScanEventHandler`]: a `crate::Result` wrapping the path of a scanned entry.
///
/// In this file the only values produced are `Ok(path)`, one for each entry found while the
/// initial scan of a newly watched root is walked.
pub type ScanEvent = crate::Result<PathBuf>;
20
/// Callback interface for receiving [`ScanEvent`]s.
///
/// Implementors must be `Send + 'static` because the handler is boxed and stored inside state
/// that is shared with the background poll thread.
pub trait ScanEventHandler: Send + 'static {
    /// Handles a single scan event.
    fn handle_event(&mut self, event: ScanEvent);
}
29
30impl<F> ScanEventHandler for F
31where
32 F: FnMut(ScanEvent) + Send + 'static,
33{
34 fn handle_event(&mut self, event: ScanEvent) {
35 (self)(event);
36 }
37}
38
/// Forwards scan events into a crossbeam channel (only with the `crossbeam-channel` feature).
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    /// Sends `event` on the channel; a failed send is deliberately ignored.
    fn handle_event(&mut self, event: ScanEvent) {
        drop(self.send(event));
    }
}
45
/// Forwards scan events into a flume channel (only with the `flume` feature).
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    /// Sends `event` on the channel; a failed send is deliberately ignored.
    fn handle_event(&mut self, event: ScanEvent) {
        drop(self.send(event));
    }
}
52
53impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
54 fn handle_event(&mut self, event: ScanEvent) {
55 let _ = self.send(event);
56 }
57}
58
/// No-op handler. `PollWatcher::new` names `()` as the scan-handler type when it passes `None`
/// for the scan callback.
impl ScanEventHandler for () {
    /// Discards the event.
    fn handle_event(&mut self, _event: ScanEvent) {}
}
62
63use data::{DataBuilder, WatchData};
64mod data {
65 use crate::{
66 event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
67 EventHandler,
68 };
69 use std::{
70 cell::RefCell,
71 collections::{hash_map::RandomState, HashMap},
72 fmt::{self, Debug},
73 fs::{self, File, Metadata},
74 hash::{BuildHasher, Hasher},
75 io::{self, Read},
76 path::{Path, PathBuf},
77 time::Instant,
78 };
79 use walkdir::WalkDir;
80
81 use super::ScanEventHandler;
82
/// Converts `time` to whole seconds relative to the Unix epoch.
///
/// Times at or after the epoch yield a non-negative count; times before it yield the negated
/// whole-second distance to the epoch. Sub-second parts are truncated in both directions.
fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
    time.duration_since(std::time::SystemTime::UNIX_EPOCH)
        .map_or_else(
            // `duration_since` fails when `time` precedes the epoch; the error carries the gap.
            |before_epoch| -(before_epoch.duration().as_secs() as i64),
            |since_epoch| since_epoch.as_secs() as i64,
        )
}
89
/// Shared state used to build [`WatchData`] / [`PathData`] values and to emit events and errors.
pub(super) struct DataBuilder {
    /// Wrapper around the user-supplied event handler; all events and errors go through it.
    emitter: EventEmitter,
    /// Optional handler that is given the path of every entry found during an initial scan.
    scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

    /// Hasher factory for file-content hashes; `None` when content comparison is disabled.
    build_hasher: Option<RandomState>,

    /// Timestamp copied into every `PathData` built by this builder; refreshed by
    /// `update_timestamp` and compared against in `WatchData::rescan`.
    now: Instant,
}
102
103 impl DataBuilder {
104 pub(super) fn new<F, G>(
105 event_handler: F,
106 compare_content: bool,
107 scan_emitter: Option<G>,
108 ) -> Self
109 where
110 F: EventHandler,
111 G: ScanEventHandler,
112 {
113 let scan_emitter = match scan_emitter {
114 None => None,
115 Some(v) => {
116 let intermediate: Box<RefCell<dyn ScanEventHandler>> =
118 Box::new(RefCell::new(v));
119 Some(intermediate)
120 }
121 };
122 Self {
123 emitter: EventEmitter::new(event_handler),
124 scan_emitter,
125 build_hasher: compare_content.then(RandomState::default),
126 now: Instant::now(),
127 }
128 }
129
130 pub(super) fn update_timestamp(&mut self) {
132 self.now = Instant::now();
133 }
134
135 pub(super) fn build_watch_data(
140 &self,
141 root: PathBuf,
142 is_recursive: bool,
143 follow_symlinks: bool,
144 ) -> Option<WatchData> {
145 WatchData::new(self, root, is_recursive, follow_symlinks)
146 }
147
148 fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
150 PathData::new(self, meta_path)
151 }
152 }
153
154 impl Debug for DataBuilder {
155 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
156 f.debug_struct("DataBuilder")
157 .field("build_hasher", &self.build_hasher)
158 .field("now", &self.now)
159 .finish()
160 }
161 }
162
/// State kept for one watched root.
#[derive(Debug)]
pub(super) struct WatchData {
    /// Path the watch was created for; every scan starts walking here.
    root: PathBuf,
    /// Whether scans descend without a depth limit (see `dir_scan_depth`).
    is_recursive: bool,
    /// Passed to the directory walker's `follow_links` option on every scan.
    follow_symlinks: bool,

    /// Most recently recorded [`PathData`] for every path seen under `root`.
    all_path_data: HashMap<PathBuf, PathData>,
}
173
174 impl WatchData {
175 fn new(
181 data_builder: &DataBuilder,
182 root: PathBuf,
183 is_recursive: bool,
184 follow_symlinks: bool,
185 ) -> Option<Self> {
186 if let Err(e) = fs::metadata(&root) {
205 data_builder.emitter.emit_io_err(e, Some(&root));
206 return None;
207 }
208
209 let all_path_data = Self::scan_all_path_data(
210 data_builder,
211 root.clone(),
212 is_recursive,
213 follow_symlinks,
214 true,
215 )
216 .collect();
217
218 Some(Self {
219 root,
220 is_recursive,
221 follow_symlinks,
222 all_path_data,
223 })
224 }
225
226 pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
232 for (path, new_path_data) in Self::scan_all_path_data(
234 data_builder,
235 self.root.clone(),
236 self.is_recursive,
237 self.follow_symlinks,
238 false,
239 ) {
240 let old_path_data = self
241 .all_path_data
242 .insert(path.clone(), new_path_data.clone());
243
244 let event =
246 PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
247 if let Some(event) = event {
248 data_builder.emitter.emit_ok(event);
249 }
250 }
251
252 let mut disappeared_paths = Vec::new();
254 for (path, path_data) in self.all_path_data.iter() {
255 if path_data.last_check < data_builder.now {
256 disappeared_paths.push(path.clone());
257 }
258 }
259
260 for path in disappeared_paths {
262 let old_path_data = self.all_path_data.remove(&path);
263
264 let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
266 if let Some(event) = event {
267 data_builder.emitter.emit_ok(event);
268 }
269 }
270 }
271
/// Returns a lazy iterator over `(path, PathData)` for the entries walked from `root`.
///
/// Nothing is walked until the returned iterator is consumed; the side effects below happen
/// while it is being iterated:
///
/// * a walk error is logged, reported through `data_builder.emitter`, and the entry skipped;
/// * an entry whose metadata cannot be read is reported as an I/O error and skipped;
/// * when `is_initial` is `true` and a scan handler is configured, the handler receives
///   `Ok(path)` for every entry whose metadata was read.
///
/// The iterator borrows `data_builder` for as long as it lives (the `'_` in the return type).
fn scan_all_path_data(
    data_builder: &'_ DataBuilder,
    root: PathBuf,
    is_recursive: bool,
    follow_symlinks: bool,
    is_initial: bool,
) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
    log::trace!("rescanning {root:?}");
    // NOTE(review): the depth limit of 1 for non-recursive watches presumably covers the root
    // and its direct children only — confirm against the walkdir documentation.
    WalkDir::new(root)
        .follow_links(follow_symlinks)
        .max_depth(Self::dir_scan_depth(is_recursive))
        .into_iter()
        // Stage 1: drop failed walk entries after reporting them.
        .filter_map(|entry_res| match entry_res {
            Ok(entry) => Some(entry),
            Err(err) => {
                log::warn!("walkdir error scanning {err:?}");
                if let Some(io_error) = err.io_error() {
                    // `io::Error` is not `Clone`, so a new one is built with the same kind
                    // and the walk error's message.
                    let new_io_error = io::Error::new(io_error.kind(), err.to_string());
                    data_builder.emitter.emit_io_err(new_io_error, err.path());
                } else {
                    // No underlying I/O error: report it as a generic crate error.
                    let crate_err =
                        crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
                    data_builder.emitter.emit(Err(crate_err));
                }
                None
            }
        })
        // Stage 2: read metadata and build the per-path data. `move` copies `is_initial` and
        // the `data_builder` reference into the closure.
        .filter_map(move |entry| match entry.metadata() {
            Ok(metadata) => {
                let path = entry.into_path();
                if is_initial {
                    if let Some(ref emitter) = data_builder.scan_emitter {
                        emitter.borrow_mut().handle_event(Ok(path.clone()));
                    }
                }
                let meta_path = MetaPath::from_parts_unchecked(path, metadata);
                let data_path = data_builder.build_path_data(&meta_path);

                Some((meta_path.into_path(), data_path))
            }
            Err(e) => {
                let path = entry.into_path();
                data_builder.emitter.emit_io_err(e, Some(path));

                None
            }
        })
}
333
/// Maximum walk depth handed to the directory walker: unlimited (`usize::MAX`) for recursive
/// watches, `1` otherwise.
fn dir_scan_depth(is_recursive: bool) -> usize {
    match is_recursive {
        true => usize::MAX,
        false => 1,
    }
}
341 }
342
/// Snapshot of one path, as recorded by a scan.
#[derive(Debug, Clone)]
struct PathData {
    /// Modification time in whole seconds relative to the Unix epoch (negative before it);
    /// `0` when the modification time could not be read.
    mtime: i64,

    /// Hash of the file's contents. `None` when content comparison is disabled, the entry is
    /// not a regular file, or hashing failed.
    hash: Option<u64>,

    /// The builder's timestamp at the time this snapshot was built; `WatchData::rescan` uses
    /// it to find entries that the latest scan did not refresh.
    last_check: Instant,
}
358
359 impl PathData {
360 fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
362 let metadata = meta_path.metadata();
363
364 PathData {
365 mtime: metadata.modified().map_or(0, system_time_to_seconds),
366 hash: data_builder
367 .build_hasher
368 .as_ref()
369 .filter(|_| metadata.is_file())
370 .and_then(|build_hasher| {
371 Self::get_content_hash(build_hasher, meta_path.path()).ok()
372 }),
373
374 last_check: data_builder.now,
375 }
376 }
377
/// Hashes the full contents of the file at `path` with a hasher created from `build_hasher`.
///
/// The file is read in 512-byte chunks. Reads interrupted by a signal
/// (`ErrorKind::Interrupted`) are retried.
///
/// # Errors
///
/// Returns the I/O error if the file cannot be opened or a read fails for any other reason.
fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
    let mut hasher = build_hasher.build_hasher();
    let mut file = File::open(path)?;
    let mut chunk = [0; 512];

    loop {
        match file.read(&mut chunk) {
            // End of file: everything has been fed to the hasher.
            Ok(0) => return Ok(hasher.finish()),
            Ok(read) => hasher.write(&chunk[..read]),
            // Interrupted reads are retried on the next loop iteration.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
}
397
398 fn compare_to_event<P>(
400 path: P,
401 old: Option<&PathData>,
402 new: Option<&PathData>,
403 ) -> Option<Event>
404 where
405 P: Into<PathBuf>,
406 {
407 match (old, new) {
408 (Some(old), Some(new)) => {
409 if new.mtime > old.mtime {
410 Some(EventKind::Modify(ModifyKind::Metadata(
411 MetadataKind::WriteTime,
412 )))
413 } else if new.hash != old.hash {
414 Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
415 } else {
416 None
417 }
418 }
419 (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
420 (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
421 (None, None) => None,
422 }
423 .map(|event_kind| Event::new(event_kind).add_path(path.into()))
424 }
425 }
426
/// A path bundled with metadata that was read for it.
#[derive(Debug)]
pub(super) struct MetaPath {
    /// The entry's path.
    path: PathBuf,
    /// Metadata supplied together with `path` at construction time.
    metadata: Metadata,
}
437
438 impl MetaPath {
439 fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
445 Self { path, metadata }
446 }
447
448 fn path(&self) -> &Path {
449 &self.path
450 }
451
452 fn metadata(&self) -> &Metadata {
453 &self.metadata
454 }
455
456 fn into_path(self) -> PathBuf {
457 self.path
458 }
459 }
460
/// Thin wrapper around the user-supplied event handler.
struct EventEmitter(
    /// The handler, type-erased and placed in a `RefCell` so it can be invoked through `&self`.
    Box<RefCell<dyn EventHandler>>,
);
467
468 impl EventEmitter {
469 fn new<F: EventHandler>(event_handler: F) -> Self {
470 Self(Box::new(RefCell::new(event_handler)))
471 }
472
473 fn emit(&self, event: crate::Result<Event>) {
475 self.0.borrow_mut().handle_event(event);
476 }
477
478 fn emit_ok(&self, event: Event) {
480 self.emit(Ok(event))
481 }
482
483 fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
485 where
486 E: Into<io::Error>,
487 P: Into<PathBuf>,
488 {
489 let e = crate::Error::io(err.into());
490 if let Some(path) = path {
491 self.emit(Err(e.add_path(path.into())));
492 } else {
493 self.emit(Err(e));
494 }
495 }
496 }
497}
498
/// Watcher that detects changes by periodically (or on demand) re-scanning the watched paths
/// on a background thread.
#[derive(Debug)]
pub struct PollWatcher {
    /// Watched roots and their recorded state; shared with the poll thread.
    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
    /// Builder/emitter state; shared with the poll thread.
    data_builder: Arc<Mutex<DataBuilder>>,
    /// Set to `true` on drop; the poll loop exits once it observes it.
    want_to_stop: Arc<AtomicBool>,
    /// Sending `()` here wakes the poll loop from its wait (see `poll`).
    message_channel: Sender<()>,
    /// Wait between scans, taken from `Config::poll_interval`. With `None` the loop waits
    /// until a message arrives on `message_channel`.
    delay: Option<Duration>,
    /// Taken from `Config::follow_symlinks`; forwarded when building watch data.
    follow_sylinks: bool,
}
516
517impl PollWatcher {
518 pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
520 Self::with_opt::<_, ()>(event_handler, config, None)
521 }
522
523 pub fn poll(&self) -> crate::Result<()> {
525 self.message_channel
526 .send(())
527 .map_err(|_| Error::generic("failed to send poll message"))?;
528 Ok(())
529 }
530
531 pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
535 event_handler: F,
536 config: Config,
537 scan_callback: G,
538 ) -> crate::Result<PollWatcher> {
539 Self::with_opt(event_handler, config, Some(scan_callback))
540 }
541
542 fn with_opt<F: EventHandler, G: ScanEventHandler>(
544 event_handler: F,
545 config: Config,
546 scan_callback: Option<G>,
547 ) -> crate::Result<PollWatcher> {
548 let data_builder =
549 DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
550
551 let (tx, rx) = unbounded();
552
553 let poll_watcher = PollWatcher {
554 watches: Default::default(),
555 data_builder: Arc::new(Mutex::new(data_builder)),
556 want_to_stop: Arc::new(AtomicBool::new(false)),
557 delay: config.poll_interval(),
558 follow_sylinks: config.follow_symlinks(),
559 message_channel: tx,
560 };
561
562 poll_watcher.run(rx);
563
564 Ok(poll_watcher)
565 }
566
567 fn run(&self, rx: Receiver<()>) {
568 let watches = Arc::clone(&self.watches);
569 let data_builder = Arc::clone(&self.data_builder);
570 let want_to_stop = Arc::clone(&self.want_to_stop);
571 let delay = self.delay;
572
573 let _ = thread::Builder::new()
574 .name("notify-rs poll loop".to_string())
575 .spawn(move || {
576 loop {
577 if want_to_stop.load(Ordering::SeqCst) {
578 break;
579 }
580
581 if let (Ok(mut watches), Ok(mut data_builder)) =
586 (watches.lock(), data_builder.lock())
587 {
588 data_builder.update_timestamp();
589
590 let vals = watches.values_mut();
591 for watch_data in vals {
592 watch_data.rescan(&mut data_builder);
593 }
594 }
595 if let Some(delay) = delay {
597 let _ = rx.recv_timeout(delay);
598 } else {
599 let _ = rx.recv();
600 }
601 }
602 });
603 }
604
605 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
610 if let (Ok(mut watches), Ok(mut data_builder)) =
614 (self.watches.lock(), self.data_builder.lock())
615 {
616 data_builder.update_timestamp();
617
618 let watch_data = data_builder.build_watch_data(
619 path.to_path_buf(),
620 recursive_mode.is_recursive(),
621 self.follow_sylinks,
622 );
623
624 if let Some(watch_data) = watch_data {
626 watches.insert(path.to_path_buf(), watch_data);
627 }
628 }
629 }
630
631 fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
635 self.watches
637 .lock()
638 .unwrap()
639 .remove(path)
640 .map(|_| ())
641 .ok_or_else(crate::Error::watch_not_found)
642 }
643}
644
645impl Watcher for PollWatcher {
646 fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
648 Self::new(event_handler, config)
649 }
650
651 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
652 self.watch_inner(path, recursive_mode);
653
654 Ok(())
655 }
656
657 fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
658 self.unwatch_inner(path)
659 }
660
661 fn kind() -> crate::WatcherKind {
662 crate::WatcherKind::PollWatcher
663 }
664}
665
666impl Drop for PollWatcher {
667 fn drop(&mut self) {
668 self.want_to_stop.store(true, Ordering::Relaxed);
669 }
670}
671
/// Compile-time check that `PollWatcher` can be sent to and shared between threads.
#[test]
fn poll_watcher_is_send_and_sync() {
    // Instantiating the helper only type-checks when the bounds hold; nothing runs.
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<PollWatcher>();
}