Class: BackgroundJobQueue

Inherits:
Object
  • Object
show all
Defined in:
backend/app/lib/background_job_queue.rb

Constant Summary

JOB_TIMEOUT_SECONDS =
AppConfig[:job_timeout_seconds].to_i

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (Object) init

[View source]

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'backend/app/lib/background_job_queue.rb', line 139

# Boots the background job machinery: cancels any jobs that look stale and then
# starts the polling thread on a new BackgroundJobQueue.
#
# The cleanup is best effort. Any error raised while cancelling stale jobs is
# logged and does not prevent the background thread from starting.
#
# Returns whatever start_background_thread returns.
def self.init
  queue = BackgroundJobQueue.new

  # Clear out stale jobs on start. find_stale_job is an instance method, so it
  # has to be called on the queue instance; the nil check must also happen
  # before the job is touched, otherwise the loop only ever ends by raising.
  begin
    while (stale = queue.find_stale_job)
      stale.finish(:canceled)
    end
  rescue
    Log.error("Error clearing stale jobs on startup: #{$!} #{$@}")
  end

  queue.start_background_thread
end

end

Instance Method Details

- (Object) find_queued_job

[View source]

40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'backend/app/lib/background_job_queue.rb', line 40

# Claims the oldest job whose status is "queued" (ordered by time_submitted).
#
# The claimed job is marked "running", stamped with the current time as
# time_started and saved before being returned. If that save raises, the
# failure is logged, the method sleeps for two seconds and then looks for the
# next queued job.
#
# Returns the claimed job, or nil when no queued job exists.
def find_queued_job
  loop do
    DB.open do |db|
      queued = Job.any_repo.filter(:status => "queued")
      job = queued.order(:time_submitted).first

      # Nothing waiting in the queue.
      return if job.nil?

      begin
        job.status = "running"
        job.time_started = Time.now
        job.save

        return job
      rescue
        # Someone else claimed this job first; back off and try again.
        Log.info("Skipped job: #{job}")
        sleep 2
      end
    end
  end
end

- (Object) find_stale_job

[View source]

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'backend/app/lib/background_job_queue.rb', line 17

# Looks for a job that is marked "running" but whose system_mtime is at least
# JOB_TIMEOUT_SECONDS in the past, and claims it by resetting time_started to
# the current time and saving it.
#
# Returns the claimed job. When no such job exists, or when saving it raises,
# the result is the value of the DB.open call, whose block evaluates to nil
# in both of those cases.
def find_stale_job
  DB.open do |db|
    cutoff = Time.now - JOB_TIMEOUT_SECONDS
    running = Job.any_repo.filter(:status => "running")
    stale_job = running.where { system_mtime <= cutoff }.first

    next unless stale_job

    begin
      stale_job.time_started = Time.now
      stale_job.save
      return stale_job
    rescue
      # The save failed, so another thread must have grabbed the job first.
      nil
    end
  end
end

- (Object) get_next_job

[View source]

66
67
68
# File 'backend/app/lib/background_job_queue.rb', line 66

# Picks the next job to work on: a stale job if find_stale_job yields one,
# otherwise whatever find_queued_job returns (which may be nil).
def get_next_job
  stale = find_stale_job
  return stale if stale

  find_queued_job
end

- (Object) run_pending_job

[View source]

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'backend/app/lib/background_job_queue.rb', line 71

# Fetches the next job via get_next_job and runs it on the calling thread,
# while a watchdog thread periodically reloads and re-saves the job record and
# watches for a cancel request.
#
# Returns immediately when there is no job to run. Otherwise the job is
# finished as :completed, :canceled (a cancel was observed by the watchdog) or
# :failed (the runner, or anything else in the main section, raised).
def run_pending_job
  job = get_next_job

  return if !job

  # Flags shared between this thread and the watchdog thread below.
  finished = Atomic.new(false)
  job_canceled = Atomic.new(false)

  watchdog_thread = Thread.new do
    while !finished.value
      DB.open do
        Log.debug("Running job #{job.class.to_s}:#{job.id}")
        # Reload the record by id. This assigns the enclosing method's `job`
        # local, so the code after the runner finishes also operates on the
        # most recently reloaded record.
        # NOTE(review): if this lookup returned nil the following lines would
        # raise inside the watchdog thread — confirm whether that can happen.
        job = job.class.any_repo[job.id]

        if job.status === "canceled"
          # Notify the running import that we've been manually canceled
          Log.info("Received cancel request for job #{job.id}")
          job_canceled.value = true
        end

        # NOTE(review): presumably this save refreshes system_mtime, which
        # find_stale_job compares against JOB_TIMEOUT_SECONDS — confirm.
        job.save
      end

      # Wake up at least every 5 seconds, or every half timeout if shorter.
      sleep [5, (JOB_TIMEOUT_SECONDS / 2)].min
    end
  end

  begin
    # The runner is handed the cancel flag set by the watchdog.
    runner = JobRunner.for(job).canceled(job_canceled)
    runner.run

    # Stop the watchdog and wait for it to exit before finishing the job.
    finished.value = true
    watchdog_thread.join

    if job_canceled.value
      job.finish(:canceled)
    else
      job.finish(:completed)
    end

  rescue
    Log.error("Job #{job.id} failed: #{$!} #{$@}")
    # If anything went wrong, make sure the watchdog thread still stops.
    finished.value = true
    watchdog_thread.join

    job.finish(:failed)
  end

  Log.debug("Completed job #{job.class.to_s}:#{job.id}")
end

- (Object) start_background_thread

[View source]

124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'backend/app/lib/background_job_queue.rb', line 124

# Spawns the job manager thread and returns it.
#
# The thread loops forever: it calls run_pending_job, logs any error that
# call raises, and then sleeps for AppConfig[:job_poll_seconds] (converted
# with to_i) before the next iteration.
def start_background_thread
  Thread.new do
    loop do
      begin
        run_pending_job
      rescue
        # Log and keep polling; one failed pass must not end the thread.
        Log.error("Error in job manager thread: #{$!} #{$@}")
      end

      poll_interval = AppConfig[:job_poll_seconds].to_i
      sleep poll_interval
    end
  end
end