-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.js
100 lines (87 loc) · 2.8 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
class Consumer {
constructor(options) {
// additional packages
this.amqp = require("amqplib");
this.EventEmitter = require("events");
this.uuid = require("uuid");
// client id
this.correlationId = options.correlationId || this.uuid.v4();
this.channel = null;
// options
this.connectionAdress = options.connectionAdress || null;
this.connection = null;
this.queue = options.queue || "";
this.replyTo = options.replyTo || "amq.rabbitmq.reply-to";
this.contentType = options.contentType
? options.contentType == "application/json"
? "application/json"
: "application/textplain"
: "application/textplain";
this.responseType = options.contentType
? options.contentType == "application/json"
? "application/json"
: "application/textplain"
: "application/textplain";
// If provided adress to connect, connect and return available methods
if (this.connectionAdress) {
return this.connect(this.connectionAdress);
}
return new Promise((resolve) => resolve(this));
}
async connect(connection_adress) {
return this.amqp
.connect(connection_adress)
.then((connection) => {
this.connectionAdress = connection_adress;
this.connection = connection;
this.channel = connection.createChannel();
return new Promise((resolve) => resolve(this.channel));
})
.then((channel) => {
this.channel = channel;
this.channel.responseEmitter = new this.EventEmitter();
this.channel.responseEmitter.setMaxListeners(0);
this.channel.consume(
this.replyTo,
(msg) => {
this.channel.responseEmitter.emit(
msg.properties.correlationId,
this.responseType === "application/json"
? Buffer.from(
JSON.stringify(msg.content.toString("utf8"))
).toJSON()
: Buffer.from(msg.content).toString("utf8")
);
},
{ noAck: true }
);
return this;
});
}
async send(message) {
if (typeof message === "object") this.contentType = "application/json";
message =
this.contentType === "application/json"
? Buffer.from(JSON.stringify(message))
: Buffer.from(message);
return new Promise((resolve) => {
this.channel.responseEmitter.once(this.correlationId, resolve);
this.channel.sendToQueue(this.queue, message, {
correlationId: this.correlationId,
replyTo: this.replyTo,
});
});
}
}
// Example usage: connect to a local broker, send one request to "example4",
// print the reply, then shut down.
(async () => {
  // The Consumer constructor returns a Promise resolving to the instance.
  const client = await new Consumer({
    queue: "example4",
  });
  try {
    await client.connect("amqp://localhost");
    const response = await client.send({ text: "hello" });
    console.log(response);
  } finally {
    // Close the connection so the process can exit instead of hanging on
    // the open socket; connection is still null if connect() failed.
    if (client.connection) {
      await client.connection.close();
    }
  }
})().catch((error) => {
  // Report connection/publish failures instead of leaving an unhandled rejection.
  console.error(error);
  process.exitCode = 1;
});