Rust程式語言在1.34之後的版本中開始正式提供完整的原子(Atomic)型別了。所謂的原子是指一系列不可被上下文交換(Context Switch)的機器指令,這些機器指令組成的操作又稱為原子操作(Atomic Operation)。在多CPU核心的環境下,當某個CPU核心開始執行原子操作時,就會先暫停其它CPU核心對記憶體的操作,以保證在原子操作執行的過程中,記憶體內容不會受到其它CPU核心干擾。所以原子操作若用得好,就不需要去使用會拖累程式效能的互斥鎖(Mutex)或是訊息傳遞(message passing)機制。只不過依靠原子操作來解決同步問題的話,會牽扯到編譯器優化以及CPU架構的問題,這篇文章會針對Rust程式語言提供的原子型別來探討原子操作。
先來看看以下這個程式吧!
use std::thread::{self, JoinHandle};
const N_TIMES: u64 = 1000;
const N_THREADS: usize = 10;
static mut R: u64 = 0;
fn reset() {
unsafe {
R = 0;
}
}
fn add_n_times(n: u64) -> JoinHandle<()> {
thread::spawn(move || {
for _ in 0..n {
unsafe {
R += 1;
}
}
})
}
fn main() {
loop {
reset();
let mut threads = Vec::with_capacity(N_THREADS);
for _ in 0..N_THREADS {
threads.push(add_n_times(N_TIMES));
}
for thread in threads {
thread.join().unwrap();
}
assert_eq!(N_TIMES * N_THREADS as u64, unsafe { R });
}
}
全域靜態變數R
是可變的。在main
函數的無窮迴圈中,每次迭代會將R
設為0
,並且建立出N_THREADS
個執行緒,每個執行緒都會執行N_TIMES
次的R += 1
。也就是說,在理想狀態下,我們可以預估無窮迴圈的每次迭代,R += 1
都會被執行N_TIMES * N_THREADS
次,所以在該次迭代的最後,R
的值也應為N_TIMES * N_THREADS
才對。
然而現實並非如此,當N_THREADS
大於1時,R
的值在每次迭代的最後,有可能會小於N_TIMES * N_THREADS
,這就是所謂的競跑現象(Data Race)。競跑之所以會發生,是由於R += 1
這行敘述,雖然它只有一行,但它在實際運行時卻可能是由好幾個步驟組成,例如:第一步,將R
讀取進CPU暫存器;第二步,將該暫存器的值加一;第三步,將該暫存器的值回存給R
。此時若使用超過一個執行緒來運行R += 1
這行敘述的話,有可能會變成這樣:當第一個執行緒執行完第二步,還沒到第三步時,第二個執行緒就先執行第一步了,所以會造成第二個執行緒讀取到原本應該已經被第一個執行緒加一卻還沒回存好的R
。也有可能會變成這樣:當第一個執行緒執行完第一步後,第二個執行緒卻執行完兩輪R += 1
了,此時第一個執行緒再繼續把第二步和第三步做完反而會讓R
的值比先前的更小!
使用互斥鎖可以很輕易地解決這個問題,程式如下:
use std::thread::{self, JoinHandle};
use std::sync::{Mutex, Arc};
const N_TIMES: u64 = 1000;
const N_THREADS: usize = 10;
static mut R: u64 = 0;
fn reset() {
unsafe {
R = 0;
}
}
fn add_n_times(n: u64, mutex: Arc<Mutex<()>>) -> JoinHandle<()> {
thread::spawn(move || {
for _ in 0..n {
let lock = mutex.lock().unwrap();
// critical section START
unsafe {
R += 1;
}
// critical section END
drop(lock);
}
})
}
fn main() {
let mutex = Arc::new(Mutex::new(()));
loop {
reset();
let mut threads = Vec::with_capacity(N_THREADS);
for _ in 0..N_THREADS {
threads.push(add_n_times(N_TIMES, mutex.clone()));
}
for thread in threads {
thread.join().unwrap();
}
assert_eq!(N_TIMES * N_THREADS as u64, unsafe { R });
}
}
利用互斥鎖形成的臨界區段(Critical Section),來使R += 1
程式敘述在同一時間只能夠被一個執行緒來執行。不過功能強大的互斥鎖會需要耗費不少額外的運算資源,在這個例子中,我們其實只需利用Rust程式語言的原子型別就能達到相同的結果,效能也會好很多。
程式如下:
use std::thread::{self, JoinHandle};
use std::sync::atomic::{Ordering, AtomicU64};
const N_TIMES: u64 = 1000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn reset() {
R.store(0, Ordering::Relaxed);
}
fn add_n_times(n: u64) -> JoinHandle<()> {
thread::spawn(move || {
for _ in 0..n {
R.fetch_add(1, Ordering::Relaxed);
}
})
}
fn main() {
loop {
reset();
let mut threads = Vec::with_capacity(N_THREADS);
for _ in 0..N_THREADS {
threads.push(add_n_times(N_TIMES));
}
for thread in threads {
thread.join().unwrap();
}
assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
}
}
以上程式,使用AtomicU64
來替換原本的u64
,我們甚至還可以把全域靜態變數R
的mut
關鍵字拿掉,因為原子型別的值就算不使用mut
也還是可變的,這點和被Mutex
型別包裹的值一樣。
例如以下程式可以通過編譯:
use std::sync::Mutex;
use std::sync::atomic::{Ordering, AtomicU64};
struct Counter {
count: u64
}
fn main() {
let n = Mutex::new(Counter {
count: 0
});
n.lock().unwrap().count += 1;
let n = AtomicU64::new(0);
n.fetch_add(0, Ordering::Relaxed);
}
在使用原子型別提供的原子操作時,需要額外傳入一個Ordering
列舉的變體實體。這個Ordering
列舉可不是std::cmp
這個模組下用來比大小的Ordering
哦!而是位於std::sync::atomic
模組下的Ordering
列舉。
這邊的Ordering
列舉是用來控制原子操作時所使用的「記憶體順序」(Memory Ordering)的限制,共有Relaxed
、Acquire
、Release
、AcqRel
、SeqCst
五種變體。
記憶體順序
記憶體順序是指CPU在存取記憶體時的順序,這個順序不單純是程式敘述的撰寫順序,可能還會因編譯器優化,在編譯階段發生改變(reordering),也可能在執行階段時,因CPU的快取機制而被打亂順序。
舉個例子,在編譯以下程式敘述時:
static mut X: u64 = 0;
static mut Y: u64 = 1;
fn main() {
... // A
unsafe {
... // B
X = 1;
... // C
Y = 3;
... // D
X = 2;
... // E
}
}
如果C
、D
段落根本沒有用到X = 1
,那麼編譯器很可能會直接將X = 1
和X = 2
合併在一起,變成:
static mut X: u64 = 0;
static mut Y: u64 = 1;
fn main() {
... // A
unsafe {
... // B
X = 2;
... // C
Y = 3;
... // D
... // E
}
}
此時若段落A
中有使用新的執行緒來讀取全域靜態變數X
,則不可能會讀取到當X
的值為1
時的結果,因為在編譯階段時就被編譯器給省略掉了!
另一方面,假設X = 1
並沒有被編譯器省略掉好了,並且在段落A中有一個新執行緒,主執行緒和段落A的執行緒對於全域變數的執行順序關係如下:
initial state: X = 0, Y = 1 THREAD Main THREAD A X = 1; if X == 1 { Y = 3; Y *= 2; X = 2; }
理想的情況下,Y
最終可能的值為:
Y = 3
:THREAD A
執行完後才執行THREAD Main
。或是THREAD Main
執行完後才執行THREAD A
。Y = 6
:THREAD Main
執行完Y = 3
後,執行THREAD A
。THREAD A
執行完後,THREAD Main
才繼續執行完。
而實際上我們卻有可能會得到以下這種狀態:
Y = 2
:THREAD Main
正在執行Y = 3
,THREAD A
此時也開始執行Y *= 2
。3
這個值來不及回存到Y
,Y
就被Y *= 2
先行取用了(此時取到的Y
為1
),而當3
這個值終於回存到Y
後,Y *= 2
才計算完成,所以Y
的值變成2
。
上述只是一般的競跑,更極端一點由CPU快取引起的記憶體順序問題還有以下這個:
Y = 2
:THREAD Main
雖然已經確實執行完Y = 3
了,但是該CPU快取中的Y = 3
還沒同步到其它CPU的快取中,此時THREAD A
的Y *= 2
就開始讀取Y
,因此它讀到的Y
值為1
,計算之後就出現Y = 2
的結果。
甚至即便改成:
initial state: X = 0, Y = 1 THREAD Main THREAD A X = 1; if X == 2 { Y = 3; Y *= 2; X = 2; }
也還是可能會出現Y = 2
的情形,因為X
和Y
被同步至其它CPU的快取中的順序不一。
原子操作與記憶體順序
Relaxed
Relaxed
只會進行單純的原子操作,並不會對記憶體順序進行任何限制。換句話說,它可以最大幅度地保留編譯器優化的程度,不過如果想要在多個原子操作間實現跨執行緒的同步機制,就得採用其它的記憶體順序的限制方式了。
例如以下這個範例:
use std::thread::{self, JoinHandle};
static mut DATA: u64 = 0;
static mut READY: bool = false;
fn reset() {
unsafe {
DATA = 0;
READY = false;
}
}
fn producer() -> JoinHandle<()> {
thread::spawn(move || {
unsafe {
DATA = 100; // A
READY = true; // B
}
})
}
fn consumer() -> JoinHandle<()> {
thread::spawn(move || {
unsafe {
while !READY {} // C
assert_eq!(100, DATA); // D
}
})
}
fn main() {
loop {
reset();
let t_producer = producer();
let t_consumer = consumer();
t_producer.join().unwrap();
t_consumer.join().unwrap();
}
}
單就程式邏輯來看,這樣的程式似乎是挺安全的:當READY
為true
時,DATA
一定是100
。但是,實際上,這個程式在經過編譯器優化或是CPU快取的影響後,可能會讓C
行在讀取到READY
是true
後,D
行讀取到的DATA
卻還是0
。
這東西很反直覺的原因是我們可能比較常在x86或是x86_64架構上的CPU開發程式,而x86或是x86_64架構的CPU屬於「強有序」(strongly-ordered)的記憶體模型,不太會發生記憶體順序的問題。但是在如ARM架構等使用「弱有序」(weakly-ordered)的記憶體模型下,記憶體順序就很有可能會被打亂。
即便我們將以上程式的READY
,使用原子型別搭配Relaxed
記憶體順序限制來修改,問題也是依舊存在的。
use std::thread::{self, JoinHandle};
use std::sync::atomic::{Ordering, AtomicBool};
static mut DATA: u64 = 0;
static READY: AtomicBool = AtomicBool::new(false);
fn reset() {
unsafe {
DATA = 0;
}
READY.store(false, Ordering::Relaxed);
}
fn producer() -> JoinHandle<()> {
thread::spawn(move || {
unsafe {
DATA = 100; // A
}
READY.store(true, Ordering::Relaxed); // B
})
}
fn consumer() -> JoinHandle<()> {
thread::spawn(move || {
while !READY.load(Ordering::Relaxed) {} // C
assert_eq!(100, unsafe { DATA }); // D
})
}
fn main() {
loop {
reset();
let t_producer = producer();
let t_consumer = consumer();
t_producer.join().unwrap();
t_consumer.join().unwrap();
}
}
AcqRel
藉由Acquire
和Release
這兩個記憶體順序的限制,可以構築出一對記憶體屏障(Memory Barrier),或稱記憶體柵欄(Memory Fence),防止編譯器和CPU將屏障前(Release)和屏障後(Acquire)中的資料操作重新排在屏障圍成的範圍之外。
如下:
use std::thread::{self, JoinHandle};
use std::sync::atomic::{Ordering, AtomicBool};
static mut DATA: u64 = 0;
static READY: AtomicBool = AtomicBool::new(false);
fn reset() {
unsafe {
DATA = 0;
}
READY.store(false, Ordering::Relaxed);
}
fn producer() -> JoinHandle<()> {
thread::spawn(move || {
unsafe {
DATA = 100; // A
}
READY.store(true, Ordering::Release); // B: memory fence ↑
})
}
fn consumer() -> JoinHandle<()> {
thread::spawn(move || {
while !READY.load(Ordering::Acquire) {} // C: memory fence ↓
assert_eq!(100, unsafe { DATA }); // D
})
}
fn main() {
loop {
reset();
let t_producer = producer();
let t_consumer = consumer();
t_producer.join().unwrap();
t_consumer.join().unwrap();
}
}
原則上,Acquire
用於讀取,而Release
用於寫入。但是由於有些原子操作同時擁有讀取和寫入的功能,此時就需要使用AcqRel
來設定記憶體順序了。在記憶體屏障中被寫入的資料,都可以被其它執行緒讀取到,不會有CPU快取的問題。
SeqCst
SeqCst
就像是AcqRel
的加強版,它不管原子操作是屬於讀取還是寫入的操作,只要某個執行緒有用到SeqCst
的原子操作,執行緒中該SeqCst
操作前的資料操作絕對不會被重新排在該SeqCst
操作之後,且該SeqCst
操作後的資料操作也絕對不會被重新排在SeqCst
操作前。
另外,Acquire
、Release
和AcqRel
等也可以與SeqCst
搭配使用,來構築出一對記憶體屏障。