1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 mod callback;
4 mod executor;
5 mod promise;
6 
7 use std::fmt::{self, Debug, Formatter};
8 use std::future::Future;
9 use std::pin::Pin;
10 use std::sync::Arc;
11 use std::task::{Context, Poll, Waker};
12 
13 use parking_lot::Mutex;
14 
15 use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
16 use self::executor::SpawnTask;
17 use self::promise::{Action as ActionPromise, Batch as BatchPromise};
18 use crate::call::server::RequestContext;
19 use crate::call::{BatchContext, Call};
20 use crate::cq::CompletionQueue;
21 use crate::error::{Error, Result};
22 use crate::server::RequestCallContext;
23 
24 pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
25 pub(crate) use self::promise::BatchResult;
26 pub use self::promise::BatchType;
27 
28 /// A handle that is used to notify future that the task finishes.
29 pub struct NotifyHandle<T> {
30     result: Option<Result<T>>,
31     waker: Option<Waker>,
32     stale: bool,
33 }
34 
35 impl<T> NotifyHandle<T> {
new() -> NotifyHandle<T>36     fn new() -> NotifyHandle<T> {
37         NotifyHandle {
38             result: None,
39             waker: None,
40             stale: false,
41         }
42     }
43 
44     /// Set the result and notify future if necessary.
set_result(&mut self, res: Result<T>) -> Option<Waker>45     fn set_result(&mut self, res: Result<T>) -> Option<Waker> {
46         self.result = Some(res);
47 
48         self.waker.take()
49     }
50 }
51 
52 type Inner<T> = Mutex<NotifyHandle<T>>;
53 
new_inner<T>() -> Arc<Inner<T>>54 fn new_inner<T>() -> Arc<Inner<T>> {
55     Arc::new(Mutex::new(NotifyHandle::new()))
56 }
57 
58 /// Get the future status without the need to poll.
59 ///
60 /// If the future is polled successfully, this function will return None.
61 /// Not implemented as method as it's only for internal usage.
check_alive<T>(f: &CqFuture<T>) -> Result<()>62 pub fn check_alive<T>(f: &CqFuture<T>) -> Result<()> {
63     let guard = f.inner.lock();
64     match guard.result {
65         None => Ok(()),
66         Some(Err(Error::RpcFailure(ref status))) => {
67             Err(Error::RpcFinished(Some(status.to_owned())))
68         }
69         Some(Ok(_)) | Some(Err(_)) => Err(Error::RpcFinished(None)),
70     }
71 }
72 
/// A future object for task that is scheduled to `CompletionQueue`.
///
/// Polling it yields the `Result<T>` stored in the shared state once one is
/// available.
pub struct CqFuture<T> {
    /// Mutex-protected notification state behind an `Arc`; `check_alive` and
    /// `poll` both lock it to inspect the stored result.
    inner: Arc<Inner<T>>,
}
77 
78 impl<T> CqFuture<T> {
new(inner: Arc<Inner<T>>) -> CqFuture<T>79     fn new(inner: Arc<Inner<T>>) -> CqFuture<T> {
80         CqFuture { inner }
81     }
82 }
83 
84 impl<T> Future for CqFuture<T> {
85     type Output = Result<T>;
86 
poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>87     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
88         let mut guard = self.inner.lock();
89         if guard.stale {
90             panic!("Resolved future is not supposed to be polled again.");
91         }
92 
93         if let Some(res) = guard.result.take() {
94             guard.stale = true;
95             return Poll::Ready(res);
96         }
97 
98         // So the task has not been finished yet, add notification hook.
99         if guard.waker.is_none() || !guard.waker.as_ref().unwrap().will_wake(cx.waker()) {
100             guard.waker = Some(cx.waker().clone());
101         }
102 
103         Poll::Pending
104     }
105 }
106 
/// Future object for batch jobs; a `CqFuture` whose successful output is a
/// `BatchResult`.
pub type BatchFuture = CqFuture<BatchResult>;
109 
/// A result holder for asynchronous execution.
///
/// Each variant wraps the promise or callback that `CallTag::resolve`
/// dispatches to.
// This enum is going to be passed to FFI, so don't use trait or generic here.
pub enum CallTag {
    /// Promise of a batch job; built by `CallTag::batch_pair`.
    Batch(BatchPromise),
    /// Callback for a request job; built by `CallTag::request`.
    Request(RequestCallback),
    /// Callback for a unary request job; built by `CallTag::unary_request`.
    UnaryRequest(UnaryRequestCallback),
    /// Abort of a call before its handler is called; built by
    /// `CallTag::abort`. Resolving this variant does nothing.
    Abort(Abort),
    /// Promise of an action whose future yields a `bool`; built by
    /// `CallTag::action_pair`.
    Action(ActionPromise),
    /// Shared spawned task; resolved through `executor::resolve`.
    Spawn(Arc<SpawnTask>),
}
120 
121 impl CallTag {
122     /// Generate a Future/CallTag pair for batch jobs.
batch_pair(ty: BatchType) -> (BatchFuture, CallTag)123     pub fn batch_pair(ty: BatchType) -> (BatchFuture, CallTag) {
124         let inner = new_inner();
125         let batch = BatchPromise::new(ty, inner.clone());
126         (CqFuture::new(inner), CallTag::Batch(batch))
127     }
128 
129     /// Generate a CallTag for request job. We don't have an eventloop
130     /// to pull the future, so just the tag is enough.
request(ctx: RequestCallContext) -> CallTag131     pub fn request(ctx: RequestCallContext) -> CallTag {
132         CallTag::Request(RequestCallback::new(ctx))
133     }
134 
135     /// Generate a Future/CallTag pair for action call that only cares if the result is
136     /// successful.
action_pair() -> (CqFuture<bool>, CallTag)137     pub fn action_pair() -> (CqFuture<bool>, CallTag) {
138         let inner = new_inner();
139         let action = ActionPromise::new(inner.clone());
140         (CqFuture::new(inner), CallTag::Action(action))
141     }
142 
143     /// Generate a CallTag for abort call before handler is called.
abort(call: Call) -> CallTag144     pub fn abort(call: Call) -> CallTag {
145         CallTag::Abort(Abort::new(call))
146     }
147 
148     /// Generate a CallTag for unary request job.
unary_request(ctx: RequestContext, rc: RequestCallContext) -> CallTag149     pub fn unary_request(ctx: RequestContext, rc: RequestCallContext) -> CallTag {
150         let cb = UnaryRequestCallback::new(ctx, rc);
151         CallTag::UnaryRequest(cb)
152     }
153 
154     /// Get the batch context from result holder.
batch_ctx(&self) -> Option<&BatchContext>155     pub fn batch_ctx(&self) -> Option<&BatchContext> {
156         match *self {
157             CallTag::Batch(ref prom) => Some(prom.context()),
158             CallTag::UnaryRequest(ref cb) => Some(cb.batch_ctx()),
159             CallTag::Abort(ref cb) => Some(cb.batch_ctx()),
160             _ => None,
161         }
162     }
163 
164     /// Get the request context from the result holder.
request_ctx(&self) -> Option<&RequestContext>165     pub fn request_ctx(&self) -> Option<&RequestContext> {
166         match *self {
167             CallTag::Request(ref prom) => Some(prom.context()),
168             CallTag::UnaryRequest(ref cb) => Some(cb.request_ctx()),
169             _ => None,
170         }
171     }
172 
173     /// Resolve the CallTag with given status.
resolve(self, cq: &CompletionQueue, success: bool)174     pub fn resolve(self, cq: &CompletionQueue, success: bool) {
175         match self {
176             CallTag::Batch(prom) => prom.resolve(success),
177             CallTag::Request(cb) => cb.resolve(cq, success),
178             CallTag::UnaryRequest(cb) => cb.resolve(cq, success),
179             CallTag::Abort(_) => {}
180             CallTag::Action(prom) => prom.resolve(success),
181             CallTag::Spawn(notify) => self::executor::resolve(notify, success),
182         }
183     }
184 }
185 
186 impl Debug for CallTag {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result187     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
188         match *self {
189             CallTag::Batch(ref ctx) => write!(f, "CallTag::Batch({ctx:?})"),
190             CallTag::Request(_) => write!(f, "CallTag::Request(..)"),
191             CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"),
192             CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"),
193             CallTag::Action(_) => write!(f, "CallTag::Action"),
194             CallTag::Spawn(_) => write!(f, "CallTag::Spawn"),
195         }
196     }
197 }
198 
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{self, TryRecvError};
    use std::thread;

    use futures_executor::block_on;

    use super::*;
    use crate::env::Environment;

    /// Action futures must stay pending until their tag is resolved, and the
    /// value they yield must reflect the `success` flag passed to `resolve`.
    #[test]
    fn test_resolve() {
        let env = Environment::new(1);

        let (first_fut, first_tag) = CallTag::action_pair();
        let (second_fut, second_tag) = CallTag::action_pair();
        let (tx, rx) = mpsc::channel();

        // The worker blocks on each future in turn and reports what it got.
        let worker = thread::spawn(move || {
            let first = block_on(first_fut);
            tx.send(first).unwrap();
            let second = block_on(second_fut);
            tx.send(second).unwrap();
        });

        // Nothing may arrive before the first tag is resolved.
        assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
        first_tag.resolve(&env.pick_cq(), true);
        let first = rx.recv().unwrap();
        assert!(first.is_ok());

        // Likewise for the second tag, which is resolved as unsuccessful.
        assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
        second_tag.resolve(&env.pick_cq(), false);
        let second = rx.recv();
        if !matches!(second, Ok(Ok(false))) {
            panic!("expect Ok(false), but got {:?}", second);
        }

        worker.join().unwrap();
    }
}
236