swansong/implementation/
interrupt.rs

1use crate::{Guard, Inner};
2use event_listener::EventListener;
3use std::{
4    ops::{Deref, DerefMut},
5    sync::{Arc, Weak},
6};
7
8#[cfg(any(feature = "tokio", feature = "futures-io"))]
9mod async_read;
10#[cfg(any(feature = "tokio", feature = "futures-io"))]
11mod async_write;
12
13mod future;
14mod iterator;
15mod stream;
16
17pin_project_lite::pin_project! {
18    /// A wrapper type that implements Stream when wrapping a [`Stream`] and [`Future`] when wrapping a
19    /// Future
20    ///
21    /// When the associated [`Swansong`][crate::Swansong] is stopped with
22    /// [`Swansong::shut_down`][crate::Swansong::shut_down] or all clones of the [`Swansong`] have dropped,
23    /// the Future or Stream within this Stop will wake and return `Poll::Ready(None)` on next poll,
24    /// regardless of where it is being polled.
25    #[derive(Debug)]
26    pub struct Interrupt<T> {
27        inner: WeakInner,
28        #[pin]
29        wrapped_type: T,
30        guard: Option<Guard>,
31        stop_listener: StopListener,
32    }
33}
34
35impl<T: Eq> Eq for Interrupt<T> {}
36
37impl<T, U> PartialEq<Interrupt<U>> for Interrupt<T>
38where
39    T: PartialEq<U>,
40{
41    fn eq(&self, other: &Interrupt<U>) -> bool {
42        self.inner.ptr_eq(&other.inner) && self.wrapped_type == other.wrapped_type
43    }
44}
45
46impl<T> Interrupt<T> {
47    pub(crate) fn new(inner: &Arc<Inner>, wrapped_type: T) -> Self {
48        Self {
49            inner: WeakInner(Arc::downgrade(inner)),
50            wrapped_type,
51            guard: None,
52            stop_listener: StopListener(None),
53        }
54    }
55
56    /// Chainable setter to delay shutdown until this wrapper type has dropped.
57    #[must_use]
58    pub fn guarded(mut self) -> Self {
59        if let Some(inner) = self.inner.upgrade() {
60            self.guard = Some(Guard::new(&inner));
61        }
62        self
63    }
64
65    /// Take the wrapped type out of this Interrupt.
66    ///
67    /// If the interrupt is guarded with [`Interrupt::guarded`], this will decrement the guard count.
68    pub fn into_inner(self) -> T {
69        self.wrapped_type
70    }
71
72    pub(crate) fn is_stopped(&self) -> bool {
73        self.inner.is_stopped()
74    }
75
76    #[cfg(any(feature = "futures-io", feature = "tokio"))]
77    pub(crate) fn is_stopped_relaxed(&self) -> bool {
78        self.inner.is_stopped_relaxed()
79    }
80}
81
82impl<T> Deref for Interrupt<T> {
83    type Target = T;
84
85    fn deref(&self) -> &Self::Target {
86        &self.wrapped_type
87    }
88}
89
90impl<T> DerefMut for Interrupt<T> {
91    fn deref_mut(&mut self) -> &mut Self::Target {
92        &mut self.wrapped_type
93    }
94}
95
96#[derive(Debug)]
97struct WeakInner(Weak<Inner>);
98impl Deref for WeakInner {
99    type Target = Weak<Inner>;
100
101    fn deref(&self) -> &Self::Target {
102        &self.0
103    }
104}
105impl WeakInner {
106    fn is_stopped(&self) -> bool {
107        self.upgrade().as_deref().is_none_or(Inner::is_stopped)
108    }
109    fn is_stopped_relaxed(&self) -> bool {
110        self.upgrade()
111            .as_deref()
112            .is_none_or(Inner::is_stopped_relaxed)
113    }
114}
115
116#[derive(Debug)]
117struct StopListener(Option<EventListener>);
118impl StopListener {
119    fn listen(&mut self, weak_inner: &WeakInner) -> Option<&mut EventListener> {
120        let Self(listener) = self;
121        if let Some(listener) = listener {
122            return Some(listener);
123        }
124        let inner = weak_inner.upgrade()?;
125        let listener = listener.insert(inner.listen_stop());
126        if inner.is_stopped() {
127            log::trace!("inner was stopped after registering new listener");
128            None
129        } else {
130            log::trace!("registered new listener");
131            Some(listener)
132        }
133    }
134}
135impl Deref for StopListener {
136    type Target = Option<EventListener>;
137
138    fn deref(&self) -> &Self::Target {
139        &self.0
140    }
141}
142impl DerefMut for StopListener {
143    fn deref_mut(&mut self) -> &mut Self::Target {
144        &mut self.0
145    }
146}