Changeset 1608

Show
Ignore:
Timestamp:
06/09/08 13:39:59 (3 months ago)
Author:
mbriggs
Message:

Added insert_drone_job. insert_queue_urls is now deprecated.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • hive/trunk/data_webapp/app/apis/hc_database_api.rb

    r1564 r1608  
    33class HcDatabaseApi < ActionWebService::API::Base 
    44  inflect_names false 
    5   api_method :insert_queue_urls, :expects => [:string], :returns => [:string] 
     5  api_method :insert_drone_job, :expects => [:string], :returns => [:string] 
    66  api_method :insert_history_urls, :expects => [:string], :returns => [:string] 
    77  api_method :insert_fingerprint, :expects => [:string], :returns => [:string] 
  • hive/trunk/data_webapp/app/controllers/hc_database_controller.rb

    r1602 r1608  
    2626  #private :insert 
    2727 
    28   # Remove a specified URL from the queue.  
    29   def dequeue_url(url) 
    30     q = QueueUrl.find(:first,:conditions => {:url => url}) 
    31     jobs = q.drone_jobs 
    32     if not q.nil? 
    33       q.destroy 
    34     end 
    35     return jobs 
    36   end 
    37  
    3828  ####################################################################### 
    3929  # Public Methods Implemented                                          # 
    4030  ####################################################################### 
    4131 
    42   # Insert a set of URLs into the queue. 
    43   def insert_queue_urls(obj_str) 
     32  def insert_drone_job(obj_str) 
    4433    if (obj_str.class.name == 'Hash') 
    45       obj_hash = obj_str 
     34      obj_hash = obj_str.symbolize_keys 
    4635    else 
    47       obj_hash = YAML.load(obj_str) 
    48     end 
    49  
    50     # Get source information 
    51     stype = obj_hash.delete("source_type") or "unknown" 
    52     sname = obj_hash.delete("source_name") 
    53      
    54     count = 0 
    55     obj_hash.each do |u,p| 
    56       url_obj = { 
    57         "url" => u, 
    58         "priority" => p, 
    59         "source_type" => stype, 
    60         "source_name" => sname 
    61       } 
    62       # There's a small chance that another process instance may have 
    63       # simultaneously inserted the same URL into our queue.  In that 
    64       # case, we simply retry the insert operation until it succeeds. 
     36      obj_hash = YAML.load(obj_str).symbolize_keys 
     37    end 
     38 
     39    # Perform any sanity checking. 
     40    raise "Job contains no queue_urls" if not obj_hash.has_key?(:queue_urls) 
     41    raise "Job contains no source" if not obj_hash.has_key?(:source) 
     42     
     43    # Create a JobSource 
     44    job_source = new_or_existing(JobSource,obj_hash.delete(:source).symbolize_keys) 
     45    job_source.save! 
     46 
     47    # Extract the set of URLs 
     48    urls = obj_hash.delete(:queue_urls) 
     49     
     50    # Create a DroneJob 
     51    obj_hash[:notify_source] = (obj_hash[:notify_source] == 'true') ? true : false 
     52    job = DroneJob.new(obj_hash.symbolize_keys) 
     53    job.job_source = job_source 
     54    job.save! 
     55 
     56    # Create the QueueUrl objects 
     57    urls.each do |url,priority| 
     58      insert_pending_url(:url => url, :priority => priority, :job => job) 
     59    end 
     60 
     61    return job.id  
     62  end 
     63 
     64  def insert_pending_url(args) 
     65    PendingUrl.transaction do 
     66      # There's a small chance that another process instance may have simultaneously inserted  
     67      # the same URL into our queue.  In that case, we simply retry the insert operation until it succeeds. 
     68      url_obj = nil 
    6569      while true 
    6670        begin 
    67           if insert(QueueUrl,url_obj) 
    68             # Counts the number of URLs successfully inserted OR modified. 
    69             count += 1 
    70             break 
     71         
     72          # Attempt to find any existing QueueUrl objects. 
     73          while true 
     74            begin 
     75              url_obj = QueueUrl.find(:first, :lock => 'FOR UPDATE', :conditions => { :url => args[:url] }) 
     76 
     77              # We've successfully got a QueueUrl object; break out of loop. 
     78              break 
     79            rescue 
     80              if $!.to_s.index('Deadlock found').nil? 
     81                # If we encounter something other than a "Deadlock found" exception, then 
     82                # propagate the error. 
     83                raise $!.to_s 
     84              end 
     85            end 
    7186          end 
     87 
     88          if not url_obj.nil? 
     89            # We've found an existing object; update priority and count. 
     90            url_obj.priority = url_obj.priority < args[:priority] ? args[:priority] : url_obj.priority 
     91            url_obj.count += 1 
     92          else 
     93            # We create a new object. 
     94            url_obj = QueueUrl.new(:url => args[:url], :priority => args[:priority]) 
     95          end 
     96          url_obj.save! 
     97 
     98          # We've successfully got a QueueUrl object; break out of loop. 
     99          break 
    72100        rescue 
    73           break if $!.to_s.index('Duplicate entry').nil? 
    74           logger.warn "insert_queue_urls(): Collision on URL: (" + u.to_s + ") - retrying insert." 
     101          if $!.to_s.index('Duplicate entry').nil? 
     102            # If we encounter something other than a "Duplicate entry" exception, then 
     103            # propagate the error. 
     104            raise $!.to_s 
     105          end 
     106          logger.warn "insert_pending_url(): Collision on URL: (" + url.to_s + ") - retrying insert." 
    75107        end 
    76108      end 
    77     end 
    78  
    79     # Return the number of URLs that were successfully queued. 
    80     return count 
     109 
     110      # Sanity check: Some other error occurred 
     111      if url_obj.nil? 
     112        raise "insert_pending_url(): Unable to obtain QueueUrl object." 
     113      end 
     114 
     115      PendingUrl.new(:queue_url => url_obj, :drone_job => args[:job]).save! 
     116    end 
    81117  end 
    82118 
     
    94130    bee_work = {"cid" => client.cid, "urls" => []} 
    95131 
    96     # Insert all visited and timed out URLs into the 
    97     # history. 
     132    # Insert all visited and timed out URLs into the history. 
    98133    count = 0 
    99     link_types = ["links_visited","links_timed_out"] 
    100     link_types.each do |l| 
     134    ["links_visited","links_timed_out","links_ignored","links_suspicious"].each do |l| 
    101135      # Build URL history item and insert. 
    102136      obj_hash[l].each do |u,t| 
     
    108142          "client_id" => cid, 
    109143        } 
    110         q = QueueUrl.find(:first,:conditions => {:url => u}) 
    111         if not q.nil? 
    112           url_obj["source_type"] = (q.source_type or "unknown") 
    113           url_obj["source_name"] = q.source_name 
    114         end 
    115          
    116         # Prevent duplication of history items due to  
    117         # possible flawed logic on (RPC) client side. 
    118         if not HistoryUrl.exists?(url_obj)  
    119           bee_work["urls"] << url_obj 
     144        CompletedUrl.transaction do 
     145          q = QueueUrl.find(:first,:conditions => {:url => u},:lock => 'FOR UPDATE') 
    120146           
    121           count += 1 if hu = insert(HistoryUrl,url_obj) 
    122  
    123           jobs = dequeue_url(u) 
    124           if not hu.nil? 
    125             jobs.each do |job| 
    126               job.completed_urls << CompletedUrl.new(:history_url => hu) 
    127               job.save 
     147          # Prevent duplication of history items due to possible flawed logic on (RPC) client side. 
     148          if not HistoryUrl.exists?(url_obj)  
     149            bee_work["urls"] << url_obj 
     150 
     151            # Insert the HistoryUrl and retrieve the corresponding inserted object 
     152            if hu = insert(HistoryUrl,url_obj) 
     153              count += 1 
     154              hu = HistoryUrl.find(hu) 
    128155            end 
     156 
     157            # Get the jobs associated with the QueueUrl 
     158            jobs = [] 
     159            if not q.nil? 
     160              jobs = q.drone_jobs 
     161            end 
     162 
     163            # Add the HistoryUrl to all corresponding DroneJob objects 
     164            if not hu.nil? 
     165              jobs.each do |job| 
     166                CompletedUrl.new(:history_url => hu,:drone_job => job).save! 
     167                # TODO: dispatch_response(job) if job.pending_urls_count == 0 
     168              end 
     169            end 
     170 
     171            # Remove the QueueUrl object 
     172            q.destroy if not q.nil? 
    129173          end 
    130174        end 
    131175      end 
    132     end 
    133  
    134     # Don't insert any ignored URLs into the history, but 
    135     # remove references from the queue. 
    136     obj_hash["links_ignored"].each do |u,t| 
    137       dequeue_url(u) 
    138176    end 
    139177 
     
    141179    BeeJob.add_job("history_urls",bee_work) 
    142180         
    143     # Return the number of URLs that were successfully  
    144     # inserted into the history. 
     181    # Return the number of URLs that were successfully inserted into the history. 
    145182    return count 
    146183  end 
  • hive/trunk/data_webapp/app/models/queue_url.rb

    r1602 r1608  
    99  validates_presence_of :url 
    1010  validates_presence_of :url_md5 
    11   validates_presence_of :priority 
    1211  validates_presence_of :last_visited_at 
    1312  validates_presence_of :created_at 
    1413  validates_presence_of :count 
    1514  validates_presence_of :host_id 
    16   validates_presence_of :source_type 
    1715  validates_numericality_of :host_id, :only_integer => true, :greater_than_or_equal_to => 0 
    1816  validates_numericality_of :count,   :only_integer => true, :greater_than_or_equal_to => 0 
    1917  validates_numericality_of :last_visited_at, :greater_than_or_equal_to => 0 
    2018  validates_numericality_of :created_at, :greater_than_or_equal_to => 0 
     19  validates_numericality_of :priority, :only_integer => true, :greater_than_or_equal_to => 0 
    2120 
    22   def source_type 
    23     self[:source_type] or "unknown" 
    24   end 
    25  
    26   def source_type= (val) 
    27     self[:source_type] = val or "unknown" 
    28   end 
    29    
    30   def self.new_from_hash(obj_hash) 
     21  def before_validation 
    3122    # Validate and Normalize the url; Remove fragment 
    3223    begin 
    33       enc_uri = URI.escape((obj_hash[:url] or obj_hash.delete("url"))) 
    34       uri = URI.parse(enc_uri) 
     24      uri = URI.parse(URI.escape(self.url)) 
    3525      #ip = Resolv.getaddress(uri.host) 
    3626    rescue 
    3727      # XXX: Is this still needed? 
    38       logger.info "Could not parse URL or resolve host" 
    39       return nil 
     28      logger.warn "Could not parse or resolve URL: " + self.url.to_s() 
     29      return false  
    4030    end 
    4131    uri.fragment = nil 
    42     u = uri.normalize().to_s() 
    43     p = (obj_hash[:priority] or obj_hash["priority"] or 0) 
    44     p = p.to_i() 
    45   
    46     # Save the normalization back to the original hash 
    47     obj_hash[:url] = u 
    48  
    49     # If the url is already queued, update the count and priority 
    50     if QueueUrl.exists?(:url => u) 
    51       url_obj = QueueUrl.find(:first, :conditions => {:url => u}) 
    52       if url_obj.host_id > 0 
    53         return nil 
    54       end 
    55       url_obj.priority = url_obj.priority < p ? p : url_obj.priority 
    56       url_obj.count += 1 
    57       url_obj 
    58     # If the url is new, add it with the timestamps for creation and last visit 
    59     else 
    60       obj_hash[:created_at] = Time.now.to_f 
    61       obj_hash[:url_md5] = Digest::MD5.hexdigest(obj_hash[:url]) 
    62       hist_url = HistoryUrl.find(:first, :order => "time_at DESC", :conditions => {:url => u}) 
    63       if not hist_url.nil? 
    64         obj_hash[:last_visited_at] = hist_url.time_at 
    65       end 
    66       QueueUrl.new(obj_hash) 
     32    # Save the normalization back to url 
     33    self.url = uri.normalize().to_s() 
     34    self.created_at = Time.now.to_f 
     35    self.url_md5 = Digest::MD5.hexdigest(self.url) 
     36    hist_url = HistoryUrl.find(:first, :order => "time_at DESC", :conditions => {:url => self.url}) 
     37    if not hist_url.nil? 
     38      self.last_visited_at = hist_url.time_at 
    6739    end 
    6840  end