Producing AMQP messages from Ruby on Rails applications

As your architecture becomes more complex, you often need to perform time-consuming tasks when certain business events occur. Examples include sending emails, processing images, and interacting with external APIs. If these events may be triggered by actions that occur within your web application, you don’t want to perform these tasks synchronously within the HTTP request/response cycle: your end-users would hate you and your throughput would suffer. Instead, you want to perform these tasks asynchronously outside of the HTTP request/response cycle.

Event-driven architectures help to solve these (and other) types of problems by allowing an arbitrary number of tasks to be performed as a side effect of an event being published. They can be built on top of message-oriented middleware, which allows the publishing and consumption of events to be truly distributed, and each component to be scaled independently.

After evaluating many message brokers upon which to build our event-driven architecture, we ultimately chose RabbitMQ, as it offered the best blend of features, performance, and ease of use. In addition, RabbitMQ implements AMQP, which is emerging as the de facto standard protocol for message-oriented middleware and allows us to (in theory) move to a different broker in the future.

Our Ruby on Rails applications needed to publish messages to the broker, and there are two main ways to do that in Ruby: synchronously via the Bunny gem, or asynchronously via the AMQP gem. Bunny has an easy-to-use interface, but its interaction with the broker is synchronous. In addition, it’s currently in flux and pending an overhaul, which makes depending on it somewhat risky at the moment. The AMQP gem, on the other hand, interacts with the broker asynchronously and is actively maintained. However, it is built on top of EventMachine to achieve its asynchronous functionality, so using it, and more importantly testing with it, is inherently more difficult.

We ultimately decided to publish messages asynchronously using the AMQP gem. While there’s a lot of documentation out there about the AMQP gem, I couldn’t find a simple “how to publish messages with the AMQP gem within your Rails model classes” type of tutorial anywhere, so here’s how we ended up doing it.

Messages

With a polyglot architecture, we needed a message format that was easily produced/consumed by various programming languages, so JSON was the obvious choice. All messages have a metadata property which includes basic information such as the application that produced the event, the host the application was running on, the time the event was produced, and the routing key.

{"metadata":
  {"host":"web3.mycompany.com",
   "app":"web",
   "key":"some.event",
   "created":"2012-11-28T17:11:56Z"
  },
  ...message specific properties...
}
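
On the consuming side, anything with a JSON parser can read this envelope. Here is a minimal Ruby sketch using only the standard library. The payload is hypothetical (the account_id and email_address values are made up for illustration), and parse_event is a helper name of my own, not something from our codebase.

```ruby
require 'json'
require 'time'

# Hypothetical payload; the field values are illustrative only.
PAYLOAD = <<~JSON
  {"metadata":
    {"host":"web3.mycompany.com",
     "app":"web",
     "key":"user.account.created",
     "created":"2012-11-28T17:11:56Z"
    },
   "account_id": 42,
   "email_address": "jane@example.com"
  }
JSON

# Parses a raw message body and converts the ISO 8601 creation
# timestamp in the metadata into a Time object.
def parse_event(raw)
  event = JSON.parse(raw)
  metadata = event.fetch("metadata")
  metadata["created"] = Time.iso8601(metadata.fetch("created"))
  event
end

event = parse_event(PAYLOAD)
meta = event["metadata"]
puts "#{meta["key"]} from #{meta["app"]} at #{meta["created"]}"
```

Because the routing key travels inside the message as well as on the AMQP envelope, a consumer can dispatch on metadata["key"] even after the message has been logged or replayed outside the broker.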

A base Message::Base class takes care of serializing this information via ActiveModel::Serializers::JSON.

require 'socket'
require 'time'
require 'active_model'

module Message
  class Base
    include ActiveModel::Serializers::JSON

    attr_reader :metadata

    # routing_key is required; every message must know where it is routed.
    def initialize(routing_key)
      raise ArgumentError, "routing_key cannot be nil" if routing_key.nil?
      @metadata = {:host => Socket.gethostbyname(Socket.gethostname).first,
                   :app => "web", # configure this...
                   :key => routing_key,
                   :created => Time.now.utc.iso8601}
    end

    def routing_key
      @metadata[:key]
    end

    def as_json(options={})
      hash = {:metadata => {:host => @metadata[:host],
                            :app => @metadata[:app],
                            :key => @metadata[:key],
                            :created => @metadata[:created]}}
      hash
    end
  end
end

Message::Base subclasses are created for each message type, with properties specific to that message type. An example for a hypothetical new user account created message:

module Message
  module User
    # Message indicating that a new user account has been created
    class AccountCreated < Base

      attr_reader :account_id
      attr_reader :email_address

      ROUTING_KEY = "user.account.created"

      def initialize(account_id, email_address)
        raise ArgumentError, "account_id cannot be nil" if account_id.nil?
        raise ArgumentError, "email_address cannot be nil" if email_address.nil?
        super ROUTING_KEY
        # attr_reader defines no setters, so assign the instance variables directly
        @account_id = account_id
        @email_address = email_address
      end

      def as_json(options={})
        hash = super
        hash[:account_id] = account_id
        hash[:email_address] = email_address
        hash
      end
    end
  end
end

Connecting To The Broker

Broker configuration details for each Rails environment are stored in a config/amqp.yml file in the Rails app.

defaults: &defaults
  logging: false
  connection_timeout: 3
  host: localhost
  port: 5672
  vhost: "/mycompany"
  exchange: "mycompany.topic"
  user: username
  pass: password

test:
  <<: *defaults

development:
  <<: *defaults

production:
  <<: *defaults
  host: queue.mycompany.com
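
The `&defaults` anchor and `<<: *defaults` merge keys are what let each environment inherit the shared settings and override only what differs. The sketch below parses a trimmed copy of the file with Ruby's standard YAML library to show the effect. The constant and method names are my own, and `aliases: true` is included on the assumption that you are running a newer Psych, which rejects aliases by default.

```ruby
require 'yaml'

# Trimmed copy of the config above, kept short for illustration.
AMQP_YAML = <<~CONFIG
  defaults: &defaults
    host: localhost
    port: 5672
    exchange: "mycompany.topic"

  development:
    <<: *defaults

  production:
    <<: *defaults
    host: queue.mycompany.com
CONFIG

# aliases: true allows the *defaults references to be resolved.
def parse_amqp_config(text)
  YAML.safe_load(text, aliases: true)
end

config = parse_amqp_config(AMQP_YAML)
puts config["development"]["host"]
puts config["production"]["host"]
puts config["production"]["exchange"]
```

Production overrides only host and inherits port and exchange from the defaults. Note that on Ruby 3.1 and later, a plain YAML.load or YAML.load_file call rejects a file like this one unless `aliases: true` is passed.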

The Configurable module can be mixed in to read in the configuration from config/amqp.yml for a given Rails environment.

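require 'erb'
require 'yaml'

module Amqp
  module Configurable

    private

    # Returns the given config hash if one is passed in (handy in tests);
    # otherwise reads config/amqp.yml from the Rails root, evaluating any ERB.
    # Returns nil if no config is given and the file cannot be found.
    def load_config(config = nil)
      return config unless config.nil?
      if defined?(::Rails)
        config_file = ::Rails.root.join('config/amqp.yml')
        # On Ruby 3.1+ (Psych 4) pass aliases: true, since amqp.yml uses anchors.
        YAML.load(ERB.new(config_file.read).result) if config_file.file?
      end
    end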
  end
end
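
Because the file is run through ERB before it is parsed, it can contain embedded Ruby, which is a convenient way to keep hostnames and credentials out of source control. The sketch below shows the render-then-parse order in isolation. The AMQP_HOST and AMQP_PORT environment variable names are assumptions for the example, not something our config actually uses.

```ruby
require 'erb'
require 'yaml'

# Hypothetical template: AMQP_HOST and AMQP_PORT are assumed variable names.
TEMPLATE = <<~CONFIG
  production:
    host: <%= ENV.fetch("AMQP_HOST", "queue.mycompany.com") %>
    port: <%= ENV.fetch("AMQP_PORT", "5672") %>
CONFIG

# Renders the ERB tags first, then parses the resulting text as YAML.
def render_config(template)
  YAML.safe_load(ERB.new(template).result)
end

settings = render_config(TEMPLATE).fetch("production")
puts settings.inspect
```

When the variables are unset, the fallback values passed to ENV.fetch are used, so the same file works on a developer machine and in production.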

Connection classes wrap an AMQP::Channel object to communicate with the broker and use the Configurable module above to determine the name of the exchange to publish messages to (via an AMQP::Exchange object). Our implementation is hard-coded to use a topic exchange, but you could easily make this configurable/dynamic as well.
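
For readers new to topic exchanges: the broker compares each message's routing key against the binding patterns of the queues bound to the exchange, where `*` stands for exactly one dot-separated word and `#` stands for zero or more. The sketch below re-implements that matching rule in plain Ruby purely as an illustration. The method names topic_match? and match_words are my own, and the real matching happens inside the broker, not in your application.

```ruby
# Returns true if routing_key matches the topic binding pattern.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

# Recursively matches pattern words against routing key words.
def match_words(pattern, words)
  return words.empty? if pattern.empty?
  head, *rest = pattern
  case head
  when "#"
    # "#" may absorb zero or more words, so try every possible split.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when "*"
    # "*" must consume exactly one word.
    !words.empty? && match_words(rest, words.drop(1))
  else
    # A literal word must match exactly.
    words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?("user.#", "user.account.created")
puts topic_match?("user.*.created", "user.account.created")
puts topic_match?("user.*", "user.account.created")
```

This is why dotted routing keys such as user.account.created are useful: one consumer can bind to `user.#` to see every user event while another binds only to the exact key it cares about.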

The Amqp::AsynchronousConnection class uses the AMQP gem under the hood to communicate with the broker asynchronously. It is passed an active AMQP::Session object, which is established externally upon server startup (more on this below).

require 'amqp'

module Amqp
  class AsynchronousConnection
    extend Configurable

    @@setup = false

    def self.setup(amqp_session, config = nil, env = ::Rails.env)
      if !self.setup?
        raise StandardError, "Must pass an AMQP::Session to setup" if !amqp_session.is_a?(AMQP::Session)
        @@amqp_session = amqp_session
        @@config = load_config(config)
        @@env_config = @@config[env]
        raise StandardError, "Env #{env} not found in config" if @@env_config.nil?
        raise StandardError, "'exchange' key not found in config" if !@@env_config.has_key?("exchange")
        @@channel  = AMQP::Channel.new(@@amqp_session)
        @@exchange = @@channel.topic(@@env_config["exchange"], :durable => true)
        @@setup = true
      end
    end

    # Whether the underlying connection has been set up
    def self.setup?
      @@setup
    end

    def self.teardown
      if self.setup?
        remove_class_variable :@@exchange
        remove_class_variable :@@channel
        remove_class_variable :@@env_config
        remove_class_variable :@@config
        remove_class_variable :@@amqp_session
        @@setup = false
      end
    end

    # Produces a message to the underlying exchange
    def self.produce(message)
      if self.setup?
        begin
          @@exchange.publish(message.to_json, :routing_key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
        rescue => err
          ::Rails.logger.error "Unexpected error producing AMQP message: (#{message.to_json})"
          ::Rails.logger.error "#{err.message}"
          ::Rails.logger.error err.backtrace.join("\n")
        end
      else
        ::Rails.logger.error "ERROR: Could not produce message, not connected to AMQP broker!"
      end
    end
  end
end

The Amqp::Rails module is a helper which starts the EventMachine event loop and establishes the AMQP::Session. It also contains error handling code which detects when the connection to the broker is lost and attempts to reconnect every 5 seconds.

module Amqp
  # Helper methods for connecting to AMQP brokers in Rails apps.
  module Rails
    extend Configurable

    @connection_failure_handler = Proc.new do |settings|
      ::Rails.logger.error "Could not connect to AMQP broker! (settings: #{settings.inspect})"
      ::Rails.logger.error "Waiting 5 seconds to attempt to connect..."
      EventMachine.add_timer(5) do
        ::Rails.logger.error "Trying to connect to AMQP broker..."
        connect(settings)
      end
    end

    # Starts the EventMachine reactor (if necessary) and connects to the AMQP broker.
    # This class will handle connection failures and automatically attempt to re-connect
    # when the connection is lost after first being established.
    #
    # Assumes the file config/amqp.yml exists under Rails.root.
    #
    # Note that this class does NOT handle authentication failures with the broker.
    # Authentication failures will cause an uncaught exception and the app will not start up.
    def self.async_start(on_open_callback=nil)
      AMQP::Utilities::EventLoopHelper.run do
        yaml = YAML.load_file(File.join(::Rails.root, "config", "amqp.yml"))
        settings = yaml.fetch(::Rails.env, Hash.new).symbolize_keys
        settings.merge!(:on_tcp_connection_failure => @connection_failure_handler)
        settings.merge!(:on_open_callback => on_open_callback) if !on_open_callback.nil? && on_open_callback.is_a?(Proc)
        EventMachine.run do
          connect(settings)
        end
      end
    end


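    # Internal helper; not part of the public API. Note that a bare `private`
    # does not apply to methods defined with `def self.`, so it stays public.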

    def self.connect(settings)
      EventMachine.next_tick do
        connection = AMQP.connect(settings) do |connection|
          connection.on_tcp_connection_loss do |connection, settings|
            ::Rails.logger.error "Connection to AMQP broker lost!"
            Amqp::AsynchronousConnection.teardown
            ::Rails.logger.error "Waiting 5 seconds to attempt to connect..."
            EventMachine.add_timer(5) do
              ::Rails.logger.error "Trying to connect to AMQP broker..."
              connect(settings)
            end
          end

          ::Rails.logger.info "Connected to AMQP broker. (settings: #{settings.inspect})"
          Amqp::AsynchronousConnection.setup(connection)
          settings[:on_open_callback].call if settings.has_key?(:on_open_callback)
        end
      end
    end
  end
end

Finally, we add an initializer in config/initializers/amqp.rb which calls the async_start method in the helper.

Amqp::Rails.async_start

MessageProducer

The MessageProducer module lets us mix the produce method into our model classes to produce messages to the broker using a Connection class. The produce method is an alias for produce_asynchronously, but we can easily change it to be an alias for produce_synchronously should we decide to switch to the Bunny gem (more on this below).

module Amqp
  module MessageProducer
    # Produces a message to the underlying exchange using an asynchronous connection
    def produce_asynchronously(message)
      AsynchronousConnection.produce(message)
    end
    alias :produce :produce_asynchronously
  end
end

Producing messages from Model classes

Finally, we include the Amqp::MessageProducer module in our model classes to produce messages to the broker when certain business events happen.

class Account < ActiveRecord::Base
  include Amqp::MessageProducer

  after_create :publish_account_created_message

  # Publishes a message to the exchange when a new user account is created
  def publish_account_created_message
    produce(Message::User::AccountCreated.new(self.account_id, self.email_address))
  end
end

Easily Switch Between Synchronous and Asynchronous

We structured the code such that we can easily switch to using Bunny simply by changing one alias. The Amqp::SynchronousConnection class wraps a Bunny object to communicate with the server.

require 'bunny'

module Amqp
  class SynchronousConnection
    extend Configurable

    @@setup = false
    @@connected = false

    # ::Rails is required here: inside the Amqp module a bare Rails resolves to Amqp::Rails
    def self.setup(config = nil, env = ::Rails.env)
      if !self.setup?
        @@config = load_config(config)
        @@env_config = @@config[env]
        raise StandardError, "Env #{env} not found in config" if @@env_config.nil?
        # symbolize the keys, which Bunny expects
        @@env_config.keys.each {|key| @@env_config[(key.to_sym rescue key) || key] = @@env_config.delete(key) }
        raise StandardError, "'exchange' key not found in config" if !@@env_config.has_key?(:exchange)
        @@bunny = Bunny.new(@@env_config)
        @@setup = true
      end
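      # handle_passenger_forking is not shown in this post; it must be
      # defined on this class before setup is called.
      handle_passenger_forking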
    end

    # Whether the underlying connection has been set up
    def self.setup?
      @@setup
    end

    # Establish a connection to the underlying exchange
    def self.connect
      raise StandardError, "AMQP not setup. Call setup before calling connect" if !self.setup?
      @@bunny.start
      @@exchange = @@bunny.exchange(@@env_config[:exchange], :type => :topic, :durable => true)
      @@connected = true
    end

    # Disconnect from the underlying exchange
    def self.disconnect
      begin
        @@bunny.stop
      rescue
        # if this is being called because the underlying connection went bad
        # calling stop will raise an error. that's ok....
      ensure
        @@connected = false
      end
    end

    # Re-connects to the underlying exchange
    def self.reconnect
      self.disconnect
      @@setup = false
      @@bunny = Bunny.new(@@env_config)
      @@setup = true
      self.connect
    end

    # Whether the underlying connection has been established
    def self.connected?
      @@connected
    end

    # Produces a message to the underlying exchange
    def self.produce(message)
      if !self.setup? || !self.connected?
        ::Rails.logger.error "AMQP not setup or connected. Call setup and connect before calling produce"
      else
        begin
          @@exchange.publish(message.to_json, :key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
        rescue Bunny::ServerDownError
          # the connection went south, try to reconnect and try one more time
          begin
            self.reconnect
            @@exchange.publish(message.to_json, :key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
          rescue => err
            ::Rails.logger.error "Unexpected error producing AMQP message: (#{message.to_json})"
            ::Rails.logger.error "#{err.message}"
            ::Rails.logger.error err.backtrace.join("\n")
          end
        rescue => err
          ::Rails.logger.error "Unexpected error producing AMQP message: (#{message.to_json})"
          ::Rails.logger.error "#{err.message}"
          ::Rails.logger.error err.backtrace.join("\n")
        end
      end
    end
  end
end
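
The core of the produce method above is a "reconnect and retry exactly once" pattern. Stripped of the Bunny specifics, it can be sketched as below. FlakyExchange and ConnectionLost are hypothetical stand-ins for the Bunny exchange and Bunny::ServerDownError, so that the example runs without a broker.

```ruby
# Hypothetical stand-in for Bunny::ServerDownError.
class ConnectionLost < StandardError; end

# Hypothetical exchange that fails until it has been reconnected.
class FlakyExchange
  attr_reader :published

  def initialize
    @healthy = false
    @published = []
  end

  def reconnect!
    @healthy = true
  end

  def publish(payload)
    raise ConnectionLost, "broker unreachable" unless @healthy
    @published << payload
  end
end

# Publishes the payload; on a lost connection, reconnects and retries once.
# Returns true on success and false if the retry also fails.
def publish_with_one_retry(exchange, payload)
  exchange.publish(payload)
  true
rescue ConnectionLost
  begin
    exchange.reconnect!
    exchange.publish(payload)
    true
  rescue StandardError => err
    warn "Giving up on message: #{err.message}"
    false
  end
end

exchange = FlakyExchange.new
puts publish_with_one_retry(exchange, '{"metadata":{}}')
puts exchange.published.length
```

Retrying only once keeps a dead broker from stalling the HTTP request for long, at the cost of dropping the message (after logging it) when the second attempt fails.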

And we add support for this to the Amqp::MessageProducer module by adding a produce_synchronously method.

module Amqp
  module MessageProducer
    # Produces a message to the underlying exchange using a synchronous connection
    def produce_synchronously(message)
      SynchronousConnection.produce(message)
    end

    # Produces a message to the underlying exchange using an asynchronous connection
    def produce_asynchronously(message)
      AsynchronousConnection.produce(message)
    end
    alias :produce :produce_asynchronously
  end
end
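
To see the "change one alias" switch in isolation, here is a self-contained sketch that swaps the real connection classes for stand-ins. Everything in the Sketch namespace is hypothetical and exists only so the example runs without a broker or either gem.

```ruby
module Sketch
  # Stand-ins for the real connection classes; they just tag the message.
  class FakeAsyncConnection
    def self.produce(message)
      [:async, message]
    end
  end

  class FakeSyncConnection
    def self.produce(message)
      [:sync, message]
    end
  end

  module MessageProducer
    def produce_synchronously(message)
      FakeSyncConnection.produce(message)
    end

    def produce_asynchronously(message)
      FakeAsyncConnection.produce(message)
    end

    # Pointing this alias at produce_synchronously instead is the whole switch.
    alias produce produce_asynchronously
  end

  class Account
    include MessageProducer
  end
end

p Sketch::Account.new.produce("hello")
p Sketch::Account.new.produce_synchronously("hello")
```

Note that alias binds to the method as it exists at that point in the file, so it has to come after the definition of the method it points to.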

Pros and Cons

On the plus side, this approach completely abstracts away which gem we’re using to produce messages to the broker. If we decide to use another gem for some reason, we’d only have to change the internal details of the connection classes. Also, clients can decide to produce messages synchronously or asynchronously if they need to for some reason.

On the downside, the use of class variables feels sort of dirty, so there’s some room for improvement there.

Overall, this approach has worked out great for us, but as always, suggestions welcome!
