當前位置:名人名言大全網 - 短信平臺 - 對於NodeJS如何操作消息隊列RabbitMQ的分析

對於NodeJS如何操作消息隊列RabbitMQ的分析

這篇文章主要介紹了關於對NodeJS如何操作消息隊列RabbitMQ的分析,有著壹定的參考價值,現在分享給大家,有需要的朋友可以參考壹下

壹. 什麽是消息隊列?消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。

消息隊列(Message Queue)是壹種應用間的通信方式,消息發送後可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。

二. 常用的消息隊列有哪些?RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至現在部分NoSQL也可做消息隊列,如Redis。

三. 消息隊列的使用場景?異步處理應用解耦流量削峰四. 使用案例上規模的公司都會有自己的日誌分析系統,日誌系統是怎麽實現的呢?

圖解:用戶在訪問應用的時候,我們要記錄下用戶的操作記錄和系統的異常日誌,常規的做法是將系統產生的日誌保存到服務器磁盤,在服務器中開啟定時任務,定時將磁盤的日誌信息傳入mq中(生產者),也定時將mq中的消息取出並存到相應的數據庫,如ElasticSearch或Hive中。

五. 如何安裝RabbitMQ?上面的案例介紹了MQ的壹個使用場景,我這裏是用RabbitMQ舉例,現實項目中可能用到的是Kafka。

首先安裝brew(mac為例)

/usr/bin/ruby -e "$(curl -fsSL /Homebrew/install/master/install)"安裝RabbitMQ

brew install rabbitmq運行RabbitMQ

進入到 /usr/local/Cellar/rabbitmq/3.7.7,執行

sbin/rabbitmq-server啟動插件

進入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

./rabbitmq-plugins enable rabbitmq_management登陸管理界面

打開瀏覽器輸入:http://localhost:15672,RabbitMQ默認15672端口六. Nodejs操作RabbitMQ

網上可以找到好幾個相應的Node SDK,這裏推薦amqplib

1.生產者

/**

* 對RabbitMQ的封裝

*/

let amqp = require('amqplib');

class RabbitMQ {

constructor() {

this.hosts = [];

this.index = 0;

this.length = this.hosts.length;

this.open = amqp.connect(this.hosts[this.index]);

}

sendQueueMsg(queueName, msg, errCallBack) {

let self = this;

self.open

.then(function (conn) {

return conn.createChannel();

})

.then(function (channel) {

return channel.assertQueue(queueName).then(function (ok) {

return channel.sendToQueue(queueName, new Buffer(msg), {

persistent: true

});

})

.then(function (data) {

if (data) {

errCallBack && errCallBack("success");

channel.close();

}

})

.catch(function () {

setTimeout(() => {

if (channel) {

channel.close();

}

}, 500)

});

})

.catch(function () {

let num = self.index++;

if (num <= self.length - 1) {

self.open = amqp.connect(self.hosts[num]);

} else {

self.index == 0;

}

});

}

}2. 消費者

/**

* 對RabbitMQ的封裝

*/

let amqp = require('amqplib');

class RabbitMQ {

constructor() {

this.open = amqp.connect(this.hosts[this.index]);

}

receiveQueueMsg(queueName, receiveCallBack, errCallBack) {

let self = this;

self.open

.then(function (conn) {

return conn.createChannel();

})

.then(function (channel) {

return channel.assertQueue(queueName)

.then(function (ok) {

return channel.consume(queueName, function (msg) {

if (msg !== null) {

let data = msg.content.toString();

channel.ack(msg);

receiveCallBack && receiveCallBack(data);

}

})

.finally(function () {

setTimeout(() => {

if (channel) {

channel.close();

}

}, 500)

});

})

})

.catch(function () {

let num = self.index++;

if (num <= self.length - 1) {

self.open = amqp.connect(self.hosts[num]);

} else {

self.index = 0;

self.open = amqp.connect(self.hosts[0]);

}

});

}3. 通過生產者向MQ發送壹個消息,並創建隊列

let mq = new RabbitMQ();

mq.sendQueueMsg('testQueue', 'my first message', (error) => {

console.log(error)

})執行之後,我們打開管理平臺,發現RabbbitMQ已經接受到了壹條消息:

並且RabbbitMQ新增了壹個隊列testQueue

4. 獲取指定隊列的消息

let mq = new RabbitMQ();

mq.receiveQueueMsg('testQueue',(msg) => {

console.log(msg)

})// 輸出結果:my first message此時打開RabbitMQ管理平臺,消息數量已經變為0

綜上:我們簡單講述了消息隊列及RabbitMQ相關的壹些知識,以及我們如何通過nodejs來生產與消費消息。