在這個章節中,我們要用最基礎的方式透過TCP的監聽與連接,解析與傳送符合HTTP格式的訊息,來完成我們的Web伺服器。至於會選擇用這樣的方式來製作Web伺服器的原因是,要練習把我們在先前的章節中學到的東西加以應用。畢竟現在crates.io上已經有很多現成的Web框架(Web Framework)可以使用,正常來講我們是不需要從底層刻出Web伺服器的。
建立單執行緒的Web伺服器
雖然我們的目標是要建立多執行緒的Web伺服器,但是一開始還是先把問題簡化會比較容易處理。就讓我們先想辦法建立單執行緒的Web伺服器吧!
提醒一下,在這個章節中不會介紹計算機網路的概念,會預設各位早就已經對這個領域駕輕就熟了。
建立專案
先使用以下指令,建立出Cargo的應用程式專案,專案名稱為hello_web
:
監聽TCP Socket
Rust程式語言的標準函式庫中的std::net
模組,提供了一些基本的TCP和UDP的相關工具。我們可以使用其中的TcpListener
結構體來監聽指定的TCP Socket位址。main.rs
檔案的程式碼如下:
use std::net::TcpListener;
const PORT: i32 = 3000;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
println!("Connection established!");
}
}
TcpListener
結構體的bind
關聯函數可以讓我們藉由綁定某個TCP Socket位址的方式來產生出TcpListener
的結構實體。PORT
常數就是我們的HTTP Server要監聽的埠號,在開發階段,建議不要使用「0~1023」的數值範圍,因為這個範圍的埠號被作業系統保留使用,如果應用程式要使用這個範圍的埠號,將需要root執行權限,因此我們這裡將其設為3000
。而使用PORT
常數的原因是,當程式日後需要改HTTP Server監聽的埠號時,可以直接改動原始碼中常數的值,非常方便。
TcpListener
結構實體的incoming
方法會回傳一個Incoming
迭代器。在程式執行的時候,如果有一個客戶端想要使用TCP連到127.0.0.1:3000
,Incoming
迭代器就會進行一次迭代,回傳一個Result
列舉實體,其Ok
變體會包裹一個TcpStream
結構實體。此時TCP連線就建立起來了,我們無需自行實作TCP連線的三向交握(Three-way Handshake)。
我們可以執行一下這個專案,並打開網頁瀏覽器,在網頁瀏覽器的網址列輸入http://127.0.0.1:3000
網址,並嘗試開啟它。此時網頁瀏覽器應該會出現「無法連上這個網站」的畫面,但是再回頭看看我們的終端機畫面,會多出幾行Connection established!
訊息,表示我們的TcpStream
結構實體有成功被建立出來。
雖然我們只是開啟一次http://127.0.0.1:3000
網址,但是有些網頁瀏覽器除了會去連結到http://127.0.0.1:3000
網址之外,同時還會嘗試去取得http://127.0.0.1:3000/favicon.ico
(網站圖示)等等的資源。因此雖然只我們只讓網頁瀏覽器開啟一次http://127.0.0.1:3000
網址,可能就會對伺服器發送好幾個連線請求,所以印出好幾行Connection established!
訊息是正常的。
我們可以重新整理「無法連上這個網站」的畫面,依然會看到「無法連上這個網站」的畫面,但是再回頭看一下終端機畫面,又會多出幾行Connection established!
訊息。這就表示我們剛才寫的程式Incoming
迭代器會不斷地在有客戶端連線進來的時候,每次去產生出新的TcpListener
結構實體,完全不需要重開程式就能持續處理多個TCP連線的任務。
讀取客戶端傳來的HTTP訊息
客戶端發送的HTTP訊息,我們稱之為「請求」(request)。為了避免main
方法太長,我們通常會將處理HTTP請求的程式寫在別的地方,可能會是別的函數,也可能會是別的模組。在此我們直接在main.rs
檔案中加上handle_connection
函數,並定義其參數可以傳入一個TcpListener
結構實體。
use std::net::{TcpListener, TcpStream};
const PORT: i32 = 3000;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(stream: TcpStream) {
println!("Connection established!");
}
我們可以使用TcpStream
結構實體提供的read
方法,來讀取輸入串流中的資料。這個read
方法需要從參數輸入一個u8
型別的陣列,用來當作緩衝空間使用。read
方法並不會一次把輸入串流中的資料讀完,而是一次讀一小塊,我們需要提供一個記憶體空間讓它存放讀取到的資料,它會將它讀取到的資料量回傳以Result列舉實體來包裹回傳。在使用read
方法時,TcpStream
結構實體本身用來記錄資料位置的欄位會發生變化,因此我們需要讓stream
變數為可變的。于是乎,我們將程式修改如下:
use std::{
io::prelude::*,
net::{TcpListener, TcpStream},
};
const PORT: i32 = 3000;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
println!("Connection established!");
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
}
緩衝空間需要多大,其實沒有硬性規定,需看程式使用情境來決定。在此我們設定為512個位元組(u8),足夠讓我們的程式可以呼叫一次read
方法就把輸入串流中的該讀的資料讀完。Rust程式語言的標準函式庫所提供的std::io::prelude
模組,實作I/O相關的常用功能,在進行I/O操作時,如果我們對I/O讀寫沒有特別請求的話,可以直接將這個模組引用至目前程式的scope下。
再來要將讀到的資料轉成字串,我們可以使用String結構體提供的from_utf8_lossy
方法將u8
型別的陣列切片轉成Cow<str>>
結構實體。這個Cow
結構實體是一個智慧型指標,其名稱「Cow」表示「Clone-on-write」,當要使用其to_mut
方法取得其所指到的實體的可變參考時,如果實體的擁有者不是這個Cow
智慧型指標時(即這個智慧型指標是指到該實體的不可變參考,而不是直接指到該實體),Cow
智慧型指標就會先複製出新的實體,並且改為指到這個的新實體,再把其可變參考回傳,這個新實體的擁有者即為該Cow
智慧型指標,因此下次再使用to_mut
方法時就不會再去複製新實體了。
程式修改如下:
use std::{
io::prelude::*,
net::{TcpListener, TcpStream},
};
const PORT: i32 = 3000;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[0..c]);
println!("Request: {request}");
}
from_utf8_lossy
方法還可以將u8
型別的陣列切片中,不能被正常使用UTF8編碼為字串的資料以�
字元取代。
執行程式專案試試,接著同樣使用網頁瀏覽器開啟http://127.0.0.1:3000
網址,終端機結果如下:
傳送HTTP訊息給客戶端
發送HTTP訊息給客戶端,我們稱之為「回應」(response)。我們可以使用TcpStream
結構實體提供的write
方法,來將資料寫入至輸出串流中。使用flush
方法,可以等待對方把我們先前寫入的資料讀取掉。
程式修改如下:
use std::{
io::prelude::*,
net::{TcpListener, TcpStream},
};
const PORT: i32 = 3000;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[0..c]);
let content = "<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
";
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
);
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
以上程式的response
變數所儲存的字串,有在HTTP訊息的Body中代入HTML。執行程式專案試試,接著同樣使用網頁瀏覽器開啟http://127.0.0.1:3000
網址,就能看到我們的首頁了!
可是當我們嘗試開啟http://127.0.0.1:3000/test
網址時,也會看到跟首頁一樣的畫面,那是因為現階段我們完全沒有去解析(parse)使用者的HTTP請求,看他到底是要存取哪個頁面。於是,我們可以將程式修改如下:
use std::{
io::prelude::*,
net::{TcpListener, TcpStream},
};
const PORT: i32 = 3000;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[0..c]);
let page404 = || {
let content = "<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
";
format!(
"HTTP/1.1 404 FOUND\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let page200 = |content: &str| {
format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let response = if request.starts_with("GET") {
let request = &request["GET".len() + 1..];
let path = &request[..request.find(' ').unwrap()];
match path {
"/" => page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
",
),
_ => page404(),
}
} else {
page404()
};
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
程式第52行,使用了字串的starts_with
方法,先判斷使用者發送的HTTP請求是否以「GET」(HTTP請求方法的一種)開頭,如果是的話就繼續進行解析;如果不是使用「GET」方法的話,就直接回傳404頁面的HTTP回應。
程式第53行和第54行,我們將使用者想要存取的URI路徑解析出來,存進path
變數。程式第56行開始的match
關鍵字,對path
變數的值做型樣匹配,就像是在做路由(routing)。如果匹配到/
就回傳首頁的HTTP回應,如果是其它的,就回傳404頁面的HTTP回應。
接著執行程式專案試試,然後同樣使用網頁瀏覽器分別開啟http://127.0.0.1:3000
網址和http://127.0.0.1:3000/test
網址,就可以看到我們的404頁面有在正常工作啦!
建立多執行緒的Web伺服器
在建立多執行緒的Web伺服器,我們先來模擬一個情境,假設我們的Web伺服器,處理某個使用者的請求需要比較長(可能數秒、甚至數分鐘)的時間。此時如果又有新的請求進來會發生什麼事呢?我們先將程式改寫如下:
use std::{
io::prelude::*,
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
const PORT: i32 = 3000;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[0..c]);
let page404 = || {
let content = "<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
";
format!(
"HTTP/1.1 404 FOUND\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let page200 = |content: &str| {
format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let response = if request.starts_with("GET") {
let request = &request["GET".len() + 1..];
let path = &request[..request.find(' ').unwrap()];
match path {
"/" => page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
",
),
"/search" => {
thread::sleep(Duration::from_secs(15));
page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Search Result</h1>
<p>Found nothing.</p>
</body>
</html>
",
)
},
_ => page404(),
}
} else {
page404()
};
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
程式第73行開始,我們新增了/search
路徑,當網頁瀏覽器去開啟http://127.0.0.1:3000/search
網址時,程式會呼叫std::thread
模組的sleep
方法,來模擬搜尋15秒的過程。在程式處理搜尋請求的15秒內,此時如果我們另開新分頁,去嘗試開啟首頁,也就是http://127.0.0.1:3000
網址,會發現首頁會在http://127.0.0.1:3000/search
的頁面開啟後才會被開啟。
就是因為我們目前的Web伺服器只有支援單執行緒,無法同時處理不同的HTTP請求,而造成後來的HTTP請求必須等待的情形。所以現在我們要讓我們的Web伺服器支援多執行緒,改善其吞吐量(throughput)。
由於我們已經把處理HTTP請求的程式都寫在handle_connection
函數,我們只需要在每次建立出TcpStream
結構實體後,都去建立新的執行緒來呼叫handle_connection
函數,即可讓我們的Web伺服器支援多執行緒。
程式修改如下:
use std::{
io::prelude::*,
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
const PORT: i32 = 3000;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| handle_connection(stream));
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[0..c]);
let page404 = || {
let content = "<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
";
format!(
"HTTP/1.1 404 FOUND\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let page200 = |content: &str| {
format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let response = if request.starts_with("GET") {
let request = &request["GET".len() + 1..];
let path = &request[..request.find(' ').unwrap()];
match path {
"/" => page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
",
),
"/search" => {
thread::sleep(Duration::from_secs(15));
page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Search Result</h1>
<p>Found nothing.</p>
</body>
</html>
",
)
},
_ => page404(),
}
} else {
page404()
};
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
接著用同樣的方式開啟http://127.0.0.1:3000
網址和http://127.0.0.1:3000/search
網址,就會發現,等待前一次HTTP請求處理完才會處理下一次HTTP請求的問題已經解決了!
使用執行緒池(thread pool)增加程式的安全性
我們目前程式的多執行緒作法,是在每次建立出TcpStream
結構實體後,都去建立新的執行緒來呼叫handle_connection
函數。也就是說,如果我們的伺服器在短時間內突然來了很多HTTP請求(例如遭到DDOS攻擊),假設一個每秒有一萬個HTTP請求同時進來,程式理論上每秒就需要建立出一萬個執行緒來處理,就算一開始我們的記憶體夠大能夠負荷一萬個執行緒,但是我們不太可能會有一萬個處理器(處理器核心)來同時快速地將一萬個HTTP請求在1秒內處理完。前一秒的HTTP請求都還沒處理完,下一秒又有大量的HTTP請求進來,記憶體遲早會不夠用。因此,我們必須限制我們的程式,在同一時間用來處理每個HTTP請求的執行緒數量。
我們可以實作一個簡單的執行緒池,用固定數量的執行緒,來依序完成列隊(Queue)裡的任務。
在同樣的hello_web
應用程式專案的src
目錄中,新增lib.rs
檔案,撰寫我們的執行緒池程式。程式碼如下:
pub struct ThreadPool;
ThreadPool
結構體代表我們之後會使用到的執行緒池,每個ThreadPool
結構實體都是一個執行緒池。接著替ThreadPool
結構體實作new
關聯函數,程式如下:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
}
new
關聯函數的size
參數表示要使用的執行緒數量,如果這個數量不大於0的話,就直接讓程式發生panic。
我們希望ThreadPool
結構體也有類似spawn
的方法,能夠從參數傳入要在新執行緒內執行的閉包。參考一下std::thread
模組內的spawn
函數的定義:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static
我們發現spawn
函數是利用泛型來傳遞閉包,且這個閉包可以是任意的閉包(所有閉包都有實作FnOnce
特性),且必須要實作Send
特性。Send
特性類似先前提過的Sized
特性,在編譯階段,如果編譯器認為型別適合使用在多執行緒,就會自動實作Send
特性。閉包使用到的外部資源的參考,其生命周期也必須是'static
,因為編譯器不知道新的執行緒會執行多久,要確保在新執行緒結束之前所參考到的外部資源都還存在。
總而言之,我們就照著spawn
函數的定義來替我們的ThreadPool
結構體完成execute
函數吧!程式如下:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
pub fn execute<F, T>(&self, f: F)
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static, {
}
}
我們的程式並不需要使用到閉包的回傳值,因此可以將程式簡化如下:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {}
}
spawn
函數會回傳JoinHandle
實體,我們姑且就把這個當作是執行緒實體吧!我們需要在new
關聯函數內,利用size
參數,來產生出相對應數量的執行緒實體,並儲存在ThreadPool
結構實體中。程式如下:
use std::thread::JoinHandle;
pub struct ThreadPool {
threads: Vec<JoinHandle<()>>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector, but how?
}
ThreadPool {
threads
}
}
pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {}
}
以上程式第11行,我們使用了Vec
結構體的with_capacity
關聯函數來建立出預設容量的Vec
結構實體,即便新建出來的Vec
結構實體還沒有存放任何元素,它也會先在記憶體中配置好傳入的容量大小(元素數量)。如此一來,之後我們要將新元素加入至Vec
結構實體時,在容量未滿之前,就不需要去進行記憶體的重新配置動作,可以讓程式效能稍微好一點。
程式第14行,我們雖然知道應該要產生執行緒的實體並加入至threads
這個Vec
結構實體內,但是執行緒的實體要怎麼產生呢?如果直接使用std::thread
模組提供的spawn
函數,就需要傳入新執行緒要執行的閉包給它,而且新執行緒在建立出來後會立刻執行傳入的閉包。為了要讓新執行緒可以等待ThreadPool
結構實體使用execute
方法傳入的閉包,我們需要替每個執行緒實作一些與ThreadPool
結構實體同步的方式。為了簡化程式,我們要再實作兩個型別,分別是Worker
(工人)結構體、和Job
(工作)型別別名,每個Worker
結構實體都負責處理一個執行緒,並提供能夠讓其負責的執行緒隨時執行Job
閉包的方法。將程式改寫如下:
use std::{
sync::{
mpsc::{self, Receiver, Sender},
Arc,
},
thread::{self, JoinHandle},
};
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Sender<Job>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(receiver);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver.clone()));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Receiver<Job>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker {
id,
thread,
}
}
}
以上程式會編譯錯誤,因為Receiver
結構體並沒有實作Send
特性,它無法直接跨執行緒使用。因此,我們必須要搭配Mutex
結構體一起使用。另外再多注意一下第55行,job
變數此時的型別雖然為被Box
智慧型指標參考的閉包,不過我們依然還是可以直接用原先呼叫函數的方式來呼叫閉包。
將程式加上Mutex
結構體,修改如下:
use std::{
sync::{
mpsc::{self, Receiver, Sender},
Arc, Mutex,
},
thread::{self, JoinHandle},
};
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Sender<Job>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver.clone()));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker {
id,
thread,
}
}
}
如此一來我們的ThreadPool
結構體就可以使用了,接著將main.rs
檔案的程式碼修改如下:
use std::{
io::prelude::*,
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
use hello_web::ThreadPool;
const PORT: i32 = 3000;
const WORKER_SIZE: usize = 8;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
let pool = ThreadPool::new(WORKER_SIZE);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[0..c]);
let page404 = || {
let content = "<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
";
format!(
"HTTP/1.1 404 FOUND\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let page200 = |content: &str| {
format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let response = if request.starts_with("GET") {
let request = &request["GET".len() + 1..];
let path = &request[..request.find(' ').unwrap()];
match path {
"/" => page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
",
),
"/search" => {
thread::sleep(Duration::from_secs(15));
page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Search Result</h1>
<p>Found nothing.</p>
</body>
</html>
",
)
},
_ => page404(),
}
} else {
page404()
};
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
使用WORKER_SIZE
常數來表示執行緒池要使用的執行緒數量,在此設為8
,表示使用8個執行緒。我們可以執行程式專案,隨便使用瀏覽器快速開啟數次http://127.0.0.1:3000
網址和http://127.0.0.1:3000/search
網址來試試。會發現到,當程式正在處理的HTTP請求超過8個時,之後進去的HTTP請求便需要等待,這就表示我們的執行緒池有正常工作!
可安全關閉的Web伺服器
不知道各位在嘗試執行我們剛才製作的Web伺服器時有沒有發現,我們在關閉Web伺服器應用程式時,必須按Ctrl + C或是直接把終端機關掉,才能把我們製作的Web伺服器程式關閉。雖然很多Web伺服器用程式都是一樣,只能使用這樣的方式關閉,但這像這樣強制中斷程式的執行,會讓正在處理中的HTTP請求被切斷,可能會破壞資料庫或是檔案系統的一致性,導致服務出現問題。
為了讓我們的Web伺服器程式能夠安全地關閉,我們需要先替我們的ThreadPool
結構體實作Drop
特性,在drop
方法中將所有的執行緒join
進來執行,確保ThreadPool
結構實體在在被消滅之前,要先結束其所有執行緒的執行。
程式修改如下:
use std::{
sync::{
mpsc::{self, Receiver, Sender},
Arc, Mutex,
},
thread::{self, JoinHandle},
};
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Sender<Job>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver.clone()));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker {
id,
thread,
}
}
}
程式第48行會編譯失敗,因為呼叫JoinHandle
結構實體的join
方法必須要改變JoinHandle
結構實體的擁有者,但是我們現在的JoinHandle
結構實體的擁有者是Worker
結構實體的thread
欄位,無法直接改變。因此我們必須要更改Worker
結構體的定義,使其thread
欄位儲存的值為Option
結構實體,如此一來我們才能使用Option
列舉實體的take
方法,來轉移這個Option
列舉實體的擁有者。
程式修改如下:
use std::{
sync::{
mpsc::{self, Receiver, Sender},
Arc, Mutex,
},
thread::{self, JoinHandle},
};
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Sender<Job>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver.clone()));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.take().unwrap().join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: Option<JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker {
id,
thread: Some(thread),
}
}
}
接著,我們不能讓Worker
結構實體的每個執行緒執行一直執行無窮迴圈。要有個方式能讓每個執行緒接收到「程式目前正在關閉,快點跳出迴圈」的訊息,使它們能夠結束執行。我們同樣可以透過訊息傳遞,來完成這個功能。程式修改如下:
use std::{
sync::{
mpsc::{self, Receiver, Sender},
Arc, Mutex,
},
thread::{self, JoinHandle},
};
type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Sender<Message>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver.clone()));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.take().unwrap().join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: Option<JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {id} got a job; executing.");
job();
},
Message::Terminate => {
println!("Worker {id} was told to terminate.");
break;
},
}
});
Worker {
id,
thread: Some(thread),
}
}
}
我們原本的訊息直接傳遞Job
,現在則改成傳遞Message
列舉實體。Message
列舉有NewJob
和Terminate
兩個變體,NewJob
變體可以讓接收到該訊息的執行緒去執行新的閉包;Terminate
變體可以讓接收到該訊息的執行緒跳出loop迴圈。在ThreadPool
結構實體要被消滅之前,會先傳送Terminate
變體訊息其底下的所有Worker
結構實體的執行緒,接著等待所有的執行緒都跳出loop迴圈並且執行完畢之後,ThreadPool
結構實體才會被消滅。
至於要如何觸發ThreadPool
結構實體的drop
方法?首要步驟就是讓Incoming
迭代器的for迴圈能夠停止執行,一旦main
函數執行結束,屬於該scope底下的擁有者所持有的值都會被消滅,包括pool
變數所擁有的ThreadPool
結構實體。我們可以讓Incoming
迭代器的for迴圈,在每次迭代時都去檢查現在是否為正在關閉程式的狀態,如果是的話就跳出迴圈。
程式修改如下:
use std::{
io::{self, prelude::*, Read},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
use hello_web::ThreadPool;
const PORT: i32 = 3000;
const WORKER_SIZE: usize = 8;
static mut CLOSING: bool = false;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
let pool = ThreadPool::new(WORKER_SIZE);
thread::spawn(|| loop {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
if buffer.trim().to_lowercase() == "exit" {
unsafe {
CLOSING = true;
}
println!("Server is closing.");
if let Ok(mut stream) = TcpStream::connect(format!("127.0.0.1:{PORT}")) {
if stream.write_all(b"exit").is_ok() && stream.flush().is_err() {
// do nothing
}
}
break;
}
});
for stream in listener.incoming() {
unsafe {
if CLOSING {
break;
}
}
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Server connection closed.");
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[0..c]);
let page404 = || {
let content = "<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
";
format!(
"HTTP/1.1 404 FOUND\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let page200 = |content: &str| {
format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let response = if request.starts_with("GET") {
let request = &request["GET".len() + 1..];
let path = &request[..request.find(' ').unwrap()];
match path {
"/" => page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
",
),
"/search" => {
thread::sleep(Duration::from_secs(15));
page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Search Result</h1>
<p>Found nothing.</p>
</body>
</html>
",
)
},
_ => page404(),
}
} else {
page404()
};
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
我們設定一個CLOSING
全域靜態變數來儲存目前的程式狀態是不是正在關閉中,預設為false
。並在main
函數中,進入for迴圈之前,先建立了一個執行緒,從標準輸入中取得使用者輸入的指令。當使用者輸入的指令為exit
時,就會將CLOSING
全域靜態變數的值更改為true
。Incoming
迭代器的for迴圈中,在處理HTTP請求前,我們先去判斷CLOSING
全域靜態變數的值是否為true
,如果是的話就直接跳出迴圈,結束程式。因此,當使用者輸入的指令為exit
時,除了要將CLOSING
全域靜態變數的值更改為true
之外,我們還必須要再使用TcpStream
結構體,連結到我們的HTTP Server,來觸發Incoming
迭代器的for迴圈進行下一次的迭代,程式才會去判斷CLOSING
全域靜態變數的值是否為true
。
執行程式專案,隨便使用瀏覽器快速開啟數次http://127.0.0.1:3000
網址和http://127.0.0.1:3000/search
網址來試試,最後再從終端機輸入exit
來安全地結束程式。終端機結果如下:
由於我們使用到可變的靜態變數,因此有使用unsafe
關鍵字使部份程式進入不安全模式。如果想挑戰不使用unsafe
關鍵字的話,也可以使用訊息傳遞的方式來完成同樣的功能,只是效能可能會稍微差一點。可將程式改寫如下:
use std::{
io::{self, prelude::*, Read},
net::{TcpListener, TcpStream},
sync::mpsc,
thread,
time::Duration,
};
use hello_web::ThreadPool;
const PORT: i32 = 3000;
const WORKER_SIZE: usize = 8;
fn main() {
let listener = TcpListener::bind(format!("127.0.0.1:{PORT}")).unwrap();
let pool = ThreadPool::new(WORKER_SIZE);
let (sender, receiver) = mpsc::channel();
thread::spawn(move || loop {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).unwrap();
if buffer.trim().to_lowercase() == "exit" {
sender.send("exit").unwrap();
println!("Server is closing.");
if let Ok(mut stream) = TcpStream::connect(format!("127.0.0.1:{PORT}")) {
if stream.write_all(b"exit").is_ok() && stream.flush().is_err() {
// do nothing
}
}
break;
}
});
for stream in listener.incoming() {
if receiver.try_recv().is_ok() {
break;
}
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Server connection closed.");
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
let c = stream.read(&mut buffer).unwrap();
let request = String::from_utf8_lossy(&buffer[0..c]);
let page404 = || {
let content = "<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
";
format!(
"HTTP/1.1 404 FOUND\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let page200 = |content: &str| {
format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\n\r\n{content}",
content.len()
)
};
let response = if request.starts_with("GET") {
let request = &request["GET".len() + 1..];
let path = &request[..request.find(' ').unwrap()];
match path {
"/" => page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
",
),
"/search" => {
thread::sleep(Duration::from_secs(15));
page200(
"<!DOCTYPE html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<title>Hello!</title>
</head>
<body>
<h1>Search Result</h1>
<p>Found nothing.</p>
</body>
</html>
",
)
},
_ => page404(),
}
} else {
page404()
};
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
結論
我們的多執行緒且可以被安全關閉的Web伺服器程式到這裡就已經實作完成啦!這個章節的內容只是前幾個章節的應用,如果真的要使用Rust程式語言來開發Web伺服器程式,建議還是直接在crates.io上尋找現成的Web應用框架和套件來使用,開發速度會快很多。
這個系列的文章就到這裡為止了,Rust程式語言不是一個容易上手的程式語言,尤其是擁有權和生命周期真的很難理解,畢竟它們是其它大部分的程式語言所沒有的概念。不過,相信大家在使用Rust程式語言開發程式的時候都會注意到,雖然我們寫的程式可能要按照編譯器提示的訊息修改好幾次才能編譯成功,但編譯出來的程式幾乎就已經沒什麼問題了,不太需要用什麼偵錯工具來一行一行地查找程式碼哪邊有Bug出現。
期望在未來,Rust程式語言可以與更多領域作結合,像是完美搭配Webassembly,真正實現網站前後端都使用Rust程式語言來完成的架構。甚至是更深入底層,讓單晶片微電腦(Microcontroller)的程式也能夠使用Rust程式語言來撰寫!
如果還想學習更多Rust程式語言的知識,請參考程式語言的Rust分類文章。