Post

06 kafka 장애대응

06 kafka 장애대응

1. kafka 인프라 관련 개념

1. 전체 흐름

내 그림

2. 노드(node)

  • 카프카가 설치되어 있는 서버 단위를 의미
  • 노드가 고장나게 되면 메시지를 전달하는 것 자체가 막히기 때문에 서비스 장애가됨
  • 실무에서는 위 그림과 같이 노드를 1대만 두지 않고, 최소 3대의 노드를 구성

3. 클러스터(cluster)

  • 유기적으로 작동하는 노드들을 묶어서 클러스터
  • 여러 대의 서버가 연결되어 하나의 시스템처럼 동작하는 서버들의 집합을 의미
  • 메시지를 나눠 저장 / 복제본을 생성해서 유지하는 패턴

4. 노드의 구성

1. 브로커(broker), 컨트롤러(controller)

  • kafka 서버는 크게 컨트롤러(controller)와 브로커(broker)로 구성
    • 브로커는 메시지를 저장하고 클라이언트의 요청을 처리하는 역할
    • 컨트롤러는 브로커들간의 연동과 전반적인 클러스터의 상태를 총괄
  • 노드에서 브로커는 9092번 포트 / 컨트롤러는 9093번 포트

2. 레플리케이션(replication)

  • 데이터의 안정성과 가용성을 높이기 위해 토픽의 파티션을 여러 노드에 복제하는 것
  • 복제된 파티션들은 리더 파티션(원본) / 팔로워 파티션(복제본) 구분
    • 리더 파티션은 프로듀서나 컨슈머가 직접적으로 메시지를 쓰고 읽는 파티션
    • 팔로워 파티션은 프로듀서나 컨슈머가 직접적으로 메시지를 쓰고 읽지 않음
    • 팔로워 파티션은 리더 파티션의 메시지를 실시간으로 복제하며 유지
  • 리더 파티션에 장애가 발생하면 팔로워 파티션이 리더 역할을 대신 수행

2. 카프카 클러스터 설정하기

1. 서버 프로퍼티스 변경(config/server.properties)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  # kafka 노드를 식별하는 ID
  node.id=1

  # 클러스터를 구성할 컨트롤러의 노드 주소 목록을 설정
  # (추가적인 노드의 컨트롤러를 19093, 29093번 포트에 실행시킬 예정)
  # 일반적으로 다른 컴퓨터에 설최되므로, EC2 Public IP는 다르고 포트번호가 같아짐
  controller.quorum.bootstrap.servers={EC2 Public IP}:9093,{EC2 Public IP}:19093,{EC2 Public IP}:29093

  # 브로커, 컨트롤러 프로세스를 실행시킬 포트를 지정
  # (브로커를 PLAINTEXT, 컨트롤러를 CONTROLLER라고 지칭)
  listeners=PLAINTEXT://:9092,CONTROLLER://:9093

  # 외부에서 접근할 수 있는 주소
  advertised.listeners=PLAINTEXT://{EC2 Public IP}:9092,CONTROLLER://{EC2 Public IP}:9093

  # kafka가 데이터(kafka 설정, 브로커가 받은 메시지, 로그 등)를 저장할 디렉터리 경로 설정
  # 동일한 컴퓨터에서 진행할 경우에는 디렉토리 구분 필요
  log.dirs=/tmp/kafka-logs-1

2. 카프카 실행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  # 처음 실행하는 kafka 노드는 아래 명령어로 실행
  $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
  $ KAFKA_CONTROLLER_ID="$(bin/kafka-storage.sh random-uuid)"
  $ bin/kafka-storage.sh format \ # 초기화
    -t $KAFKA_CLUSTER_ID \ # 클러스터 번호
 	  -c config/server.properties \  # 프로세스 ID
	  --initial-controllers "1@localhost:9093:$KAFKA_CONTROLLER_ID" 
 	  # 마지막 명령어는 클러스터의 최초 컨트롤러가 누구인지 등록하는 작업으로, 첫번째 노드만 사용함.
	  # 1 : node.id (이 서버 ID)
	  # localhost:9093 : 이 노드 주소
	  # $KAFKA_CONTROLLER_ID : 이 컨트롤러 프로세스 고유 ID
	 
	$ bin/kafka-storage.sh format \ # 초기화
    -t $KAFKA_CLUSTER_ID \ # 이 클러스터 소속, 이값은 최초 컴퓨터에서 복사해서 붙여넣기해야함.
	  -c config/server2.properties \ # 설정파일
	  --no-initial-controllers # 만들어진 클러스터에 참여함.

3. 컨트롤러 쿼럼 등록

1
2
3
4
5
6
7
8
9
 $ bin/kafka-metadata-quorum.sh \ 
	 --command-config config/server2.properties \
	 --bootstrap-server localhost:9092 \
	 add-controller
	 
 # 최초로 등록했던
 --initial-controllers "1@localhost:9093:$KAFKA_CONTROLLER_ID" 
 카프카가 죽을 경우
 다음 컨트롤러 후보군으로 등록함. 	 

4. 토픽 등록

1
2
3
4
5
6
 $ bin/kafka-topics.sh \
	--bootstrap-server localhost:9092 \
	--create \
	--topic email.send \
	--partitions 1 \
	--replication-factor 3 # 여기서 

3. 카프카 현황 및 관계

1. 현황 조회 CLI

1
2
3
4
5
6
  $ bin/kafka-topics.sh \
   --bootstrap-server localhost:9092 \
   --describe \
   --topic email.send
   
  # Replicas와 Isr에 3개의 숫자(1, 2, 3)가 다 있다면 3개의 Kafka 서버가 정상적으로 잘 연동되고 있다는 뜻
  • PartitionCount : 해당 토픽의 파티션 수
  • ReplicationFactor : 해당 토픽의 레플리케이션 수
  • Partition : 파티션 번호
  • Leader : 해당 토픽의 리더 파티션을 가지고 있는 노드 id
  • Replicas : 해당 토픽의 파티션을 복제하기로 설정된 노드들의 id
  • Isr(In-Sync Replicas) : 리더 파티션과 똑같은 상태로 복제(동기화)가 완료된 노드들의 id

2. Memo

1. 파티션 / 레플리케이션관계

1
2
  partitions = N
  replication.factor = M

2. 1:M

  • 파티션이 1개, 브로커가 여러개 일경우
  • 리더의 데이터를 다른 브로커가 가지고 있는 구조가됨.
  • 구성
브로커역할가지고 있는 데이터
서버1LeaderP0 원본 로그
서버2FollowerP0 복제 로그
서버3FollowerP0 복제 로그
서버MFollowerP0 복제 로그

3. N:1

  • 파티션이 N개, 브로커가 1개
  • 그러면 많은 파티션을 가진 브로커가 됨.
  • 구성
파티션LeaderFollower
P0서버1없음
P1서버1없음
P2서버1없음
서버1없음

4. 2:3

  • 설정에 따라서 각 서버는 모든 파티션의 데이터를 가지고 있을수도 없을 수도 있음.
    • 따라서 여러 서버를 운용중일때 특정 서버의 묶음이 죽으면, 일부 파티션의 데이터 손실이 있을 수도 있음.
    • 그렇다고 모든 파티션의 데이터를 가지고 있으면 물리적한계로 인한 문제가 발생할 수도 있음
  • 파티션의 리더는 카프카에서 알아서 결정
    • 각 파티션별로 알아서 로그 작성을 진행하고
    • 팔로워데이터들은 설정에 따라서 다운로드 하는 상황이됨.
  • 구성
파티션S1S2S3
P0LeaderFollowerFollower
P1FollowerLeaderFollower

4. SpringBoot에서 설정

1. Producer / application.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
  server:
    port: 0
  
  spring:
    kafka:
      bootstrap-servers:
        - {Kafka 서버 IP 주소}:9092
        - {Kafka 서버 IP 주소}:19092
        - {Kafka 서버 IP 주소}:29092
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: earliest

2. Consumer / application.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
 server:
  port: 0

 spring:
   kafka:
     bootstrap-servers:
       - {Kafka 서버 IP 주소}:9092
       - {Kafka 서버 IP 주소}:19092
       - {Kafka 서버 IP 주소}:29092
     consumer:
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       auto-offset-reset: earliest
This post is licensed under CC BY 4.0 by the author.