用「async / await」語法來撰寫並發與並行程式是現代程式語言流行的方式,比起傳統開啟新的執行緒來完成並發處理工作的目的,「async / await」可以在同一個執行緒下達到同樣的目的,非常適用於當並發處理的工作並不重的時候,能夠省下建立執行緒所產生的開支(overhead)。
作為一個2015年才被正式釋出的程式語言,雖然Rust在1.39之後正式支援「async / await」語法,但是並沒有提供標準的「非同步(Asynchronous) runtime」,需要添加很多額外的crate來實現這項機制。
「async / await」語法
在開始介紹非同步runtime之前,要先了解Rust的async
和.await
關鍵字的用法。
在程式區塊的前面加上async
關鍵字或是async move
關鍵字,可以快速地使這個程式區塊變成一個有實作core::future::Future
特性的物件,Future
特性的Output
關聯型別會是這個程式區塊的回傳值型別(如果無回傳值就會是()
)。例如:
let future = async {
println!("Hello, world.");
0
};
以上的future
變數,儲存著一個可以向標準輸出(stdout)輸出Hello, world.
字串,並且回傳0
的Future
物件。
Future
物件在被建立之後,其程式區塊中的敘述並不會立刻被執行。需在同樣有使用async
關鍵字的程式區塊內使用.await
關鍵字,才能夠去執行它。這樣的作法與JavaScript提供的「async / await」語法有點不一樣,在JavaScript中,Promise
物件被建立出來的同時,其回呼函數的主體就會立刻被執行。
let future = async {
let inner_future = async {
println!("Hello, world.");
0
};
let output = inner_future.await;
// ...
};
一個Future
物件只能夠被執行一次,想要執行多次Future
物件的內容,就要重複產生該Future
物件。
我們也可以在fn
、unsafe fn
關鍵字前,加上async
關鍵字,定義出來的函數就是Future
物件的產生器;或者在閉包前或是閉包的move
關鍵字前,加上async
關鍵字,定義出來的閉包就是Future
物件的產生器。
let future = async {
async fn generate_future() -> i32 {
println!("Hello, world.");
0
}
let inner_future = generate_future();
let output = inner_future.await;
// ...
};
至此您可能會發現這個「async / await」語法好像一點用也沒有,因為每次呼叫Future
物件都一樣是阻塞的(blocking),到底哪裡並發了?而且若是在開發應用程式,main
函數還不能夠加上async
關鍵字,根本就不能在main
函數中執行Future
物件嘛!
先別急,這是因為我們現在能看到的函式庫(如標準函式庫)完全都是阻塞的(blocking)才會有「async / await」語法一點用都沒有的感覺,引用了其它非同步runtime後,情況就會不一樣了。
futures
futures
是一個開發非同步程式時使用的基本函式庫。它也提供了block_on
函數,讓我們可以在非async
的程式區塊(尤其是main
函數)中執行Future
物件。
[dependencies]
futures = "0.3"
use futures::executor::block_on;
fn main() {
let inner_future = async {
println!("Hello, world.");
0
};
let output = block_on(inner_future);
// ...
}
非同步runtime
非同步runtime用來提供非同步程式進入點和基礎的非同步I/O函數,以及任務執行器(executor)或者說Future
執行器。目前的Rust生態圈有三種主要的非同步runtime,分別是tokio
、async-std
和smol
。
先看看以下這個程式:
use std::{
error::Error,
io::{Read, Write},
net::TcpListener,
thread,
};
fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind("127.0.0.1:8080")?;
loop {
let (mut socket, _) = listener.accept()?;
thread::spawn(move || {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf) {
Ok(0) => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {e:?}");
return;
},
};
if let Err(e) = socket.write_all(&buf[0..n]) {
eprintln!("failed to write to socket; err = {e:?}");
return;
}
}
});
}
}
上面是一個TCP伺服器的程式,接受一個連線之後就開啟新的執行緒來處理,會將傳入的資料原封不動地回傳。
tokio
tokio
是最被廣泛使用的runtime,提供多種API,可以啟用full
特色來全部啟用。
以下是用tokio
來實作上面那個TCP伺服器程式的程式碼:
[dependencies]
tokio = { version = "1", features = ["full"] }
use std::error::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {e:?}");
return;
},
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket; err = {e:?}");
return;
}
}
});
}
}
以上是使用tokio
實作的TCP伺服器,看起來好像沒有產生新執行緒,但實際上它在一開始就會產生一個執行緒池(Thread Pool),用來執行tokio::spawn
傳進去的Future
物件。
執行緒池的執行緒數量預設是處理器(邏輯處理器)的數量,如果要更改的話,可以替#[tokio::main]
屬性加上worker_threads
參數。如下,可以讓執行緒池使用4個執行緒:
use std::error::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
};
#[tokio::main(worker_threads = 4)]
async fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {e:?}");
return;
},
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket; err = {e:?}");
return;
}
}
});
}
}
即便將worker_threads
設定為1
,TCP伺服器也還是可以正常允許同時間有多筆連線,這就是非同步runtime真正厲害的地方啦!
async-std
async-std
強調對應Rust的標準函式庫,可以將其看作是std
的非同步版本。啟用attributes
特色可以使用#[async_std::main]
屬性來建立程式進入點,用法就如同#[tokio::main]
。
以下是用async-std
來實作上面那個TCP伺服器程式的程式碼:
[dependencies]
async-std = { version = "1", features = ["attributes"] }
use std::error::Error;
use async_std::{net::TcpListener, prelude::*, task};
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
task::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {e:?}");
return;
},
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket; err = {e:?}");
return;
}
}
});
}
}
以上是使用async-std
實作的TCP伺服器,看起來好像沒有產生新執行緒,但實際上它在一開始就會產生一個執行緒池,用來執行task::spawn
傳進去的Future
物件。
執行緒池的執行緒數量預設是處理器(邏輯處理器)的數量,如果要更改的話,直接透過環境變數ASYNC_STD_THREAD_COUNT
來設定即可。同樣地,即便將ASYNC_STD_THREAD_COUNT
設為1
,TCP伺服器也還是可以正常允許同時間有多筆連線。
smol
smol
是輕巧又快速的非同步runtime,用法上與前面兩者比較不同。
以下是用smol
來實作上面那個TCP伺服器程式的程式碼:
[dependencies]
smol = "1"
use std::error::Error;
use smol::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
};
fn main() -> Result<(), Box<dyn Error>> {
smol::block_on(async {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
smol::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {e:?}");
return;
},
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket; err = {e:?}");
return;
}
}
})
.detach();
}
})
}
以上是使用smol
實作的TCP伺服器,看起來好像沒有產生新執行緒,但實際上它在一開始就會產生一個執行緒池,用來執行task::spawn
傳進去的Future物件。
執行緒池的執行緒數量預設是1,如果要更改的話,直接透過環境變數SMOL_THREADS
來設定即可。同樣地,即便沒有設定SMOL_THREADS
,或是將SMOL_THREADS
設為1
,TCP伺服器也還是可以正常允許同時間有多筆連線。