RabbitMQ extensions

Supported extensions

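The amqp gem supports many RabbitMQ extensions to AMQP 0.9.1. This guide covers Per-queue Message Time-to-Live, Publisher Confirms, basic.nack, and Alternate Exchanges.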

Enabling RabbitMQ extensions

If you are using RabbitMQ as your AMQP broker and want to use these extensions, replace

require "amqp"

with

require "amqp"
require "amqp/extensions/rabbitmq"

Per-queue Message Time-to-Live

Per-queue Message Time-to-Live (TTL) is a RabbitMQ extension to AMQP 0.9.1 that allows developers to control how long a message published to a queue can live before it is discarded. A message that has been in the queue for longer than the configured TTL is said to be dead. Dead messages will not be delivered to consumers and cannot be fetched using the basic.get operation (AMQP::Queue#pop).

Message TTL is specified using the x-message-ttl argument on declaration. With the amqp gem, you pass it to AMQP::Queue#initialize or AMQP::Channel#queue:

# 1000 milliseconds
channel.queue("", :arguments => { "x-message-ttl" => 1000 })

When a published message is routed to multiple queues, each of the queues gets a copy of the message. If the message subsequently dies in one of the queues, it has no effect on copies of the message in other queues.

Example

The example below sets the message TTL for a new server-named queue to 1000 milliseconds. It then publishes several messages that are routed to the queue and tries to fetch them using the basic.get AMQP method (AMQP::Queue#pop) after 0.7 and 1.5 seconds:
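The timeline described above can be modeled without a broker. What follows is a minimal in-memory sketch, not amqp gem code: ToyTtlQueue and its explicit clock argument are hypothetical names, used only to illustrate the rule that a message older than the TTL is dead and can no longer be fetched.

```ruby
# Toy in-memory model of per-queue message TTL (hypothetical; it does not
# talk to RabbitMQ). Times are passed in explicitly, in milliseconds.
class ToyTtlQueue
  def initialize(ttl_ms)
    @ttl_ms   = ttl_ms
    @messages = [] # each entry is [enqueued_at_ms, payload]
  end

  # Store the payload together with its arrival time.
  def publish(payload, now_ms)
    @messages << [now_ms, payload]
  end

  # Model of basic.get: drop dead messages, then return the oldest live
  # payload, or nil when nothing is left.
  def pop(now_ms)
    @messages.reject! { |enqueued_at, _| now_ms - enqueued_at > @ttl_ms }
    entry = @messages.shift
    entry && entry.last
  end
end

queue = ToyTtlQueue.new(1000) # plays the role of x-message-ttl => 1000
3.times { |i| queue.publish("message #{i}", 0) }

early = queue.pop(700)  # 0.7 seconds after publishing: still alive
late  = queue.pop(1500) # 1.5 seconds after publishing: the rest are dead

puts "after 0.7s: #{early.inspect}"
puts "after 1.5s: #{late.inspect}"
```

With a real broker the two fetches would be AMQP::Queue#pop calls scheduled by timers; the model only captures which of them can still return a message.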

Learn More

See also the rabbitmq.com section on Per-queue Message TTL.

Publisher Confirms (Publisher Acknowledgements)

In some situations it is essential that no messages are lost. The only reliable way of ensuring this is by using confirmations. The Publisher Confirms AMQP extension was designed to solve the reliable publishing problem.

Publisher confirms are similar to message acknowledgements documented in the Working With Queues guide but involve a publisher and the AMQP broker instead of a consumer and the AMQP broker.

Public API

To use publisher confirmations, first put the channel into confirmation mode using AMQP::Channel#confirm_select:

channel.confirm_select

From this moment on, every message published on this channel will cause the channel’s publisher index (message counter) to be incremented. You can read the index using the AMQP::Channel#publisher_index method. To check whether the channel is in confirmation mode, use the AMQP::Channel#uses_publisher_confirmations? predicate.

To handle AMQP broker acknowledgements, define a handler using AMQP::Channel#on_ack, for example:

channel.on_ack do |basic_ack|
  puts "Received basic_ack: multiple = #{basic_ack.multiple}, delivery_tag = #{basic_ack.delivery_tag}"
end

The delivery tag identifies the confirmed message by its sequence number on the channel. If the multiple attribute is true, the confirmation covers all messages up to and including the one the delivery tag indicates. In other words, an AMQP broker may confirm a single message or a batch of them.

Example
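A publisher usually needs to remember which messages are still awaiting confirmation. The sketch below shows one way to apply the delivery tag and multiple rules to such a record. ConfirmTracker and the Ack struct are hypothetical helpers, not part of the amqp gem; the struct merely stands in for the object passed to an on_ack handler so that the sketch runs without a broker.

```ruby
require "set"

# Hypothetical helper (not part of the amqp gem): remembers which publisher
# indexes are still waiting for a broker confirmation.
class ConfirmTracker
  def initialize
    @unconfirmed = Set.new
  end

  # Call after each publish with the channel's current publisher index.
  def published(index)
    @unconfirmed << index
    self
  end

  # Apply one acknowledgement. With multiple = true every index up to and
  # including delivery_tag is confirmed, otherwise only delivery_tag itself.
  def handle_ack(delivery_tag, multiple)
    if multiple
      @unconfirmed.delete_if { |index| index <= delivery_tag }
    else
      @unconfirmed.delete(delivery_tag)
    end
    self
  end

  # Sorted list of indexes that have not been confirmed yet.
  def unconfirmed
    @unconfirmed.to_a.sort
  end
end

# Stand-in for the object yielded to on_ack; only the two attributes used in
# this guide are modeled.
Ack = Struct.new(:delivery_tag, :multiple)

tracker = ConfirmTracker.new
(1..5).each { |index| tracker.published(index) }

# The broker confirms messages 1..3 in one batch, then message 5 on its own.
[Ack.new(3, true), Ack.new(5, false)].each do |ack|
  tracker.handle_ack(ack.delivery_tag, ack.multiple)
end

puts "still unconfirmed: #{tracker.unconfirmed.inspect}"
```

With a real channel, the same two calls would presumably sit next to the API shown earlier: tracker.published(channel.publisher_index) after each publish, and tracker.handle_ack(basic_ack.delivery_tag, basic_ack.multiple) inside the channel.on_ack block.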

Learn More

See also the rabbitmq.com section on Confirms aka Publisher Acknowledgements.

basic.nack

The AMQP 0.9.1 specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.

To solve this, RabbitMQ supports the basic.nack method that provides all of the functionality of basic.reject whilst also allowing for bulk processing of messages.

Public API

When RabbitMQ extensions are loaded, the AMQP::Channel#reject method is overridden via a mixin to take one additional argument: multi (defaults to false). When multi is passed as true, the amqp gem uses the basic.nack AMQP method instead of basic.reject to reject multiple messages at once. Otherwise, basic.reject is used as usual.
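The choice between the two AMQP methods can be sketched as follows. This is an illustration under an assumption: that the extended reject method takes the delivery tag and the requeue flag as its first two positional arguments and multi as the third, which should be checked against your version of the gem. Both reject_up_to and RecordingChannel are hypothetical; the latter stands in for AMQP::Channel so the sketch runs without a broker.

```ruby
# Hypothetical helper: rejects every unacknowledged delivery up to and
# including delivery_tag in one call.
# ASSUMPTION: with the RabbitMQ extensions loaded, the channel's reject
# method accepts (delivery_tag, requeue, multi) in that order.
def reject_up_to(channel, delivery_tag, requeue = true)
  channel.reject(delivery_tag, requeue, true)
end

# Stand-in for AMQP::Channel; it only records which AMQP method each call
# would map to according to the rule described above.
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def reject(delivery_tag, requeue = true, multi = false)
    method_name = multi ? "basic.nack" : "basic.reject"
    @calls << [method_name, delivery_tag, requeue]
    self
  end
end

channel = RecordingChannel.new
channel.reject(7)         # single message, so basic.reject
reject_up_to(channel, 12) # bulk rejection, so basic.nack

channel.calls.each { |call| puts call.inspect }
```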

Learn More

See also the rabbitmq.com section on Confirms aka Publisher Acknowledgements.

Alternate Exchanges

Alternate Exchanges is a RabbitMQ extension to AMQP 0.9.1 that allows developers to define “fallback” exchanges where unroutable messages will be sent.

Public API

To make exchange A the alternate exchange for exchange B, pass the alternate-exchange argument when declaring B:

exchange1 = channel.fanout("ops.fallback",     :auto_delete => true)
exchange2 = channel.fanout("events.collector", :auto_delete => true, :arguments => { "alternate-exchange" => "ops.fallback" })

Example
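The fallback behaviour can be illustrated with a small in-memory model. This is a sketch only: ToyFanout is a hypothetical class that imitates how an unroutable message is handed to the alternate exchange, whereas real routing happens inside RabbitMQ. Queues are modeled as plain arrays.

```ruby
# Toy in-memory model of a fanout exchange with an optional alternate
# exchange (hypothetical; it does not talk to RabbitMQ).
class ToyFanout
  attr_reader :name

  def initialize(name, alternate = nil)
    @name      = name
    @alternate = alternate
    @queues    = []
  end

  # Bind a queue, modeled here as a plain Array of payloads.
  def bind(queue)
    @queues << queue
    self
  end

  # Deliver a copy to every bound queue. With no bindings the message is
  # unroutable and is handed to the alternate exchange, if there is one.
  # Returns the name of the exchange that routed it, or nil if it was dropped.
  def publish(payload)
    unless @queues.empty?
      @queues.each { |queue| queue << payload }
      return name
    end
    @alternate && @alternate.publish(payload)
  end
end

fallback_queue = []
fallback = ToyFanout.new("ops.fallback").bind(fallback_queue)
events   = ToyFanout.new("events.collector", fallback) # no queues bound

routed_by = events.publish("disk usage at 91%")

puts "routed by: #{routed_by}"
puts "fallback queue: #{fallback_queue.inspect}"
```

In the model, as in the declaration above, events.collector only names its fallback; whether a message ends up there depends on whether events.collector could route it.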

Learn More

See also the rabbitmq.com section on Alternate Exchanges.

Wrapping Up

With the amqp gem you can use a number of RabbitMQ extensions to AMQP 0.9.1. Some are exposed as dedicated features of the library, while others are used via extra declaration arguments and message properties.

Authors

This guide was written by Michael Klishin and edited by Chris Duncan.

Tell us what you think!

Please take a moment to tell us what you think about this guide on Twitter or the Ruby AMQP mailing list.

Let us know what was unclear or what has not been covered. Maybe you do not like the guide's style or grammar, or have found spelling mistakes. Reader feedback is key to making the documentation better.

If, for some reason, you cannot use the communication channels mentioned above, you can contact the author of the guides directly.