RabbitMQのクライントの使い方
以下はJavaのクライアントの例。
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.close();
conn.close();
AMQPのコンセプト
AMQPにはexchangesとqueuesという部分がある。
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
上記のコードは以下を作成する:
- durable(継続する)、自動的に削除されないexchangeを
direct
typeで作成 - non-durableで排他的な、自動削除されるキューを生成された名前で作成
上記の関数はqueueをexchangeにrouting keyと一緒に。bindする。
この例の場合は、1つのクライアントが1つのキューを参照し、使い終わったら削除する…という使い方のときにはよい。 クライアントが複数あり、共通のキューを参照する必要があるのであれば、以下のようにキューを定義できる:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
上記の例の場合は以下を作成している:
- durable, non-autodelete, exchange of direct type
- durable, non-autodelete. non-exclusive, queue with a well-known name(
queueName
)
※上記の例はshort-formなので、詳細な設定ができる別のAPIもあるとのこと。
Channelsと並列処理で考慮すべきこと(thread safety)
Channel
はthread間で共有してはいけない。threadごとに生成すべき。
channelsに対するいくつかの操作については並列呼び出しを安全に行えるが、いくつかは安全ではない。
channelsをthread間で共有すると、publisher confirmにも干渉してしまう。
キューのsubscription
以下はqueueに対してconsumer tagを指定して、subscription設定をする例。 Consumerインターフェースを実装したクラス(下記例でゃDefaultComsumer)をbasicComsumeに与えることにより、 subscription設定をすることができる。
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
この例ではautoAckパラメータをfalseにしているため、ackをhandleDelivery
を実装して送信している。
Comsumersにはその他にもoverrideできるメソッドがある。
handleShutdownSignal
- channelsとconnectionsがcloseしたときに呼ばれる
handleConsumeOk
- Consumerに対する他のcallbackが呼ばれる前にconsumer tagをわたす
その他handleCancelOk
やhandleCancel
のようなメソッドを定義して、明確にあるいは暗黙的にキャンセル通知が取得できる。
また、キャンセルは以下のようにして行う。
channel.basicCancel(consumerTag);
Consumer
に対するcallbackは、Connection
とは別のthreadとしてdispatchされる。Consumer
は安全にConnection
やChennel
上のブロッキングメソッドを呼び出すことができる。例えば、queueDeclare
,txCommit
,basicCancel
,basicPublish
など。
それぞれのChannel
は自身のdispatch threadを持っている。Channel
ごとに1つのConsumer
がいる場合、Consumer
は他のComsumer
を遅らせ(hold up)しない。
ただし、1つのChannel
に対して複数のConsumer
がいる場合は、ある処理の長いConsumer
は、対象のChannel
のほかのConsumer
のcallbackのdispatchを遅らせてしまうかもしれない。
メッセージの受信
以下の例では、autoAck=false
にしているので、自分で処理中にackを送っている。
boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
...
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}
Shutdown
AMQPのconnectionとcunsumerにはlifecycle stateがある。 詳細は以下参照。shutdownのlistenerを登録しておくことによって、shutdown時に特定の何かのアクションがとれるようにしておくことができる。
“Overview of the AMQP client shutdown” RabbitMQ - Java Client API Guide
Message acknowledgmentについて
ConsumerはRabbitMQからメッセージを取得する。この後ConsumerがRabbitMQに対してackを返さないうちにChannel, Connection, あるいはtcp connectionがclosedになると、RabbitMQはメッセージが正しく処理されていないものとして、re-queueする。
Message durabilityについて
RabbitMQサーバが死ぬと、上記のackがあってもメッセージは失われる。
これを防ぐためには、queueとmessageの両方にdurable
設定を入れる。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
上記はtask_queue
をdurableなqueueとして定義する。
※ すでに宣言済みのqueueをdurableにすることはできない
※ queueDeclareはproducer/consumerどちらにも必要な記述である
messageをdurableにしてpublishするには、以下のように書く。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
Fair dispatch
あるconsumerが処理中のmessageのackを返すまでに、receiveするmessageを制限するオプションがある。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
RabbitMQのURIについて
RabbitMQ - RabbitMQ URI Specification
virtual host
RabbitMQ - AMQP 0-9-1 Model Explained
HTTPのvirtual hostとは違う意味で使われている。 プロトコルにたいするentitiesのnamespace定義として使われる。 これは、userやgroupやqueueの良識をvirtual hostごとに分離して持てるようになるしくみ。
ConnectionとChannelsの生成について
RabbitMQ - AMQP 0-9-1 Model Explained
AMQP connections are typically long-lived. AMQP is an application level protocol that uses TCP for reliable delivery. AMQP connections use authentication and can be protected using TLS (SSL). When an application no longer needs to be connected to an AMQP broker, it should gracefully close the AMQP connection instead of abruptly closing the underlying TCP connection.
- Connectionは1つのTCP connetionを保持している模様
job 実装でやったほうがよさそうなところ
- Connetion pool的なComponentを用意し、Scheduledメソッドの中でpoolからconnectionを取り出して使う
- @Scheduledメソッドの中では、channelsを新規生成する
Exchangesについて
Exchangeは、producerからメッセージを受け取るモジュール。 メッセージを受け取ったexchangeは、複数のqueueに対してどのようにmessageを送付するかを決定する。 messageを送付する際のstrategyは以下の種類がある:
- direct
- topic
- headers
- fanout
- exchangeが知っているすべてのqueueに対してmessageを送信する
まとめ
Channel.basicConsume
でsubscribe開始basicGet
でメッセージを取り出す。メッセージに対する処理が終わったらackを返す
AMQP仕様メモ
概念
Producer
メッセージを送信する。 ProducerはメッセージがどのQueueで処理されるかは知らず、Exchangeに対してpublishするのみである。
Exchange
Producerから受け取ったメッセージを、Queueに送信する。
Exchangeには以下のtypeがある。
- direct
- topic
- headers
- fanout
- Exchangeが知っているすべてのQueueにメッセージを配信する
Queue
メッセージのキュー。
Consumer側がスケールする際に、Queueの名前を指定するのは煩わしい。 クライアントで以下のような書き方をして、Queueを自動的に生成させることができる。
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
prefetch
prefetch=1とすることで、メッセージが処理されackされるまで1つのconsumerに1つのメッセージしか渡さなくなる。
RabbitMQ - RabbitMQ tutorial - Work Queues
n order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don’t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
Bindings
QueueがExchangeに興味が有ることをあらわしている。
BindingsにはroutingKey=binding keyという設定を持てる。routingKeyの意味は、Exchangeのtype(fanoutなど)により変化する。
以下にtypeごとの違いを記載する。
fanout
Exchangeが知っているすべてのQueueにメッセージを配信する。
direct
Exchangeのtypeがdirectだった場合は、Queueのbinding keyがメッセージのrouting keyと一致するQueueにメッセージが配信される。
複数のQueueが同一のbinding keyを設定していた場合には、該当するbinding keyをもつすべてのQueueにメッセージが配信される。
topic
direct同様に、routing keyとbinding keyが一致したQueueに対してメッセージを配信する。
ただし、routing keyはsystem.log.warning
のような形式の文字列を使う。
binding keyは以下の特殊文字を使うことができる。
*
- 1ワードの代替となる
#
- 以上のワードの代替となる
Confirmation
publisher ackというのがある。
RabbitMQ - Confirms (aka Publisher Acknowledgements)
もともとAMQP0-9-1ではchannelにtransaction(publish -> commit)を持っていたが、重いためにackの仕組みが追加された。 これはchannelにconfirmation modeを追加することによって実現される。 channelがconfirm modeの場合は、clientとbrokerはmessageをcountする。 brokerは、brokerがmessageをhandleするときに、channelにbasic.ackを送信する。これでmesasgeがconfirmされたことになる。
順序保証
1つのchannel, 1つのexchange, 1つのqueueを通り、1つの出力channelに渡ったメッセージは、送信時と同じ順序で受信されることが保証されているという。
Section 4.7 of the AMQP 0-9-1 core specification explains the conditions under which ordering is guaranteed: messages published in one channel, passing through one exchange and one queue and one outgoing channel will be received in the same order that they were sent. RabbitMQ offers stronger guarantees since release 2.7.0.
prefetch
“A more natural and efficient way to limit unacknowledged messages.” とのこと。
AMQPはbasic.qos
というメソッドを用意している。これは、channel(あるいはconnection)において1度に読み込むunacknowledge messagesの数を制限するものである。このことを別名でprefetch countとも言う。
ただ、この場合scopeをchannelにするのは良いやり方ではない。なぜなら、1つのchannelは複数のqueueから読み込みされる可能性がある。この場合、それぞれのqueueとchannelは、messagesがprefetch countに達していないか確認する必要があり、非効率である。
たくさんの利用者がいる場合は、consumerに対してprefetch countを設定するのがより自然だ。
RabbitMQのglobal flagでの prefetch_count
の設定は以下のようになる
- trueの場合
shared across all consumers on the channel
- 同一のchannelのconsumersの間でprefetch countaは共有される
- falseの場合
applied separately to each new consumer on the channel
- channelのそれぞれのconsumerごとにprefetch countが設定される
例1) 複数の独立したconsumerにprefetch countを持つ場合
以下の例では、それぞれのconsumerが1度に10件のメッセージをfetchする。
hannel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
例2) 複数のconsumerが共有のprefetch countを持つ場合
以下の例では、1つのconsumerにつき10件のprefetch countが指定されているが、同時にchannelに15件のprefetch countが指定されている。 そのため、それぞれのconsumerは設定された最大のprefetch countまでメッセージをfetchすることはない。 それぞれのconsumersの間で15件のメッセージが取得されるのみとなってしまう。
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
RabbitMQのクラスタリングについて
すべてのデータはnodes間でreplicatedされる。ただし、message queueは例外となる。 message queueは標準で最初に宣言された1つのnode上にしかいない。しかし、すべての他のnodeからアクセス可能である。
※ exchangeやbindingsの情報はすべてのnode上にある。
message queueはミラー機能を使って複数のnode間に作成することができる。 それぞれのミラーは1つのmasterと複数のslaveで構成される。masterが死んだら、slaveの中で一番古いものがmasterとして選択される。
queueにpublsihされたメッセージはすべてのnode上のslaveにreplicateされる。consumerは、どのnodeに接続しても、master queueに接続している。slaveは、masterがackしたメッセージをdropする。
queueのミラーリングは、avairabilityを上げるが、loadをnode間で分散はしない。(master queueに作業は集中しているため)
Spring AMQPで、ConsumerがListenしているQueueのmasterがいるnodeに接続するようにする
性能観点からは、master queueがいるbroker nodeに接続したほうがよい。
CachingConnectionFactory
は複数のbroker addressesを指定できるものの、これはfail-overのためである。
クライアントは接続失敗したら順に別のnodeに接続する…という挙動になる。
LocalizedQueueConnectionFactory
は、RabbitMQのadmin pluginが提供するREST APIを使って、queueのmasterがどのnodeにいるかを調べることができる。
そして、masterがいるnodeに接続するCachingConnectionFactory
が作成される。(あるいは、キャッシュから取得される)
接続失敗した場合には、新しいmaster nodeが検知されて、consumerはそれに接続する。
master queueの場所がわからない場合、LocalizedQueueConnectionFactory
はdefault connection factoryで設定される。これは前述したmaster queueの場所とは関係なく、通常通りクラスタに接続する。
LocalizedQueueConnectionFactory
は、RoutingConnectionFactory
であり、SimpleMessageListenerContainer
はqueueの名前をlookup-keyとして利用する。
- ※ 上記のlook-upの仕組み上、
LocalizedQueueConnectionFactory
はcontainerが1つのqueueをlistenしている場合にのみ使うことができる - ※ RabbitMQ management pluginはそれぞれのnodeで有効になっている必要がある
- ※ connection factoryは
SimpleMessageListenerContainer
のようなlong-livedな接続に使われることを想定している。RabbitTemplate
を使うような短い接続は想定されていない。また、publishのようなすべてのクラスタのメンバに通知がいくような操作では利点があまりない。
以下はbean設定例だが、address
, nodes
, adminUrls
の順は各nodeの位置が揃っていなくてはならない。
例えば下記の例だと、host1を指すaddress
はカンマ区切りの文字列中の1番目に書かれていなければいけない。e.g. host1:15672,host2:15672
@Autowired
private RabbitProperties props;
private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };
private final String[] nodes = { "rabbit@host1", "rabbit@host2" };
@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public ConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris, this.nodes,
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}