پیاده سازی یک اجراکننده برای عملیات Async

Future های Rust تنبل هستن: اونا هیچ کاری انجام نمیدن تا وقتی یکی اونا رو اجرا کنه تا اتمام پیدا کنن. یکی از راه ها برای اجرای یک Future اینه که از .await داخل یک تابع async استفاده کنیم، ولی خود این قضیه رو مشکل تر میکنه: حالا کی اون Future ای که توسط توابع async تولید شده رو اجرا میکنه ؟ جواب اینه که ما به یک اجرا کننده Future نیاز داریم.

اجارکننده های Future توابع سطح بالا Future رو میگیرم و اجراشون میکنن، که این کار رو با صدا زدن poll زمانی که Future میتونه پردازش بیشتری رو انجام بده، انجام میدن. معمولا اجراکننده های poll یک Future رو برای شروع صدا میزنن. وقتی Future میفهمه که آمادس تا پردازش بیشتری رو با صدا زدن wake() انجام بده، داخل یک صف گذاشته میشن تا دوباره نوبتشون بشه و poll دوباره صدا زده بشه، این کار اونقدر ادامه پیدا میکنه تا Future کارش تموم بشه.

تو این قسمت قراره اجراکننده ساده خودمون رو بنویسم که قادره تعداد زیادی توابع سطح بالا Future رو بگیره و به صورت همزمان اجرا کنه تا تموم بشن.

For this example, we depend on the futures crate for the ArcWake trait, which provides an easy way to construct a Waker. Edit Cargo.toml to add a new dependency:

برای این مثال ما از پکیج futures استفاده میکنیم تا بتونیم از trait ArcWake استفاده کنیم. چون خیلی راحت میتونیم یه Waker بسازیم. فایل cargo.toml رو باز کنید و این crate رو اضافه کنید:

[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2021"

[dependencies]
futures = "0.3"

در ادامه نیاز داریم تا این چیزا ها رو به بالای src/main.rs اضافه کنیم:

use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::{
    future::Future,
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    sync::{Arc, Mutex},
    task::Context,
    time::Duration,
};
// اون تایمری که توی قسمت قبل با هم نوشتیم
use timer_future::TimerFuture;

اجراکننده ما با ارسال task ها روی یک کانال (channel) کار میکنه. اجراکننده event ها رو از روی کانال بر میداره و اونا رو اجرا میکنه. زمانی که یه task آماده هست تا کار بیشتری انجام بده (بیدار شده)، میتونه خودشو دوباره از طریق کانال بفرسته که دوباره poll بشه.

توی این طراحی، خود اجراکننده فقط لازمه قسمت دریافت کننده کانال رو داشته باشه. و کاربر باید قسمت فرستنده رو داشته باشه تا بتونه Future های جدید تولید کنه. Task ها خودشون فقط Future هایی هستن که میتونن دوباره خودشونو برنامه ریزی کنن برای اجرا دوباره، پس با اونا رو به صورت جفت Future با فرستنده ذخیره میکنیم تا task بتونه دوباره خودشو توی صف بزاره.

/// اجراکننده ای که تسک ها رو از یه کانال میگیره و اجراشون میکنه
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// این یه "فیوچر" جدید میسازه و میفرسته به کانال
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// یک "فیوچر" ی که میتونه خودشو دوباره برنامه ریزی که که توسط اجراکننده دوباره "پول" بشه
struct Task {
    /// فیوچری که در حاله پردازشه و باید به داخل صف گذاشته بشه تا انجام بشه
    /// 
    /// اینجا استفاده از "میوتکس" اجباری نیست، به دلیل اینکه ما فقط از طریق یک "ترد" داریم به مقادریمون دسترسی
    /// پیدا میکنیم، اما "راست" اونقدر باهوش نیست که بفهمه ما فقط از یه "ترد" استفاده میکنیم
    /// برای همین مجبوریم از "میتوکس" استفاده کنیم تا قوانین امن بودن مموری در
    /// زبان "راست" رو رعایت کرده باشیم
    /// البته میشه از تایپ زیر هم به جای میتوکس استفاده کرد
    /// `UnsafeCell`
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// یک هندلر که تسک رو برنامه ریزی میکنه و برش میگردونه به داخل صف
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // بیشترین حد تسک هایی که میتونیم داخل صف داشته باشیم از طریق کانال در یک لحظه
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

بیاید یه متد هم برای راحتی تولید Future های جدید بسازیم. این متد یک تایپ Future میگیره که ما اینو با تایپ box یکی میکنیم و یک Arc<Task> جدید میسازیم که میتونه داخل اجراکننده به صورت صف اجرا بشه.

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

برای poll کردن Future ها باید یه Waker بسازیم. همونطور که داخل بیدار کردن یک عملیات Async با Waker بحث کریدم، Waker ها موظف هستن که یک task رو برنامه ریزی کنن برای poll شدن دوباره وقتی wake صدا زده میشه. یادتون باشه که Waker به اجراکننده میگه که دقیقا چه task ای آماده هست، که همین باعث میشه فقط Future های poll بشن که آماده هستن تا پردازش بیشتری رو انجام بدن. راحت ترین راه برای ساختن یک Waker پیاده سازی trait ArcWake و بعد استفاده از توابع waker_ref یا .into_waker() هست تا بتونیم Arc<impl ArcWake> رو به Waker تبدیل کنیم. بیاید ArcWake رو برای task هامون پیاده سازی کنیم تا این اجازه رو بهشون بدیم که به Waker تبدیل بشن و در نهایت بیدار بشن:

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // تابع "بیدار کننده" رو با فرستادن تسک توی کانال پیاده سازی میکنیم
        // تا دوباره توسط اجرا کننده "پول" بشه
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

وقتی یک Waker از Arc<Task> ساخته میشه، صدا کردن wake() باعث میشه یک کپی از Arc به کانال task ها فرستاده بشه. در نهایت اجارکننده ما باید task رو برداره و poll اش کنه. بیاید همینو پیاده سازی کنیم:

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // فیوچر رو میگیره و اگه هنوز تکمیل نشده باشه "پول" اش میکنه
            // و سعی میکنه تکمیلش کنه
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // یک "بیدار شونده" از خود تسک میسازیم
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` این تایپ یه تایپ مستعار برای تایپ زیره
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                if future.as_mut().poll(context).is_pending() {
                    // ایمنجا هنوز تسک ما تکمیل نشده برای همین
                    // دوباره برش میگردونیم تا دوباره توسط اجراکننده اجرا بشه
                    *future_slot = Some(future);
                }
            }
        }
    }
}

تبریک میگم! ما الان یک اجراکننده Future داریم که کار میکنه. حتی الا میتونیم با این اجارکننده عملیات asyn/.await و Future های شخصی سازی شده رو انجام بدیم. عملیاتی مثل TimeFuture که قبلا نوشتیمش:

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    //یک تسک برای پرینت کردن قبل و بعد از اتمام تایمر میسازیم
    spawner.spawn(async {
        println!("howdy!");
        // صبر میکنیم تا تایمر "فیوچر" ما بعد از 2 ثانیه تکمیل بشه
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // حالا از حافظه پاکش میکنیم تا اجرا کننده بدونه تموم شده کارش
    // و تسک های بیشتری در آینده نمیگیره ازش تا اجراش کنه
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    // اجرا کننده رو اجرا میکنه تا زمانی که صف تسک ها خالی بشه
    // که در نهایت برای ما اول پرینت میکنه:
    // "howdy!"
    // و بعدش پرینت میکنه
    // "done!"
    executor.run();
}