Spring AMQPの使い方

Oct 3, 2016 ( Feb 11, 2022 更新 )

Spring BootのAMQPライブラリについて調べたのでまとめる。

参考資料はSpring AMQP の公式ドキュメント

Spring AMQPとは

Spring AMQPとは、RabbitMQのクライアントと、RabbitMQの設定などに必要な機能を汎用化したモジュール…という理解をしている。 AMQPクライントはSpring的にコンポーネントベースで設定でき、DIコンテナを利用してインスタンス化することができる。

Spring AMQPで提供する抽象化

以下の抽象化を提供している。

  • Message
    • AMQPのメッセージを表す。
    • body部とproperty部の読み取りなどの機能を提供する。
  • Exchange
  • Queue
  • Binding
    • exchangeとqueueのbindingを表す。
    • new Binding(someQueue, someDirectExchange, "foo.bar")
    • Bindingインスタンスそれ自体ではconnectionのためのdataを保持しているためactiveなcomponentではないとのこと。インスタンスを作成した時点ではbindingをしているわけではなさそう。
    • BindingインスタンスはAmqpAdminによってbindingをトリガーするために使われる。
    • AmqpTemplateは実際のmessagingのために使われるInterfaceなのでこちらも参照する。

connection と resource management

AMQP Template

AMQP Temaplateは、Spring AMQPで提供する抽象化の中心部になるもの。 messageのsendとreceiveの一般的な操作を担っている。

RabbitTemplateとしてInterfaceが用意されており、その実装としてAmqpTemplateがある。 また、SpringのMessagingとの互換性のためにRabbitMessagingTemplateがトップレベルとして用意されている。

また、AMQP Templateにはリトライの設定をRetry Templateによって行うことができる。

message confirmation

returned messageのackでは、templateのmandatory propertyをtrueにする必要がある。 (※returned messageは多分receiveしてきたmesasgeのことっぽい?) また、CachingConnectionFactoryのpublisherReturns propertyがtrueになっている必要もある。(詳細: 3. Reference) messageのreturn(多分queueからのreceiveはRabbitTemplate.ReturnCallbacksetReturnCallback(ReturnCallback callback)メソッドで設定することによって行う。 ReturnCallbackは下記のようなシグネチャになる。

void returnedMessage(Message message, int replyCode, String replyText,
          String exchange, String routingKey);

1つのRabbitTemplateにつき1つのcallbackとなる。 messageのconsumeに対して異なる操作をしたい場合はtempalteを複数持つ必要がある。

publisherのconfirmationについては、CachingConnectionFactorypublisherConfirms propretyがtrueになっている必要がある。 confirmationはsetConfirmCallback(ConfirmCallback callback)メソッドによって設定したcallbackで行う。 ConfirmationCallbackは以下の実装をする:

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationDataは、clientからoriginal dataを送ったときのobject。 ackはackを送りたいときはtrueで、nackにしたいときはfalse。 brokerがchannelをcloseしたときに、causeに理由が入る。

messageの送信

AMQPTemplateの以下のメソッドを使ってmessageを送信することができる。

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

以下のようにrouting keyとqueueを指定して送ることもできる。

amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
    new Message("12.34".getBytes(), someProperties));
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

sendでmessageだけを指定すると、routing key とqueueはデフォルトのものが使われる。 空文字("")のデフォルトキューに、すべてのメッセージが突っ込まれることを防ぐために明確にrouting keyとqueueを設定すべきだとのこと。

messageの受信

3. Referenceにmessageの受信についての説明がある。

messageの受信方法には2つがある:

  • poll for a single Message at a time with a polling method call
    • Messageをメソッド呼び出しによって取得する方法
  • register listeners that will receive Messages on-demand asynchronously
    • Messageを非同期で必要なときに取得するリスナーを登録する方法

Asynchronous Consumer

@RabbitListenerを使う方法

Annotation-driven Listener Endpointsにドキュメントがある。

annotationを使う場合は、以下のBean設定をする必要がある。

@Configuration
@EnableRabbit
public class AppConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}

@RabbitListenerを使うためには、@Configurationクラスに@EnableRabbit annotationをつける必要がある。 デフォルト設定ではlistenerのinfrastructureは、rabbitListenerContainerFactoryというbeanを探す。 このfactoryは、listener containersを作るためのbean。 上記のfactory設定では、core thread sizeは3, max thread sizeが10でlistenerが実行される。

ackの送り方は、rabbitListenerContainerFactoryの設定で行う。 3.1.15 Message Listener Container Configuration

下記のコードのように書くと、annotationをつけたメソッドをRabbit listener endpointとしてBean登録してくれる。

@Component
public class MyService {

    @RabbitListener(queues = "myQueue")
    public void processOrder(String data) {
        ...
    }

}

上記の例では、myQueueからメッセージが取得可能になったら、processOrderメソッドがpayloadのdataを引数にして呼ばれる。

このanottationを使ったmessage listenerのしくみはRabbitListenerContainerFactoryによって提供される。 上記の例では、myQueueはあらかじめ定義されている必要があり、さらにexchangeにbindされている必要もあった。 Spring AMQPのver1.5.0以降では、RabbitAdminが定義されていれば、queueは自動的に宣言・bindされる。

@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(String data) {
    ...
  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(String data) {
    ...
  }
}

processOrderメソッドの例では、必要であればmyQueueは、exhangeauto.exchと一緒に自動的に宣言される。 また、invoiceRoutingKeyというroutingKeyとともにexchangeにbindされる。 複数のQueueBinfingを定義することもできる。そうすることによって、listenerに複数のqueueをlistenさせることができる。

この設定方法は、direct, fanout, topic, headersのメッセージ方式にのみ適用できる。 詳細な設定は@Beanを使った設定方法である必要がある。

ignoreDeclarationExceptionsprocessOrderメソッドに設定されている。 これは、annotationと異なる設定のexchangeに対してbindingすることを許す設定となっている。(例えば、internal exchangeなど) デフォルト設定では、すでに存在しているexchangeに対するbindの場合、設定が存在しているexchangeと一致している必要がある。

threadを使う

connectionのthread数

ConnectionFactoryでthreadは設定できる。

Spring AMQP - Configuring the Underlying Client Connection Factory

以下のような ConfigurationProperty を設定していた場合、

@Component
@Data
@ConfigurationProperties(prefix = "rabbitmq")
public class RabbitMQConfigProperty {

    @NotNull
    private ConnectionFactory connectionFactory;
    @Data
    public static class ConnectionFactory {

        @NotNull
        private Integer corePoolSize;

        @NotNull
        private Integer maxPoolSize;
    }
}

以下のように @Configuration クラスを定義することができる。 下記の例では、RabbitMQへのconnectionに使うthreadのcorePoolSize, maxPoolSizeをproperty経由で指定している。

@Configuration
public class RabbitConfig {

    @Autowired
    private RabbitMQConfigProperty property;

    @Bean
    public ThreadPoolTaskExecutor rabbitConnectionExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(property.getConnectionFactory().getCorePoolSize());
        executor.setMaxPoolSize(property.getConnectionFactory().getMaxPoolSize());

        return executor;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(property.getHost());
        connectionFactory.setUsername(property.getUsername());
        connectionFactory.setPassword(property.getPassword());
        connectionFactory.setVirtualHost(property.getVirtualHost());
        connectionFactory.setExecutor(rabbitConnectionExecutor());

        return connectionFactory;
    }
}

TODO: 以下のasync listenerのthread数も追記する

Spring AMQP - Threading and Asynchronous Consumers

Asynchronous Consumer

3. Reference

Message Listener

非同期のメッセージ受信をするためには、message consuming callbackを生成するためのcontaner componentが必要。 非同期のmessage consuming callbackを定義するには、MessageListener interfaceを実装する。

public interface MessageListener {
    void onMessage(Message message);
}

特定のChannelインスタンスによるconsumeをしたい場合、ChannelAwareMessageListenerを使うことができる。

MessageListenerAdapter

アプリケーションのロジックとメッセージングAPIを分離しておきたい場合には、frameworkが提供するadapter実装を使うことができる。 これは Message-driven POJO のサポートと呼ばれている。 このアダプターを使えば、開発者に必要とされるのは、アダプターが呼び出すべきインスタンスの参照を与えることだけとなる。

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
    listener.setDefaultListenerMethod("myMethod");

adapterのサブクラスを作って、getListenerMethodName()の実装を提供し、メッセージによって異なるメソッドを動的に選択することも可能である。 このメソッドには2つのパラメータがある。originalMessage,originalMessageである。後者は、変換後の結果となる。 デフォルトでは、SimpleMessageConverterが設定されている。 SimpleMessageConverterの章に、converterの詳細を書いている。

Container

containerはlifecycle componentの1種である。containerは開始と終了のメソッドを提供する。 containerの設定は、AMQP QueueとMessageListenerのインスタンスの間の橋渡しである。 containerの設定には、ConnectionFactoryとQueueの名前か、Queueインスタンスが必要で、これはListenerがメッセージをconsumeする対象となる。

以下は基本的なSimpleMessageListenerContainerを使ったContainerの設定例である。

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueueName("some.queue");
    container.setMessageListener(exampleListener());
    return container;
}

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

@Bean
public MessageListener exampleListener() {
    return new MessageListener() {
        public void onMessage(Message message) {
            System.out.println("received: " + message);
        }
    };
}

要調査項目

@RabbitListenerで指定している、キュー等の設定はそのままContainerとしてBeanの設定に入れれば移行できそう。

annotation-basedの@RabbitListenerを使う方法で設定している、@Payloadを使ったメッセージの変換方法を、どう上記のcontainerで使えるか調べる必要がある。

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${rabbitmq.bindings.posted-message.queue.name}", durable = "true"),
        exchange = @Exchange(value = "${rabbitmq.bindings.posted-message.exchange.name}", durable = "true"),
        key = ""))
@Transactional
public void consumePostedMessage(@Payload MessageDto messageDto) throws RetryableException { }

Annotation-driven listenerについて

annotation listener(@RabbitListener)はmessage listener containerを、それぞれのannotationをつけられたメソッドごとに作成する。 containerの作成にはにはRabbitListenerContainerFactoryが使われる。

デフォルトでは1つのRabbitListenerContainerFactoryを利用するが、これをAnnotationごとにしたり雄一のfactoryとして使うこともできる。 デフォルトのBean名はrabbitListenerContainerFactory。 これは@RabbitListenercontainerFactoryにより、個別にfactoryを設定することが可能。

MessageListenerContainerでのprefetch countの設定

SimpleRabbitListenerContainerFactory (Spring AMQP 1.6.3.RELEASE API)

以下のようにRabbtiListenerContainerFactoryのBean設定を @Configuration で行うことができる。 このRabbtiListenerContainerFactoryには、setPrefetchCount メソッドで、1つのリクエストでそれぞれのconsumerに対していくつのmessageを送るかを指定することができる。 スループットを上げるために、この数字をかなり大きくすることができる。 setPrefetchCountで渡すprefetchCountは、transaction sizeと同じかこれよりも大きな値である必要がある。

// ...省略

@Autowired
private RabbitMQConfigProperty property;

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory());
  factory.setConcurrentConsumers(property.getListenerFactory().getCuncurrentConsumerNum());
  factory.setMaxConcurrentConsumers(property.getListenerFactory().getMaxCuncurrentConsumerNum());
  
  factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
  return factory;
}

transaction sizeには、containerに1つのtransactionでいくつのmessageを処理することができるのかを指定する。 この値はprefetch countと同じか小さい数を指定するのがよい。 また、この値はAckknowkedge.AUTO時にackを送る頻度の設定にもなる。transaction sizeごとに、1つのackが送られることになる。 デフォルトは1となっている。

Dead Letter Queueについて

以下の条件を満たす場合、AMQPメッセージはdead letter exchangeにrepublishされる。

  • メッセージがbasic.reject,basic.nackされ、かつrequeue=falseだった場合
  • メッセージのTTL(time-to-live)が期限切れだった場合
  • queueのlength limitを超えていた場合

dead letter exchangeはclientにより、queueのargumentsとして指定できる。 もしくは、RabbitMQ serverのpolicyとして指定できる。 policyとclientのarguments両方にDead letter exchangeの設定が入っていた場合には、argumentsの設定が反映される。

リトライ処理の設定

Brokerの接続失敗時の復帰処理はRetryTemplate, で設定できる。

RetryTemplateを使った接続失敗時の処理定義

Beanによる例外発生時の挙動設定

3.9 Exception Handling

Spring AMQPが提供するRabbitMQクライアントなどのコンポーネントは、Spring AMQPの例外階層にしたがって例外インスタンスを作成・送出する。例外クラスのトップレベルはAmqpException

Listenerが例外送出すると、その例外はListenerExecutionFailedExceptionにラップされる。 デフォルトの挙動ではこの例外発生時にメッセージはrejectされ、キューに戻される。

ビジネスロジック上、このデフォルト挙動では都合が悪い場合もある。失敗したときに復帰処理をしなくていいようなビジネスロジックもあるだろう。 このときには、以下のような手段をとることができる:

  • listener containerのrejectRequeuedfalseに設定し、例外が発生したメッセージがキューに戻されないようにする
    • listener内で発生するすべての例外で復帰処理をしない場合はこの設定を入れればよい
    • 追加でQueue作成時にx-dead-letter-exchangeを定義することによって、例外発生時のメッセージを予め用意したキューに入れておくことが可能
      • RabbitMQ - Dead Letter Exchanges
      • x-dead-letter-exchangeが使われる条件は以下
        • basic.rejectbasic.nack状態でrequeue=falseの場合
        • メッセージのTTLが期限切れになっている場合
        • Queueのサイズ制限を超えている場合
      • アプリケーションで使うすべてのlistenerで設定適用したい場合は、org.springframework.boot.autoconfigure.amqp.RabbitPropertiesにあるように以下のような
spring:
  rabbitmq:
    listener:
      default-requeue-rejected: false
* ※ 例外発生時に指定したExchangeが存在しない場合は、例外は発生せずにただメッセージが捨てられる模様
channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
  • listener内でAmqpRejectAndDontRequeueExceptionを送出する

    • listenerでリキューをコントロールしたい場合はこれを使うと良さそう
  • StatefulRetryOperationsInterceptorをlisterのadvice chainに追加し、独自の復帰処理を実装する

Return to top