پیاده سازی یک اجراکننده برای عملیات Async
Future
های Rust تنبل هستن: اونا هیچ کاری انجام نمیدن تا وقتی یکی اونا رو اجرا کنه تا اتمام پیدا کنن. یکی از راه ها برای اجرای یک Future اینه که از .await
داخل یک تابع async
استفاده کنیم، ولی خود این قضیه رو مشکل تر میکنه: حالا کی اون Future ای که توسط توابع async
تولید شده رو اجرا میکنه ؟ جواب اینه که ما به یک اجرا کننده Future
نیاز داریم.
اجارکننده های Future
توابع سطح بالا Future
رو میگیرم و اجراشون میکنن، که این کار رو با صدا زدن poll
زمانی که Future
میتونه پردازش بیشتری رو انجام بده، انجام میدن. معمولا اجراکننده های poll
یک Future رو برای شروع صدا میزنن. وقتی Future میفهمه که آمادس تا پردازش بیشتری رو با صدا زدن wake()
انجام بده، داخل یک صف گذاشته میشن تا دوباره نوبتشون بشه و poll
دوباره صدا زده بشه، این کار اونقدر ادامه پیدا میکنه تا Future
کارش تموم بشه.
تو این قسمت قراره اجراکننده ساده خودمون رو بنویسم که قادره تعداد زیادی توابع سطح بالا Future رو بگیره و به صورت همزمان اجرا کنه تا تموم بشن.
For this example, we depend on the futures
crate for the ArcWake
trait,
which provides an easy way to construct a Waker
. Edit Cargo.toml
to add
a new dependency:
برای این مثال ما از پکیج futures
استفاده میکنیم تا بتونیم از trait ArcWake
استفاده کنیم. چون خیلی راحت میتونیم یه Waker
بسازیم. فایل cargo.toml
رو باز کنید و این crate رو اضافه کنید:
[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2021"
[dependencies]
futures = "0.3"
در ادامه نیاز داریم تا این چیزا ها رو به بالای src/main.rs
اضافه کنیم:
use futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
};
use std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
};
// اون تایمری که توی قسمت قبل با هم نوشتیم
use timer_future::TimerFuture;
اجراکننده ما با ارسال task ها روی یک کانال (channel) کار میکنه. اجراکننده event ها رو از روی کانال بر میداره و اونا رو اجرا میکنه. زمانی که یه task آماده هست تا کار بیشتری انجام بده (بیدار شده)، میتونه خودشو دوباره از طریق کانال بفرسته که دوباره poll بشه.
توی این طراحی، خود اجراکننده فقط لازمه قسمت دریافت کننده کانال رو داشته باشه. و کاربر باید قسمت فرستنده رو داشته باشه تا بتونه Future های جدید تولید کنه. Task ها خودشون فقط Future هایی هستن که میتونن دوباره خودشونو برنامه ریزی کنن برای اجرا دوباره، پس با اونا رو به صورت جفت Future با فرستنده ذخیره میکنیم تا task بتونه دوباره خودشو توی صف بزاره.
/// اجراکننده ای که تسک ها رو از یه کانال میگیره و اجراشون میکنه
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// این یه "فیوچر" جدید میسازه و میفرسته به کانال
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// یک "فیوچر" ی که میتونه خودشو دوباره برنامه ریزی که که توسط اجراکننده دوباره "پول" بشه
struct Task {
/// فیوچری که در حاله پردازشه و باید به داخل صف گذاشته بشه تا انجام بشه
///
/// اینجا استفاده از "میوتکس" اجباری نیست، به دلیل اینکه ما فقط از طریق یک "ترد" داریم به مقادریمون دسترسی
/// پیدا میکنیم، اما "راست" اونقدر باهوش نیست که بفهمه ما فقط از یه "ترد" استفاده میکنیم
/// برای همین مجبوریم از "میتوکس" استفاده کنیم تا قوانین امن بودن مموری در
/// زبان "راست" رو رعایت کرده باشیم
/// البته میشه از تایپ زیر هم به جای میتوکس استفاده کرد
/// `UnsafeCell`
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// یک هندلر که تسک رو برنامه ریزی میکنه و برش میگردونه به داخل صف
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// بیشترین حد تسک هایی که میتونیم داخل صف داشته باشیم از طریق کانال در یک لحظه
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
بیاید یه متد هم برای راحتی تولید Future های جدید بسازیم.
این متد یک تایپ Future میگیره که ما اینو با تایپ box یکی میکنیم و یک Arc<Task>
جدید میسازیم که میتونه داخل اجراکننده به صورت صف اجرا بشه.
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}
برای poll کردن Future ها باید یه Waker
بسازیم.
همونطور که داخل بیدار کردن یک عملیات Async با Waker بحث کریدم، Waker
ها موظف هستن که یک task رو برنامه ریزی کنن برای poll شدن دوباره وقتی wake
صدا زده میشه. یادتون باشه که Waker
به اجراکننده میگه که دقیقا چه task ای آماده هست، که همین باعث میشه فقط Future های poll بشن که آماده هستن تا پردازش بیشتری رو انجام بدن. راحت ترین راه برای ساختن یک Waker
پیاده سازی trait ArcWake
و بعد استفاده از توابع waker_ref
یا .into_waker()
هست تا بتونیم Arc<impl ArcWake>
رو به Waker
تبدیل کنیم.
بیاید ArcWake
رو برای task هامون پیاده سازی کنیم تا این اجازه رو بهشون بدیم که به Waker
تبدیل بشن و در نهایت بیدار بشن:
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// تابع "بیدار کننده" رو با فرستادن تسک توی کانال پیاده سازی میکنیم
// تا دوباره توسط اجرا کننده "پول" بشه
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks queued");
}
}
وقتی یک Waker
از Arc<Task>
ساخته میشه، صدا کردن wake()
باعث میشه یک کپی از Arc
به کانال task ها فرستاده بشه. در نهایت اجارکننده ما باید task رو برداره و poll اش کنه. بیاید همینو پیاده سازی کنیم:
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// فیوچر رو میگیره و اگه هنوز تکمیل نشده باشه "پول" اش میکنه
// و سعی میکنه تکمیلش کنه
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// یک "بیدار شونده" از خود تسک میسازیم
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>` این تایپ یه تایپ مستعار برای تایپ زیره
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
if future.as_mut().poll(context).is_pending() {
// ایمنجا هنوز تسک ما تکمیل نشده برای همین
// دوباره برش میگردونیم تا دوباره توسط اجراکننده اجرا بشه
*future_slot = Some(future);
}
}
}
}
}
تبریک میگم! ما الان یک اجراکننده Future داریم که کار میکنه. حتی الا میتونیم با این اجارکننده عملیات asyn/.await
و Future های شخصی سازی شده رو انجام بدیم. عملیاتی مثل TimeFuture
که قبلا نوشتیمش:
fn main() {
let (executor, spawner) = new_executor_and_spawner();
//یک تسک برای پرینت کردن قبل و بعد از اتمام تایمر میسازیم
spawner.spawn(async {
println!("howdy!");
// صبر میکنیم تا تایمر "فیوچر" ما بعد از 2 ثانیه تکمیل بشه
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// حالا از حافظه پاکش میکنیم تا اجرا کننده بدونه تموم شده کارش
// و تسک های بیشتری در آینده نمیگیره ازش تا اجراش کنه
drop(spawner);
// Run the executor until the task queue is empty.
// This will print "howdy!", pause, and then print "done!".
// اجرا کننده رو اجرا میکنه تا زمانی که صف تسک ها خالی بشه
// که در نهایت برای ما اول پرینت میکنه:
// "howdy!"
// و بعدش پرینت میکنه
// "done!"
executor.run();
}