2020/07/20

馬可夫鏈 Markov Chain

有許多自然或社會現象,其發展或演變會隨時間進行而呈現出幾種可能的狀態,稱為狀態空間,這種隨機變化的過程稱為隨機過程 stochastic process。如果記錄狀態的時間點是離散的,就稱為離散時間隨機過程, Markov Process 或是高速公路每天的意外事件數量就是一種離散時間隨機過程,如果記錄狀態的時間點是連續的,就稱為連續時間隨機過程,布朗運動 Brownian Motion 或是在銀行等待服務的人數就是一種連續時間隨機過程。

如果時間與狀態都是離散的 Markov Process 就稱為 Markov Chain。Markov Chain 是隨機變數 \(X_0, X_1, X_2, X_3, ...\) ,也就是 \({X_n, n>=0}\) 的一個數列,其中 \(X_0, X_1, ... X_{n-1}\) 都是過去時間的狀態, \(X_n\) 是目前的狀態, \(X_{n+1}, X_{n+2}, ...\) 是未來的狀態,每一個狀態可轉移到其他狀態,轉移的機率總和為 1,因此目前的狀態都是由前一個狀態以及轉移機率來決定的。

討論 Markov Chain 有兩個先決條件

  1. 目前的狀態都是由前一個狀態的轉移機率來決定的
  2. 在任何時間點,系統的事件只存在於某一種狀態中

Makov Chain 的表示方式

  1. transition diagram

  2. transition matrix 轉移矩陣

    \( \begin{bmatrix} 0 & 1 & 0 \\ 0 & 0.5 & 0.5 \\ 0.6 & 0 & 0.4 \end{bmatrix} \)

  3. tree diagram

transition matrix

為方便計算,以 transition matrix 作為表示方式

\( T = \begin{bmatrix} a_{11} & a_{12} & a_{13} \\ a_{21} & a_{22} & a_{23} \\ a_{31} & a_{32} & a_{33} \end{bmatrix} \)

選定某一個狀態為起點,根據轉移矩陣改變狀態,當狀態不斷地改變,會產生一個狀態變化的路徑。如考慮所有的狀況,可計算出每一個路徑的發生機率,這些路徑就稱為 Markov Chain。


問題:如果一開始的狀態在 \(a_1\) ,三天後,在 \(a_2\) 的機率為?

一天後: \( \begin{bmatrix} 1 & 0 & 0 \end{bmatrix}\begin{bmatrix} 0 & 1 & 0 \\ 0 & 0.5 & 0.5 \\ 0.6 & 0 & 0.4 \end{bmatrix} = \begin{bmatrix} 0 & 1 & 0 \end{bmatrix} \)

兩天後: \( \begin{bmatrix} 0 & 1 & 0 \end{bmatrix}\begin{bmatrix} 0 & 1 & 0 \\ 0 & 0.5 & 0.5 \\ 0.6 & 0 & 0.4 \end{bmatrix} = \begin{bmatrix} 0 & 0.5 & 0.5 \end{bmatrix} \)

三天後: \( \begin{bmatrix} 0 & 0.5 & 0.5 \end{bmatrix}\begin{bmatrix} 0 & 1 & 0 \\ 0 & 0.5 & 0.5 \\ 0.6 & 0 & 0.4 \end{bmatrix} = \begin{bmatrix} 0.3 & 0.25 & 0.45 \end{bmatrix} \)

如果直接一次計算:\( \begin{bmatrix} 1 & 0 & 0 \end{bmatrix} T^3 = \begin{bmatrix} 1 & 0 & 0 \end{bmatrix}(\begin{bmatrix} 0 & 1 & 0 \\ 0 & 0.5 & 0.5 \\ 0.6 & 0 & 0.4 \end{bmatrix})^3 = \begin{bmatrix} 0.3 & 0.25 & 0.45 \end{bmatrix} \)

結果為 0.25

吸收馬可夫鏈

在 Markov Process 中,當事件進入某個狀態時,就一直停留在該狀態,稱該狀態為吸收狀態 absorbing state

如果 Markov Chain 有下列兩種特性,就稱為 Absorbing Markov Chain

  1. 至少存在一個吸收狀態
  2. 事件在任何非吸收狀態時,經過有限時間後,就會進入 absorbing state

ex:

\( \begin{bmatrix} 0.2 & 0.3 & 0 & 0.5 \\ 0 & 0.4 & 0 & 0.6 \\ 0.5 & 0 & 0.5 & 0 \\ 0 & 0 & 0 & 1 \end{bmatrix} \) 是一種 Absorbing Markov Chain

\( \begin{bmatrix} 0.2 & 0 & 0.4 & 0.4 \\ 0 & 1 & 0 & 0 \\ 0.1 & 0 & 0.5 & 0.4 \\ 0.5 & 0 & 0.3 & 0.2 \end{bmatrix} \) 不是一種 Absorbing Markov Chain,雖然 \(a_2\) 是 absorbing state,但是其他三種狀態,都不能到達 \(a_2\),因此這不是 Absorbing Markov Chain

醫院是一種 absorbing markov chain 的實例,如果將進入醫院住院的病患,分為住院可走動,住院臥床,痊癒出院,死亡四種狀態,這四種狀態可以構成一個 absorbing markov chain,其中痊癒出院即死亡,都分別為 absorbing state

工廠品管商品,生產線製造的新品(狀態1),通過品管後為合格品(狀態2),失敗的商品會進入再造品(狀態3),再造品可再重新加工,變成修復品(狀態4),重新加工失敗最終成為報廢品(狀態5)

\( \begin{matrix} \begin{matrix} \quad\quad\quad\quad & 合格品 & 報廢品 & 新品 & 再造品 & 修復品 \end{matrix} \\ \begin{matrix} 合格品 \\ 報廢品 \\ 新品 \\ 再造品 \\ 修復品 \end{matrix} \begin{bmatrix} 1 & 0 & 0 & 0 & 0 \\ 0 & 1 & 0 & 0 & 0 \\ 0.7 & 0 & 0 & 0.3 & 0 \\ 0.9 & 0 & 0 & 0 & 0.1 \\ 0.6 & 0.4 & 0 & 0 & 0 \end{bmatrix} \end{matrix} \)

m 階馬可夫鏈

前面討論的 Markov Chain,用前一個單位時間的狀態可以預測下一個狀態,這稱為一階馬可夫鏈 first-order Markov Chain,如果需要前 m 個狀態,才能預測下一個狀態,就稱為 m 階馬可夫鏈。當然也有零階馬可夫鏈,也就是一般的機率分佈模型。

通常討論的馬可夫鏈都是一階馬可夫鏈,最重要的是無記憶性,因為系統不需要記錄前面經過的狀態,只需要目前的狀態,就可以預測下一個狀態。

HMM: Hidden Markov Model

當系統存在一些外部看不見的內部狀態,隨著系統內部狀態不同,表現出來的外部狀態也不同,我們可以直接觀察到外部狀態。

以買飲料為例,假設目前有三種飲料可以選擇:可樂、茶、牛奶,我們只能觀察到使用者每天早上9:00吃早餐,正在喝哪一種飲料,但是我們不知道他今天是在 7-11、FamilyMart 或是 HiLife 買到的。便利商店是內部狀態,三種飲料是外部狀態。

內部狀態的變化可用一個 transition matrix 表示

\( A= \begin{bmatrix} 0.7 & 0.1 & 0.2 \\ 0.3 & 0.2 & 0.5 \\ 0.4 & 0.3 & 0.3 \end{bmatrix} \)

不同的內部狀態,會產生不同的外部狀態,也可以用另一個 matrix 表示

\( B = \begin{bmatrix} 0.2 & 0.2 & 0.6 \\ 0.3 & 0.3 & 0.4 \\ 0.5 & 0.3 & 0.2 \end{bmatrix} \)

如果以 h 為內部狀態, \(h^{t}\) 為 t 單位時間的內部狀態, y 為外部狀態,\(y^{t}\) 是 t 單位時間的外部狀態,則

\(h^{t} = h^{t-1}A\)

\(y^{t} = h^{t}B\)

在實務上的問題,我們只知道外部狀態的觀察值,內部狀態被隱藏無法得知,也就是我們不會知道 transition matrix 實際的機率值,也就是我們不知道 A 與 B 的數值,只有一堆數據資料。

例如我們目前有甲乙丙三個人半年來每一週買飲料的歷史資料,當給定某一週買飲料的序列清單時,我們可以根據歷史資料,找出每一個人的購買習慣,也就是計算出 transition matrix,訓練模型,最後利用這個模型,將新的那一週的資料,分類判斷可能是哪一個人購買的。

在已知觀察序列,但未知轉移矩陣的條件下,找出其對應的的所有可能狀態序列,以及其中最大機率之狀態序列。

HMM 是有關時間序列的數據資料建立的模型,也是所有時間模型(ex: RNN, NLP, RL)的基礎。

HMM 三個基本問題

定義HMM參數為 λ,S 為狀態值集合, O 為觀察值集合,初始機率 π (Initial Probability)、轉移機率 A (Transition Probability)、發射機率 B (Emission Probability),因此 λ = ( π, A, B )

  1. Evaluation Problem

    評估問題最簡單,將所有機率做乘積就可以得到觀測序列發生的機率。

    根據觀察序列O及已知的模型參數λ,得出發生此序列的機率 P(O|λ)

    可用 Forward-backward Algorithm 降低計算複雜度

  2. Decoding Problem

    根據觀察序列O及模型參數λ,得出最有可能的隱藏狀態序列(hidden state sequence),也就是要求得哪一種狀態序列發生的機率最大,求給定觀測序列條件概率P(I|Q,λ)最大的狀態序列I

    可用 Viterbi Algorithm

    Viterbi算法是求最短路徑的過程,由於不知道每個觀測狀態的隱藏狀態是什麼,所以初始化時,每種隱藏狀態都有可能,計算在每種隱藏狀態下的整個觀測序列發生的機率,機率最大的那條路徑就是每個觀測狀態對應的唯一隱藏狀態。

  3. Learning Problem

    學習問題最複雜

    在已知隱藏狀態與觀察狀態的條件下,根據觀察序列O,找到最佳的參數 狀態轉移矩陣 A、觀察轉移矩陣 B、起始值機率 π,調整模型參數 λ = [A, B, π] 使得 P(O|λ) 最大化

    可用 EM Algorithm

語音辨識 HMM

語音辨識 HMM

語音辨識的目標,是事先使用大量語料,對每一個辨識語句建立一個 DHMM,然後當我們取得使用者輸入的測試語句後,即對測試語句進行端點偵測及 MFCC 特徵擷取,然後再將 MFCC 送至各個辨識語句所形成的 DHMM,算出每一個模型的機率值,機率值最大的 DHMM,所對應的辨識語句就是此系統的辨識結果。每一個音框的特徵向量都是39維的 MFCC

建立 0~9 數字語音辨識的方法

  1. 建立 0, 1, 2 ~ 9 的 HMMs
  2. 利用 Viterbi Decoding 找到新語句中,計算針對每一個 HMM 發音音素順序的序列的發生機率,也就是計算輸入語句和每個模型的相似度
  3. 最大機率值的 HMM,就是預測的 digit

進行 DHMM 的設計時,我們必須對每一個數字建立一個 HMM 模型,以數字「九」的發音為例,相關的 HMM 模型可以圖示如下:

把「九」的發音切分成三個「狀態」(States),分別代表 ㄐ、ㄧ、ㄡ 的發音,每一個狀態可以包含好幾個音框,而每一個音框隸屬於一個狀態的程度,是用一個機率值來表示,稱為「狀態機率」(State Probability)。此外,當一個新的音框進來時,我們可以將此音框留在目前的狀態,也可以將此音框送到下一個狀態,這些行為模式也都是用機率來表示,統稱為「轉移機率」(Transition Probability),其中我們使用「自轉移機率」(Self-transition Probability)來代表新音框留在目前狀態的機率,而用「次轉移機率」(Next-transition Probability)來代表新的音框跳到下一個狀態的機率。

由於每一個音框都會轉成一個語音特徵向量,我們首先使用 VQ 來將這些特徵向量轉成符號(Symbols),換句話說,每一個音框所對應的 VQ 群聚的索引值(Index),即是此音框的符號。若以數學來表示,k = O(i) 即代表 frame i 被分到 cluster k。

定義一些常用的數量:

  • frameNum:一段語音所切出的音框個數。
  • dim:每一個音框所對應的語音特徵向量的維度(例如基本的 MFCC 有 12 維)。
  • stateNum:一個 DHMM 的狀態個數
  • symbolNum:符號的個數,也就是 VQ 後的群數

HMM 的參數,就是指「狀態機率」和「轉移機率 Transition Probability」,說明如下:

  • 我們通常以矩陣 A 來表示轉移機率,其維度是 stateNum x stateNum,其中 A(i, j) 即是指由狀態 i 跳到狀態 j 的機率值。例如在上圖中,由狀態 1 跳到狀態 2 的機率是 0.3,因此 A(1, 2) = 0.3。一般而言,A(i, j) 滿足下列條件:
    • 在進行轉移時,我們只允許搜尋路徑跳到下一個相鄰的狀態,因此 A(i, j) = 0 if j≠i and j≠i+1。
    • 對於某一個狀態而言,所有的轉移機率總和為 1,因此 A(i, i) + A(i, i+1) = 1。
  • 我們通常以矩陣 B 來表示狀態機率,其維度是 symbolNum x stateNum,B(k,j) 即是指符號 k 隸屬於狀態 j 的機率值。換句話說,B 定義了由「符號」到「狀態」的機率,因此給定第 i 個音框,我們必須先由 O(i) 找出這個音框所對應的符號 k,然後再由 B(k,j) 找出此音框屬於狀態 j 的機率。

假設我們已經知道「九」的 HMM 所包含相關的參數 A 和 B,此時當我們錄音得到一段語音,我們如何算出來此語音隸屬於「九」的機率或程度?如何計算輸入語句和每個模型的相似度?換句話說,我們如何將每個音框分配到各個狀態之中,使得我們能夠得到整段語音最高的機率值?最常用的方法稱為 Viterbi Decoding,這是基於「動態規劃」(Dynamic Programming)的方法,可用數學符號定義如下:

  1. 目標函數:定義 D(i, j) 是 t(1:i) 和 r(1:j) 之間的最大的機率,t(1:i) 是前 i 個音框的特徵向量所成的矩陣,r(1:j) 則是由前 j 個狀態所形成的 DHMM,對應的最佳路徑是由 (1, 1) 走到 (i, j)。
  2. 遞迴關係:D(i, j) = B(O(i), j) + max{D(i-1, j)+A(j, j), D(i-1, j-1)+A(j-1, j)}
  3. 端點條件:D(1, j) = p(1, j) + B(O(1), j), j = 1 ~ stateNum
  4. 最後答案:D(m, n)

在上述數學式中,我們所用的所有機率都是「對數機率」(Log Probabilities),所以原來必須連乘的數學方程式,都變成了連加,具有下列好處:

  • 以加法來取代乘法,降低計算的複雜度。
  • 避開了由於連乘所造成的數值誤差。

假設轉移機率 A 和狀態機率 B 已知,那麼經由 Viterbi Decoding,我們可以找到最佳路徑(即是最佳分配方式),使整段路徑的機率值為最大,此機率值及代表輸入語音隸屬於此 DHMM 的機率,若是機率越高,代表此輸入語音越可能是「九」。

有關於 B(O(i),j)的定義,都是「音框 i 隸屬於狀態 j 的機率」,但是在 DHMM 和 CHMM 的算法差異很大,說明如下:

  • 在 DHMM,若要計算 B(O(i),j),我們必須先找到音框 i 所對應的符號 O(i),然後在經由查表法查到此符號 O(i) 屬於狀態 j 的機率是 B(O(i), j)。
  • 在 CHMM,B(O(i),j) 是由一個連續的機率密度函數所定義。

如何經由大量語料來估測每個 HMM 模型的最佳參數值 A 和 B?

首先,我們必須先定義什麼是「最佳參數」:對某一個特定模型而言,最佳參數值 A 和 B 應能使語料產生最大的對數機率總和。這個定義和最大似然估計(maximum likelihood estimation,縮寫為MLE) 完全相同,也非常合理。

計算最佳參數的方法,稱為 Re-estimation,其原理非常類似 batch k-means (Forgy's method) 的方法,先猜 A 和 B 的值,再反覆進行 Viterbi Decoding,然後再重新估算 A 和 B 的值,如此反覆計算,我們可以證明在疊代過程中,機率總和會一路遞增,直到逼近最佳值。(但是,就如同 k-means Clustering,我們並無法證明所得到的機率值是 Global Maximum,因此在訓練的過程中,可以使用不同的啟始參數,看看是否在反覆疊代後,能夠得到更好的機率總和。)求取參數的方法說明如下:

  1. 將所有的訓練語句切出音框,並將每一個音框轉成語音特徵向量,例如 39 維的 MFCC。

  2. 對所有語音特徵向量進行向量量化編碼(Vector Quantization, VQ),找出每一個向量所對應的 symbol(此即為對應的中心點或codeword)

  3. 先猜一組 A 和 B 的啟始值。如果沒有人工的標示資料,我們可以使用簡單的「均分法」,示意圖如下:

  4. 反覆進行下列兩步驟,直到收斂

    • Viterbi decoding:在 A 和 B 固定的情況下,利用 Viterbi decoding,找出 n 句「九」的語料所對應的 n 條最佳路徑。
    • Re-estimation:利用這 n 條最佳路徑,重新估算 A 和 B。

估算 A 的方法

\( A= \begin{bmatrix} 3/4 & 1/4 & 0 \\ 0 & 4/5 & 1/5 \\ 0 & 0 & 1 \end{bmatrix} \)

估算 B 的方法:

\( B= \begin{bmatrix} 1/4 & 3/5 & 0 \\ 1/4 & 2/5 & 2/6 \\ 2/4 & 0 & 0 \\ 0 & 0 & 4/6 \end{bmatrix} \)

假設我們的目標函數可以寫成 P(A, B, Path),則上述求參數的方法會讓 P(A, B, Path)逐次遞增,直到收斂,因為:

  1. 在 A, B 固定時,Viterbi decoding 會找出最佳的路徑,使 P(A, B, Path) 有最大值
  2. 在路徑(Path)固定時,Re-estimation 會找出最佳的 A, B 值以使 P(A, B, Path)有最大值

References

隨機過程.pdf

Markov Chain

Markov Model

Hidden Markov Model (part 1)

Markov chain 及 HMM

HMM(Hidden Markov Model)簡介

機器不學習:HMM模型解析

Hidden Markov Model (part 3)

hmmDiscrete_chinese

即時語音辨識系統

2020/07/13

BRD、MRD、PRD Requirement Documents

文件用途

  • BRD:Bussiness Requirement Document 商業需求文件

    討論市場中的市場趨勢、商業模型。一份BRD可能不止一個產品,而是數個產品為主的切入商業模式。用來決定公司大戰略的方向,去看數個產品對於公司的「總體戰略」與「商業價值」。

    主要給產品、運營、研發、財務、老闆等管階層看的,用來決定是否要開始某個產品。

  • MRD:Market Requirement Document 市場需求文件

    決定產品對於用戶的價值,一份MRD內需有該產品的市場分析、用戶分析、需求分析、競品策略、產品路線圖等,用來看公司現在醞釀的產品能提供哪些與其他競爭對手們,增加哪些不同的功能去切入這個市場。

    主要是給產品、運營、研發等專案人員看的,在大家一致認可需求成立的時候,來商量該怎麼做,如何做,什麼時間做。

  • PRD:Product Requirement Document 產品需求文件

    決定產品該做成什麼樣子,包含產品功能、功能流程邏輯、互動方式、產品雛形等等,用以跟開發團隊溝通。

    主要是給專案經理、UI 設計師、開發團隊、測試工程師、運營等人員查看,是具體的產品設計文件,開發者可以根據PRD得知整個產品的邏輯,測試人員可以根據PRD 產生測試項目,專案經理可以根據PRD拆分工作。PRD是專案啓動之前,必須要通過評審確定的最重要文檔,PRD決定了成品的結果!

簡單地說:

  • BRD 討論現在做什麼產品比較好,決定要不要做
  • MRD 討論如何做有特色具有競爭力的產品,決定如何開始做
  • PRD 決定做成什麼樣子

BRD 文件內容

  • Background
    • Market
    • Client
    • Competitor
  • Business Model
    • Niche point
    • Growth point
    • Margin analysis
  • Product Description
    • Product Positioning
    • Product Vision
    • Product Roadmap
    • Product Core Feature
  • Profit & Cost
    • Revenue
    • Fixed Cost
    • Variable Cost
  • Risk
    • Risk Assessment
    • Risk Control

MRD 文件內容

  • Market Research
    • Character
    • Opportunity
    • Trend
    • Segment
    • Barriers
    • Timing
  • Target Market
    • Target Point
    • Scale
    • Character
    • Trend
  • Target User
    • Persona
    • Journey
    • Category
    • Core User
  • Competitor
    • Direct & Indirect
    • Business Model
    • TA
    • Operation
    • Techonics
    • Market Share
  • Adv. & Positioning
    • SWOT
    • Porter Five Force
    • 4P, 4C
    • STP
  • Product Description
    • Product Positioning
    • Product Vision
    • Product Roadmap
    • Product Core Feature

PRD 文件內容

  • File Description
    • File name
    • Version
    • Revised History
    • Table
    • Term Explanation
  • Product Description
    • Background
    • Vision
    • Roadmap
    • Goal
    • Value
  • Funciton Requirement
    • Global Rule
    • Product Structure
    • Flow Chart
    • Function List
    • Detailed Description
  • Non-Function Requirement
    • Data Need
    • Historical Data
    • API
    • Security
    • Performance
    • Compatibility

References

PM 文件指南 - 產品經理一定要掌握的 PRD/SPEC 心法與撰寫教學

如何撰寫新產品開發相關文件?MRD與PRD是甚麼?又有何不同?

【SOP不藏私】系列#EP1「連猴子也會的PRD指南」

產品需求文檔

互聯網產品設計常用文檔類型BRD、MRD、PRD、FSD

BRD、MRD 和 PRD 之間的區別與聯繫有哪些?

如何撰寫一份合格的產品需求文件PRD?

PRD文檔範例,產品經理值得收藏的寫作手冊

2020/07/06

rust20_實作 multithread web server

  1. 學習一些 TCP 與 HTTP 知識
  2. 在 socket 上監聽 TCP connections
  3. 解析少量的 HTTP request
  4. 產生一個合適的 HTTP response
  5. 通過 thread pool 改善 server 的吞吐量

Building a Single-Threaded Web Server

產生 project heelo

$ cargo new hello
     Created binary (application) `hello` project
$ cd hello

修改 main.rs,監聽 TCP port 7878

use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        println!("Connection established!");
    }
}

bind 類似 new,會回傳一個新的 TcpListener instance

bind 回傳 Result<T, E>,這代表綁定可能會失敗,例如,連接 80 端口需要管理員權限(非管理員用戶只能監聽大於 1024 的端口),所以如果不是管理員嘗試連接 80 端口,則會綁定失敗。

另外如果運行兩個此程序的實例這樣會有兩個程式監聽相同的port,綁定會失敗。因為我們是出於學習目的來編寫一個基礎的 server,將不用關心處理這類錯誤,使用 unwrap 在出現這些情況時直接停止程式。

TcpListenerincoming 方法回傳一個 iterator,它提供了一系列的 stream(更準確的說是 TcpStream 類型的流)。stream)代表一個客戶端和服務端之間打開的連線。連線connection)代表客戶端連接服務端、服務端生成 response 以及服務端關閉連接的全部 request / response 過程。為此,TcpStream 允許我們讀取它來查看客戶端發送了什麼,並可以撰寫 response。for 循環會依次處理每個連接並產生一系列的流供我們處理。

目前為止,處理流的過程包含 unwrap,如果出現任何錯誤會終止程序,如果沒有任何錯誤,則列印出信息。接下來我們將為成功的情況增加更多功能。當客戶端連接到服務端時 incoming方法回傳錯誤是可能的,因為我們實際上沒有遍歷連接,而是遍歷 連線嘗試connection attempts)。連線可能會因為很多原因不能成功,大部分是操作系統相關的。例如,很多系統限制同時打開的連接數;新連接嘗試產生錯誤,直到一些打開的連接關閉為止。

測試

在終端執行 cargo run,接著在 browser 中瀏覽 127.0.0.1:7878。瀏覽器會顯示出看起來無法連接的錯誤訊息,因為 server 目前並沒 response 任何資料。但是如果我們觀察 terminal,會發現當瀏覽器連接 server 時會印出一系列的訊息!

Connection established!
Connection established!
Connection established!

使用 ctrl-C 來停止程序。並在做出最新的修改之後執行 cargo run 重啟服務。

讀取 request

為了分離取得連線和接下來對連接的操作的相關內容,我們將開始一個新函數來處理連線。在這個新的 handle_connection 函數中,我們從 TCP 流中讀取資料並列印出來以便觀察瀏覽器發送過來的資料。

use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

// stream 參數是可變的。這是因為 TcpStream 在內部記錄了所返回的資料,內部狀態可能會改變
fn handle_connection(mut stream: TcpStream) {
    // 宣告一個 buffer 來存放讀取到的數據
    let mut buffer = [0; 512];

    // 將緩衝區中的字節轉換為字符串並打印出來。String::from_utf8_lossy 函數獲取一個 &[u8] 並產生一個 String
    // 函數名的 “lossy” 部分來源於當其遇到無效的 UTF-8 序列時的行為:它使用 �,U+FFFD REPLACEMENT CHARACTER,來代替無效序列。你可能會在緩衝區的剩餘部分看到這些替代字元
    stream.read(&mut buffer).unwrap();

    println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}

觀察 http request

Request: GET / HTTP/1.1
Host: 127.0.0.1:7878
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,es;q=0.5,vi;q=0.4,de;q=0.3
DN

http 格式

Method Request-URI HTTP-Version CRLF
headers CRLF
message-body

第一行是 request line

/ 是 URI: Uniform Resource Identifier

CRLF序列 (CRLF代表 carriage return line feed,這是打字機時代的術語!)結束。

撰寫 response

HTTP-Version Status-Code Reason-Phrase CRLF
headers CRLF
message-body

第一行是 status line

回應 HTTP/1.1 200 OK\r\n\r\n

use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];

    stream.read(&mut buffer).unwrap();

    let response = "HTTP/1.1 200 OK\r\n\r\n";

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

當在瀏覽器中加載 127.0.0.1:7878 時,會得到一個空頁面而不是錯誤。

回應 html

在項目根目錄創建一個新文件,hello.html

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
  </body>
</html>

修改 handle_connection 來讀取 HTML 文件

use std::fs;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let contents = fs::read_to_string("hello.html").unwrap();

    let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

在 browser 就能看到 hello.html 網頁內容

validating request and selectively responding

目前會回傳固定的 html,修改為可判斷網址

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    if buffer.starts_with(get) {
        let contents = fs::read_to_string("hello.html").unwrap();

        let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    } else {
        // 其他請求
        let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
        let contents = fs::read_to_string("404.html").unwrap();

        let response = format!("{}{}", status_line, contents);

        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    }
}

匹配 requst GET / HTTP/1.1,並區分不同的 response

對於任何不是 / 的請求返回 404 狀態碼的 response 和錯誤頁面

404.html

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>

refactoring

將這些區別分別提取到一行 ifelse 中,對狀態行和文件名變數賦值;然後在讀取文件和寫入 response 的代碼中無條件的使用這些變數。

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

Turn into a Multithreaded Server

目前 server 會依次處理每一個請求,如果 server 正接收越來越多的請求,會使性能越來越差。如果一個請求花費很長時間來處理,隨後而來的請求則不得不等待這個長請求結束。我們需要修復這種情況,不過首先讓我們實際嘗試重現一下這個問題。

模擬 slow request

在回應 /sleep 時會暫停 5s

use std::thread;
use std::time::Duration;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}

利用 thead pool 改善效能

線程池thread pool)是一組預先分配的等待或準備處理任務的 thread。當程式收到一個新任務,線程池中的一個線程會被分配任務,這個線程會離開並處理任務。其餘的線程則可用於處理在第一個線程處理任務的同時處理其他接收到的任務。當第一個線程處理完任務時,它會返回空閒線程池中等待處理新任務。線程池允許我們並發處理連接,增加 server 的吞吐量。

我們會將 pool 線程限制為較少的數量,以防拒絕服務(Denial of Service, DoS)攻擊;如果程序為每一個接收的請求都產生一個線程,某人向 server 發起千萬級的請求時會耗盡服務器的資源並導致所有請求的處理都被終止。

不同於分配無限的線程,線程池中將有固定數量的等待線程。當新進請求時,將請求發送到線程池中做處理。線程池會維護一個接收請求的隊列。每一個線程會從隊列中取出一個請求,處理請求,接著向對隊列索取另一個請求。通過這種設計,則可以並發處理 N 個請求,其中 N 為線程數。如果每一個線程都在響應慢請求,之後的請求仍然會阻塞隊列,不過相比之前增加了能處理的慢請求的數量。

這個設計僅僅是多種改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和單線程異步 I/O 模型。

先為每一個 stream 分配了一個新線程進行處理

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

一個假想的 thread pool API

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

使用 ThreadPool::new 來產生一個新的線程池,它有一個可配置的線程數的參數,在這裡是 ˋ。這樣在 for 循環中,pool.execute 有類似 thread::spawn 的 API,它獲取一個線程池運行於每一個流的閉包。pool.execute 需要實現為獲取閉包並傳遞給池中的線程。這段 code 還不能編譯,不過編譯器會告訴我們如何修復它。

建立 ThreadPool struct

src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

src/main.rs

use hello::ThreadPool;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

因為還沒有定義 execute method

$ cargo check
    Checking hello v0.1.0 (/Users/charley/project/idea/rust/hello)
error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^

error: aborting due to previous error

我們會在 ThreadPool 上定義 execute 函數來獲取一個閉包參數。chap13 的 “使用帶有泛型和 Fn trait 的閉包” 部分,閉包作為參數時可以使用三個不同的 trait:FnFnMutFnOnce。我們需要決定這裡該使用哪種閉包。因最終需要實現的類似於標準庫的 thread::spawn,所以我們可以觀察 thread::spawn 的簽名在其參數中使用了何種 bound。

spawn 的 API 文件是這樣

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

F 是這裡我們關心的參數;T 與回傳值有關所以我們並不關心。spawn 使用 FnOnce 作為 F 的 trait bound,這可能也是我們需要的,因為最終會將傳遞給 execute 的參數傳給 spawn。因為處理請求的線程只會執行閉包一次,這也進一步確認了 FnOnce 是我們需要的 trait,這裡符合 FnOnceOnce 的意思。

F 還有 trait bound Send 和生命週期綁定 'static,這對我們的情況也是有意義的:需要 Send來將閉包從一個線程轉移到另一個線程,而 'static 是因為不知道線程會執行多久。讓我們編寫一個使用帶有這些 bound 的泛型參數 FThreadPoolexecute 方法:

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}
$ cargo check
    Checking hello v0.1.0 (/Users/charley/project/idea/rust/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^ help: consider prefixing with an underscore: `_size`
  |
  = note: #[warn(unused_variables)] on by default

warning: unused variable: `f`
 --> src/lib.rs:8:30
  |
8 |     pub fn execute<F>(&self, f: F)
  |                              ^ help: consider prefixing with an underscore: `_f`

    Finished dev [unoptimized + debuginfo] target(s) in 1.57s

目前只能編譯,還無法運作

new 驗證 pool 中 thread 的數量

這裡仍然存在警告是因為其並沒有對 newexecute 的參數做任何操作。讓我們用期望的行為來實現這些函數。

new 開始。先前選擇使用無符號類型作為 size 參數的類型,因為線程數為負的線程池沒有意義。然而,線程數為零的線程池同樣沒有意義,不過零是一個完全有效的 u32 值。讓我們增加在返回 ThreadPool 實例之前檢查 size 是否大於零的代碼,並使用 assert! 宏在得到零時 panic,如 20-13 所示:

impl ThreadPool {
    /// 創建線程池。
    ///
    /// 線程池中線程的數量。
    ///
    /// # Panics
    ///
    /// `new` 函數在 size 為 0 時會 panic。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}

這裡用文檔註釋為 ThreadPool 增加了一些文件

如果要做得更好,可以改為傳回 Result

pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {

產生儲存 threads 的空間

現在有了一個有效的線程池線程數,就可以實際產生這些線程,並在回傳前將他們儲存在 ThreadPool 結構中。不過要如何 “儲存” 一個線程?讓我們再看看 thread::spawn 的簽名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

spawn 返回 JoinHandle<T>,其中 T 是閉包返回的類型。嘗試使用 JoinHandle 來看看會發生什麼。在我們的情況中,傳遞給線程池的閉包會處理連線並不返回任何值,所以 T 將會是單元類型 ()

20-14 中的代碼可以編譯,不過實際上還並沒有產生任何線程。我們改變了 ThreadPool 的定義來存放一個 thread::JoinHandle<()> 的 vector 實例,使用 size 容量來初始化,並設置一個 for 循環了來執行產生線程的code,並回傳包含這些線程的 ThreadPool 實例:

lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
   pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        // with_capacity 與 Vec::new 做了同樣的工作,不過它為 vector 預先分配空間。因為已經知道了 vector 中需要 size 個元素,預先進行分配比僅僅 Vec::new 要稍微有效率一些
        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool {
            threads
        }
    }

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}

Worker struct 負責從 ThreadPool 將 code 傳給 thread

20-14 的 for 循環中留下了一個關於產生線程的註釋。實際上如何產生線程呢?這是一個難題。標準庫提供的產生線程的方法,thread::spawn,它期望獲取一些一旦產生線程就應該執行的代碼。然而,我們希望開始線程並使其等待稍後傳遞的代碼。標準庫的線程實現並沒有包含這麼做的方法;我們必須自己實現。

我們將要實現的行為是產生線程並稍後發送代碼,這會在 ThreadPool 和線程間引入一個新類別來管理這種新行為。這個資料結構稱為 Worker:這是一個 pool 中的常見概念。想像一下在餐館廚房工作的員工:員工等待來自客戶的訂單,他們負責接受這些訂單並完成它們。

不同於在線程池中儲存一個 JoinHandle<()> 實例的 vector,我們會儲存 Worker 結構的實例。每一個 Worker 會儲存一個單獨的 JoinHandle<()> 實例。接著會在 Worker 上實現一個方法,它會獲取需要允許代碼的閉包並將其發送給已經啟動的線程執行。我們還會賦予每一個 worker id,這樣就可以在日誌和呼叫中區別線程池中的不同 worker。

首先,讓我們做出產生 ThreadPool 時所需的修改。在通過如下方式設置完 Worker 之後,我們會實現向線程發送閉包的代碼:

  1. 定義 Worker 結構存放 idJoinHandle<()>
  2. 修改 ThreadPool 存放一個 Worker 實例的 vector
  3. 定義 Worker::new 函數,它獲取一個 id 數字並返回一個帶有 id 和用空閉包分配的線程的 Worker 實例
  4. ThreadPool::new 中,使用 for 循環計數生成 id,使用這個 id 新建 Worker,並儲存進 vector 中

20-15 就是一個做出了這些修改的例子:

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
   pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers
        }
    }

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}

ThreadPool 中從 threads 改為 workers,因為它現在儲存 Worker 而不是 JoinHandle<()>。使用 for 循環中的計數作為 Worker::new 的參數,並將每一個新建的 Worker 儲存在叫做 workers 的 vector 中。

Worker 結構和其 new 函數是私有的,因為外部代碼(比如 src/bin/main.rs 中的 server)並不需要知道關於 ThreadPool 中使用 Worker 結構的實現細節。Worker::new 函數使用 id 參數並儲存了使用一個 closure 產生的 JoinHandle<()>

這段code 能夠編譯並用指定給 ThreadPool::new 的參數產生儲存了一系列的 Worker 實例,不過 仍然 沒有處理 execute 中得到的閉包。

透過 Channels 發送 request 給 threads

下一個需要解決的問題是傳給 thread::spawn 的閉包完全沒有做任何工作。目前,我們在 execute 方法中獲得期望執行的閉包,不過在創建 ThreadPool 的過程中產生每一個 Worker 時需要向 thread::spawn 傳遞一個閉包。

我們希望剛創建的 Worker 結構能夠從 ThreadPool 的隊列中獲取需要執行的代碼,並發送到線程中執行他們。

在 chap 16,我們學習了 通道 —— 一個溝通兩個線程的簡單手段 —— 對於這個例子來說則是絕佳的。這裡通道將充當任務隊列的作用,execute 將通過 ThreadPool 向其中線程正在尋找工作的 Worker 實例發送任務。如下是計畫:

  1. ThreadPool 會產生一個通道並當作發送端。
  2. 每個 Worker 將會充當通道的接收端。
  3. 一個 Job 結構來存放用於向通道中發送的閉包。
  4. execute 方法會在通道發送端發出期望執行的任務。
  5. 在線程中,Worker 會歷遍通道的接收端並執行任何接收到的任務。

從在 ThreadPool::new 中產生通道並讓 ThreadPool 實例充當發送端開始,如 20-16 所示。Job 是將在通道中發出的類別,目前它是一個沒有任何內容的結構體

// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

ThreadPool::new 中,產生了一個通道,並接著讓線程池在接收端等待。這段 code 能夠編譯,不過仍有警告。


嘗試在線程池產生每個 worker 時,將通道的接收端傳遞給他們。我們希望在 worker 所分配的線程中使用通道的接收端,所以將在閉包中引用 receiver 參數。20-17 中展示的 code 還不能編譯:

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}

這是一些小而直觀的修改:將通道的接收端傳遞進了 Worker::new,並接著在閉包中使用它。

Rust 所提供的通道實現是多 生產者,單 消費者 的。這意味著不能簡單的 clone 通道的消費端來解決問題。從通道隊列中取出任務涉及到修改 receiver,所以這些線程需要一個能安全的共享和修改 receiver 的方式,否則可能導致 race condition

chap 16 討論的線程安全智能指針,為了在多個線程間共享所有權並允許線程修改其值,需要使用 Arc<Mutex<T>>Arc 使得多個 worker 擁有接收端,而 Mutex 則確保一次只有一個 worker 能從接收端得到任務。

use std::sync::Arc;
use std::sync::Mutex;
// --snip--

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    // --snip--
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
    }
}

ThreadPool::new 中,將通道的接收端放入一個 Arc 和一個 Mutex 中。對於每一個新 worker,clone Arc 來增加引用計數,如此這些 worker 就可以共享接收端的所有權了。

通過這些修改,code可以編譯了

實作 execute

實現 ThreadPool 上的 execute 方法。同時也要修改 Job 結構:它將不再是結構,Job 將是一個有著 execute 接收到的閉包類型的 trait 對象的類別別名。 chap 19 “類別別名用來創建類別同義詞” 部分提到過,類別別名允許將長的類別變短。

// --snip--

type Job = Box<FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

為存放每一個閉包的 Box 產生一個 Job 類型別名,接著在通道中發出任務

在使用 execute 得到的閉包產生 Job instance 之後,將這些任務從通道的發送端發出。這裡呼叫 send 上的 unwrap,因為發送可能會失敗,例如這可能發生於停止了所有線程執行的情況,這意味著接收端停止接收新消息了。不過目前我們無法停止線程執行;只要線程池存在他們就會一直執行。使用 unwrap 是因為我們知道失敗不可能發生,即便編譯器不這麼認為。

在 worker 中,傳給 thread::spawn 的閉包仍然還只是 引用 了通道的接收端。相反我們需要閉包一直循環,向通道的接收端請求任務,並在得到任務時執行他們。如 20-20 對 Worker::new 做出修改:

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                (*job)();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

首先在 receiver 上呼叫了 lock 來獲取互斥器,接著 unwrap 在出現任何錯誤時 panic。如果互斥器處於一種叫做 被污染poisoned)的狀態時獲取鎖可能會失敗,這可能發生於其他線程在持有鎖時 panic 了且沒有釋放鎖。在這種情況下,呼叫 unwrap 使其 panic 是正確的行為。可將 unwrap 改為包含有意義錯誤信息的 expect

如果鎖定了互斥器,接著調用 recv 從通道中接收 Job。最後的 unwrap 也繞過了一些錯誤,這可能發生於持有通道發送端的線程停止的情況,類似於如果接收端關閉時 send 方法如何回傳 Err 一樣。

呼叫 recv 會阻塞當前線程,所以如果還沒有任務,其會等待直到有可用的任務。Mutex<T> 確保一次只有一個 Worker 線程嘗試請求任務。

但目前還是無法編譯


為了呼叫儲存在 Box<T> (這正是 Job 別名的類型)中的 FnOnce 閉包,該閉包需要能將自己移動 Box<T>,因為當呼叫這個閉包時,它獲取 self 的所有權。通常來說,將值移動出 Box<T> 是不被允許的,因為 Rust 不知道 Box<T> 中的值將會有多大;chap 15 能夠正常使用 Box<T> 是因為我們將未知大小的值儲存進 Box<T> 從而得到已知大小的值。

chap17 曾見過,17-15 中有使用了 self: Box<Self> 語法的方法,它允許方法獲取儲存在 Box<T> 中的 Self 值的所有權。這正是我們希望做的,然而不幸的是 Rust 不允許我們這麼做:Rust 當閉包被調用時行為的那部分並沒有使用 self: Box<Self> 實現。所以這裡 Rust 也不知道它可以使用 self: Box<Self> 來獲取閉包的所有權並將閉包移動出 Box<T>

Rust 仍在努力改進提升編譯器的過程中,未來新版的 rust,應該可讓 20-20 中的 code 正常工作。


不過目前讓我們通過一個小技巧來繞過這個問題。可以明確告訴 Rust 在這裡我們可以使用 self: Box<Self> 來獲取 Box<T> 中值的所有權,而一旦獲取了閉包的所有權就可以呼叫它了。這涉及到定義一個新 trait,它帶有一個在簽名中使用 self: Box<Self> 的方法 call_box,為任何實現了 FnOnce() 的類型定義這個 trait,修改類別別名來使用這個新 trait,並修改 Worker 使用 call_box 方法。這些修改如 20-21 所示:

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<FnBox + Send + 'static>;

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

新增一個 trait FnBox 來繞過當前 Box<FnOnce()> 的限制

新增了一個叫做 FnBox 的 trait。這個 trait 有一個方法 call_box,它類似於其他 Fn* trait 中的 call 方法,除了它獲取 self: Box<Self> 以便獲取 self 的所有權並將值從 Box<T> 中移動出來。

接下來,為任何實現了 FnOnce() trait 的類型 F 實現 FnBox trait。這實際上就等於任何 FnOnce() closure 都可以使用 call_box 方法。call_box 的實現使用 (*self)() 將閉包移動出 Box<T> 並呼叫此閉包。

現在我們需要 Job 類別別名是任何實現了新 trait FnBoxBox。這允許我們在得到 Job 值時使用 Worker 中的 call_box。為任何 FnOnce() 閉包都實現了 FnBox trait 意味著無需對實際在通道中發出的值做任何修改。現在 Rust 就能夠理解我們的行為是正確的了。

Graceful shutdown and cleanup

20-21 中的代碼如期通過使用線程池異步的響應請求。這裡有一些警告說 workersidthread 字段沒有直接被使用,這提醒了我們並沒有清理所有的內容。當使用不那麼優雅的 ctrl-C 終止主線程時,所有其他線程也會立刻停止,即便它們正處於處理請求的過程中。

現在我們要為 ThreadPool 實現 Drop trait 對線程池中的每一個線程調用 join,這樣這些線程將會執行完他們的請求。接著會為 ThreadPool 實現一個告訴線程他們應該停止接收新請求並結束的方式。為了實踐這些代碼,修改 server 在優雅停機(graceful shutdown)之前只接受兩個請求。

ThreadPool 實現 Drop Trait

當線程池被丟棄時,應該 join 所有線程以確保他們完成其操作。20-23 展示了 Drop 實現的第一次嘗試;這些代碼還不能夠編譯:

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

編譯錯誤

error[E0507]: cannot move out of borrowed content
  --> src/lib.rs:61:13
   |
61 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ cannot move out of borrowed content

error: aborting due to previous error

這告訴我們並不能調用 join,因為只有每一個 worker 的可變借用,而 join 獲取其參數的所有權。為瞭解決這個問題,需要一個方法將 thread 移動出擁有其所有權的 Worker 實例以便 join可以消費這個線程。17-15 中我們曾見過這麼做的方法:如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上調用 take 方法將值從 Some 成員中移動出來而對 None 成員不做處理。換句話說,正在運行的 Workerthread 將是 Some 成員值,而當需要清理 worker 時,將 Some 替換為 None,這樣 worker 就沒有可以運行的線程了。

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

當新建 Worker 時需要將 thread 值封裝進 Some

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

調用 Option 上的 takethread 移動出 worker

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

向 thread 發送訊息,使其停止接收任務

有了這些修改,code 就能編譯且沒有任何警告。不過也有壞消息,這些代碼還不能以我們期望的方式運行。問題的關鍵在於 Worker 中分配的線程所運行的閉包中的邏輯:呼叫 join 並不會關閉線程,因為他們一直 loop 來尋找任務。如果採用這個實現來嘗試丟棄 ThreadPool ,則主線程會永遠阻塞在等待第一個線程結束上。

為了修復這個問題,修改線程既監聽是否有 Job 運行也要監聽一個應該停止監聽並退出無限循環的信號。所以通道將發送這個枚舉的兩個成員之一而不是 Job 實例

enum Message {
    NewJob(Job),
    Terminate,
}

Message 要麼是存放了線程需要運行的 JobNewJob 成員,要麼是會導致線程退出循環並終止的 Terminate 成員。

同時需要修改通道來使用 Message 而不是 Job,收發 Message 值並在 Worker 收到 Message::Terminate 時退出循環

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

// --snip--

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
        Worker {

        let thread = thread::spawn(move ||{
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job.call_box();
                    },
                    Message::Terminate => {
                        println!("Worker {} was told to terminate.", id);

                        break;
                    },
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

為了使用 Message enum 需要將兩個地方的 Job 修改為 MessageThreadPool 的定義和 Worker::new 的簽名。ThreadPoolexecute 方法需要發送封裝進 Message::NewJob 成員的任務。然後,在 Worker::new 中當從通道接收 Message 時,當獲取到 NewJob成員會處理任務而收到 Terminate 成員則會退出循環。

修改後再次能夠編譯並繼續按照期望的行為運行。不過還是會得到一個警告,因為並沒有產生任何 Terminate 成員的消息。如 20-25 所示修改 Drop 實現來修復此問題:

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

在對每個 worker 線程調用 join 之前向 worker 發送 Message::Terminate

現在遍歷了 worker 兩次,一次向每個 worker 發送一個 Terminate 消息,一個呼叫每個 worker 線程上的 join。如果嘗試在同一循環中發送消息並立即 join 線程,則無法保證當前迭代的 worker 是從通道收到終止消息的 worker。

為了更好的理解為什麼需要兩個分開的循環,想像一下只有兩個 worker 的場景。如果在一個單獨的循環中遍歷每個 worker,在第一次迭代中向通道發出終止消息並對第一個 worker 線程調用 join。我們會一直等待第一個 worker 結束,不過它永遠也不會結束因為第二個線程接收了終止消息。deadlock!

為了避免此情況,首先在一個循環中向通道發出所有的 Terminate 消息,接著在另一個循環中 join 所有的線程。每個 worker 一旦收到終止消息即會停止從通道接收消息,意味著可以確保如果發送同 worker 數相同的終止消息,在 join 之前每個線程都會收到一個終止消息。

為了實踐這些代碼,如 20-26 所示修改 main 在優雅停止 server 之前只接受兩個請求:

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

你不會希望真實世界的 web server 只處理兩次請求就停機了,這只是為了展示優雅停機和清理處於正常工作狀態。

take 方法定義於 Iterator trait,這裡限制循環最多頭 2 次。ThreadPool 會在 main 的結尾離開作用域,而且還會看到 drop 實現的運行。

使用 cargo run 啟動 server,並發起三個 request。第三個request 應該會失敗

這個特定的運行過程中一個有趣的地方在於:注意我們向通道中發出終止消息,而在任何線程收到消息之前,就嘗試 join worker 0 了。worker 0 還沒有收到終止消息,所以主線程阻塞直到 worker 0 結束。與此同時,每一個線程都收到了終止消息。一旦 worker 0 結束,主線程就等待其他 worker 結束,此時他們都已經收到終止消息並能夠停止了。

以下是完整的程式

main.rs

extern crate hello;
use hello::ThreadPool;

use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::fs::File;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

     let mut file = File::open(filename).unwrap();
     let mut contents = String::new();

     file.read_to_string(&mut contents).unwrap();

     let response = format!("{}{}", status_line, contents);

     stream.write(response.as_bytes()).unwrap();
     stream.flush().unwrap();
}

lib.rs

use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

enum Message {
    NewJob(Job),
    Terminate,
}

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<dyn FnBox + Send + 'static>;

impl ThreadPool {
    /// 創建線程池。
    ///
    /// 線程池中線程的數量。
    ///
    /// # Panics
    ///
    /// `new` 函數在 size 為 0 時會 panic。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
        Worker {

        let thread = thread::spawn(move ||{
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job.call_box();
                    },
                    Message::Terminate => {
                        println!("Worker {} was told to terminate.", id);

                        break;
                    },
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

References

The Rust Programming Language

中文版

中文版 2