Class Pho::Jobs
In: lib/pho/job.rb
Parent: Object

TODO job deletion

Methods

Constants

RESET = "http://schemas.talis.com/2006/bigfoot/configuration#ResetDataJob".freeze
SNAPSHOT = "http://schemas.talis.com/2006/bigfoot/configuration#SnapshotJob".freeze
REINDEX = "http://schemas.talis.com/2006/bigfoot/configuration#ReindexJob".freeze
RESTORE = "http://schemas.talis.com/2006/bigfoot/configuration#RestoreJob".freeze

Public Class methods

Construct an RDF/XML document containing a job request for submitting to the Platform.

t:a Time object, specifying the time at which the request should be carried out

[Source]

# File lib/pho/job.rb, line 79
    def Jobs.build_job_request(type, label, t=Time.now, snapshot_uri=nil)
      
      time = t.getutc.strftime("%Y-%m-%dT%H:%M:%SZ")
      data = "<rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" "
      data << " xmlns:rdfs=\"http://www.w3.org/2000/01/rdf-schema#\" " 
      data << " xmlns:bf=\"http://schemas.talis.com/2006/bigfoot/configuration#\"> " 
      data << " <bf:JobRequest>"
      data << "   <rdfs:label>#{label}</rdfs:label>"    
      data << "   <bf:jobType rdf:resource=\"#{type}\"/>"
      data << "   <bf:startTime>#{time}</bf:startTime>"
      
      if (snapshot_uri != nil)
        data << "   <bf:snapshotUri rdf:resource=\"#{snapshot_uri}\"/>"        
      end
      
      data << " </bf:JobRequest>"
      data << "</rdf:RDF>"
      return data      
    end

Reads the current list of scheduled jobs from the provided store. Returns an array of job names

 store:: store from which to read the scheduled job list

[Source]

# File lib/pho/job.rb, line 16
    def Jobs.read_from_store(store)
      resp = store.get_jobs()
      if resp.status != 200
        raise "Unable to read jobs from store. Status was {resp.status}"
      end
      jobs = Array.new
      
      doc = REXML::Document.new(resp.content)
      REXML::XPath.each(doc.root, "//bf:job", Pho::Namespaces::MAPPING) do |el|
        jobs << el.attributes["rdf:resource"]
      end
      return jobs
      
    end

Generic submit job method

[Source]

# File lib/pho/job.rb, line 72
    def Jobs.submit_job(store, jobtype, label, t=Time.now, snapshot_uri=nil)
      store.submit_job( build_job_request(jobtype, label, t, snapshot_uri) )
    end

Submit a reindex job to the Platform

This method submits the job, and will return an HTTP:Message indicating the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the jobs status if they need to wait for the job to finish.

[Source]

# File lib/pho/job.rb, line 47
    def Jobs.submit_reindex(store, label="Reindex my store", t=Time.now)
      return submit_job(store, Pho::Jobs::REINDEX, label, t)      
    end

Submit a reset job to the Platform

This method submits the job, and will return an HTTP:Message indicating the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the jobs status if they need to wait for the job to finish.

[Source]

# File lib/pho/job.rb, line 37
    def Jobs.submit_reset(store, label="Reset my store", t=Time.now)
      return submit_job(store, Pho::Jobs::RESET, label, t)
    end

Submit a restore job to the Platform

This method submits the job, and will return an HTTP:Message indicating the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the jobs status if they need to wait for the job to finish.

[Source]

# File lib/pho/job.rb, line 67
    def Jobs.submit_restore(store, snapshot_uri, label="Restore my snapshot", t=Time.now)
      return submit_job(store, Pho::Jobs::RESTORE, label, t, snapshot_uri)
    end

Submit a snapshot job to the Platform

This method submits the job, and will return an HTTP:Message indicating the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the jobs status if they need to wait for the job to finish.

[Source]

# File lib/pho/job.rb, line 57
    def Jobs.submit_snapshot(store, label="Snapshot my store", t=Time.now)
      return submit_job(store, Pho::Jobs::SNAPSHOT, label, t)
    end

Wait for the specified job to finish

The method will repeatedly contact the Platform to determine whether the job has finished executing. The requests are made at configurable intervals (once a minute by default). If a block is supplied, then it is passed a reference to the Job (containing current progress updates) after each request. The Job object is returned once completed.

 uri:: URI of the job to wait for
 store:: the store on which the job is running
 interval:: the interval at which checks will be made, in minutes. Default is 1

[Source]

# File lib/pho/job.rb, line 118
    def Jobs.wait_for(uri, store, interval=1, &block)
        updates = 0
        job = Job.read_from_store(uri, store)
        updates = yield_job_update(job, updates, &block)
        while !job.completed?
          sleep interval*60
          job = Job.read_from_store(uri, store)
          updates = yield_job_update(job, updates, &block)
        end
        return job
    end

Wait for a newly submitted job to finish

[Source]

# File lib/pho/job.rb, line 100
    def Jobs.wait_for_submitted(resp, store, interval=1, &block)
      if resp.status != 201
        raise "Unable to wait, job was not created. Status was #{resp.status}"
      end
      job_url = resp.header["Location"].first
      return wait_for(job_url, store, interval, &block)
    end

Protected Class methods

[Source]

# File lib/pho/job.rb, line 132
      def Jobs.yield_job_update(job, updates)
        if block_given?
            if job.started?
              
              #only yield start message if we've not seen any updates
              if updates == 0
                yield job, job.start_message, job.actual_start_time  
              end
                            
              if job.progress_updates.length > 0
                unseen = job.progress_updates[updates, job.progress_updates.length] 
                unseen.each do |update|
                  yield job, update.message, update.time
                end
                updates = job.progress_updates.length                
              end
              
              if job.completed?
                  yield job, job.completion_message, job.end_time
              end
              
            end
        end        
        return updates  
      end

[Validate]