Spring BootのAMQPライブラリについて調べたのでまとめる。
参考資料はSpring AMQP の公式ドキュメント。
Spring AMQPとは
Spring AMQPとは、RabbitMQのクライアントと、RabbitMQの設定などに必要な機能を汎用化したモジュール…という理解をしている。 AMQPクライントはSpring的にコンポーネントベースで設定でき、DIコンテナを利用してインスタンス化することができる。
Spring AMQPで提供する抽象化
以下の抽象化を提供している。
- Message
- AMQPのメッセージを表す。
- body部とproperty部の読み取りなどの機能を提供する。
- Exchange
- Queue
- Binding
- exchangeとqueueのbindingを表す。
new Binding(someQueue, someDirectExchange, "foo.bar")
- Bindingインスタンスそれ自体ではconnectionのためのdataを保持しているためactiveなcomponentではないとのこと。インスタンスを作成した時点ではbindingをしているわけではなさそう。
- Bindingインスタンスは
AmqpAdmin
によってbindingをトリガーするために使われる。 AmqpTemplate
は実際のmessagingのために使われるInterfaceなのでこちらも参照する。
connection と resource management
AMQP Template
AMQP Temaplateは、Spring AMQPで提供する抽象化の中心部になるもの。 messageのsendとreceiveの一般的な操作を担っている。
RabbitTemplate
としてInterfaceが用意されており、その実装としてAmqpTemplate
がある。
また、SpringのMessagingとの互換性のためにRabbitMessagingTemplate
がトップレベルとして用意されている。
また、AMQP Templateにはリトライの設定をRetry Templateによって行うことができる。
message confirmation
returned messageのackでは、templateのmandatory
propertyをtrueにする必要がある。
(※returned messageは多分receiveしてきたmesasgeのことっぽい?)
また、CachingConnectionFactoryのpublisherReturns propertyがtrueになっている必要もある。(詳細: 3. Reference)
messageのreturn(多分queueからのreceiveはRabbitTemplate.ReturnCallback
をsetReturnCallback(ReturnCallback callback)
メソッドで設定することによって行う。
ReturnCallbackは下記のようなシグネチャになる。
void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey);
1つのRabbitTemplateにつき1つのcallbackとなる。 messageのconsumeに対して異なる操作をしたい場合はtempalteを複数持つ必要がある。
publisherのconfirmationについては、CachingConnectionFactory
のpublisherConfirms
propretyがtrueになっている必要がある。
confirmationはsetConfirmCallback(ConfirmCallback callback)
メソッドによって設定したcallbackで行う。
ConfirmationCallbackは以下の実装をする:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData
は、clientからoriginal dataを送ったときのobject。
ack
はackを送りたいときはtrueで、nackにしたいときはfalse。
brokerがchannelをcloseしたときに、cause
に理由が入る。
messageの送信
AMQPTemplateの以下のメソッドを使ってmessageを送信することができる。
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
以下のようにrouting keyとqueueを指定して送ることもできる。
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
new Message("12.34".getBytes(), someProperties));
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
sendでmessageだけを指定すると、routing key とqueueはデフォルトのものが使われる。 空文字("")のデフォルトキューに、すべてのメッセージが突っ込まれることを防ぐために明確にrouting keyとqueueを設定すべきだとのこと。
messageの受信
3. Referenceにmessageの受信についての説明がある。
messageの受信方法には2つがある:
- poll for a single
Message
at a time with a polling method callMessage
をメソッド呼び出しによって取得する方法
- register listeners that will receive
Message
s on-demand asynchronouslyMessage
を非同期で必要なときに取得するリスナーを登録する方法
Asynchronous Consumer
@RabbitListener
を使う方法
Annotation-driven Listener Endpointsにドキュメントがある。
annotationを使う場合は、以下のBean設定をする必要がある。
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
@RabbitListener
を使うためには、@Configuration
クラスに@EnableRabbit
annotationをつける必要がある。
デフォルト設定ではlistenerのinfrastructureは、rabbitListenerContainerFactory
というbeanを探す。
このfactoryは、listener containersを作るためのbean。
上記のfactory設定では、core thread sizeは3, max thread sizeが10でlistenerが実行される。
ackの送り方は、rabbitListenerContainerFactory
の設定で行う。
3.1.15 Message Listener Container Configuration
下記のコードのように書くと、annotationをつけたメソッドをRabbit listener endpointとしてBean登録してくれる。
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
}
上記の例では、myQueue
からメッセージが取得可能になったら、processOrder
メソッドがpayloadのdata
を引数にして呼ばれる。
このanottationを使ったmessage listenerのしくみはRabbitListenerContainerFactory
によって提供される。
上記の例では、myQueue
はあらかじめ定義されている必要があり、さらにexchangeにbindされている必要もあった。
Spring AMQPのver1.5.0以降では、RabbitAdmin
が定義されていれば、queueは自動的に宣言・bindされる。
@Component
public class MyService {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(String data) {
...
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(String data) {
...
}
}
processOrder
メソッドの例では、必要であればmyQueue
は、exhangeauto.exch
と一緒に自動的に宣言される。
また、invoiceRoutingKey
というroutingKeyとともにexchangeにbindされる。
複数のQueueBinfing
を定義することもできる。そうすることによって、listenerに複数のqueueをlistenさせることができる。
この設定方法は、direct
, fanout
, topic
, headers
のメッセージ方式にのみ適用できる。
詳細な設定は@Bean
を使った設定方法である必要がある。
ignoreDeclarationExceptions
がprocessOrder
メソッドに設定されている。
これは、annotationと異なる設定のexchangeに対してbindingすることを許す設定となっている。(例えば、internal exchangeなど)
デフォルト設定では、すでに存在しているexchangeに対するbindの場合、設定が存在しているexchangeと一致している必要がある。
threadを使う
connectionのthread数
ConnectionFactory
でthreadは設定できる。
Spring AMQP - Configuring the Underlying Client Connection Factory
以下のような ConfigurationProperty
を設定していた場合、
@Component
@Data
@ConfigurationProperties(prefix = "rabbitmq")
public class RabbitMQConfigProperty {
@NotNull
private ConnectionFactory connectionFactory;
@Data
public static class ConnectionFactory {
@NotNull
private Integer corePoolSize;
@NotNull
private Integer maxPoolSize;
}
}
以下のように @Configuration
クラスを定義することができる。
下記の例では、RabbitMQへのconnectionに使うthreadのcorePoolSize, maxPoolSizeをproperty経由で指定している。
@Configuration
public class RabbitConfig {
@Autowired
private RabbitMQConfigProperty property;
@Bean
public ThreadPoolTaskExecutor rabbitConnectionExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(property.getConnectionFactory().getCorePoolSize());
executor.setMaxPoolSize(property.getConnectionFactory().getMaxPoolSize());
return executor;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(property.getHost());
connectionFactory.setUsername(property.getUsername());
connectionFactory.setPassword(property.getPassword());
connectionFactory.setVirtualHost(property.getVirtualHost());
connectionFactory.setExecutor(rabbitConnectionExecutor());
return connectionFactory;
}
}
TODO: 以下のasync listenerのthread数も追記する
Spring AMQP - Threading and Asynchronous Consumers
Asynchronous Consumer
Message Listener
非同期のメッセージ受信をするためには、message consuming callbackを生成するためのcontaner componentが必要。
非同期のmessage consuming callbackを定義するには、MessageListener
interfaceを実装する。
public interface MessageListener {
void onMessage(Message message);
}
特定のChannelインスタンスによるconsumeをしたい場合、ChannelAwareMessageListener
を使うことができる。
MessageListenerAdapter
アプリケーションのロジックとメッセージングAPIを分離しておきたい場合には、frameworkが提供するadapter実装を使うことができる。
これは Message-driven POJO
のサポートと呼ばれている。
このアダプターを使えば、開発者に必要とされるのは、アダプターが呼び出すべきインスタンスの参照を与えることだけとなる。
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
adapterのサブクラスを作って、getListenerMethodName()
の実装を提供し、メッセージによって異なるメソッドを動的に選択することも可能である。
このメソッドには2つのパラメータがある。originalMessage
,originalMessage
である。後者は、変換後の結果となる。
デフォルトでは、SimpleMessageConverter
が設定されている。
SimpleMessageConverterの章に、converterの詳細を書いている。
Container
containerはlifecycle componentの1種である。containerは開始と終了のメソッドを提供する。
containerの設定は、AMQP QueueとMessageListener
のインスタンスの間の橋渡しである。
containerの設定には、ConnectionFactory
とQueueの名前か、Queueインスタンスが必要で、これはListenerがメッセージをconsumeする対象となる。
以下は基本的なSimpleMessageListenerContainer
を使ったContainerの設定例である。
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
要調査項目
@RabbitListenerで指定している、キュー等の設定はそのままContainerとしてBeanの設定に入れれば移行できそう。
annotation-basedの@RabbitListenerを使う方法で設定している、@Payloadを使ったメッセージの変換方法を、どう上記のcontainerで使えるか調べる必要がある。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${rabbitmq.bindings.posted-message.queue.name}", durable = "true"),
exchange = @Exchange(value = "${rabbitmq.bindings.posted-message.exchange.name}", durable = "true"),
key = ""))
@Transactional
public void consumePostedMessage(@Payload MessageDto messageDto) throws RetryableException { }
Annotation-driven listenerについて
annotation listener(@RabbitListener)はmessage listener containerを、それぞれのannotationをつけられたメソッドごとに作成する。
containerの作成にはにはRabbitListenerContainerFactory
が使われる。
デフォルトでは1つのRabbitListenerContainerFactory
を利用するが、これをAnnotationごとにしたり雄一のfactoryとして使うこともできる。
デフォルトのBean名はrabbitListenerContainerFactory
。
これは@RabbitListener
のcontainerFactory
により、個別にfactoryを設定することが可能。
MessageListenerContainerでのprefetch countの設定
SimpleRabbitListenerContainerFactory (Spring AMQP 1.6.3.RELEASE API)
以下のようにRabbtiListenerContainerFactoryのBean設定を @Configuration
で行うことができる。
このRabbtiListenerContainerFactoryには、setPrefetchCount
メソッドで、1つのリクエストでそれぞれのconsumerに対していくつのmessageを送るかを指定することができる。
スループットを上げるために、この数字をかなり大きくすることができる。
setPrefetchCount
で渡すprefetchCountは、transaction sizeと同じかこれよりも大きな値である必要がある。
// ...省略
@Autowired
private RabbitMQConfigProperty property;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(property.getListenerFactory().getCuncurrentConsumerNum());
factory.setMaxConcurrentConsumers(property.getListenerFactory().getMaxCuncurrentConsumerNum());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
※transaction sizeには、containerに1つのtransactionでいくつのmessageを処理することができるのかを指定する。 この値はprefetch countと同じか小さい数を指定するのがよい。 また、この値はAckknowkedge.AUTO時にackを送る頻度の設定にもなる。transaction sizeごとに、1つのackが送られることになる。 デフォルトは1となっている。
Dead Letter Queueについて
以下の条件を満たす場合、AMQPメッセージはdead letter exchangeにrepublishされる。
- メッセージが
basic.reject
,basic.nack
され、かつrequeue=false
だった場合 - メッセージのTTL(time-to-live)が期限切れだった場合
- queueのlength limitを超えていた場合
dead letter exchangeはclientにより、queueのargumentsとして指定できる。 もしくは、RabbitMQ serverのpolicyとして指定できる。 policyとclientのarguments両方にDead letter exchangeの設定が入っていた場合には、argumentsの設定が反映される。
リトライ処理の設定
Brokerの接続失敗時の復帰処理はRetryTemplate
, で設定できる。
RetryTemplateを使った接続失敗時の処理定義
Beanによる例外発生時の挙動設定
Spring AMQPが提供するRabbitMQクライアントなどのコンポーネントは、Spring AMQPの例外階層にしたがって例外インスタンスを作成・送出する。例外クラスのトップレベルはAmqpException
。
Listenerが例外送出すると、その例外はListenerExecutionFailedException
にラップされる。
デフォルトの挙動ではこの例外発生時にメッセージはrejectされ、キューに戻される。
ビジネスロジック上、このデフォルト挙動では都合が悪い場合もある。失敗したときに復帰処理をしなくていいようなビジネスロジックもあるだろう。 このときには、以下のような手段をとることができる:
- listener containerの
rejectRequeued
をfalse
に設定し、例外が発生したメッセージがキューに戻されないようにする- listener内で発生するすべての例外で復帰処理をしない場合はこの設定を入れればよい
- 追加でQueue作成時に
x-dead-letter-exchange
を定義することによって、例外発生時のメッセージを予め用意したキューに入れておくことが可能- RabbitMQ - Dead Letter Exchanges
x-dead-letter-exchange
が使われる条件は以下basic.reject
かbasic.nack
状態でrequeue=falseの場合- メッセージのTTLが期限切れになっている場合
- Queueのサイズ制限を超えている場合
- アプリケーションで使うすべてのlistenerで設定適用したい場合は、
org.springframework.boot.autoconfigure.amqp.RabbitProperties
にあるように以下のような
spring:
rabbitmq:
listener:
default-requeue-rejected: false
* ※ 例外発生時に指定したExchangeが存在しない場合は、例外は発生せずにただメッセージが捨てられる模様
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
-
listener内で
AmqpRejectAndDontRequeueException
を送出する- listenerでリキューをコントロールしたい場合はこれを使うと良さそう
-
StatefulRetryOperationsInterceptor
をlisterのadvice chainに追加し、独自の復帰処理を実装する