Rebalance線程默認每隔20s進行壹次消息隊列加載,獲取subject隊列信息mqSet和消費者組的所有當前消費者cidAll,然後按照壹定的加載算法分配隊列。分配原則是同壹個消費者可以分配多個消息隊列,同壹個消息消費隊列在同壹時間只會分配給壹個消費者。此時,您可以計算當前使用者分配的消息隊列集,並將原始加載隊列與當前分配隊列進行比較。如果新隊列集不包含原始隊列,則停止並刪除原始隊列消息使用,如果原始隊列不包含新分配隊列,則創建PullRequest。
負載平衡在RebalanceService線程中啟動,壹個mqclientstatus保存壹個RebalanceService實現,它從mqclientstatus的啟動開始。
從上面可以看出,MQClientinstance遍歷註冊的使用者,並對使用者執行doRebalance方法。
以上是遍歷訂閱信息,重新加載每個主題的隊列。接下來,將執行rebalanceByTopic方法,該方法將根據廣播模式或集群模式以不同的方式進行處理。這裏只說明集群模式下的方法。
獲取該主題下的隊列信息以及該使用者組中的所有當前使用者id。每個DefaultMQPushConsumerImpl保存壹個單獨的RebalanceImpl對象。
對該主題下的隊列信息和該使用者組中的所有當前使用者id進行排序,以確保壹個使用者組的成員看到相同的訂單,並防止同壹使用者隊列被多個使用者分配。
AllocateResult記錄當前使用者分配的消息隊列。
調用updateprocessqueuetableinrebalance來比較消息隊列是否已更改。
從上面可以看出,processQueueTable記錄了當前消費者負載的消息隊列緩存表,該方法中的mqSet記錄了負載分配後當前消費者的消息隊列集。如果processQueueTable中的消息隊列在mqSet中不存在,說明該消息隊列已經分配給了其他的消費者,所以需要暫停該消息隊列的消息消費,使用* * pq . set dropped(true);這個說法就可以了。
然後使用removeuncessarymessagequeue * *的方法來確定mq是否從緩存中刪除。
之後,它開始遍歷這次分配給消費者的消息隊列,並與mqSet結合。如果processQueueTable中不包含該消息隊列,則意味著這是壹個新添加的消息隊列。
首先從內存中刪除消息隊列的消息進度,然後調用computePullFromWhere從磁盤中讀取消息隊列的消耗進度,並創建壹個PullRequest對象。
從上面可以看出,計算消息進度的方法主要有三種,有些是相似的。
首先,從磁盤獲取消息隊列的消耗進度。如果大於0,則表示消息隊列已被消耗,下壹次消耗將從該位置繼續。如果等於-1,則嘗試將消息存儲時間戳作為消費者發起的時間戳進行操作,如果能找到則返回找到的偏移量,如果找不到則返回0;如果小於-1,則表示消息進度文件中存儲了錯誤的偏移量,返回-1。
此方法結束時,將調用dispatchPullRequest方法,並將PullRequest添加到PullMessageService中,以喚醒PullMessageService線程並拉取消息。
至此,消費者負載平衡方面已經結束。