舉個最簡單的例子,壹個用戶推薦了另壹個用戶,24小時後設置壹個任務,看推薦的用戶有沒有註冊,如果沒有,給他發壹條短信。σ& gt;―(〃 ω 〃)?→
最初的想法
壹開始我想直接在內存中調用這個定時器。
考慮到Node.js的計時不是那麽準確(無論是setTimeout還是setInterval),我原本打算自己維護這個計時器隊列。
考慮到Node.js的原生對象消耗內存較多。之前我用JSON object保存了壹個字典,大概有12萬個詞條,原始文件大概有五六兆。當我用Node.js保存原始對象時,它實際上占用了五六百兆的內存——所以我打算用C++為這個定時器隊列編寫addon。
考慮到隨時插入的任務可能在已有任務之前,也可能在已有任務之後,本來想用C++寫壹個小的根堆。每當用戶執行壹個任務時,該任務就會被插入到堆中。
如果按照上面的方法,時間要求沒有那麽緊,那麽就是壹個process.nextTick()的連續過程。
在process.nextTick()中執行這樣壹個函數:
堆頂部的任務被連續地從小根堆中獲得並被處理,直到堆頂部的任務的執行時間大於當前時間。
繼續使用process.nextTick()讓下壹個Tick執行步驟1中的流程。
所以說到底就是壹個把任務插入小根堆,通過process.nextTick()消耗任務的過程。
最後,為了考慮到程序重啟時內存數據會丟失,我們還應該做壹件持久化的事情——在每次插入任務時,在持久化中間件中插入壹個副本,比如MySQL、MongoDB、Redis、Riak等任何三方依賴。消費任務時,順便刪除中間件中的這個任務數據。
也就是說,中間件中永遠留存的,是目前沒有完成的任務。每次程序重啟時,從中間件讀取所有任務重建堆,然後就可以繼續工作了。
如果當時沒有發現Redis的這壹妙用,上述過程就是實現預定任務的過程。
Redis妙用
Redis 2 . 8 . 0版本之後,引入了壹個新的特性——Redis key space Notifications,可以在2.0.0版本之後用SUBSCRIBE完成這個定時任務的操作,但是定時單位是秒。
發布/訂閱
在2.0.0之後,Redis引入了Pub/Sub命令,大致意思是壹方面向Redis的特定通道發送消息,另壹方面從Redis的特定通道取值——形成壹個簡單的消息隊列。
例如,您可以將消息欄推送到foo通道,這樣您就可以直接:
發布foo欄
java描述語言
var Redis = require(" iore dis ");
Var sub = new Redis(/**連接信息*/);
sub.once("連接",函數(){
//假設妳需要選擇redis的db,因為實際上我們不會汙染默認的db 0。
sub select(DB _ NUMBER,function(err) {
if(err)process . exit(4);
sub.subscribe("foo ",function() {
//...已成功訂閱頻道。
});
});
});
//偵聽來自“foo”的消息
sub.on("消息",功能(頻道,消息){
console.log(channel,msg);
});
Redis密鑰空間通知
Redis中有壹些事件,比如密鑰過期,密鑰刪除等等。然後,您可以配置壹些東西,讓Redis在這些事件被觸發時將消息推送到特定的通道。
需要關註本文中涉及的需求的事件是EXPIRE事件。
壹般的流程是給Redis的某個db設置壹個過期事件,這樣壹旦它的key過期,就會把消息推送到特定的通道,在自己的客戶端繼續消費這個通道就好了。
當壹個調度任務稍後到來時,任務狀態被壓縮成壹個鍵,到期時間是從這個任務執行的時間差。然後當密鑰過期的時候,就是任務執行的時候了,Redis自然會把過期的消息推走,讓客戶端接收。這樣就起到了定時任務的作用。
消息類型
當滿足某些條件時,會觸發兩種類型的此類消息,您需要選擇使用哪壹種。比如在db 0中刪除了壹個名為foo的鍵,那麽系統會推送消息到兩個通道,壹個是del事件通道推送foo消息,另壹個是foo通道推送del消息。系統推送的指令相當於:
PUBLISH _ _ keyspace @ 0 _ _:foo del PUBLISH _ _ keyevent @ 0 _ _:del foo
將del推送到foo的通道名為__keyspace@0__:foo,即“_ _ key space @”+db _ number+“_ _:"+key _ name;而del的通道名是“_ _ key event @”+db _ number+“_ _:”+event _ name。
部署
即使妳的Redis版本達標,這個功能在Redis裏默認是關閉的。您需要修改配置文件才能打開它,或者通過CLI中的指令直接修改它。這裏說壹下配置文件的修改。。
首先,打開Redis的配置文件。在不同的系統和安裝方法下,文件位置可能會有所不同。比如brew安裝的MacOS下的/usr/local/etc/redis.conf下,或者apt-get安裝的Ubuntu下的/etc/redis.conf下。簡而言之,找到配置文件。或者自己寫壹個配置文件,啟動時指定配置文件地址即可。
然後找壹個叫notify-keyspace-events的地方,找不到就自己加。它的值可以是Ex,Klg等等。這些字母的具體含義如下:
k,這意味著keyspace事件,這個字母意味著它將被發送到_ _ Keyspace @
e,表示keyevent事件,這個字母表示將發送到_ _ Keyevent @
g,表示壹些通用的指令事件支持,如DEL、EXPIRE、RENAME等。
$,它表示對字符串相關指令的事件支持。
l,表示支持列表相關的指令事件。
s,它表示對集合相關指令事件的支持。
哈希相關指令事件支持。
z,有序集相關指令事件支持。
x,expiration event,與G中的expiration不同,G中的expiration指的是EXPIRE key TTL的指令執行時的方式觸發的事件,這裏指的是此時key剛好到期時觸發的事件。
e,驅逐事件,由於內存限制,當密鑰被驅逐時將觸發的事件。
a的別名,g$lshzxe。換句話說,AKE代表了所有事件的意義。
結合上面的列表,可以拼湊出自己需要的事件支持字符串,只需要Ex就可以滿足要求,所以配置項是這樣的:
通知-密鑰空間-事件Ex
然後保存配置文件並啟動Redis來啟用對過期事件的支持。
實踐
讓我們從任務的創建者開始。因為這裏Redis的事件只傳鍵名,不傳鍵值,而當過期事件被觸發時,鍵就沒了,妳也拿不到鍵值,主系統和任務系統都是分布式的,所以需要的信息都塞到鍵名裏了。
壹個最簡單的鍵名設計就是任務類型+":"+JSON.stringify後的參數數組;更重要的是,妳可以直接用需要的函數路徑替換任務類型。比如需要執行這個任務的函數是task/foo/bar文件下的baz函數,參數arguments數組為0。反正妳只需要觸發這個鍵,不需要查詢這個鍵。當真正逾期的任務系統收到這個鍵名時,會逐壹分析,得到需要執行task/foo/bar.baz的消息,並將這個arguments傳入net函數。
因此,當接收到定時任務時,會獲得消息、函數名和到期時間參數。該功能
/* *假設redis是ioredis的對象*/
var sampleTaskMaker = function(message,func,timeout) {
message = JSON . stringify(message);
console.log("收到新任務:",func,message,"在"+ timeout +"之後。");
//uuid這裏是npm的壹個包。
//生成唯壹uuid的目的是防止兩個任務使用相同的函數和參數,然後
//鍵名可能會被復制和覆蓋。
//UUID的文檔是/package/node-uuid。
//
//這裏?是分隔符,冒號將uuid與下面的內容分隔開,而?分區函數的名稱。
//和消息
var key = uuid.v1()。替換(/-/g," ")+
":?"+ func +"?+消息;
var content =
redis.multi()
。設置(密鑰、內容)
。過期(按鍵,超時)
。exec(函數(錯誤){
如果(錯誤){
console.error("未能發布“+ content”的過期事件);
console.error(錯誤);
返回;
}
});
};
// assign是sugarjs中的壹個函數。
//將db填充到字符串中的{db}中。
var subscribe key = " _ _ key event @ { db } _ _:expired "。賦值({ db:1 });
//假設sub是ioredis的對象。
sub.once("連接",函數(){
//假設妳需要選擇redis的db,因為實際上不會汙染默認的db 0。
子選擇(1,函數(錯誤){
if(err)process . exit(4);
sub.subscribe("foo ",function() {
//...已成功訂閱頻道。
});
});
});
//偵聽來自“foo”的消息
sub.on(“消息”,sample on expired);
註意:此處選擇db 1是因為壹旦打開過期事件監視,就會發送該db的所有過期事件。為了不與通常使用的redis expiration key混淆,專門為此事使用了壹個新的db。比如妳在平時使用的db 0中監聽,那麽任務沒有觸發的過期事件也會被傳輸。此時解析的鍵名是錯誤的。
最後,sampleOnExpired函數。
var sampleOnExpired = function(channel,key) {
// UUID:?func?參數
var body = key.split("?);
if(body . length & lt;3)退貨;
//取出正文第壹位是func。
var func = body[1];
//推出前兩位,剩下的可能包含在參數裏?分裂了,所以把它放回去。
body . shift();body . shift();
var params = body.join("?);
//然後將params傳入func執行。
// func:
// path1/path2.func
func = func.split(" . ");
if(func.length!== 2) {
console . error(" task:",func.join(")的參數不正確)、“-”,params);
返回;
}
var path = func[0];
func = func[1];
var mod
嘗試{
mod = require("。/tasks/"+path);
} catch(e) {
console.error("加載模塊失敗",路徑);
控制臺.錯誤(e . stack);
返回;
}
process.nextTick(function() {
嘗試{
mod[func]。apply(null,JSON . parse(params));
} catch(e) {
console.error("調用函數失敗",路徑,"-",func,"-",params);
控制臺.錯誤(e . stack);
}
});
};
這個簡單的架子搭好之後,妳只需要寫壹堆任務執行函數,然後在生成任務的時候把相應的參數傳遞給sampleTaskMaker。Redis會自動過期並觸發壹個事件給sampleOnExpired函數,然後它會執行相應的任務處理函數。