Apache Kafka: Set Up a Multi-Broker Kafka Cluster on Amazon EC2
- Jul 20, 2024
In this article, we'll look at how to set up a three-node Apache Kafka cluster on AWS EC2 instances.
Let's assume we already have three EC2 instances up and running with the following public IPs:
13.250.8.119 13.0.46.70 192.0.185.170
Make sure port 9092 is open on each machine (for example, in the instance's security group), because clients and the other brokers connect to Kafka on this port.
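A quick TCP probe from a client machine can confirm that the port is reachable. The sketch below assumes the `nc` (netcat) utility is installed; the helper name `check_port` is made up for illustration.

```shell
# Hypothetical helper: report whether a TCP port accepts connections.
# Assumes `nc` (netcat) is installed; -z probes without sending data,
# -w 3 gives up after three seconds.
check_port() {
    host=$1
    port=$2
    if nc -z -w 3 "$host" "$port" >/dev/null 2>&1; then
        echo "$host:$port open"
    else
        echo "$host:$port closed"
    fi
}

# Example call (needs network access to the instances):
#   for ip in 13.250.8.119 13.0.46.70 192.0.185.170; do check_port "$ip" 9092; done
```

Note that the probe reports "closed" until a broker is actually listening, even when the firewall rules are correct, so it is most useful after the brokers have been started.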
Set Up OpenJDK 17
The first step is to install OpenJDK 17 on each server, as shown below:
    sudo apt update && sudo apt upgrade -y
    apt-cache search openjdk
    sudo apt-get install openjdk-17-jdk

    java --version
    openjdk 17.0.4 2022-07-19
    OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-122.04)
    OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-122.04, mixed mode, sharing)
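Since the same check has to be repeated on every server, it can help to script it. The following is a minimal sketch that extracts the major version from the first line of the `java --version` output; the function name `java_major` is hypothetical, and the sample string is copied from the output above.

```shell
# Hypothetical helper: extract the major version from the first line of
# `java --version` output, e.g. "openjdk 17.0.4 2022-07-19" -> 17.
java_major() {
    echo "$1" | awk 'NR == 1 { split($2, v, "[.]"); print v[1] }'
}

# On a server you would pass the real output instead of a sample string:
#   java_major "$(java --version)"
sample="openjdk 17.0.4 2022-07-19"
if [ "$(java_major "$sample")" -ge 17 ]; then
    echo "Java version OK"
else
    echo "Java is older than the OpenJDK 17 installed in this guide"
fi
```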
Install ZooKeeper
ZooKeeper is a centralized service for distributed coordination and synchronization.
Kafka uses ZooKeeper primarily to track the status of the brokers in the cluster and to store cluster metadata such as the list of topics and their configuration; the messages themselves are stored on the brokers.
Let's assume we already have a ZooKeeper ensemble up and running with the following public IPs:
113.250.8.119:2181 113.0.46.70:2181 113.0.185.170:2181
Install Kafka
There are only two requirements in the broker configuration to allow multiple Kafka brokers to join a single cluster.
1) All brokers must have the same configuration for the "zookeeper.connect" parameter.
2) All brokers in the cluster must have a unique value for the "broker.id" parameter.
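Both requirements are easy to check mechanically once the configuration files exist. The sketch below assumes you have copied each broker's server.properties to one machine; the helper names `prop` and `check_cluster` and the file names in the usage comment are illustrative only.

```shell
# Hypothetical helper: print the value of key $2 in properties file $1
# (the last occurrence wins; commented-out lines are ignored).
prop() {
    awk -F= -v k="$2" '$1 == k { v = substr($0, length(k) + 2) } END { print v }' "$1"
}

# Check the two requirements across the given server.properties files:
# unique broker.id values and one shared zookeeper.connect value.
check_cluster() {
    dup_ids=$(for f in "$@"; do prop "$f" broker.id; done | sort | uniq -d)
    zk_count=$(for f in "$@"; do prop "$f" zookeeper.connect; done | sort -u | wc -l | tr -d ' ')
    status=0
    if [ -n "$dup_ids" ]; then
        echo "duplicate broker.id: $dup_ids"
        status=1
    fi
    if [ "$zk_count" -ne 1 ]; then
        echo "zookeeper.connect differs between brokers"
        status=1
    fi
    if [ "$status" -eq 0 ]; then
        echo "cluster requirements satisfied"
    fi
    return "$status"
}

# Example call with placeholder file names:
#   check_cluster node1.properties node2.properties node3.properties
```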
Download Kafka from the release page, extract it, and move it to the "/usr/local/" directory as shown below.
    wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
    tar -zxf kafka_2.12-3.2.0.tgz
    sudo mv kafka_2.12-3.2.0 /usr/local/kafka
Also create a "kafka" folder under "/var/lib/". It will act as the data directory (log.dirs) for Kafka, so the user that runs the broker needs write access to it.
    sudo mkdir -p /var/lib/kafka
    sudo chown "$USER" /var/lib/kafka
If the IPs of the servers in the cluster are 13.250.8.119, 13.0.46.70, and 192.0.185.170, the configuration file "server.properties" under "/usr/local/kafka/config/" should have the following four properties changed to new values on each node, as shown below:
    vi /usr/local/kafka/config/server.properties
1) Node 13.250.8.119
    . . .
    broker.id=1
    advertised.listeners=PLAINTEXT://ec2-13-250-8-119.ap-south-1.compute.amazonaws.com:9092
    log.dirs=/var/lib/kafka
    zookeeper.connect=113.250.8.119:2181,113.0.46.70:2181,113.0.185.170:2181
    . . .
2) Node 13.0.46.70
    . . .
    broker.id=2
    advertised.listeners=PLAINTEXT://ec2-13-0-46-70.ap-south-1.compute.amazonaws.com:9092
    log.dirs=/var/lib/kafka
    zookeeper.connect=113.250.8.119:2181,113.0.46.70:2181,113.0.185.170:2181
    . . .
3) Node 192.0.185.170
    . . .
    broker.id=3
    advertised.listeners=PLAINTEXT://ec2-192-0-185-170.ap-south-1.compute.amazonaws.com:9092
    log.dirs=/var/lib/kafka
    zookeeper.connect=113.250.8.119:2181,113.0.46.70:2181,113.0.185.170:2181
    . . .
Note: "advertised.listeners" expects the instance's "Public IPv4 DNS" name, which you can find in the AWS EC2 dashboard.
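Because only the broker id and the host name differ between nodes, the four properties can be generated rather than typed by hand. This is a minimal sketch: the helper names `public_dns` and `broker_overrides` are hypothetical, and building the DNS name from the IP and region is an assumption based on the naming pattern of the host names in this article, so the value shown in the EC2 dashboard remains authoritative.

```shell
REGION="ap-south-1"   # region taken from the host names in this article
ZK_CONNECT="113.250.8.119:2181,113.0.46.70:2181,113.0.185.170:2181"

# Build a Public IPv4 DNS name from an IP address. The pattern is an
# assumption; verify the result against the EC2 dashboard.
public_dns() {
    echo "ec2-$(echo "$1" | tr '.' '-').$REGION.compute.amazonaws.com"
}

# Print the four per-broker properties for broker id $1 at public IP $2.
broker_overrides() {
    cat <<EOF
broker.id=$1
advertised.listeners=PLAINTEXT://$(public_dns "$2"):9092
log.dirs=/var/lib/kafka
zookeeper.connect=$ZK_CONNECT
EOF
}

broker_overrides 1 13.250.8.119
```

The printed lines still have to be merged into server.properties on the matching node, replacing the existing entries for the same keys.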
Once these steps are complete, start the broker on each node with the commands below; the nodes should then communicate with one another as a cluster.
    export KAFKA_HEAP_OPTS="-Xmx512m -Xms256m"
    /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
In order to test if the cluster is running correctly, we can try to create a topic as shown below:
    /usr/local/kafka/bin/kafka-topics.sh --create --topic demo-topic --bootstrap-server ec2-13-250-8-119.ap-south-1.compute.amazonaws.com:9092 --replication-factor 3 --partitions 5

    /usr/local/kafka/bin/kafka-topics.sh --describe --topic demo-topic --bootstrap-server ec2-13-250-8-119.ap-south-1.compute.amazonaws.com:9092
    Topic: demo-topic TopicId: J3wLwYv_TPi1J2JEyrJK5g PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824
        Topic: demo-topic Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 3,2,1
        Topic: demo-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 3,2,1
        Topic: demo-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,2,1
        Topic: demo-topic Partition: 3 Leader: 3 Replicas: 1,3,2 Isr: 2,3,1
        Topic: demo-topic Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 3,2,1
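In a healthy cluster every partition lists all three brokers under both "Replicas" and "Isr". As a rough sketch, the describe output can be piped through a small filter that prints only the partitions whose in-sync set is smaller than the replica set; the function name `under_replicated` is made up for this example.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print the partitions whose Isr list is shorter than their Replicas list.
under_replicated() {
    awk '
        {
            part = ""; replicas = ""; isr = ""
            for (i = 1; i < NF; i++) {
                if ($i == "Partition:") part = $(i + 1)
                if ($i == "Replicas:")  replicas = $(i + 1)
                if ($i == "Isr:")       isr = $(i + 1)
            }
            # The summary line has no "Partition:" field and is skipped.
            if (part != "" && split(isr, a, ",") < split(replicas, b, ","))
                print "partition " part ": replicas=" replicas " isr=" isr
        }
    '
}

# Example call, reusing the describe command from above:
#   /usr/local/kafka/bin/kafka-topics.sh --describe --topic demo-topic \
#       --bootstrap-server <broker-host>:9092 | under_replicated
```

For the output shown above the filter prints nothing, because all five partitions have three in-sync replicas. The `kafka-topics.sh` tool also offers an `--under-replicated-partitions` option for `--describe` that serves a similar purpose.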
That completes the Kafka cluster setup.
Configuration
There are other optional configuration parameters used when running a cluster.
These configurations are discussed briefly in the table below:
Parameter | Meaning | Default |
---|---|---|
broker.id | an integer identifier for the broker, must be unique for each broker within a cluster | 0 |
listeners | comma-separated list of URIs to listen on, e.g. PLAINTEXT://localhost:9092,SSL://:9091 | not set (falls back to PLAINTEXT://:9092) |
zookeeper.connect | comma-separated list of ZooKeeper host:port pairs | localhost:2181 |
log.dirs | directories in which to store log segments, given as a comma-separated list of paths on the local system | /tmp/kafka-logs |
num.recovery.threads.per.data.dir | number of threads per log directory for handling log segments during startup and shutdown | 1 |
num.partitions | number of partitions a new topic is created with | 1 |
log.retention.hours | minimum age, in hours, of a log file before it is eligible for deletion due to age | 168 |
log.segment.bytes | maximum size, in bytes, of a log segment file; when this size is reached a new log segment is created | 1073741824 (1 GiB) |
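The two largest defaults are easier to reason about in everyday units. The short calculation below uses only the values from the table; the variable names are illustrative.

```shell
retention_hours=168        # default log.retention.hours
segment_bytes=1073741824   # default log.segment.bytes

# 168 hours / 24 = 7 days; 1073741824 bytes / 1024^3 = 1 GiB
echo "log.retention.hours=$retention_hours is $((retention_hours / 24)) days"
echo "log.segment.bytes=$segment_bytes is $((segment_bytes / 1024 / 1024 / 1024)) GiB"
```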
If more than one path is specified in "log.dirs", the broker will store partitions on them in a "least-used" fashion, with one partition's log segments stored within the same path.
The broker will place a new partition in the path that has the least number of partitions currently stored in it, not the least amount of disk space used, so an even distribution of data across multiple directories is not guaranteed.
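The placement rule can be illustrated with a few lines of shell. This is only a sketch of the idea described above, not Kafka's actual implementation: it counts the partition subdirectories (such as demo-topic-0) in each candidate path and picks the path with the fewest. The function names are hypothetical.

```shell
# Count the partition directories directly under a log directory.
count_partitions() {
    n=0
    for p in "$1"/*/; do
        if [ -d "$p" ]; then
            n=$((n + 1))
        fi
    done
    echo "$n"
}

# Pick the log directory holding the fewest partitions. Disk usage is
# deliberately ignored, which mirrors the rule described above.
# When two directories tie, the first one listed wins.
pick_log_dir() {
    best=""
    best_count=""
    for dir in "$@"; do
        count=$(count_partitions "$dir")
        if [ -z "$best" ] || [ "$count" -lt "$best_count" ]; then
            best=$dir
            best_count=$count
        fi
    done
    echo "$best"
}

# Example call with placeholder paths:
#   pick_log_dir /var/lib/kafka/disk1 /var/lib/kafka/disk2
```

A directory holding one very large partition is still preferred over a directory holding two small ones, which is why data can end up unevenly spread across disks.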