tclrmq

Pure TCL RabbitMQ Library implementing AMQP 0.9.1

This library is completely asynchronous and makes no blocking calls. It relies on TclOO and requires Tcl 8.6, but has no other dependencies (other than a RabbitMQ server).

About

Developed for use within FlightAware (https://flightaware.com).

The package directory contains a Makefile for installing globally. By default the Makefile installs to /usr/local/lib, so this will need editing if an alternative directory is required.

Basic Usage

There are two primary classes required for using the library.

  1. Connection

The Connection class is used for initiating communication with the RabbitMQ server. It also relies on a subsidiary Login class, which is used for specifying username, password, vhost and authentication mechanism. Out of the box, this library only supports the PLAIN SASL mechanism. It can be easily extended to support an additional mechanism if required.

```tcl
package require rmq

# Arguments: -user -pass -vhost
# All optional and shown with their defaults
set login [Login new -user "guest" -pass "guest" -vhost "/"]

# Pass the login object created above to the Connection
# constructor
# -host and -port are shown with their default values
set conn [Connection new -host localhost -port 5672 -login $login]

# Set a callback for when the connection is ready to use
# which will be passed the connection object
$conn onConnected rmq_conn_ready
proc rmq_conn_ready {conn} {
    puts "Connection ready!"
    $conn connectionClose
}

# Set a callback for when the connection is closed
# which will be passed the connection object and a dict of close data
$conn onClosed rmq_conn_closed
proc rmq_conn_closed {conn closeD} {
    puts "Connection closed!"
}

# Initiate the connection handshake and enter the event loop
$conn connect
vwait die
```

  2. Channel

The Channel class is where most of the action happens. The vast majority of AMQP methods refer to a specific channel. After the Connection object has gone through the opening handshake and calls its onConnected callback, a Channel object can be created by passing the Connection object to the Channel class' constructor.

```tcl
# Assume the following proc has been set as the Connection object's
# onConnected callback
proc rmq_conn_ready {conn} {
    # Create a channel object
    # If no channel number is specified, the
    # next available will be chosen
    set chan [Channel new $conn]

    # Do something with the channel, like
    # declare an exchange
    set flags [list $::rmq::EXCHANGE_DURABLE]
    $chan exchangeDeclare "test" "direct" $flags
}
```

Callbacks

Using this library for anything useful requires setting callbacks for the AMQP methods needed in the client application. Most callbacks will be set on Channel objects, but the Connection object supports a few as well.

All callbacks are passed the object they were set on as the first parameter. Depending on the AMQP method or object event, additional parameters are provided as appropriate.

Connection Callbacks

Connection objects allow for the setting of the following callbacks:

  1. onConnected: called when the AMQP connection handshake finishes and is passed the Connection object

  2. onBlocked: called when the RabbitMQ server has blocked connections due to resource limitations. Callback is passed the Connection object, a boolean for whether the connection is blocked or not and a textual reason

  3. onClosed: called when the connection is closed and is passed the Connection object and a dict containing the reply code, any reply text, the class ID and the method ID. The corresponding dictionary keys are replyCode, replyText, classID and methodID respectively. An alias method onClose is also provided.

  4. onError: called when an error code has been sent to the Connection and is passed the error code and any accompanying data in the frame

  5. onFailedReconnect: called when all reconnection attempts have been exhausted

```tcl
package require rmq

# Arguments: username password vhost
set login [Login new -user "guest" -pass "guest" -vhost "/"]

# Pass the login object created above to the Connection
# constructor
set conn [Connection new -host localhost -port 5672 -login $login]

$conn onConnected rmq_connected
$conn onClosed rmq_closed
$conn onError rmq_error

proc rmq_connected {rmqConn} {
    # do useful things
}

proc rmq_closed {rmqConn closeD} {
    # do other useful things
}

proc rmq_error {rmqConn frameType frameData} {
    # do even more useful things
}
```

Channel Callbacks

Channel objects have a few specific callbacks that can be set along with a more general callback mechanism for the majority of AMQP method calls.

Specific Callbacks

The specific callbacks provided for Channel objects mirror those available for Connection objects. They are:

  1. onOpen: called when the channel is open and ready to use, i.e., when the Channel.Open-Ok method is received from the RabbitMQ server and is passed the same arguments as the onConnected callback for Connection objects

  2. onClose: called when the channel has been fully closed, i.e., when the Channel.Close-Ok method is received from the RabbitMQ server and is passed the Channel object and the same dictionary passed to the onClosed callback for Connection objects

  3. onError: called when the channel receives an error, i.e., a frame is received for the given channel but contains an AMQP error code and is passed the same arguments as the onError callback for Connection objects

General Callback Mechanism

Other than the above callbacks, a Channel object can be supplied a callback for every method that can be sent in response to an AMQP method by using the on method of Channel objects.

These callbacks are passed the Channel object they were set on unless otherwise specified in the full method documentation found below.

When specifying the name of the AMQP method the callback will be invoked on, start with a lowercase letter and use camel case. All AMQP methods documented in the RabbitMQ 0-9-1 extended specification are available.

```tcl
# Assuming a channel object by name rmqChan exists

# Define the callbacks before entering the event loop with vwait
proc exchange_declared {rmqChan} {
    set ::exchangeDeclared 1
}

# queueDeclareOk also passes the queue name, message count and consumer count
proc queue_declared {rmqChan qName msgCount consumers} {
    set ::queueDeclared 1
}

proc queue_bound {rmqChan} {
    set ::queueBound 1
}

$rmqChan on exchangeDeclareOk exchange_declared
$rmqChan on queueDeclareOk queue_declared
$rmqChan on queueBindOk queue_bound

$rmqChan exchangeDeclare "the_best_exchange" "fanout"
vwait ::exchangeDeclared

$rmqChan queueDeclare "the_best_queue"
vwait ::queueDeclared

$rmqChan queueBind "the_best_queue" "the_best_exchange" "the_best_routing_key"
```

The Exception of Consuming

When consuming messages from a queue using either Basic.Consume or Basic.Get, the process of setting a callback and the data passed into the callback differs from every other case.

For consuming, the Channel object methods basicConsume and basicGet take the name of the callback invoked for each message delivered and then their arguments. The callbacks get passed in the Channel object, a dictionary of method data, a dictionary of frame data, and the data from the queue.

```tcl
# Assuming a channel object by name rmqChan exists
$rmqChan basicConsume consume_callback "the_best_queue"

proc consume_callback {rmqChan methodD frameD data} {
    # Can inspect the consumer tag and dispatch on it
    switch [dict get $methodD consumerTag] {
        default {
            # useful things
        }
    }

    # Can get the delivery tag to ack the message
    $rmqChan basicAck [dict get $methodD deliveryTag]

    # Frame data includes things like the data body size
    # and is likely less immediately useful but it is
    # passed in because it might be necessary for a given
    # application
}
```

Consuming From Multiple Queues

For a given channel, multiple queues can be consumed from and each queue can be given its own callback proc by passing in (or allowing the server to generate) a distinct consumerTag for each invocation of basicConsume. Otherwise, dispatching based on the method or frame metadata allows a single callback proc to customize the handling of messages from different queues. When the client application is not constrained in its use of channels, instantiating multiple Channel objects is a straightforward way for one consumer to concurrently pull data from more than one queue.
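As a rough illustration of the single-callback approach, the sketch below routes each delivery to a handler proc based on its consumerTag. The tag names, the handler procs and the ::handlers dict are hypothetical; only the callback signature and the consumerTag key come from the description above.

```tcl
# Hypothetical mapping from consumer tag to handler proc
set ::handlers [dict create \
    "orders-consumer" handle_order \
    "audit-consumer"  handle_audit]

# Hypothetical handlers; each returns a label describing what it did
proc handle_order {data} { return "order:$data" }
proc handle_audit {data} { return "audit:$data" }

# Single consumer callback shared by several basicConsume calls.
# Registration would look like:
#   $rmqChan basicConsume consume_dispatch "orders" "orders-consumer"
proc consume_dispatch {rmqChan methodD frameD data} {
    set tag [dict get $methodD consumerTag]
    if {![dict exists $::handlers $tag]} {
        # Unknown tag: report it instead of raising an error
        return "unhandled:$tag"
    }
    return [[dict get $::handlers $tag] $data]
}
```

Keeping the mapping in a dict means a new queue only needs a new entry and a handler proc, at the cost of having to pick consumer tags explicitly rather than letting the server generate them.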

Method Data

The dictionary of method data passed as the second argument to consumer callbacks contains the following items:

  • consumerTag

    The string consumer tag, either specified at the time basicConsume is called, or auto-generated by the server.

  • deliveryTag

    Integer numbering for the message being consumed. This is used for the basicAck or basicNack methods.

  • redelivered

    Boolean integer.

  • exchange

    Name of the exchange the message came from.

  • routingKey

    Routing key used for delivery of the message.

Frame Data

The dictionary of frame data passed as the third argument to consumer callbacks contains the following items:

  • classID

    AMQP defined integer for the class used for delivering the message.

  • bodySize

    Size in bytes for the data consumed from the queue.

  • properties

    Dictionary of AMQP Basic method properties, e.g., correlation-id, timestamp or content-type.
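A minimal sketch of reading the frame data inside a consumer callback follows. The sample dict is made up for illustration, and the property key spelling content-type follows the examples named above; the exact key names the library uses are an assumption, so the code checks for the key before reading it.

```tcl
# Summarize a delivery from the frame data dict
proc describe_delivery {frameD} {
    set size  [dict get $frameD bodySize]
    set props [dict get $frameD properties]
    # Properties are optional in AMQP, so test for the key first
    if {[dict exists $props content-type]} {
        set type [dict get $props content-type]
    } else {
        set type "unknown"
    }
    return "$size bytes, content-type $type"
}

# Sample frame data with made-up values (60 is the AMQP Basic class ID)
set sample [dict create classID 60 bodySize 11 \
    properties [dict create content-type "text/plain"]]
puts [describe_delivery $sample]
```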

Special Arguments

Flags

For AMQP methods like queueDeclare or exchangeDeclare which take flags, these are passed in as a list of constants. All supported flags are mentioned in the documentation below detailing each Channel method.
Within the source, supported flag constants are found in constants.tcl.

Properties / Headers

For AMQP class methods which take properties and/or headers, e.g., basicConsume, basicPublish, or exchangeDeclare, the properties and headers are passed in as a Tcl dict. The library takes care of encoding them properly.

Library Documentation

All methods defined for the Connection, Login, and Channel classes are detailed below. Only methods that are part of the public interface of each object are included. Any additional methods found in the source are meant to be called internally.

Connection Class

Class for connecting to a RabbitMQ server.

constructor

The constructor takes the following arguments (all optional):

  • -host

    Defaults to localhost

  • -port

    Defaults to 5672

  • -tls

    Either 0 or 1, but defaults to 0. Controls whether to connect to the RabbitMQ server using TLS. To set TLS options, e.g., if using a client cert, call the tlsOptions method before invoking connect.

  • -login

    Login object. Defaults to calling the Login constructor with no arguments.

  • -frameMax

    Maximum frame size in bytes. Defaults to the value offered by the RabbitMQ server in Connection.Tune.

  • -maxChannels

    Maximum number of channels available for this connection. Defaults to no imposed limit, which is essentially 65,535.

  • -locale

    Defaults to en_US.

  • -heartbeatSecs

    Interval in seconds for sending out heartbeat frames. A value of 0 means no heartbeats will be sent.

  • -blockedConnections

    Either 0 or 1, but defaults to 1. Controls whether to use this RabbitMQ extension.

  • -cancelNotifications

    Either 0 or 1, but defaults to 1. Controls whether to use this RabbitMQ extension.

  • -maxTimeout

    Integer seconds to wait before timing out the connection attempt to the server. Defaults to 3.

  • -autoReconnect

    Either 0 or 1, but defaults to 1. Controls whether the library attempts to reconnect to the RabbitMQ server when the initial call to Connection.connect fails or an established socket connection is closed by the server or by network conditions.

  • -maxBackoff

    Integer number of seconds past which exponential backoff, which is the reconnection strategy employed, will not go. Defaults to 64 seconds.

  • -maxReconnects

    Integer number of reconnects to attempt before giving up. Defaults to 5. A value of 0 means infinite reconnects. To disable retries, pass -autoReconnect as 0.

  • -debug

    Either 0 or 1, but defaults to 0. Controls whether or not debug statements are passed to -logCommand detailing the operations of the library.

  • -logCommand

    If the -debug option is true, the value of this argument will be passed debugging statements detailing the operations of the library. The specified -logCommand must take a string argument containing a single debugging statement. Defaults to puts stderr.

attemptReconnect

Takes no arguments. Using the -maxBackoff and -maxReconnects constructor arguments, attempts to reconnect to the server. If this cannot be done, and an onFailedReconnect callback has been set, it is invoked.
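To make the interaction between exponential backoff and -maxBackoff concrete, here is a small sketch of a capped doubling schedule. The one-second starting delay and the exact doubling rule are assumptions for illustration; the library's actual schedule may differ.

```tcl
# Delay in seconds before reconnect attempt number $attempt (starting at 1),
# doubling on each attempt and never exceeding $maxBackoff
proc backoff_delay {attempt maxBackoff} {
    set delay [expr {2 ** ($attempt - 1)}]
    return [expr {min($delay, $maxBackoff)}]
}

# Delays for the first eight attempts with the default cap of 64 seconds
set delays {}
for {set i 1} {$i <= 8} {incr i} {
    lappend delays [backoff_delay $i 64]
}
puts $delays
```

Under these assumptions the delay reaches the cap on the seventh attempt and stays there for every later attempt.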

closeConnection

Takes an optional boolean argument controlling whether the onClose callback is invoked (defaults to true). Closes the connection and, if specified, calls any callback set with onClose. This is not meant to be called externally as it does not use the AMQP protocol for closing the connection. Instead, connectionClose should be used in client applications.

connect

Takes no arguments. Actually initiates a socket connection with the RabbitMQ server. If the connection fails the onClose callback is invoked. Two timeouts can potentially occur in this method: one during the TCP handshake and one during the AMQP handshake. In both cases, the -maxTimeout variable is used. Returns 1 if a connection is fully established, or 0 otherwise.

connected?

Takes no arguments. Returns 0 or 1 depending on whether the socket connection to the server has been established and an AMQP handshake completed. It is only true once both those conditions have been satisfied. In the event that a connection fails, the getSocket method can be used to obtain and query the socket channel and determine whether the problem is network or protocol based.

getSocket

Takes no arguments. Returns the socket object for communicating with the server. This allows for more fine-grained inspection and tuning if so desired.

onBlocked

Takes the name of a callback proc which will be used for blocked connection notifications. Blocked connection notifications are always requested by this library, but the setting of a callback is optional. The callback takes the Connection object, a boolean for whether the connection is blocked (this callback is also used when the connection is no longer blocked), and a textual reason why.

onClose

Takes the name of a callback proc which will be called when the connection is closed. This includes a failed connection to the RabbitMQ server when first calling connect and a disconnection after establishing communication with the RabbitMQ server. The callback takes the Connection object and a dictionary with the keys specified in the documentation to the onClosed callback.

onClosed

Alias for onClose method.

onConnected

Takes the name of a callback proc which will be used when the AMQP handshake is finished. When this callback is invoked, the Connection object is ready to create channels and perform useful work.

onError

Takes the name of a callback proc used when an error is reported by the RabbitMQ server on the connection level. The callback proc takes the Connection object, a frame type and any extra data included in the frame.

onFailedReconnect

Takes the name of a callback proc used when the maximum number of connection attempts has been made without success. The callback proc takes the Connection object.

removeCallbacks

Takes an optional boolean channelsToo, which defaults to 0. Unsets all callbacks for the Connection object. If channelsToo is 1, also unsets callbacks on all of its channels.

reconnecting?

Takes no arguments. Returns 0 or 1 depending on whether the Connection is in the process of attempting a reconnect.

resetRetries

Takes no arguments. Sets the count of connection retries back to 0. Useful in cases where -autoReconnect is true and more fine-grained control of the retry loop is desired. Internally the retry count is reset to 0 when the AMQP handshake completes.

tlsOptions

Used to set up the parameters for an SSL / TLS connection to the RabbitMQ server.
Supports all arguments supported by the Tcl tls package's ::tls::import command as specified in the Tcl TLS documentation.

If a TLS connection is desired, this method needs to be called before connect.
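The ordering requirement can be sketched as follows. This assumes the rmq package is already loaded so that Connection resolves as in the earlier examples, and that tlsOptions accepts option/value pairs in the same form as ::tls::import. The proc name, the CA file argument and the choice of port 5671 (RabbitMQ's conventional TLS port) are illustrative.

```tcl
# Create a TLS-enabled connection and start the handshake.
# login is a Login object; caFile is the path to a CA certificate bundle.
proc connect_with_tls {login caFile} {
    set conn [Connection new -host localhost -port 5671 -tls 1 -login $login]

    # TLS parameters must be in place before connect is invoked
    $conn tlsOptions -cafile $caFile -require 1

    $conn connect
    return $conn
}
```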

Login Class

constructor

The constructor takes the following arguments (all optional):

  • -user

    Username to login with. Defaults to guest

  • -pass

    Password to login with. Defaults to guest

  • -mechanism

    Authentication mechanism to use. Defaults to PLAIN

  • -vhost

    Virtual host to login to. Defaults to /

saslResponse

Takes no arguments. This method needs to be overridden if an alternative mechanism is desired.
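For orientation, the PLAIN mechanism (RFC 4616) sends an optional authorization identity, the username and the password separated by NUL bytes. The sketch below builds such a string with an empty authorization identity. It is a standalone illustration, not the library's own code, and whether saslResponse returns exactly this value is an assumption.

```tcl
# Build a PLAIN SASL response: [authzid] NUL authcid NUL passwd,
# here with an empty authorization identity
proc plain_sasl_response {user pass} {
    return "\u0000${user}\u0000${pass}"
}

set resp [plain_sasl_response guest guest]
# Two NUL separators plus two five-character strings
puts [string length $resp]
```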

Channel Class

Most of the methods made available by this library come from the Channel class. It implements the majority of the AMQP methods.

constructor

Takes the following arguments:

  • connectionObj

    The Connection object to open a channel for. This is the only required argument.

  • channelNum

    The channel number to open. Optional. If not specified, the next available number starting from 1 will be used. Passing in an empty string or 0 is equivalent to not providing this argument, i.e., the class will pick the next available channel number for the Connection object provided.

  • shouldOpen

    A boolean argument that defaults to 1. If set to 1 the channel will open after it is created. If not, the channelOpen method must be called manually before anything can be done with the Channel object.

active?

Takes no arguments and returns 1 if the channel is active, i.e., it has been opened successfully, and 0 otherwise.

closeChannel

Not meant to be called externally. Instead, this method is used internally by the library to properly set the Channel object's state before and after calling the onClose callback.

closeConnection

Takes an optional boolean argument, callCloseCB, which defaults to 1. Closes the associated Connection object and if callCloseCB is true, any callback set with the Connection object's onClose method is invoked, otherwise it is ignored.

closing?

Takes no arguments and returns 1 if the Channel is in the process of closing and 0 otherwise.

getChannelNum

Takes no arguments, and returns the channel number.

getConnection

Takes no arguments, and returns the Connection object passed into the constructor.

open?

Alias for active?.

on

Takes an AMQP method name in camel case, starting with a lowercase letter, and the name of a callback proc for the method. To unset a callback, set its callback proc to the empty string or use removeCallback.

onClose

Takes the name of a callback proc to be called when the channel is closed. The callback takes the Channel object and a dictionary of data, which is specified in the section about onClose callbacks.

onClosed

Alias for onClose.

onError

Takes the name of a callback proc invoked when an error occurs on this particular Channel object. The error callback is passed the Channel object, a numeric error code as returned from the server, and any additional data passed back. Errors occur on a channel when the server returns an unexpected response but not when a disconnection occurs or the channel is closed forcefully by the server.

onOpen

Takes the name of a callback proc to be called when the channel successfully opens. Once it is open, AMQP methods can be called. The callback takes the Channel object.

onOpened

Alias for onOpen.

reconnecting?

Takes no arguments. Returns 1 if Connection is in the process of attempting a reconnect and 0 otherwise.

removeCallback

Takes the name of an AMQP method as defined on a Channel object.

removeCallbacks

Takes no arguments. Sets all callbacks to the empty string, effectively removing them.

setCallback

Takes the name of an AMQP method as defined on a Channel object (or as accepted by the on Channel method) and the name of a callback proc. The preferred method to use is on, but this is an alternative method for setting a callback. To unset a callback, set its callback proc to the empty string or use removeCallback.

Channel AMQP Methods

The following methods are defined on Channel objects and implement the methods and classes detailed in the AMQP specification.

Channel Methods

channelClose

Takes the following arguments:

  • replyCode

    Numeric reply code for closing the channel as specified in the AMQP specification.

  • replyText

    Textual description of the reply code.

  • classID

    AMQP class ID number.

  • methodID

    AMQP method ID number.

To place a callback for the closing of a channel, use the onClose or onClosed method. The callback takes the Channel object and a dictionary of data with key names matching the arguments listed above.

channelOpen

Takes no arguments.

To place a callback for the opening of a channel use the onOpen method. The callback takes only the Channel object.

Exchange Methods

exchangeBind

Takes the following arguments:

  • dst

    Destination exchange name.

  • src

    Source exchange name.

  • rKey

    Routing key for the exchange binding.

  • noWait

    Boolean integer, which defaults to 0.

  • eArgs

    Exchange binding arguments (optional). Passed in as a dict. Defaults to an empty dict.

To set a callback for exchange to exchange bindings use the on method with exchangeBindOk as the first argument. Callback only takes the Channel object.

exchangeDeclare

Takes the following arguments:

  • eName

    Exchange name.

  • eType

    Exchange type: direct, fanout, headers, topic

  • eFlags

    Optional flags. Flags supported (all in the ::rmq namespace):

    • EXCHANGE_PASSIVE
    • EXCHANGE_DURABLE
    • EXCHANGE_AUTO_DELETE
    • EXCHANGE_INTERNAL
    • EXCHANGE_NO_WAIT

  • eArgs

    Optional dict of exchange declare arguments.

To set a callback on an exchange declaration, use the on method with exchangeDeclareOk as the first argument. Callback only takes the Channel object.

exchangeDelete

Takes the following arguments:

  • eName

    Exchange name to delete.

  • inUse

    Optional boolean argument defaults to 0. If set to 1, will not delete an exchange with bindings on it.

  • noWait

    Optional boolean argument defaults to 0.

To set a callback on the exchange deletion, use the on method with exchangeDeleteOk as the first argument. Callback only takes the Channel object.

exchangeUnbind

Takes the same arguments as exchangeBind, with the same callback data.

Queue Methods

queueBind

Takes the following arguments:

  • qName

    Queue name.

  • eName

    Exchange name.

  • rKey

    Routing key (optional). Defaults to the empty string.

  • noWait

    Boolean integer (optional). Defaults to 0.

  • qArgs

    Queue binding arguments (optional). Needs to be passed in as a dict. Defaults to an empty dict.

To set a callback on a queue binding, use the on method with queueBindOk as the first argument. Callback only takes the Channel object.

queueDeclare

Takes the following arguments:

  • qName

    Queue name.

  • qFlags

    Optional list of queue declare flags. Supports the following flag constants (in the ::rmq namespace):

    • QUEUE_PASSIVE
    • QUEUE_DURABLE
    • QUEUE_EXCLUSIVE
    • QUEUE_AUTO_DELETE
    • QUEUE_DECLARE_NO_WAIT

To set a callback on a queue declare, use the on method with queueDeclareOk as the first argument. Callback takes the Channel object, the queue name (especially important for exclusive queues), the message count and the number of consumers on the queue.

queueDelete

Takes the following arguments:

  • qName

    Queue name.

  • flags

    Optional list of flags. Supported flags (in the ::rmq namespace):

    • QUEUE_IF_UNUSED
    • QUEUE_IF_EMPTY
    • QUEUE_DELETE_NO_WAIT

To set a callback on a queue delete, use the on method with queueDeleteOk as the first argument. Callback takes the Channel object and a message count from the deleted queue.

queuePurge

Takes the following arguments:

  • qName

    Queue name.

  • noWait

    Optional boolean argument. Defaults to 0.

To set a callback on a queue purge, use the on method with queuePurgeOk as the first argument. Callback takes the Channel object and a message count from the purged queue.

queueUnbind

Takes the following arguments:

  • qName

    Queue name.

  • eName

    Exchange name.

  • rKey

    Routing key.

  • qArgs

    Optional queue arguments. Passed in as a dict.

To set a callback on a queue unbinding, use the on method with queueUnbindOk as the first argument. Callback takes only the Channel object.

Basic Methods

basicAck

Takes the following arguments:

  • deliveryTag

    Delivery tag being acknowledged.

  • multiple

    Optional boolean, defaults to 0. If set to 1, all messages up to and including the deliveryTag argument's value are acknowledged.

Setting a callback on this method using the on method is for publisher confirms. The callback takes the Channel object, a delivery tag and a multiple boolean.

basicCancel

Takes the following arguments:

  • cTag

    Consumer tag.

  • noWait

    Optional boolean argument. Defaults to 0.

To set a callback on a basic cancel, use the on method with basicCancelOk as the first argument. Callback takes the Channel object and the consumer tag that was canceled.

basicConsume

Takes the following arguments:

  • callback

    Name of a callback to use for consuming messages. The callback takes the Channel object, a dict of method data, a dict of frame data and the data from the queue.

  • qName

    Queue name to consume from.

  • cTag

    Optional consumer tag.

  • cFlags

    Optional list of flags. Supported flags (all in the ::rmq namespace):

    • CONSUME_NO_LOCAL
    • CONSUME_NO_ACK
    • CONSUME_EXCLUSIVE
    • CONSUME_NO_WAIT

  • cArgs

    Optional arguments to control consuming. Passed in as a dict. Supports all arguments specified for the basic class.

Callback is set directly from this method.

basicGet

Takes the following arguments:

  • callback

    Name of a callback proc using the same arguments as that for basicConsume.

  • qName

    Queue name to get a message from

  • noWait

    Optional boolean. Defaults to 0.

Like with basicConsume the callback for this method is set directly from the method call.

basicNack

Takes the following arguments:

  • deliveryTag

    Delivery tag for message being nack'ed.

  • nackFlags

    Optional list of flags. Supports the following (in the ::rmq namespace):

    • NACK_MULTIPLE
    • NACK_REQUEUE

Setting a callback on this method using the on method is for publisher confirms. The callback takes the Channel object, a delivery tag and a multiple boolean.

basicQos

Takes the following arguments:

  • prefetchCount

    Integer prefetch count, i.e., the number of unacknowledged messages that can be delivered to a consumer at one time.

  • globalQos

    Optional boolean which defaults to 0. If set to 1, the prefetch count is set globally for all consumers on the channel.

To set a callback on a basic QOS call, use the on method with basicQosOk as the first argument. Callback takes only the Channel object.

basicPublish

Takes the following arguments:

  • data

    The data to publish to the queue.

  • eName

    Exchange name.

  • rKey

    Routing key.

  • pFlags

    Optional list of flags. Supports the following flags (in the ::rmq namespace):

    • PUBLISH_MANDATORY
    • PUBLISH_IMMEDIATE

No callback can be set on this directly. For publisher confirms use the on method with basicAck as the first argument. That callback takes the Channel object, the delivery tag and a boolean for whether the ack is for multiple messages.
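A minimal sketch of tracking publisher confirms with the basicAck callback is shown below, assuming an open Channel object is passed in as rmqChan. The ::unconfirmed list, the ::publishSeq counter and the proc names are hypothetical bookkeeping. The counter relies on RabbitMQ numbering publishes on a channel sequentially from 1 once confirm mode is enabled.

```tcl
# Delivery tags published but not yet confirmed
set ::unconfirmed {}
set ::publishSeq 0

# Register the ack callback and switch the channel into confirm mode
proc enable_confirms {rmqChan} {
    $rmqChan on basicAck publish_confirmed
    $rmqChan confirmSelect
}

# Publish a message and remember the sequence number it will be acked with
proc tracked_publish {rmqChan data eName rKey} {
    $rmqChan basicPublish $data $eName $rKey
    lappend ::unconfirmed [incr ::publishSeq]
}

# basicAck callback: drop confirmed tags from the outstanding list
proc publish_confirmed {rmqChan deliveryTag multiple} {
    if {$multiple} {
        # Everything up to and including deliveryTag is confirmed
        set ::unconfirmed [lmap t $::unconfirmed {
            if {$t <= $deliveryTag} continue
            set t
        }]
    } else {
        set idx [lsearch -exact -integer $::unconfirmed $deliveryTag]
        if {$idx >= 0} {
            set ::unconfirmed [lreplace $::unconfirmed $idx $idx]
        }
    }
}
```

Anything still in ::unconfirmed when the channel closes was never confirmed by the server and may need to be republished. Handling negative acknowledgements through a basicNack callback is left out of this sketch.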

basicRecover

Same as basicRecoverAsync.

Confirm Methods

confirmSelect

Takes the following arguments:

  • noWait

    Optional boolean argument, defaults to 0.

To set a callback on a confirm select call, use the on method with confirmSelectOk as the first argument. Callback takes the Channel object.

basicRecoverAsync

Takes the following arguments:

  • reQueue

    Boolean argument. If 0, the message will be redelivered to the original recipient. If 1, an alternate recipient can get the redelivery.

To set a callback on a basic recover, use the on method with basicRecoverOk as the first argument. Callback takes the Channel object.

basicReject

Takes the following arguments:

  • deliveryTag

    Delivery tag of message being rejected by the client.

  • reQueue

    Optional boolean argument, defaults to 0. If set to 1, the rejected message will be requeued.

basicReturn

This method is not to be called directly, but to use a callback to handle returned messages, use the on method with basicReturn as the first argument. The callback takes the same arguments as the basicConsume callback.

TX Methods

txSelect

Takes no arguments.

To set a callback on a transaction select call, use the on method with txSelectOk as the first argument. Callback takes the Channel object.

txCommit

Takes no arguments.

To set a callback on a transaction commit call, use the on method with txCommitOk as the first argument. Callback takes the Channel object.

txRollback

Takes no arguments.

To set a callback on a transaction rollback call, use the on method with txRollbackOk as the first argument. Callback takes the Channel object.
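Because the library is asynchronous, a transaction is naturally expressed as a chain of callbacks. The sketch below assumes an open Channel object is passed in as rmqChan; the proc names, the exchange "test", the routing key and the ::txDone variable are illustrative only.

```tcl
# Start a transaction; the rest of the work happens in the callbacks
proc publish_in_tx {rmqChan} {
    $rmqChan on txSelectOk tx_selected
    $rmqChan on txCommitOk tx_committed
    $rmqChan txSelect
}

# Called once the server has put the channel into transaction mode
proc tx_selected {rmqChan} {
    $rmqChan basicPublish "hello" "test" "some.key"
    $rmqChan txCommit
}

# Called once the server has committed the transaction
proc tx_committed {rmqChan} {
    set ::txDone 1
}
```

A caller could then run publish_in_tx and wait on ::txDone with vwait, in the same way the earlier examples wait on ::exchangeDeclared.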