Class:  Delayed::Worker
In:     lib/delayed/worker.rb
Parent: Object
Constants

  DEFAULT_SLEEP_DELAY      = 5
  DEFAULT_MAX_ATTEMPTS     = 25
  DEFAULT_MAX_RUN_TIME     = 4.hours
  DEFAULT_DEFAULT_PRIORITY = 0
  DEFAULT_DELAY_JOBS       = true
  DEFAULT_QUEUES           = []
  DEFAULT_READ_AHEAD       = 5
Attributes

  name_prefix  [RW]  name_prefix is ignored if name is set directly.
  # File lib/delayed/worker.rb, line 78
  def self.after_fork
    # Re-open file handles
    @files_to_reopen.each do |file|
      begin
        file.reopen file.path, "a+"
        file.sync = true
      rescue ::Exception
      end
    end

    backend.after_fork
  end
  # File lib/delayed/worker.rb, line 53
  def self.backend=(backend)
    if backend.is_a? Symbol
      require "delayed/serialization/#{backend}"
      require "delayed/backend/#{backend}"
      backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
    end
    @@backend = backend
    silence_warnings { ::Delayed.const_set(:Job, backend) }
  end
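The symbol form loads the matching serialization and backend files and aliases the resulting class to Delayed::Job. A usage sketch, assuming the :active_record backend gem is installed (the payload class is a placeholder):

  # e.g. in an initializer
  Delayed::Worker.backend = :active_record

  # Delayed::Job now points at Delayed::Backend::ActiveRecord::Job
  Delayed::Job.enqueue SomeJob.new   # SomeJob is a hypothetical payload class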
  # File lib/delayed/worker.rb, line 67
  def self.before_fork
    unless @files_to_reopen
      @files_to_reopen = []
      ObjectSpace.each_object(File) do |file|
        @files_to_reopen << file unless file.closed?
      end
    end

    backend.before_fork
  end
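before_fork and after_fork are meant to bracket a Process.fork in a daemonizing script: open file handles are remembered before the fork and reopened (and the backend reconnected) in the child. A minimal sketch of how a custom launcher might use them:

  # Hypothetical launcher forking a single worker process
  Delayed::Worker.before_fork          # snapshot open files, let the backend prepare to fork

  pid = Process.fork do
    Delayed::Worker.after_fork         # reopen files and reconnect the backend in the child
    Delayed::Worker.new(:quiet => false).start
  end

  Process.detach(pid)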
  # File lib/delayed/worker.rb, line 63
  def self.guess_backend
    warn "[DEPRECATION] guess_backend is deprecated. Please remove it from your code."
  end
  # File lib/delayed/worker.rb, line 91
  def self.lifecycle
    @lifecycle ||= Delayed::Lifecycle.new
  end
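This memoized Delayed::Lifecycle instance is what the worker runs its :execute, :loop, :error and :failure callbacks through (see start, run and failed below). Assuming Delayed::Lifecycle exposes before/after/around hooks, custom instrumentation can be attached to it, for example:

  # Sketch: the hook name comes from the run_callbacks calls in this class;
  # the before(...) registration API and block arguments are assumptions.
  Delayed::Worker.lifecycle.before(:error) do |worker, job|
    worker.say "about to handle an error on job ##{job.id}", Logger::WARN
  end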
  # File lib/delayed/worker.rb, line 95
  def initialize(options={})
    @quiet = options.has_key?(:quiet) ? options[:quiet] : true
    self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
    self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
    self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
    self.class.read_ahead = options[:read_ahead] if options.has_key?(:read_ahead)
    self.class.queues = options[:queues] if options.has_key?(:queues)

    self.plugins.each { |klass| klass.new }
  end
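Note that most options are written straight onto class-level settings, so they affect every worker in the process, not just the instance being built. A typical construction (queue names and values are illustrative):

  # Build and run a worker that only serves two queues and logs to stdout
  worker = Delayed::Worker.new(
    :quiet       => false,
    :queues      => %w[mailers imports],
    :sleep_delay => 10,
    :read_ahead  => 10
  )
  worker.start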
  # File lib/delayed/worker.rb, line 28
  def self.reset
    self.sleep_delay = DEFAULT_SLEEP_DELAY
    self.max_attempts = DEFAULT_MAX_ATTEMPTS
    self.max_run_time = DEFAULT_MAX_RUN_TIME
    self.default_priority = DEFAULT_DEFAULT_PRIORITY
    self.delay_jobs = DEFAULT_DELAY_JOBS
    self.queues = DEFAULT_QUEUES
    self.read_ahead = DEFAULT_READ_AHEAD
  end
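reset restores the class-level settings to the DEFAULT_* constants above; the same accessors can be used to configure workers globally, e.g. from an initializer (the values below are illustrative):

  # config/initializers/delayed_job_config.rb
  Delayed::Worker.sleep_delay      = 2
  Delayed::Worker.max_attempts     = 10
  Delayed::Worker.max_run_time     = 10.minutes
  Delayed::Worker.default_priority = 5
  Delayed::Worker.read_ahead       = 10
  Delayed::Worker.queues           = %w[default mailers]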
  # File lib/delayed/worker.rb, line 209
  def failed(job)
    self.class.lifecycle.run_callbacks(:failure, self, job) do
      job.hook(:failure)
      self.class.destroy_failed_jobs ? job.destroy : job.fail!
    end
  end
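Whether a permanently failed job is deleted or merely marked as failed is controlled by the destroy_failed_jobs setting, and the payload gets a chance to react via its :failure hook. A sketch (the payload class is hypothetical, and the hook arity varies by gem version, so it accepts any arguments here):

  # Keep failed jobs in the table for inspection instead of destroying them
  Delayed::Worker.destroy_failed_jobs = false

  class NewsletterJob < Struct.new(:newsletter_id)
    def perform
      # ... deliver the newsletter ...
    end

    def failure(*)
      # invoked through job.hook(:failure) once the worker gives up
    end
  end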
  # File lib/delayed/worker.rb, line 222
  def max_attempts(job)
    job.max_attempts || self.class.max_attempts
  end
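Because the job record is asked first, a payload can carry its own retry limit; only when it returns nil does the worker fall back to the class-wide setting. For example (hypothetical payload class, assuming the backend delegates max_attempts to the payload as the bundled backends do):

  class SyncAccountJob < Struct.new(:account_id)
    def perform
      # ... talk to a flaky remote API ...
    end

    # Overrides Delayed::Worker.max_attempts (25 by default) for this job type
    def max_attempts
      3
    end
  end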
Every worker has a unique name, which by default is the pid of the process. There are some advantages to overriding this with something that survives worker restarts: workers can safely resume working on tasks that are locked by themselves, since the worker will assume that it crashed before.
  # File lib/delayed/worker.rb, line 110
  def name
    return @name unless @name.nil?
    "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
  end
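A stable name lets a restarted worker pick its old locks back up, whereas name_prefix only decorates the default host/pid name. A sketch, assuming the worker exposes an attr-style name writer (the reader above returns @name whenever it is set):

  worker = Delayed::Worker.new

  # Default: something like "host:app1 pid:12345" -- different on every restart
  worker.name_prefix = "newsletter "        # => "newsletter host:app1 pid:12345"

  # Stable identity that survives restarts (assumes a name= writer)
  worker.name = "newsletter-worker-1"
  worker.start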
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
  # File lib/delayed/worker.rb, line 197
  def reschedule(job, time = nil)
    if (job.attempts += 1) < max_attempts(job)
      time ||= job.reschedule_at
      job.run_at = time
      job.unlock
      job.save!
    else
      say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
      failed(job)
    end
  end
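The delay comes from the job's reschedule_at; a payload can supply its own schedule instead of the default exponential backoff. A sketch, assuming the backend consults a reschedule_at(current_time, attempts) hook on the payload (the job class below is hypothetical):

  class PollFeedJob < Struct.new(:feed_id)
    def perform
      # ... fetch the feed ...
    end

    # Retry on a flat 5-minute cadence instead of the default exponential backoff
    def reschedule_at(current_time, attempts)
      current_time + 5 * 60
    end
  end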
  # File lib/delayed/worker.rb, line 180
  def run(job)
    runtime = Benchmark.realtime do
      Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
      job.destroy
    end
    say "#{job.name} completed after %.4f" % runtime
    return true  # did work
  rescue DeserializationError => error
    job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
    failed(job)
  rescue Exception => error
    self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
    return false  # work failed
  end
  # File lib/delayed/worker.rb, line 216
  def say(text, level = Logger::INFO)
    text = "[Worker(#{name})] #{text}"
    puts text unless @quiet
    logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
  end
  # File lib/delayed/worker.rb, line 121
  def start
    trap('TERM') { say 'Exiting...'; stop }
    trap('INT')  { say 'Exiting...'; stop }

    say "Starting job worker"

    self.class.lifecycle.run_callbacks(:execute, self) do
      loop do
        self.class.lifecycle.run_callbacks(:loop, self) do
          result = nil

          realtime = Benchmark.realtime do
            result = work_off
          end

          count = result.sum

          break if stop?

          if count.zero?
            sleep(self.class.sleep_delay)
          else
            say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
          end
        end

        break if stop?
      end
    end
  end
Work off up to num jobs and return stats on success/failure. Exits early if the worker has been told to stop or no more work can be reserved.
  # File lib/delayed/worker.rb, line 162
  def work_off(num = 100)
    success, failure = 0, 0

    num.times do
      case reserve_and_run_one_job
      when true
        success += 1
      when false
        failure += 1
      else
        break  # leave if no work could be done
      end
      break if stop?  # leave if we're exiting
    end

    return [success, failure]
  end
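work_off is also handy outside a long-running worker, e.g. to drain the queue synchronously in a test or a one-off script (NewsletterJob is a placeholder payload class):

  # Enqueue something, then process up to 100 jobs in the current process
  Delayed::Job.enqueue NewsletterJob.new(42)

  successes, failures = Delayed::Worker.new(:quiet => true).work_off
  puts "#{successes} succeeded, #{failures} failed"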
  # File lib/delayed/worker.rb, line 228
  def handle_failed_job(job, error)
    job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
    say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
    reschedule(job)
  end