Set up Apache Kafka and working with Java Client

Apache Kafka

Apache Kafka is a publish/subscribe messaging system, which is fast, distributed and having persisting data store over multiple clusters.

In our work, we needed to up Apache kafka for continuous data streaming. Here we have shown the basic work to setup Kafka and building a java client (producer) to send message and get those message from Kafka Console Consumer.

First the

Apache Kafka Configuration –

We have used kafka 2.10- and zookeeper-3.4.6 for our current work.

As Apache Kafka will run on cluster on single-node configuration also, first we need to configured Apache Zookeeper for Configuration Management.

The basic Zookeeper Standalone configuration in conf/zoo.cfg

dataDir=/<<Your Directory>>/zookeeper-kafka-out

We have set the default port for it. The port can be changed and all the scripts of Apache Kafka are to be modified as and when required.
To start Zookeeper, the command is –

sudo ./bin/ start

To Start Apache Kafka, the command is

bin/ config/

Topic in Apache Kafka
In Apache Kafka message producers and message consumers are related to per topic.
kafka topic creation –

bin/ –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –<<Your topic name>>

To list the kafka topic in console –

bin/ –list –zookeeper localhost:2181

To show the messages consumed, we can open kafka consumer in console –
bin/ –zookeeper localhost:2181 –topic internationalCDR –from-beginning

If we want to make Kafka Producer from console, we need to open one with the following one –

bin/ –broker-list localhost:9092 –topic <<your topic name>>

In the above section, we are in respective kafka and zookeeper directory.

The Java client – 

It is a Message Producer Program utility with java, tested for a stand-alone kafka. Codes are self explanatory.

package com.apt.server.utility;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;

public class KafkaUtility {

private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();

public KafkaUtility(String topic) {
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("", "localhost:9092");
props.put("zk.connect", "localhost:2181");
// Use random partitioner. Don’t need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;

public void sendMessage(String particularMesage)
KeyedMessage<Integer, String> km = new KeyedMessage<Integer, String>(this.topic, particularMesage);

public static void main(String[] args) {
KafkaUtility ku = new KafkaUtility("indianCDR");
ku.sendMessage("Sample Message to India CDR");



If you find this article helpful, you can connect us in Google+ and Twitter.

Leave a Reply

Your email address will not be published. Required fields are marked *