Class Delayed::Worker
In: lib/delayed/worker.rb
Parent: Object

Methods

Constants

DEFAULT_SLEEP_DELAY = 5
DEFAULT_MAX_ATTEMPTS = 25
DEFAULT_MAX_RUN_TIME = 4.hours
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = []
DEFAULT_READ_AHEAD = 5

Attributes

name_prefix  [RW]  name_prefix is ignored if name is set directly

Public Class methods

[Source]

    # File lib/delayed/worker.rb, line 78
78:     def self.after_fork
79:       # Re-open file handles
80:       @files_to_reopen.each do |file|
81:         begin
82:           file.reopen file.path, "a+"
83:           file.sync = true
84:         rescue ::Exception
85:         end
86:       end
87: 
88:       backend.after_fork
89:     end

[Source]

    # File lib/delayed/worker.rb, line 53
53:     def self.backend=(backend)
54:       if backend.is_a? Symbol
55:         require "delayed/serialization/#{backend}"
56:         require "delayed/backend/#{backend}"
57:         backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
58:       end
59:       @@backend = backend
60:       silence_warnings { ::Delayed.const_set(:Job, backend) }
61:     end

[Source]

    # File lib/delayed/worker.rb, line 67
67:     def self.before_fork
68:       unless @files_to_reopen
69:         @files_to_reopen = []
70:         ObjectSpace.each_object(File) do |file|
71:           @files_to_reopen << file unless file.closed?
72:         end
73:       end
74: 
75:       backend.before_fork
76:     end

[Source]

    # File lib/delayed/worker.rb, line 63
63:     def self.guess_backend
64:       warn "[DEPRECATION] guess_backend is deprecated. Please remove it from your code."
65:     end

[Source]

    # File lib/delayed/worker.rb, line 91
91:     def self.lifecycle
92:       @lifecycle ||= Delayed::Lifecycle.new
93:     end

[Source]

     # File lib/delayed/worker.rb, line 95
 95:     def initialize(options={})
 96:       @quiet = options.has_key?(:quiet) ? options[:quiet] : true
 97:       self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
 98:       self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
 99:       self.class.sleep_delay  = options[:sleep_delay] if options.has_key?(:sleep_delay)
100:       self.class.read_ahead   = options[:read_ahead] if options.has_key?(:read_ahead)
101:       self.class.queues       = options[:queues] if options.has_key?(:queues)
102: 
103:       self.plugins.each { |klass| klass.new }
104:     end

[Source]

    # File lib/delayed/worker.rb, line 28
28:     def self.reset
29:       self.sleep_delay      = DEFAULT_SLEEP_DELAY
30:       self.max_attempts     = DEFAULT_MAX_ATTEMPTS
31:       self.max_run_time     = DEFAULT_MAX_RUN_TIME
32:       self.default_priority = DEFAULT_DEFAULT_PRIORITY
33:       self.delay_jobs       = DEFAULT_DELAY_JOBS
34:       self.queues           = DEFAULT_QUEUES
35:       self.read_ahead       = DEFAULT_READ_AHEAD
36:     end

Public Instance methods

[Source]

     # File lib/delayed/worker.rb, line 209
209:     def failed(job)
210:       self.class.lifecycle.run_callbacks(:failure, self, job) do
211:         job.hook(:failure)
212:         self.class.destroy_failed_jobs ? job.destroy : job.fail!
213:       end
214:     end

[Source]

     # File lib/delayed/worker.rb, line 222
222:     def max_attempts(job)
223:       job.max_attempts || self.class.max_attempts
224:     end

Every worker has a unique name, which by default is the pid of the process. There are some advantages to overriding this with something that survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.

[Source]

     # File lib/delayed/worker.rb, line 110
110:     def name
111:       return @name unless @name.nil?
112:       "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
113:     end

Sets the name of the worker. Setting the name to nil will reset it to the default worker name.

[Source]

     # File lib/delayed/worker.rb, line 117
117:     def name=(val)
118:       @name = val
119:     end

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.

[Source]

     # File lib/delayed/worker.rb, line 197
197:     def reschedule(job, time = nil)
198:       if (job.attempts += 1) < max_attempts(job)
199:         time ||= job.reschedule_at
200:         job.run_at = time
201:         job.unlock
202:         job.save!
203:       else
204:         say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
205:         failed(job)
206:       end
207:     end

[Source]

     # File lib/delayed/worker.rb, line 180
180:     def run(job)
181:       runtime =  Benchmark.realtime do
182:         Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
183:         job.destroy
184:       end
185:       say "#{job.name} completed after %.4f" % runtime
186:       return true  # did work
187:     rescue DeserializationError => error
188:       job.last_error = "{#{error.message}\n#{error.backtrace.join("\n")}"
189:       failed(job)
190:     rescue Exception => error
191:       self.class.lifecycle.run_callbacks(:error, self, job){ handle_failed_job(job, error) }
192:       return false  # work failed
193:     end

[Source]

     # File lib/delayed/worker.rb, line 216
216:     def say(text, level = Logger::INFO)
217:       text = "[Worker(#{name})] #{text}"
218:       puts text unless @quiet
219:       logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
220:     end

[Source]

     # File lib/delayed/worker.rb, line 121
121:     def start
122:       trap('TERM') { say 'Exiting...'; stop }
123:       trap('INT')  { say 'Exiting...'; stop }
124: 
125:       say "Starting job worker"
126: 
127:       self.class.lifecycle.run_callbacks(:execute, self) do
128:         loop do
129:           self.class.lifecycle.run_callbacks(:loop, self) do
130:             result = nil
131: 
132:             realtime = Benchmark.realtime do
133:               result = work_off
134:             end
135: 
136:             count = result.sum
137: 
138:             break if stop?
139: 
140:             if count.zero?
141:               sleep(self.class.sleep_delay)
142:             else
143:               say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
144:             end
145:           end
146: 
147:           break if stop?
148:         end
149:       end
150:     end

[Source]

     # File lib/delayed/worker.rb, line 152
152:     def stop
153:       @exit = true
154:     end

[Source]

     # File lib/delayed/worker.rb, line 156
156:     def stop?
157:       !!@exit
158:     end

Do num jobs and return stats on success/failure. Exit early if interrupted.

[Source]

     # File lib/delayed/worker.rb, line 162
162:     def work_off(num = 100)
163:       success, failure = 0, 0
164: 
165:       num.times do
166:         case reserve_and_run_one_job
167:         when true
168:             success += 1
169:         when false
170:             failure += 1
171:         else
172:           break  # leave if no work could be done
173:         end
174:         break if stop? # leave if we're exiting
175:       end
176: 
177:       return [success, failure]
178:     end

Protected Instance methods

[Source]

     # File lib/delayed/worker.rb, line 228
228:     def handle_failed_job(job, error)
229:       job.last_error = "{#{error.message}\n#{error.backtrace.join("\n")}"
230:       say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
231:       reschedule(job)
232:     end

Run the next job we can get an exclusive lock on. If no jobs are left we return nil

[Source]

     # File lib/delayed/worker.rb, line 236
236:     def reserve_and_run_one_job
237:       job = Delayed::Job.reserve(self)
238:       self.class.lifecycle.run_callbacks(:perform, self, job){ result = run(job) } if job
239:     end

[Validate]