並列処理でActiveRecordを使う
※ドキュメントを読みながらこんなもんかな?とやってみたやつなので問題あるかもしれません。何かあればコメント頂けると嬉しいです。
例えば、DBからデータを取り出して逐次メールを送信する場合。
よく知られているようにメールの送信はコネクションの確立やSMTPサーバの処理などの待ち時間が長く、逐次処理をしていると無駄が大きすぎる。
処理を並列化して、あるメールの送信待ち時間を他のメールの構築等に充てて無駄をなくすことを試みる。
環境
- Ruby 1.8.6
- Rails 2.1.0
- PostgreSQL 8.3
処理の並列化
Thread.newを使う。単純に実装すればこんなかんじ。
# 未送信のレコードをそれぞれ別々のスレッドで処理するサンプル threads = [] mails = find(:all, :conditions => ["sent = ?", false]) mails.each do |mail| threads << Thread.new do Thread.pass Mailer.deliver_mail(mail) mail.update_attributes(:sent => true) end end threads.each { |t| t.join } # すべてのスレッドの処理が終わるのを待つ
ActiveRecordの並列化
処理は並列化できたが、これだとDBへのコネクションは1つしかなく、例えばスレッド毎にトランザクションを開始するようなことはできない。ActiveRecordのコードを追ってみたところ、どうやら
ActiveRecord::Base.allow_concurrency = true
とすれば良いようだ。
こうすることで、スレッド毎にコネクションを保持するようになり、スレッド毎に別々のトランザクションを開始することができる。
これらを踏まえたサンプル
コネクションが増えすぎないように、あらかじめ決めておいた数のスレッドで並列処理する。メールの重複送信を防止するため、PostgreSQLの行レベルロックを活用した。
ActiveRecord::Base.allow_concurrency = true # マルチスレッド対応 class Mail < ActiveRecord::Base validates_presence_of :recipients, :from, :subject, :body # 送信時に最大何個のスレッドを作るか cattr_accessor :max_threads @@max_threads = 20 class << self # すべての未送信メッセージを送信 # 送信に成功したメールモデルのインスタンスの配列を返す def send_all! sent_mails = [] threads = [] mails = find(:all, :conditions => ["sent = ?", false]) # 未送信のメールを取り出す # max_threads個のスレッドで並列処理 max_threads.times do threads << Thread.new do Thread.pass while mail = mails.shift sent_mails << mail if mail.send! end clear_active_connections! # 処理が終わったのでコネクションを切断 end end threads.each { |t| t.join } # すべてのスレッドの処理が終わるのを待つ sent_mails end end # メール送信 # 送信前には行レベルロックを行い二重送信を防ぐ def send! self.class.transaction do if new_record? || find(:first, :select => 'status', :conditions => ["id = ?", id]).status false else sended_mail = Mailer.deliver_mail(self) update_attributes(:sent => true) true end end end end
# ひとつだけ送信 mail = Mail.new(:from => 'hoge@mizincogrammer.com', :recipients => 'fuga@mizincogrammer.com', :subject => 'Hello', :body => 'Rails World!!') mail.save mail.send!
# CRON等でまとめて送信 Mail.send_all!