NodeJs producing messages in and consuming messages from an Apache Kafka topic

Introduction

In this post, I’m going to install Apache Kafka on Linux Mint, produce some Kafka messages from server-side JavaScript in NodeJs using the kafka-node package and then consume them from other NodeJs programs.

 

Assumptions

This post builds on previous ones I’ve written up recently. If you want to follow along, the assumptions are:

  • You have followed all the steps to install Vagrant and Cygwin as described in this older post, right up to and including the section “Test Go in Cygwin”;
  • You installed Linux Mint 18 as described in the first section of this post, “Getting a Vagrant Linux Mint virtual image”;
  • You installed Visual Studio Code and NodeJs as described in this post;
  • You are comfortable using a Linux command prompt.

 

Steps we will follow

  • Install Kafka on the Linux Mint virtual image
  • Start Zookeeper
  • Test Zookeeper
  • Start a Kafka broker
  • Create a topic
  • List topics
  • Produce a message in the topic using the Kafka shell script
  • Consume the message in the topic using the Kafka shell script
  • Install the kafka-node package
  • Create a NodeJs script in Visual Studio Code to produce a topic message
  • Create a NodeJs script that consumes only the latest message in the topic
  • Create a NodeJs script that consumes all the messages from the beginning of the topic
  • Create a NodeJs script that consumes a message with a specific index

 

Install Kafka on the Linux Mint virtual image

So let’s go back to my Linux Mint 18 Vagrant virtual image. I open a Cygwin prompt in my Windows 7 host:

04

And launch the Linux Mint virtual image with the following commands:

cd /cygdrive/c/gocode/mint18

vagrant up

Vagrant does its thing:

04.png

And I can log in to my Linux Mint virtual machine. Recall that I set the password a couple of posts ago by running the following commands in Cygwin after a “vagrant up”:

vagrant ssh

sudo passwd vagrant

05.png

Remember this Linux virtual machine has the convenience of a folder synchronized with its Windows host at /vagrant.

Open a Linux terminal and run the following commands to download and unarchive Apache Kafka, preserving file attributes:

cd /vagrant

wget http://apache.mirror.iweb.ca/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz

tar -xzf kafka_2.11-0.10.2.0.tgz

cd kafka_2.11-0.10.2.0

06.png

Start Zookeeper

From here on out, I am going to use the default configuration files and shell scripts that come with Kafka. Also bundled with Kafka is an instance of Apache Zookeeper. A Zookeeper server will be used to coordinate our Kafka brokers.

Kafka did not require any additional dependencies to be set up on our Linux Mint virtual image, which already came with a Java runtime. My references mentioned a Scala runtime was also required, but this was not my experience.

Zookeeper needs to be started first with the command:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

It starts out very verbose:

07

but settles down and waits:

08.png

Test Zookeeper

Minimize this Linux terminal and open a second one. Open a telnet connection to the Zookeeper server with the command:

telnet localhost 2181

09.png

Next, while connected, get its status with command:

stat

10.png

 

Start a Kafka broker

Still in our second prompt, I will start a first broker with the commands:

cd /vagrant/kafka_2.11-0.10.2.0/

./bin/kafka-server-start.sh ./config/server.properties

Once again, the program has quite a lot to say but eventually it stops and waits.

11.png

Create a topic

Right now, we have a Zookeeper server started in the first Linux terminal and a Kafka broker started in a second Linux terminal.

Minimize the second terminal and open a third Linux terminal. We are going to create a messaging topic named “bertrandszoghytopic” on just one partition with the following commands:

cd /vagrant/kafka_2.11-0.10.2.0/

# in one line:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bertrandszoghytopic

12.png

List topics

Still in the third Linux terminal, we are going to list all the topics currently in our broker. Unsurprisingly, there is only one:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

13.png

Produce a message in the topic using the Kafka shell script

Still in the third Linux terminal, we are going to add (produce) a message in our topic with the one-line command:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bertrandszoghytopic

This puts the prompt in a waiting mode:

14.png

Whatever we type here will be stored as a message in this topic. I will type:

first color is blue

and hit ENTER

second color is red

and hit ENTER

Leave the prompt open.

Consume the message in the topic using the Kafka shell script

Open a fourth Linux terminal and type the following commands:

cd /vagrant/kafka_2.11-0.10.2.0/

# in one line:

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bertrandszoghytopic --from-beginning

The first two messages are listed and the prompt remains open:

15

Now let’s go back to the third prompt and type in another message in our Producer shell script:

third color is beige

Immediately, this new message is caught and displayed in our Consumer shell in the fourth Linux terminal:

16.png

Looking good. We are going to leave all four Linux terminals open and start coding some JavaScript.

Install the kafka-node package

We start Visual Studio Code from the Linux Mint GUI menu > Programming > Visual Studio Code:

17

We are going to reuse the same folder for our NodeJs scripts that I used last post, located on the Linux virtual image under /vagrant/nodecode.

First we are going to install the kafka-node package for NodeJs. Open the Visual Studio Code integrated terminal via menu View > Integrated Terminal.

135.png

This opens the command prompt inside Visual Studio Code:

10.png

Type the following commands:

cd /vagrant/nodecode

sudo npm install --no-bin-links kafka-node --save

Ignore the warnings about missing optional dependencies:

18.png

Open file /vagrant/nodecode/package.json and make sure kafka-node is listed in the dependencies:

19.png
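For reference, the dependencies section of package.json should contain an entry along these lines (the exact version number will vary with whatever npm installed):

```json
{
  "dependencies": {
    "kafka-node": "^1.6.2"
  }
}
```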

 

 

Create a NodeJs script in Visual Studio Code to produce a topic message

In Visual Studio Code, click the + icon to add a new file:

20.png

Name the file addmsg4.js

21.png

Here is the complete JavaScript code for addmsg4.js:

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    KeyedMessage = kafka.KeyedMessage,
    client = new kafka.Client(),
    producer = new Producer(client),
    km = new KeyedMessage('key', 'message'),
    payloads = [
        { topic: 'bertrandszoghytopic', messages: 'fourth color is yellow', partition: 0 },
        { topic: 'bertrandszoghytopic', messages: 'fifth color is green', partition: 0 }
    ];

producer.on('ready', function () {
    producer.send(payloads, function (err, data) {
        console.log(data);
        process.exit(0);
    });
});

producer.on('error', function (err) {
    console.log('ERROR: ' + err.toString());
});

In the Visual Studio Code Integrated Terminal, run command:

node addmsg4

The following result is displayed:

01.png

If we go back and view the contents of the fourth Linux terminal where our shell script consumer is still running, we can see these two new messages have been received:

02.png

 

Create a NodeJs script that consumes only the latest message in the topic

Most frequently, you want to receive only the latest message in a topic. Let’s add a new file, call it receivelatestmsg.js, and here is the complete listing:

var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client(),
    consumer = new Consumer(
        client,
        [
            { topic: 'bertrandszoghytopic', partition: 0 }
        ],
        {
            // commit consumed offsets so that a re-run only receives new messages
            autoCommit: true
        }
    );

consumer.on('message', function (message) 
{
    console.log(message);
});

consumer.on('error', function (err) 
{
    console.log('ERROR: ' + err.toString());
});


If we run the following command in the Visual Studio Code Integrated Terminal:

node receivelatestmsg

The result is that we receive ALL the messages so far, and the program does not exit; it waits:

03.png

Let’s halt the program with CTRL-C, clear the screen, and run the same command again. This time, nothing is displayed. Under the covers, the consumer keeps track of which messages have already been consumed:

04.png

Let’s leave this program running and open up our third Linux terminal, which is still running our Producer shell script. Let’s type a new message:

sixth color is aquamarine

And hit the ENTER key

05

If we return to our Visual Studio Code Integrated Terminal, we see this message has been received:

06.png

Halt the program with CTRL-C.

 

Create a NodeJs script that consumes all the messages from the beginning of the topic

By default, Apache Kafka retains messages in a topic for a full week before they start dropping off. This is by design, to support multiple lazy-loading clients.
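The retention period is controlled per broker in config/server.properties; the default shipped with this Kafka release keeps messages for 168 hours, i.e. one week:

```properties
# config/server.properties
log.retention.hours=168
```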

We can therefore create a new NodeJs JavaScript file called receiveallmsgs.js and here is the complete listing:

var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client("localhost:2181/"),
    consumer = new Consumer(
        client,
        [
              { topic: 'bertrandszoghytopic', partition: 0, offset: 0 }
        ],
        { fromOffset: true }         
    );

consumer.on('message', function (message) 
{
    console.log(message);
});

consumer.on('error', function (err) 
{
    console.log('ERROR: ' + err.toString());
});

If we run the following command in the Visual Studio Code Integrated Terminal:

node receiveallmsgs

We get what we expect and the program stops and waits:

07.png

That’s pretty good. But what if we want it to list all current messages and then exit? The trick will be to extract the value of highWaterOffset, which is the offset that the next message in this topic will receive.

Halt the program with CTRL-C.

Let’s create a new file called displayallandexit.js and here is the complete listing:

var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client("localhost:2181/"),
    consumer = new Consumer(
        client,
        [
              { topic: 'bertrandszoghytopic', partition: 0, offset: 0 }
        ],
        { fromOffset: true }         
    );

consumer.on('message', function (message) 
{
    console.log(message);

    // extract the highWaterOffset
    console.log("highWaterOffset is " + message.highWaterOffset);
    console.log("index of this message is " + message.offset);
    
    if(message.offset === (message.highWaterOffset - 1))
    {
        console.log('Exiting');
        process.exit(0);
    }
});

consumer.on('error', function (err) 
{
    console.log('ERROR: ' + err.toString());
});

The new code extracts highWaterOffset from each message and exits once the last stored message has been displayed.

This is a little bit like what we did in our previous post, except that in this case the message is a full-blown object we can use directly, not a JSON string that needs to be parsed.

If we run the following command in the Visual Studio Code Integrated Terminal:

node displayallandexit

We obtain the desired results and the program exits cleanly:

08.png

 

Create a NodeJs script that consumes a message with a specific index

We can modify the previous example just a bit to specify a selected zero-based index. The tricky thing to remember is that Kafka calls it an offset instead of an index.

For a final example, if we only wanted to display the third message (i.e. the one with offset 2 that says “third color is beige”), we could create a new JavaScript file called getthirdmessage.js containing the following:

var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client("localhost:2181/"),
    consumer = new Consumer(
        client,
        [
              { topic: 'bertrandszoghytopic', partition: 0, offset: 0 }
        ],
        { fromOffset: true }         
    );

consumer.on('message', function (message) 
{
    if(message.offset === 2)
    {
        console.log(message);
        process.exit(0);
    }
});

consumer.on('error', function (err) 
{
    console.log('ERROR: ' + err.toString());
});

 

If we run the following command in the Visual Studio Code Integrated Terminal:

node getthirdmessage

We obtain the desired results and the program exits cleanly:

09.png

(end of post)

Bertrand Szoghy, June 2017.

 

 
