langohr.basic

Functions that cover basic.* protocol methods: publishing and consumption
of messages, acknowledgements.

Relevant guides:

* http://clojurerabbitmq.info/articles/queues.html
* http://clojurerabbitmq.info/articles/exchanges.html

ack

(ack channel delivery-tag)(ack channel delivery-tag multiple)
Acknowledges one or more messages using basic.ack AMQP 0.9.1 method

add-return-listener

(add-return-listener channel f)
Adds a return listener to the given channel

cancel

(cancel channel consumer-tag)
Cancels a consumer (subscription) using basic.cancel AMQP 0.9.1 method

consume

(consume ch queue consumer)(consume ch queue consumer {:keys [consumer-tag auto-ack exclusive arguments no-local], :or {consumer-tag "", auto-ack false, exclusive false, no-local false}})
Adds a new consumer to a queue using basic.consume AMQP 0.9.1 method.

Called with default parameters, starts a non-exclusive consumer with no-local disabled, explicit acknowledgements and a server-generated consumer tag.

^String queue - the name of the queue
^Consumer consumer - callback to receive notifications and messages from a queue by subscription. For more information about consumers, check out langohr.consumers ns.

Options:

  ^String :consumer-tag: a unique consumer (subscription) identifier.
                         Omit the option or pass an empty string to make RabbitMQ generate one for you.
  ^Boolean :auto-ack (default false): true if the server should consider messages acknowledged once delivered,
                                      false if server should expect manual acknowledgements.
  ^Boolean :exclusive (default false): true if this is an exclusive consumer
                                       (no other consumer can consume given queue)
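
Example:

The options above can be sketched as follows. This is a minimal sketch, not a definitive setup: it assumes a RabbitMQ node on localhost with default credentials, the map-based options of langohr.consumers/create-default, and a hypothetical queue name and handler function.

```clojure
(require '[langohr.core :as rmq]
         '[langohr.channel :as lch]
         '[langohr.queue :as lq]
         '[langohr.basic :as lb]
         '[langohr.consumers :as lcons])

(defn handle-delivery
  "Hypothetical handler: prints the payload, then acknowledges the delivery."
  [ch {:keys [delivery-tag]} ^bytes payload]
  (println "Received:" (String. payload "UTF-8"))
  (lb/ack ch delivery-tag))

(let [conn     (rmq/connect)                ; assumption: localhost, default credentials
      ch       (lch/open conn)
      qname    "langohr.examples.consume"   ; hypothetical queue name
      _        (lq/declare ch qname {:exclusive false :auto-delete true})
      consumer (lcons/create-default ch {:handle-delivery-fn handle-delivery})
      ;; :auto-ack false means the handler must acknowledge explicitly
      tag      (lb/consume ch qname consumer {:auto-ack false})]
  (println "Consumer started with tag" tag)
  (Thread/sleep 1000)                       ; give deliveries a moment to arrive
  (lb/cancel ch tag)
  (rmq/close ch)
  (rmq/close conn))
```

Because :consumer-tag is omitted, the tag printed here is the one generated by the server.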

get

(get channel queue)(get channel queue auto-ack)
Fetches a message from a queue using basic.get AMQP 0.9.1 method
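
Example:

A minimal sketch of pulling a single message with manual acknowledgement. It assumes that get returns a [metadata payload] vector when a message is available and nil when the queue is empty, and that the metadata carries a :delivery-tag key; fetch-one is a hypothetical helper name.

```clojure
(require '[langohr.basic :as lb])

(defn fetch-one
  "Hypothetical helper: fetches one message from qname with auto-ack disabled,
  acknowledges it, and returns its payload as a string. Returns nil if the
  queue is empty."
  [ch qname]
  (when-let [[metadata payload] (lb/get ch qname false)]
    (lb/ack ch (:delivery-tag metadata))
    (String. ^bytes payload "UTF-8")))
```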

nack

(nack channel delivery-tag multiple requeue)
Negative acknowledgement of one or more messages using basic.nack AMQP 0.9.1 method (a RabbitMQ-specific extension)
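
Example:

A minimal sketch of choosing between ack and nack depending on whether processing succeeded. The helper name ack-or-nack and the process-fn argument are hypothetical; the arguments to nack follow the signature above (multiple, then requeue).

```clojure
(require '[langohr.basic :as lb])

(defn ack-or-nack
  "Hypothetical helper: runs process-fn on payload. Acknowledges the delivery
  if it succeeds; otherwise negatively acknowledges it and asks for a requeue."
  [ch delivery-tag process-fn payload]
  (let [ok? (try
              (process-fn payload)
              true
              (catch Exception _ false))]
    (if ok?
      (lb/ack ch delivery-tag)
      ;; multiple = false: only this delivery; requeue = true: put it back on the queue
      (lb/nack ch delivery-tag false true))))
```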

publish

(publish ch exchange routing-key payload)(publish channel exchange routing-key payload {:keys [mandatory content-type content-encoding headers persistent priority correlation-id reply-to expiration message-id timestamp type user-id app-id cluster-id], :or {mandatory false}})
Publishes a message using basic.publish AMQP 0.9.1 method.

This method publishes a message to a specific exchange. The message will be routed to queues as defined by
the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.

^String exchange: name of the exchange to publish to. Can be an empty string, which means the default exchange.
^String routing-key: the routing key for the message. Used for routing messages depending on exchange configuration.

Payload can be anything the clojurewerkz.support.bytes/ByteSource protocol is extended for. Langohr ships with
implementations for byte arrays and strings.

Options:
^Boolean :mandatory (default false): when true, a message that cannot be routed to any queue is returned to the publisher (see return-listener); when false, the server silently drops it.


Basic properties:

  ^String :content-type: MIME Content type
  ^String :content-encoding: MIME Content encoding
  ^Map :headers: headers that will be passed to subscribers, given in Map format.
  ^Boolean :persistent: should this message be persisted to disk?
  ^Integer :priority: message priority, number from 0 to 9
  ^String :correlation-id: application correlation identifier. Useful for cases when it's required to match request with the response.
                           Usually a unique value.
  ^String :reply-to: name of the reply queue.
  ^String :expiration: how long the message is valid (RabbitMQ interprets this as a TTL in milliseconds, passed as a string)
  ^String :message-id: message identifier
  ^Date :timestamp: timestamp associated with this message
  ^String :type: message type, can be in any format, e.g. search.requests.index
  ^String :user-id: user ID. RabbitMQ will validate this against the active connection user
  ^String :app-id: publishing application ID

Example:

    (require '[langohr.basic :as lhb])
    (lhb/publish channel exchange routing-key payload {:priority 8 :message-id msg-id :content-type content-type :headers {"key" "value"}})
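
A second sketch, publishing a string through the default exchange so that the routing key is interpreted as a queue name. The helper name publish-text and the :type value are hypothetical, and the channel is assumed to be already open.

```clojure
(require '[langohr.basic :as lb])

(defn publish-text
  "Hypothetical helper: publishes text to the queue named qname via the
  default exchange (empty string), tagging it as plain text."
  [ch qname ^String text]
  (lb/publish ch "" qname text {:content-type "text/plain"
                                :type         "examples.greeting"}))
```

Strings can be passed directly as the payload because Langohr ships with a ByteSource implementation for them.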

qos

(qos channel prefetch-count)(qos channel prefetch-size prefetch-count global)
Sets channel or connection prefetch level using basic.qos AMQP 0.9.1 method
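
Example:

A minimal sketch using the two-argument form. With basic.qos, the prefetch count caps how many unacknowledged deliveries the server will have outstanding at once, so it only has an effect on consumers that use manual acknowledgements. The helper name and the value 10 are arbitrary illustrations.

```clojure
(require '[langohr.basic :as lb])

(defn limit-unacked
  "Hypothetical helper: allows at most 10 unacknowledged deliveries
  to be outstanding on the channel."
  [ch]
  (lb/qos ch 10))
```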

recover

(recover channel)(recover channel requeue)
Notifies RabbitMQ that it needs to redeliver unacknowledged messages using basic.recover AMQP 0.9.1 method

reject

(reject channel delivery-tag)(reject channel delivery-tag requeue)
Rejects (and, optionally, requeues) a message using basic.reject AMQP 0.9.1 method

return-listener

(return-listener f)
Creates a new return listener. Usually used to be notified of failed deliveries when a message is published with the :mandatory or :immediate flag set
but cannot be delivered.

If the client has not configured a return listener for a particular channel, then the associated returned message will be silently dropped.

f: a handler function that accepts reply-code, reply-text, exchange, routing-key, properties and body as arguments.


Example:

   (let [f (langohr.basic/return-listener (fn [reply-code reply-text exchange routing-key properties body]
                                            (println reply-code reply-text exchange routing-key properties body)))]
     (.addReturnListener ch f))
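
A sketch that pairs the listener with a mandatory publish, so that an unroutable message is reported instead of being dropped. The helper name publish-mandatory is hypothetical, and the channel is assumed to be already open; the listener is registered through the Java client method, as in the example above.

```clojure
(require '[langohr.basic :as lb])

(defn publish-mandatory
  "Hypothetical helper: registers a return listener on ch, then publishes
  text via the default exchange with :mandatory set. If no queue matches
  routing-key, the listener prints the details of the returned message."
  [ch ^String routing-key ^String text]
  (let [listener (lb/return-listener
                   (fn [reply-code reply-text exchange rk properties body]
                     (println "Returned:" reply-code reply-text rk)))]
    (.addReturnListener ch listener)
    (lb/publish ch "" routing-key text {:mandatory true})))
```

Returns arrive asynchronously, so the channel has to stay open long enough for the listener to be invoked.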