1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::oneshot;
/// Spawns three tasks on the multi-threaded runtime, each wrapped in its own
/// task-local scope, and checks that every task observes exactly the value
/// its scope was created with.
#[tokio::test(flavor = "multi_thread")]
async fn local() {
    tokio::task_local! {
        static REQ_ID: u32;
        pub static FOO: bool;
    }

    // Scope value 1: read it twice through `get`.
    let first_scope = REQ_ID.scope(1, async move {
        assert_eq!(REQ_ID.get(), 1);
        assert_eq!(REQ_ID.get(), 1);
    });
    let first_task = tokio::spawn(first_scope);

    // Scope value 2: read through `with` (nesting a `get` inside the closure),
    // then read once more after the task has yielded on a timer.
    let second_scope = REQ_ID.scope(2, async move {
        REQ_ID.with(|current| {
            assert_eq!(REQ_ID.get(), 2);
            assert_eq!(*current, 2);
        });

        let pause = std::time::Duration::from_millis(10);
        tokio::time::sleep(pause).await;

        assert_eq!(REQ_ID.get(), 2);
    });
    let second_task = tokio::spawn(second_scope);

    // A different key holding a `bool`.
    let flag_scope = FOO.scope(true, async move {
        assert!(FOO.get());
    });
    let flag_task = tokio::spawn(flag_scope);

    // Join in spawn order; a panic inside any task surfaces through `unwrap`.
    first_task.await.unwrap();
    second_task.await.unwrap();
    flag_task.await.unwrap();
}
/// Checks that the task-local value can still be read from a future's `Drop`
/// implementation when the task running it is aborted.
#[tokio::test]
async fn task_local_available_on_abort() {
    tokio::task_local! {
        static KEY: u32;
    }

    /// Never-completing future that signals its first poll and, when dropped,
    /// reports the value of `KEY` it sees at that moment.
    struct PendingForever {
        polled: Option<oneshot::Sender<()>>,
        dropped: Option<oneshot::Sender<u32>>,
    }

    impl Future for PendingForever {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            let this = self.get_mut();
            // Only the first poll has a sender left to notify with.
            if let Some(notify) = this.polled.take() {
                let _ = notify.send(());
            }
            Poll::Pending
        }
    }

    impl Drop for PendingForever {
        fn drop(&mut self) {
            let report = self.dropped.take().unwrap();
            let seen = KEY.get();
            let _ = report.send(seen);
        }
    }

    let (polled_tx, polled_rx) = oneshot::channel();
    let (dropped_tx, dropped_rx) = oneshot::channel();

    let inner = PendingForever {
        polled: Some(polled_tx),
        dropped: Some(dropped_tx),
    };
    let task = tokio::spawn(KEY.scope(42, inner));

    // Wait for the future to have been polled once before aborting the task.
    polled_rx.await.unwrap();
    task.abort();

    // The destructor must have observed the scoped value.
    let seen_on_drop = dropped_rx.await.unwrap();
    assert_eq!(seen_on_drop, 42);

    // The join result must be an error; cancellation is the accepted outcome.
    let join_err = task.await.unwrap_err();
    if join_err.is_cancelled() {
        return;
    }
    // Any other failure: re-raise the task's own panic if there is one.
    match join_err.try_into_panic() {
        Ok(payload) => std::panic::resume_unwind(payload),
        Err(_) => panic!(),
    }
}
/// Checks that the task-local value can still be read from a future's `Drop`
/// implementation when that future is dropped after completing.
#[tokio::test]
async fn task_local_available_on_completion_drop() {
    tokio::task_local! {
        static KEY: u32;
    }

    /// Future that is ready on its first poll and, when dropped, reports the
    /// value of `KEY` it sees at that moment.
    struct ReadyAtOnce {
        report: Option<oneshot::Sender<u32>>,
    }

    impl Future for ReadyAtOnce {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            Poll::Ready(())
        }
    }

    impl Drop for ReadyAtOnce {
        fn drop(&mut self) {
            let report = self.report.take().unwrap();
            let seen = KEY.get();
            let _ = report.send(seen);
        }
    }

    let (report_tx, report_rx) = oneshot::channel();
    let inner = ReadyAtOnce {
        report: Some(report_tx),
    };
    let task = tokio::spawn(KEY.scope(42, inner));

    // The destructor must have observed the scoped value.
    let seen_on_drop = report_rx.await.unwrap();
    assert_eq!(seen_on_drop, 42);

    task.await.unwrap();
}
/// Checks that `take_value` on a scoped future yields the stored value on the
/// first call and `None` on the second.
#[tokio::test]
async fn take_value() {
    // No trailing semicolon after the declaration — presumably exercising
    // that form of the macro; keep it as is.
    tokio::task_local! {
        static KEY: u32
    }

    let mut scoped = Box::pin(KEY.scope(1, async {}));

    // First call hands the value out...
    assert_eq!(scoped.as_mut().take_value(), Some(1));
    // ...after which nothing is left to take.
    assert_eq!(scoped.as_mut().take_value(), None);
}
/// Checks that once `take_value` has removed the value from a scoped future,
/// polling that future runs its body with the task-local unavailable:
/// `try_with` inside the body must return an error.
#[tokio::test]
async fn poll_after_take_value_should_fail() {
    // No trailing semicolon after the declaration — presumably exercising
    // that form of the macro; keep it as is.
    tokio::task_local! {
        static KEY: u32
    }

    let mut scoped = Box::pin(KEY.scope(1, async {
        // The task local value no longer exists.
        assert!(KEY.try_with(|_| {}).is_err());
    }));

    // Remove the value before the future is ever polled; the taken value
    // itself is not needed here.
    let _ = scoped.as_mut().take_value();

    // Poll the future after `take_value` has been called
    scoped.await;
}
|