# RabbitMQ 지연 큐(delayed queue)

delayed queue 큐는 말그대로 메시지 보낸 바로 즉시 받는게 아니라 나중에 받는다는 의미.

지연 큐 사용적합한 상황 아래와 같은 경우일것 같다.

* 주문기능 : 일정한 시간 지나도 최종 결제를 하지 않으면 주문 자동취소 되게 하려는 경우.
* 메시지 기능 : 주문완료후 60초 이내 고객에 메시지를 전송하도록 하고싶은 경우.
* 실패에 대한 재시도 : 업무시도 실패 후 일정한 시간후 재시도 하게끔 하는 경우.

이외 에도 많은 곳에서 지연 큐를 사용할수 있을것으로 추정된다.

RabbitMQ 자체는 지연 기능이 없지만 Time-To-Live Extensions[ ](broken://pages/wsIhTqVqTuPXIe09ZQGY)과 Dead Letter Exchange 의 특징을 이용하여 지연큐를 기능을 구현해 낼수 있다.&#x20;

pom.xml 에 spring-boot-starter-amqp dependency 추가

```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.46</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies
```

application.properties 에 rabbitmq 설정을 추가 한다.

```properties
spring.rabbitmq.username=demo
spring.rabbitmq.password=demo
#혹은 기타 ip
spring.rabbitmq.host=127.0.0.1  
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 수동ACK , 자동 ACK모드 활성화 하지 않는다. 목적은 에러 발생후 message 유실되는걸 방지. 기본 값 : none
spring.rabbitmq.listener.simple.acknowledge-mode=manual
```

```java
package com.demo.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ설정
 *
 * @author Blake
 * @since 2022/12/28 0011
 */
@Configuration
public class RabbitConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class);

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }

    /**
     * 지연큐 TTL 이름
     */
    private static final String REGISTER_DELAY_QUEUE = "dev.book.register.delay.queue";
    /**
     * DLX，dead letter exchange 전송
     * TODO 여기 exchange 매우중요한 부분 , 메시지가 여기로 보내진다.
     */
    public static final String REGISTER_DELAY_EXCHANGE = "dev.book.register.delay.exchange";
    /**
     * routing key 이름
     * TODO 여기 routingKey 매우중요한 부분, 메시는  routingKey 보내진다.
     */
    public static final String DELAY_ROUTING_KEY = "";


    public static final String REGISTER_QUEUE_NAME = "dev.book.register.queue";
    public static final String REGISTER_EXCHANGE_NAME = "dev.book.register.exchange";
    public static final String ROUTING_KEY = "all";

    /**
     * 지연큐에 대한 설정
     * <p>
     * 1、params.put("x-message-ttl", 5 * 1000);
     * TODO 방식1: 직접 Queue  지연시간 설정, 큐에 expaired된 시간을 설정하면? 별로 유연한 설정 방법은 아닌듯.
     * 2、rabbitTemplate.convertAndSend(book, message -> {
     * message.getMessageProperties().setExpiration(2 * 1000 + "");
     * return message;
     * });
     * TODO 방식2: 매번 메시지 전송시 동적으로 지연시간을 설정한다. 조금더 유연해 보인다.
     **/
    @Bean
    public Queue delayProcessQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange : DLX 이름 정의
        params.put("x-dead-letter-exchange", REGISTER_EXCHANGE_NAME);
        // x-dead-letter-routing-key : routing-key 이름 정의
        params.put("x-dead-letter-routing-key", ROUTING_KEY);
        return new Queue(REGISTER_DELAY_QUEUE, true, false, false, params);
    }

    /**
     * 큐를 exchange 에 바인딩시켜야 한다. 해당 메시지와 라우터와 매칭되어야 한다.
     * TODO TopicExchange 처럼 여러개를 매칭할수 없다.
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(REGISTER_DELAY_EXCHANGE);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY);
    }


    @Bean
    public Queue registerBookQueue() {
        return new Queue(REGISTER_QUEUE_NAME, true);
    }

  
    @Bean
    public TopicExchange registerBookTopicExchange() {
        return new TopicExchange(REGISTER_EXCHANGE_NAME);
    }

    @Bean
    public Binding registerBookBinding() {
       
        return BindingBuilder.bind(registerBookQueue()).to(registerBookTopicExchange()).with(ROUTING_KEY);
    }

}
```

Book (demo) 클래스를 만든다.

```java
public class Book implements java.io.Serializable {

    private static final long serialVersionUID = -2164058270260403154L;

    private String id;
    private String name;
	// get set ...생략
}
```

controller 작성

```java
package com.demo.controller;

import com.demo.config.RabbitConfig;
import com.demo.entity.Book;
import com.demo.handler.BookHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;

/**
 * @author Blake
 * @since 2022/12/28 0002
 */
@RestController
@RequestMapping(value = "/books")
public class BookController {

    private static final Logger log = LoggerFactory.getLogger(BookController.class);

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public BookController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * this.rabbitTemplate.convertAndSend(RabbitConfig.REGISTER_DELAY_EXCHANGE, RabbitConfig.DELAY_ROUTING_KEY, book); 对应 {@link BookHandler#listenerDelayQueue}
     */
    @GetMapping
    public void defaultMessage() {
        Book book = new Book();
        book.setId("1");
        book.setName("하이!Spring Boot");
        // 큐추가
        this.rabbitTemplate.convertAndSend(RabbitConfig.REGISTER_DELAY_EXCHANGE, RabbitConfig.DELAY_ROUTING_KEY, book, message -> {
            // TODO 해도되고 안해도 되는 부분, 직접 처리 해야 됨
            message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Book.class.getName());
            // TODO 만일  params.put("x-message-ttl", 5 * 1000); 설정하였다면 그러면 이 라인도 생략 가능.
            message.getMessageProperties().setExpiration(5 * 1000 + "");
            return message;
        });
        log.info("[전송시간] - [{}]", LocalDateTime.now());
    }
}
```

### 메시지 소비자(Consumer)

```java
package com.demo.handler;

import com.demo.config.RabbitConfig;
import com.demo.entity.Book;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;

/**
 * BOOK_QUEUE 소비자
 *
 * @author Blake
 * @since 2022/12/28 0011
 */
@Component
public class BookHandler {

    private static final Logger log = LoggerFactory.getLogger(BookHandler.class);

    @RabbitListener(queues = {RabbitConfig.REGISTER_QUEUE_NAME})
    public void listenerDelayQueue(Book book, Message message, Channel channel) {
        log.info("[listenerManualAck 모니터링하고 있는 message] - [소비시간] - [{}] - [{}]", LocalDateTime.now(), book.toString());
        try {
            // TODO MQ에 메시지가 성공적으로 소비되었으니 ACK 해도 된다고 알려줌.
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            // TODO 만일 실패했을때 다른 큐에 넣어서 처리하도록 한다.
        }
    }
}
```

메인 클래스&#x20;

```java
package com.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Blake
 */
@SpringBootApplication
public class RabbitMqDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqDemoApplication.class, args);
    }
}
```

RabbitMqDemoApplication 를 구동하면 아래와 같은 메시지가 출력 될것이다.

```
2022-12-28 18:06:36.248  INFO 29048 --- [nio-8080-exec-1] com.demo.controller.BookController     : [전송시간] - [2018-05-23T19:56:36.248]
2022-12-28 18:06:41.256  INFO 29048 --- [cTaskExecutor-1] com.demo.handler.BookHandler
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://blakes-organization.gitbook.io/rainsister/springboot-2.x/undefined/rabbitmq-delayed-queue.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
