swansong/implementation/
interrupt.rs1use crate::{Guard, Inner};
2use event_listener::EventListener;
3use std::{
4 ops::{Deref, DerefMut},
5 sync::{Arc, Weak},
6};
7
8#[cfg(any(feature = "tokio", feature = "futures-io"))]
9mod async_read;
10#[cfg(any(feature = "tokio", feature = "futures-io"))]
11mod async_write;
12
13mod future;
14mod iterator;
15mod stream;
16
17pin_project_lite::pin_project! {
18 #[derive(Debug)]
26 pub struct Interrupt<T> {
27 inner: WeakInner,
28 #[pin]
29 wrapped_type: T,
30 guard: Option<Guard>,
31 stop_listener: StopListener,
32 }
33}
34
35impl<T: Eq> Eq for Interrupt<T> {}
36
37impl<T, U> PartialEq<Interrupt<U>> for Interrupt<T>
38where
39 T: PartialEq<U>,
40{
41 fn eq(&self, other: &Interrupt<U>) -> bool {
42 self.inner.ptr_eq(&other.inner) && self.wrapped_type == other.wrapped_type
43 }
44}
45
46impl<T> Interrupt<T> {
47 pub(crate) fn new(inner: &Arc<Inner>, wrapped_type: T) -> Self {
48 Self {
49 inner: WeakInner(Arc::downgrade(inner)),
50 wrapped_type,
51 guard: None,
52 stop_listener: StopListener(None),
53 }
54 }
55
56 #[must_use]
58 pub fn guarded(mut self) -> Self {
59 if let Some(inner) = self.inner.upgrade() {
60 self.guard = Some(Guard::new(&inner));
61 }
62 self
63 }
64
65 pub fn into_inner(self) -> T {
69 self.wrapped_type
70 }
71
72 pub(crate) fn is_stopped(&self) -> bool {
73 self.inner.is_stopped()
74 }
75
76 #[cfg(any(feature = "futures-io", feature = "tokio"))]
77 pub(crate) fn is_stopped_relaxed(&self) -> bool {
78 self.inner.is_stopped_relaxed()
79 }
80}
81
82impl<T> Deref for Interrupt<T> {
83 type Target = T;
84
85 fn deref(&self) -> &Self::Target {
86 &self.wrapped_type
87 }
88}
89
90impl<T> DerefMut for Interrupt<T> {
91 fn deref_mut(&mut self) -> &mut Self::Target {
92 &mut self.wrapped_type
93 }
94}
95
96#[derive(Debug)]
97struct WeakInner(Weak<Inner>);
98impl Deref for WeakInner {
99 type Target = Weak<Inner>;
100
101 fn deref(&self) -> &Self::Target {
102 &self.0
103 }
104}
105impl WeakInner {
106 fn is_stopped(&self) -> bool {
107 self.upgrade().as_deref().is_none_or(Inner::is_stopped)
108 }
109 fn is_stopped_relaxed(&self) -> bool {
110 self.upgrade()
111 .as_deref()
112 .is_none_or(Inner::is_stopped_relaxed)
113 }
114}
115
116#[derive(Debug)]
117struct StopListener(Option<EventListener>);
118impl StopListener {
119 fn listen(&mut self, weak_inner: &WeakInner) -> Option<&mut EventListener> {
120 let Self(listener) = self;
121 if let Some(listener) = listener {
122 return Some(listener);
123 }
124 let inner = weak_inner.upgrade()?;
125 let listener = listener.insert(inner.listen_stop());
126 if inner.is_stopped() {
127 log::trace!("inner was stopped after registering new listener");
128 None
129 } else {
130 log::trace!("registered new listener");
131 Some(listener)
132 }
133 }
134}
135impl Deref for StopListener {
136 type Target = Option<EventListener>;
137
138 fn deref(&self) -> &Self::Target {
139 &self.0
140 }
141}
142impl DerefMut for StopListener {
143 fn deref_mut(&mut self) -> &mut Self::Target {
144 &mut self.0
145 }
146}