چند تا نکته

این کتاب ترجمه کتاب اصلی به همین عنوان هست و سعی شده به صورت فارسی روان ترجمه بشه. برای اینکه حس کردم شاید کسی که میخونه با زبان عامیانه بیشتر ارتباط برقرار کنه تا خیلی رسمی، متن کتاب هم به صورت فارسی عامیانه ترجمه شده و شاید خیلی از کلمات و لغات از نظر نگارشی اشتباه باشن. بعضی جاهای کتاب شاید ترجمش خیلی خوب از آب در نیومده، اگه دوست داشتید و ترجمه بهتری برای قسمت های مختلف داشتید حتما contribute .کنید تا این لحظه که من این مقدمه رو مینوسیم خیلی منابع درست حسابی برنامه نویسی Async داخل Rust وجود نداره، این کتاب هم یکم مطالب و متد هاش شاید قدیمی شده باشه و اینجوری که توی repository اصلیش صحبت میکنن قصد آپدیت کردنشم ندارن دیگه متاسفانه. ولی خب همینم توی comminuty فارسی جاش خالیه و خوندنش برای آشنا شدن با مفاهیم زیرساختی و پایه ای Async خیلی خوبه. حتما سراغ crate های مدرن و پیشرفته تر دیگه مثل tokio هم برید و سعی کنید با اونا کار کنید. در نهایت خیلی خوشحال میشم اگه تو بهتر کردن این کتاب کمک کنید :) البته خیلی درست نیست متن اصلیو عوض کنیم ولی میشه قطعا ترجمشو بهتر کرد.

مقدمه و شروع کار

به برنامه نویسی Async در Rust خوش اومدی! اگه دنبال این هستی که شروع کنی به نوشتن کد های Async در Rust جای درست و خوبی اومدی. فرقی نمیکنه در حال نوشتن یک وب سرور، دیتابیس یا حتی یه سیستم عامل هستی، این کتاب بهت نشون میده چجوری با ابزار های Async در Rust بهترین بازده رو از سخت افزارت بگیری.

این کتاب چه چیزهایی رو پوشش میده؟

هدف این کتاب اینه که یک منبع جامع و بروز از راهنمای استفاده از ویژگی های Async زبان Rust و کتابخانه هاش، چه برای مبتدی ها و چه برای خفن ها و قدیمیای Rust باشه.

  • فصل های اول کتاب مقدمه های کلی برای ورود به برنامه نویسی Async، به همراه دیدگاه Rust نسبت به برنامه Async رو توضیح میده.

  • فصل های میانی در مورد ابزار های کلیدی، مفید و کنترلی در زمان نوشتن کد Async بحث میکنه، و توضیح میده چجوری کتابخانه ها و برنامه هایی با سرعت و امکان استفاده طولانی مدت با استفاده از بهترین الگوهایی که قبلا تست شده بسازیم.

  • قسمت آخر کتاب اکوسیستم های بزرگتر Async به همراه چند مثال در مورد نحوه نوشتن یک سری کد های رایج Async رو پوشش میده.

بعد از یه مقدمه نه چندان طولانی، بهتره سراغ دنیای جذاب برنامه نویسی Async در Rust بریم!

چرا Async ؟

ما همه عاشق قدرتی هستیم که Rust در نوشتن نرم افزار های سریع و امن میده، اما برنامه نویسی Async چجوری میتونه تو این دیدگاه جا داشته باشه ؟

برنامه نویسی Asynchronous یا Async، یک مدل برنامه نویسی همزمان هست که توسط تعداد زیادی از زبان های برنامه نویسی پشتیبانی میشه، که این امکان رو به شما میده که تعداد زیادی عملیات رو روی تعداد کمی thread سیستم عاملی انجام بدید. این در حالیه که کدی که مینویسید به خاطر کلید واژه های async/await از نظر ظاهری و حسی خیلی شبیه برنامه های عادی sync هست.

مقایسه Async و دیگر مدل های همزمانی

برنامه نویسی همزمان نسبت به برنامه نویسی معمولی و متوالی هنوز پخته نشده و استاندارد هاش کمتره، در نتیجه بسته به اینکه اون زبان برنامه نویسی از کدوم مدل همزمانی پشتیبانی میکنه، همزمانی رو به صورت های مختلفی میشه بیان کرد. یه مرور سریع روی محبوب ترین مدل های برنامه نویسی همزمان میتونه بهتون در فهمیدن اینکه برنامه نویسی Async چه جایگاهی در این حوزه وسیع داره کمک کنه:

  • Thread های سیستم عاملی: تو این روش نیازی به تغییر تو مدل برنامه نویسی وجود نداره، که همین موضوع رسیدن به مدل همزمانی رو خیلی آسون تر میکنه. البته sync کردن و همزمان کردن thread ها خودش میتونه در مواقعی خیلی سخت باشه، و در ضمن تو این روش سربار و افت سرعت قابل توجه هست. روش هایی مثل Thread Pool میتونه یکم تو کم کردن این سربار ها و افزایش سرعت کارساز باشه، ولی در نهایت برای حجم زیاد عملیات هایی که محدود به I/O هستند کافی نیست.

  • برنامه نویسی Event محور: این روش با استفاده از callback ها میتونه سرعت رو افزایش بده، ولی منجر به ایجاد یه جریان غیر خطی توی برنامه میشه که پیگیری و عیب یابی از این نوع برنامه رو سخت میکنه.

  • روش Coroutine: مثل thread ها تغیری توی مدل برنامه نویسی نیاز ندارن، که استفاده ازشون رو راحت تر میکنه. مثل Async میتونن تعداد زیادی عملیات رو به طور همزمان پوشش بدن. با این وجود یه سری جزئیات سطح پایین که برای برنامه نویسی سیستم و runtime های خاص مهم هستند رو در نظر نمیگیره.

  • مدل بازیگر (Actor): در این روش تمام محاسبات همزمان به صورت واحد هایی به اسم بازیگر تقسیم میشن، که با هم در ارتباط هستن، دقیقا شبیه سیستم های توزیع شده. این روش میتونه خیلی بهینه پیاده سازی بشه اما بسیاری از مسائل کاربردی مثل کنترل جریان و منطق مجدد (مثلا بعد از یک خطا) رو بی پاسخ میزاره.

به طور خلاصه، برنامه نویسی Async این اجازه رو به شما میده که برنامه های فوق العاده بهینه و با سرعت بالا برای زبان های سطح پایینی مثل Rust پیاده سازی کنید، در حالی که بسیاری از مزایای thread ها و دیگر روش ها رو هم پوشش میدن.

Async در Rust در مقایسه با بقیه زبان ها

اگرچه برنامه نویسی Async توسط خیلی از زبان ها پشتیبانی میشه، ولی تو بعضی از جزئیات وقتی به قسمت پیاده سازی میرسیم با هم دیگه فرق دارن. پیاده سازی Async ولی روشی که تو Rust ازش استفاده میشه با بیشتر زبان ها تو زمینه های زیر فرق داره:

  • Future های بی جان: Future ها توی Rust تا وقتی صداشون نزنید و چیزی که قرار بوده بهتون بدن رو ازشون نگیرید پردازشی رو جلو نمیبرن. نگه نداشتن Future ها باعث میشه عملیاتی که داشتن انجام میدادن دیگه تا آخر انجام نشه و متوقف بشه.

  • Async بدون هزینه هست: توی Rust استفاده از Async هزینه ای نداره، این یعنی شما از نظر سخت افزاری فقط هزینه چیزایی که استفاده میکنید رو میدید، نکته قابل توجهش اینه که میتونید بدون گرفتن فضای heap از ram از Async استفاده کنید و به صورت پویا نتیجه عملیات Async رو انجام بدید که برای سرعت سیستم خیلی چیزه خوبیه.

  • Runtime حاظر و آماده ای وجود نداره: توی Rust برای داستان Async هیچ runtime آماده ای وجود نداره. در عوض runtime ها توسط جامعه برنامه نویسان Rust نوشته شده و به صورت پکیج های قابل نصب crate موجوده.

  • runtime های single-thread و multi-thread موجوده: توی Rust هر دو نوع runtime های single-thread ای و multi-thread ای موجوده، که البته هر کدوم مزایا و معایب خودشونو دارن.

تفاوت Async و استفاده از Thread ها در Rust

جایگزین اصلی برای Async در Rust استفاده از thread های سیستم عاملی هست، چه به صورت مستقیم با استقاده از std::thread یا به صورت غیر مستقیم با استفاده از thread pool. مهاجرت از thread ها به Async و برعکس معمولا نیاز به بازنویسی اساسی توی کد داره، هم از نظر پیاده سازی و هم نظر ساختن یک راه ارتباطی عمومی برای قسمت های مختلف (وقتی مثلا یک کتابخانه میسازید). به همین خاطر انتخاب مدلی که دقیقا طبق نیاز های سیستم شما باشه میتونه تو زمان پیاده سازی خیلی صرفه جویی کنه.

Thread های سیستم عاملی برای انجام task ها و عملیات مختلف تو مقیاس کم مناسب هستن، چون thread ها یه سربار اضافه تری برای CPU و حافظه (RAM) هستن. ساخت و تعویض بین thread هااز نظر سخت افزاری خیلی هزینه بره حتی thread هایی که استفاده نمیشن یا به اصطلاح idle هستن هم منابع مصرف میکنن. استفاده از thread pool میتونه تو کم کردن این هزینه ها تاثیر داشته باشه، ولی نه تو همه چی. اگرچه thread ها این اجازه رو به شما میدن که از همون کد عادی sync بدون تغییرات خیلی اساسی بتونید استفاده کنید و هیچ مدل برنامه نویسی نیاز نداره. همچنین توی بعضی سیستم عامل ها میتونید اولویت اجرای thread ها رو عوض کنید، که میتونه خیلی چیزه مفیدی برای driver ها یا برنامه هایی که خیلی حساس به زمان اجرا و latency هستن باشه.

روش Async به طور چشمگیری استفاده از CPU و حافظه رو کاهش میده، مخصوصا برای کارهایی که به طور خاص I/O زیادی دارن، مثل سرور ها یا دیتابیس ها. به همین خاطر میتونید task ها و عملیات بیشتری نسبت به thread های سیستم عاملی داشته باشید، چون یک runtime ای که به صورت async هست از تعداد thread های کمتر (که برای ما هزینه بر بودن) برای اجرای عملیات و task های بیشتر (که از نظر منابعی که استفاده میکنن کم هزینه تر هستن) استفاده میکنه.

یک نکته ای که آخر باید اشاره کنیم اینه که برنامه نویسی Async بهتر از thread ها نیست، در واقع تفاوتشونه که مهمه. اگه به Async برای رسیدن به نتایج بهتر توی سرعت نیاز ندارید، thread ها معمولا جایگزین های راحت تری هستن.

مثالی از دانلود فایل ها به صورت همزمان

تو این مثال هدف ما دانلود دو تا صفحه وب به صورت همزمانه. توی یه برنامه معمول برای اینکار نیاز داریم که thread های جدید ایجاد کنیم تا به همزمانی برسیم:

fn get_two_sites() {
    // ایجاد دو تا ترد جدید که کار دانلود رو انجام بده.
    let thread_one = thread::spawn(|| download("https://www.foo.com"));
    let thread_two = thread::spawn(|| download("https://www.bar.com"));

    // صبر میکنیم تا هردو ترد کارشون تموم شه
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

این درحالیه که دانلود کردن یک صفحه وب یک کار خیلی کوچیکه و ایجاد thread جدید برای همچین کار کوچیکی واقعا هزینه سخت افزاری زیادی از ما میگیره. برای برنامه های بزرگتر این مسئله خیلی راحت میتونه تبدیل به یه معضل بزرگ بشه، توی برنامه نویسی Async در Rust میتونیم همین کار رو بکنیم بدون نیاز به ساختن thread های اضافی:

async fn get_two_sites_async() {
    // ایجاد دو تا فیوچر جدید که پس از تموم شدن کارشون
    // صفحات وب رو به صورت ناهمزمان دانلود میکنن
    let future_one = download_async("https://www.foo.com");
    let future_two = download_async("https://www.bar.com");

    // هر دو فیوچر رو اجرا میکنیم که در یک زمان کارشون تموم شه
    join!(future_one, future_two);
}

اینجا هیچ thread اضافه ای ساخته نشده. تمام توابعی که صداشون زدیم به صورت ثابت استفاده شدن، و هیچ استفاده ای از قسمت heap توی ram نکردیم! اگرچه نیاز داریم که کد رو به صورت async بنویسم در وحله اول، که این کتاب دقیقا میخواد به شما تو رسیدن این هدف کمک کنه.

مدل های همزمانی شخصی سازی شده در Rust

در آخر باید بگیم که Rust شما رو اجبار به انتخاب بین دو مدل thread و async نمیکنه. شما میتونید از هر دو مدل توی یه برنامه استفاده کنید، که میتونه خیلی هم مفید باشه وقتی thread هایی دارید که به عملیات های async وابستگی دارن. در حقیقت، شما میتونید حتی از مدل های همزمانی مختلف دیگه هم استفاده کنید مثل مدل Event محور یا چیزای دیگه، تا وقتی کتابخانه هایی دارید که اینا رو پیاده سازی کردن.

وضعیت کنونی Rust در حالت Async

قسمت هایی از حالت Async در Rust مشابه قسمت های عادی و معمولی sync با ثبات بالا و خیلی خوبی پشتیبانی و تضمین می شود. قسمت های دیگه هنوز به ثبات کافی نرسیده اند و در حال تفییر هستند. موارد زیر رو میتونید از Rust در حالت Async انتظار داشته باشید:

  • عملکرد فوق العاده با سرعت بالا در زمان اجرا برای task ها و عملیات مرسومی که به صورت همزمان انجام می شوند.
  • تعامل خوب با قسمت های پیشرفته زبان مثل Lifetime ها و Pin کردن.
  • وجود یک سری محدودیت های سازگاری بین کد sync و async و بین runtime های مختلف.
  • به خاطر تغییراتی که در حال حاضر روی Async داره صورت میگیره از نظر نگهداری کد باید دقت بیشتری کنید.

به طور خلاصه استفاده از Async در Rust سخت تر از حالت عادی و کد sync هست ولی در عین حال بهترین سرعت رو تو مواردی که راجبش صحبت کردیم بهمون میده، البته اینم باید گفت که Async دائما در حال پیشرفته بنابراین این مسائل تو طول زمان از بین میره.

پشتیبانی زبان و کتابخانه ها

در حالی که برنامه نویسی Async توسط خود Rust پشتیبانی میشه، بیشتر برنامه های Async وابسته به عملکر کتابخانه هایی هستن که جامعه برنامه نویسان Rust اونا رو نوشتن و در قالب crate ها نگهداری میکنن. بنابراین شما باید رو ترکیبی از ویژگی های خود زبان و کتابخانه های موجود برنامه های Async تون رو جلو ببرید:

  • بیشتر trait های پایه ای، type ها و توابع Async مثل Future که توسط کتابخانه اصلی و استاندارد Rust فراهم شده.
  • کلیدواژه های async/await توسط خود کامپایلر Rust پشتیبانی میشه.
  • خیلی از type ها، ماکرو ها و توابع توسط پیکیجی به اسم futures فراهم شده. که میشه ازشون داخل هر برنامه Async که با Rust نوشته شده استفاده کرد.
  • اجرا کردن کد async، IO ها و ساخت task های جدید توسط runtime های async صورت میگیره. دو تا از این runtime ها Tokio و async-std هستن. خیلی از برنامه های Async و کتابخانه های Async داخل crate به یک runtime خاص وابستگی دارن. برای جزئیات بیشتر بخش "اکوسیستم کد Async" رو مطالعه کنید.

خیلی از ویژگی های زبان که عادت داشتید توی کد معمولی sync استفاده کنید هنوز داخل Async موجود نیستن. این نکته هم قابل ذکر هست که Rust اجازه نمیده توابعی از نوع Async داخل trait ها تعریف کنید، در عوض برای اینکه به نتیجه ای که میخواید برسید باید روش ها و الگو های کمی پیچیده تری رو طی کنید.

کامپایل و خطایابی (Debuging)

برای بیشتر قسمت ها، خطاهای مربوط به کامپایلر و runtime توی کد async دقیقا مشابه حالت عادی توی Rust کار میکنه. البته یک سری نکاتی وجود داره که مطرح کردنش خالی از لطف نیست:

خطاهای کامپایل

خطاهای کامپایل در حالت Async در Rust دقیقا مشابه حالت عادی و sync خیلی دقیق و طبق استاندارد های همیشگی هستن، ولی از اونجایی که حالت Async معمولا وابسته به ویژگی های پیچیده تر زبان مثل Lifetime ها و Pin کردن هست، احتمال این که از این نوع خطا ها موقع نوشتن برنامه های Async بکنید بیشتره.

خطاهای Runtime

هر موقع کامپایل به یک تابع از نوع Async بر بخوره، یک state یا یه متغیر برای وضعیتش تو زیرساخت خودش براش در نظر میگیره. اطلاعاتی که برای ردیابی یه خطا وجود داره (Stack traces) معمولا اطلاعاتی از این state ها به ما میده، همچنین توضیح میده توسط چه توابعی داخل runtime به مشکل خورده. بنابراین تفسیر این خطاها شاید کمی بیشتر از کد های معمول و sync ما رو درگیر کنه.

حالت های جدید Fail شدن برنامه

چند حالت جدید برای به مشکل خوردن و fail شدن برنامه های Async وجود داره، برای مثال اگه یک تابعی که باعث block شدن منابع خاصی میشه رو به صورت Async صدا بزنید یا مفاهیم Future رو اشتباه پیاده سازی کنید احتمال داره برنامتون به شکل های جدیدی که قبلا نبوده fail بشه. این خطاها و مشکلات ممکنه خیلی بی سرو صدا قوانین کامپایلر و حتی unit تست هایی که نوشتید رو رد کنه و هیچکس بهشون گیر نده. درک عمیق از مفهوم های پایه ای Async که دقیقا کاریه که این کتاب براتون انجام میده باعث میشه این مشکلات تو برنامتون به وجود نیاد.

ملاحظاتی که باید موقع سازگاری کد بهش توجه بشه

کد Async و Sync نمیتونن همیشه خیلی آزادانه با هم ترکیب بشن. برای مثال، شما نمیتونید یک تابع Async رو مستقیما از داخل یک تابع sync صدا بزنید. در ضمن معمولا نوشتن کد sync و Async سبک ها و الگو های طراحی متفاوت از همی رو میطلبه، که همین مسئله میتونه اجرا کردن کد توی محیط های مختلف رو دشوار تر کنه.

حتی خود کد های Async هم همیشه نمیتونن آزادانه با هم ترکیب بشن. بعضی از کتابخانه ها و پکیج های crate وابسته به یک runtime خاص هستن تا کار کنن. که البته اگه اینطوری باشه باید داخل لیست وابستگی های اون crate این مورد رو مطرح کرده باشه.

این مشکلات مربوط به سازگاری کد با کتابخانه ها و محیط های مختلف دست شما رو انتخاب هاتون بسته تر میکنه، پس مطمئن حتما قبلش مطمئن شوید که راجب crate ها و runtime های مختلف تحقیقاتتون رو کردید. زمانی که یه rumtime مشخص رو انتخاب کردید، نیازی نیست نگران سازگاری کدتون باشید.

ویژگی های عملکردی از نظر سرعت

سرعت و عملکرد کد Async در Rust وابسته به runtime ای هست که ازش استفاده میکنید. اگرچه runtime هایی که قدرت Async به Rust میدن تقریبا جدید هستن، واقعا برای اکثر مواقع و کار ها عملکرد خیلی خوبی دارن.

بیشتر اکوسیستم های Async شرایط multi-thread رو توی runtime هایشون در نظر میگیرن. این باعث میشه لذت بردن از مزایای تئوری که در شرایط single-thread ای وجود داره سخت تر بشه. یکی دیگه از مواردی که بهش توجهی نشده عملیات و task هایی هستن که حساس به زمان هستن و تاخیر اجرا توشون خیلی مهمه، که واقعا مورد مهمی برای درایور ها یا برنامه هایی که رابط گرافیکی (GUI) دارن حساب میشه. اینجور برنامه ها نیاز دارن تا runtime ها یا/و سیستم عامل قابلیت زمان بندی دقیق اجراشون رو داشته باشه. میتونید در آینده انتظار کتابخانه های بهتری برای پشتیبانی از این موارد داشته باشید.

کلیدواژه های async/await

async/.await ابزار های Rust برای نوشتن توابع Async هستن که باعث میشه کد خیلی شبیه به کد sync به نظر بیاد. async یک بلاک کد رو تبدیل میکنه به یک وضعیت یا state machine که trait ای به اسم Future رو ازش استفاده و پیاده سازی کرده. در حالی که صدا زدن یک تابع که که کل thread رو نگه میداره یا اصطلاحا بلاک میکنه، Future های بلاک شده کنترل thread رو بخودش برمیگردونن و باعث میشن Future های دیگه اجرا بشن.

بیاید یه پکیج crate رو به فایل Cargo.toml پروژمون اضافه کنیم:

[dependencies]
futures = "0.3"

برای ساختن یک تابع Async، میتونید از کلیدواژه async fn استفاده کنید:


#![allow(unused)]
fn main() {
async fn do_something() { /* ... */ }
}

چیزی که اون تابع async fn برمیگردونه یک Future هست. برای اینکه در ادامه چیزی اتفاق بیفته، اون Future باید توسط یک اجرا کننده اجرا بشه.

// تابع "بلاک آن" تردی که در آن اجرا می شود را بلاک و مسدود میکند
// و وقتی آزاد می شود که اون "فیوچر"ی که در حال اجرای آن است به پایان برسد
// اجرا کننده های دیگه رفتار های پیچیده تری از خودشون نشون میدن
// مثل زمان بندی برای اجرای چند فیوچر در یک ترد
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // اینجا هیچی پرینت نمیشه
    block_on(future); // تابع "فیوچر" اجرا شد و متن بالا پرینت شد
}

داخل یک تابع async fn، میتونید از .await استفاده کنید تا برای تموم شدن اجرای یک type دیگه ای که اونم از نوع Future هست صبر کنید، مثلا خروجی یک تابع async fn دیگه. برعکس block_on، .await اون thread رو بلاک و مسدود نمیکنه، در ازاش به صورت ناهمزمان و Async برای تموم شدن یک future صبر میکنه، و این کار اجازه میده تا task ها و عملیات های دیگه اجرا بشن اگه در حال حاضر اون future قادر به ادامه پردازشش نیست.

برای مثال فرض کنید با سه تابع async fn داریم به اسم های learn_song، sing_song و dance:

async fn learn_song() -> Song { /* ... */ }
async fn sing_song(song: Song) { /* ... */ }
async fn dance() { /* ... */ }

یه راه برای صدا زدن این سه تابع اینه که به صورت تک تک بهشون برسیم و thread بلاک بشه تا اجرا بشن:

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

با این حال با از کل توان و سرعتمون تو این روشن استفاده نکردیم برای اینکه فقط یک کار رو در لحظه داریم انجام میدیم! مشخصه که ما اول باید یاد بگیریم چجوری آهنگ بخونیم learn_song که بعدش واقعا بتوینم بخونیمش sing_song، ولی این شدنیه که طی یادگرفتن و خوندن آهنگ همزمان برقصیم dance. برای این کار میتونیم دو تا تابع async fn مستقل بسازیم که به صورت همزمان اجرا بشن:

async fn learn_and_sing() {
    // اینجا اول صبر میکنیم آهنگ یاد گرفته بشه قبل از خوندنش
    // انیجا ما از "اویت" به جای "بلاک آن" استفاده میکنیم تا جلوگیری
    // بشه از بلاک شدن و مسدود شدن ترد اینجوری میتونیم همزمان با این کار
    // تابع رقصیدن هم اجرا کنیم
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // ماکرو "جوین" مثل "اویت" هست. با استفاده از این ماکرو میشه
    // برای تموم شدن چندین "فیوچر" به صورت همزمان صبر کرد.
    // اگه به صورت موقت تابع اولی ترد رو بخواد مسدود کنه کار تابع بعدی یعنی رقصیدن انجام میشه.
    // و برعکسش هم میتونه اتفاق بیفته، یعنی اگه رقصیدن بخواد مسدود بشه اون یکی تابع اجرا میشه
    // و اگه جفتشون بلاک بشن این فانکشن کلا بلاک میشه و اجراکنندش صبر میکنه تموم بشه کارش
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

توی این مثال، یاد گرفتن آهنگ learn_song باید قبل از خوندنش sing_song انجام بشه، ولی هر دو کار یادگرفتن و خوندن میتونه همزمان با رقصیدن dance اتفاق بیفته. اگه ما داخل learn_and_sing به جای learn_song().await از block_on(learn_song()) استفاده کرده بودیم، توی thread هیچ کاره دیگه ای نمیشد انجام بدیم وقتی learn_song اجرا میشد. که همین باعث میشد نتونیم همزمان با اون کارها برقصیم dance. با استفاده از .await روی learn_song که از نوع future بود، این اجازه رو به thread دادیم که به کار بقیه task ها و عملیات برسه اگه learn_song بلاک شده بود. با این روش این امکان فراهم شد تا چندین future به صورت همزمان توی یک thread کارشون تموم شه و به پایان برسن.

زیر و بم اجرای کد های Async با Future

تو این فصل، نگاهی میندازیم به زیرساخت نحوه اولیوت بندی اجرای Future و task ها و عملیات Async. اگه علاقه ای به زیرساخت و نحوه اجرای کد ها و تایپ های Future ندارید و فقط میخواید کد های سطح بالای Future بنویسید و با اونا کار کنید میتونید این فصل رو رد کنید و برید سراغ فصل async/await. البته خیلی از مباحثی که توی این فصل راجبش بحث میکنیم به درک ما از نحوه کار کد async/await کمک میکنه، مثل نحوه کار runtime ها و قسمت های کلیدی از نظر سرعت و عملکرد کد Async. به هر حال اگه تصمیم گرفتید این فصل رو رد کنید، یادتون باشه bookmark اش کنید تا یه وقتی تو آینده حتما بهش سر بزنید و مطالعش کنید.

خب، پس بدون معطلی بریم سراغ trait ای قراره راجبش صحبت کنیم، یعنی Future.

نگاه دقیق به Future Trait

trait Future در Rust هسته مرکزیه برنامه نویسی Async محسوب میشه. Future یک محاسبه Async هست که در نهایت یک مقداری رو تولید میکنه.(که البته ممکنه اون مقدار خالی باشه مثل ()). یک مثال آسون شده از trait Future میتونه این شکلی باشه:


#![allow(unused)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

Future ها میتونن با استفاده از صدا زدن تابع poll اجرا بشن، که این باعث میشه Future رو همینجور به سمتی سوق بده که بالاخره اجراش تکمیل بشه. اگه Future تموم بشه مقدار Poll::Ready(result) رو برمیگردونه. اگه Future هنوز آماده نیست و تکمیل نشده باشه مقدار Poll::Pending رو برمیگردونه و هر وقت Future آماده باشه تا پردازش بیشتری رو جلو ببره تابع wake() رو صدا میزنه. وقتی wake() صدا زده بشه، اون اجراکننده ای که داره Future رو انجام میده دوباره poll رو صدا میزنه تا Future بتونه پزدازششو جلو ببره.

بدون wake() اون اجراکننده هیچ اطلاعی نداره که یه Future خاص میتونه ادامه پردازششو انجام بده، و باید به صورت مداوم همه future ها رو poll بگیره. با تابع wake() اجراکننده دقیقا میدونه کدوم future آماده هست تا poll گرفته بشه.

برای مثال فرض کنیید قراره از یک socket دیتایی رو بخونیم که هنوز آماده نیست. اگه دیتا آماده باشه میتونیم بخونیمش و مقدار Poll::Ready(data) ولی اگه دیتایی آماده نباشه future ما بلاک و مسدود میشه و نمیتونم پردازش رو جلو ببریم. وقتی دیتایی آماده نباشه ما باید یک wake رو ثبت کنیم یا به اصطلاح register کنیم تا وقتی دیتا socket آماده بود اونو صدا بزنیم، که این کار باعث میشه به اجراکننده کدمون این پیغامو برسونه که future آماده ادامه پردازششه. یه مثال ساده از SocketRead میتونه چیزی شبیه به مثال زیر باشه:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // سوکت دیتا رو آماده داره -- بخونش داخل یه بافر و برگردونش
            Poll::Ready(self.socket.read_buf())
        } else {
            // سوکت هنوز دیتایی نداره
            //
            // تابع "بیدار شدن" رو زمانی که دیتا آماده بود صدا بزن.
            // وقتی دیتا آماده باشه تابع "بیدار شدن" صدا زده میشه و در نتیجه
            // اون "فیوچر" میدونه که الان باید "پول" رو صدا بزنه دوباره و دیتا رو بگیره
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

این مدل از Future ها امکان ترکیب چندین عملیات async رو بدون نیاز به allocate کردن state های اضافی فراهم می کنند. اجرا کردن چندین Future به صورت یکجا یا به صورت زنجیره ای از Future ها میتونه بدون نیاز به allocation های اضافی state های مربوطه به صورت زیر پیاده سازی بشه:

/// یک "فیوچر ساده" که دو تا تابع "فیوچر" دیگه رو اجرا میکنه تا همزمان تموم بشن
/// 
/// همزمانی در اینجا با استفاده از صدا کردن "پول" روی تک تک "فیوچر" ها
/// انجام میشه، که این اجازه رو میده که اگر "فیوچر"ی خواست میتونه دیگه اجرا نشه و بقیه
/// اجرا بشن و در واقع هر "فیوچر"ی با سرعت خودش اجرا بشه
pub struct Join<FutureA, FutureB> {
    // هر کدوم از فیلد ها زیر ممکنه توشون "فیوچر" ی باشه که باید اجرا شه
    // اگر "فیوچرز" تکمیل شده باشه مقدار فیلد به "نان" تغییر پیدا میکنه
    // که این باعث میشه ما "فیوچر" هایی که تکمیل شدن رو دوباره اجرا نکنیم و "پول" نگیریم
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // سعی کن "فیوچر" اول رو انجلم بدی
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // سعی کن "فیوچر" بعدی رو انجام بدی
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // هر دوی "فیوچر" ها با موفقیت تموم شدن -- حالا میتونیم با موفقیت نتیجه رو برگردونیم
            Poll::Ready(())
        } else {
            // یک یا هر دوی "فیوچر" ها آماده نیست و هنوز کار دارن تا تموم بشن
            // اونا تابع "بیدار شدن" رو زمانی که بتونن پردازشی رو جلو ببرن صدا میزنن
            Poll::Pending
        }
    }
}

این نشون دهنده اینه که چندین Future میتونن به صورت همزمان اجرا بشن بدون نیاز به allocation های جداگانه، که همین باعث میشه برنامه های async با بازده بیشتر رو بتونیم بسازیم. مشابه همین داستان، چندین Future میتونن به صورت خطی پشت سر هم اجرا بشن، مثل مثال زیر:

/// یک "فیچوچر" دیگه که دو تا "فیوچر" رو انجام میده، یکی بعد از اونی یک و پشت سر هم
//
// نکته: برای اهداف این مثال هر دوی "فیوچر" ها در لحظه ساختن در دسترس هستن
// در واقعیت خروجی "فویچر" دوم میتونه به عنوان ورودی به "فیوچر" بعدئی منتقل شه
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // ما اولین "فیوچر" رو انجام دادیم -- پاکش کن
                // و دومی رو شروع کن
                Poll::Ready(()) => self.first.take(),
                // ما هنوز نمیتونیم اولین "فیوچر" روو انجام بدیم
                Poll::Pending => return Poll::Pending,
            };
        }
        // حالا که اولین "فیوچر" تموم شده، سعی کن دومی رو انجام بدی
        self.second.poll(wake)
    }
}

این مثال ها نشون میده چطوری trait Future میتونه بیان های مختلفی از جریان کنترلی روی یه برنامه async رو بدون نیاز به allocate کردن چندین object و callback های تو در تو ارائه بده. با این تفاسیر بیاید راجب trait اصلی Future و تفاوت هاش صحبت کنیم:

trait Future {
    type Output;
    fn poll(
        // به تغییر تایپ این مقدار پایین دقیت کنید
        self: Pin<&mut Self>,
        // تایپ مقدار پایین هم عوض شده
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

اولین چیزی که احتمالا متوجهش شدید اینه که self دیگه از نوع &mut Self نیست و تبدیل شده به Pin<&mut Self>. ما راجب Pin کردن بیشتر در اینده صحبت میکنم، ولی فعلا در همین حد بدونید که Pin کردن این اجازه رو به ما میده تا Future های غیر قابل حرکت (ثابت) بسازیم. Object های ثابت یا غیر قابل حرکت میتونن Pointer مربوط به مقادیرشون رو بین همون مقادیر ذخیره کنن مثلا struct MyFut { a: i32, ptr_to_a: *const i32 }. Pin کردن یه چیز واجب برای استفاده از async/await هست.

در ادامه میبینم که wake: fn() تبدیل شده به &mut Context<'_>. در SimpleFuture ما یک تابع pointer (fn()) رو صدا میزدیم تا به اون اجراکننده Future بگیم اون Future ای که نیاز داریم رو poll بگیره یا وضعیتشو چک کنه. با این وجود، چون fn() فقط یه تابع pointer هست، نمیتونه هیچ اطلاعاتی راجب اینکه کدوم Future تابع wake رو صدا زده ذخیره کنه.

توی یک نرم افزار واقعی که قراره ساخته بشه، یه برنامه پیچیده مثل یک وب سرور شاید هزاران کانکشن داره که اون کانکشن ها باید توابع wake up شون به صورت جداگانه همگی صدا زده بشه. تایپ Context این مشکل رو با استفاده از تایپ Waker و دسترسی به اون حل کرده، که در حقیقت میتونه باهاش یه task خاص و مشخص رو صدا بزنه و wake up اش کنه.

بیدار کردن یک عملیات Async با Waker

این خیلی عادیه که Future ها همون بار اولی که poll میشن قادر به اتمام کارشون نباشن. تو این موقعیت، Future باید اطمیانان حاصل کنه که در آینده دوباره poll میشه وقتی آمادس تا پردازش بیشتری رو جلو ببره. این کار با استفاده از تایپ Waker انجام میشه.

هر زمانی که یه Future poll میشه، به عنوان قسمتی از یک کارواحد یا "task" poll میشه. task ها همون Future های سطح بالایی هستن که به یک اجراکننده داده شدن تا اجرا بشن.

Waker یه متد wake() داره که کارش اینه که به اجرا کننده بگهئ task ای که بهش وصل شده رو صدا بزنه یا به اصطلاح بیدارش کنه. وقتی wake() وقتی wake() صدا زده میشه اجراکننده میدونه task ای که به Waker متصل شده آمادس تا پردازش بیشتری رو انجام بده و Future اون باید دوباره poll بشه.

Waker متد clone() هم پیاده سازی کرده که میتونه تو جاهای مختلف کپی و ذخیره بشه.

خب حالا بیاید سعی کنیم یه تایمر ساده Future با استفاده از Waker بسازیم:

مثال عملی: یک تایمر بسازید

برای درک بهتر این مثال، وقتی تایمر درست شد ما میایم یک thread جدید ایجاد میکنیم و برای مدتی که اون تایمر ست شده thread رو تو حالت sleep میبریم و بعد از اون یه signal به تایمر Future میدیم که متوجه بشه تایم به اتمام رسیده.

اول، یه پروژه جدید با دستور cargo new --lib time_future ایجاد کنید و چیزایی که لازم داریم رو به src/lib.rs اضافه کنید:


#![allow(unused)]
fn main() {
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
}

بیاین اول از تعریف کردن خود تایپ Future شروع کنیم. Future ما نیاز به یک راهی داره تا بتونه با thread ارتباط برقرار کنه و متوجه تموم شدن تایم در اون thread بشه و در نهایت Future کارش با موفقت به اتمام برسه. برای این کار از Arc<Mutex<..>> برا ذخیره مقادیرمون استفاده میکنیم تا بتونیم از طریق اون بین Future و thread مون ارتباط برقرار کنیم.

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// وضعیت اشتراکی بین "فیوچر" و "ترد" ی که منتظر وایساده
struct SharedState {
    /// این مقدار نشون میده که تایمر کارش تموم شده یا نه
    completed: bool,

    /// تابع "بیدار شونده" برای این تایمر. که وقتی تایمر کارش تموم شد
    /// این تابع صدا زده بشه و ادامه عملیات انجام بشه
    waker: Option<Waker>,
}

حالا بیاین واقغا خود Future رو پیاده سازی کنیم!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // بررسی کن میکنه ببینه تایمر کارش تموم شده یا نه
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // تابع "بیدار شونده" رو ست میکنیم تا ترد بتونه
            // در زمانی که تایمر کارش تموم میشه اون صدا بزنه
            //
            // اینجا فقط یک بار عملیات کپی کردن از تابع "بیدار شونده" انجام میشه به جای
            // اینکه هر سری بیاد و کپی کنه
            //
            // البته میشه از تابع زیر هم برای بررسی اینکه آیا اون "فیوچر" بیدار میشه یا نه
            // هم استفاده کرد ولی برای اینکه مثال رو ساده نگه داریم اینجوری استفاده کردیم
            // `Waker::will_wake`
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

خیلی راحت بود مگه نه ؟ اگه thread مقدار shared_state.completed = true رو ست کنه ما کارمون تمومه! در غیر این صورت میایم Waker رو clone برای task فعلی clone میکنیم به منتقلش میکنیم به shared_state.waker. به این ترتیب thread میتونه task رو دوباره بیدار کنه.

این خیلی مهمه که هر موقع که Future دوباره poll میشه ما Waker رو آپدیت کنیم، چون ممکنه Future یه task دیگه رو با یک Waker دیگه رو منتقل کرده باشه. این داستان زمانی اتفاق میفته که Future ها بین task های مختلف وقتی poll میشن جا به جا بشن.

در نهایت ما به یه API نیاز داریم تا واقعا تایمر رو بسازیم و thread رو شروع کنیم:

impl TimerFuture {
    /// یک "فیوچر" جدید درست میکنه زمانی که تایمر کارش انجام میشه
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // یک ترد جدید میسازه
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // زمانی که تایمر تموم شد یک سیگنال میفرسته و آخرین تسکی که "پول" شده رو بیدار میکنه
            // البته اگه تسکی موجود باشه
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

این تمام چیزی بود که نیاز داشتیم تا یک تایمر Future ساده بسازیم. فقط کاش یه اجراکننده داشتیم که میتونستیم Future رو روی اون اجرا کنیم ...

پیاده سازی یک اجراکننده برای عملیات Async

Future های Rust تنبل هستن: اونا هیچ کاری انجام نمیدن تا وقتی یکی اونا رو اجرا کنه تا اتمام پیدا کنن. یکی از راه ها برای اجرای یک Future اینه که از .await داخل یک تابع async استفاده کنیم، ولی خود این قضیه رو مشکل تر میکنه: حالا کی اون Future ای که توسط توابع async تولید شده رو اجرا میکنه ؟ جواب اینه که ما به یک اجرا کننده Future نیاز داریم.

اجارکننده های Future توابع سطح بالا Future رو میگیرم و اجراشون میکنن، که این کار رو با صدا زدن poll زمانی که Future میتونه پردازش بیشتری رو انجام بده، انجام میدن. معمولا اجراکننده های poll یک Future رو برای شروع صدا میزنن. وقتی Future میفهمه که آمادس تا پردازش بیشتری رو با صدا زدن wake() انجام بده، داخل یک صف گذاشته میشن تا دوباره نوبتشون بشه و poll دوباره صدا زده بشه، این کار اونقدر ادامه پیدا میکنه تا Future کارش تموم بشه.

تو این قسمت قراره اجراکننده ساده خودمون رو بنویسم که قادره تعداد زیادی توابع سطح بالا Future رو بگیره و به صورت همزمان اجرا کنه تا تموم بشن.

For this example, we depend on the futures crate for the ArcWake trait, which provides an easy way to construct a Waker. Edit Cargo.toml to add a new dependency:

برای این مثال ما از پکیج futures استفاده میکنیم تا بتونیم از trait ArcWake استفاده کنیم. چون خیلی راحت میتونیم یه Waker بسازیم. فایل cargo.toml رو باز کنید و این crate رو اضافه کنید:

[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2021"

[dependencies]
futures = "0.3"

در ادامه نیاز داریم تا این چیزا ها رو به بالای src/main.rs اضافه کنیم:

use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::{
    future::Future,
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    sync::{Arc, Mutex},
    task::Context,
    time::Duration,
};
// اون تایمری که توی قسمت قبل با هم نوشتیم
use timer_future::TimerFuture;

اجراکننده ما با ارسال task ها روی یک کانال (channel) کار میکنه. اجراکننده event ها رو از روی کانال بر میداره و اونا رو اجرا میکنه. زمانی که یه task آماده هست تا کار بیشتری انجام بده (بیدار شده)، میتونه خودشو دوباره از طریق کانال بفرسته که دوباره poll بشه.

توی این طراحی، خود اجراکننده فقط لازمه قسمت دریافت کننده کانال رو داشته باشه. و کاربر باید قسمت فرستنده رو داشته باشه تا بتونه Future های جدید تولید کنه. Task ها خودشون فقط Future هایی هستن که میتونن دوباره خودشونو برنامه ریزی کنن برای اجرا دوباره، پس با اونا رو به صورت جفت Future با فرستنده ذخیره میکنیم تا task بتونه دوباره خودشو توی صف بزاره.

/// اجراکننده ای که تسک ها رو از یه کانال میگیره و اجراشون میکنه
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// این یه "فیوچر" جدید میسازه و میفرسته به کانال
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// یک "فیوچر" ی که میتونه خودشو دوباره برنامه ریزی که که توسط اجراکننده دوباره "پول" بشه
struct Task {
    /// فیوچری که در حاله پردازشه و باید به داخل صف گذاشته بشه تا انجام بشه
    /// 
    /// اینجا استفاده از "میوتکس" اجباری نیست، به دلیل اینکه ما فقط از طریق یک "ترد" داریم به مقادریمون دسترسی
    /// پیدا میکنیم، اما "راست" اونقدر باهوش نیست که بفهمه ما فقط از یه "ترد" استفاده میکنیم
    /// برای همین مجبوریم از "میتوکس" استفاده کنیم تا قوانین امن بودن مموری در
    /// زبان "راست" رو رعایت کرده باشیم
    /// البته میشه از تایپ زیر هم به جای میتوکس استفاده کرد
    /// `UnsafeCell`
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// یک هندلر که تسک رو برنامه ریزی میکنه و برش میگردونه به داخل صف
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // بیشترین حد تسک هایی که میتونیم داخل صف داشته باشیم از طریق کانال در یک لحظه
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

بیاید یه متد هم برای راحتی تولید Future های جدید بسازیم. این متد یک تایپ Future میگیره که ما اینو با تایپ box یکی میکنیم و یک Arc<Task> جدید میسازیم که میتونه داخل اجراکننده به صورت صف اجرا بشه.

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

برای poll کردن Future ها باید یه Waker بسازیم. همونطور که داخل بیدار کردن یک عملیات Async با Waker بحث کریدم، Waker ها موظف هستن که یک task رو برنامه ریزی کنن برای poll شدن دوباره وقتی wake صدا زده میشه. یادتون باشه که Waker به اجراکننده میگه که دقیقا چه task ای آماده هست، که همین باعث میشه فقط Future های poll بشن که آماده هستن تا پردازش بیشتری رو انجام بدن. راحت ترین راه برای ساختن یک Waker پیاده سازی trait ArcWake و بعد استفاده از توابع waker_ref یا .into_waker() هست تا بتونیم Arc<impl ArcWake> رو به Waker تبدیل کنیم. بیاید ArcWake رو برای task هامون پیاده سازی کنیم تا این اجازه رو بهشون بدیم که به Waker تبدیل بشن و در نهایت بیدار بشن:

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // تابع "بیدار کننده" رو با فرستادن تسک توی کانال پیاده سازی میکنیم
        // تا دوباره توسط اجرا کننده "پول" بشه
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

وقتی یک Waker از Arc<Task> ساخته میشه، صدا کردن wake() باعث میشه یک کپی از Arc به کانال task ها فرستاده بشه. در نهایت اجارکننده ما باید task رو برداره و poll اش کنه. بیاید همینو پیاده سازی کنیم:

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // فیوچر رو میگیره و اگه هنوز تکمیل نشده باشه "پول" اش میکنه
            // و سعی میکنه تکمیلش کنه
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // یک "بیدار شونده" از خود تسک میسازیم
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` این تایپ یه تایپ مستعار برای تایپ زیره
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                if future.as_mut().poll(context).is_pending() {
                    // ایمنجا هنوز تسک ما تکمیل نشده برای همین
                    // دوباره برش میگردونیم تا دوباره توسط اجراکننده اجرا بشه
                    *future_slot = Some(future);
                }
            }
        }
    }
}

تبریک میگم! ما الان یک اجراکننده Future داریم که کار میکنه. حتی الا میتونیم با این اجارکننده عملیات asyn/.await و Future های شخصی سازی شده رو انجام بدیم. عملیاتی مثل TimeFuture که قبلا نوشتیمش:

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    //یک تسک برای پرینت کردن قبل و بعد از اتمام تایمر میسازیم
    spawner.spawn(async {
        println!("howdy!");
        // صبر میکنیم تا تایمر "فیوچر" ما بعد از 2 ثانیه تکمیل بشه
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // حالا از حافظه پاکش میکنیم تا اجرا کننده بدونه تموم شده کارش
    // و تسک های بیشتری در آینده نمیگیره ازش تا اجراش کنه
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    // اجرا کننده رو اجرا میکنه تا زمانی که صف تسک ها خالی بشه
    // که در نهایت برای ما اول پرینت میکنه:
    // "howdy!"
    // و بعدش پرینت میکنه
    // "done!"
    executor.run();
}

اجراکننده ها و IO سیستم

توی قسمت قبلی در بخش نگاه دقیق به Future Trait راجب این مثال از خوندن socket به صورت async که یک Future بود بحث کردیم:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // سوکت دیتا رو آماده داره -- بخونش داخل یه بافر و برگردونش
            Poll::Ready(self.socket.read_buf())
        } else {
            // سوکت هنوز دیتایی نداره
            //
            // تابع "بیدار شدن" رو زمانی که دیتا آماده بود صدا بزن.
            // وقتی دیتا آماده باشه تابع "بیدار شدن" صدا زده میشه و در نتیجه
            // اون "فیوچر" میدونه که الان باید "پول" رو صدا بزنه دوباره و دیتا رو بگیره
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

این Future دیتای موجود socket رو میخونه، و اگر دیتای برای خوندن موجود نباشه، به اجراکننده میگه، و ازش میخواد که task هاش صدا زده بشن یا دوباره بیدار بشن در زمانی که دیتایی برای خوندن موجود باشه. با این حال، توی این مثال واضح نیست که تایپ Socket چجوری پیاده سازی شده یا به صورت دقیق تر، مشخص نیست که تابع set_readable_callback چجوری کار میکنه. چجوری میتونیم wake() رو فراخونی کنیم زمانی که socket چیزی برای خوندن داشته باشه ؟ یه راه حل اینه که یک thread به صورت مداوم چک کنه که آیا socket دیتایی برای خوندن داره یا نه و در زمان مناسب wake() رو فراخونی کنه. البته این روش اصلا بهینه نیست، چون داریم برای هر IO ای که مسدود کننده (blocking) هست یک thread باز میکنیم. این روش باعث میشه کارآمدی کد async ما خیلی پایین باشه.

در واقعیت، این مشکل با یک سری کتابخونه هایی که به اصطلاح آگاه به IO مسدود کننده سیستم هستن حل شده، مثل epoll روی لینوکس، kqueue روی FreeBSD و MacOS، IOCP روی ویندوز و port رو Fuchsia همه ی اینا داخل Rust از طریق crate ای به نام mio که به صورت cross platform دسترسی به این کتابخونه های سیستمی رو فراهم میکنه به صورت آماده وجود داره. این کتابخونه ها این امکان رو فراهم میکنن که یک thread رو چندین عملیات IO async مسدود بشه و زمانی نتیجش برگرده که یکی از اون event ها انجام شده و نتیجش حاضره. تو عمل استفاده از API این کتابخونه ها یه چیزی شبیه مثال زیره:


struct IoBlocker {
    /* ... */
}

struct Event {
    // یک آی دی یکتا که مشخص کننده رویداد هست و با این آی دی لیست شده
    id: usize,

    // یک سری سیگنال که برای جوابشون صبر میکنه
    signals: Signals,
}

impl IoBlocker {
    /// ساخت یک کالکشن از عملیات ناهمزمان برای رویداد های "آی او" که برای گرفتن جوابشون ترد بلاک یا مسدود میشه
    fn new() -> Self { /* ... */ }

    /// تعریف یک رویداد خاص برای بررسی در آینده
    fn add_io_event_interest(
        &self,

        /// آبجکتی که رویداد توی اون اتفاق میفته
        io_object: &IoObject,

        /// یک سری از سینگنال هایی که درون آبجکت بالا پدید میان
        /// که در نتیجه اون رویدادی باید اتفاق بیفته که اون رویداد با یک آی دی یکتا مشخص شده
        event: Event,
    ) { /* ... */ }

    /// بلاک شدن و مسدود شدن تا زمانی که یکی از رویداد ها اتفاق بیفته
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// اگر سوکت 1 زمانی قبال خوندن بشه برای مثال چاپ میشه: سوکت 1 آماده خوندن هست
println!("Socket {:?} is now {:?}", event.id, event.signals);

اجراکننده های Future میتونن از این کتابخونه های سیستمی استفاده کنن تا امکان اجرا شدن callback این Object های IO async مثل socket ها موقعی که event مربوطه اتفاق میفته فراهم بشه. توی مثال مربوط به SocketRead در بالا، تابع Socket::set_readable_callback یه چیزی شبیه کد پایینه:

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // یک رفرنس از اجراکننده ای که قرار اجرا کنه
        // که البته خیلیا این رو به صورت کلی داخل اون تردی که قراره اجرا بشه میارن
        // اما اینجا یک رفرنس از اون پاس داده شده تا از این رفرنس استفاده بشه
        let local_executor = self.local_executor;

        // یک ای دی یکتا برای این آبجکت آی او
        let id = self.id;

        // تابع بیدارکننده رو توی مپ اجراککنده ذخیره میکنیم تا بعدا وقت رویداد های آی او
        // اتفاق افتادن بتونه صداشون بزنه
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

حالا ما میتونیم فقط یک اجراکننده روی thread داشته باشیم تا رویداد ها و event های IO رو بگیره و Waker متناسب با اون رو فعال کنه، که باعث میشه task مربوطه انجام بشه. همین مساله باعث میشه که اجراکننده فرصت داشته باشه تا task های بیشتری رو اجرا کنه قبل از اینکه دوباره بره و چک کنه event جدید IO ای برای اجرا کردن داره یا نه (و این چرخه همینجور ادامه داره ...).

async/.await

توی فصل اول نگاه کوتاهی به ‍‍async/.await داشتیم. توی این فصل با جزئیات بیشتری روی async/.await بحث میکنیم و میبینم چجوری کد async با کد عادی که قبلا تو Rust میدیدیم فرق داره.

async/.await کلیدواژه های ویژه ای هستند که قدرت برنامه نویسی async رو به Rust میدن و اجازه میدن قدرت به thread برگرده و عملیات بعدی رو انجام بده در حالی که منتظر جواب یه کد async هست.

دو راه اصلی برای استفاده از async وجود داره: یکی async fn و اون یکی بلاک های async هستن. که هر کدوم مقداری رو بر میگردونن که trait ‍‍‍‍Future رو پیاده سازی کرده.


// تابع زیر تایپی رو برمیگردونه که "تریت" زیر رو پیاده سازی کرده:
// `Future<Output = u8>`
// که باعث میشه در نهایت تایپ زیر برگرده:
// u8
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // بلاک زیر تایپی رو برمیگردونه از نوع:
    // `Future<Output = u8>`
    async {
        let x: u8 = foo().await;
        x + 5
    }
}

همونطور که توی فصل اول دیدیم بدنه async و بقیه Future ها تنبل هستن: به این معنی که تا زمانی که اجرا نشن هیچ کاری نمیکنن. معمول ترین روش اجرا کردن Future اینه که منتظرش بمونید یا .await اش کنید. زمانی که .await روی یه Future صدا زده میشه سعی میکنه اونو اجراش کنه تا با موفقیت تموم بشه. اگه Future بلاک کننده و مسدود کندده thread باشه کنترل رو به thread بر میگردونه. وقتی پردازش بیشتری میتونه انجام بشه Future دوباره توسط اجرا کننده برداشته میشه و ادامه پردازشش انجام میشه و این باعث میشه .await با موفقیت تکمیل بشه.

Lifetime های async

برعکس توابع معمول در Rust، توابع async fn ای که رفرنس ها یا آرگومان های غیر از static رو به عنوان ورودی میگیرن، Future ای بر میگردونن که lifetime شون دقیقا مثل lifetime آرگومان هاشون هست:

// این تابع:
async fn foo(x: &u8) -> u8 { *x }

// با این تابع برابره:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

که یعنی تابع async fn ای که یک Future بر میگردونه باید توسط .await صدا زده بشه در حالی که هنوز آرگومان های غیر static اش موجود هستن. تو حالت عادی موقعی که .await میکنید بعد از اینکه تابع رو صدا میزنید مثلا مثل foo(&x).await موردی به موجود نمیاد و مشکلی نیست. با این حال، ذخیره کردن Future ها یا ارسالشون به یک task یا thread دیگه ممکنه مشکل ساز بشه.

یکی از کارهایی که برای تبدیل asyn fn با آرگومان هایی از جنس رفرنس به Future های 'static میشه انجام داد، اینه که آرگومان ها رو با صدا کردن async fn داخل یک بلاک async جا بدیم:

fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ارور: مقدار "ایکس" زمان زیادی زنده نیست یعنی لایف تایمش اجازه نمیده
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

با انتقال آرگومان ها به بلاک async، lifetime اون رو بسط دادیم به lifetime اون Future ای که از صدا زدن good برمگیرده.

async move

بلاک های async و clouser مثل clouser های عادی اجازه استفاده از کلیدواژه move رو میده. یه بلاک async move باعث میشه مالکیت تمام متغیر هایی که به عنوان رفرنس استفاده میشن رو بگیره و عمرشون دقیقا به اون scope محدود بشه که این قابلیت باعث میشه دیگه نشه ازشون تو قسمت های دیگه کد استفاده کرد:

/// `async` block:
///
/// چندین بلاک از نوع زیر میتونن به متغیر های لوکال دسترسی داشته باشن
/// البته تا وقتی که تو اون اسکوپ اجرا بشن
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{my_string}");
    };

    let future_two = async {
        // ...
        println!("{my_string}");
    };

    // هر دو "فیوچر" اجرا میشه تا در نهایت دو بار مقدار زیر تپرینت میشه:
    // "foo"
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// فقط یک بلاکی که کلیدواژه "موو" داره میتونه به مقادیر لوکال در لحظه دسترسی داشته باشه
/// به خاطر اینکه اون مقادیر به اسکوپ "فیوچر" منتقل شدن
/// البته همین باعث میشه اون "فیوچر" بیشتر از اسکوپ اصلی زنده بمونه و دووم بیاره:
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{my_string}");
    }
}

.await کردن روی یک اجراکننده Multi-thread

دقت کنید که وقتی از یک اجرا کننده Future به صورت Multi-thread استفاده میکنیم، Future ها میتونن بین thread ها منتقل بشن پس هم این Future ها هم متغیر ها باید امکان انتقال بین thread ها رو داشته باشن، چون هر درخواست .await میتونه منجر به تعویض thread بشه.

که این یعنی استفاده از Rc یا &RefCell یا تایپ های دیگه ای که trait Send رو پیاده سازی نکردن و همچنین رفرنس هایی که به تایپ هایی اشاره دارن که trait Sync رو پیاده سازی نکردن، امن نیست.

(احتیاط: امکانش وجود داره از این تایپ ها استفاده کنید البته تا زمانی که توی scope ای که .await صدا زده میشه نباشن.)

به طور مشابه، ایده خوبی نیست مقادیر غیر Future ای lock شده رو توی .await ها استفاده کنید، چون باعث lock شدن یا قفل شدن thread pool میشه: مثلا ممکنه یه task باعث lock بشه و .await هم صدا زده بشه و قدرت کنترل برگرده به اجراکننده تا یه task دیگه انجام بشه که سعی کنه همون چیز رو lock کنه و همین باعث ایجاد deadlock بشه. برای جلوگیری از این مشکل از تایپ Mutex داخل future::lock به جای std::sync استفاده کنید.

Pinning

Streams

Iteration and Concurrency

Executing Multiple Futures at a Time

join!

select!

Workarounds to Know and Love

? in async Blocks

Send Approximation

Recursion

async in Traits

The Async Ecosystem

Final Project: HTTP Server

Running Asynchronous Code

Handling Connections Concurrently

Testing the Server