Client Library

CrimsonQ has an official Nodejs library at the moment and the rest are being developed. The current contribution team will provide client libraries for NodeJS + Typescript, Go and Python and would be happy to receive some community support on other libraries. Read the Advanced section on how to create a library based off of any Redis client, it is straight forward and we would appreciate the support.

NodeJS

The client library for both Nodejs and Typescript can be found here at the CrimsonQ Client Git Repo.

To get started with using it all you need to do is install it

$ npm install crimsonq --save

Once you have installed the module you can then require it to use it in your code, whether as a producer or consumer you need to require it as follows; let CrimsonQ = require("./lib/crimsonq"); Once required you will need to setup the connection as follows;

//Initiate an Instance Consumer 
let  CQ = new  CrimsonQ({
    port:  9001, // Redis port
    host:  "127.0.0.1", // Redis host
    password:  "crimsonQ!",
});
//Connect and await for the connect 
await CQ.connect();

Using it as a Producer

To use the client as a Producer all you need to do is create a new producer for the client instance as follows;

let producer = CQ.Producer();

Then you can use the producer to publish messages. There are 2 ways you can publish messages, either directly to a consumer on its id, or to multiple consumers through a MQTT like topics, see MQTT topic matching for details. Below are the commands that can be used against the producer.

producer.pushToConsumer(consumerId, message)

Send messages to a consumer's queue directly into its queue and will be in the pending state awaiting to be pulled.

Kind: instance method of Producer

Param
Type
Description

consumerId

string

The consumer Id that will receive the message

message

Object

The message object that needs to be pushed to consumers

producer.pushToTopic(topic, message) Send message to the topic queue and this will provide a message copy for each consumer in its queue.

Kind: instance method of Producer

Param
Type
Description

topic

string

The topic that will be used to send to consumers listening to the topic. The topic can have MQTT style wildcards.

message

Object

The message object that needs to be pushed to consumers

Using it as a Consumer

The consumer will read messages out of its own queue, all the messages that have been sent to the consumer will be accessible through the consumer part of the library. To get started you will need to setup the consumer as follows;

let  consumer = CQ.Consumer("I_WANT_MESSAGES");
await consumer.init(["/path1/sub2", "/path2/#"], 1);

You can setup the consumer by creating an instance and giving it a unique ID, make sure that each consumer you have has a different ID, if you need 2 consumers to read the same messages, create the consumer twice with different IDs but same topics, then push a message to a topic that matches both consumers.

Once you have gotten the consumer setup now its a matter of waiting for messages

consumer.events.on("message", async function (msg) {
        try {
	         //DO something with the msg data here, when done make sure to call done to tell
	         //the queue that you have consumed it properly.  
             await msg.done()
        } catch (e) {
            //If the above failes, you can call msg.fail and send the reason of failure back 
            //to the queue. 
            msg.fail(e.message)
        }
    })

Auto-reconnect

This client is based on ioredis and has the same methodology, if a connection is interrupted it will reconnect and also be up and functional. The client will also start to pull the messages that the client missed when was online.

Example

let CrimsonQ = require("../lib/crimsonq");
/**
 * Create crimsonQ connection
 * Create a new consumer / or update existing one
 * listen to consumer events 
 * create a new producer to send messages to the consumer 
 */

async function run() {
    // Connect to crimsonQ 
    let CQ = new CrimsonQ({
        port: 9001, // Redis port
        host: "127.0.0.1", // Redis host
        password: "crimsonQ!",
    });
    await CQ.connect();

    // Create w new Consumer 
    let consumer = await CQ.Consumer("MyConsumer");
    // init consumer with consumer topics , and concurrency for pulled messages from the queue;
    await consumer.init(["MyConsumer/topic1", "MyConsumer/topic2"], 1);

    // Get Consumer topics
    await consumer.getTopics();

    // Check crimsonq.md for more methods   

    // Listen to the consumer events messages 
    consumer.events.on("message", async function (msg) {
        try {
            //DO something with the msg data here, when done make sure to call done to tell
            //the queue that you have consumed it properly.  
            await msg.done()
        } catch (e) {
            //If the above failes, you can call msg.fail and send the reason of failure back 
            //to the queue. 
            msg.fail(e.message)
        }
    })


    // Create a producer to send messages into consumer , or consumer topic!
    let producer = await CQ.Producer();
    // Push message into MyConsumer
    await producer.pushToConsumer("MyConsumer", { messageData: "My First Consumer message " });
    //Push message into a specific topic 
    await producer.pushToTopic("MyConsumer/topic1", { messageData: "My first topic message" });

    // Check consumer messages counts
    console.log(await consumer.messageCountByStatus());
}

run();

Typescript

Documentation is under development

Go

Python

Last updated

Was this helpful?