
Apache Kafka: Set Up a Multi-Broker Kafka Cluster on Amazon EC2

Mar 31, 2024

In this article, we'll look at how to set up a three-node Apache Kafka cluster on AWS EC2 instances.

Let's assume we already have three EC2 instances up and running with the following public IPs:

13.250.8.119
13.0.46.70
192.0.185.170

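Please ensure that port 9092 is open on each machine (for example, in the EC2 security group). Clients use this port to reach the brokers, and with the configuration shown in this article the brokers also use it to talk to one another.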

Setup OpenJDK 17

The first step is to install OpenJDK 17 on each server, as shown below:

sudo apt update && sudo apt upgrade -y
apt-cache search openjdk
sudo apt-get install openjdk-17-jdk
java --version

openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-122.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-122.04, mixed mode, sharing)

Install ZooKeeper

ZooKeeper is a centralized service for distributed coordination and synchronization.

Kafka uses ZooKeeper primarily to track the status of the nodes in the cluster and to maintain cluster metadata, such as the list of topics and their configuration. The messages themselves are stored on the brokers, not in ZooKeeper.

Let's assume we already have a ZooKeeper ensemble up and running with the following public IPs:

113.250.8.119:2181
113.0.46.70:2181
113.0.185.170:2181

Install Kafka

There are only two requirements in the broker configuration to allow multiple Kafka brokers to join a single cluster.

1) All brokers must have the same configuration for the "zookeeper.connect" parameter.

2) All brokers in the cluster must have a unique value for the "broker.id" parameter.
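
Once each broker's "server.properties" has been edited, the two requirements above can be checked mechanically. The following is a minimal sketch, assuming copies of the brokers' configuration files have been collected on one machine; "check_cluster_props" is a hypothetical helper name, not something that ships with Kafka.

```shell
# check_cluster_props FILE...
# Hypothetical helper (not part of Kafka): checks that the given
# server.properties files satisfy the two requirements listed above.
check_cluster_props() {
  # Collect the value of each key across all files, one value per line.
  ids=$(grep -h '^broker\.id=' "$@" | cut -d= -f2- | sort)
  zks=$(grep -h '^zookeeper\.connect=' "$@" | cut -d= -f2- | sort -u)

  # Every file must define broker.id.
  if [ "$(printf '%s\n' "$ids" | grep -c .)" -ne "$#" ]; then
    echo "broker.id missing in at least one file"
    return 1
  fi

  # Requirement 2: every broker.id must be unique.
  dup=$(printf '%s\n' "$ids" | uniq -d)
  if [ -n "$dup" ]; then
    echo "duplicate broker.id: $dup"
    return 1
  fi

  # Requirement 1: exactly one distinct zookeeper.connect value.
  if [ "$(printf '%s\n' "$zks" | grep -c .)" -ne 1 ]; then
    echo "zookeeper.connect differs between brokers"
    return 1
  fi

  echo "ok"
}

# Example (the file names are placeholders):
#   check_cluster_props node1.properties node2.properties node3.properties
```

The check compares the "zookeeper.connect" strings literally, so two brokers that list the same ensemble in a different order would be reported as different.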

Download Kafka from the release page, extract it, and move it to the "/usr/local/" directory as shown below.

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
tar -zxf kafka_2.12-3.2.0.tgz
sudo mv kafka_2.12-3.2.0 /usr/local/kafka

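Also create a "kafka" folder under "/var/lib/"; this will act as the data directory (log.dirs) for Kafka. Because the broker is later started without sudo, the directory must be writable by the user that runs Kafka.

sudo mkdir -p /var/lib/kafka
sudo chown -R "$USER" /var/lib/kafka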

If the IPs of the servers in the cluster are 13.250.8.119, 13.0.46.70, and 192.0.185.170, the configuration file "server.properties" under "/usr/local/kafka/config/" should have the following four properties changed to new values as shown below:

vi /usr/local/kafka/config/server.properties

1) Node 13.250.8.119

. . .
broker.id=1
advertised.listeners=PLAINTEXT://ec2-13-250-8-119.ap-south-1.compute.amazonaws.com:9092
log.dirs=/var/lib/kafka
zookeeper.connect=113.250.8.119:2181,113.0.46.70:2181,113.0.185.170:2181
. . .

2) Node 13.0.46.70

. . .
broker.id=2
advertised.listeners=PLAINTEXT://ec2-13-0-46-70.ap-south-1.compute.amazonaws.com:9092
log.dirs=/var/lib/kafka
zookeeper.connect=113.250.8.119:2181,113.0.46.70:2181,113.0.185.170:2181
. . .

3) Node 192.0.185.170

. . .
broker.id=3
advertised.listeners=PLAINTEXT://ec2-192-0-185-170.ap-south-1.compute.amazonaws.com:9092
log.dirs=/var/lib/kafka
zookeeper.connect=113.250.8.119:2181,113.0.46.70:2181,113.0.185.170:2181
. . .

Note: The "advertised.listeners" property expects the instance's "Public IPv4 DNS" name, which you can find in the AWS EC2 dashboard.
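
The three listings above differ only in "broker.id" and in the host name used in "advertised.listeners". As a rough sketch of that pattern, the hypothetical helper below ("broker_props" is an illustrative name, not a Kafka tool) prints the four properties for one node, using the ZooKeeper addresses and data directory from this article.

```shell
# Shared by every broker: the ZooKeeper ensemble listed in this article.
ZK_CONNECT="113.250.8.119:2181,113.0.46.70:2181,113.0.185.170:2181"

# broker_props ID PUBLIC_DNS
# Hypothetical helper: prints the four per-broker properties for one node.
broker_props() {
  broker_id="$1"    # must be unique for each broker
  public_dns="$2"   # Public IPv4 DNS name of this EC2 instance
  printf 'broker.id=%s\n' "$broker_id"
  printf 'advertised.listeners=PLAINTEXT://%s:9092\n' "$public_dns"
  printf 'log.dirs=/var/lib/kafka\n'
  printf 'zookeeper.connect=%s\n' "$ZK_CONNECT"
}

# Print the values for the second node.
broker_props 2 ec2-13-0-46-70.ap-south-1.compute.amazonaws.com
```

The output is meant to be compared with, or pasted over, the corresponding lines in "server.properties"; the helper does not modify the file itself.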

Once these steps are complete, start the broker on each node with the commands below; the nodes should then find one another through ZooKeeper and form a cluster.

export KAFKA_HEAP_OPTS="-Xmx512m -Xms256m"
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
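
One way to confirm that all brokers have joined is to list the ids registered under "/brokers/ids" in ZooKeeper, using the "zookeeper-shell.sh" script in Kafka's "bin" directory. The sketch below assumes the shell prints the ids as a bracketed list such as "[1, 2, 3]" on its own line; "count_broker_ids" is a hypothetical helper that counts the entries in that list.

```shell
# count_broker_ids
# Hypothetical helper: reads zookeeper-shell output on stdin and prints how
# many broker ids appear in the last bracketed list, e.g. "[1, 2, 3]" -> 3.
count_broker_ids() {
  grep -E '^\[[0-9, ]*\]$' |   # keep only lines that look like an id list
    tail -n 1 |                # the result of the ls command comes last
    tr -d '[] ' |              # "[1, 2, 3]" -> "1,2,3"
    tr ',' '\n' |              # one id per line
    grep -c .                  # count the non-empty lines
}

# Usage against the ensemble from this article (needs a running cluster):
#   /usr/local/kafka/bin/zookeeper-shell.sh 113.250.8.119:2181 ls /brokers/ids | count_broker_ids
```

For the three-node cluster described here, the expected count is 3 once every broker has started.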

In order to test if the cluster is running correctly, we can try to create a topic as shown below:

/usr/local/kafka/bin/kafka-topics.sh --create --topic demo-topic --bootstrap-server ec2-13-250-8-119.ap-south-1.compute.amazonaws.com:9092 --replication-factor 3 --partitions 5

/usr/local/kafka/bin/kafka-topics.sh --describe --topic demo-topic --bootstrap-server ec2-13-250-8-119.ap-south-1.compute.amazonaws.com:9092
Topic: demo-topic    TopicId: J3wLwYv_TPi1J2JEyrJK5g    PartitionCount: 5    ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: demo-topic    Partition: 0    Leader: 2    Replicas: 1,2,3    Isr: 3,2,1
    Topic: demo-topic    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 3,2,1
    Topic: demo-topic    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,2,1
    Topic: demo-topic    Partition: 3    Leader: 3    Replicas: 1,3,2    Isr: 2,3,1
    Topic: demo-topic    Partition: 4    Leader: 2    Replicas: 2,1,3    Isr: 3,2,1
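
In the output above, every partition lists three replicas and three in-sync replicas (Isr), which is what a healthy three-broker cluster should show. As a small sketch, the hypothetical helper below ("under_replicated" is an illustrative name) reads describe output in the format shown above and prints any partition whose Isr list is shorter than its Replicas list.

```shell
# under_replicated
# Hypothetical helper: reads `kafka-topics.sh --describe` output on stdin and
# prints the partitions whose Isr list is shorter than the Replicas list.
under_replicated() {
  awk '
    /Partition:/ {
      part = ""; reps = ""; isr = ""
      # Each value follows its label as the next whitespace-separated field.
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") part = $(i + 1)
        if ($i == "Replicas:")  reps = $(i + 1)
        if ($i == "Isr:")       isr  = $(i + 1)
      }
      # split() returns the number of comma-separated broker ids.
      if (split(isr, a, ",") < split(reps, b, ","))
        print "partition " part ": isr=" isr " replicas=" reps
    }'
}

# Usage (needs a running cluster):
#   /usr/local/kafka/bin/kafka-topics.sh --describe --topic demo-topic \
#     --bootstrap-server <broker-host>:9092 | under_replicated
```

No output means every partition is fully replicated. The "kafka-topics.sh" tool also accepts "--under-replicated-partitions" together with "--describe", which reports the same condition without any parsing.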

That completes the Kafka cluster setup.

Configuration

There are other optional configuration parameters used when running a cluster.

These configurations are discussed briefly in the table below:

                                                                                                                                               
| Conf | Meaning | Default |
|---|---|---|
| broker.id | An integer identifier for the broker; must be unique for each broker within a cluster | 0 |
| listeners | Comma-separated list of URIs, e.g. PLAINTEXT://localhost:9092, SSL://:9091 | not enabled |
| zookeeper.connect | Comma-separated list of ZooKeeper host:port pairs | localhost:2181 |
| log.dirs | Directory in which to store log segments; a comma-separated list of paths on the local system | /tmp/kafka-logs |
| num.recovery.threads.per.data.dir | Number of threads per log directory for handling log segments during startup and shutdown | 1 |
| num.partitions | How many partitions a new topic is created with | 1 |
| log.retention.hours | Minimum age of a log file to be eligible for deletion due to age | 168 |
| log.segment.bytes | The maximum size of a log segment file; when this size is reached, a new log segment is created | 1073741824 (1 GB) |

If more than one path is specified in "log.dirs", the broker will store partitions on them in a "least-used" fashion, with one partition's log segments stored within the same path.

The broker will place a new partition in the path that has the least number of partitions currently stored in it, not the least amount of disk space used, so an even distribution of data across multiple directories is not guaranteed.
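
The placement rule described above can be illustrated with a short sketch. This is not Kafka's own code, only an imitation of the rule as stated: "pick_log_dir" is a hypothetical helper that counts the partition subdirectories (such as "demo-topic-0") in each path and picks the path with the fewest, without looking at disk usage.

```shell
# pick_log_dir DIR...
# Illustration only (not Kafka's code): prints the directory that currently
# holds the fewest partition subdirectories, ignoring how much disk each uses.
pick_log_dir() {
  best=""
  best_count=""
  for dir in "$@"; do
    # Each partition is stored as one subdirectory, e.g. demo-topic-0.
    count=$(find "$dir" -mindepth 1 -maxdepth 1 -type d | wc -l | tr -d ' ')
    # Strict comparison: on a tie, the directory listed first wins.
    if [ -z "$best" ] || [ "$count" -lt "$best_count" ]; then
      best="$dir"
      best_count="$count"
    fi
  done
  echo "$best"
}

# Example (the paths are placeholders):
#   pick_log_dir /var/lib/kafka/disk1 /var/lib/kafka/disk2
```

A directory holding one very large partition would still be chosen over a directory holding two small ones, which is why the data may end up unevenly spread.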


NK Chauhan

NK Chauhan is a Principal Software Engineer with one of the biggest e-commerce companies in the world.

Chauhan has around 12 years of experience with a focus on JVM-based technologies and Big Data.

His hobbies include playing Cricket, Video Games and hanging with friends.
