Client Library
CrimsonQ has an official Nodejs library at the moment and the rest are being developed. The current contribution team will provide client libraries for NodeJS + Typescript, Go and Python and would be happy to receive some community support on other libraries. Read the Advanced section on how to create a library based off of any Redis client, it is straight forward and we would appreciate the support.
NodeJS
The client library for both Nodejs and Typescript can be found here at the CrimsonQ Client Git Repo.
To get started with using it all you need to do is install it
$ npm install crimsonq --save
Once you have installed the module you can then require it to use it in your code, whether as a producer or consumer you need to require it as follows; let CrimsonQ = require("./lib/crimsonq");
Once required you will need to setup the connection as follows;
//Initiate an Instance Consumer
let CQ = new CrimsonQ({
port: 9001, // Redis port
host: "127.0.0.1", // Redis host
password: "crimsonQ!",
});
//Connect and await for the connect
await CQ.connect();
Using it as a Producer
To use the client as a Producer all you need to do is create a new producer for the client instance as follows;
let producer = CQ.Producer();
Then you can use the producer to publish messages. There are 2 ways you can publish messages, either directly to a consumer on its id, or to multiple consumers through a MQTT like topics, see MQTT topic matching for details. Below are the commands that can be used against the producer.
producer.pushToConsumer(consumerId, message)
Send messages to a consumer's queue directly into its queue and will be in the pending state awaiting to be pulled.
Kind: instance method of
Producer
consumerId
string
The consumer Id that will receive the message
message
Object
The message object that needs to be pushed to consumers
producer.pushToTopic
(topic, message) Send message to the topic queue and this will provide a message copy for each consumer in its queue.
Kind
: instance method of
Producer
topic
string
The topic that will be used to send to consumers listening to the topic. The topic can have MQTT style wildcards.
message
Object
The message object that needs to be pushed to consumers
Using it as a Consumer
The consumer will read messages out of its own queue, all the messages that have been sent to the consumer will be accessible through the consumer part of the library. To get started you will need to setup the consumer as follows;
let consumer = CQ.Consumer("I_WANT_MESSAGES");
await consumer.init(["/path1/sub2", "/path2/#"], 1);
You can setup the consumer by creating an instance and giving it a unique ID, make sure that each consumer you have has a different ID, if you need 2 consumers to read the same messages, create the consumer twice with different IDs but same topics, then push a message to a topic that matches both consumers.
Once you have gotten the consumer setup now its a matter of waiting for messages
consumer.events.on("message", async function (msg) {
try {
//DO something with the msg data here, when done make sure to call done to tell
//the queue that you have consumed it properly.
await msg.done()
} catch (e) {
//If the above failes, you can call msg.fail and send the reason of failure back
//to the queue.
msg.fail(e.message)
}
})
Auto-reconnect
This client is based on ioredis and has the same methodology, if a connection is interrupted it will reconnect and also be up and functional. The client will also start to pull the messages that the client missed when was online.
Example
let CrimsonQ = require("../lib/crimsonq");
/**
* Create crimsonQ connection
* Create a new consumer / or update existing one
* listen to consumer events
* create a new producer to send messages to the consumer
*/
async function run() {
// Connect to crimsonQ
let CQ = new CrimsonQ({
port: 9001, // Redis port
host: "127.0.0.1", // Redis host
password: "crimsonQ!",
});
await CQ.connect();
// Create w new Consumer
let consumer = await CQ.Consumer("MyConsumer");
// init consumer with consumer topics , and concurrency for pulled messages from the queue;
await consumer.init(["MyConsumer/topic1", "MyConsumer/topic2"], 1);
// Get Consumer topics
await consumer.getTopics();
// Check crimsonq.md for more methods
// Listen to the consumer events messages
consumer.events.on("message", async function (msg) {
try {
//DO something with the msg data here, when done make sure to call done to tell
//the queue that you have consumed it properly.
await msg.done()
} catch (e) {
//If the above failes, you can call msg.fail and send the reason of failure back
//to the queue.
msg.fail(e.message)
}
})
// Create a producer to send messages into consumer , or consumer topic!
let producer = await CQ.Producer();
// Push message into MyConsumer
await producer.pushToConsumer("MyConsumer", { messageData: "My First Consumer message " });
//Push message into a specific topic
await producer.pushToTopic("MyConsumer/topic1", { messageData: "My first topic message" });
// Check consumer messages counts
console.log(await consumer.messageCountByStatus());
}
run();
Typescript
Go
This is currently under development
Python
This is currently under development
Last updated
Was this helpful?