當前位置:名人名言大全網 - 心情說說 - 如何為nodejs redis會話計時?

如何為nodejs redis會話計時?

這裏說的定時器任務,可以說是壹個定時器任務。例如,如果用戶觸發了壹個動作,那麽在24小時後應該對它做些什麽。那麽如果有1000個用戶觸發這個動作,就會有1000個計劃任務。所以這個不在Cron範疇。

舉個最簡單的例子,壹個用戶推薦了另壹個用戶,24小時後設置壹個任務,看推薦的用戶有沒有註冊,如果沒有,給他發壹條短信。σ& gt;―(〃 ω 〃)?→

最初的想法

壹開始我想直接在內存中調用這個定時器。

考慮到Node.js的計時不是那麽準確(無論是setTimeout還是setInterval),我原本打算自己維護這個計時器隊列。

考慮到Node.js的原生對象消耗內存較多。之前我用JSON object保存了壹個字典,大概有12萬個詞條,原始文件大概有五六兆。當我用Node.js保存原始對象時,它實際上占用了五六百兆的內存——所以我打算用C++為這個定時器隊列編寫addon。

考慮到隨時插入的任務可能在已有任務之前,也可能在已有任務之後,本來想用C++寫壹個小的根堆。每當用戶執行壹個任務時,該任務就會被插入到堆中。

如果按照上面的方法,時間要求沒有那麽緊,那麽就是壹個process.nextTick()的連續過程。

在process.nextTick()中執行這樣壹個函數:

堆頂部的任務被連續地從小根堆中獲得並被處理,直到堆頂部的任務的執行時間大於當前時間。

繼續使用process.nextTick()讓下壹個Tick執行步驟1中的流程。

所以說到底就是壹個把任務插入小根堆,通過process.nextTick()消耗任務的過程。

最後,為了考慮到程序重啟時內存數據會丟失,我們還應該做壹件持久化的事情——在每次插入任務時,在持久化中間件中插入壹個副本,比如MySQL、MongoDB、Redis、Riak等任何三方依賴。消費任務時,順便刪除中間件中的這個任務數據。

也就是說,中間件中永遠留存的,是目前沒有完成的任務。每次程序重啟時,從中間件讀取所有任務重建堆,然後就可以繼續工作了。

如果當時沒有發現Redis的這壹妙用,上述過程就是實現預定任務的過程。

Redis妙用

Redis 2 . 8 . 0版本之後,引入了壹個新的特性——Redis key space Notifications,可以在2.0.0版本之後用SUBSCRIBE完成這個定時任務的操作,但是定時單位是秒。

發布/訂閱

在2.0.0之後,Redis引入了Pub/Sub命令,大致意思是壹方面向Redis的特定通道發送消息,另壹方面從Redis的特定通道取值——形成壹個簡單的消息隊列。

例如,您可以將消息欄推送到foo通道,這樣您就可以直接:

發布foo欄

java描述語言

var Redis = require(" iore dis ");

Var sub = new Redis(/**連接信息*/);

sub.once("連接",函數(){

//假設妳需要選擇redis的db,因為實際上我們不會汙染默認的db 0。

sub select(DB _ NUMBER,function(err) {

if(err)process . exit(4);

sub.subscribe("foo ",function() {

//...已成功訂閱頻道。

});

});

});

//偵聽來自“foo”的消息

sub.on("消息",功能(頻道,消息){

console.log(channel,msg);

});

Redis密鑰空間通知

Redis中有壹些事件,比如密鑰過期,密鑰刪除等等。然後,您可以配置壹些東西,讓Redis在這些事件被觸發時將消息推送到特定的通道。

需要關註本文中涉及的需求的事件是EXPIRE事件。

壹般的流程是給Redis的某個db設置壹個過期事件,這樣壹旦它的key過期,就會把消息推送到特定的通道,在自己的客戶端繼續消費這個通道就好了。

當壹個調度任務稍後到來時,任務狀態被壓縮成壹個鍵,到期時間是從這個任務執行的時間差。然後當密鑰過期的時候,就是任務執行的時候了,Redis自然會把過期的消息推走,讓客戶端接收。這樣就起到了定時任務的作用。

消息類型

當滿足某些條件時,會觸發兩種類型的此類消息,您需要選擇使用哪壹種。比如在db 0中刪除了壹個名為foo的鍵,那麽系統會推送消息到兩個通道,壹個是del事件通道推送foo消息,另壹個是foo通道推送del消息。系統推送的指令相當於:

PUBLISH _ _ keyspace @ 0 _ _:foo del PUBLISH _ _ keyevent @ 0 _ _:del foo

將del推送到foo的通道名為__keyspace@0__:foo,即“_ _ key space @”+db _ number+“_ _:"+key _ name;而del的通道名是“_ _ key event @”+db _ number+“_ _:”+event _ name。

部署

即使妳的Redis版本達標,這個功能在Redis裏默認是關閉的。您需要修改配置文件才能打開它,或者通過CLI中的指令直接修改它。這裏說壹下配置文件的修改。。

首先,打開Redis的配置文件。在不同的系統和安裝方法下,文件位置可能會有所不同。比如brew安裝的MacOS下的/usr/local/etc/redis.conf下,或者apt-get安裝的Ubuntu下的/etc/redis.conf下。簡而言之,找到配置文件。或者自己寫壹個配置文件,啟動時指定配置文件地址即可。

然後找壹個叫notify-keyspace-events的地方,找不到就自己加。它的值可以是Ex,Klg等等。這些字母的具體含義如下:

k,這意味著keyspace事件,這個字母意味著它將被發送到_ _ Keyspace @

e,表示keyevent事件,這個字母表示將發送到_ _ Keyevent @

g,表示壹些通用的指令事件支持,如DEL、EXPIRE、RENAME等。

$,它表示對字符串相關指令的事件支持。

l,表示支持列表相關的指令事件。

s,它表示對集合相關指令事件的支持。

哈希相關指令事件支持。

z,有序集相關指令事件支持。

x,expiration event,與G中的expiration不同,G中的expiration指的是EXPIRE key TTL的指令執行時的方式觸發的事件,這裏指的是此時key剛好到期時觸發的事件。

e,驅逐事件,由於內存限制,當密鑰被驅逐時將觸發的事件。

a的別名,g$lshzxe。換句話說,AKE代表了所有事件的意義。

結合上面的列表,可以拼湊出自己需要的事件支持字符串,只需要Ex就可以滿足要求,所以配置項是這樣的:

通知-密鑰空間-事件Ex

然後保存配置文件並啟動Redis來啟用對過期事件的支持。

實踐

讓我們從任務的創建者開始。因為這裏Redis的事件只傳鍵名,不傳鍵值,而當過期事件被觸發時,鍵就沒了,妳也拿不到鍵值,主系統和任務系統都是分布式的,所以需要的信息都塞到鍵名裏了。

壹個最簡單的鍵名設計就是任務類型+":"+JSON.stringify後的參數數組;更重要的是,妳可以直接用需要的函數路徑替換任務類型。比如需要執行這個任務的函數是task/foo/bar文件下的baz函數,參數arguments數組為0。反正妳只需要觸發這個鍵,不需要查詢這個鍵。當真正逾期的任務系統收到這個鍵名時,會逐壹分析,得到需要執行task/foo/bar.baz的消息,並將這個arguments傳入net函數。

因此,當接收到定時任務時,會獲得消息、函數名和到期時間參數。該功能

/* *假設redis是ioredis的對象*/

var sampleTaskMaker = function(message,func,timeout) {

message = JSON . stringify(message);

console.log("收到新任務:",func,message,"在"+ timeout +"之後。");

//uuid這裏是npm的壹個包。

//生成唯壹uuid的目的是防止兩個任務使用相同的函數和參數,然後

//鍵名可能會被復制和覆蓋。

//UUID的文檔是/package/node-uuid。

//

//這裏?是分隔符,冒號將uuid與下面的內容分隔開,而?分區函數的名稱。

//和消息

var key = uuid.v1()。替換(/-/g," ")+

":?"+ func +"?+消息;

var content =

redis.multi()

。設置(密鑰、內容)

。過期(按鍵,超時)

。exec(函數(錯誤){

如果(錯誤){

console.error("未能發布“+ content”的過期事件);

console.error(錯誤);

返回;

}

});

};

// assign是sugarjs中的壹個函數。

//將db填充到字符串中的{db}中。

var subscribe key = " _ _ key event @ { db } _ _:expired "。賦值({ db:1 });

//假設sub是ioredis的對象。

sub.once("連接",函數(){

//假設妳需要選擇redis的db,因為實際上不會汙染默認的db 0。

子選擇(1,函數(錯誤){

if(err)process . exit(4);

sub.subscribe("foo ",function() {

//...已成功訂閱頻道。

});

});

});

//偵聽來自“foo”的消息

sub.on(“消息”,sample on expired);

註意:此處選擇db 1是因為壹旦打開過期事件監視,就會發送該db的所有過期事件。為了不與通常使用的redis expiration key混淆,專門為此事使用了壹個新的db。比如妳在平時使用的db 0中監聽,那麽任務沒有觸發的過期事件也會被傳輸。此時解析的鍵名是錯誤的。

最後,sampleOnExpired函數。

var sampleOnExpired = function(channel,key) {

// UUID:?func?參數

var body = key.split("?);

if(body . length & lt;3)退貨;

//取出正文第壹位是func。

var func = body[1];

//推出前兩位,剩下的可能包含在參數裏?分裂了,所以把它放回去。

body . shift();body . shift();

var params = body.join("?);

//然後將params傳入func執行。

// func:

// path1/path2.func

func = func.split(" . ");

if(func.length!== 2) {

console . error(" task:",func.join(")的參數不正確)、“-”,params);

返回;

}

var path = func[0];

func = func[1];

var mod

嘗試{

mod = require("。/tasks/"+path);

} catch(e) {

console.error("加載模塊失敗",路徑);

控制臺.錯誤(e . stack);

返回;

}

process.nextTick(function() {

嘗試{

mod[func]。apply(null,JSON . parse(params));

} catch(e) {

console.error("調用函數失敗",路徑,"-",func,"-",params);

控制臺.錯誤(e . stack);

}

});

};

這個簡單的架子搭好之後,妳只需要寫壹堆任務執行函數,然後在生成任務的時候把相應的參數傳遞給sampleTaskMaker。Redis會自動過期並觸發壹個事件給sampleOnExpired函數,然後它會執行相應的任務處理函數。