RabbitMQ 기본설정

RabbitMQ는 AMQP(Advanced Message Queuing Protocol)를 구현한 오픈소스 메시지 브로커. AMQP는 MQ를 오픈 소스에 기반한 표준 프로토콜. 프로토콜만 맞다면 다른 AMQP를 사용한 애플리케이션끼리 통신이 가능하고 플러그인을 통해서 SMTP, STOMP 프로토콜과의 확장이 가능.

pom.xml spring-boot-starter-amqp 추가 한다.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.46</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.properties 설정

spring.rabbitmq.username=demo
spring.rabbitmq.password=demo
#혹은 기타 ip
spring.rabbitmq.host=127.0.0.1  
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 수동ACK , 자동 ACK모드 활성화 하지 않는다. 목적은 에러 발생후 message 유실되는걸 방지. 기본 값 : none
spring.rabbitmq.listener.simple.acknowledge-mode=manual
package com.demo.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ설정
 *
 * @author Blake
 * @since 2022/12/27 0011
 */
@Configuration
public class RabbitConfig {

    public static final String DEFAULT_BOOK_QUEUE = "dev.book.register.default.queue";
    public static final String MANUAL_BOOK_QUEUE = "dev.book.register.manual.queue";

    @Bean
    public Queue defaultBookQueue() {
        return new Queue(DEFAULT_BOOK_QUEUE, true);
    }

    @Bean
    public Queue manualBookQueue() {
        return new Queue(MANUAL_BOOK_QUEUE, true);
    }
}

Book 객체 생성

public class Book implements java.io.Serializable {

    private static final long serialVersionUID = -2164058270260403154L;

    private String id;
    private String name;
	// get set ...생략 
}

Controller 작성

package com.demo.controller;

import com.demo.config.RabbitConfig;
import com.demo.entity.Book;
import com.demo.handler.BookHandler;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Blake
 * @since 2022/12/27 0002
 */
@RestController
@RequestMapping(value = "/books")
public class BookController {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public BookController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerAutoAck}
     * this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerManualAck}
     */
    @GetMapping
    public void defaultMessage() {
        Book book = new Book();
        book.setId("1");
        book.setName(" 하이 ! Spring Boot");
        this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book);
        this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book);
    }
}

일반적으로 spring-boot-data-amqp 는 자동 ACK(nowledgement) 이다. 즉 소비자(Consumer)는 MQ 는 message 소비완료되고 자동으로 ACK 한다. 하지만 여기서 약간의 문제가 있는데 바로 이렇게 하는경우 "에러 발생"시 message는 유실되지 않겠지만 무한으로 루프를 돌기때문에 Disk 공간을 모소할수 있다. 소비회수를 지정할수 있지만 이 또한 완벽한 해결방법은 아니라고 한다. 현재 가장 이상적인 방법은 수동 ACK 설정으로 하고 "에러발생" 시 message를 다른 message대기 열에 넣어 보상처리한다.

package com.demo.handler;

import com.demo.config.RabbitConfig;
import com.demo.entity.Book;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * BOOK_QUEUE 소비자
 *
 * @author Blake
 * @since 2022/12/27 0011
 */
@Component
public class BookHandler {

    private static final Logger log = LoggerFactory.getLogger(BookHandler.class);

    @RabbitListener(queues = {RabbitConfig.DEFAULT_BOOK_QUEUE})
    public void listenerAutoAck(Book book, Message message, Channel channel) {
        
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("[listenerAutoAck 모니터링하고 있는 message] - [{}]", book.toString());
            
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            try {
                
                channel.basicRecover();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    @RabbitListener(queues = {RabbitConfig.MANUAL_BOOK_QUEUE})
    public void listenerManualAck(Book book, Message message, Channel channel) {
        log.info("[listenerManualAck 모니터링하고 있는 message] - [{}]", book.toString());
        try {
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            
        }
    }
}

main class

package com.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Blake
 */
@SpringBootApplication
public class RabbitMqDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqDemoApplication.class, args);
    }
}

application 구동하면

2022-12-27 23:04:26.708  INFO 23752 --- [cTaskExecutor-1] com.demo.handler.BookHandler           : [listenerAutoAck 모니터링하고 있는 message] - [com.battcn.entity.Book@77d8be18]
2022-12-27 23:04:26.709  INFO 23752 --- [cTaskExecutor-1] com.demo.handler.BookHandler           : [listenerAutoAck 모니터링하고 있는 message] - [com.battcn.entity.Book@8bb452]

Last updated