Post

04 kafka Spring

04 kafka Spring

1. Spring으로 Producer 생성

1. Memo

  • kafka에서 Producer 역할
  • 카프카에 특정 메시지를 생성하는 역할을 함.
  • 메일을 보낸다는 가정하에 카프카에 순서대로 메일을 계속보냄

2. 의존성 & 프로퍼티스

  • 의존성
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    ~~~
    dependencies {
         implementation 'org.springframework.boot:spring-boot-starter-kafka' // kafka 의존성 추가  
         implementation 'org.springframework.boot:spring-boot-starter-webmvc'
         developmentOnly 'org.springframework.boot:spring-boot-devtools'
         testImplementation 'org.springframework.boot:spring-boot-starter-kafka-test'
         testImplementation 'org.springframework.boot:spring-boot-starter-webmvc-test'
         testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
      }
    
  • 프로퍼티스 값
    1
    2
    3
    4
    5
    6
    
    # Kafka 서버 주소 (EC2에 카프카를 설치했기 때문에 EC2 주소를 입력해야 한다.)
    spring.kafka.bootstrap-servers=IP:port
    # 메시지의 key 직렬화 방식 : 자바 객체를 문자열(String)로 변환해서 Kafka에 전송
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    # 메시지의 value 직렬화 방식 : 자바 객체를 문자열(String)로 변환해서 Kafka에 전송
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
  • yaml형태
    1
    2
    3
    4
    5
    6
    
      spring:
     kafka:
       bootstrap-servers: 15.164.96.71:9092
       producer:       
         key-serializer: org.apache.kafka.common.serialization.StringSerializer
         value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

3. Spring Class

1. Controller 관련

  • Controller
    1
    2
    3
    4
    5
    6
    7
    
       //특이사항없이 바로 진행함.
       @PostMapping
       public ResponseEntity<String> sendEmail(@RequestBody SendEmailRequestDto sendEmailRequestDto
       ) {
           emailService.sendEmail(sendEmailRequestDto);
           return ResponseEntity.ok("이메일 발송 요청 완료");
       }
    
  • sendEmailRequestDTO (넘어온 메일 내용 / getter+setter)
    1
    2
    3
    4
    
     private String from;         // 발신자 이메일
     private String to;           // 수신자 이메일
     private String subject;      // 이메일 제목
     private String body;         // 이메일 본문
    

2. Service

  • Serivce
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    
     // <메시지의 Key 타입, 메시지의 Value 타입>
     private final KafkaTemplate<String, String> kafkaTemplate;
    
     public EmailService(KafkaTemplate<String, String> kafkaTemplate) {
         this.kafkaTemplate = kafkaTemplate;
     }
    
     public void sendEmail(SendEmailRequestDto request) {
         EmailSendMessage emailSendMessage = new EmailSendMessage(
                 request.getFrom(),
                 request.getTo(),
                 request.getSubject(),
                 request.getBody()
         );
    
         // 생성자에서 메시지의 Value타입을 String으로 설정하여 json(DTO)을 Spring으로 변경 처리
         // 토픽 이름: "email.send"
         // 메시지 value: toJsonString(emailSendMessage) 결과값
         this.kafkaTemplate.send("email.send", toJsonString(emailSendMessage));
     }
    
     // 객체를 Json 형태의 String으로 만들어주는 메서드
     private String toJsonString(Object object) {
         ObjectMapper objectMapper = new ObjectMapper();
         try {
             String message = objectMapper.writeValueAsString(object);
             return message;
         } catch (JacksonException e) {
             throw new RuntimeException("Json 직렬화 실패");
         }
     }   
    
  • EmailSendMessage DTO ( 카프카에 보낼 메시지 / getter+setter)
    1
    2
    3
    4
    
     private String from;         // 발신자 이메일
     private String to;           // 수신자 이메일
     private String subject;      // 이메일 제목
     private String body;         // 이메일 본문
    

2. Spring으로 Consumer(기본형) 생성

1. Memo

  • 기존에 Producer로 만들었던 패턴 그대로 Consumer에서 사용이 필요.
  • 학습과정에서 Producer과 Consumer이 동일한 컴퓨터라면 서버 분리도 필요!

2. 의존성 & 프로퍼티스

  • 의존성은 동일
  • 프로퍼티스
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    server:
      port: 0 # 사용 가능한 랜덤 포트를 찾아서 서버를 실행 (Producer 서버와의 포트 충돌을 방지)
    
    spring:
      kafka:
        # Kafka 서버 주소 
        bootstrap-servers: 15.164.96.71:9092
        consumer:
         # 메시지의 key 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
         key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
         # 메시지의 value 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
         value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            
         # 컨슈머 그룹이 미리 안 만들어져있는 경우에, 컨슈머 그룹을 직접 생성해서 메시지를 처음부터 읽음.
         # 만약 컨슈머 그룹이 이미 만들어져있다면, 해당 컨슈머 그룹이 읽었던 메시지부터 읽음.
            
         # 이 옵션을 주지 않으면 컨슈머 그룹을 직접 생성해서 메시지를 읽을 때, 
         # 기존에 쌓여있던 메시지를 읽지 않고 컨슈머 그룹이 생성된 이후에 들어온 메시지부터 읽음
         # 그럼 컨슈머 그룹이 생성되기 전에 쌓여있던 메시지들이 처리되지 않고 누락됨
         auto-offset-reset: earliest
    

3. Spring Class

1. Controller

1
2
3
4
5
6
7
8
9
10
  @KafkaListener(topics = "email.send", groupId = "email-send-group") // 컨슈머 그룹 이름
  public void consume(String message) {
     System.out.println("Kafka로부터 받아온 메시지: " + message);
    
     EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);

     // 메일 발송 로직

     System.out.println("이메일 발송 완료");
  }

2. EmailSendMessage DTO (카프카로부터 받을 객체)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    private String from;         // 발신자 이메일
    private String to;           // 수신자 이메일
    private String subject;      // 이메일 제목
    private String body;         // 이메일 본문
  
    // 역직렬화(String 형태의 카프카 메시지 -> Java 객체)시 필요함
    public EmailSendMessage() { }
    
    // Json 값을 EmailSendMessage로 역직렬화하는 메서드
    // Json형태를 String으로 변환해서 카프카에 줬으므로,
    // Consumer에서도 다시 JSon으로 변경해서 사용이 필요함.
    public static EmailSendMessage fromJson(String json) {
      try {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(json, EmailSendMessage.class);
      } catch (JsonProcessingException e) {
        throw new RuntimeException("JSON 파싱 실패");
      }
  }

3. Spring으로 Consumer 개선 - 재시도

1. Memo

  • Producer + Consumer의 한계는 Producer에서 응답에 대한 답변이 먼저 가기 때문에,
  • Consumer에서 실제로 그 작업이 정상처리인지 요청자에게 확인이 안됨.

2. 기본 설정

  • 에러로그
    1
    
    Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} …
    
  • interval : 재시도를 하는 시간 간격 (ms) // interval=0일 경우 실패하자마자 즉시 재시도
  • maxAttempts : 최대 재시도 횟수
  • currentAttempts : 지금까지 시도한 횟수 (최초 시도 횟수 + 재시도 횟수)

3. 값 변경

1
2
3
4
5
6
7
8
9
10
11
12
13
  @KafkaListener(
      topics = "email.send",
      groupId = "email-send-group" // 컨슈머 그룹 이름
  )
  // 추가된부분 start
  @RetryableTopic(
      // 총 시도 횟수 (최초 시도 1회 + 재시도 4회)
      attempts = "5", 
      // 재시도 간격 (1000ms -> 2000ms -> 4000ms -> 8000ms 순으로 재시도 시간이 증가한다.)
      backoff = @Backoff(delay = 1000, multiplier = 2) 
  )
  // 추가된부분 end
  public void consume(String message) {

3. Spring으로 Consumer 개선 - 재시도 후 실패

1. memo

  • 지속적인 시도에도 불구하고 실패한경우 DLT를 이용해서 진행함.

2. Dead Letter Topic

  • 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 토픽
  • Kafka에서는 재시도까지 실패한 메시지를 다른 토픽에 따로 저장해서 유실을 방지하고 후속 조치를 가능하게함.
  • 이점
    • 실패한 메시지를 DLT 토픽에 저장해놓기 때문에, 실패한 메시지가 유실되는 걸 방지할 수 있음.
    • DLT 토픽에 실패한 메시지가 저장되어 있기 때문에, 사후에 실패 원인을 분석할 수 있음.
    • DLT 토픽에 실패한 메시지가 저장되어 있기 때문에, 처리되지 못한 메시지를 수동으로 처리함.

3. 기본설정

  • @RetryableTopic을 사용하면 자동으로 DLT 토픽을 생성하고 메시지를 전송
  • Default DLT 토픽 이름은 {기존 토픽명}-dlt 형태

4. DLT 설정 변경작업

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    @KafkaListener(
      topics = "email.send",
      groupId = "email-send-group" // 컨슈머 그룹 이름
    )
    @RetryableTopic( // 이것을 사용하면 자동으로 DLT를 생성함.
      // 총 시도 횟수 (최초 시도 1회 + 재시도 4회)
      attempts = "5",

      // 재시도 간격 (1000ms -> 2000ms -> 4000ms -> 8000ms 순으로 재시도 시간이 증가한다.)
      backoff = @Backoff(delay = 1000, multiplier = 2),

      // DLT 토픽 이름에 붙일 접미사 (수동)
      dltTopicSuffix = ".dlt"
    )
    public void consume(String message) {
  • 실패할경우
    1
    
    Received message in dlt listener : email.send.dlt-8@8 패턴
    

5. DLT 처리하는 Consumer

1
2
3
4
5
6
7
8
9
   @KafkaListener(
            topics = "email.send.dlt", // dlt 토픽을 읽는
            groupId = "email-send-dlt-group" // 그룹 설정
    )
    public void consume(String message) {

        // 로그 전송
        // 알림등
    }
This post is licensed under CC BY 4.0 by the author.