RabbitMQとAMQPについての調査・使い方メモ

Sep 8, 2016 ( Feb 11, 2022 更新 )

RabbitMQのクライントの使い方

以下はJavaのクライアントの例。

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.close();
conn.close();

AMQPのコンセプト

AMQPにはexchangesとqueuesという部分がある。

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

上記のコードは以下を作成する:

  • durable(継続する)、自動的に削除されないexchangeをdirect typeで作成
  • non-durableで排他的な、自動削除されるキューを生成された名前で作成

上記の関数はqueueをexchangeにrouting keyと一緒に。bindする。

この例の場合は、1つのクライアントが1つのキューを参照し、使い終わったら削除する…という使い方のときにはよい。 クライアントが複数あり、共通のキューを参照する必要があるのであれば、以下のようにキューを定義できる:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

上記の例の場合は以下を作成している:

  • durable, non-autodelete, exchange of direct type
  • durable, non-autodelete. non-exclusive, queue with a well-known name(queueName)

※上記の例はshort-formなので、詳細な設定ができる別のAPIもあるとのこと。

Channelsと並列処理で考慮すべきこと(thread safety)

Channelはthread間で共有してはいけない。threadごとに生成すべき。 channelsに対するいくつかの操作については並列呼び出しを安全に行えるが、いくつかは安全ではない。 channelsをthread間で共有すると、publisher confirmにも干渉してしまう。

キューのsubscription

以下はqueueに対してconsumer tagを指定して、subscription設定をする例。 Consumerインターフェースを実装したクラス(下記例でゃDefaultComsumer)をbasicComsumeに与えることにより、 subscription設定をすることができる。

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

この例ではautoAckパラメータをfalseにしているため、ackをhandleDeliveryを実装して送信している。 Comsumersにはその他にもoverrideできるメソッドがある。

  • handleShutdownSignal
    • channelsとconnectionsがcloseしたときに呼ばれる
  • handleConsumeOk
    • Consumerに対する他のcallbackが呼ばれる前にconsumer tagをわたす

その他handleCancelOkhandleCancelのようなメソッドを定義して、明確にあるいは暗黙的にキャンセル通知が取得できる。 また、キャンセルは以下のようにして行う。

channel.basicCancel(consumerTag);

Consumerに対するcallbackは、Connectionとは別のthreadとしてdispatchされる。Consumerは安全にConnectionChennel上のブロッキングメソッドを呼び出すことができる。例えば、queueDeclare,txCommit,basicCancel,basicPublishなど。

それぞれのChannelは自身のdispatch threadを持っている。Channelごとに1つのConsumerがいる場合、Consumerは他のComsumerを遅らせ(hold up)しない。 ただし、1つのChannelに対して複数のConsumerがいる場合は、ある処理の長いConsumerは、対象のChannelのほかのConsumerのcallbackのdispatchを遅らせてしまうかもしれない。

メッセージの受信

以下の例では、autoAck=falseにしているので、自分で処理中にackを送っている。

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...
    channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}

Shutdown

AMQPのconnectionとcunsumerにはlifecycle stateがある。 詳細は以下参照。shutdownのlistenerを登録しておくことによって、shutdown時に特定の何かのアクションがとれるようにしておくことができる。

“Overview of the AMQP client shutdown” RabbitMQ - Java Client API Guide

Message acknowledgmentについて

ConsumerはRabbitMQからメッセージを取得する。この後ConsumerがRabbitMQに対してackを返さないうちにChannel, Connection, あるいはtcp connectionがclosedになると、RabbitMQはメッセージが正しく処理されていないものとして、re-queueする。

Message durabilityについて

RabbitMQサーバが死ぬと、上記のackがあってもメッセージは失われる。 これを防ぐためには、queueとmessageの両方にdurable設定を入れる。

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

上記はtask_queueをdurableなqueueとして定義する。 ※ すでに宣言済みのqueueをdurableにすることはできない ※ queueDeclareはproducer/consumerどちらにも必要な記述である

messageをdurableにしてpublishするには、以下のように書く。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

Fair dispatch

あるconsumerが処理中のmessageのackを返すまでに、receiveするmessageを制限するオプションがある。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

RabbitMQのURIについて

RabbitMQ - RabbitMQ URI Specification

virtual host

RabbitMQ - AMQP 0-9-1 Model Explained

HTTPのvirtual hostとは違う意味で使われている。 プロトコルにたいするentitiesのnamespace定義として使われる。 これは、userやgroupやqueueの良識をvirtual hostごとに分離して持てるようになるしくみ。

ConnectionとChannelsの生成について

RabbitMQ - AMQP 0-9-1 Model Explained

AMQP connections are typically long-lived. AMQP is an application level protocol that uses TCP for reliable delivery. AMQP connections use authentication and can be protected using TLS (SSL). When an application no longer needs to be connected to an AMQP broker, it should gracefully close the AMQP connection instead of abruptly closing the underlying TCP connection.
  • Connectionは1つのTCP connetionを保持している模様

job 実装でやったほうがよさそうなところ

  • Connetion pool的なComponentを用意し、Scheduledメソッドの中でpoolからconnectionを取り出して使う
  • @Scheduledメソッドの中では、channelsを新規生成する

Exchangesについて

Exchangeは、producerからメッセージを受け取るモジュール。 メッセージを受け取ったexchangeは、複数のqueueに対してどのようにmessageを送付するかを決定する。 messageを送付する際のstrategyは以下の種類がある:

  • direct
  • topic
  • headers
  • fanout
    • exchangeが知っているすべてのqueueに対してmessageを送信する

まとめ

  • Channel.basicConsumeでsubscribe開始
  • basicGetでメッセージを取り出す。メッセージに対する処理が終わったらackを返す

AMQP仕様メモ

概念

Producer

メッセージを送信する。 ProducerはメッセージがどのQueueで処理されるかは知らず、Exchangeに対してpublishするのみである。

Exchange

Producerから受け取ったメッセージを、Queueに送信する。

Exchangeには以下のtypeがある。

  • direct
  • topic
  • headers
  • fanout
    • Exchangeが知っているすべてのQueueにメッセージを配信する

Queue

メッセージのキュー。

Consumer側がスケールする際に、Queueの名前を指定するのは煩わしい。 クライアントで以下のような書き方をして、Queueを自動的に生成させることができる。

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
prefetch

prefetch=1とすることで、メッセージが処理されackされるまで1つのconsumerに1つのメッセージしか渡さなくなる。

RabbitMQ - RabbitMQ tutorial - Work Queues

n order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don’t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

Bindings

QueueがExchangeに興味が有ることをあらわしている。

BindingsにはroutingKey=binding keyという設定を持てる。routingKeyの意味は、Exchangeのtype(fanoutなど)により変化する。

以下にtypeごとの違いを記載する。

fanout

Exchangeが知っているすべてのQueueにメッセージを配信する。

direct

Exchangeのtypeがdirectだった場合は、Queueのbinding keyがメッセージのrouting keyと一致するQueueにメッセージが配信される。

direct-exchange.png (408×171)

複数のQueueが同一のbinding keyを設定していた場合には、該当するbinding keyをもつすべてのQueueにメッセージが配信される。

direct-exchange-multiple.png (398×171)

topic

direct同様に、routing keyとbinding keyが一致したQueueに対してメッセージを配信する。 ただし、routing keyはsystem.log.warningのような形式の文字列を使う。 binding keyは以下の特殊文字を使うことができる。

  • *
    • 1ワードの代替となる
  • #
    • 以上のワードの代替となる

python-five.png (424×171)

Confirmation

publisher ackというのがある。

RabbitMQ - Confirms (aka Publisher Acknowledgements)

もともとAMQP0-9-1ではchannelにtransaction(publish -> commit)を持っていたが、重いためにackの仕組みが追加された。 これはchannelにconfirmation modeを追加することによって実現される。 channelがconfirm modeの場合は、clientとbrokerはmessageをcountする。 brokerは、brokerがmessageをhandleするときに、channelにbasic.ackを送信する。これでmesasgeがconfirmされたことになる。

順序保証

RabbitMQ - Broker Semantics

1つのchannel, 1つのexchange, 1つのqueueを通り、1つの出力channelに渡ったメッセージは、送信時と同じ順序で受信されることが保証されているという。

Section 4.7 of the AMQP 0-9-1 core specification explains the conditions under which ordering is guaranteed: messages published in one channel, passing through one exchange and one queue and one outgoing channel will be received in the same order that they were sent. RabbitMQ offers stronger guarantees since release 2.7.0.

prefetch

RabbitMQ - Consumer Prefetch

“A more natural and efficient way to limit unacknowledged messages.” とのこと。

AMQPはbasic.qosというメソッドを用意している。これは、channel(あるいはconnection)において1度に読み込むunacknowledge messagesの数を制限するものである。このことを別名でprefetch countとも言う。

ただ、この場合scopeをchannelにするのは良いやり方ではない。なぜなら、1つのchannelは複数のqueueから読み込みされる可能性がある。この場合、それぞれのqueueとchannelは、messagesがprefetch countに達していないか確認する必要があり、非効率である。

たくさんの利用者がいる場合は、consumerに対してprefetch countを設定するのがより自然だ。

RabbitMQのglobal flagでの prefetch_count の設定は以下のようになる

  • trueの場合
    • shared across all consumers on the channel
    • 同一のchannelのconsumersの間でprefetch countaは共有される
  • falseの場合
    • applied separately to each new consumer on the channel
    • channelのそれぞれのconsumerごとにprefetch countが設定される

例1) 複数の独立したconsumerにprefetch countを持つ場合

以下の例では、それぞれのconsumerが1度に10件のメッセージをfetchする。

hannel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

例2) 複数のconsumerが共有のprefetch countを持つ場合

以下の例では、1つのconsumerにつき10件のprefetch countが指定されているが、同時にchannelに15件のprefetch countが指定されている。 そのため、それぞれのconsumerは設定された最大のprefetch countまでメッセージをfetchすることはない。 それぞれのconsumersの間で15件のメッセージが取得されるのみとなってしまう。

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

RabbitMQのクラスタリングについて

すべてのデータはnodes間でreplicatedされる。ただし、message queueは例外となる。 message queueは標準で最初に宣言された1つのnode上にしかいない。しかし、すべての他のnodeからアクセス可能である。

※ exchangeやbindingsの情報はすべてのnode上にある。

message queueはミラー機能を使って複数のnode間に作成することができる。 それぞれのミラーは1つのmasterと複数のslaveで構成される。masterが死んだら、slaveの中で一番古いものがmasterとして選択される。

queueにpublsihされたメッセージはすべてのnode上のslaveにreplicateされる。consumerは、どのnodeに接続しても、master queueに接続している。slaveは、masterがackしたメッセージをdropする。

queueのミラーリングは、avairabilityを上げるが、loadをnode間で分散はしない。(master queueに作業は集中しているため)

Spring AMQPで、ConsumerがListenしているQueueのmasterがいるnodeに接続するようにする

性能観点からは、master queueがいるbroker nodeに接続したほうがよい。 CachingConnectionFactoryは複数のbroker addressesを指定できるものの、これはfail-overのためである。 クライアントは接続失敗したら順に別のnodeに接続する…という挙動になる。

LocalizedQueueConnectionFactoryは、RabbitMQのadmin pluginが提供するREST APIを使って、queueのmasterがどのnodeにいるかを調べることができる。 そして、masterがいるnodeに接続するCachingConnectionFactoryが作成される。(あるいは、キャッシュから取得される)

接続失敗した場合には、新しいmaster nodeが検知されて、consumerはそれに接続する。 master queueの場所がわからない場合、LocalizedQueueConnectionFactoryはdefault connection factoryで設定される。これは前述したmaster queueの場所とは関係なく、通常通りクラスタに接続する。

LocalizedQueueConnectionFactoryは、RoutingConnectionFactoryであり、SimpleMessageListenerContainerはqueueの名前をlookup-keyとして利用する。

  • ※ 上記のlook-upの仕組み上、LocalizedQueueConnectionFactoryはcontainerが1つのqueueをlistenしている場合にのみ使うことができる
  • ※ RabbitMQ management pluginはそれぞれのnodeで有効になっている必要がある
  • ※ connection factoryはSimpleMessageListenerContainerのようなlong-livedな接続に使われることを想定している。RabbitTemplateを使うような短い接続は想定されていない。また、publishのようなすべてのクラスタのメンバに通知がいくような操作では利点があまりない。

以下はbean設定例だが、address, nodes, adminUrlsの順は各nodeの位置が揃っていなくてはならない。 例えば下記の例だと、host1を指すaddressはカンマ区切りの文字列中の1番目に書かれていなければいけない。e.g. host1:15672,host2:15672

@Autowired
private RabbitProperties props;

private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };

private final String[] nodes = { "rabbit@host1", "rabbit@host2" };

@Bean
public ConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public ConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            this.adminUris, this.nodes,
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}
Return to top