Show
Ignore:
Timestamp:
04/25/08 11:17:11 (4 months ago)
Author:
kindlund
Message:

Housekeeping.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • honeyclient/trunk/lib/HoneyClient/Manager.pm

    r1558 r1559  
    548548 
    549549            } 
    550  
    551             # TODO: Delete this, eventually. 
    552             print "\nBucket #" . $counter . ":\n" . Dumper($buckets->[$counter]) . "\n\n"; 
    553550            $args{'work_queue'}->enqueue(nfreeze($buckets->[$counter])); 
    554551        } 
     
    581578} 
    582579 
    583 # Signal handler to help give user immediate feedback during 
    584 # shutdown process. 
    585 sub _shutdown { 
    586     my $LOG = get_logger(); 
    587     $LOG->warn("Received termination signal.  Shutting down (please wait)."); 
    588     exit; 
    589 }; 
    590 $SIG{HUP}  = \&_shutdown; 
    591 $SIG{INT}  = \&_shutdown; 
    592 $SIG{QUIT} = \&_shutdown; 
    593 $SIG{ABRT} = \&_shutdown; 
    594 $SIG{TERM} = \&_shutdown; 
    595  
    596 ####################################################################### 
    597 # Public Methods Implemented                                          # 
    598 ####################################################################### 
    599  
    600 =pod 
    601  
    602 =head1 EXPORTS 
    603  
    604 =head2 run(driver_name => $driver_name, master_vm_config => $master_vm_config, work => $work) 
    605  
    606 =over 4 
    607  
    608 Runs the Manager code, using the specified arguments. 
    609  
    610 I<Inputs>:  
    611  B<$driver_name> is an optional argument, indicating the driver name to 
    612 use when driving all cloned VMs. 
    613  B<$master_vm_config> is an optional argument, indicating the absolute 
    614 path to the master VM configuration file that each clone VM should use. 
    615  B<$work> is an optional argument, indicating the work each cloned 
    616 VM should process. 
    617  
    618 =back 
    619  
    620 =begin testing 
    621  
    622 SKIP: { 
    623     skip "HoneyClient::Manager->run() can't be easily tested, yet.", 1; 
    624 
    625  
    626 =end testing 
    627  
    628 =cut 
    629  
    630 sub run { 
    631  
    632     # Extract arguments. 
    633     # Hash-based arguments are used, since HoneyClient::Util::SOAP is unable to handle 
    634     # hash references directly.  Thus, flat hashtables are used throughout the code 
    635     # for consistency. 
    636     my ($class, %args) = @_; 
    637  
    638     # Sanity check: If there's no database support and no work was 
    639     # specified, then stop. 
    640     my $localLinksExist = scalar(%{$args{'work'}}); 
    641     if (!getVar(name      => "enable", 
    642                 namespace => "HoneyClient::Manager::Database") && !$localLinksExist) { 
    643         $LOG->error("No URLs specified and database support is disabled.  Shutting down Manager."); 
    644         exit; 
    645     } 
    646  
    647     # Temporary variable to hold each cloned VM. 
    648     my $vm        = undef; 
    649  
    650     # Get a stub connection to the firewall. 
    651     my $stubFW = getClientHandle(namespace     => "HoneyClient::Manager::FW", 
    652                                  fault_handler => \&_handleFaultAndCleanup); 
    653  
    654     $LOG->info("Installing default firewall rules."); 
    655     $stubFW->installDefaultRules(); 
    656  
    657     # If these parameters weren't defined, delete them 
    658     # from the specified arg hash. 
    659     if (!defined($args{'master_vm_config'})) { 
    660         delete $args{'master_vm_config'};  
    661     } 
    662     if (!defined($args{'driver_name'})) { 
    663         delete $args{'driver_name'};  
    664     } 
    665  
    666     # Create a new work queue. 
    667     $WORK_QUEUE = new Thread::Queue; 
    668      
    669     # Create a new wait queue. 
    670     $WAIT_QUEUE = new Thread::Queue; 
    671  
    672     # Create the thread pool. 
    673     my @THREAD_POOL; 
    674  
    675     # Create the cloned VMs. 
    676     for (my $counter = 0; $counter < getVar(name => "num_simultaneous_clones"); $counter++) { 
    677         my $thread = threads->create(\&_worker, \%args); 
    678         if (!defined($thread)) { 
    679             $LOG->error("Unable to create worker thread! Shutting down."); 
    680             Carp::croak "Unable to create worker thread! Shutting down."; 
    681         } 
    682         # Push thread onto thread pool. 
    683         push(@THREAD_POOL, $thread); 
    684     } 
    685  
    686     # Register the host system with the database, if need be. 
    687     if (getVar(name      => "enable", 
    688                namespace => "HoneyClient::Manager::Database")) { 
    689  
    690         my $host = { 
    691             org => getVar(name => "organization"), 
    692             hostname => Sys::Hostname::Long::hostname_long, 
    693             ip => Sys::HostIP->ip, 
    694         }; 
    695         HoneyClient::Manager::Database::insert_host($host); 
    696     } 
    697  
    698     # If supported, get a URL list from the database. 
    699     my $remoteLinksExist = 0; 
    700     my $queue_url_list = {}; 
    701     my $tid = undef; 
    702     my $first_access_attempt = 1; 
    703     while (getVar(name      => "enable", 
    704                   namespace => "HoneyClient::Manager::Database")) { 
    705         $LOG->info("Waiting for new URLs from database."); 
    706         $queue_url_list = _get_urls(first_attempt => $first_access_attempt); 
    707         $first_access_attempt = 0; 
    708  
    709         $remoteLinksExist = scalar(%{$queue_url_list}); 
    710         while (!$localLinksExist && !$remoteLinksExist) { 
    711  
    712             # Sleep for a bit, before trying again to contact the database. 
    713             sleep(getVar(name => "database_retry_delay")); 
    714             # XXX: Trap/ignore all errors and simply retry. 
    715             eval { 
    716                 $queue_url_list = _get_urls(first_attempt => 0); 
    717                 $remoteLinksExist = scalar(%{$queue_url_list}); 
    718             }; 
    719         } 
    720         # If we do have URLs from the database, then merge them into the agent state. 
    721         # Note: Priorities specified in the database take precedent over any URLs specified locally. 
    722         if ($remoteLinksExist) { 
    723             $args{'work'} = { %{$args{'work'}}, %{$queue_url_list} }; 
    724         } 
    725          
    726         # Drive the VMs, using the work found. 
    727         $LOG->info("Received new work and updating queue."); 
    728         _divide_work(work_queue              => $WORK_QUEUE, 
    729                      wait_queue              => $WAIT_QUEUE, 
    730                      work                    => $args{'work'}, 
    731                      num_simultaneous_clones => getVar(name => "num_simultaneous_clones")); 
    732  
    733         # Wait until the VMs need more work. 
    734         while (!$WAIT_QUEUE->pending) { 
    735             # Poll the wait queue every 2 seconds. 
    736             # This time delay should be short. 
    737             threads->yield(); 
    738             sleep(2); 
    739  
    740             # Make sure all worker threads are still alive. 
    741             for (my $counter = 0; $counter < getVar(name => "num_simultaneous_clones"); $counter++) { 
    742                 my $thread = $THREAD_POOL[$counter]; 
    743                 if (!$thread->is_running()) { 
    744                     $LOG->error("Thread ID (" . $thread->tid() . "): Unexpectedly terminated."); 
    745                     Carp::croak "Thread ID (" . $thread->tid() . "): Unexpectedly terminated."; 
    746                 } 
    747             } 
    748         } 
    749         $LOG->info("Got a signal that a thread needs more work."); 
    750  
    751         # Once finished, empty the work queue. 
    752         $args{'work'} = {}; 
    753         # If we had any local links, they definately will have been processed by now. 
    754         $localLinksExist = 0; 
    755  
    756         # Loop forever, since we have a database connection. 
    757     } 
    758  
    759     # If we don't have a database connection, then just handle the work 
    760     # that was provided from the command line and then shut down. 
    761     if (scalar(%{$args{'work'}})) { 
    762  
    763         # Drive the VMs, using the work found. 
    764         $LOG->info("Received new work and updating queue."); 
    765         _divide_work(work_queue              => $WORK_QUEUE, 
    766                      wait_queue              => $WAIT_QUEUE, 
    767                      work                    => $args{'work'}, 
    768                      num_simultaneous_clones => getVar(name => "num_simultaneous_clones")); 
    769  
    770         # Wait until all VMs are finished. 
    771         # We wait for each worker to signal that they are waiting for more work, before shutting down 
    772         # the application. 
    773         for (my $i = 0; $i < getVar(name => "num_simultaneous_clones"); $i++) { 
    774             my $tid = undef; 
    775             # This is a little hackish, since calling Thread::Queue->dequeue 
    776             # doesn't properly handle signals. 
    777             while (!defined($tid = $WAIT_QUEUE->dequeue_nb)) { 
    778                 # Poll the wait queue every 2 seconds. 
    779                 # This time delay should be short. 
    780                 threads->yield(); 
    781                 sleep(2); 
    782  
    783                 # Make sure all worker threads are still alive. 
    784                 for (my $counter = 0; $counter < getVar(name => "num_simultaneous_clones"); $counter++) { 
    785                     my $thread = $THREAD_POOL[$counter]; 
    786                     if (!$thread->is_running()) { 
    787                         $LOG->error("Thread ID (" . $thread->tid() . "): Unexpectedly terminated."); 
    788                         Carp::croak "Thread ID (" . $thread->tid() . "): Unexpectedly terminated."; 
    789                     } 
    790                 } 
    791             } 
    792         } 
    793         # Once finished, empty the work queue. 
    794         $args{'work'} = {}; 
    795     } 
    796     $LOG->info("All URLs exhausted. Shutting down Manager."); 
    797 
    798  
    799 # TODO: Comment this. 
     580# Helper function designed to create a worker thread which manages a single 
     581# VM, using a WORK_QUEUE and a WAIT_QUEUE. 
     582
    800583# Note: We can gracefully stop each worker, by sending an empty hashtable of work, 
    801584# or by signalling the thread. 
     
    893676} 
    894677 
     678 
     679# Signal handler to help give user immediate feedback during 
     680# shutdown process. 
     681sub _shutdown { 
     682    my $LOG = get_logger(); 
     683    $LOG->warn("Received termination signal.  Shutting down (please wait)."); 
     684    exit; 
     685}; 
     686$SIG{HUP}  = \&_shutdown; 
     687$SIG{INT}  = \&_shutdown; 
     688$SIG{QUIT} = \&_shutdown; 
     689$SIG{ABRT} = \&_shutdown; 
     690$SIG{TERM} = \&_shutdown; 
     691 
     692####################################################################### 
     693# Public Methods Implemented                                          # 
     694####################################################################### 
     695 
     696=pod 
     697 
     698=head1 EXPORTS 
     699 
     700=head2 run(driver_name => $driver_name, master_vm_config => $master_vm_config, work => $work) 
     701 
     702=over 4 
     703 
     704Runs the Manager code, using the specified arguments. 
     705 
     706I<Inputs>:  
     707 B<$driver_name> is an optional argument, indicating the driver name to 
     708use when driving all cloned VMs. 
     709 B<$master_vm_config> is an optional argument, indicating the absolute 
     710path to the master VM configuration file that each clone VM should use. 
     711 B<$work> is an optional argument, indicating the work each cloned 
     712VM should process. 
     713 
     714=back 
     715 
     716=begin testing 
     717 
     718SKIP: { 
     719    skip "HoneyClient::Manager->run() can't be easily tested, yet.", 1; 
     720} 
     721 
     722=end testing 
     723 
     724=cut 
     725 
     726sub run { 
     727 
     728    # Extract arguments. 
     729    # Hash-based arguments are used, since HoneyClient::Util::SOAP is unable to handle 
     730    # hash references directly.  Thus, flat hashtables are used throughout the code 
     731    # for consistency. 
     732    my ($class, %args) = @_; 
     733 
     734    # Sanity check: If there's no database support and no work was 
     735    # specified, then stop. 
     736    my $localLinksExist = scalar(%{$args{'work'}}); 
     737    if (!getVar(name      => "enable", 
     738                namespace => "HoneyClient::Manager::Database") && !$localLinksExist) { 
     739        $LOG->error("No URLs specified and database support is disabled.  Shutting down Manager."); 
     740        exit; 
     741    } 
     742 
     743    # Temporary variable to hold each cloned VM. 
     744    my $vm        = undef; 
     745 
     746    # Get a stub connection to the firewall. 
     747    my $stubFW = getClientHandle(namespace     => "HoneyClient::Manager::FW", 
     748                                 fault_handler => \&_handleFaultAndCleanup); 
     749 
     750    $LOG->info("Installing default firewall rules."); 
     751    $stubFW->installDefaultRules(); 
     752 
     753    # If these parameters weren't defined, delete them 
     754    # from the specified arg hash. 
     755    if (!defined($args{'master_vm_config'})) { 
     756        delete $args{'master_vm_config'};  
     757    } 
     758    if (!defined($args{'driver_name'})) { 
     759        delete $args{'driver_name'};  
     760    } 
     761 
     762    # Create a new work queue. 
     763    $WORK_QUEUE = new Thread::Queue; 
     764     
     765    # Create a new wait queue. 
     766    $WAIT_QUEUE = new Thread::Queue; 
     767 
     768    # Create the thread pool. 
     769    my @THREAD_POOL; 
     770 
     771    # Create the cloned VMs. 
     772    for (my $counter = 0; $counter < getVar(name => "num_simultaneous_clones"); $counter++) { 
     773        my $thread = threads->create(\&_worker, \%args); 
     774        if (!defined($thread)) { 
     775            $LOG->error("Unable to create worker thread! Shutting down."); 
     776            Carp::croak "Unable to create worker thread! Shutting down."; 
     777        } 
     778        # Push thread onto thread pool. 
     779        push(@THREAD_POOL, $thread); 
     780    } 
     781 
     782    # Register the host system with the database, if need be. 
     783    if (getVar(name      => "enable", 
     784               namespace => "HoneyClient::Manager::Database")) { 
     785 
     786        my $host = { 
     787            org => getVar(name => "organization"), 
     788            hostname => Sys::Hostname::Long::hostname_long, 
     789            ip => Sys::HostIP->ip, 
     790        }; 
     791        HoneyClient::Manager::Database::insert_host($host); 
     792    } 
     793 
     794    # If supported, get a URL list from the database. 
     795    my $remoteLinksExist = 0; 
     796    my $queue_url_list = {}; 
     797    my $tid = undef; 
     798    my $first_access_attempt = 1; 
     799    while (getVar(name      => "enable", 
     800                  namespace => "HoneyClient::Manager::Database")) { 
     801        $LOG->info("Waiting for new URLs from database."); 
     802        $queue_url_list = _get_urls(first_attempt => $first_access_attempt); 
     803        $first_access_attempt = 0; 
     804 
     805        $remoteLinksExist = scalar(%{$queue_url_list}); 
     806        while (!$localLinksExist && !$remoteLinksExist) { 
     807 
     808            # Sleep for a bit, before trying again to contact the database. 
     809            sleep(getVar(name => "database_retry_delay")); 
     810            # XXX: Trap/ignore all errors and simply retry. 
     811            eval { 
     812                $queue_url_list = _get_urls(first_attempt => 0); 
     813                $remoteLinksExist = scalar(%{$queue_url_list}); 
     814            }; 
     815        } 
     816        # If we do have URLs from the database, then merge them into the agent state. 
     817        # Note: Priorities specified in the database take precedent over any URLs specified locally. 
     818        if ($remoteLinksExist) { 
     819            $args{'work'} = { %{$args{'work'}}, %{$queue_url_list} }; 
     820        } 
     821         
     822        # Drive the VMs, using the work found. 
     823        $LOG->info("Received new work and updating queue."); 
     824        _divide_work(work_queue              => $WORK_QUEUE, 
     825                     wait_queue              => $WAIT_QUEUE, 
     826                     work                    => $args{'work'}, 
     827                     num_simultaneous_clones => getVar(name => "num_simultaneous_clones")); 
     828 
     829        # Wait until the VMs need more work. 
     830        while (!$WAIT_QUEUE->pending) { 
     831            # Poll the wait queue every 2 seconds. 
     832            # This time delay should be short. 
     833            threads->yield(); 
     834            sleep(2); 
     835 
     836            # Make sure all worker threads are still alive. 
     837            for (my $counter = 0; $counter < getVar(name => "num_simultaneous_clones"); $counter++) { 
     838                my $thread = $THREAD_POOL[$counter]; 
     839                if (!$thread->is_running()) { 
     840                    $LOG->error("Thread ID (" . $thread->tid() . "): Unexpectedly terminated."); 
     841                    Carp::croak "Thread ID (" . $thread->tid() . "): Unexpectedly terminated."; 
     842                } 
     843            } 
     844        } 
     845        $LOG->info("Got a signal that a thread needs more work."); 
     846 
     847        # Once finished, empty the work queue. 
     848        $args{'work'} = {}; 
     849        # If we had any local links, they definately will have been processed by now. 
     850        $localLinksExist = 0; 
     851 
     852        # Loop forever, since we have a database connection. 
     853    } 
     854 
     855    # If we don't have a database connection, then just handle the work 
     856    # that was provided from the command line and then shut down. 
     857    if (scalar(%{$args{'work'}})) { 
     858 
     859        # Drive the VMs, using the work found. 
     860        $LOG->info("Received new work and updating queue."); 
     861        _divide_work(work_queue              => $WORK_QUEUE, 
     862                     wait_queue              => $WAIT_QUEUE, 
     863                     work                    => $args{'work'}, 
     864                     num_simultaneous_clones => getVar(name => "num_simultaneous_clones")); 
     865 
     866        # Wait until all VMs are finished. 
     867        # We wait for each worker to signal that they are waiting for more work, before shutting down 
     868        # the application. 
     869        for (my $i = 0; $i < getVar(name => "num_simultaneous_clones"); $i++) { 
     870            my $tid = undef; 
     871            # This is a little hackish, since calling Thread::Queue->dequeue 
     872            # doesn't properly handle signals. 
     873            while (!defined($tid = $WAIT_QUEUE->dequeue_nb)) { 
     874                # Poll the wait queue every 2 seconds. 
     875                # This time delay should be short. 
     876                threads->yield(); 
     877                sleep(2); 
     878 
     879                # Make sure all worker threads are still alive. 
     880                for (my $counter = 0; $counter < getVar(name => "num_simultaneous_clones"); $counter++) { 
     881                    my $thread = $THREAD_POOL[$counter]; 
     882                    if (!$thread->is_running()) { 
     883                        $LOG->error("Thread ID (" . $thread->tid() . "): Unexpectedly terminated."); 
     884                        Carp::croak "Thread ID (" . $thread->tid() . "): Unexpectedly terminated."; 
     885                    } 
     886                } 
     887            } 
     888        } 
     889        # Once finished, empty the work queue. 
     890        $args{'work'} = {}; 
     891    } 
     892    $LOG->info("All URLs exhausted. Shutting down Manager."); 
     893} 
     894 
    895895####################################################################### 
    896896