06 Consumer Pattern
06 Consumer Pattern
1. Note
- 익숙한 듯 어려운듯 복잡한듯
- 실제로 구현해서 써먹으려면 고민이 더 많이 필요할 듯.
- 역할과 책임을 구현한다는게 무슨말인지는 알 것 같음.
- 공통 Queue를 관리/기록하는 메소드를 만들고,
- 메인 도메인에서 Queue 서비스를 호출해서 비즈니스로직을 넘겨준느 형태
2. 흐름
1. TaskQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TaskQueue extends BaseEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
Long id;
@Setter
@Column
Long eventId;
@Column(nullable = false)
@Enumerated(EnumType.STRING)
TaskType taskType; // 작업 타입 Order(주문), Alm(알람) 등 배치 타입
@Setter
@Column(nullable = false)
@Enumerated(EnumType.STRING)
TaskStatus status; // 상태 표기 PENDING, PROCESSING, COMPLETED
@Builder
public TaskQueue(
TaskType taskType,
TaskStatus status
) {
this.taskType = taskType;
this.status = status;
}
2. Order Service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private final TaskQueueService taskQueueService;
private final OrderInfoService orderInfoService;
private final OrderProcessService orderProcessService;
@Transactional(propagation = Propagation.REQUIRED)
public void orderRequest(OrderRequest request) {
// 유저정보를 찾고
User user = orderInfoService.getUser(request.getUserId());
// TaskQueue 엔티티를 생성 (작업 타입 설정, PENDING<작업중>)
// 역할이나 의미 X, 매핑전상태
// 2-3 TaskQueueService - 1
TaskQueue taskQueue = taskQueueService.requestQueue(TaskType.ORDER);
// 작업 시작
// 신규로 생성한 Task와 reqeust, user를 orderProcess로 넘김.
// 2-4 OrderProcessService
orderProcessService.orderProcess(taskQueue.getId(), request, user);
}
3. TaskQueueService - 1
1
2
3
4
5
6
7
8
9
10
11
12
// requestQueue 생성 작업
// TaskQueue를 생성 (작업 타입 설정, PENDING<작업중>)
// 여기에서 EventID는 비어있음.
// Task는 부여하지만 어떤 Task랑 매핑할지는 미정상태
public TaskQueue requestQueue(TaskType taskType) {
TaskQueue taskQueue = TaskQueue.builder()
.taskType(taskType)
.status(TaskStatus.PENDING)
.build();
return taskQueueRepository.save(taskQueue);
}
4. OrderProcessService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private final TaskQueueService taskQueueService;
private final OrderRepository orderRepository;
private final ProductRepository productRepository;
private final OrderProductRepository orderProductRepository;
// 비동기로 실행하며, 해당 작업은 메소드 호출마다 신규 트랜잭션으로 만듬
// order 주문이 여러개가 오면, 각 요청은 독립 트랜잭션 내에서 작업함.
// 해당 작업은 order 1건 단위로 들어오는 것만 처리가능 함.
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void orderProcess(Long taskQueueId, OrderRequest request, User user) {
// 2-5 TaskQueueService - 2
// TaskQueueService는 Task 관리 역할
// orderProcess는 Task와 queue 매핑 및 작업 구현
// 해당 두번째 파라미터를 구현하면서 task ID(PK) + 요청값 + 유저정보 1개 맥락이됨.
taskQueueService.processQueueById(taskQueueId, (taskQueue) -> {
// 1. 주문 엔티티 생성 + DB 저장
// - user 기준으로 Order 생성
// - 이 시점에 ID(PK) 생성됨 (보통 영속화 시점 또는 flush 시점)
Order order = createAndSaveOrder(user);
// 2. TaskQueue와 Order 연결
// - 큐의 eventId를 주문 ID로 설정
// - "이 큐가 어떤 주문을 처리했는지" 추적하기 위한 용도
// - 일종의 비동기 처리 결과 매핑 키
taskQueue.setEventId(order.getId());
// 3. 주문 상품 생성 및 처리
// - request에 담긴 상품 목록을 기반으로 OrderProduct 생성
// - 내부에서:
// - 재고 차감
// - 가격 확정
// - Order와 연관관계 설정
// 같은 로직이 포함될 가능성이 높음
List<OrderProduct> orderProducts = createAndProcessOrderProducts(
request.getProducts(),
order);
// 4. 총 주문 금액 계산
// - OrderProduct 리스트를 기반으로 총 가격 합산
// - BigDecimal 사용하는 이유:
// -> 금액 계산에서 부동소수점 오차 방지
BigDecimal totalPrice = calculateTotalPrice(orderProducts);
// 5. 주문에 총 금액 반영
// - Order 상태를 최종적으로 완성
// - 트랜잭션 종료 시 dirty checking으로 DB 반영됨
order.setTotalPrice(totalPrice);
});
}
5. TaskQueueService - 2
- Coumnser
1 2 3 4 5
// `java.uitl.function` 패키지에 있는 인터페이스 // 파라미터를 인자로 메소드 호출하고 종결 public interface Consumer<T> { void accept(T t); } - 소스
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
// processQueueById 작업 // 기본 트랜잭션 @Transactional public void processQueueById(Long taskQueueId, Consumer<TaskQueue> task) { //여기서 TaskQueue를 조회하여 유무 확인함. TaskQueue taskQueue = taskQueueRepository.findByIdForUpdate(taskQueueId) .orElseThrow(() -> new DomainException(DomainExceptionCode.NOT_FOUND_TASK)); //prviate Method //작업상태 Udpate(Pending -> proceesssing) updateStatus(taskQueue, TaskStatus.PROCESSING); // processQueueById 메소드의 두번째 인자로 받은 메소드를 실행 // accept의 구현체는 익명함수로 넘겨받음. // ProcessQueueById를 호출하는 쪽에서 만듬. // 근데 그 메소드를 실행할때 파라미터는 processQueueById 메소드의 두번째 인자 taskQueue다. task.accept(taskQueue); // prviate Method // 작업상태 업데이트 (Processing -> Completed) updateStatus(taskQueue, TaskStatus.COMPLETED); } // 업데이트용 priavte 메소드 private void updateStatus(TaskQueue taskQueue, TaskStatus taskStatus) { taskQueue.setStatus(taskStatus); taskQueueRepository.flush(); }
This post is licensed under CC BY 4.0 by the author.