有關於並發(concurrency)和並行(parallelism)的定義每個人可能有不一樣的解釋。筆者認為並行算是並發的子集合,所謂的「並發」,是指在某項工作結束之前,另一項工作就開始了,但這些項工作可以是同時執行,也可以是交替執行。而所謂的「並行」,是並發的一種設計方式,能將工作交給不同的處理器(邏輯處理器)來執行,而達成同時執行兩項以上工作的目的。程式語言如JavaScript,雖然只支援單執行緒,但是它也可以並發處理工作,例如它能在等待I/O處理的同時,去執行其它的函數。程式語言如Java,可以建立出多個執行緒來並發處理工作,當作業系統可以使用多個處理器(邏輯處理器)時,執行緒就可以被分散到各個處理器(邏輯處理器)來同時執行,且如果執行緒數量大於處理器(邏輯處理器)數量的話,也會交替執行;如果作業系統沒有多個處理器(邏輯處理器)的話,則只會讓其交替處理正在執行的執行緒。



程式語言如Golang,可以利用goroutine(go函數)來並發處理工作,它利用多執行緒的方式去完成每個goroutine的工作,但並不是每個goroutine都會有一個獨立的執行緒,執行緒的數量透過GOMAXPROCS環境變數來設定,預設值即為處理器(邏輯處理器)的數量。假設GOMAXPROCS8,而goroutine的數量有100的話,Golang程式只會使用8個執行緒去完成這些goroutine。

Rust原生並沒有什麼特別的方式來實作並發程式(但可以使用「async / await」語法搭配第三方函式庫來實作併發程式,若想更進一步了解可以參考這篇文章),也是和Java一樣要自行建立出多個執行緒。雖然並發程式會讓程式複雜度提升,增加潛在Bug出現的機率,而且也不易偵錯,不過,Rust本身的擁有權規則,可以避免掉並發程式大部份常見的潛在Bug。

使用多執行緒來開發程式可以增加程式的效能,因為在作業系統能使用的處理器(邏輯處理器)數量足夠的情況下,被分配到不同執行緒的程式部份是可以被「同時」執行的。舉例來說,假設完成工作A所需的時間是5秒,完成工作B所需的時間是6秒。只使用單一執行緒來完成工作A和工作B需要5+6=11秒的時間,而如果使用多執行緒來同時完成工作A和工作B,只需要Max(5,6)=6秒的時間。

而多執行緒的程式可能會遭遇到以下幾個問題:

1. 競態條件(Race Condition):不同執行緒同時存取相同的資料所產生的不一致問題。
2. 死結(Deadlock):兩個執行緒彼此等待對方釋出資源的使用權,而造成這兩個執行緒永遠無法繼續執行。
3. 多執行緒的程式在每次執行時,程式的執行順序不一定會相同,因此當發生Bug時,會很難去重現出來。

要使用Rust程式語言建立新的執行緒,可以透過標準函式庫std::thread模組提供的spawn函數,搭配閉包來使用。舉例來說:

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");

            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {i} from the main thread!");

        thread::sleep(Duration::from_millis(1));
    }
}

以上程式執行結果,可能如下:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!

std::thread模組提供的sleep函數,可以讓其所在的執行緒進入睡眠狀態,讓其它執行緒能有更多機會被執行到。新的執行緒內的for迴圈計數範圍雖然為1到10(包含1,不包含10),但由於主執行緒的for迴圈計數範圍只有1到5(包含1,不包含5),因此主執行緒的for迴圈很有可能會在新執行緒內的for迴圈結束之前就結束了,一旦主執行緒的程式都執行完成,即便新執行緒還沒執行完,程式也會直接停止。因此在程式執行的輸出結果中,沒有看到新執行緒的for迴圈當i大於4時的結果是正常的。

std::thread模組的spawn函數會回傳JoinHandle結構實體,我們可以利用這個結構實體,將其對應的執行緒加入(join)至其他執行緒中來執行。可以在JoinHandle結構實體所對應的執行緒之外呼叫該JoinHandle結構實體提供join方法,使其它的執行緒能夠等待JoinHandle結構實體對應的執行緒執行完成之後再繼續執行。舉例來說:

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");

            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {i} from the main thread!");

        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

以上程式執行結果,可能如下:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

我們讓主執行緒在for迴圈結束執行之後,去等待新執行緒執行完,再結束程式,因此程式可以輸出新執行緒中for迴圈計數範圍為1到10(包含1,不包含10)的完整結果。

先前章節中學到的「借用」,在一般情況下無法跨執行緒使用,舉例來說:

use std::thread;

fn main() {
    let a = 5;

    let handle = thread::spawn(|| {
        println!("{a}");
    });
    
    handle.join().unwrap();
}

以上程式會編譯錯誤,因為主程式中的變數a,無法借用給要在新執行緒執行的閉包。

然而我們可以替閉包加上move關鍵字,讓閉包主體內使用其所在的scope的資源去嘗試「複製」或是「移動」,而不是「借用」,程式就可以通過編譯了。程式如下:

use std::thread;

fn main() {
    let a = 5;

    let handle = thread::spawn(move || {
        println!("{a}");
    });
    
    handle.join().unwrap();
}

在很多情況下,我們還是必須要讓相同的資料在不同的執行緒中被使用,此時可以透過「訊息傳遞」(message passing)來達成目的。

套用Golang文件中的一句話:

Do not communicate by sharing memory; instead, share memory by communicating.

意思為:不要使用共享記憶體來讓執行緒彼此溝通,而是要用溝通的方式去分享記憶體中的資料。

Rust程式語言的標準函式庫,提供了std::sync模組,提供了一些工具來協助我們處理執行緒間的同步。其中的子模組mpsc(multi-producer, single-consumer FIFO queue communication primitives),便是利用「訊息傳遞」的方式來實作的。

我們可以先使用mpsc模組提供的channel函數替要彼此同步的執行緒建立出「通道」。channel函數會回傳一個元組,第一個元素為Sender結構實體,第二個元素為Receiver結構實體。透過Sender結構實體發送出的訊息,可以被Receiver結構實體接收到,無論當時Sender結構實體或是Receiver結構實體被移動到哪個執行緒。

舉例來說:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");

        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();

    println!("Got: {received}");
}

以上程式,第5行我們建立出了一個新的通道,並在第10行新執行緒中,使用Sender結構實體的send方法將字串hi送出。程式第13行主執行緒中,使用Receiver結構實體的recv方法來接收從Sender結構實體送出的訊息,主執行緒執行時會先停在該recv方法,直到接收到來自同通道的Sender結構實體的訊息後,才會繼續執行。

程式執行結果如下:

Got: hi

如果不用Receiver結構實體的recv方法,而是使用其try_recv方法的話,主執行緒執行時就不會停住,會直接用回傳被Result列舉所包裹的結果。

通道所能傳送的資料型別並沒有限制,因此如果要傳遞多種資料的話,可以搭配Vec結構體使用。舉例來說:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();

            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Receiver結構體本身和其參考型別皆有實作IntoIterator特性,因此可以直接被使用在for迴圈。程式執行結果如下:

Got: hi
Got: from
Got: the
Got: thread

Sender結構實體可以被複製,也就是說,我們可以用多個Sender結構實體,透過同一個通道將訊息傳遞給同一個Receiver結構實體。舉例來說:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx_1 = tx;
    let tx_2 = tx_1.clone();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx_1.send(val).unwrap();

            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx_2.send(val).unwrap();

            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

以上程式執行結果,可能如下:

Got: hi
Got: more
Got: from
Got: messages
Got: the
Got: for
Got: thread
Got: you

當我們的資料需要有多個擁有者時,訊息傳遞的執行緒溝通方式就不太適合了,此時我們還是得回歸共享記憶體的作法。在前面的章節中,我們學會了Rc結構體,可以讓資料的擁有者不只限於一個,然而,Rc結構體並不能用在多執行緒的場合,因為會很容易造成競態條件的問題。

舉例來說:

use std::cell::RefCell;
use std::rc::Rc;
use std::thread;

fn main() {
    let a = 5;
    let r = RefCell::new(a);
    let b = Rc::new(r);

    let mut handles = vec![];

    for _ in 1..=10 {
        let c = b.clone();
        let handle = thread::spawn(move || {
            let num = c.borrow_mut();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *b.borrow());
}

以上程式會編譯失敗,因為Rc結構體並不是「執行緒安全」(thread-safe)的設計,如果它可以被允許用在多個執行緒上,在參考的計數上將會出現問題。還好std::sync模組有提供一個Atomic版本的Rc結構體,稱為ArcArc結構體可以被用在多執行緒的場合,以上程式可以使用Arc結構體改寫如下:

use std::cell::RefCell;
use std::sync::Arc;
use std::thread;

fn main() {
    let a = 5;
    let r = RefCell::new(a);
    let b = Arc::new(r);

    let mut handles = vec![];

    for _ in 1..=10 {
        let c = b.clone();
        let handle = thread::spawn(move || {
            let num = c.borrow_mut();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *b.borrow());
}

以上程式依然會編譯失敗,因為RefCell結構體並不是「執行緒安全」(thread-safe)的設計,如果它可以被允許用在多個執行緒上,當同一個資料被多個執行緒同時存取時,就會發生競態條件的問題。還好std::sync模組也有提供一個好用的結構體來解決這個問題,稱為MutexMutex結構體可以被用在多執行緒的場合,因為它會替資料建立出「互斥鎖」(mutex's lock),想要拿到該筆資料的擁有者必須要先拿到資料的互斥鎖才能存取該資料,且在結束資料的使用後,還要將資料的互斥鎖還回去,以供之後其它的擁有者來使用。以上程式可以使用Mutex結構體改寫如下:

use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let a = 5;
    let m = Mutex::new(a);
    let b = Arc::new(m);

    let mut handles = vec![];

    for _ in 1..=10 {
        let c = b.clone();
        let handle = thread::spawn(move || {
            let mut num = c.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *b.lock().unwrap());
}

Mutex結構實體並沒有提供borrow或是borrow_mut方法,取而代之的是lock方法。呼叫lock方法時會進行等待,直到能取得該Mutex結構實體所使用的互斥鎖後,才去取得互斥鎖,並且將資料本身放在MutexGuard結構實體中,以Result列舉包裹並回傳出來。MutexGuard結構體也是一種智慧型指標,其實體可以作為不可變參考和可變參考來使用。另外,Mutex結構實體還有一個try_lock方法,即為lock方法的不等待版本。

以上程式會編譯成功,並且輸出:

Result: 15

正確答案!

以效能來說,有「執行緒安全」的Arc結構體和Mutex結構體會比Rc結構體和RefCell結構體還要差一點,因此我們在單執行緒的場合下要使用Rc結構體和RefCell結構體,而在多執行緒的場合下才去使用Arc結構體和Mutex結構體會比較好。另外,在之前的章節中有提到,Rc結構體和RefCell結構體的組合可能會造成記憶體洩漏,如果改用Arc結構體和Mutex結構體的組合也是一樣會有!而且當Arc結構體和Mutex結構體用在多執行緒時,還可能會導致死結的發生,舉例來說:

use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let x = 0;
    let y = 0;

    let x1 = Arc::new(Mutex::new(x));
    let y1 = Arc::new(Mutex::new(y));

    let x2 = x1.clone();
    let y2 = y1.clone();

    thread::spawn(move || {
        for _ in 1..=1000 {
            let i = x2.lock().unwrap();
            let j = y2.lock().unwrap();
            let _ = *i + *j;
        }
    });

    for _ in 1..=1000 {
        let u = y1.lock().unwrap();
        let v = x1.lock().unwrap();
        let _ = *u + *v;
    }
}

以上程式,可能會因為主執行緒和新的執行緒發生死結而導致程式永遠無法執行結束。發生死結的契機為,新執行緒在程式第17行的變數i拿到x的互斥鎖的同時,主執行緒正好已經執行完程式第24行,變數u拿到y的互斥鎖。接著新執行緒在程式第18行想要讓變數jy的互斥鎖,就需要等主執行緒將互斥鎖釋出;而主執行緒此時想要讓程式第25行的變數vx的互斥鎖,就需要等新執行緒將互斥鎖釋出。像這種多個執行緒互相等待對方釋出資源的情況,就是死結啦!

除了Arc結構體和Mutex結構體可能會發生死結之外,「訊息傳遞」也是有可能會發生的。

舉例來說:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();

    thread::spawn(move || {
        println!("spawned thread is waiting for channel 1");
        rx1.recv().unwrap();

        println!("spawned thread is sending channel 2");
        tx2.send(()).unwrap();
    });

    println!("main thread is waiting for channel 2");
    rx2.recv().unwrap();

    println!("main thread is sending channel 1");
    tx1.send(()).unwrap();
}

以上程式,在新執行緒發送訊息至第二個通道前,會先嘗試接收來自第一個通道的訊息;而主執行緒則是要在收到來自第二個通道的訊息後,才會發送訊息至第一個通道。因此,這兩個執行緒彼此都在等待對方的訊息,死結發生!程式執行結果如下:

main thread is waiting for channel 2
spawned thread is waiting for channel 1

結論

Rust程式語言嚴格的擁有權規則,使得開發併發程式的過程更有條理,更不易在執行階段出錯(尤其是競態條件的部份)。雖然還是有些典型的多執行緒問題可能會發生,但已經比其它程式語言要好多了!在下一章節,我們將會學習如何在Rust程式語言中應用物件導向概念來開發程式。

下一章:物件導向程式設計