بیدار کردن یک عملیات Async با Waker

این خیلی عادیه که Future ها همون بار اولی که poll میشن قادر به اتمام کارشون نباشن. تو این موقعیت، Future باید اطمیانان حاصل کنه که در آینده دوباره poll میشه وقتی آمادس تا پردازش بیشتری رو جلو ببره. این کار با استفاده از تایپ Waker انجام میشه.

هر زمانی که یه Future poll میشه، به عنوان قسمتی از یک کارواحد یا "task" poll میشه. task ها همون Future های سطح بالایی هستن که به یک اجراکننده داده شدن تا اجرا بشن.

Waker یه متد wake() داره که کارش اینه که به اجرا کننده بگهئ task ای که بهش وصل شده رو صدا بزنه یا به اصطلاح بیدارش کنه. وقتی wake() وقتی wake() صدا زده میشه اجراکننده میدونه task ای که به Waker متصل شده آمادس تا پردازش بیشتری رو انجام بده و Future اون باید دوباره poll بشه.

Waker متد clone() هم پیاده سازی کرده که میتونه تو جاهای مختلف کپی و ذخیره بشه.

خب حالا بیاید سعی کنیم یه تایمر ساده Future با استفاده از Waker بسازیم:

مثال عملی: یک تایمر بسازید

برای درک بهتر این مثال، وقتی تایمر درست شد ما میایم یک thread جدید ایجاد میکنیم و برای مدتی که اون تایمر ست شده thread رو تو حالت sleep میبریم و بعد از اون یه signal به تایمر Future میدیم که متوجه بشه تایم به اتمام رسیده.

اول، یه پروژه جدید با دستور cargo new --lib time_future ایجاد کنید و چیزایی که لازم داریم رو به src/lib.rs اضافه کنید:


#![allow(unused)]
fn main() {
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
}

بیاین اول از تعریف کردن خود تایپ Future شروع کنیم. Future ما نیاز به یک راهی داره تا بتونه با thread ارتباط برقرار کنه و متوجه تموم شدن تایم در اون thread بشه و در نهایت Future کارش با موفقت به اتمام برسه. برای این کار از Arc<Mutex<..>> برا ذخیره مقادیرمون استفاده میکنیم تا بتونیم از طریق اون بین Future و thread مون ارتباط برقرار کنیم.

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// وضعیت اشتراکی بین "فیوچر" و "ترد" ی که منتظر وایساده
struct SharedState {
    /// این مقدار نشون میده که تایمر کارش تموم شده یا نه
    completed: bool,

    /// تابع "بیدار شونده" برای این تایمر. که وقتی تایمر کارش تموم شد
    /// این تابع صدا زده بشه و ادامه عملیات انجام بشه
    waker: Option<Waker>,
}

حالا بیاین واقغا خود Future رو پیاده سازی کنیم!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // بررسی کن میکنه ببینه تایمر کارش تموم شده یا نه
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // تابع "بیدار شونده" رو ست میکنیم تا ترد بتونه
            // در زمانی که تایمر کارش تموم میشه اون صدا بزنه
            //
            // اینجا فقط یک بار عملیات کپی کردن از تابع "بیدار شونده" انجام میشه به جای
            // اینکه هر سری بیاد و کپی کنه
            //
            // البته میشه از تابع زیر هم برای بررسی اینکه آیا اون "فیوچر" بیدار میشه یا نه
            // هم استفاده کرد ولی برای اینکه مثال رو ساده نگه داریم اینجوری استفاده کردیم
            // `Waker::will_wake`
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

خیلی راحت بود مگه نه ؟ اگه thread مقدار shared_state.completed = true رو ست کنه ما کارمون تمومه! در غیر این صورت میایم Waker رو clone برای task فعلی clone میکنیم به منتقلش میکنیم به shared_state.waker. به این ترتیب thread میتونه task رو دوباره بیدار کنه.

این خیلی مهمه که هر موقع که Future دوباره poll میشه ما Waker رو آپدیت کنیم، چون ممکنه Future یه task دیگه رو با یک Waker دیگه رو منتقل کرده باشه. این داستان زمانی اتفاق میفته که Future ها بین task های مختلف وقتی poll میشن جا به جا بشن.

در نهایت ما به یه API نیاز داریم تا واقعا تایمر رو بسازیم و thread رو شروع کنیم:

impl TimerFuture {
    /// یک "فیوچر" جدید درست میکنه زمانی که تایمر کارش انجام میشه
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // یک ترد جدید میسازه
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // زمانی که تایمر تموم شد یک سیگنال میفرسته و آخرین تسکی که "پول" شده رو بیدار میکنه
            // البته اگه تسکی موجود باشه
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

این تمام چیزی بود که نیاز داشتیم تا یک تایمر Future ساده بسازیم. فقط کاش یه اجراکننده داشتیم که میتونستیم Future رو روی اون اجرا کنیم ...