Class: BackgroundJobQueue

Inherits:
Object
  • Object
show all
Defined in:
backend/app/lib/background_job_queue.rb

Constant Summary

JOB_TIMEOUT_SECONDS =
AppConfig[:job_timeout_seconds].to_i

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (Object) init



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'backend/app/lib/background_job_queue.rb', line 139

def self.init
    # clear out stale jobs on start
    begin
      while(true) do
        stale = find_stale_job
        stale.finish(:canceled)
        break if stale.nil?
      end
    rescue
    end
    

    queue = BackgroundJobQueue.new
    queue.start_background_thread
  end

end

Instance Method Details

- (Object) find_queued_job



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'backend/app/lib/background_job_queue.rb', line 40

def find_queued_job
  while true
    DB.open do |db|

      job = Job.any_repo.
        filter(:status => "queued").
        order(:time_submitted).first

      return unless job

      begin
        job.status = "running"
        job.time_started = Time.now
        job.save

        return job
      rescue
        # Someone got this job.
        Log.info("Skipped job: #{job}")
        sleep 2
      end
    end
  end
end

- (Object) find_stale_job



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'backend/app/lib/background_job_queue.rb', line 17

def find_stale_job
  DB.open do |db|
    stale_job = Job.any_repo.
      filter(:status => "running").
      where {
      system_mtime <= (Time.now - JOB_TIMEOUT_SECONDS)
    }.first 

    if stale_job
      begin
        stale_job.time_started = Time.now
        stale_job.save
        return stale_job
      rescue
        # If we failed to save the job, another thread must have grabbed it
        # first.
        nil
      end
    end
  end
end

- (Object) get_next_job



66
67
68
# File 'backend/app/lib/background_job_queue.rb', line 66

def get_next_job
  find_stale_job || find_queued_job
end

- (Object) run_pending_job



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'backend/app/lib/background_job_queue.rb', line 71

def run_pending_job
  job = get_next_job

  return if !job

  finished = Atomic.new(false)
  job_canceled = Atomic.new(false)

  watchdog_thread = Thread.new do
    while !finished.value
      DB.open do
        Log.debug("Running job #{job.class.to_s}:#{job.id}")
        job = job.class.any_repo[job.id]

        if job.status === "canceled"
          # Notify the running import that we've been manually canceled
          Log.info("Received cancel request for job #{job.id}")
          job_canceled.value = true
        end

        job.save
      end

      sleep [5, (JOB_TIMEOUT_SECONDS / 2)].min
    end
  end

  begin
    runner = JobRunner.for(job).canceled(job_canceled)
    runner.run

    finished.value = true
    watchdog_thread.join

    if job_canceled.value
      job.finish(:canceled)
    else
      job.finish(:completed)
    end

  rescue
    Log.error("Job #{job.id} failed: #{$!} #{$@}")
    # If anything went wrong, make sure the watchdog thread still stops.
    finished.value = true
    watchdog_thread.join

    job.finish(:failed)
  end

  Log.debug("Completed job #{job.class.to_s}:#{job.id}")
end

- (Object) start_background_thread



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'backend/app/lib/background_job_queue.rb', line 124

def start_background_thread
  Thread.new do
    while true
      begin
        run_pending_job
      rescue
        Log.error("Error in job manager thread: #{$!} #{$@}")
      end

      sleep AppConfig[:job_poll_seconds].to_i
    end
  end
end