Producing AMQP messages from Ruby On Rails applications
As your architecture becomes more complex, you often need to perform time-consuming tasks when certain business events occur. Examples include sending emails, processing images, and interacting with external APIs. If these events may be triggered by actions that occur within your web application, you don’t want to perform these tasks synchronously within the HTTP request/response cycle: your end-users would hate you and your throughput would suffer. Instead, you want to perform these tasks asynchronously outside of the HTTP request/response cycle.
Event-driven architectures help to solve these (and other) types of problems by allowing an arbitrary number of tasks to be performed as a side effect of an event being published. They can be built on top of message-oriented middleware, which allows the publishing and consumption of events to be truly distributed, and each component to be scaled independently.
After evaluating many message brokers upon which to build our event-driven architecture, we ultimately chose RabbitMQ, as it offered the best blend of features, performance, and ease of use. In addition, RabbitMQ implements the AMQP protocol, which is emerging as the de facto standard protocol for message-oriented middleware, and allows us to (in theory) move to a different broker in the future.
Our Ruby On Rails applications needed to publish messages to the broker, and there are two main ways you can do that in Ruby: synchronously via the Bunny gem, or asynchronously via the AMQP gem. While Bunny has an easy interface to use, its interaction with the broker is synchronous. In addition, it’s currently in flux and pending an overhaul, which makes depending on it somewhat risky at the moment. The AMQP gem, on the other hand, interacts with the broker asynchronously and is actively maintained. However, it is built on top of EventMachine to achieve its asynchronous functionality, so using it, and more importantly testing with it, is inherently more difficult.
We ultimately decided to publish messages asynchronously using the AMQP gem. While there’s a lot of documentation out there about the AMQP gem, I couldn’t find a simple “how to publish messages with the AMQP gem within your Rails model classes” type of tutorial anywhere, so here’s how we ended up doing it.
Messages
With a polyglot architecture, we needed a message format that was easily produced/consumed by various programming languages, so JSON was the obvious choice. All messages have a metadata property which includes basic information such as the application that produced the event, the host the application was running on, the time the event was produced, and the routing key.
{"metadata":
  {"host":"web3.mycompany.com",
   "app":"web",
   "key":"some.event",
   "created":"2012-11-28T17:11:56Z"
  }
  ...message specific properties...
}
A base Message::Base class takes care of serializing this information via ActiveModel::Serializers::JSON.
module Message
class Base
include ActiveModel::Serializers::JSON
attr_reader :metadata
def initialize(routing_key=nil)
raise StandardError, "routing_key cannot be nil" if routing_key.nil?
@metadata = {:host => Socket.gethostbyname(Socket.gethostname).first,
:app => "web", # configure this...
:key => routing_key,
:created => Time.now.utc.iso8601}
end
def routing_key
@metadata[:key]
end
def as_json(options={})
hash = {:metadata => {:host => @metadata[:host],
:app => @metadata[:app],
:key => @metadata[:key],
:created => @metadata[:created]}}
hash
end
end
end
Message::Base subclasses are created for each message type, with properties specific to that message type. An example for a hypothetical new user account created message:
module Message
module User
# Message indicating that a new user account has been created
class AccountCreated < Base
attr_reader :account_id
attr_reader :email_address
ROUTING_KEY = "user.account.created"
def initialize(account_id, email_address)
raise ArgumentError, "account_id cannot be nil" if account_id.nil?
raise ArgumentError, "email_address cannot be nil" if email_address.nil?
super ROUTING_KEY
# attr_reader defines no setters, so assign the instance variables directly
@account_id = account_id
@email_address = email_address
end
def as_json(options={})
hash = super
hash[:account_id] = account_id
hash[:email_address] = email_address
hash
end
end
end
end
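To make the layering of as_json easier to see, here is a minimal standalone sketch that runs without Rails. The class names SketchMessage and SketchAccountCreated are hypothetical stand-ins for the classes above, and the sketch uses the json standard library instead of ActiveModel::Serializers::JSON, so treat it as an illustration of the shape of the output rather than the actual implementation.

```ruby
require 'json'
require 'socket'
require 'time'

# Hypothetical stand-in for Message::Base (no Rails or ActiveModel required).
class SketchMessage
  attr_reader :metadata

  def initialize(routing_key)
    raise ArgumentError, "routing_key cannot be nil" if routing_key.nil?
    @metadata = {
      :host    => Socket.gethostname,
      :app     => "web",
      :key     => routing_key,
      :created => Time.now.utc.iso8601
    }
  end

  def routing_key
    @metadata[:key]
  end

  # Base representation: only the metadata envelope
  def as_json(options = {})
    { :metadata => @metadata.dup }
  end

  def to_json(*args)
    as_json.to_json(*args)
  end
end

# Hypothetical stand-in for Message::User::AccountCreated.
class SketchAccountCreated < SketchMessage
  attr_reader :account_id, :email_address

  def initialize(account_id, email_address)
    super("user.account.created")
    @account_id = account_id
    @email_address = email_address
  end

  # Subclasses add their own properties next to the metadata
  def as_json(options = {})
    super.merge(:account_id => account_id, :email_address => email_address)
  end
end

message = SketchAccountCreated.new(42, "jane@example.com")
puts message.to_json
```

The printed JSON contains the metadata envelope plus the account_id and email_address properties, which is the structure consumers in other languages would parse.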
Connecting To The Broker
Broker configuration details for each Rails environment are stored in a config/amqp.yml file in the Rails app.
defaults: &defaults
  logging: false
  connection_timeout: 3
  host: localhost
  port: 5672
  vhost: "/mycompany"
  exchange: "mycompany.topic"
  user: username
  pass: password

test:
  <<: *defaults

development:
  <<: *defaults

production:
  <<: *defaults
  host: queue.mycompany.com
The Configurable module can be mixed in to read in the configuration from config/amqp.yml for a given Rails environment.
module Amqp
module Configurable
private
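# Accepts an optional, already-loaded config hash (the connection classes call
# load_config(config)); falls back to reading config/amqp.yml
def load_config(config = nil)
return config unless config.nil?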
if defined?(::Rails)
config_file = ::Rails.root.join('config/amqp.yml')
if config_file.file?
YAML.load(ERB.new(config_file.read).result)
else
nil
end
end
end
end
end
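As a rough illustration of what loading this kind of file involves, the sketch below renders ERB, parses the YAML (including the &defaults anchor and the merge keys), and picks one environment. The inline YAML is a trimmed-down copy of the config, the ERB tag with AMQP_USER is a hypothetical example, and load_amqp_config is an illustrative name rather than part of the module above. It assumes a Ruby whose YAML.safe_load accepts the aliases: keyword (Ruby 2.6 or later).

```ruby
require 'erb'
require 'yaml'

# Inline, trimmed-down copy of config/amqp.yml. The ERB tag is a hypothetical
# example of overriding a value from the environment.
AMQP_YAML = <<~CONFIG
  defaults: &defaults
    host: localhost
    port: 5672
    exchange: "mycompany.topic"
    user: <%= ENV.fetch("AMQP_USER", "username") %>

  development:
    <<: *defaults

  production:
    <<: *defaults
    host: queue.mycompany.com
CONFIG

# Renders ERB, parses the YAML and returns the settings for one environment.
def load_amqp_config(yaml_text, env)
  rendered = ERB.new(yaml_text).result
  # aliases: true allows the &defaults / *defaults references
  config = YAML.safe_load(rendered, aliases: true)
  config.fetch(env) { raise ArgumentError, "Env #{env} not found in config" }
end

production = load_amqp_config(AMQP_YAML, "production")
puts production["host"]
puts production["exchange"]
```

The production entry inherits port and exchange from the defaults and overrides only the host.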
Connection classes wrap an AMQP::Channel object to communicate with the broker and utilize the Configurable module above to determine the name of the exchange to produce messages to (via an AMQP::Exchange object). Our use is hard coded to rely on a Topic Exchange, but you could easily make this configurable/dynamic as well.
The Amqp::AsynchronousConnection class uses the AMQP gem under the hood to communicate with the broker asynchronously. It is passed an active AMQP::Session object, which is established externally upon server startup (more on this below).
require 'amqp'
module Amqp
class AsynchronousConnection
extend Configurable
@@setup = false
def self.setup(amqp_session, config = nil, env = ::Rails.env)
if !self.setup?
raise StandardError, "Must pass an AMQP::Session to setup" if !amqp_session.is_a?(AMQP::Session)
@@amqp_session = amqp_session
@@config = load_config(config)
@@env_config = @@config[env]
raise StandardError, "Env #{env} not found in config" if @@env_config.nil?
raise StandardError, "'exchange' key not found in config" if !@@env_config.has_key?("exchange")
@@channel = AMQP::Channel.new(@@amqp_session)
@@exchange = @@channel.topic(@@env_config["exchange"], :durable => true)
@@setup = true
end
end
# Whether the underlying connection has been set up
def self.setup?
@@setup
end
def self.teardown
if self.setup?
remove_class_variable :@@exchange
remove_class_variable :@@channel
remove_class_variable :@@env_config
remove_class_variable :@@config
remove_class_variable :@@amqp_session
@@setup = false
end
end
# Produces a message to the underlying exchange
def self.produce(message)
if self.setup?
begin
@@exchange.publish(message.to_json, :routing_key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
rescue => err
::Rails.logger.error "Unexpected error producing AMQP message: (#{message.to_json})"
::Rails.logger.error "#{err.message}"
::Rails.logger.error err.backtrace.join("\n")
end
else
::Rails.logger.error "ERROR: Could not produce message, not connected to AMQP broker!"
end
end
end
end
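Because the exchange is a topic exchange, the routing key passed to publish determines which bound queues receive the message. Consumers are out of scope here, but a small sketch of how topic patterns match routing keys may help when choosing keys. The method names are hypothetical, and this only illustrates the AMQP 0-9-1 matching rules (* matches exactly one word, # matches zero or more words); it is not code from the amqp gem or from RabbitMQ.

```ruby
# Returns true if an AMQP topic binding pattern matches a routing key.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

# Recursive matcher over the dot-separated words of pattern and key.
def match_words(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when "#"
    # '#' may absorb zero or more words
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when "*"
    # '*' must consume exactly one word
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?("user.#", "user.account.created")
puts topic_match?("user.*.created", "user.account.created")
puts topic_match?("user.*", "user.account.created")
```

With keys such as user.account.created, a consumer interested in every user event could bind with user.#, while one interested only in creation events could bind with user.*.created.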
The Amqp::Rails module is a helper which starts the EventMachine event loop and establishes the AMQP::Session. It also contains error handling code which detects when the connection to the broker is lost and attempts to reconnect every 5 seconds.
module Amqp
# Helper methods for connecting to AMQP brokers in Rails apps.
module Rails
extend Configurable
@connection_failure_handler = Proc.new do |settings|
::Rails.logger.error "Could not connect to AMQP broker! (settings: #{settings.inspect})"
::Rails.logger.error "Waiting 5 seconds to attempt to connect..."
EventMachine.add_timer(5) do
::Rails.logger.error "Trying to connect to AMQP broker..."
connect(settings)
end
end
# Starts the EventMachine reactor (if necessary) and connects to the AMQP broker.
# This class will handle connection failures and automatically attempt to re-connect
# when the connection is lost after first being established.
#
# Assumes the file config/amqp.yml exists under Rails.root.
#
# Note that this class does NOT handle authentication failures with the broker.
# Authentication failures will cause an uncaught exception and the app will not start up.
def self.async_start(on_open_callback=nil)
AMQP::Utilities::EventLoopHelper.run do
yaml = YAML.load_file(File.join(::Rails.root, "config", "amqp.yml"))
settings = yaml.fetch(::Rails.env, Hash.new).symbolize_keys
settings.merge!(:on_tcp_connection_failure => @connection_failure_handler)
settings.merge!(:on_open_callback => on_open_callback) if !on_open_callback.nil? && on_open_callback.is_a?(Proc)
EventMachine.run do
connect(settings)
end
end
end
private
def self.connect(settings)
EventMachine.next_tick do
connection = AMQP.connect(settings) do |connection|
connection.on_tcp_connection_loss do |connection, settings|
::Rails.logger.error "Connection to AMQP broker lost!"
Amqp::AsynchronousConnection.teardown
::Rails.logger.error "Waiting 5 seconds to attempt to connect..."
EventMachine.add_timer(5) do
::Rails.logger.error "Trying to connect to AMQP broker..."
connect(settings)
end
end
::Rails.logger.info "Connected to AMQP broker. (settings: #{settings.inspect})"
Amqp::AsynchronousConnection.setup(connection)
settings[:on_open_callback].call if settings.has_key?(:on_open_callback)
end
end
end
end
end
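The reconnect behaviour above boils down to "try, and on failure wait a fixed interval and try again". The sketch below shows that idea in plain blocking Ruby so it can be run and tested without EventMachine. This is a swapped-in technique for illustration: the real code schedules retries with EventMachine.add_timer rather than sleeping, and connect_with_retry, max_attempts and sleeper are hypothetical names.

```ruby
# Hypothetical helper: keeps calling the block at a fixed interval until it
# succeeds. The sleeper is injectable so tests do not have to wait.
def connect_with_retry(delay: 5, max_attempts: nil, sleeper: ->(seconds) { sleep(seconds) })
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    # Give up only if a limit was configured and has been reached
    raise if max_attempts && attempts >= max_attempts
    sleeper.call(delay)
    retry
  end
end

# Usage: a fake broker that refuses the first two connection attempts.
waits = []
result = connect_with_retry(sleeper: ->(seconds) { waits << seconds }) do |attempt|
  raise IOError, "broker unavailable" if attempt < 3
  "connected on attempt #{attempt}"
end
puts result
```

Retrying forever matches the behaviour described above; max_attempts is only there for callers that prefer to fail fast.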
Finally, we add an initializer in config/initializers/amqp.rb which calls the async_start method in the helper.
Amqp::Rails.async_start
MessageProducer
The MessageProducer module allows us to mix the produce method in to our model classes to produce messages to the broker using a Connection class. The produce method is an alias for produce_asynchronously, but we can easily change it to be an alias for produce_synchronously should we decide to switch to use the Bunny gem (more on this below).
module Amqp
module MessageProducer
# Produces a message to the underlying exchange using an asynchronous connection
def produce_asynchronously(message)
AsynchronousConnection.produce(message)
end
alias :produce :produce_asynchronously
end
end
Producing messages from Model classes
Finally, we include the Amqp::MessageProducer module in our model classes to produce messages to the broker when certain business events happen.
class Account < ActiveRecord::Base
include Amqp::MessageProducer
after_create :publish_account_created_message
# Publishes a message to the exchange when a new user account is created
def publish_account_created_message
produce(Message::User::AccountCreated.new(self.account_id, self.email_address))
end
end
Easily Switch Between Synchronous and Asynchronous
We structured the code such that we can easily switch to using Bunny simply by changing one alias. The Amqp::SynchronousConnection class wraps a Bunny object to communicate with the server.
require 'bunny'
module Amqp
class SynchronousConnection
extend Configurable
@@setup = false
@@connected = false
# Use ::Rails here: inside the Amqp module, a bare Rails can resolve to Amqp::Rails
def self.setup(config = nil, env = ::Rails.env)
if !self.setup?
@@config = load_config(config)
@@env_config = @@config[env]
raise StandardError, "Env #{env} not found in config" if @@env_config.nil?
# symbolize the keys, which Bunny expects
@@env_config.keys.each {|key| @@env_config[(key.to_sym rescue key) || key] = @@env_config.delete(key) }
raise StandardError, "'exchange' key not found in config" if !@@env_config.has_key?(:exchange)
@@bunny = Bunny.new(@@env_config)
@@setup = true
end
handle_passenger_forking
end
# Whether the underlying connection has been set up
def self.setup?
@@setup
end
# Establish a connection to the underlying exchange
def self.connect
raise StandardError, "AMQP not setup. Call setup before calling connect" if !self.setup?
@@bunny.start
@@exchange = @@bunny.exchange(@@env_config[:exchange], :type => :topic, :durable => true)
@@connected = true
end
# Disconnect from the underlying exchange
def self.disconnect
begin
@@bunny.stop
rescue
# if this is being called because the underlying connection went bad
# calling stop will raise an error. that's ok....
ensure
@@connected = false
end
end
# Re-connects to the underlying exchange
def self.reconnect
self.disconnect
@@setup = false
@@bunny = Bunny.new(@@env_config)
@@setup = true
self.connect
end
# Whether the underlying connection has been established
def self.connected?
@@connected
end
# Produces a message to the underlying exchange
def self.produce(message)
if !self.setup? || !self.connected?
::Rails.logger.error "AMQP not setup or connected. Call setup and connect before calling produce"
else
begin
@@exchange.publish(message.to_json, :key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
rescue Bunny::ServerDownError
# the connection went south, try to reconnect and try one more time
begin
self.reconnect
@@exchange.publish(message.to_json, :key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
rescue => err
::Rails.logger.error "Unexpected error producing AMQP message: (#{message.to_json})"
::Rails.logger.error "#{err.message}"
::Rails.logger.error err.backtrace.join("\n")
end
rescue => err
::Rails.logger.error "Unexpected error producing AMQP message: (#{message.to_json})"
::Rails.logger.error "#{err.message}"
::Rails.logger.error err.backtrace.join("\n")
end
end
end
end
end
And we add support for this to the Amqp::MessageProducer module by adding a produce_synchronously method.
module Amqp
module MessageProducer
# Produces a message to the underlying exchange using a synchronous connection
def produce_synchronously(message)
SynchronousConnection.produce(message)
end
# Produces a message to the underlying exchange using an asynchronous connection
def produce_asynchronously(message)
AsynchronousConnection.produce(message)
end
alias :produce :produce_asynchronously
end
end
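To see how a single alias controls which transport every including class uses, here is a standalone sketch. FakeAsynchronousConnection, FakeSynchronousConnection, SketchMessageProducer and SketchAccount are hypothetical stand-ins that only record what they were asked to send; nothing here talks to a broker.

```ruby
# Hypothetical stand-ins for the two connection classes; they only record messages.
class FakeAsynchronousConnection
  def self.sent
    @sent ||= []
  end

  def self.produce(message)
    sent << message
  end
end

class FakeSynchronousConnection
  def self.sent
    @sent ||= []
  end

  def self.produce(message)
    sent << message
  end
end

module SketchMessageProducer
  def produce_synchronously(message)
    FakeSynchronousConnection.produce(message)
  end

  def produce_asynchronously(message)
    FakeAsynchronousConnection.produce(message)
  end

  # Changing this one line switches every including class to the other transport
  alias :produce :produce_asynchronously
end

class SketchAccount
  include SketchMessageProducer

  def create!
    produce("user.account.created")
  end
end

SketchAccount.new.create!
puts FakeAsynchronousConnection.sent.inspect
puts FakeSynchronousConnection.sent.inspect
```

Recording fakes like these are also one way to test that a model produces the expected message without starting EventMachine.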
Pros and Cons
On the plus side, this approach completely abstracts away which gem we’re using to produce messages to the broker. If we decide to use another gem for some reason, we’d only have to change the internal details of the connection classes. Also, clients can decide to produce messages synchronously or asynchronously if they need to for some reason.
On the downside, the use of class variables feels sort of dirty, so there’s some room for improvement there.
Overall, this approach has worked out great for us, but as always, suggestions welcome!
Guest post on the JRuby blog
Check out my guest post “Bridging The Gap With JRuby” on the JRuby blog.
Versioning Rails 3 based RESTful APIs
While there is great debate about the “correct” way to version RESTful APIs, some patterns have emerged:
- Use HTTP headers to request a version
- Use path prefixes to request a version
- Use request parameters to request a version
These debates are largely centered around how clients request a specific version of your RESTful API. But that’s only part of the story. How do you structure your application internally to support multiple concurrent versions? At a minimum, a version of your RESTful API consists of:
- A collection of URIs with specific supported request parameters
- Serialized representations of resources (typically JSON and/or XML)
- Tests
- Documentation
My web framework of choice is Rails, and for a version of a RESTful API built with Rails, the above list translates to:
- A collection of routes
- A collection of namespaced controllers
- A collection of namespaced presenters/decorators (whatever you want to call them) for your models
- A collection of namespaced tests
- Documentation for the version
Currently the only way to version an API in Ruby/Rails is to roll your own, or use something like Grape. While Grape is an awesome gem, it’s focused on the lower level Rack framework, and isn’t really trying to solve the problem of how to structure and manage your API to support versioning.
Introducing Versionist. Versionist is a Rails plugin for versioning Rails 3 based RESTful APIs. Versionist currently supports 3 versioning strategies (HTTP header, path prefix, request parameter), and provides generators for creating new API versions, as well as new components within an existing API version. Versionist is still in beta, but some features that I will be looking to add in the future include:
- Support for existing presenter/decorator libraries like Draper and ROAR.
- Support for creating a new API version by copying an existing API version as a starting point.
- Support for deprecating an API version.
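For readers unfamiliar with the three versioning strategies mentioned above, the following plain-Ruby sketch shows how a requested version could be resolved from a request. It is not Versionist's API: the request hash layout, the vendor MIME type application/vnd.mycompany.vN+json, the /vN/ path prefix and the version parameter name are all assumptions made for illustration.

```ruby
# Hypothetical request representation: a hash with :path, :headers and :params.
# Returns the requested API version as an Integer, or nil if none was requested.
def requested_version(request)
  # Strategy 1: HTTP header, e.g. Accept: application/vnd.mycompany.v2+json
  header = request.fetch(:headers, {})["Accept"].to_s
  if (match = header.match(/application\/vnd\.mycompany\.v(\d+)\+json/))
    return match[1].to_i
  end

  # Strategy 2: path prefix, e.g. /v3/accounts
  if (match = request.fetch(:path, "").match(%r{\A/v(\d+)/}))
    return match[1].to_i
  end

  # Strategy 3: request parameter, e.g. ?version=1
  version = request.fetch(:params, {})["version"]
  version && version.to_i
end

puts requested_version(:headers => { "Accept" => "application/vnd.mycompany.v2+json" })
puts requested_version(:path => "/v3/accounts")
puts requested_version(:path => "/accounts", :params => { "version" => "1" })
```

Whichever strategy clients use, the resolved version then has to map onto a set of routes, namespaced controllers and presenters, which is the structural problem the plugin is meant to help with.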
As always, comments/suggestions/contributions welcome.
Adventures In Archiving With MongoDB
Most databases have them: a small handful of big tables that grow at a substantially faster rate than any of their peers (think tweets, clicks, check-ins, etc). Over time, the sheer size of these tables causes queries against them to slow down, and increases the size and time taken for backups. In some cases, not all of this data needs to be “live” so that it can be randomly accessed forever, and can be archived once some criteria have been met. I like to think of this as Eventually Irrelevant (if I may coin a term).
Many database products support partitioning features, which allow you to arrange data physically based on some criteria (typically date, range, list, or hash-based partitioning). These partitioning features allow you to drop partitions from the live database when appropriate. While MongoDB supports horizontal partitioning via sharding, it does not currently support the notion of dropping shards. Even if it did, the criteria for which documents to drop from a collection would most likely not be based on your shard key, so it’s unlikely you’d want to drop a shard anyway. You’d want to drop data from all shards. Capped Collections and the forthcoming TTL-based Capped Collections feature are not quite what we’re looking for either. Capped Collections have very strict rules (including not being able to shard them), and for both normal and TTL-based Capped Collections, as old documents are aged out they are simply dropped on the floor. What we need is more control over the dropping process, such that we can guarantee that our data has been copied to an archive database/data warehouse before being dropped from the main database.
I set out to find the most efficient way of manually archiving documents from a large collection given the tools currently available in MongoDB. For the purposes of this exercise, let’s assume we have a large collection of ad clicks which are associated with ads. We want to archive clicks which are associated with ads which expired more than 3 months ago.
MapReduce
MongoDB wants you to do bulk operations using MapReduce. Since all of the examples and documentation revolve around aggregation, I suspected I wouldn’t be able to use MapReduce as the backbone of my archiving process. After some experimenting, I found that my intuition was correct. Upon attempting to insert/remove documents within the map or reduce functions, I would quickly run into “exception 10293 internal error: locks are not upgradeable” errors:
Sun Sep 25 21:22:24 [conn5] update warehouse.clicks query: { _id: ObjectId('4d6bf191db0b4b2b1fad5b65') } exception 10293 internal error: locks are not upgradeable: { "opid" : 4248173, "active" : false, "waitingForLock" : false, "op" : "update", "ns" : "?", "query" : { "_id" : { "$oid" : "4d6bf191db0b4b2b1fad5b65" } }, "client" : "0.0.0.0:0", "desc" : "conn" } 0ms
Sun Sep 25 21:22:24 [conn5] Assertion: 10293:internal error: locks are not upgradeable: { "opid" : 4248174, "active" : false, "waitingForLock" : false, "op" : "update", "ns" : "?", "query" : { "_id" : { "$oid" : "4d6bf198db0b4b2b20ad5b65" } }, "client" : "0.0.0.0:0", "desc" : "conn" }
0x10008de9b 0x1002064f7 0x1002c1ec6 0x1002c4f3d 0x1002c5f7a 0x1000a8181 0x100147df4 0x1004dc68e 0x1004efc93 0x1004dc71b 0x1004dcb71 0x10049a078 0x100158efa 0x100377760 0x100389d0c 0x10034c204 0x10034d877 0x100180cc4 0x100184649 0x1002b9e89
0 mongod 0x000000010008de9b _ZN5mongo11msgassertedEiPKc + 315
1 mongod 0x00000001002064f7 _ZN5mongo10MongoMutex19_writeLockedAlreadyEv + 263
2 mongod 0x00000001002c1ec6 _ZN5mongo14receivedUpdateERNS_7MessageERNS_5CurOpE + 886
3 mongod 0x00000001002c4f3d _ZN5mongo16assembleResponseERNS_7MessageERNS_10DbResponseERKNS_8SockAddrE + 5661
4 mongod 0x00000001002c5f7a _ZN5mongo14DBDirectClient3sayERNS_7MessageE + 106
5 mongod 0x00000001000a8181 _ZN5mongo12DBClientBase6updateERKSsNS_5QueryENS_7BSONObjEbb + 273
6 mongod 0x0000000100147df4 _ZN5mongo12mongo_updateEP9JSContextP8JSObjectjPlS4_ + 660
7 mongod 0x00000001004dc68e js_Invoke + 3864
8 mongod 0x00000001004efc93 js_Interpret + 71932
9 mongod 0x00000001004dc71b js_Invoke + 4005
10 mongod 0x00000001004dcb71 js_InternalInvoke + 404
11 mongod 0x000000010049a078 JS_CallFunction + 86
12 mongod 0x0000000100158efa _ZN5mongo7SMScope6invokeEyRKNS_7BSONObjEib + 666
13 mongod 0x0000000100377760 _ZN5mongo2mr8JSMapper3mapERKNS_7BSONObjE + 96
14 mongod 0x0000000100389d0c _ZN5mongo2mr16MapReduceCommand3runERKSsRNS_7BSONObjERSsRNS_14BSONObjBuilderEb + 1740
15 mongod 0x000000010034c204 _ZN5mongo11execCommandEPNS_7CommandERNS_6ClientEiPKcRNS_7BSONObjERNS_14BSONObjBuilderEb + 628
16 mongod 0x000000010034d877 _ZN5mongo12_runCommandsEPKcRNS_7BSONObjERNS_10BufBuilderERNS_14BSONObjBuilderEbi + 2151
17 mongod 0x0000000100180cc4 _ZN5mongo11runCommandsEPKcRNS_7BSONObjERNS_5CurOpERNS_10BufBuilderERNS_14BSONObjBuilderEbi + 52
18 mongod 0x0000000100184649 _ZN5mongo8runQueryERNS_7MessageERNS_12QueryMessageERNS_5CurOpES1_ + 10585
19 mongod 0x00000001002b9e89 _ZN5mongo13receivedQueryERNS_6ClientERNS_10DbResponseERNS_7MessageE + 569
Upon killing the client which kicked off the MapReduce process, these errors would continue to spin out of control, and I’d have to bounce the server. While MapReduce’s divide and conquer approach is ideal for what we’re trying to accomplish, unless the ability to obtain a write lock within a MapReduce function is supported, it is not a viable option for now.
db.eval()
While a server side script is certainly a viable option for writing an archiving process, it has a fundamental flaw in that it won’t work for sharded collections, which could be a non-starter for some folks.
An example archiving process using db.eval() would look something like this:
archiveClick = function archiveClick(doc) {
if (expiredAdIds.indexOf(doc.ad_id) != -1) {
archivedb = db.getMongo().getDB("archive");
archivedb.clicks.save(doc);
// before we remove the original document, make sure the archive worked
if (archivedb.getLastError() == null) {
db.clicks.remove({_id: doc._id});
} else {
throw "could not archive document with _id " + doc._id;
}
}
}
archiveClicks = function archiveClicks() {
threeMonthsAgo = new Date();
threeMonthsAgo.setMonth(threeMonthsAgo.getMonth()-3);
expiredAdIds = [];
getExpiredAds = function(ad) {expiredAdIds.push(ad._id.str);}
db.ads.find({end_date: {$lte: threeMonthsAgo}}).forEach(getExpiredAds);
db.system.js.save({_id: "expiredAdIds", value: expiredAdIds});
db.clicks.find().forEach(archiveClick);
db.system.js.remove({_id: "expiredAdIds"});
}
db.system.js.save({_id: "archiveClick", value: archiveClick});
db.system.js.save({_id: "archiveClicks", value: archiveClicks});
db.runCommand({$eval: "archiveClicks()", nolock: true});
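Note the nolock: true option passed to the $eval command. It prevents the eval itself from taking a global write lock on the mongod, so other operations are not blocked for the full duration of the script. The individual save() and remove() calls inside the script still take their usual locks.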
The Holy Grail: Define Custom archive() Functions On Collections
In order for our archiving process to work for sharded collections, we will need to add new functions to collection objects in the shell to perform the archiving. Frankly, this feels a lot more natural than db.eval().
I wanted to support two modes of archiving: immediate archiving, and a mark & sweep approach which allows you to queue documents to be archived at one point in time, and perform the actual archiving at a later time (off-peak hours).
I’ve made these functions available in my mongodb-archive project on GitHub. Download the archive.js file, and use it like so:
load("archive.js");
threeMonthsAgo = new Date();
threeMonthsAgo.setMonth(threeMonthsAgo.getMonth()-3);
expiredAdIds = [];
getExpiredAds = function(ad) {expiredAdIds.push(ad._id.str);}
db.ads.find({end_date: {$lte: threeMonthsAgo}}).forEach(getExpiredAds);
archiveConnection = new Mongo("localhost:27018");
archiveDB = archiveConnection.getDB("archive");
archiveCollection = archiveDB.getCollection("clicks");
for (var i = 0; i < expiredAdIds.length; i++) {
print("archiving clicks for ad_id: " + expiredAdIds[i]);
db.clicks.archive({"ad_id": expiredAdIds[i]}, archiveCollection);
print("");
}
The mark/sweep variant would look like:
db.clicks.queueForArchive({"ad_id": expiredAdIds[i]});
// ....some time later.....
db.clicks.archiveQueued(archiveCollection);
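The two modes share one rule: a document is removed from the source only after its copy is confirmed in the archive. The sketch below illustrates that rule and the mark & sweep split using plain Ruby hashes as in-memory stand-ins for the collections. It is not the code in archive.js and does not use any MongoDB driver; the method names and the queued_for_archive marker field are assumptions made for illustration.

```ruby
# In-memory stand-ins for the source and archive collections, keyed by _id.
# Copies matching documents to the archive, then removes them from the source.
def archive_immediately(source, archive, &criteria)
  archived = 0
  source.values.select(&criteria).each do |doc|
    archive[doc[:_id]] = doc.dup
    # Only remove the original once the copy is confirmed in the archive
    next unless archive.key?(doc[:_id])
    source.delete(doc[:_id])
    archived += 1
  end
  archived
end

# Mark phase: flag matching documents, but do not move anything yet.
def queue_for_archive(source, &criteria)
  source.values.select(&criteria).each { |doc| doc[:queued_for_archive] = true }.size
end

# Sweep phase: archive everything flagged earlier (for example during off-peak hours).
def archive_queued(source, archive)
  archive_immediately(source, archive) { |doc| doc[:queued_for_archive] }
end

clicks = {
  1 => { :_id => 1, :ad_id => "a" },
  2 => { :_id => 2, :ad_id => "b" },
  3 => { :_id => 3, :ad_id => "a" }
}
archive = {}

queued = queue_for_archive(clicks) { |doc| doc[:ad_id] == "a" }
moved = archive_queued(clicks, archive)
puts "queued #{queued}, archived #{moved}, remaining #{clicks.size}"
```

Splitting the work this way keeps the cheap marking step separate from the more expensive copy and remove step, which is what makes deferring the latter to off-peak hours possible.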
The biggest factors affecting the performance of the archiving process are really no different than the things that impact the performance of your MongoDB database overall: working set size, indexes, disk speed, and lock contention. The more indexes there are on the source collection, the longer the remove() will take. The more indexes there are on the archive collection, the longer the save() will take. The more of the data that you wish to archive is resident in memory, the faster the process will be.
On my MacBook Pro (2.3GHz i7, 8GB RAM) with a 5400 RPM SATA drive and a clicks collection containing over 11 million documents, I was able to archive ~500 documents per second with the immediate archiving approach. Example output from mongostat when this was running:
Archive DB:
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 1 0 0 3 0 208m 2.85g 15m 3.8 0 0|0 0|0 1k 1k 2 17:13:02
0 0 45 0 0 46 0 208m 2.85g 15m 3.4 0 0|0 0|0 61k 6k 2 17:13:03
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 8 0 0 9 0 208m 2.85g 15m 0.1 0 0|0 0|0 10k 2k 2 17:13:04
0 0 47 0 0 48 0 208m 2.85g 15m 3.4 0 0|0 0|0 66k 7k 2 17:13:05
0 0 0 0 0 1 1 208m 2.85g 15m 0 0 0|0 0|0 62b 1k 2 17:13:06
0 0 18 0 0 19 0 208m 2.85g 15m 2.6 0 0|0 0|0 24k 3k 2 17:13:07
0 0 79 0 0 80 0 208m 2.85g 16m 1.1 0 0|0 0|0 111k 11k 2 17:13:08
0 0 131 0 0 132 0 208m 2.85g 16m 8.8 0 0|0 0|0 181k 17k 2 17:13:09
0 0 216 0 0 217 0 208m 2.85g 16m 3.8 0 0|0 0|0 291k 27k 2 17:13:10
0 0 425 0 0 426 0 208m 2.85g 17m 10.5 0 0|0 0|0 600k 53k 2 17:13:11
0 0 490 0 0 490 0 208m 2.85g 19m 7.3 0 0|0 0|1 658k 61k 2 17:13:12
0 0 630 0 0 632 0 208m 2.85g 20m 11.3 0 0|0 0|0 844k 78k 2 17:13:13
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 818 0 0 818 0 208m 2.85g 22m 13.6 0 0|0 0|0 1m 101k 2 17:13:14
0 0 686 0 0 688 0 208m 2.85g 21m 9.6 0 0|0 0|0 932k 85k 2 17:13:15
0 0 0 0 0 1 0 208m 2.85g 22m 0 0 0|0 0|0 62b 1k 2 17:13:16
0 0 178 0 0 179 0 208m 2.85g 22m 1.2 0 0|0 0|0 252k 23k 2 17:13:17
0 0 974 0 0 975 0 208m 2.85g 23m 12.3 0 0|0 0|0 1m 121k 2 17:13:18
0 0 920 0 0 921 0 208m 2.85g 26m 10.1 0 0|0 0|0 1m 114k 2 17:13:19
0 0 810 0 0 811 0 208m 2.85g 26m 8.1 0 0|0 0|0 1m 100k 2 17:13:20
0 0 612 0 0 613 0 208m 2.85g 27m 8.1 0 0|0 0|0 798k 76k 2 17:13:21
0 0 0 0 0 1 0 208m 2.85g 27m 0 0 0|0 0|0 62b 1k 2 17:13:22
0 0 0 0 0 1 0 208m 2.85g 27m 0 0 0|0 0|0 62b 1k 2 17:13:23
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 30 0 0 31 0 208m 2.85g 27m 1.1 0 0|0 0|0 46k 5k 2 17:13:24
0 0 852 0 0 853 0 208m 2.85g 27m 14.8 0 0|0 0|0 1m 106k 2 17:13:25
0 0 768 0 0 769 0 208m 2.85g 29m 9.5 0 0|0 0|0 1m 95k 2 17:13:26
0 0 867 0 0 868 0 208m 2.85g 32m 13.4 0 0|0 0|0 1m 107k 2 17:13:27
0 0 705 0 0 706 0 208m 2.85g 31m 10.8 0 0|0 0|0 936k 88k 2 17:13:28
0 0 331 0 0 332 0 208m 2.85g 32m 3.9 0 0|0 0|0 446k 42k 2 17:13:29
0 0 0 0 0 1 0 208m 2.85g 32m 0 0 0|0 0|0 62b 1k 2 17:13:30
0 0 551 0 0 552 0 208m 2.85g 32m 6.1 0 0|0 0|0 739k 69k 2 17:13:31
0 0 728 0 0 729 0 208m 2.85g 35m 12.2 0 0|0 0|0 949k 90k 2 17:13:32
0 0 991 0 0 992 0 208m 2.85g 35m 11.7 0 0|0 0|0 1m 123k 2 17:13:33
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 935 0 0 935 0 208m 2.85g 38m 10.9 0 0|0 0|1 1m 116k 2 17:13:34
0 0 431 0 0 433 0 208m 2.85g 39m 5.9 0 0|0 0|0 563k 54k 2 17:13:35
0 0 0 0 0 1 0 208m 2.85g 39m 0 0 0|0 0|0 62b 1k 2 17:13:36
0 0 518 0 0 519 0 208m 2.85g 40m 5 0 0|0 0|0 693k 65k 2 17:13:37
0 0 904 0 0 905 0 208m 2.85g 40m 8.9 0 0|0 0|0 1m 112k 2 17:13:38
0 0 958 0 0 959 0 208m 2.85g 41m 14.2 0 0|0 0|0 1m 119k 2 17:13:39
0 0 899 0 0 900 0 208m 2.85g 44m 11.1 0 0|0 0|0 1m 111k 2 17:13:40
0 0 401 0 0 402 0 208m 2.85g 42m 5.9 0 0|0 0|0 540k 50k 2 17:13:41
0 0 856 0 0 857 0 208m 2.85g 44m 9.4 0 0|0 0|0 1m 106k 2 17:13:42
0 0 232 0 0 233 0 208m 2.85g 45m 2.9 0 0|0 0|0 311k 29k 1 17:13:43
Source DB:
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 2 0 12 1 13 0 56g 115g 2.82g 52.2 0 0|0 0|1 1k 822k 2 17:13:02
0 0 0 42 0 43 0 56g 115g 2.65g 88.1 0 0|0 0|1 5k 4k 2 17:13:03
0 0 0 14 0 15 1 56g 115g 1.88g 101 0 0|0 0|1 1k 2k 2 17:13:04
0 0 0 33 1 35 0 56g 115g 1.89g 53.8 16.6 0|0 1|0 4k 4k 2 17:13:05
0 0 0 0 0 1 0 56g 115g 1.9g 0 0 0|0 1|0 62b 1k 2 17:13:06
0 0 0 34 0 35 0 56g 115g 1.9g 47.3 0 0|0 0|0 4k 4m 2 17:13:07
0 0 0 91 0 92 0 56g 115g 1.89g 86.8 0 0|0 0|0 12k 8k 2 17:13:08
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 149 0 150 0 56g 115g 1.9g 82.5 0 0|0 0|0 20k 13k 2 17:13:09
0 0 0 287 0 287 0 56g 115g 1.9g 74.3 0 0|0 0|1 38k 25k 2 17:13:10
0 0 0 387 0 389 0 56g 115g 1.91g 64.6 0 0|0 0|0 52k 33k 2 17:13:11
0 0 0 580 0 581 0 56g 115g 1.93g 59 0 0|0 0|0 78k 49k 2 17:13:12
0 0 0 685 0 686 0 56g 115g 1.93g 47.9 0 0|0 0|0 93k 58k 2 17:13:13
0 0 0 729 0 730 0 56g 115g 1.95g 42.6 0 0|0 0|0 99k 61k 2 17:13:14
0 0 0 551 1 552 0 56g 115g 1.97g 34.5 0 0|0 1|0 74k 47k 2 17:13:15
0 0 0 0 0 1 0 56g 115g 1.98g 0 0 0|0 1|0 62b 1k 2 17:13:16
0 0 0 368 0 368 0 56g 115g 1.96g 20 0 0|0 0|1 50k 4m 2 17:13:17
0 0 0 1032 0 1034 0 56g 115g 2g 35.9 0 0|0 0|0 140k 87k 2 17:13:18
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 834 0 835 0 56g 115g 2.01g 42.3 0 0|0 0|0 113k 70k 2 17:13:19
0 0 0 922 0 922 0 56g 115g 2.02g 38.9 0 0|0 0|1 125k 77k 2 17:13:20
0 0 0 338 1 340 0 56g 115g 2.03g 13 0 0|0 1|0 46k 29k 2 17:13:21
0 0 0 0 0 1 0 56g 115g 2.04g 0 0 0|0 1|0 62b 1k 2 17:13:22
0 0 0 0 0 1 0 56g 115g 2.04g 0 0 0|0 1|0 62b 1k 2 17:13:23
0 0 0 185 0 186 0 56g 115g 2.02g 24.7 0 0|0 0|0 25k 4m 2 17:13:24
0 0 0 920 0 920 0 56g 115g 2.02g 29.9 0 0|0 0|1 125k 77k 2 17:13:25
0 0 0 741 0 742 0 56g 115g 2.04g 49 0 0|0 0|1 100k 62k 2 17:13:26
0 0 0 886 0 887 0 56g 115g 2.05g 33 0 0|0 0|1 120k 74k 2 17:13:27
0 0 0 683 0 684 0 56g 115g 2.08g 57.3 0 0|0 0|1 92k 58k 2 17:13:28
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 138 1 140 0 56g 115g 2.06g 14.5 0 0|0 1|0 18k 12k 2 17:13:29
0 0 0 12 0 12 0 56g 115g 2.07g 0.2 0 0|0 0|1 1k 4m 2 17:13:30
0 0 0 761 0 763 0 56g 115g 2.08g 46.5 0 0|0 0|0 103k 64k 2 17:13:31
0 0 0 762 0 762 0 56g 115g 2.09g 47 0 0|0 0|1 103k 64k 2 17:13:32
0 0 0 957 0 959 0 56g 115g 2.1g 35.3 0 0|0 0|0 130k 80k 2 17:13:33
0 0 0 905 0 905 0 56g 115g 2.13g 38.5 0 0|1 0|1 123k 76k 2 17:13:34
0 0 0 239 1 241 0 56g 115g 2.12g 14.9 0 0|0 1|0 32k 21k 2 17:13:35
0 0 0 0 0 1 0 56g 115g 2.13g 0 0 0|0 1|0 62b 1k 2 17:13:36
0 0 0 780 0 781 0 56g 115g 2.13g 42.8 0 0|0 0|0 106k 4m 2 17:13:37
0 0 0 805 0 806 0 56g 115g 2.14g 39.8 0 0|0 0|0 109k 68k 2 17:13:38
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 988 0 989 0 56g 115g 2.16g 34.8 0 0|0 0|0 134k 83k 2 17:13:39
0 0 0 953 0 953 0 56g 115g 2.17g 39.9 0 0|0 0|1 129k 80k 2 17:13:40
0 0 0 375 1 376 0 56g 115g 2.18g 13.9 0 0|1 0|1 51k 1m 2 17:13:41
0 0 0 867 0 869 0 56g 115g 2.19g 41.7 0 0|0 0|0 118k 73k 1 17:13:42
Total:
MongoDB shell version: 2.0.1
connecting to: localhost:27017/prodcopy
archiving clicks for ad_id: 4d6c12497b90420373000056
archiving documents...
archived 19045 documents in 40701ms.
For the queued approach with this same setup, I was able to queue ~1200 documents per second, and the archiving step ran at about the same ~500 documents per second as before. Example output from mongostat while this was running:
Archive DB:
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 0 0 1 0 208m 2.85g 18m 0 0 0|0 0|0 62b 1k 2 17:16:44
0 0 0 0 0 1 0 208m 2.85g 18m 0 0 0|0 0|0 62b 1k 2 17:16:45
0 0 0 0 0 1 0 208m 2.85g 18m 0 0 0|0 0|0 62b 1k 2 17:16:46
0 0 0 0 0 1 0 208m 2.85g 18m 0 0 0|0 0|0 62b 1k 2 17:16:47
0 0 0 0 0 1 0 208m 2.85g 18m 0 0 0|0 0|0 62b 1k 2 17:16:48
0 0 0 0 0 1 0 208m 2.85g 18m 0 0 0|0 0|0 62b 1k 2 17:16:49
0 0 248 0 0 249 0 208m 2.85g 17m 6 0 0|0 0|0 283k 31k 2 17:16:50
0 0 780 0 0 781 0 208m 2.85g 18m 8.2 0 0|0 0|0 883k 97k 2 17:16:51
0 0 1256 0 0 1257 0 208m 2.85g 22m 13.7 0 0|0 0|0 1m 155k 2 17:16:52
0 0 1084 0 0 1085 0 208m 2.85g 21m 10 0 0|0 0|0 1m 134k 2 17:16:53
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 1108 0 0 1109 0 208m 2.85g 25m 9.8 0 0|0 0|0 1m 137k 2 17:16:54
0 0 1108 0 0 1109 0 208m 2.85g 24m 9.8 0 0|0 0|0 1m 137k 2 17:16:55
0 0 1110 0 0 1111 0 208m 2.85g 27m 11 0 0|0 0|0 1m 137k 2 17:16:56
0 0 1166 0 0 1167 0 208m 2.85g 31m 11.3 0 0|0 0|0 1m 144k 2 17:16:57
0 0 1246 0 0 1247 0 208m 2.85g 31m 10.6 0 0|0 0|0 1m 154k 2 17:16:58
0 0 1094 0 0 1095 0 208m 2.85g 35m 9.6 0 0|0 0|0 1m 135k 2 17:16:59
0 0 957 0 0 958 0 208m 2.85g 33m 9.6 0 0|0 0|0 1m 119k 2 17:17:00
0 0 721 0 0 722 0 464m 3.35g 38m 8.9 0 0|0 0|0 1m 90k 2 17:17:01
0 0 243 0 0 243 0 464m 3.35g 34m 9.7 0 0|0 0|0 445k 31k 2 17:17:02
0 0 921 0 0 923 0 464m 3.35g 37m 30.1 0 0|0 0|0 1m 114k 2 17:17:03
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 668 0 0 669 0 464m 3.35g 38m 9.6 0 0|0 0|0 962k 83k 2 17:17:04
0 0 995 0 0 995 0 464m 3.35g 41m 21.2 0 0|0 0|1 1m 123k 2 17:17:05
0 0 467 0 0 468 0 464m 3.35g 27m 12 0 0|1 0|1 679k 58k 2 17:17:06
0 0 1 0 0 2 1 464m 3.35g 17m 85.5 0 0|1 0|1 1k 1k 2 17:17:07
0 0 337 0 0 338 0 464m 3.35g 19m 36.2 0 0|0 0|1 499k 42k 2 17:17:08
0 0 917 0 0 919 0 464m 3.35g 21m 20.4 0 0|0 0|0 1m 114k 2 17:17:09
0 0 1022 0 0 1022 0 464m 3.35g 25m 22.7 0 0|0 0|1 1m 126k 2 17:17:10
0 0 970 0 0 971 0 464m 3.35g 24m 27.4 0 0|0 0|1 1m 120k 2 17:17:11
0 0 166 0 0 168 0 464m 3.35g 25m 1.2 0 0|0 0|0 251k 21k 2 17:17:12
0 0 862 0 0 863 0 464m 3.35g 28m 23.1 0 0|0 0|0 1m 107k 2 17:17:13
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 888 0 0 889 0 464m 3.35g 28m 28.8 0 0|0 0|0 1m 110k 2 17:17:14
0 0 540 0 0 541 0 464m 3.35g 30m 12.2 0 0|0 0|0 802k 67k 2 17:17:15
0 0 879 0 0 880 0 464m 3.35g 31m 22.3 0 0|0 0|0 1m 109k 2 17:17:16
0 0 473 0 0 474 0 464m 3.35g 36m 17.2 0 0|0 0|0 1m 59k 1 17:17:17
Source DB:
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 0 0 1 0 56g 115g 1.86g 0 0 0|0 0|0 62b 1k 1 17:16:29
0 0 0 0 0 1 0 56g 115g 1.86g 0 0 0|0 0|0 62b 1k 1 17:16:30
1 1 1 0 1 3 0 56g 115g 1.87g 51.1 0 0|0 0|1 475b 697k 2 17:16:31
0 0 0 0 0 1 0 56g 115g 349m 129 0 0|0 0|1 62b 1k 2 17:16:32
0 0 0 0 0 1 0 56g 115g 367m 101 0 0|0 0|1 62b 1k 2 17:16:33
0 0 0 0 0 1 0 56g 115g 384m 73.4 0 0|0 0|1 62b 1k 2 17:16:34
0 0 0 0 0 1 0 56g 115g 428m 112 0 0|0 0|1 62b 1k 2 17:16:35
0 0 0 0 0 1 0 56g 115g 436m 94.8 0 0|0 0|1 62b 1k 2 17:16:36
0 0 0 0 0 1 0 56g 115g 450m 108 0 0|0 0|1 62b 1k 2 17:16:37
0 0 0 0 0 1 0 56g 115g 461m 110 0 0|0 0|1 62b 1k 2 17:16:38
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 0 0 1 0 56g 115g 480m 95.5 0 0|0 0|1 62b 1k 2 17:16:39
0 0 0 0 0 1 0 56g 115g 494m 73.5 0 0|0 0|1 62b 1k 2 17:16:40
0 0 0 0 0 1 0 56g 115g 531m 120 0 0|0 0|1 62b 1k 2 17:16:41
0 0 0 0 0 1 0 56g 115g 541m 108 0 0|0 0|1 62b 1k 2 17:16:42
0 0 0 0 0 1 0 56g 115g 564m 74.6 0 0|0 0|1 62b 1k 2 17:16:43
0 0 0 0 0 1 0 56g 115g 579m 127 0 0|0 0|1 62b 1k 2 17:16:44
0 0 0 0 0 1 0 56g 115g 605m 100 0 0|0 0|1 62b 1k 2 17:16:45
0 0 0 0 0 1 0 56g 115g 631m 98.5 0 0|0 0|1 62b 1k 2 17:16:46
0 0 0 0 0 1 0 56g 115g 657m 89.1 0 0|0 0|1 62b 1k 2 17:16:47
0 0 0 0 0 1 0 56g 115g 662m 87.8 0 0|0 0|1 62b 1k 2 17:16:48
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 0 0 1 0 56g 115g 680m 93.5 0 0|0 0|1 62b 1k 2 17:16:49
0 1 0 530 1 531 0 56g 115g 767m 81.3 0 0|0 0|1 72k 4m 2 17:16:50
0 0 0 884 0 886 0 56g 115g 721m 48 0 0|0 0|0 120k 74k 2 17:16:51
0 0 0 1079 0 1080 0 56g 115g 698m 33.6 0 0|0 0|0 146k 90k 2 17:16:52
0 0 0 1294 0 1294 0 56g 115g 720m 23.8 0 0|1 0|1 175k 108k 2 17:16:53
0 0 0 1118 1 1120 0 56g 115g 740m 31.9 0 0|0 0|0 152k 4m 2 17:16:54
0 0 0 1111 0 1112 0 56g 115g 718m 33.3 0 0|0 0|0 151k 93k 2 17:16:55
0 0 0 1097 0 1098 0 56g 115g 706m 33.1 0 0|0 0|0 149k 92k 2 17:16:56
0 0 0 1097 0 1098 0 56g 115g 701m 33.2 0 0|0 0|0 149k 92k 2 17:16:57
0 0 0 1091 1 1092 0 56g 115g 696m 35.1 0 0|0 0|0 148k 4m 2 17:16:58
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 1149 0 1150 0 56g 115g 718m 21.3 0 0|0 0|0 156k 96k 2 17:16:59
0 0 0 1091 0 1091 0 56g 115g 707m 31.2 0 0|0 0|1 148k 91k 2 17:17:00
0 0 0 468 0 470 0 56g 115g 714m 9.3 0 0|0 0|0 63k 40k 2 17:17:01
0 0 0 332 0 333 0 56g 115g 694m 27.3 0 0|0 0|0 45k 28k 2 17:17:02
0 0 0 1013 1 1014 0 56g 115g 713m 17.8 0 0|0 0|0 137k 4m 2 17:17:03
0 0 0 551 0 552 0 56g 115g 691m 39.9 0 0|0 0|0 74k 47k 2 17:17:04
0 0 0 1140 0 1141 0 56g 115g 709m 19.7 0 0|0 0|0 155k 95k 2 17:17:05
0 0 0 126 0 127 0 56g 115g 709m 2.1 0 0|0 0|0 17k 11k 2 17:17:06
0 0 0 92 0 92 0 56g 115g 710m 1.6 0 0|0 0|1 12k 8k 2 17:17:07
0 0 0 605 1 607 0 56g 115g 700m 29.1 0 0|0 0|0 82k 4m 2 17:17:08
insert query update delete getmore command flushes mapped vsize res locked % idx miss % qr|qw ar|aw netIn netOut conn time
0 0 0 814 0 814 0 56g 115g 699m 17.3 0 0|0 0|1 110k 68k 2 17:17:09
0 0 0 1036 0 1037 0 56g 115g 708m 18.4 0 0|0 0|0 140k 87k 2 17:17:10
0 0 0 820 0 821 0 56g 115g 715m 14.4 0 0|1 0|1 111k 69k 2 17:17:11
0 0 0 298 0 300 0 56g 115g 683m 24.2 0 0|0 0|0 40k 26k 2 17:17:12
0 0 0 907 1 908 0 56g 115g 699m 20.4 0 0|0 0|0 123k 4m 2 17:17:13
0 0 0 946 0 947 0 56g 115g 707m 16.4 0 0|0 0|0 128k 79k 2 17:17:14
0 0 0 374 0 375 0 56g 115g 677m 36.7 0 0|0 0|0 50k 32k 2 17:17:15
0 0 0 905 1 905 0 56g 115g 689m 16.2 0 0|0 0|0 123k 821k 2 17:17:16
0 0 0 259 0 261 0 56g 115g 690m 5.1 0 0|0 0|0 35k 22k 1 17:17:17
0 0 0 0 0 1 0 56g 115g 690m 0 0 0|0 0|0 62b 1k 1 17:17:18
Total:
MongoDB shell version: 2.0.1
connecting to: localhost:27017/prodcopy
archiving clicks for ad_id: 4d6c12497b90420373000062
ensuring sparse index on field "arch"
marking documents for archive...
queued 22227 documents for archive in 19369ms.
archiving queued documents to archive.clicks...
archived 22227 queued documents to archive.clicks in 26901ms.
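The per-second figures can be cross-checked against the totals printed by the two shell runs. The following is a minimal Python sketch of that arithmetic. The helper name `docs_per_second` and the labels are my own, and the document counts and timings are copied from the shell output above.

```python
def docs_per_second(documents, elapsed_ms):
    """Average throughput for a step that handled `documents` in `elapsed_ms` milliseconds."""
    return documents / (elapsed_ms / 1000.0)

# Totals copied from the two shell runs shown above.
single_pass = docs_per_second(19045, 40701)   # first run: archive in one pass
queue_step = docs_per_second(22227, 19369)    # queued run: mark documents for archive
archive_step = docs_per_second(22227, 26901)  # queued run: archive the marked documents

for label, rate in [
    ("single pass", single_pass),
    ("queue step", queue_step),
    ("archive step", archive_step),
]:
    print("%-12s %.0f docs/sec" % (label, rate))
```

These are averages over a single run each, so they are only a rough check on the approximate figures quoted in the text. Individual runs can vary considerably depending on what is already in memory.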
After upgrading the hard drive to an SSD, I was able to almost triple the performance of the archiving step, archiving ~1300 documents per second.
Obviously this is all a giant hack, and it would be nice if MongoDB supported some kind of partitioning feature so we didn’t have to do any of this manually. But until then, this is the best I could get given the tools currently available.