Class: BackgroundJobQueue
- Inherits:
- Object (hierarchy: Object → BackgroundJobQueue)
- Defined in:
- backend/app/lib/background_job_queue.rb
Constant Summary
- JOB_TIMEOUT_SECONDS =
AppConfig[:job_timeout_seconds].to_i
Class Method Summary (collapse)
Instance Method Summary (collapse)
-
- (Object) find_queued_job
-
- (Object) find_stale_job
-
- (Object) get_next_job
-
- (Object) run_pending_job
-
- (Object) start_background_thread
Class Method Details
+ (Object) init
[View source]
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'backend/app/lib/background_job_queue.rb', line 139

# Boots the job queue: cancels every job that find_stale_job reports as stale,
# then launches the background polling thread.
#
# @return [Object] whatever #start_background_thread returns
def self.init
  queue = BackgroundJobQueue.new

  # Clear out stale jobs on start.  find_stale_job is an instance method, so it
  # must be called on the queue (calling it on the class raised NameError,
  # which the old bare rescue silently swallowed).  The loop stops as soon as
  # no stale job is returned, and the nil check now happens before finish.
  begin
    while (stale = queue.find_stale_job)
      stale.finish(:canceled)
    end
  rescue
    # Best effort: a cleanup failure must not keep the queue from starting,
    # but it should at least show up in the log.
    Log.error("Error clearing stale jobs: #{$!} #{$@}")
  end

  queue.start_background_thread
end
Instance Method Details
- (Object) find_queued_job
[View source]
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'backend/app/lib/background_job_queue.rb', line 40

# Claims the oldest queued job (ordered by :time_submitted) by marking it
# "running" and stamping its start time.
#
# Returns the claimed job, or nil when no job is queued.  If saving the claim
# raises, the job is assumed to have been taken by someone else: the miss is
# logged, we pause for two seconds and look again.
def find_queued_job
  while true
    DB.open do |_db|
      queued = Job.any_repo.filter(:status => "queued")
      candidate = queued.order(:time_submitted).first

      # Nothing waiting in the queue.
      return nil unless candidate

      begin
        candidate.status = "running"
        candidate.time_started = Time.now
        candidate.save
      rescue
        # Someone got this job.
        Log.info("Skipped job: #{candidate}")
        sleep 2
      else
        return candidate
      end
    end
  end
end
- (Object) find_stale_job
[View source]
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'backend/app/lib/background_job_queue.rb', line 17

# Looks for a "running" job whose system_mtime is at least JOB_TIMEOUT_SECONDS
# old and takes it over by resetting its start time.
#
# Returns the job when the save succeeds.  When there is no such job, or the
# save raises, the DB.open block evaluates to nil and the method returns
# whatever DB.open returns (presumably that nil -- confirm against DB.open).
def find_stale_job
  DB.open do |_db|
    cutoff = Time.now - JOB_TIMEOUT_SECONDS
    running = Job.any_repo.filter(:status => "running")
    candidate = running.where { system_mtime <= cutoff }.first

    next nil unless candidate

    begin
      candidate.time_started = Time.now
      candidate.save
      return candidate
    rescue
      # If we failed to save the job, another thread must have grabbed it
      # first.
      nil
    end
  end
end
- (Object) get_next_job
[View source]
66 67 68 |
# File 'backend/app/lib/background_job_queue.rb', line 66

# Picks the next job to work on: a stale job takes priority, otherwise the
# oldest queued job.  Returns nil when neither lookup yields a job.
def get_next_job
  stale = find_stale_job
  return stale if stale

  find_queued_job
end
- (Object) run_pending_job
[View source]
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'backend/app/lib/background_job_queue.rb', line 71

# Runs the next available job, if any, to completion.
#
# While the job runs, a watchdog thread periodically reloads the job record,
# flags a cancel request when the stored status is "canceled", and saves the
# record again (presumably to keep its modification time fresh so it is not
# picked up as stale -- confirm against the Job model).
#
# The job ends up finished as :completed, :canceled, or :failed (when the
# runner or the bookkeeping raises).  Returns nil when there was no job to run.
def run_pending_job
  job = get_next_job
  return unless job

  done = Atomic.new(false)
  cancel_requested = Atomic.new(false)
  heartbeat_interval = [5, (JOB_TIMEOUT_SECONDS / 2)].min

  watchdog = Thread.new do
    until done.value
      DB.open do
        Log.debug("Running job #{job.class.to_s}:#{job.id}")
        # Deliberately reassigns the outer `job`: the main thread's later
        # `job.finish` call uses whichever instance was loaded here last.
        job = job.class.any_repo[job.id]

        if job.status === "canceled"
          # Notify the running import that we've been manually canceled
          Log.info("Received cancel request for job #{job.id}")
          cancel_requested.value = true
        end

        job.save
      end

      sleep heartbeat_interval
    end
  end

  begin
    JobRunner.for(job).canceled(cancel_requested).run

    # Stop the watchdog before touching the job record ourselves.
    done.value = true
    watchdog.join

    outcome = cancel_requested.value ? :canceled : :completed
    job.finish(outcome)
  rescue
    Log.error("Job #{job.id} failed: #{$!} #{$@}")
    # If anything went wrong, make sure the watchdog thread still stops.
    done.value = true
    watchdog.join

    job.finish(:failed)
  end

  Log.debug("Completed job #{job.class.to_s}:#{job.id}")
end
- (Object) start_background_thread
[View source]
124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'backend/app/lib/background_job_queue.rb', line 124

# Spawns the worker thread that polls for jobs forever.
#
# Each cycle runs at most one pending job, logs (rather than propagates) any
# StandardError it raises, then sleeps for AppConfig[:job_poll_seconds]
# seconds before polling again.
#
# Returns the Thread that was started.
def start_background_thread
  Thread.new do
    while true
      begin
        run_pending_job
      rescue
        Log.error("Error in job manager thread: #{$!} #{$@}")
      end

      poll_interval = AppConfig[:job_poll_seconds].to_i
      sleep(poll_interval)
    end
  end
end