Post

06 Consumer Pattern

06 Consumer Pattern

1. Note

  • 익숙한 듯 어려운듯 복잡한듯
    • 실제로 구현해서 써먹으려면 고민이 더 많이 필요할 듯.
    • 역할과 책임을 구현한다는게 무슨말인지는 알 것 같음.
    • 공통 Queue를 관리/기록하는 메소드를 만들고,
    • 메인 도메인에서 Queue 서비스를 호출해서 비즈니스로직을 넘겨준느 형태

2. 흐름

1. TaskQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TaskQueue extends BaseEntity {

  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  Long id;

  @Setter
  @Column
  Long eventId;

  @Column(nullable = false)
  @Enumerated(EnumType.STRING)
  TaskType taskType; // 작업 타입 Order(주문), Alm(알람) 등 배치 타입

  @Setter
  @Column(nullable = false)
  @Enumerated(EnumType.STRING)
  TaskStatus status; // 상태 표기 PENDING, PROCESSING, COMPLETED

  @Builder
  public TaskQueue(
      TaskType taskType,
      TaskStatus status
  ) {
    this.taskType = taskType;
    this.status = status;
  }

2. Order Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  private final TaskQueueService taskQueueService;
  private final OrderInfoService orderInfoService;
  private final OrderProcessService orderProcessService;
  
  @Transactional(propagation = Propagation.REQUIRED)
  public void orderRequest(OrderRequest request) {
    // 유저정보를 찾고
    User user = orderInfoService.getUser(request.getUserId());
  
    // TaskQueue 엔티티를 생성 (작업 타입 설정, PENDING<작업중>)
    // 역할이나 의미 X, 매핑전상태
    // 2-3 TaskQueueService - 1 
    TaskQueue taskQueue = taskQueueService.requestQueue(TaskType.ORDER);
  
    // 작업 시작
    // 신규로 생성한 Task와 reqeust, user를 orderProcess로 넘김.
    // 2-4 OrderProcessService
    orderProcessService.orderProcess(taskQueue.getId(), request, user);
  }

3. TaskQueueService - 1

1
2
3
4
5
6
7
8
9
10
11
12
  // requestQueue 생성 작업
  // TaskQueue를 생성 (작업 타입 설정, PENDING<작업중>)
  // 여기에서 EventID는 비어있음.
  // Task는 부여하지만 어떤 Task랑 매핑할지는 미정상태
  public TaskQueue requestQueue(TaskType taskType) {
    TaskQueue taskQueue = TaskQueue.builder()
        .taskType(taskType)
        .status(TaskStatus.PENDING)
        .build();
  
    return taskQueueRepository.save(taskQueue);
  }

4. OrderProcessService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
  private final TaskQueueService taskQueueService;
  private final OrderRepository orderRepository;
  private final ProductRepository productRepository;
  private final OrderProductRepository orderProductRepository;
    
    // 비동기로 실행하며, 해당 작업은 메소드 호출마다 신규 트랜잭션으로 만듬
    // order 주문이 여러개가 오면, 각 요청은 독립 트랜잭션 내에서 작업함.
    // 해당 작업은 order 1건 단위로 들어오는 것만 처리가능 함.
    @Async
    @Transactional(propagation = Propagation.REQUIRES_NEW) 
    public void orderProcess(Long taskQueueId, OrderRequest request, User user) {
  
      // 2-5 TaskQueueService - 2
      // TaskQueueService는 Task 관리 역할
      // orderProcess는 Task와 queue 매핑 및 작업 구현 
      // 해당 두번째 파라미터를 구현하면서 task ID(PK) + 요청값 +  유저정보 1개 맥락이됨. 
      taskQueueService.processQueueById(taskQueueId, (taskQueue) -> {
  
        // 1. 주문 엔티티 생성 + DB 저장
        // - user 기준으로 Order 생성
        // - 이 시점에 ID(PK) 생성됨 (보통 영속화 시점 또는 flush 시점)
        Order order = createAndSaveOrder(user);
  
        // 2. TaskQueue와 Order 연결
        // - 큐의 eventId를 주문 ID로 설정
        // - "이 큐가 어떤 주문을 처리했는지" 추적하기 위한 용도
        // - 일종의 비동기 처리 결과 매핑 키
        taskQueue.setEventId(order.getId());
  
        // 3. 주문 상품 생성 및 처리
        // - request에 담긴 상품 목록을 기반으로 OrderProduct 생성
        // - 내부에서:
        //   - 재고 차감
        //   - 가격 확정
        //   - Order와 연관관계 설정
        //   같은 로직이 포함될 가능성이 높음
        List<OrderProduct> orderProducts = createAndProcessOrderProducts(
            request.getProducts(),
            order);
  
        // 4. 총 주문 금액 계산
        // - OrderProduct 리스트를 기반으로 총 가격 합산
        // - BigDecimal 사용하는 이유:
        //   -> 금액 계산에서 부동소수점 오차 방지
        BigDecimal totalPrice = calculateTotalPrice(orderProducts);
  
  
        // 5. 주문에 총 금액 반영
        // - Order 상태를 최종적으로 완성
        // - 트랜잭션 종료 시 dirty checking으로 DB 반영됨
        order.setTotalPrice(totalPrice);
  
      });
    }

5. TaskQueueService - 2

  • Coumnser
    1
    2
    3
    4
    5
    
     // `java.uitl.function` 패키지에 있는 인터페이스
     // 파라미터를 인자로 메소드 호출하고 종결
     public interface Consumer<T> {
       void accept(T t);
     }
    
  • 소스
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    
    // processQueueById 작업
    // 기본 트랜잭션
    @Transactional
    public void processQueueById(Long taskQueueId, Consumer<TaskQueue> task) {
      
      //여기서 TaskQueue를 조회하여 유무 확인함.
      TaskQueue taskQueue = taskQueueRepository.findByIdForUpdate(taskQueueId)
          .orElseThrow(() -> new DomainException(DomainExceptionCode.NOT_FOUND_TASK));
      
      //prviate Method
      //작업상태 Udpate(Pending -> proceesssing)
      updateStatus(taskQueue, TaskStatus.PROCESSING);
      
      // processQueueById 메소드의 두번째 인자로 받은 메소드를 실행
      // accept의 구현체는 익명함수로 넘겨받음.
      // ProcessQueueById를 호출하는 쪽에서 만듬.  
      // 근데 그 메소드를 실행할때 파라미터는 processQueueById 메소드의 두번째 인자 taskQueue다.
      task.accept(taskQueue);
      
      // prviate Method
      // 작업상태 업데이트 (Processing -> Completed)
      updateStatus(taskQueue, TaskStatus.COMPLETED);
    }
      
    // 업데이트용 priavte 메소드
    private void updateStatus(TaskQueue taskQueue, TaskStatus taskStatus) {
      taskQueue.setStatus(taskStatus);
      taskQueueRepository.flush();
    }
    
This post is licensed under CC BY 4.0 by the author.