اجراکننده ها و IO سیستم

توی قسمت قبلی در بخش نگاه دقیق به Future Trait راجب این مثال از خوندن socket به صورت async که یک Future بود بحث کردیم:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // سوکت دیتا رو آماده داره -- بخونش داخل یه بافر و برگردونش
            Poll::Ready(self.socket.read_buf())
        } else {
            // سوکت هنوز دیتایی نداره
            //
            // تابع "بیدار شدن" رو زمانی که دیتا آماده بود صدا بزن.
            // وقتی دیتا آماده باشه تابع "بیدار شدن" صدا زده میشه و در نتیجه
            // اون "فیوچر" میدونه که الان باید "پول" رو صدا بزنه دوباره و دیتا رو بگیره
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

این Future دیتای موجود socket رو میخونه، و اگر دیتای برای خوندن موجود نباشه، به اجراکننده میگه، و ازش میخواد که task هاش صدا زده بشن یا دوباره بیدار بشن در زمانی که دیتایی برای خوندن موجود باشه. با این حال، توی این مثال واضح نیست که تایپ Socket چجوری پیاده سازی شده یا به صورت دقیق تر، مشخص نیست که تابع set_readable_callback چجوری کار میکنه. چجوری میتونیم wake() رو فراخونی کنیم زمانی که socket چیزی برای خوندن داشته باشه ؟ یه راه حل اینه که یک thread به صورت مداوم چک کنه که آیا socket دیتایی برای خوندن داره یا نه و در زمان مناسب wake() رو فراخونی کنه. البته این روش اصلا بهینه نیست، چون داریم برای هر IO ای که مسدود کننده (blocking) هست یک thread باز میکنیم. این روش باعث میشه کارآمدی کد async ما خیلی پایین باشه.

در واقعیت، این مشکل با یک سری کتابخونه هایی که به اصطلاح آگاه به IO مسدود کننده سیستم هستن حل شده، مثل epoll روی لینوکس، kqueue روی FreeBSD و MacOS، IOCP روی ویندوز و port رو Fuchsia همه ی اینا داخل Rust از طریق crate ای به نام mio که به صورت cross platform دسترسی به این کتابخونه های سیستمی رو فراهم میکنه به صورت آماده وجود داره. این کتابخونه ها این امکان رو فراهم میکنن که یک thread رو چندین عملیات IO async مسدود بشه و زمانی نتیجش برگرده که یکی از اون event ها انجام شده و نتیجش حاضره. تو عمل استفاده از API این کتابخونه ها یه چیزی شبیه مثال زیره:


struct IoBlocker {
    /* ... */
}

struct Event {
    // یک آی دی یکتا که مشخص کننده رویداد هست و با این آی دی لیست شده
    id: usize,

    // یک سری سیگنال که برای جوابشون صبر میکنه
    signals: Signals,
}

impl IoBlocker {
    /// ساخت یک کالکشن از عملیات ناهمزمان برای رویداد های "آی او" که برای گرفتن جوابشون ترد بلاک یا مسدود میشه
    fn new() -> Self { /* ... */ }

    /// تعریف یک رویداد خاص برای بررسی در آینده
    fn add_io_event_interest(
        &self,

        /// آبجکتی که رویداد توی اون اتفاق میفته
        io_object: &IoObject,

        /// یک سری از سینگنال هایی که درون آبجکت بالا پدید میان
        /// که در نتیجه اون رویدادی باید اتفاق بیفته که اون رویداد با یک آی دی یکتا مشخص شده
        event: Event,
    ) { /* ... */ }

    /// بلاک شدن و مسدود شدن تا زمانی که یکی از رویداد ها اتفاق بیفته
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// اگر سوکت 1 زمانی قبال خوندن بشه برای مثال چاپ میشه: سوکت 1 آماده خوندن هست
println!("Socket {:?} is now {:?}", event.id, event.signals);

اجراکننده های Future میتونن از این کتابخونه های سیستمی استفاده کنن تا امکان اجرا شدن callback این Object های IO async مثل socket ها موقعی که event مربوطه اتفاق میفته فراهم بشه. توی مثال مربوط به SocketRead در بالا، تابع Socket::set_readable_callback یه چیزی شبیه کد پایینه:

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // یک رفرنس از اجراکننده ای که قرار اجرا کنه
        // که البته خیلیا این رو به صورت کلی داخل اون تردی که قراره اجرا بشه میارن
        // اما اینجا یک رفرنس از اون پاس داده شده تا از این رفرنس استفاده بشه
        let local_executor = self.local_executor;

        // یک ای دی یکتا برای این آبجکت آی او
        let id = self.id;

        // تابع بیدارکننده رو توی مپ اجراککنده ذخیره میکنیم تا بعدا وقت رویداد های آی او
        // اتفاق افتادن بتونه صداشون بزنه
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

حالا ما میتونیم فقط یک اجراکننده روی thread داشته باشیم تا رویداد ها و event های IO رو بگیره و Waker متناسب با اون رو فعال کنه، که باعث میشه task مربوطه انجام بشه. همین مساله باعث میشه که اجراکننده فرصت داشته باشه تا task های بیشتری رو اجرا کنه قبل از اینکه دوباره بره و چک کنه event جدید IO ای برای اجرا کردن داره یا نه (و این چرخه همینجور ادامه داره ...).