Changeset 1608
- Timestamp:
- 06/09/08 13:39:59 (3 months ago)
- Files:
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
hive/trunk/data_webapp/app/apis/hc_database_api.rb
r1564 r1608 3 3 class HcDatabaseApi < ActionWebService::API::Base 4 4 inflect_names false 5 api_method :insert_ queue_urls, :expects => [:string], :returns => [:string]5 api_method :insert_drone_job, :expects => [:string], :returns => [:string] 6 6 api_method :insert_history_urls, :expects => [:string], :returns => [:string] 7 7 api_method :insert_fingerprint, :expects => [:string], :returns => [:string] hive/trunk/data_webapp/app/controllers/hc_database_controller.rb
r1602 r1608 26 26 #private :insert 27 27 28 # Remove a specified URL from the queue.29 def dequeue_url(url)30 q = QueueUrl.find(:first,:conditions => {:url => url})31 jobs = q.drone_jobs32 if not q.nil?33 q.destroy34 end35 return jobs36 end37 38 28 ####################################################################### 39 29 # Public Methods Implemented # 40 30 ####################################################################### 41 31 42 # Insert a set of URLs into the queue. 43 def insert_queue_urls(obj_str) 32 def insert_drone_job(obj_str) 44 33 if (obj_str.class.name == 'Hash') 45 obj_hash = obj_str 34 obj_hash = obj_str.symbolize_keys 46 35 else 47 obj_hash = YAML.load(obj_str) 48 end 49 50 # Get source information 51 stype = obj_hash.delete("source_type") or "unknown" 52 sname = obj_hash.delete("source_name") 53 54 count = 0 55 obj_hash.each do |u,p| 56 url_obj = { 57 "url" => u, 58 "priority" => p, 59 "source_type" => stype, 60 "source_name" => sname 61 } 62 # There's a small chance that another process instance may have 63 # simultaneously inserted the same URL into our queue. In that 64 # case, we simply retry the insert operation until it succeeds. 36 obj_hash = YAML.load(obj_str).symbolize_keys 37 end 38 39 # Perform any sanity checking. 40 raise "Job contains no queue_urls" if not obj_hash.has_key?(:queue_urls) 41 raise "Job contains no source" if not obj_hash.has_key?(:source) 42 43 # Create a JobSource 44 job_source = new_or_existing(JobSource,obj_hash.delete(:source).symbolize_keys) 45 job_source.save! 46 47 # Extract the set of URLs 48 urls = obj_hash.delete(:queue_urls) 49 50 # Create a DroneJob 51 obj_hash[:notify_source] = (obj_hash[:notify_source] == 'true') ? true : false 52 job = DroneJob.new(obj_hash.symbolize_keys) 53 job.job_source = job_source 54 job.save! 55 56 # Create the QueueUrl objects 57 urls.each do |url,priority| 58 insert_pending_url(:url => url, :priority => priority, :job => job) 59 end 60 61 return job.id 62 end 63 64 def insert_pending_url(args) 65 PendingUrl.transaction do 66 # There's a small chance that another process instance may have simultaneously inserted 67 # the same URL into our queue. In that case, we simply retry the insert operation until it succeeds. 68 url_obj = nil 65 69 while true 66 70 begin 67 if insert(QueueUrl,url_obj) 68 # Counts the number of URLs successfully inserted OR modified. 69 count += 1 70 break 71 72 # Attempt to find any existing QueueUrl objects. 73 while true 74 begin 75 url_obj = QueueUrl.find(:first, :lock => 'FOR UPDATE', :conditions => { :url => args[:url] }) 76 77 # We've successfully got a QueueUrl object; break out of loop. 78 break 79 rescue 80 if $!.to_s.index('Deadlock found').nil? 81 # If we encounter something other than a "Deadlock found" exception, then 82 # propagate the error. 83 raise $!.to_s 84 end 85 end 71 86 end 87 88 if not url_obj.nil? 89 # We've found an existing object; update priority and count. 90 url_obj.priority = url_obj.priority < args[:priority] ? args[:priority] : url_obj.priority 91 url_obj.count += 1 92 else 93 # We create a new object. 94 url_obj = QueueUrl.new(:url => args[:url], :priority => args[:priority]) 95 end 96 url_obj.save! 97 98 # We've successfully got a QueueUrl object; break out of loop. 99 break 72 100 rescue 73 break if $!.to_s.index('Duplicate entry').nil? 74 logger.warn "insert_queue_urls(): Collision on URL: (" + u.to_s + ") - retrying insert." 101 if $!.to_s.index('Duplicate entry').nil? 102 # If we encounter something other than a "Duplicate entry" exception, then 103 # propagate the error. 104 raise $!.to_s 105 end 106 logger.warn "insert_pending_url(): Collision on URL: (" + url.to_s + ") - retrying insert." 75 107 end 76 108 end 77 end 78 79 # Return the number of URLs that were successfully queued. 80 return count 109 110 # Sanity check: Some other error occurred 111 if url_obj.nil? 112 raise "insert_pending_url(): Unable to obtain QueueUrl object." 113 end 114 115 PendingUrl.new(:queue_url => url_obj, :drone_job => args[:job]).save! 116 end 81 117 end 82 118 … … 94 130 bee_work = {"cid" => client.cid, "urls" => []} 95 131 96 # Insert all visited and timed out URLs into the 97 # history. 132 # Insert all visited and timed out URLs into the history. 98 133 count = 0 99 link_types = ["links_visited","links_timed_out"] 100 link_types.each do |l| 134 ["links_visited","links_timed_out","links_ignored","links_suspicious"].each do |l| 101 135 # Build URL history item and insert. 102 136 obj_hash[l].each do |u,t| … … 108 142 "client_id" => cid, 109 143 } 110 q = QueueUrl.find(:first,:conditions => {:url => u}) 111 if not q.nil? 112 url_obj["source_type"] = (q.source_type or "unknown") 113 url_obj["source_name"] = q.source_name 114 end 115 116 # Prevent duplication of history items due to 117 # possible flawed logic on (RPC) client side. 118 if not HistoryUrl.exists?(url_obj) 119 bee_work["urls"] << url_obj 144 CompletedUrl.transaction do 145 q = QueueUrl.find(:first,:conditions => {:url => u},:lock => 'FOR UPDATE') 120 146 121 count += 1 if hu = insert(HistoryUrl,url_obj) 122 123 jobs = dequeue_url(u) 124 if not hu.nil? 125 jobs.each do |job| 126 job.completed_urls << CompletedUrl.new(:history_url => hu) 127 job.save 147 # Prevent duplication of history items due to possible flawed logic on (RPC) client side. 148 if not HistoryUrl.exists?(url_obj) 149 bee_work["urls"] << url_obj 150 151 # Insert the HistoryUrl and retrieve the corresponding inserted object 152 if hu = insert(HistoryUrl,url_obj) 153 count += 1 154 hu = HistoryUrl.find(hu) 128 155 end 156 157 # Get the jobs associated with the QueueUrl 158 jobs = [] 159 if not q.nil? 160 jobs = q.drone_jobs 161 end 162 163 # Add the HistoryUrl to all corresponding DroneJob objects 164 if not hu.nil? 165 jobs.each do |job| 166 CompletedUrl.new(:history_url => hu,:drone_job => job).save! 167 # TODO: dispatch_response(job) if job.pending_urls_count == 0 168 end 169 end 170 171 # Remove the QueueUrl object 172 q.destroy if not q.nil? 129 173 end 130 174 end 131 175 end 132 end133 134 # Don't insert any ignored URLs into the history, but135 # remove references from the queue.136 obj_hash["links_ignored"].each do |u,t|137 dequeue_url(u)138 176 end 139 177 … … 141 179 BeeJob.add_job("history_urls",bee_work) 142 180 143 # Return the number of URLs that were successfully 144 # inserted into the history. 181 # Return the number of URLs that were successfully inserted into the history. 145 182 return count 146 183 end hive/trunk/data_webapp/app/models/queue_url.rb
r1602 r1608 9 9 validates_presence_of :url 10 10 validates_presence_of :url_md5 11 validates_presence_of :priority12 11 validates_presence_of :last_visited_at 13 12 validates_presence_of :created_at 14 13 validates_presence_of :count 15 14 validates_presence_of :host_id 16 validates_presence_of :source_type17 15 validates_numericality_of :host_id, :only_integer => true, :greater_than_or_equal_to => 0 18 16 validates_numericality_of :count, :only_integer => true, :greater_than_or_equal_to => 0 19 17 validates_numericality_of :last_visited_at, :greater_than_or_equal_to => 0 20 18 validates_numericality_of :created_at, :greater_than_or_equal_to => 0 19 validates_numericality_of :priority, :only_integer => true, :greater_than_or_equal_to => 0 21 20 22 def source_type 23 self[:source_type] or "unknown" 24 end 25 26 def source_type= (val) 27 self[:source_type] = val or "unknown" 28 end 29 30 def self.new_from_hash(obj_hash) 21 def before_validation 31 22 # Validate and Normalize the url; Remove fragment 32 23 begin 33 enc_uri = URI.escape((obj_hash[:url] or obj_hash.delete("url"))) 34 uri = URI.parse(enc_uri) 24 uri = URI.parse(URI.escape(self.url)) 35 25 #ip = Resolv.getaddress(uri.host) 36 26 rescue 37 27 # XXX: Is this still needed? 38 logger. info "Could not parse URL or resolve host"39 return nil28 logger.warn "Could not parse or resolve URL: " + self.url.to_s() 29 return false 40 30 end 41 31 uri.fragment = nil 42 u = uri.normalize().to_s() 43 p = (obj_hash[:priority] or obj_hash["priority"] or 0) 44 p = p.to_i() 45 46 # Save the normalization back to the original hash 47 obj_hash[:url] = u 48 49 # If the url is already queued, update the count and priority 50 if QueueUrl.exists?(:url => u) 51 url_obj = QueueUrl.find(:first, :conditions => {:url => u}) 52 if url_obj.host_id > 0 53 return nil 54 end 55 url_obj.priority = url_obj.priority < p ? p : url_obj.priority 56 url_obj.count += 1 57 url_obj 58 # If the url is new, add it with the timestamps for creation and last visit 59 else 60 obj_hash[:created_at] = Time.now.to_f 61 obj_hash[:url_md5] = Digest::MD5.hexdigest(obj_hash[:url]) 62 hist_url = HistoryUrl.find(:first, :order => "time_at DESC", :conditions => {:url => u}) 63 if not hist_url.nil? 64 obj_hash[:last_visited_at] = hist_url.time_at 65 end 66 QueueUrl.new(obj_hash) 32 # Save the normalization back to url 33 self.url = uri.normalize().to_s() 34 self.created_at = Time.now.to_f 35 self.url_md5 = Digest::MD5.hexdigest(self.url) 36 hist_url = HistoryUrl.find(:first, :order => "time_at DESC", :conditions => {:url => self.url}) 37 if not hist_url.nil? 38 self.last_visited_at = hist_url.time_at 67 39 end 68 40 end
