Class Delayed::Worker
In: lib/delayed/worker.rb
Parent: Object

Methods

Attributes

name_prefix  [RW]  name_prefix is ignored if name is set directly

Public Class methods

[Source]

    # File lib/delayed/worker.rb, line 63
# Re-opens the file handles recorded in @files_to_reopen and then calls
# backend.after_fork.
#
# Each recorded handle is reopened on its own path in "a+" mode with sync
# switched on. A handle that cannot be reopened is skipped so that the
# remaining handles and the backend hook still run. When nothing has been
# recorded (@files_to_reopen is nil, e.g. before_fork was never called) only
# the backend hook runs.
def self.after_fork
  # Array(nil) is [], so a missing list no longer raises NoMethodError.
  Array(@files_to_reopen).each do |file|
    begin
      file.reopen file.path, "a+"
      file.sync = true
    rescue StandardError
      # Best effort: an unreopenable handle must not abort the fork hook.
      # Only StandardError is swallowed so signals and exits still propagate.
    end
  end

  backend.after_fork
end

[Source]

    # File lib/delayed/worker.rb, line 38
# Selects the job backend used by workers.
#
# backend - either a job class, used as given, or a Symbol. For a Symbol the
#           files "delayed/serialization/<backend>" and
#           "delayed/backend/<backend>" are required and the symbol is
#           resolved to the constant Delayed::Backend::<Classified>::Job.
#
# The resulting class is stored in @@backend and assigned to the
# Delayed::Job constant inside a silence_warnings block.
def self.backend=(backend)
  job_class =
    case backend
    when Symbol
      require "delayed/serialization/#{backend}"
      require "delayed/backend/#{backend}"
      "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
    else
      backend
    end

  @@backend = job_class
  silence_warnings { ::Delayed.const_set(:Job, job_class) }
end

[Source]

    # File lib/delayed/worker.rb, line 52
# Records every File object that is currently open, then calls
# backend.before_fork.
#
# The list is built only once: if @files_to_reopen is already set it is left
# untouched and only the backend hook runs.
def self.before_fork
  @files_to_reopen ||= ObjectSpace.each_object(File).reject { |handle| handle.closed? }

  backend.before_fork
end

[Source]

    # File lib/delayed/worker.rb, line 48
# Deprecated no-op: only emits a deprecation warning via Kernel#warn.
def self.guess_backend
  notice = "[DEPRECATION] guess_backend is deprecated. Please remove it from your code."
  warn(notice)
end

[Source]

    # File lib/delayed/worker.rb, line 76
# Returns the memoized Delayed::Lifecycle instance, creating it on first use.
def self.lifecycle
  return @lifecycle if @lifecycle

  @lifecycle = Delayed::Lifecycle.new
end

[Source]

    # File lib/delayed/worker.rb, line 80
# Builds a worker.
#
# options - Hash of settings:
#           :quiet        - stored in @quiet; defaults to true when the key
#                           is absent.
#           :min_priority, :max_priority, :sleep_delay, :queues
#                         - when the key is present, its value is written to
#                           the class-level setting of the same name.
#
# After the options are applied, every class in plugins is instantiated.
def initialize(options={})
  @quiet = options.fetch(:quiet, true)

  [:min_priority, :max_priority, :sleep_delay, :queues].each do |setting|
    next unless options.has_key?(setting)
    self.class.send("#{setting}=", options[setting])
  end

  plugins.each { |plugin| plugin.new }
end

Public Instance methods

[Source]

     # File lib/delayed/worker.rb, line 189
# Handles a job that will not be retried.
#
# Inside the :failure lifecycle callbacks the job's :failure hook is run and
# the job is then either destroyed (when the class-level destroy_failed_jobs
# setting is truthy) or marked with fail!.
def failed(job)
  self.class.lifecycle.run_callbacks(:failure, self, job) do
    job.hook(:failure)

    if self.class.destroy_failed_jobs
      job.destroy
    else
      job.fail!
    end
  end
end

[Source]

     # File lib/delayed/worker.rb, line 202
# Returns the attempt limit for +job+: the job's own max_attempts when that
# is truthy, otherwise the class-level max_attempts setting.
def max_attempts(job)
  per_job_limit = job.max_attempts
  per_job_limit ? per_job_limit : self.class.max_attempts
end

Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.

[Source]

    # File lib/delayed/worker.rb, line 94
# Returns the worker's name.
#
# An explicitly assigned name (anything other than nil) is returned as is.
# Otherwise the name is built from @name_prefix, the host name and the
# process id; if building that string raises a StandardError, the name falls
# back to @name_prefix plus the process id only.
def name
  return @name unless @name.nil?

  begin
    "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "#{@name_prefix}pid:#{Process.pid}"
  end
end

Sets the name of the worker. Setting the name to nil will reset the default worker name.

[Source]

     # File lib/delayed/worker.rb, line 101
# Stores +val+ in @name. The name reader treats a nil @name as "not set",
# so assigning nil restores the generated default name.
def name=(val)
  @name = val
end

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.

[Source]

     # File lib/delayed/worker.rb, line 177
# Records another failed attempt on +job+ and either schedules a retry or
# gives up.
#
# job  - the job that just failed; its attempts counter is incremented.
# time - optional time for the retry; defaults to job.reschedule_at.
#
# While the incremented counter is below max_attempts(job) the job gets its
# new run_at, is unlocked and saved. Otherwise a message is logged at INFO
# level and the job is handed to failed.
def reschedule(job, time = nil)
  attempts_so_far = (job.attempts += 1)

  if attempts_so_far >= max_attempts(job)
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
    return failed(job)
  end

  job.run_at = time || job.reschedule_at
  job.unlock
  job.save!
end

[Source]

     # File lib/delayed/worker.rb, line 160
# Runs +job+ under the class-level max_run_time timeout and destroys it on
# success.
#
# Returns true when the job ran and was destroyed, and false when it raised
# any exception other than DeserializationError (the job is then passed to
# handle_failed_job inside the :error lifecycle callbacks). On a
# DeserializationError the error is recorded in job.last_error and the
# result of failed(job) is returned.
def run(job)
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  # The job name is passed as a format argument, not interpolated into the
  # format string, so a name containing "%" cannot break the message.
  say format("%s completed after %.4f", job.name, runtime)
  return true  # did work
rescue DeserializationError => error
  # Message first, then the backtrace, one frame per line.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  failed(job)
rescue Exception => error
  # NOTE(review): rescuing Exception looks deliberate, so that any failure
  # raised while running a job is recorded against it -- confirm before narrowing.
  self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
  return false  # work failed
end

[Source]

     # File lib/delayed/worker.rb, line 196
# Emits a message tagged with the worker's name.
#
# text  - the message body.
# level - severity passed to the logger (defaults to Logger::INFO).
#
# The tagged line is printed to standard output unless @quiet is set, and,
# when a logger is available, added to it with a timestamp prefix.
def say(text, level = Logger::INFO)
  line = "[Worker(#{name})] #{text}"
  puts line unless @quiet
  return unless logger

  logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{line}"
end

[Source]

     # File lib/delayed/worker.rb, line 105
# Runs the worker loop until stop is requested.
#
# Installs TERM and INT handlers that request a stop, then, inside the
# :execute lifecycle callbacks, repeatedly calls work_off (each round wrapped
# in the :loop callbacks). A round that processed no jobs sleeps for the
# class-level sleep_delay; otherwise a throughput line is logged.
def start
  %w[TERM INT].each do |signal|
    trap(signal) do
      # Log from a separate thread: a signal handler runs in trap context,
      # where the logger's lock cannot be acquired (Ruby 2.0+), so logging
      # directly from the handler would lose the message.
      Thread.new { say 'Exiting...' }
      stop
    end
  end

  say "Starting job worker"

  self.class.lifecycle.run_callbacks(:execute, self) do
    loop do
      self.class.lifecycle.run_callbacks(:loop, self) do
        result = nil
        realtime = Benchmark.realtime { result = work_off }

        count = result.sum

        # This break only leaves the :loop callbacks block ...
        break if @exit

        if count.zero?
          sleep(self.class.sleep_delay)
        else
          say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
        end
      end

      # ... so the flag is checked again here to leave the loop itself.
      break if @exit
    end
  end
end

[Source]

     # File lib/delayed/worker.rb, line 136
# Requests shutdown by setting the @exit flag, which the loop in start checks
# after each round of work.
def stop
  @exit = true
end

Do num jobs and return stats on success/failure. Exit early if interrupted.

[Source]

     # File lib/delayed/worker.rb, line 142
# Runs up to +num+ jobs and reports how they went.
#
# num - maximum number of jobs to attempt (default 100).
#
# Stops early when reserve_and_run_one_job returns anything other than true
# or false (no work could be done), or once a stop has been requested.
#
# Returns a two-element Array: [successes, failures].
def work_off(num = 100)
  success, failure = 0, 0

  num.times do
    case reserve_and_run_one_job
    when true
      success += 1
    when false
      failure += 1
    else
      break  # leave if no work could be done
    end
    # stop sets @exit; checking the global $exit here never saw the request.
    break if @exit # leave if we're exiting
  end

  [success, failure]
end

Protected Instance methods

[Source]

     # File lib/delayed/worker.rb, line 208
# Records +error+ on +job+, logs the failure at ERROR level and passes the
# job to reschedule.
#
# job.last_error is set to the error message followed by the backtrace, one
# frame per line; an error without a backtrace yields just the message and a
# trailing newline.
def handle_failed_job(job, error)
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
  reschedule(job)
end

Run the next job we can get an exclusive lock on. If no jobs are left we return nil.

[Source]

     # File lib/delayed/worker.rb, line 216
# Reserves the next available job for this worker and runs it inside the
# :perform lifecycle callbacks.
#
# Returns nil when Delayed::Job.reserve yields no job; otherwise returns
# whatever run_callbacks returns for the block that runs the job.
def reserve_and_run_one_job
  job = Delayed::Job.reserve(self)
  return unless job

  self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) }
end

[Validate]