Producing AMQP messages from Ruby on Rails applications

As your architecture becomes more complex, you often need to perform time-consuming tasks when certain business events occur. Examples include sending emails, processing images, and interacting with external APIs. When these events are triggered by actions within your web application, you don’t want to perform these tasks synchronously within the HTTP request/response cycle: your end-users would hate you and your throughput would suffer. Instead, you want to perform these tasks asynchronously, outside of the HTTP request/response cycle.

Event-driven architectures help to solve these (and other) types of problems by allowing an arbitrary number of tasks to be performed as a side effect of an event being published. They can be built on top of message-oriented middleware, which allows the publishing and consumption of events to be truly distributed, and each component to be scaled independently.

After evaluating many message brokers upon which to build our event-driven architecture, we ultimately chose RabbitMQ, as it offered the best blend of features, performance, and ease of use. In addition, RabbitMQ implements the AMQP protocol, which is emerging as the de facto standard protocol for message-oriented middleware, and allows us to (in theory) move to a different broker in the future.

Our Ruby on Rails applications needed to publish messages to the broker, and there are two main ways you can do that in Ruby: synchronously via the Bunny gem, or asynchronously via the AMQP gem. While Bunny has an easy-to-use interface, its interaction with the broker is synchronous. In addition, it’s currently in flux and pending an overhaul, which makes depending on it somewhat risky at the moment. The AMQP gem, on the other hand, interacts with the broker asynchronously and is actively maintained. However, it is built on top of EventMachine to achieve its asynchronous functionality, so using it, and more importantly testing with it, is inherently more difficult.

We ultimately decided to publish messages asynchronously using the AMQP gem. While there’s a lot of documentation out there about the AMQP gem, I couldn’t find a simple “how to publish messages with the AMQP gem within your Rails model classes” type of tutorial anywhere, so here’s how we ended up doing it.

Messages

With a polyglot architecture, we needed a message format that was easily produced/consumed by various programming languages, so JSON was the obvious choice. All messages have a metadata property which includes basic information such as the application that produced the event, the host the application was running on, the time the event was produced, and the routing key.

{"metadata":
  {"host":"web3.mycompany.com",
   "app":"web",
   "key":"some.event",
   "created":"2012-11-28T37:11:56Z"
  },
  ...message specific properties...
}

A base Message::Base class takes care of serializing this information via ActiveModel::Serializers::JSON.

require 'socket'

module Message
  class Base
    include ActiveModel::Serializers::JSON

    attr_reader :metadata

    def initialize(routing_key=nil)
      raise StandardError, "routing_key cannot be nil" if routing_key.nil?
      @metadata = {:host => Socket.gethostbyname(Socket.gethostname).first,
                   :app => "web", # configure this...
                   :key => routing_key,
                   :created => DateTime.now.new_offset(0).to_time.utc.iso8601}
    end

    def routing_key
      @metadata[:key]
    end

    def as_json(options={})
      hash = {:metadata => {:host => @metadata[:host],
                            :app => @metadata[:app],
                            :key => @metadata[:key],
                            :created => @metadata[:created]}}
      hash
    end
  end
end

Message::Base subclasses are created for each message type, with properties specific to that message type. An example for a hypothetical new user account created message:

module Message
  module User
    # Message indicating that a new user account has been created
    class AccountCreated < Base

      attr_reader :account_id
      attr_reader :email_address

      ROUTING_KEY = "user.account.created"

      def initialize(account_id, email_address)
        raise ArgumentError, "account_id cannot be nil" if account_id.nil?
        raise ArgumentError, "email_address cannot be nil" if email_address.nil?
        super ROUTING_KEY
        @account_id = account_id
        @email_address = email_address
      end

      def as_json(options={})
        hash = super
        hash[:account_id] = account_id
        hash[:email_address] = email_address
        hash
      end
    end
  end
end
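
For example, constructing a message and calling to_json on it (the values here are made up) yields something like:

message = Message::User::AccountCreated.new(42, "jane@example.com")
message.to_json
# => {"metadata":{"host":"web3.mycompany.com","app":"web","key":"user.account.created","created":"2012-11-28T17:11:56Z"},"account_id":42,"email_address":"jane@example.com"}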

Connecting To The Broker

Broker configuration details for each Rails environment are stored in a config/amqp.yml file in the Rails app.

defaults: &defaults
  logging: false
  connection_timeout: 3
  host: localhost
  port: 5672
  vhost: "/mycompany"
  exchange: "mycompany.topic"
  user: username
  pass: password

test:
  <<: *defaults

development:
  <<: *defaults

production:
  <<: *defaults
  host: queue.mycompany.com

The Configurable module can be mixed in to read the configuration from config/amqp.yml for a given Rails environment. A config hash can also be passed in explicitly, which is handy for testing.

require 'erb'
require 'yaml'

module Amqp
  module Configurable

    private

    # Returns the given config hash if one was provided, otherwise reads
    # config/amqp.yml from the Rails root.
    def load_config(config = nil)
      return config unless config.nil?
      if defined?(::Rails)
        config_file = ::Rails.root.join('config/amqp.yml')
        if config_file.file?
          YAML.load(ERB.new(config_file.read).result)
        end
      end
    end
  end
end

Connection classes wrap an AMQP::Channel object to communicate with the broker and utilize the Configurable module above to determine the name of the exchange to produce messages to (via an AMQP::Exchange object). Our use is hard-coded to rely on a topic exchange, but you could easily make this configurable/dynamic as well (see the sketch after the class below).

The Amqp::AsynchronousConnection class uses the AMQP gem under the hood to communicate with the broker asynchronously. It is passed an active AMQP::Session object, which is established externally upon server startup (more on this below).

require 'amqp'

module Amqp
  class AsynchronousConnection
    extend Configurable

    @@setup = false

    def self.setup(amqp_session, config = nil, env = ::Rails.env)
      if !self.setup?
        raise StandardError, "Must pass an AMQP::Session to setup" if !amqp_session.is_a?(AMQP::Session)
        @@amqp_session = amqp_session
        @@config = load_config(config)
        @@env_config = @@config[env]
        raise StandardError, "Env #{env} not found in config" if @@env_config.nil?
        raise StandardError, "'exchange' key not found in config" if !@@env_config.has_key?("exchange")
        @@channel  = AMQP::Channel.new(@@amqp_session)
        @@exchange = @@channel.topic(@@env_config["exchange"], :durable => true)
        @@setup = true
      end
    end

    # Whether the underlying connection has been set up
    def self.setup?
      @@setup
    end

    def self.teardown
      if self.setup?
        remove_class_variable :@@exchange
        remove_class_variable :@@channel
        remove_class_variable :@@env_config
        remove_class_variable :@@config
        remove_class_variable :@@amqp_session
        @@setup = false
      end
    end

    # Produces a message to the underlying exchange
    def self.produce(message)
      if self.setup?
        begin
          @@exchange.publish(message.to_json, :routing_key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
        rescue => err
          ::Rails.logger.error "Unexpected error producing AMQP messsage: (#{message.to_json})"
          ::Rails.logger.error "#{err.message}"
          ::Rails.logger.error err.backtrace.join("\n")
        end
      else
        ::Rails.logger.error "ERROR: Could not produce message, not connected to AMQP broker!"
      end
    end
  end
end
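
For example, to avoid hard-coding the topic exchange, the exchange creation line in setup could read the type from a (hypothetical) exchange_type key in amqp.yml:

# in self.setup, instead of @@channel.topic(...):
type = (@@env_config["exchange_type"] || "topic").to_sym
@@exchange = AMQP::Exchange.new(@@channel, type, @@env_config["exchange"], :durable => true)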

The Amqp::Rails module is a helper which starts the EventMachine event loop and establishes the AMQP::Session. It also contains error handling code which detects when the connection to the broker is lost and attempts to reconnect every 5 seconds.

module Amqp
  # Helper methods for connecting to AMQP brokers in Rails apps.
  module Rails
    extend Configurable

    @connection_failure_handler = Proc.new do |settings|
      ::Rails.logger.error "Could not connect to AMQP broker! (settings: #{settings.inspect})"
      ::Rails.logger.error "Waiting 5 seconds to attempt to connect..."
      EventMachine.add_timer(5) do
        ::Rails.logger.error "Trying to connect to AMQP broker..."
        connect(settings)
      end
    end

    # Starts the EventMachine reactor (if necessary) and connects to the AMQP broker.
    # This class will handle connection failures and automatically attempt to re-connect
    # when the connection is lost after first being established.
    #
    # Assumes the file config/amqp.yml exists under Rails.root.
    #
    # Note that this class does NOT handle authentication failures with the broker.
    # Authentication failures will cause an uncaught exception and the app will not start up.
    def self.async_start(on_open_callback=nil)
      AMQP::Utilities::EventLoopHelper.run do
        yaml = YAML.load_file(File.join(::Rails.root, "config", "amqp.yml"))
        settings = yaml.fetch(::Rails.env, Hash.new).symbolize_keys
        settings.merge!(:on_tcp_connection_failure => @connection_failure_handler)
        settings.merge!(:on_open_callback => on_open_callback) if !on_open_callback.nil? && on_open_callback.is_a?(Proc)
        EventMachine.run do
          connect(settings)
        end
      end
    end


    private

    def self.connect(settings)
      EventMachine.next_tick do
        AMQP.connect(settings) do |connection|
          connection.on_tcp_connection_loss do |connection, settings|
            ::Rails.logger.error "Connection to AMQP broker lost!"
            Amqp::AsynchronousConnection.teardown
            ::Rails.logger.error "Waiting 5 seconds to attempt to connect..."
            EventMachine.add_timer(5) do
              ::Rails.logger.error "Trying to connect to AMQP broker..."
              connect(settings)
            end
          end

          ::Rails.logger.info "Connected to AMQP broker. (settings: #{settings.inspect})"
          Amqp::AsynchronousConnection.setup(connection)
          settings[:on_open_callback].call if settings.has_key?(:on_open_callback)
        end
      end
    end
  end
end

Finally, we add an initializer in config/initializers/amqp.rb which calls the async_start method in the helper.

Amqp::Rails.async_start
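
async_start also accepts an optional Proc to run once the connection is open, which can be handy for logging or kicking off work that requires the broker:

# config/initializers/amqp.rb
Amqp::Rails.async_start(Proc.new { ::Rails.logger.info "AMQP connection open" })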

MessageProducer

The MessageProducer module allows us to mix in the produce method to our Model classes to produce messages to the broker using a Connection class. The produce method is an alias for produce_asynchronously, but we can easily change it to be an alias for produce_synchronously should we decide to switch to the Bunny gem (more on this below).

module Amqp
  module MessageProducer
    # Produces a message to the underlying exchange using an asynchronous connection
    def produce_asynchronously(message)
      AsynchronousConnection.produce(message)
    end
    alias :produce :produce_asynchronously
  end
end

Producing messages from Model classes

Finally, we include the Amqp::MessageProducer module in our model classes to produce messages to the broker when certain business events happen.

class Account < ActiveRecord::Base
  include Amqp::MessageProducer

  after_create :publish_account_created_message

  # Publishes a message to the exchange when a new user account is created
  def publish_account_created_message
    produce(Message::User::AccountCreated.new(self.account_id, self.email_address))
  end
end

Easily Switch Between Synchronous and Asynchronous

We structured the code such that we can easily switch to using Bunny simply by changing one alias. The Amqp::SynchronousConnection class wraps a Bunny object to communicate with the broker.

require 'bunny'

module Amqp
  class SynchronousConnection
    extend Configurable

    @@setup = false
    @@connected = false

    def self.setup(config = nil, env = ::Rails.env)
      if !self.setup?
        @@config = load_config(config)
        @@env_config = @@config[env]
        raise StandardError, "Env #{env} not found in config" if @@env_config.nil?
        # symbolize the keys, which Bunny expects
        @@env_config.keys.each {|key| @@env_config[(key.to_sym rescue key) || key] = @@env_config.delete(key) }
        raise StandardError, "'exchange' key not found in config" if !@@env_config.has_key?(:exchange)
        @@bunny = Bunny.new(@@env_config)
        @@setup = true
      end
      handle_passenger_forking
    end

    # Whether the underlying connection has been set up
    def self.setup?
      @@setup
    end

    # Establish a connection to the underlying exchange
    def self.connect
      raise StandardError, "AMQP not setup. Call setup before calling connect" if !self.setup?
      @@bunny.start
      @@exchange = @@bunny.exchange(@@env_config[:exchange], :type => :topic, :durable => true)
      @@connected = true
    end

    # Disconnect from the underlying exchange
    def self.disconnect
      begin
        @@bunny.stop
      rescue
        # if this is being called because the underlying connection went bad
        # calling stop will raise an error. that's ok....
      ensure
        @@connected = false
      end
    end

    # Re-connects to the underlying exchange
    def self.reconnect
      self.disconnect
      @@setup = false
      @@bunny = Bunny.new(@@env_config)
      @@setup = true
      self.connect
    end

    # Whether the underlying connection has been established
    def self.connected?
      @@connected
    end

    # Produces a message to the underlying exchange
    def self.produce(message)
      if !self.setup? || !self.connected?
        ::Rails.logger.error "AMQP not setup or connected. Call setup and connect before calling produce"
      else
        begin
          @@exchange.publish(message.to_json, :key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
        rescue Bunny::ServerDownError
          # the connection went south, try to reconnect and try one more time
          begin
            self.reconnect
            @@exchange.publish(message.to_json, :key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
          rescue => err
            ::Rails.logger.error "Unexpected error producing AMQP messsage: (#{message.to_json})"
            ::Rails.logger.error "#{err.message}"
            ::Rails.logger.error err.backtrace.join("\n")
          end
        rescue => err
          ::Rails.logger.error "Unexpected error producing AMQP messsage: (#{message.to_json})"
          ::Rails.logger.error "#{err.message}"
          ::Rails.logger.error err.backtrace.join("\n")
        end
      end
    end
  end
end
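
Note that setup calls a handle_passenger_forking helper that isn’t shown above: under Phusion Passenger, forked worker processes can’t reuse the parent’s socket, so each worker must establish its own connection. A minimal sketch of what that helper might look like:

def self.handle_passenger_forking
  if defined?(PhusionPassenger)
    PhusionPassenger.on_event(:starting_worker_process) do |forked|
      # a forked worker inherits the parent's connection state; reconnect
      self.reconnect if forked && self.connected?
    end
  end
end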

And we add support for this to the Amqp::MessageProducer module by adding a produce_synchronously method.

module Amqp
  module MessageProducer
    # Produces a message to the underlying exchange using a synchronous connection
    def produce_synchronously(message)
      SynchronousConnection.produce(message)
    end

    # Produces a message to the underlying exchange using an asynchronous connection
    def produce_asynchronously(message)
      AsynchronousConnection.produce(message)
    end
    alias :produce :produce_asynchronously
  end
end

Pros and Cons

On the plus side, this approach completely abstracts away which gem we’re using to produce messages to the broker. If we ever decide to switch to another gem, we’d only have to change the internal details of the connection classes. Also, clients can choose to produce messages synchronously or asynchronously as needed.

On the down side, the use of class variables feels sort of dirty, so there’s some room for improvement there.

Overall, this approach has worked out great for us, but as always, suggestions welcome!

Guest post on the JRuby blog

Check out my guest post “Bridging The Gap With JRuby” on the JRuby blog:

http://blog.jruby.org/2012/05/bridging-the-gap-with-jruby/

Versioning Rails 3-based RESTful APIs

While there is great debate about the “correct” way to version RESTful APIs, some patterns have emerged:

  • Use HTTP headers to request a version
  • Use path prefixes to request a version
  • Use request parameters to request a version

These debates are largely centered around how clients request a specific version of your RESTful API. But that’s only part of the story. How do you structure your application internally to support multiple concurrent versions? At a minimum, a version of your RESTful API consists of:

  • A collection of URIs with specific supported request parameters
  • Serialized representations of resources (typically JSON and/or XML)
  • Tests
  • Documentation

My web framework of choice is Rails, and for a version of a RESTful API built with Rails, the above list translates to:

  • A collection of routes
  • A collection of namespaced controllers
  • A collection of namespaced presenters/decorators (whatever you want to call them) for your models
  • A collection of namespaced tests
  • Documentation for the version

Currently, to version an API in Ruby/Rails you either have to roll your own or use something like Grape. While Grape is an awesome gem, it’s focused on the lower-level Rack framework, and isn’t really trying to solve the problem of how to structure and manage your API to support versioning.
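
To make the problem concrete, here’s a minimal sketch of rolling your own HTTP header-based versioning with plain Rails routing constraints (the constraint class and vendor MIME type are made up):

# A routing constraint that matches a version requested via the Accept header
class ApiVersionConstraint
  def initialize(version, default = false)
    @version = version
    @default = default
  end

  def matches?(request)
    @default || request.headers["Accept"].to_s.include?("application/vnd.mycompany.com; version=#{@version}")
  end
end

# config/routes.rb
MyApp::Application.routes.draw do
  scope :module => :v1, :constraints => ApiVersionConstraint.new(1, true) do
    resources :accounts
  end
  scope :module => :v2, :constraints => ApiVersionConstraint.new(2) do
    resources :accounts
  end
end

Maintaining this constraint class plus a parallel set of namespaced controllers, presenters, and tests for every version quickly becomes tedious by hand.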

Introducing Versionist. Versionist is a Rails plugin for versioning Rails 3-based RESTful APIs. Versionist currently supports 3 versioning strategies (HTTP header, path prefix, request parameter), and provides generators for creating new API versions, as well as new components within an existing API version. Versionist is still in beta, but some features that I will be looking to add in the future include:

  • Support for existing presenter/decorator libraries like Draper and ROAR.
  • Support for creating a new API version by copying an existing API version as a starting point.
  • Support for deprecating an API version.

As always, comments/suggestions/contributions welcome.

Adventures In Archiving With MongoDB

Most databases have them: a small handful of big tables that grow at a substantially faster rate than any of their peers (think tweets, clicks, check-ins, etc). Over time, the sheer size of these tables causes queries against them to slow down and increases the size and duration of backups. In some cases, not all of this data needs to be “live” so that it can be randomly accessed forever; it can be archived once certain criteria have been met. I like to think of this data as Eventually Irrelevant (if I may coin a term).

Many database products support partitioning features, which allow you to arrange data physically based on some criteria (typically date, range, list, or hash-based partitioning). These partitioning features allow you to drop partitions from the live database when appropriate. While MongoDB supports horizontal partitioning via sharding, it does not currently support the notion of dropping shards. Even if it did, the criteria for what documents to drop from a collection would most likely not be based on your shard key, so it’s unlikely you’d want to drop a shard anyway. You’d want to drop data from all shards. Capped Collections and the forthcoming TTL-based Capped Collections feature are not quite what we’re looking for either. Capped Collections have very strict rules (including not being able to shard them), and for both normal and TTL-based Capped Collections, as old documents are aged out they are simply dropped on the floor. What we need is more control over the dropping process, such that we can guarantee that our data has been copied to an archive database/data warehouse before being dropped from the main database.

I set out to find the most efficient way of manually archiving documents from a large collection given the tools currently available in MongoDB. For the purposes of this exercise, let’s assume we have a large collection of ad clicks associated with ads. We want to archive clicks associated with ads that expired more than 3 months ago.
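
For reference, assume documents shaped roughly like this (the end_date and ad_id fields are what the queries below key off of):

// hypothetical example documents
db.ads.findOne()
// { "_id" : ObjectId("4d6c12497b90420373000056"), "end_date" : ISODate("2011-05-31T00:00:00Z"), ... }
db.clicks.findOne()
// { "_id" : ObjectId("4d6bf191db0b4b2b1fad5b65"), "ad_id" : "4d6c12497b90420373000056", ... }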

MapReduce

MongoDB wants you to do bulk operations using MapReduce. Since all of the examples and documentation revolve around aggregation, I suspected I wouldn’t be able to use MapReduce as the backbone of my archiving process. After some experimenting, I found that my intuition was correct. Upon attempting to insert/remove documents within the map or reduce functions, I would quickly run into “exception 10293 internal error: locks are not upgradeable” errors:

Sun Sep 25 21:22:24 [conn5] update warehouse.clicks  query: { _id: ObjectId('4d6bf191db0b4b2b1fad5b65') } exception 10293 internal error: locks are not upgradeable: { "opid" : 4248173, "active" : false, "waitingForLock" : false, "op" : "update", "ns" : "?", "query" : { "_id" : { "$oid" : "4d6bf191db0b4b2b1fad5b65" } }, "client" : "0.0.0.0:0", "desc" : "conn" } 0ms
Sun Sep 25 21:22:24 [conn5] Assertion: 10293:internal error: locks are not upgradeable: { "opid" : 4248174, "active" : false, "waitingForLock" : false, "op" : "update", "ns" : "?", "query" : { "_id" : { "$oid" : "4d6bf198db0b4b2b20ad5b65" } }, "client" : "0.0.0.0:0", "desc" : "conn" }
0x10008de9b 0x1002064f7 0x1002c1ec6 0x1002c4f3d 0x1002c5f7a 0x1000a8181 0x100147df4 0x1004dc68e 0x1004efc93 0x1004dc71b 0x1004dcb71 0x10049a078 0x100158efa 0x100377760 0x100389d0c 0x10034c204 0x10034d877 0x100180cc4 0x100184649 0x1002b9e89
 0   mongod                              0x000000010008de9b _ZN5mongo11msgassertedEiPKc + 315
 1   mongod                              0x00000001002064f7 _ZN5mongo10MongoMutex19_writeLockedAlreadyEv + 263
 2   mongod                              0x00000001002c1ec6 _ZN5mongo14receivedUpdateERNS_7MessageERNS_5CurOpE + 886
 3   mongod                              0x00000001002c4f3d _ZN5mongo16assembleResponseERNS_7MessageERNS_10DbResponseERKNS_8SockAddrE + 5661
 4   mongod                              0x00000001002c5f7a _ZN5mongo14DBDirectClient3sayERNS_7MessageE + 106
 5   mongod                              0x00000001000a8181 _ZN5mongo12DBClientBase6updateERKSsNS_5QueryENS_7BSONObjEbb + 273
 6   mongod                              0x0000000100147df4 _ZN5mongo12mongo_updateEP9JSContextP8JSObjectjPlS4_ + 660
 7   mongod                              0x00000001004dc68e js_Invoke + 3864
 8   mongod                              0x00000001004efc93 js_Interpret + 71932
 9   mongod                              0x00000001004dc71b js_Invoke + 4005
 10  mongod                              0x00000001004dcb71 js_InternalInvoke + 404
 11  mongod                              0x000000010049a078 JS_CallFunction + 86
 12  mongod                              0x0000000100158efa _ZN5mongo7SMScope6invokeEyRKNS_7BSONObjEib + 666
 13  mongod                              0x0000000100377760 _ZN5mongo2mr8JSMapper3mapERKNS_7BSONObjE + 96
 14  mongod                              0x0000000100389d0c _ZN5mongo2mr16MapReduceCommand3runERKSsRNS_7BSONObjERSsRNS_14BSONObjBuilderEb + 1740
 15  mongod                              0x000000010034c204 _ZN5mongo11execCommandEPNS_7CommandERNS_6ClientEiPKcRNS_7BSONObjERNS_14BSONObjBuilderEb + 628
 16  mongod                              0x000000010034d877 _ZN5mongo12_runCommandsEPKcRNS_7BSONObjERNS_10BufBuilderERNS_14BSONObjBuilderEbi + 2151
 17  mongod                              0x0000000100180cc4 _ZN5mongo11runCommandsEPKcRNS_7BSONObjERNS_5CurOpERNS_10BufBuilderERNS_14BSONObjBuilderEbi + 52
 18  mongod                              0x0000000100184649 _ZN5mongo8runQueryERNS_7MessageERNS_12QueryMessageERNS_5CurOpES1_ + 10585
 19  mongod                              0x00000001002b9e89 _ZN5mongo13receivedQueryERNS_6ClientERNS_10DbResponseERNS_7MessageE + 569

Upon killing the client which kicked off the MapReduce process, these errors would continue to spin out of control, and I’d have to bounce the server. While MapReduce’s divide-and-conquer approach is ideal for what we’re trying to accomplish, unless the ability to obtain a write lock within a MapReduce function is supported, it is not a viable option for now.

db.eval()

While a server-side script is certainly a viable option for writing an archiving process, it has a fundamental flaw: it won’t work for sharded collections, which could be a non-starter for some folks.

An example archiving process using db.eval() would look something like this:

archiveClick = function archiveClick(doc) {
  if (expiredAdIds.indexOf(doc.ad_id) != -1) {
    archivedb = db.getMongo().getDB("archive");
    archivedb.clicks.save(doc);
    // before we remove the original document, make sure the archive worked
    if (archivedb.getLastError() == null) {
      db.clicks.remove({_id: doc._id});
    } else {
      throw "could not archive document with _id " + doc._id;
    }
  }
}

archiveClicks = function archiveClicks() {
  threeMonthsAgo = new Date();
  threeMonthsAgo.setMonth(threeMonthsAgo.getMonth()-3);
  expiredAdIds = [];
  getExpiredAds = function(ad) {expiredAdIds.push(ad._id.str);}
  db.ads.find({end_date: {$lte: threeMonthsAgo}}).forEach(getExpiredAds);
  db.system.js.save({_id: "expiredAdIds", value: expiredAdIds});
  db.clicks.find().forEach(archiveClick);
  db.system.js.remove({_id: "expiredAdIds"});
}

db.system.js.save({_id: "archiveClick", value: archiveClick});
db.system.js.save({_id: "archiveClicks", value: archiveClicks});

db.runCommand({$eval: "archiveClicks()", nolock: true});

Note the nolock: true sent to db.eval(). This ensures the mongod isn’t locked for other operations while this runs.

The Holy Grail: Define Custom archive() Functions On Collections

In order for our archiving process to work for sharded collections, we will need to add new functions to collection objects in the shell to perform the archiving. Frankly, this feels a lot more natural than db.eval().

I wanted to support two modes of archiving: immediate archiving, and a mark & sweep approach that lets you queue documents for archiving at one point in time and perform the actual archiving later (e.g. during off-peak hours).
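
The immediate mode boils down to copy, verify, then remove; here’s a simplified sketch of the idea (the real archive.js does more than this):

DBCollection.prototype.archive = function(query, archiveCollection) {
  var source = this;
  var archived = 0;
  source.find(query).forEach(function(doc) {
    archiveCollection.save(doc);
    // only remove the original once the copy has been confirmed
    if (archiveCollection.getDB().getLastError() == null) {
      source.remove({_id: doc._id});
      archived++;
    } else {
      throw "could not archive document with _id " + doc._id;
    }
  });
  print("archived " + archived + " documents.");
};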

I’ve made these functions available in my mongodb-archive project on GitHub. Download the archive.js file, and use it like so:

load("archive.js");

threeMonthsAgo = new Date();
threeMonthsAgo.setMonth(threeMonthsAgo.getMonth()-3);
expiredAdIds = [];
getExpiredAds = function(ad) {expiredAdIds.push(ad._id.str);}
db.ads.find({end_date: {$lte: threeMonthsAgo}}).forEach(getExpiredAds);

archiveConnection = new Mongo("localhost:27018");
archiveDB = archiveConnection.getDB("archive");
archiveCollection = archiveDB.getCollection("clicks");

for (var i = 0; i < expiredAdIds.length; i++) {
  print("archiving clicks for ad_id: " + expiredAdIds[i]);
  db.clicks.archive({"ad_id": expiredAdIds[i]}, archiveCollection);
  print("");
}

The mark/sweep variant would look like:

// inside the same loop over expiredAdIds as above:
db.clicks.queueForArchive({"ad_id": expiredAdIds[i]});
// ....some time later.....
db.clicks.archiveQueued(archiveCollection);

The biggest factors in the performance of the archiving process are really no different than the things that impact the performance of your MongoDB database overall: working set size, indexes, disk speed, and lock contention. The more indexes there are on the source collection, the longer the remove() will take. The more indexes there are on the archive collection, the longer the save() will take. And the more of the data you wish to archive is resident in memory, the faster the process will be.
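
For example, the archive calls above select clicks by ad_id, so making sure that field is indexed on the source collection keeps the find() portion of the process cheap:

db.clicks.ensureIndex({ad_id: 1});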

On my MacBook Pro (2.3GHz i7, 8GB RAM) with a 5400 RPM SATA drive and a clicks collection containing over 11 million documents, I was able to archive ~500 documents per second with the immediate archiving approach. Example output from mongostat when this was running:

Archive DB:

insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      1      0       0       3       0   208m  2.85g    15m      3.8          0       0|0     0|0     1k     1k     2   17:13:02 
     0      0     45      0       0      46       0   208m  2.85g    15m      3.4          0       0|0     0|0    61k     6k     2   17:13:03 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      8      0       0       9       0   208m  2.85g    15m      0.1          0       0|0     0|0    10k     2k     2   17:13:04 
     0      0     47      0       0      48       0   208m  2.85g    15m      3.4          0       0|0     0|0    66k     7k     2   17:13:05 
     0      0      0      0       0       1       1   208m  2.85g    15m        0          0       0|0     0|0    62b     1k     2   17:13:06 
     0      0     18      0       0      19       0   208m  2.85g    15m      2.6          0       0|0     0|0    24k     3k     2   17:13:07 
     0      0     79      0       0      80       0   208m  2.85g    16m      1.1          0       0|0     0|0   111k    11k     2   17:13:08 
     0      0    131      0       0     132       0   208m  2.85g    16m      8.8          0       0|0     0|0   181k    17k     2   17:13:09 
     0      0    216      0       0     217       0   208m  2.85g    16m      3.8          0       0|0     0|0   291k    27k     2   17:13:10 
     0      0    425      0       0     426       0   208m  2.85g    17m     10.5          0       0|0     0|0   600k    53k     2   17:13:11 
     0      0    490      0       0     490       0   208m  2.85g    19m      7.3          0       0|0     0|1   658k    61k     2   17:13:12 
     0      0    630      0       0     632       0   208m  2.85g    20m     11.3          0       0|0     0|0   844k    78k     2   17:13:13 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0    818      0       0     818       0   208m  2.85g    22m     13.6          0       0|0     0|0     1m   101k     2   17:13:14 
     0      0    686      0       0     688       0   208m  2.85g    21m      9.6          0       0|0     0|0   932k    85k     2   17:13:15 
     0      0      0      0       0       1       0   208m  2.85g    22m        0          0       0|0     0|0    62b     1k     2   17:13:16 
     0      0    178      0       0     179       0   208m  2.85g    22m      1.2          0       0|0     0|0   252k    23k     2   17:13:17 
     0      0    974      0       0     975       0   208m  2.85g    23m     12.3          0       0|0     0|0     1m   121k     2   17:13:18 
     0      0    920      0       0     921       0   208m  2.85g    26m     10.1          0       0|0     0|0     1m   114k     2   17:13:19 
     0      0    810      0       0     811       0   208m  2.85g    26m      8.1          0       0|0     0|0     1m   100k     2   17:13:20 
     0      0    612      0       0     613       0   208m  2.85g    27m      8.1          0       0|0     0|0   798k    76k     2   17:13:21 
     0      0      0      0       0       1       0   208m  2.85g    27m        0          0       0|0     0|0    62b     1k     2   17:13:22 
     0      0      0      0       0       1       0   208m  2.85g    27m        0          0       0|0     0|0    62b     1k     2   17:13:23 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0     30      0       0      31       0   208m  2.85g    27m      1.1          0       0|0     0|0    46k     5k     2   17:13:24 
     0      0    852      0       0     853       0   208m  2.85g    27m     14.8          0       0|0     0|0     1m   106k     2   17:13:25 
     0      0    768      0       0     769       0   208m  2.85g    29m      9.5          0       0|0     0|0     1m    95k     2   17:13:26 
     0      0    867      0       0     868       0   208m  2.85g    32m     13.4          0       0|0     0|0     1m   107k     2   17:13:27 
     0      0    705      0       0     706       0   208m  2.85g    31m     10.8          0       0|0     0|0   936k    88k     2   17:13:28 
     0      0    331      0       0     332       0   208m  2.85g    32m      3.9          0       0|0     0|0   446k    42k     2   17:13:29 
     0      0      0      0       0       1       0   208m  2.85g    32m        0          0       0|0     0|0    62b     1k     2   17:13:30 
     0      0    551      0       0     552       0   208m  2.85g    32m      6.1          0       0|0     0|0   739k    69k     2   17:13:31 
     0      0    728      0       0     729       0   208m  2.85g    35m     12.2          0       0|0     0|0   949k    90k     2   17:13:32 
     0      0    991      0       0     992       0   208m  2.85g    35m     11.7          0       0|0     0|0     1m   123k     2   17:13:33 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0    935      0       0     935       0   208m  2.85g    38m     10.9          0       0|0     0|1     1m   116k     2   17:13:34 
     0      0    431      0       0     433       0   208m  2.85g    39m      5.9          0       0|0     0|0   563k    54k     2   17:13:35 
     0      0      0      0       0       1       0   208m  2.85g    39m        0          0       0|0     0|0    62b     1k     2   17:13:36 
     0      0    518      0       0     519       0   208m  2.85g    40m        5          0       0|0     0|0   693k    65k     2   17:13:37 
     0      0    904      0       0     905       0   208m  2.85g    40m      8.9          0       0|0     0|0     1m   112k     2   17:13:38 
     0      0    958      0       0     959       0   208m  2.85g    41m     14.2          0       0|0     0|0     1m   119k     2   17:13:39 
     0      0    899      0       0     900       0   208m  2.85g    44m     11.1          0       0|0     0|0     1m   111k     2   17:13:40 
     0      0    401      0       0     402       0   208m  2.85g    42m      5.9          0       0|0     0|0   540k    50k     2   17:13:41 
     0      0    856      0       0     857       0   208m  2.85g    44m      9.4          0       0|0     0|0     1m   106k     2   17:13:42 
     0      0    232      0       0     233       0   208m  2.85g    45m      2.9          0       0|0     0|0   311k    29k     1   17:13:43 

Source DB:

insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      2      0     12       1      13       0    56g   115g  2.82g     52.2          0       0|0     0|1     1k   822k     2   17:13:02 
     0      0      0     42       0      43       0    56g   115g  2.65g     88.1          0       0|0     0|1     5k     4k     2   17:13:03 
     0      0      0     14       0      15       1    56g   115g  1.88g      101          0       0|0     0|1     1k     2k     2   17:13:04 
     0      0      0     33       1      35       0    56g   115g  1.89g     53.8       16.6       0|0     1|0     4k     4k     2   17:13:05 
     0      0      0      0       0       1       0    56g   115g   1.9g        0          0       0|0     1|0    62b     1k     2   17:13:06 
     0      0      0     34       0      35       0    56g   115g   1.9g     47.3          0       0|0     0|0     4k     4m     2   17:13:07 
     0      0      0     91       0      92       0    56g   115g  1.89g     86.8          0       0|0     0|0    12k     8k     2   17:13:08 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0    149       0     150       0    56g   115g   1.9g     82.5          0       0|0     0|0    20k    13k     2   17:13:09 
     0      0      0    287       0     287       0    56g   115g   1.9g     74.3          0       0|0     0|1    38k    25k     2   17:13:10 
     0      0      0    387       0     389       0    56g   115g  1.91g     64.6          0       0|0     0|0    52k    33k     2   17:13:11 
     0      0      0    580       0     581       0    56g   115g  1.93g       59          0       0|0     0|0    78k    49k     2   17:13:12 
     0      0      0    685       0     686       0    56g   115g  1.93g     47.9          0       0|0     0|0    93k    58k     2   17:13:13 
     0      0      0    729       0     730       0    56g   115g  1.95g     42.6          0       0|0     0|0    99k    61k     2   17:13:14 
     0      0      0    551       1     552       0    56g   115g  1.97g     34.5          0       0|0     1|0    74k    47k     2   17:13:15 
     0      0      0      0       0       1       0    56g   115g  1.98g        0          0       0|0     1|0    62b     1k     2   17:13:16 
     0      0      0    368       0     368       0    56g   115g  1.96g       20          0       0|0     0|1    50k     4m     2   17:13:17 
     0      0      0   1032       0    1034       0    56g   115g     2g     35.9          0       0|0     0|0   140k    87k     2   17:13:18 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0    834       0     835       0    56g   115g  2.01g     42.3          0       0|0     0|0   113k    70k     2   17:13:19 
     0      0      0    922       0     922       0    56g   115g  2.02g     38.9          0       0|0     0|1   125k    77k     2   17:13:20 
     0      0      0    338       1     340       0    56g   115g  2.03g       13          0       0|0     1|0    46k    29k     2   17:13:21 
     0      0      0      0       0       1       0    56g   115g  2.04g        0          0       0|0     1|0    62b     1k     2   17:13:22 
     0      0      0      0       0       1       0    56g   115g  2.04g        0          0       0|0     1|0    62b     1k     2   17:13:23 
     0      0      0    185       0     186       0    56g   115g  2.02g     24.7          0       0|0     0|0    25k     4m     2   17:13:24 
     0      0      0    920       0     920       0    56g   115g  2.02g     29.9          0       0|0     0|1   125k    77k     2   17:13:25 
     0      0      0    741       0     742       0    56g   115g  2.04g       49          0       0|0     0|1   100k    62k     2   17:13:26 
     0      0      0    886       0     887       0    56g   115g  2.05g       33          0       0|0     0|1   120k    74k     2   17:13:27 
     0      0      0    683       0     684       0    56g   115g  2.08g     57.3          0       0|0     0|1    92k    58k     2   17:13:28 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0    138       1     140       0    56g   115g  2.06g     14.5          0       0|0     1|0    18k    12k     2   17:13:29 
     0      0      0     12       0      12       0    56g   115g  2.07g      0.2          0       0|0     0|1     1k     4m     2   17:13:30 
     0      0      0    761       0     763       0    56g   115g  2.08g     46.5          0       0|0     0|0   103k    64k     2   17:13:31 
     0      0      0    762       0     762       0    56g   115g  2.09g       47          0       0|0     0|1   103k    64k     2   17:13:32 
     0      0      0    957       0     959       0    56g   115g   2.1g     35.3          0       0|0     0|0   130k    80k     2   17:13:33 
     0      0      0    905       0     905       0    56g   115g  2.13g     38.5          0       0|1     0|1   123k    76k     2   17:13:34 
     0      0      0    239       1     241       0    56g   115g  2.12g     14.9          0       0|0     1|0    32k    21k     2   17:13:35 
     0      0      0      0       0       1       0    56g   115g  2.13g        0          0       0|0     1|0    62b     1k     2   17:13:36 
     0      0      0    780       0     781       0    56g   115g  2.13g     42.8          0       0|0     0|0   106k     4m     2   17:13:37 
     0      0      0    805       0     806       0    56g   115g  2.14g     39.8          0       0|0     0|0   109k    68k     2   17:13:38 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0    988       0     989       0    56g   115g  2.16g     34.8          0       0|0     0|0   134k    83k     2   17:13:39 
     0      0      0    953       0     953       0    56g   115g  2.17g     39.9          0       0|0     0|1   129k    80k     2   17:13:40 
     0      0      0    375       1     376       0    56g   115g  2.18g     13.9          0       0|1     0|1    51k     1m     2   17:13:41 
     0      0      0    867       0     869       0    56g   115g  2.19g     41.7          0       0|0     0|0   118k    73k     1   17:13:42 

Total:

MongoDB shell version: 2.0.1
connecting to: localhost:27017/prodcopy
archiving clicks for ad_id: 4d6c12497b90420373000056
archiving documents...
archived 19045 documents in 40701ms.

For the queued approach with this same set up, I was able to queue ~1200 documents per second, and the archiving process performed at about the same ~500 documents per second. Example output from mongostat while this was running:

Archive DB:

insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0      0       0       1       0   208m  2.85g    18m        0          0       0|0     0|0    62b     1k     2   17:16:44 
     0      0      0      0       0       1       0   208m  2.85g    18m        0          0       0|0     0|0    62b     1k     2   17:16:45 
     0      0      0      0       0       1       0   208m  2.85g    18m        0          0       0|0     0|0    62b     1k     2   17:16:46 
     0      0      0      0       0       1       0   208m  2.85g    18m        0          0       0|0     0|0    62b     1k     2   17:16:47 
     0      0      0      0       0       1       0   208m  2.85g    18m        0          0       0|0     0|0    62b     1k     2   17:16:48 
     0      0      0      0       0       1       0   208m  2.85g    18m        0          0       0|0     0|0    62b     1k     2   17:16:49 
     0      0    248      0       0     249       0   208m  2.85g    17m        6          0       0|0     0|0   283k    31k     2   17:16:50 
     0      0    780      0       0     781       0   208m  2.85g    18m      8.2          0       0|0     0|0   883k    97k     2   17:16:51 
     0      0   1256      0       0    1257       0   208m  2.85g    22m     13.7          0       0|0     0|0     1m   155k     2   17:16:52 
     0      0   1084      0       0    1085       0   208m  2.85g    21m       10          0       0|0     0|0     1m   134k     2   17:16:53 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0   1108      0       0    1109       0   208m  2.85g    25m      9.8          0       0|0     0|0     1m   137k     2   17:16:54 
     0      0   1108      0       0    1109       0   208m  2.85g    24m      9.8          0       0|0     0|0     1m   137k     2   17:16:55 
     0      0   1110      0       0    1111       0   208m  2.85g    27m       11          0       0|0     0|0     1m   137k     2   17:16:56 
     0      0   1166      0       0    1167       0   208m  2.85g    31m     11.3          0       0|0     0|0     1m   144k     2   17:16:57 
     0      0   1246      0       0    1247       0   208m  2.85g    31m     10.6          0       0|0     0|0     1m   154k     2   17:16:58 
     0      0   1094      0       0    1095       0   208m  2.85g    35m      9.6          0       0|0     0|0     1m   135k     2   17:16:59 
     0      0    957      0       0     958       0   208m  2.85g    33m      9.6          0       0|0     0|0     1m   119k     2   17:17:00 
     0      0    721      0       0     722       0   464m  3.35g    38m      8.9          0       0|0     0|0     1m    90k     2   17:17:01 
     0      0    243      0       0     243       0   464m  3.35g    34m      9.7          0       0|0     0|0   445k    31k     2   17:17:02 
     0      0    921      0       0     923       0   464m  3.35g    37m     30.1          0       0|0     0|0     1m   114k     2   17:17:03 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0    668      0       0     669       0   464m  3.35g    38m      9.6          0       0|0     0|0   962k    83k     2   17:17:04 
     0      0    995      0       0     995       0   464m  3.35g    41m     21.2          0       0|0     0|1     1m   123k     2   17:17:05 
     0      0    467      0       0     468       0   464m  3.35g    27m       12          0       0|1     0|1   679k    58k     2   17:17:06 
     0      0      1      0       0       2       1   464m  3.35g    17m     85.5          0       0|1     0|1     1k     1k     2   17:17:07 
     0      0    337      0       0     338       0   464m  3.35g    19m     36.2          0       0|0     0|1   499k    42k     2   17:17:08 
     0      0    917      0       0     919       0   464m  3.35g    21m     20.4          0       0|0     0|0     1m   114k     2   17:17:09 
     0      0   1022      0       0    1022       0   464m  3.35g    25m     22.7          0       0|0     0|1     1m   126k     2   17:17:10 
     0      0    970      0       0     971       0   464m  3.35g    24m     27.4          0       0|0     0|1     1m   120k     2   17:17:11 
     0      0    166      0       0     168       0   464m  3.35g    25m      1.2          0       0|0     0|0   251k    21k     2   17:17:12 
     0      0    862      0       0     863       0   464m  3.35g    28m     23.1          0       0|0     0|0     1m   107k     2   17:17:13 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0    888      0       0     889       0   464m  3.35g    28m     28.8          0       0|0     0|0     1m   110k     2   17:17:14 
     0      0    540      0       0     541       0   464m  3.35g    30m     12.2          0       0|0     0|0   802k    67k     2   17:17:15 
     0      0    879      0       0     880       0   464m  3.35g    31m     22.3          0       0|0     0|0     1m   109k     2   17:17:16 
     0      0    473      0       0     474       0   464m  3.35g    36m     17.2          0       0|0     0|0     1m    59k     1   17:17:17 

Source DB:

insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0      0       0       1       0    56g   115g  1.86g        0          0       0|0     0|0    62b     1k     1   17:16:29 
     0      0      0      0       0       1       0    56g   115g  1.86g        0          0       0|0     0|0    62b     1k     1   17:16:30 
     1      1      1      0       1       3       0    56g   115g  1.87g     51.1          0       0|0     0|1   475b   697k     2   17:16:31 
     0      0      0      0       0       1       0    56g   115g   349m      129          0       0|0     0|1    62b     1k     2   17:16:32 
     0      0      0      0       0       1       0    56g   115g   367m      101          0       0|0     0|1    62b     1k     2   17:16:33 
     0      0      0      0       0       1       0    56g   115g   384m     73.4          0       0|0     0|1    62b     1k     2   17:16:34 
     0      0      0      0       0       1       0    56g   115g   428m      112          0       0|0     0|1    62b     1k     2   17:16:35 
     0      0      0      0       0       1       0    56g   115g   436m     94.8          0       0|0     0|1    62b     1k     2   17:16:36 
     0      0      0      0       0       1       0    56g   115g   450m      108          0       0|0     0|1    62b     1k     2   17:16:37 
     0      0      0      0       0       1       0    56g   115g   461m      110          0       0|0     0|1    62b     1k     2   17:16:38 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0      0       0       1       0    56g   115g   480m     95.5          0       0|0     0|1    62b     1k     2   17:16:39 
     0      0      0      0       0       1       0    56g   115g   494m     73.5          0       0|0     0|1    62b     1k     2   17:16:40 
     0      0      0      0       0       1       0    56g   115g   531m      120          0       0|0     0|1    62b     1k     2   17:16:41 
     0      0      0      0       0       1       0    56g   115g   541m      108          0       0|0     0|1    62b     1k     2   17:16:42 
     0      0      0      0       0       1       0    56g   115g   564m     74.6          0       0|0     0|1    62b     1k     2   17:16:43 
     0      0      0      0       0       1       0    56g   115g   579m      127          0       0|0     0|1    62b     1k     2   17:16:44 
     0      0      0      0       0       1       0    56g   115g   605m      100          0       0|0     0|1    62b     1k     2   17:16:45 
     0      0      0      0       0       1       0    56g   115g   631m     98.5          0       0|0     0|1    62b     1k     2   17:16:46 
     0      0      0      0       0       1       0    56g   115g   657m     89.1          0       0|0     0|1    62b     1k     2   17:16:47 
     0      0      0      0       0       1       0    56g   115g   662m     87.8          0       0|0     0|1    62b     1k     2   17:16:48 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0      0       0       1       0    56g   115g   680m     93.5          0       0|0     0|1    62b     1k     2   17:16:49 
     0      1      0    530       1     531       0    56g   115g   767m     81.3          0       0|0     0|1    72k     4m     2   17:16:50 
     0      0      0    884       0     886       0    56g   115g   721m       48          0       0|0     0|0   120k    74k     2   17:16:51 
     0      0      0   1079       0    1080       0    56g   115g   698m     33.6          0       0|0     0|0   146k    90k     2   17:16:52 
     0      0      0   1294       0    1294       0    56g   115g   720m     23.8          0       0|1     0|1   175k   108k     2   17:16:53 
     0      0      0   1118       1    1120       0    56g   115g   740m     31.9          0       0|0     0|0   152k     4m     2   17:16:54 
     0      0      0   1111       0    1112       0    56g   115g   718m     33.3          0       0|0     0|0   151k    93k     2   17:16:55 
     0      0      0   1097       0    1098       0    56g   115g   706m     33.1          0       0|0     0|0   149k    92k     2   17:16:56 
     0      0      0   1097       0    1098       0    56g   115g   701m     33.2          0       0|0     0|0   149k    92k     2   17:16:57 
     0      0      0   1091       1    1092       0    56g   115g   696m     35.1          0       0|0     0|0   148k     4m     2   17:16:58 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0   1149       0    1150       0    56g   115g   718m     21.3          0       0|0     0|0   156k    96k     2   17:16:59 
     0      0      0   1091       0    1091       0    56g   115g   707m     31.2          0       0|0     0|1   148k    91k     2   17:17:00 
     0      0      0    468       0     470       0    56g   115g   714m      9.3          0       0|0     0|0    63k    40k     2   17:17:01 
     0      0      0    332       0     333       0    56g   115g   694m     27.3          0       0|0     0|0    45k    28k     2   17:17:02 
     0      0      0   1013       1    1014       0    56g   115g   713m     17.8          0       0|0     0|0   137k     4m     2   17:17:03 
     0      0      0    551       0     552       0    56g   115g   691m     39.9          0       0|0     0|0    74k    47k     2   17:17:04 
     0      0      0   1140       0    1141       0    56g   115g   709m     19.7          0       0|0     0|0   155k    95k     2   17:17:05 
     0      0      0    126       0     127       0    56g   115g   709m      2.1          0       0|0     0|0    17k    11k     2   17:17:06 
     0      0      0     92       0      92       0    56g   115g   710m      1.6          0       0|0     0|1    12k     8k     2   17:17:07 
     0      0      0    605       1     607       0    56g   115g   700m     29.1          0       0|0     0|0    82k     4m     2   17:17:08 
insert  query update delete getmore command flushes mapped  vsize    res locked % idx miss %     qr|qw   ar|aw  netIn netOut  conn       time 
     0      0      0    814       0     814       0    56g   115g   699m     17.3          0       0|0     0|1   110k    68k     2   17:17:09 
     0      0      0   1036       0    1037       0    56g   115g   708m     18.4          0       0|0     0|0   140k    87k     2   17:17:10 
     0      0      0    820       0     821       0    56g   115g   715m     14.4          0       0|1     0|1   111k    69k     2   17:17:11 
     0      0      0    298       0     300       0    56g   115g   683m     24.2          0       0|0     0|0    40k    26k     2   17:17:12 
     0      0      0    907       1     908       0    56g   115g   699m     20.4          0       0|0     0|0   123k     4m     2   17:17:13 
     0      0      0    946       0     947       0    56g   115g   707m     16.4          0       0|0     0|0   128k    79k     2   17:17:14 
     0      0      0    374       0     375       0    56g   115g   677m     36.7          0       0|0     0|0    50k    32k     2   17:17:15 
     0      0      0    905       1     905       0    56g   115g   689m     16.2          0       0|0     0|0   123k   821k     2   17:17:16 
     0      0      0    259       0     261       0    56g   115g   690m      5.1          0       0|0     0|0    35k    22k     1   17:17:17 
     0      0      0      0       0       1       0    56g   115g   690m        0          0       0|0     0|0    62b     1k     1   17:17:18 

Total:

MongoDB shell version: 2.0.1
connecting to: localhost:27017/prodcopy
archiving clicks for ad_id: 4d6c12497b90420373000062
ensuring sparse index on field "arch"
marking documents for archive...
queued 22227 documents for archive in 19369ms.

archiving queued documents to archive.clicks...
archived 22227 queued documents to archive.clicks in 26901ms.

Upgrading the hard drive to an SSD almost tripled the performance of the archiving step, to ~1300 documents per second.

Obviously this is all a giant hack, and it would be nice if MongoDB supported some kind of partitioning feature so we didn’t have to do any of this manually. But until then, this is the best I could get given the tools currently available.