| Class | Pho::Jobs |
| In: | lib/pho/job.rb |
| Parent: | Object |
TODO job deletion
| RESET | = | "http://schemas.talis.com/2006/bigfoot/configuration#ResetDataJob".freeze |
| SNAPSHOT | = | "http://schemas.talis.com/2006/bigfoot/configuration#SnapshotJob".freeze |
| REINDEX | = | "http://schemas.talis.com/2006/bigfoot/configuration#ReindexJob".freeze |
| RESTORE | = | "http://schemas.talis.com/2006/bigfoot/configuration#RestoreJob".freeze |
Construct an RDF/XML document containing a job request for submitting to the Platform.
| t: | a Time object, specifying the time at which the request should be carried out |
# File lib/pho/job.rb, line 79
def Jobs.build_job_request(type, label, t=Time.now, snapshot_uri=nil)
  time = t.getutc.strftime("%Y-%m-%dT%H:%M:%SZ")
  data = "<rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" "
  data << " xmlns:rdfs=\"http://www.w3.org/2000/01/rdf-schema#\" "
  data << " xmlns:bf=\"http://schemas.talis.com/2006/bigfoot/configuration#\"> "
  data << " <bf:JobRequest>"
  data << " <rdfs:label>#{label}</rdfs:label>"
  data << " <bf:jobType rdf:resource=\"#{type}\"/>"
  data << " <bf:startTime>#{time}</bf:startTime>"
  if (snapshot_uri != nil)
    data << " <bf:snapshotUri rdf:resource=\"#{snapshot_uri}\"/>"
  end
  data << " </bf:JobRequest>"
  data << "</rdf:RDF>"
  return data
end
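The start time in the request is always written in UTC, whatever zone the supplied Time object uses. The following is a minimal sketch of that conversion only; `job_start_time` is a hypothetical helper name used for illustration and is not part of Pho.

```ruby
# Hypothetical helper (not part of Pho): applies the same conversion that
# build_job_request applies to its t argument. The time is shifted to UTC
# and formatted as an ISO 8601 timestamp with a trailing "Z".
def job_start_time(t)
  t.getutc.strftime("%Y-%m-%dT%H:%M:%SZ")
end

# A local time two hours ahead of UTC.
local = Time.new(2010, 6, 1, 12, 30, 0, "+02:00")
puts job_start_time(local)  # prints 2010-06-01T10:30:00Z
```

Note that the label is interpolated into the document as-is, so callers may want to escape characters such as `&` and `<` before passing it in.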
Reads the current list of scheduled jobs from the provided store. Returns an array of job URIs.
| store: | store from which to read the scheduled job list |
# File lib/pho/job.rb, line 16
def Jobs.read_from_store(store)
  resp = store.get_jobs()
  if resp.status != 200
    raise "Unable to read jobs from store. Status was #{resp.status}"
  end
  jobs = Array.new
  doc = REXML::Document.new(resp.content)
  REXML::XPath.each(doc.root, "//bf:job", Pho::Namespaces::MAPPING) do |el|
    jobs << el.attributes["rdf:resource"]
  end
  return jobs
end
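To illustrate the extraction step, here is a sketch that runs the same XPath query against a made-up response body. The sample document (including the `bf:JobQueue` wrapper and the example.org URIs), the `NAMESPACES` hash standing in for `Pho::Namespaces::MAPPING`, and the `job_uris` helper are all assumptions for the example; the real Platform response may be shaped differently.

```ruby
require "rexml/document"

# Stand-in for Pho::Namespaces::MAPPING; the real contents are assumed.
NAMESPACES = {
  "rdf" => "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "bf"  => "http://schemas.talis.com/2006/bigfoot/configuration#"
}

# Hypothetical response body; element layout and URIs are invented.
SAMPLE = <<~XML
  <rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
           xmlns:bf="http://schemas.talis.com/2006/bigfoot/configuration#">
    <bf:JobQueue>
      <bf:job rdf:resource="http://example.org/jobs/1"/>
      <bf:job rdf:resource="http://example.org/jobs/2"/>
    </bf:JobQueue>
  </rdf:RDF>
XML

# Collect the rdf:resource attribute of every bf:job element.
def job_uris(xml)
  doc = REXML::Document.new(xml)
  uris = []
  REXML::XPath.each(doc.root, "//bf:job", NAMESPACES) do |el|
    uris << el.attributes["rdf:resource"]
  end
  uris
end

puts job_uris(SAMPLE)
```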
Generic submit job method
# File lib/pho/job.rb, line 72
def Jobs.submit_job(store, jobtype, label, t=Time.now, snapshot_uri=nil)
  store.submit_job( build_job_request(jobtype, label, t, snapshot_uri) )
end
Submit a reindex job to the Platform
This method submits the job and returns an HTTP::Message indicating the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the job's status if they need to wait for the job to finish.
# File lib/pho/job.rb, line 47
def Jobs.submit_reindex(store, label="Reindex my store", t=Time.now)
  return submit_job(store, Pho::Jobs::REINDEX, label, t)
end
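The check that client code is asked to perform on the response can be sketched as follows. It mirrors what wait_for_submitted does: a status of 201 is treated as success, and the job URI is read from the Location header. `FakeResponse` and `created_job_uri` are hypothetical names for this example, and the URI is invented; a real call would pass the response returned by one of the submit methods.

```ruby
# Stand-in for the HTTP::Message returned by the submit_* methods.
FakeResponse = Struct.new(:status, :header)

# Hypothetical helper: returns the URI of the newly created job,
# or raises if the Platform did not report the job as created.
def created_job_uri(resp)
  unless resp.status == 201
    raise "Job was not created. Status was #{resp.status}"
  end
  # The header value is treated as a list, as in wait_for_submitted.
  resp.header["Location"].first
end

resp = FakeResponse.new(201, { "Location" => ["http://example.org/jobs/42"] })
puts created_job_uri(resp)  # prints http://example.org/jobs/42
```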
Submit a reset job to the Platform
This method submits the job and returns an HTTP::Message indicating the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the job's status if they need to wait for the job to finish.
# File lib/pho/job.rb, line 37
def Jobs.submit_reset(store, label="Reset my store", t=Time.now)
  return submit_job(store, Pho::Jobs::RESET, label, t)
end
Submit a restore job to the Platform
This method submits the job and returns an HTTP::Message indicating the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the job's status if they need to wait for the job to finish.
# File lib/pho/job.rb, line 67
def Jobs.submit_restore(store, snapshot_uri, label="Restore my snapshot", t=Time.now)
  return submit_job(store, Pho::Jobs::RESTORE, label, t, snapshot_uri)
end
Submit a snapshot job to the Platform
This method submits the job and returns an HTTP::Message indicating the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the job's status if they need to wait for the job to finish.
# File lib/pho/job.rb, line 57
def Jobs.submit_snapshot(store, label="Snapshot my store", t=Time.now)
  return submit_job(store, Pho::Jobs::SNAPSHOT, label, t)
end
Wait for the specified job to finish
The method repeatedly contacts the Platform to determine whether the job has finished executing. The requests are made at configurable intervals (once a minute by default). If a block is supplied, it is called after each request with the Job, a message, and a timestamp for every new start, progress, or completion update. The Job object is returned once the job has completed.
| uri: | URI of the job to wait for |
| store: | the store on which the job is running |
| interval: | the interval at which checks will be made, in minutes. Default is 1 |
# File lib/pho/job.rb, line 118
def Jobs.wait_for(uri, store, interval=1, &block)
  updates = 0
  job = Job.read_from_store(uri, store)
  updates = yield_job_update(job, updates, &block)
  while !job.completed?
    sleep interval*60
    job = Job.read_from_store(uri, store)
    updates = yield_job_update(job, updates, &block)
  end
  return job
end
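The polling pattern can be sketched in isolation with a fake job, leaving out the Platform requests and progress callbacks. `FakeJob`, `poll_until_complete`, and the `read_job` callable (standing in for Job.read_from_store) are hypothetical names for the example; the interval is set to 0 so the sketch does not actually pause.

```ruby
# Hypothetical job object exposing only the completed? check.
FakeJob = Struct.new(:completed) do
  def completed?
    completed
  end
end

# Read the job once, then keep re-reading it at the given interval
# (in minutes) until it reports completion. Returns the final job
# and the number of reads that were needed.
def poll_until_complete(read_job, interval_minutes)
  polls = 1
  job = read_job.call
  until job.completed?
    sleep interval_minutes * 60
    job = read_job.call
    polls += 1
  end
  [job, polls]
end

# The fake job reports completion on the third read.
states = [false, false, true]
reader = -> { FakeJob.new(states.shift) }
job, polls = poll_until_complete(reader, 0)
puts polls  # prints 3
```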
Wait for a newly submitted job to finish
# File lib/pho/job.rb, line 100
def Jobs.wait_for_submitted(resp, store, interval=1, &block)
  if resp.status != 201
    raise "Unable to wait, job was not created. Status was #{resp.status}"
  end
  job_url = resp.header["Location"].first
  return wait_for(job_url, store, interval, &block)
end
# File lib/pho/job.rb, line 132
def Jobs.yield_job_update(job, updates)
  if block_given?
    if job.started?
      #only yield start message if we've not seen any updates
      if updates == 0
        yield job, job.start_message, job.actual_start_time
      end
      if job.progress_updates.length > 0
        unseen = job.progress_updates[updates, job.progress_updates.length]
        unseen.each do |update|
          yield job, update.message, update.time
        end
        updates = job.progress_updates.length
      end
      if job.completed?
        yield job, job.completion_message, job.end_time
      end
    end
  end
  return updates
end
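The method above reports only progress updates that have not been seen before, by slicing the update list from the count already reported. A small sketch of that slice, using plain strings in place of progress update objects (`unseen_updates` is a hypothetical helper name):

```ruby
# Hypothetical helper: returns the updates not yet reported, given how
# many have been seen so far. Uses the same start/length slice as
# yield_job_update.
def unseen_updates(progress_updates, seen)
  progress_updates[seen, progress_updates.length]
end

updates = ["queued", "indexing", "done"]
p unseen_updates(updates, 1)  # prints ["indexing", "done"]
p unseen_updates(updates, 3)  # prints []
```

Because the slice starting at the end of the array is empty rather than nil, a poll that brings no new updates simply yields nothing.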