用「async / await」語法來撰寫並發與並行程式是現代程式語言流行的方式,比起傳統開啟新的執行緒來完成並發處理工作的目的,「async / await」可以在同一個執行緒下達到同樣的目的,非常適用於當並發處理的工作並不重的時候,能夠省下建立執行緒所產生的開支(overhead)。



作為一個2015年才被正式釋出的程式語言,雖然Rust在1.39之後正式支援「async / await」語法,但是並沒有提供標準的「非同步(Asynchronous) runtime」,需要添加很多額外的crate來實現這項機制。

「async / await」語法

在開始介紹非同步runtime之前,要先了解Rust的async.await關鍵字的用法。

在程式區塊的前面加上async關鍵字或是async move關鍵字,可以快速地使這個程式區塊變成一個有實作core::future::Future特性的物件,Future特性的Output關聯型別會是這個程式區塊的回傳值型別(如果無回傳值就會是())。例如:

let future = async {
    println!("Hello, world.");
    0
};

以上的future變數,儲存著一個可以向標準輸出(stdout)輸出Hello, world.字串,並且回傳0Future物件。

Future物件在被建立之後,其程式區塊中的敘述並不會立刻被執行。需在同樣有使用async關鍵字的程式區塊內使用.await關鍵字,才能夠去執行它。這樣的作法與JavaScript提供的「async / await」語法有點不一樣,在JavaScript中,Promise物件被建立出來的同時,其回呼函數的主體就會立刻被執行。

let future = async {
    let inner_future = async {
        println!("Hello, world.");
        0
    };

    let output = inner_future.await;

    // ...
};

一個Future物件只能夠被執行一次,想要執行多次Future物件的內容,就要重複產生該Future物件。

我們也可以在fnunsafe fn關鍵字前,加上async關鍵字,定義出來的函數就是Future物件的產生器;或者在閉包前或是閉包的move關鍵字前,加上async關鍵字,定義出來的閉包就是Future物件的產生器。

let future = async {
    async fn generate_future() -> i32 {
        println!("Hello, world.");
        0
    }

    let inner_future = generate_future();

    let output = inner_future.await;

    // ...
};

至此您可能會發現這個「async / await」語法好像一點用也沒有,因為每次呼叫Future物件都一樣是阻塞的(blocking),到底哪裡並發了?而且若是在開發應用程式,main函數還不能夠加上async關鍵字,根本就不能在main函數中執行Future物件嘛!

先別急,這是因為我們現在能看到的函式庫(如標準函式庫)完全都是阻塞的(blocking)才會有「async / await」語法一點用都沒有的感覺,引用了其它非同步runtime後,情況就會不一樣了。

futures

futures是一個開發非同步程式時使用的基本函式庫。它也提供了block_on函數,讓我們可以在非async的程式區塊(尤其是main函數)中執行Future物件。

[dependencies]
futures = "0.3"
use futures::executor::block_on;

fn main() {
    let inner_future = async {
        println!("Hello, world.");
        0
    };

    let output = block_on(inner_future);

    // ...
}

非同步runtime

非同步runtime用來提供非同步程式進入點和基礎的非同步I/O函數,以及任務執行器(executor)或者說Future執行器。目前的Rust生態圈有三種主要的非同步runtime,分別是tokioasync-stdsmol

先看看以下這個程式:

use std::{
    error::Error,
    io::{Read, Write},
    net::TcpListener,
    thread,
};

fn main() -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;

    loop {
        let (mut socket, _) = listener.accept()?;

        thread::spawn(move || {
            let mut buf = [0; 1024];

            loop {
                let n = match socket.read(&mut buf) {
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {e:?}");
                        return;
                    },
                };

                if let Err(e) = socket.write_all(&buf[0..n]) {
                    eprintln!("failed to write to socket; err = {e:?}");
                    return;
                }
            }
        });
    }
}

上面是一個TCP伺服器的程式,接受一個連線之後就開啟新的執行緒來處理,會將傳入的資料原封不動地回傳。

tokio

tokio是最被廣泛使用的runtime,提供多種API,可以啟用full特色來全部啟用。

以下是用tokio來實作上面那個TCP伺服器程式的程式碼:

[dependencies]
tokio = { version = "1", features = ["full"] }
use std::error::Error;

use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpListener,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {e:?}");
                        return;
                    },
                };

                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("failed to write to socket; err = {e:?}");
                    return;
                }
            }
        });
    }
}

以上是使用tokio實作的TCP伺服器,看起來好像沒有產生新執行緒,但實際上它在一開始就會產生一個執行緒池(Thread Pool),用來執行tokio::spawn傳進去的Future物件。

執行緒池的執行緒數量預設是處理器(邏輯處理器)的數量,如果要更改的話,可以替#[tokio::main]屬性加上worker_threads參數。如下,可以讓執行緒池使用4個執行緒:

use std::error::Error;

use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpListener,
};

#[tokio::main(worker_threads = 4)]
async fn main() -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {e:?}");
                        return;
                    },
                };

                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("failed to write to socket; err = {e:?}");
                    return;
                }
            }
        });
    }
}

即便將worker_threads設定為1,TCP伺服器也還是可以正常允許同時間有多筆連線,這就是非同步runtime真正厲害的地方啦!

async-std

async-std強調對應Rust的標準函式庫,可以將其看作是std的非同步版本。啟用attributes特色可以使用#[async_std::main]屬性來建立程式進入點,用法就如同#[tokio::main]

以下是用async-std來實作上面那個TCP伺服器程式的程式碼:

[dependencies]
async-std = { version = "1", features = ["attributes"] }
use std::error::Error;

use async_std::{net::TcpListener, prelude::*, task};

#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        task::spawn(async move {
            let mut buf = [0; 1024];

            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {e:?}");
                        return;
                    },
                };

                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("failed to write to socket; err = {e:?}");
                    return;
                }
            }
        });
    }
}

以上是使用async-std實作的TCP伺服器,看起來好像沒有產生新執行緒,但實際上它在一開始就會產生一個執行緒池,用來執行task::spawn傳進去的Future物件。

執行緒池的執行緒數量預設是處理器(邏輯處理器)的數量,如果要更改的話,直接透過環境變數ASYNC_STD_THREAD_COUNT來設定即可。同樣地,即便將ASYNC_STD_THREAD_COUNT設為1,TCP伺服器也還是可以正常允許同時間有多筆連線。

smol

smol是輕巧又快速的非同步runtime,用法上與前面兩者比較不同。

以下是用smol來實作上面那個TCP伺服器程式的程式碼:

[dependencies]
smol = "1"
use std::error::Error;

use smol::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpListener,
};

fn main() -> Result<(), Box<dyn Error>> {
    smol::block_on(async {
        let listener = TcpListener::bind("127.0.0.1:8080").await?;

        loop {
            let (mut socket, _) = listener.accept().await?;

            smol::spawn(async move {
                let mut buf = [0; 1024];

                loop {
                    let n = match socket.read(&mut buf).await {
                        Ok(0) => return,
                        Ok(n) => n,
                        Err(e) => {
                            eprintln!("failed to read from socket; err = {e:?}");
                            return;
                        },
                    };

                    if let Err(e) = socket.write_all(&buf[0..n]).await {
                        eprintln!("failed to write to socket; err = {e:?}");
                        return;
                    }
                }
            })
            .detach();
        }
    })
}

以上是使用smol實作的TCP伺服器,看起來好像沒有產生新執行緒,但實際上它在一開始就會產生一個執行緒池,用來執行task::spawn傳進去的Future物件。

執行緒池的執行緒數量預設是1,如果要更改的話,直接透過環境變數SMOL_THREADS來設定即可。同樣地,即便沒有設定SMOL_THREADS,或是將SMOL_THREADS設為1,TCP伺服器也還是可以正常允許同時間有多筆連線。