diff options
author | Alex Legler <alex@a3li.li> | 2015-02-23 15:21:27 +0100 |
---|---|---|
committer | Alex Legler <alex@a3li.li> | 2015-02-23 15:21:27 +0100 |
commit | 2a13f18aa0a7ac3fe7d19eeea45842de818a615c (patch) | |
tree | c67b09ac642018550f2ca203851c87dc3ccadc33 | |
parent | Implement --delete (diff) | |
download | backend-2a13f18aa0a7ac3fe7d19eeea45842de818a615c.tar.gz backend-2a13f18aa0a7ac3fe7d19eeea45842de818a615c.tar.bz2 backend-2a13f18aa0a7ac3fe7d19eeea45842de818a615c.zip |
use more threads!
-rw-r--r-- | Gemfile | 4 | ||||
-rw-r--r-- | Gemfile.lock | 4 | ||||
-rwxr-xr-x | ag | 12 | ||||
-rw-r--r-- | lib/storage.rb | 10 | ||||
-rw-r--r-- | lib/threading.rb | 4 |
5 files changed, 24 insertions, 10 deletions
@@ -4,4 +4,6 @@ gem 'mail' gem 'maildir' gem 'elasticsearch' gem 'sanitize' -gem 'charlock_holmes'
\ No newline at end of file +gem 'charlock_holmes' +gem 'parallel' +gem 'ruby-progressbar'
\ No newline at end of file diff --git a/Gemfile.lock b/Gemfile.lock index ca40918..d55552e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -24,6 +24,8 @@ GEM mini_portile (~> 0.6.0) nokogumbo (1.2.0) nokogiri + parallel (1.4.0) + ruby-progressbar (1.7.1) sanitize (3.1.1) crass (~> 1.0.1) nokogiri (>= 1.4.4) @@ -37,4 +39,6 @@ DEPENDENCIES elasticsearch mail maildir + parallel + ruby-progressbar sanitize @@ -7,6 +7,8 @@ require 'mail' require 'maildir' require 'elasticsearch' require 'optparse' +require 'parallel' +require 'ruby-progressbar' require_relative 'lib/utils' require_relative 'lib/threading' require_relative 'lib/rendering' @@ -111,7 +113,9 @@ $es.transport.reload_connections! def do_full Ag::Storage.create_index($options.name) - $maildir.list(:cur).each do |maildir_message| + messages = $maildir.list(:cur) + + Parallel.each(messages, progress: "Importing #{$options.name}") do |maildir_message| mail = maildir_message.data begin @@ -126,14 +130,16 @@ def do_full end def do_incremental - $maildir.list(:new).each do |maildir_message| + messages = $maildir.list(:cur) + + Parallel.each(messages, progress: "Importing #{$options.name}") do |maildir_message| mail = maildir_message.data begin Ag::Storage.store($options.name, mail, maildir_message.filename) maildir_message.process unless $options.readonly rescue => e - $stderr.puts "Cannot save message #{mail.message_id} (file #{maildir_message.filename}): #{e.message}" + $stderr.puts "Cannot save message #{mail.message_id} (file #{maildir_message.filename}): #{e.message}" if $options.debug next end end diff --git a/lib/storage.rb b/lib/storage.rb index f255633..d32ba2b 100644 --- a/lib/storage.rb +++ b/lib/storage.rb @@ -7,7 +7,7 @@ module Ag::Storage begin $es.indices.delete index: 'ml-' + list rescue Elasticsearch::Transport::Transport::Errors::NotFound => e - $stderr.puts "Index did not exist yet. Creating." + $stderr.puts "Index did not exist yet. Creating." if $options.debug end $es.indices.create( @@ -84,10 +84,10 @@ module Ag::Storage content = Ag::Utils.fix_encoding(raw_content || '', true).strip if content == '' - $stderr.puts "#{message.message_id}: Content empty?" + $stderr.puts "#{message.message_id}: Content empty?" if $options.debug end rescue => e - $stderr.puts "Cannot render message #{message.message_id} (file: #{filename}): #{e}" + $stderr.puts "Cannot render message #{message.message_id} (file: #{filename}): #{e}" if $options.debug end content @@ -174,7 +174,7 @@ module Ag::Storage ) end - def fix_threading(list) + def fix_threading(list, pass) result = $es.search( index: 'ml-' + list, size: 100000, @@ -201,7 +201,7 @@ module Ag::Storage } ) - result['hits']['hits'].each do |hit| + Parallel.each(result['hits']['hits'], progress: "Calculating Threading (Pass #{pass})") do |hit| msg = resolve_message_id(list, hit['_source']['raw_parent']) unless msg == nil diff --git a/lib/threading.rb b/lib/threading.rb index 8988f23..212bb98 100644 --- a/lib/threading.rb +++ b/lib/threading.rb @@ -57,11 +57,13 @@ module Ag def calc(list) number_of_root_threads = -1 + pass = 1 loop do - new_num = Ag::Storage.fix_threading(list) + new_num = Ag::Storage.fix_threading(list, pass) break if new_num == number_of_root_threads number_of_root_threads = new_num + pass += 1 end end end |