세차새차 비즈콜 서비스 개발 (1) -RabbitMQ 적용
Summary
개요
세차새차 서비스의 비즈콜 서비스 개발을 담당하게 됐다. 비즈콜은 크게 WebSocket과 RabbitMq로 기능이 나뉘어지는데, 이번 포스팅에서는 그 중 RabbitMq에 대해 설명하려한다.
비즈콜
세차새차는 비즈콜을 도입하여 전보다 더 간편한 예약을 시스템을 제공하려 한다. 비즈콜을 사용함으로써, 세차장은 관리인의 개인 번호가 노출됨을 막을 수 있으며 고객의 전화와 동시에 세차장의 패드에 예약 정보 입력 팝업을 띄워 세차장 예약 관리에 편의성을 더해줄 수 있다.
비즈콜 도입의 이점
- 개인 번호 비공개
- 예약 전화와 동시에 팝업을 띄어 예약 정보 입력에 편의성을 더함.
유무선 전화 통화에 대한 로그 데이터
비즈콜에서 제공하는 CDR 파라미터에는 아래와 같은 데이터가 있다.
seq | CDR을 구분짓는 번호 |
sdt | Start Date: 호 시작 시각, YYYYMMDDHH24MISS |
edt | End Date: 호 종료 시각, YYYYMMDDHH24MISS |
fromn | From Number: 발신번호 |
ton | To Number: 착신번호 |
등등.. |
위 기능의 서버 부분을 조금 더 자세히 살펴보자.
여러 대의 서버 사이에 트래픽을 분산시키는 기술
고객이 비즈콜에 전화를 하면 고객 전화 정보를 담은 CDR 데이터가 서버에 저장된다. 이때, Application load balancer를 통해 고객 정보는 여러대의 인스턴스 중 가장 트래픽이 적은 인스턴스에 저장되게 된다.
즉, 우리는
RabbitMQ의 필요성
인스턴스와 클라이언트(매장의 패드)은 서로 webSocket을 통해 연결돼있다. 고객이 비즈콜을 통해 전화를 하면 그에 따른 CDR 정보가 인스턴스에 저장되고, 이 CDR 정보가 해당 인스턴스와 연결돼있는 매장과 동일하다면 webSocket 연결을 통해 실시간으로 클라이언트에 팝업을 띄울 수 있다.
그러나 문제는 비즈콜을 통한 고객 정보가 어떤 인스턴스에 저장될지 알 수 없다는 점이다. 고객이 매장 A에 전화했다는 CDR 정보가 인스턴스 1에 담기더라도 인스턴스 1이 매장A와 webSocket 연결돼있지 않다면 매장 A로 팝업 요청을 보낼 수 없다.
따라서 우리는 새로 저장된 고객의 CDR 정보를 모든 인스턴스에 공유해줄 필요가 있다.
RabbitMQ
RabbitMQ: 송신자와 수신자 간에 메시지를 전달하는 중간 매개체로 작동하는 메세지 브로커 소프트웨어. AMQP을 지원한다.
서버에 CDR 데이터가 새로 저장됐을 때, 이를 RabbitMQ를 통해 전체 인스턴스로 메세징하기로 한다.
AMQP
분산 시스템에서 메시지 전달, 큐 관리, 라우팅 등을 위한 기능을 정의하는 메시징 시스템의 표준 통신 프로토콜
메세징은 메세지를 보내는 주체인 Producer가 메세지를 받는 대상인 Consumer까지 메세지를 전달하는 과정이다.
이 과정에서 Producer가 전달한 메세지는 Exchanger가 가진 타입에 따라 특정 Queue에 저장되게 된다.
Queue는 하나의 Consumer만을 가질 수도 있고 여러개의 Consumer를 가질 수도 있으며, Consumer가 Consume하는 것으로 할당된 Queue에 저장된 메세지를 빼내 쓸 수 있다.
Direct | 라우팅 키가 정확히 일치하는 Queue에 메시지 전송 |
Topic | 라우팅 키 패턴이 일치하는 Queue에 메시지 전송 |
Headers | [key:value]로 이루어진 header값을 기준으로 일치하는 Queue에 메시지 전송 |
Fanout | 해당 Exchange에 등록된 모든 Queue에 메시지 전송 |
Exchanger는 위와 같은 4가지 타입을 가질 수 있다.
목표로 하는 Consumer가 어느 queue와 연결돼있는지 알 수 없으므로 세차새차 서비스는 모든 큐에 메세지를 저장해야한다. 따라서 하나의 Exchanger를 Fanout 타입으로 설정한 후 모든 Queue를 해당 Exchanger에 바인딩 시켰다.
개발
RabbitMQ 연결
RabbitMQ는 다운받아서 로컬에서도 작동시킬 수 있지만 CloudAMQP라는 사이트에서 회원가입만 진행하고, 웹상에서 활용도 가능하다고 한다.
세차새차는 AWS를 사용하고 있기 때문에 AWS에서 제공하는 AmazonMQ를 통해 RabbitMQ를 사용하기로 했다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
rabbitmq:
host: {broker ID}.mq.ap-northeast-2.amazonaws.com
port: 5671
username: {username}
password: {password}
virtual-host: /
ssl:
enabled: true
rabbitmq:
queue:
name: bizcall-cdr-inbound-call-dev
exchange:
name: bizcall-exchange-dev
Listener port
5671
- Used for connections made via the secure AMQP URL. For example, given a broker with broker IDb-c8352341-ec91-4a78-ad9c-a43f23d325bb
, deployed in theus-west-2
region, the following is the broker’s fullamqp
URL:b-c8352341-ec91-4a78-ad9c-a43f23d325bb.mq.us-west-2.amazonaws.com:5671
보안 AMQP 연결을 위해 5671번 포트를 사용하자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@RequiredArgsConstructor
@Configuration
public class RabbitMqConfiguration {
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.exchange.name}")
private String exchangeName;
@Bean
public Queue queue() {
return new Queue(queueName + "-" + UUID.randomUUID(), false, true, true);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(exchangeName);
}
@Bean
public Binding binding(Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Configuration 코드를 통해 Queue와 Exchanger을 생성하고 바인딩 해주었다. 코드로 설정을 하고 실행시키는 것만으로도 자동으로 queue와 exchanger가 생성되고 바인딩 된다.
- Queue
- 인스턴스별로 랜덤하게 큐 이름을 지어주기 위해서 UUID 값을 지정해주었다. 그렇지 않으면 fanout이 되지 않는다고 리뷰가 달렸으나 직접 확인해보진 않았다(ㅎㅎ..)
- Binding
- Exchanger를 Fanout 속성으로 사용하기 때문에 바인딩 코드에서 with를 통한 라우팅 키 설정은 제외했다.
https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp#bindings - Exchanger를 Fanout 속성으로 사용하기 때문에 바인딩 코드에서 with를 통한 라우팅 키 설정은 제외했다.
- jackson2JsonMessageConverter
- RabbitMqTemplate에서 사용하는 MessageConverter를
jackson2JsonMessageConverter
로 교환해주었다. - 기본적으로는
SimpleMessageConverter
을 사용하도록 설정돼있나SimpleMessageConverter
는 String input을 받기 때문에 전송할 메세지를 String으로 변환해주어야 하는 번거로움이 있다.
그러나jackson2JsonMessageConverter
는 Object 타입의 input을 받기 때문에 따로 변환할 필요가 없고, output값을 Json message로 뽑아주기 때문에 사용에 더 용이하다.
- RabbitMqTemplate에서 사용하는 MessageConverter를
에러 발생
inequivalent arg ‘type’ for exchange ‘bizcall-exchange’ in vhost ‘/’: received ‘fanout’ but current is ‘direct’, class-id=40, method-id=10)
설정 코드를 작성하고 큐와 exchanger 생성을 위해 프로젝트를 실행시키니 위와 같은 에러가 발생했다.
이제보니 기존에 존재하던 ‘bizcall-exchange’ 가 direct로 생성돼있었다. 삭제해주자.
삭제후 프로젝트를 다시 실행하니 fanout으로 잘 생성 되었다.
큐도 exchange에 정상적으로 binding된 것을 확인할 수 있다.
코드 작성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
@RequiredArgsConstructor
@Service
public class CallRabbitMqService {
private final RabbitTemplate rabbitTemplate;
private final CallWebSocketService callWebSocketService;
private final StoreRepository storeRepository;
@Value("${rabbitmq.exchange.name}")
private String exchangeName;
public void sendMessage(RabbitMqMessageDto messageDto) {
try {
log.info("RabbitMqMessage SEND: {} to exchanger {}", messageDto.toString(), exchangeName);
rabbitTemplate.convertAndSend(exchangeName, "", messageDto);
} catch (Exception exception) {
log.error(exception.toString());
throw new ApplicationException(ApplicationError.RABBITMQ_CONNECTION_ERROR);
}
}
@RabbitListener(queues = "#{queue.name}")
public void receiveMessage(RabbitMqMessageDto messageDto) {
log.info("RabbitMqMessage RECEIVED: {}", messageDto.toString());
try {
//웹소켓 설정 코드 작성
callWebSocketService.sendMessage(topicPath, messageDto);
} catch (Exception exception) {
log.error("RabbitMqMessage Handling Error");
}
}
}
postman으로 직접 api를 호출해본 결과 정상적으로 queue에 전달되었음을 확인했다.
(위 스크린샷을 찍었을 때는 listener.simple.acknowledge-mode
가 manual
로 설정되어 있어 에러 발생 여부와 무관하게 Unacked로 처리되었다.)
글이 길어져서 TTL, dlx 설정은 다음 게시글로 이어서 작성해야겠다~
배운 점: 만들고 싶은 개인 프로젝트에 유사 메세지 시스템이 포함될거 같은데(언제 만들어 이거ㅠ) RabbitMQ을 통해 선행학습의 기회를 얻었을지도….