並列処理でActiveRecordを使う

※ドキュメントを読みながらこんなもんかな?とやってみたやつなので問題あるかもしれません。何かあればコメント頂けると嬉しいです。

例えば、DBからデータを取り出して逐次メールを送信する場合。
よく知られているようにメールの送信はコネクションの確立やSMTPサーバの処理などの待ち時間が長く、逐次処理をしていると無駄が大きすぎる。

処理を並列化して、あるメールの送信待ち時間を他のメールの構築等に充てて無駄をなくすことを試みる。

環境

処理の並列化

Thread.newを使う。単純に実装すればこんなかんじ。

# 未送信のレコードをそれぞれ別々のスレッドで処理するサンプル
threads = []
mails = find(:all, :conditions => ["sent = ?", false])
mails.each do |mail|
  threads << Thread.new do
    Thread.pass
    Mailer.deliver_mail(mail)
    mail.update_attributes(:sent => true)
  end
end
threads.each { |t| t.join }  # すべてのスレッドの処理が終わるのを待つ

ActiveRecordの並列化

処理は並列化できたが、これだとDBへのコネクションは1つしかなく、例えばスレッド毎にトランザクションを開始するようなことはできない。ActiveRecordのコードを追ってみたところ、どうやら

ActiveRecord::Base.allow_concurrency = true

とすれば良いようだ。
こうすることで、スレッド毎にコネクションを保持するようになり、スレッド毎に別々のトランザクションを開始することができる。

これらを踏まえたサンプル

コネクションが増えすぎないように、あらかじめ決めておいた数のスレッドで並列処理する。メールの重複送信を防止するため、PostgreSQLの行レベルロックを活用した。

ActiveRecord::Base.allow_concurrency = true  # マルチスレッド対応
class Mail < ActiveRecord::Base
  validates_presence_of :recipients, :from, :subject, :body

  # 送信時に最大何個のスレッドを作るか
  cattr_accessor :max_threads
  @@max_threads = 20

  class << self

    # すべての未送信メッセージを送信
    # 送信に成功したメールモデルのインスタンスの配列を返す
    def send_all!
      sent_mails = []
      threads = []
      mails = find(:all, :conditions => ["sent = ?", false])  # 未送信のメールを取り出す
      # max_threads個のスレッドで並列処理
      max_threads.times do
        threads << Thread.new do
          Thread.pass
          while mail = mails.shift
            sent_mails << mail if mail.send!
          end
          clear_active_connections!  # 処理が終わったのでコネクションを切断
        end
      end
      threads.each { |t| t.join }  # すべてのスレッドの処理が終わるのを待つ
      sent_mails
    end

  end
 
  # メール送信
  # 送信前には行レベルロックを行い二重送信を防ぐ
  def send!
    self.class.transaction do
      if new_record? || find(:first, :select => 'status', :conditions => ["id = ?", id]).status
        false
      else
        sended_mail = Mailer.deliver_mail(self)
        update_attributes(:sent => true)
        true
      end
    end
  end
  
end
# ひとつだけ送信
mail = Mail.new(:from => 'hoge@mizincogrammer.com',
                :recipients => 'fuga@mizincogrammer.com',
                :subject => 'Hello',
                :body => 'Rails World!!')
mail.save
mail.send!
# CRON等でまとめて送信
Mail.send_all!