Blog

KAFKA 무작정 따라하기 (1)

최근에 플랫폼을 구축하다가 데이터베이스에서 변경된 데이터를 가져와야 할 필요가 생겨서 찾아보다가 Kafka + MongoDB Change Stream을 활용하면 되겠다는 생각이 들어 찾아보게 되었다.
Kafka는 기존부터 관심이 많았었지만, 사례들을 찾다 보니 더 좋을거 같다는 판단이 들어 클러스터를 띄워보기로 했다.
로컬 환경에서는 docker-compose를 사용하여 손쉽게 띄웠지만, 실제 AWS EC2 위에서 docker를 사용하지 않고 띄우려고 하다 보니 삽질하는 상황이 많이 생겨서 정리해두려고 한다.
Confluent Community License에 포함된 REST Proxy, ksqldB, Schema Registry 까지 적용해보려고 한다.

kafka 무작정 따라하기

기본 설치

openjdk 설치

$ cd /opt $ wget https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz $ tar -zxvf openjdk-11+28_linux-x64_bin.tar.gz $ sudo ln -s /opt/jdk-11 /usr/local/java
Plain Text
복사

confluent community 설치

$ curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.1.tar.gz $ tar -zxvf confluent-community-6.1.1.tar.gz $ mv confluent-6.1.1/ confluent $ sudo ln -s /opt/confluent /usr/local/confluent
Plain Text
복사

PATH 설정

$ sudo vim /etc/profile export JAVA_HOME=/usr/local/java export PATH=$PATH:$JAVA_HOME/bin:/usr/local/confluent/bin
Plain Text
복사

설정 파일

/var/lib/zookeeper/myid

1
Plain Text
복사
sudo sh -c "echo 1 > /var/lib/zookeeper/myid"
1 부분은 각 zookeeper 서버마다 다르게 설정해야 한다

/etc/hosts

0.0.0.0 zookeeper-1 {PRIVATE-IP-1} zookeeper-2 {PRIVATE-IP-2} zookeeper-3 0.0.0.0 broker-1 {PRIVATE-IP-2} broker-2 {PRIVATE-IP-3} broker-3
Plain Text
복사
sudo vim /etc/hosts 명령어를 실행한 후 맨 아래에 내용을 추가한다

/usr/local/confluent/etc/kafka/zookeeper.properties

dataDir=/var/lib/zookeeper maxClientCnxns=0 admin.enableServer=false initLimit=5 syncLimit=2 server.1=zookeeper-1:2888:3888 server.2=zookeeper-2:2888:3888 server.3=zookeeper-3:2888:3888
Plain Text
복사

/usr/local/confluent/etc/kafka/server.properties

######### 각 서버마다 고유한 ID 값을 설정해야한다. ######### broker.id=0 ################################################### listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092 # EC2 의 Public IP 를 설정해야 한다. advertised.listeners=INTERNAL://:19092,EXTERNAL://{MY-PUBLIC-IP}:9092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT inter.broker.listener.name=INTERNAL # 토픽 자동 생성 옵션은 꺼두는게 좋다 auto.create.topics.enable=false # 토픽 삭제 활성화 delete.topic.enable=true zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Plain Text
복사

/usr/local/confluent/etc/kafka-rest/kafka-rest.properties

zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Plain Text
복사

/usr/local/confluent/etc/schema-registry/schema-registry.properties

listeners=http://0.0.0.0:8081 kafkastore.connection.url=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 kafkastore.topic=_schemas
Plain Text
복사

/usr/local/confluent/etc/kafka/connect-distributed.properties

bootstrap.servers=broker-1:9092,broker-2:9092,broker-3:9092 group.id=connect-connect-group key.converter=org.apache.kafka.connect.storage.StringConverter key.converter.schemas.enable=true value.converter=io.confluent.connect.avro.AvroConverter value.converter.schemas.enable=true value.converter.schema.registry.url=http://kafka.private.ddocdoc.kr:8081 rest.host.name=0.0.0.0 rest.port=8083
Plain Text
복사

실행하기

zookeeper

실행하기
$ zookeeper-server-start -daemon /usr/local/confluent/etc/kafka/zookeeper.properties
Plain Text
복사
종료하기
$ zookeeper-server-stop
Plain Text
복사

kafka

실행하기
$ kafka-server-start -daemon /usr/local/confluent/etc/kafka/server.properties
Plain Text
복사
종료하기
$ kafka-server-stop
Plain Text
복사

schema-registry

실행하기
$ schema-registry-start -daemon /usr/local/confluent/etc/schema-registry/schema-registry.properties
Plain Text
복사
종료하기
$ schema-registry-stop
Plain Text
복사

kafka-rest

실행하기
$ kafka-rest-start -daemon /usr/local/confluent/etc/kafka-rest/kafka-rest.properties
Plain Text
복사
종료하기
$ kafka-rest-stop
Plain Text
복사

kafka-connect

실행하기
$ connect-distributed -daemon /usr/local/confluent/etc/kafka/connect-distributed.properties
Plain Text
복사

주의사항

confluent 가이드만 보고 따라하다가 방화벽 등을 생각하지 않아서 하루정도 삽질을 했다고 한다.
서버를 실행할 때 -daemon 옵션을 제거하고 실행해본 후, 정상 동작을 확인한 후 -daemon 옵션을 설정하는 것이 편하다.

Security Group

AWS EC2 위에 설치를 완료했다면, 실행하기 전에 반드시 Security Group을 확인해줘야 하며, 아래의 포트들이 열려있어야 한다.
연결은 되는데 console-consumer 혹은 console-producer를 사용할 때 LEADER_NOT_AVAILABLE 에러를 발견하게 되면 아래 포트들로 정상 통신이 가능한지 확인해봐야 한다.
TCP 2888: zookeeper sync
TCP 3888: zookeeper leader
TCP 2181: zookeeper client
TCP 9092: kafka broker
TCP 8082: kafka-rest
TCP 8083: kafka-connect
TCP 8081: schema-registry

cli 명령어

kafka-topics

// 토픽 생성 $ kafka-topics --create --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 --topic test-1 --partitions 1 --replication-factor 3 // 토픽 목록 조회 $ kafka-topics --list --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 // 토픽 조회 $ kafka-topics --describe --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 --topic test-1 // 토픽 삭제 $ kafka-topics --delete --zookeeper {IP}:2181 --topic test-1
Plain Text
복사

kafka-console-consumer

$ kafka-console-consumer --bootstrap-server {IP}:9092 --topic test-1
Plain Text
복사

kafka-console-producer

$ kafka-console-producer --bootstrap-server {IP}:9092 --topic test-1 > Hello > World
Plain Text
복사

kafka-connector topic 만들기

$ kafka-topics --create --bootstrap-server {IP}:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact $ kafka-topics --create --bootstrap-server {IP}:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact $ kafka-topics --create --bootstrap-server {IP}:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
Plain Text
복사