但是,在分發消息時,需要壹次將壹個小時的所有索引文件加載到內存中。由於每個小時的消息索引是按順序寫入延遲索引的,而消息的送達時間是隨機的,所以寫入順序與消息的送達順序並不壹致。所以索引加載到內存後,需要按照消息具體投遞的秒級時間戳進行排序,然後根據排序後的索引讀取延遲日誌中的消息進行分發和投遞。
這種存儲方案存在以下問題:1。它需要壹次將整小時的消息索引加載到內存中,如果並發度高,內存壓力比較大。2.按消息傳遞的秒時間戳排序後,實時到達的新消息需要實時插入排序,性能低,延時大。
為了解決上述問題,我們將delay index中的索引元信息{offset,size,delivery timestamp}改為{offset,size,local index,global index,preglobal index},其中:
GlobalIndex可以直接定位延遲索引中的索引單元,從而確定延遲日誌中的壹條消息,preGlobalIndex可以定位同壹秒的最後壹條消息,所以只要在地面存儲每小時最後壹條消息的索引ID,就可以逆序找出每秒的所有消息。壹個小時只有3600秒,只需要將16字節的3600個索引id加載到內存中,就可以實現消息的每秒實時加載。
為了減少消息分發的延遲,可以將10s的最新消息索引預先加載到內存中。對於實時接收的消息,可以根據時間戳更新這壹秒最新消息的索引globalIndex和反向索引preGlobalIndex,不需要排序,消息插入和讀取的復雜度為O(1)。
數組和鏈表實現的多級時間輪機制分別是秒級和小時級。小時級別時間輪向前移動壹個槽,對應於秒級別時間輪的壹次旋轉。秒級時間輪上有3600個槽,每個槽的最大時間跨度為1s,時間輪每秒向前移動壹個槽。小時級實踐理論:每個槽點的時間跨度為1小時。每小時移動壹個槽,打開後面兩個小時的內存映射,同時清除兩個小時前延遲日誌文件的內存映射。
當我們只有2小時5分鐘的消息要發送時,秒級時間輪需要推兩次,即小時級時間輪移動2個槽,延時5分鐘,然後降級到秒級時間輪。這就是所謂的時間輪空轉。
壹般每個使用過的槽都會放入DelayQueue,然後根據DelayQueue來提升時間輪,防止空提升。比如有壹個延遲500s的任務,我們除了把它掛載到時間輪裏,還會把它放到DelayQueue裏,這樣DelayQueue的頭節點就延遲了500s。如果期間沒有添加小於500 s的延遲任務,我們只需要等待500s,推壹次時間輪。如果有壹個新的調度任務少於500秒,我們只需要喚醒DelayQueue,重新計算等待時間。
即在添加定時任務時,如果對應的槽是新槽(即添加的任務是該槽的第壹個任務),則在DelayQueue中添加壹個延遲的任務,並判斷其是否為頭節點,如果是,則喚醒DelayQueue重新計算等待時間。
當主節點漂移或網絡異常時,需要將時間輪分發控制從原來的主節點切換到新的主節點。為了保證分發狀態的連續性和壹致性,主節點將分兩個時間輪分發的tick信息同步到其他從節點,時間間隔為50 ms,Tick可以用來確定具體要分發的秒數,但不能用來確定該秒要分發的消息數。為此,在二級時間輪上增加壹個同步參數localIndex,記錄分發到當前秒的消息數,每個節點會定期持久化分發狀態。
每當主節點切換時,原主節點切換為從節點,會立即停止當前時間輪的分發任務,並清除分發狀態;新的主節點根據當前的同步分發狀態初始化兩級時間輪,但是主節點切換會有壹定的延遲周期或者極端情況下不同節點之間的時鐘有偏差。新主初始化時間輪的tick後,tick對應的秒級時間戳可能與節點的實際時間不壹致,需要在開始分發任務前進行特殊調整。如果tick時間戳小於當前時間,那麽分發任務sleep會壹直等到時間對齊。如果tick時間戳大於當前時間,則意味著存在尚未分發的過期消息。此時,tick遷移被持續推進,過期消息被直接異步交付,直到tick對應的時間戳小於當前時間。
正常的主切換可以分為兩種情況。壹種是主動釋放主,比如節點重啟與主的負載均衡過程。在這種情況下,在丟棄主節點之前,節點將首先將時間輪分布狀態同步到其他從節點,主節點將完全連續地切換時間輪分布。另壹種是主人在某些異常情況下的被動漂移。此時,在新的主節點上,時間輪的分發狀態可能會有最大50ms的延遲,並且會重復分發壹些消息。通過將時間輪分發狀態信息封裝到到期交付消息協議的擴展字段中,可以實現實時同步,paxos請求具有時間輪狀態的同步消息。
眾所周知,RocketMQ支持消息過濾,即在發送消息時,可以為消息設置壹個標簽。訂閱話題時,可以設置只消費帶有某些標簽的消息,起到消息過濾的作用。
客戶端拉取消息時,在服務器端獲取標簽的hash set codeSet,然後從consumerQueue中獲取壹條記錄,判斷記錄的hashCode是否在codeSet中,從而達到消息過濾的目的,決定是否將消息發送給消費者。
因為Hash中有沖突,過濾不完全準確,所以客戶端收到消息後會再次準確過濾。
還有壹種過濾方法,通過hash將標簽轉換成long,索引存儲所有標簽hash值的按位OR結果。在提取消息時,在由訂閱設置的標記哈希值和索引中的哈希值之間執行按位AND運算。如果結果等於訂閱設置的標簽哈希值,則表示索引對應的消息可能滿足要求,二次精準過濾仍在客戶端進行。否則肯定不符合要求,直接被過濾掉。