source: sage/dsage/scripts/dsage_worker.py @ 7660:62b7be191970

Revision 7660:62b7be191970, 31.9 KB checked in by Yi Qiang <yqiang@…>, 6 years ago (diff)

Refactored stats code.

  • Property exe set to *
Line 
1#!/usr/bin/env python
2############################################################################
3#                                                                     
4#   DSAGE: Distributed SAGE                     
5#                                                                             
6#       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>               
7#                                                                           
8#  Distributed under the terms of the GNU General Public License (GPL)       
9#
10#    This code is distributed in the hope that it will be useful,
11#    but WITHOUT ANY WARRANTY; without even the implied warranty of
12#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13#    General Public License for more details.
14#
15#  The full text of the GPL is available at:
16#
17#                  http://www.gnu.org/licenses/
18#
19############################################################################
20
21import sys
22import os
23import cPickle
24import zlib
25import pexpect
26import datetime
27import uuid
28from math import ceil
29from getpass import getuser
30
31from twisted.spread import pb
32from twisted.internet import reactor, defer, error, task
33from twisted.python import log
34from twisted.spread import banana
35banana.SIZE_LIMIT = 100*1024*1024 # 100 MegaBytes
36
37from gnutls.constants import *
38from gnutls.crypto import *
39from gnutls.errors import *
40from gnutls.interfaces.twisted import X509Credentials
41
42from sage.interfaces.sage0 import Sage
43from sage.misc.preparser import preparse_file
44
45from sage.dsage.database.job import Job, expand_job
46from sage.dsage.misc.hostinfo import ClassicHostInfo
47from sage.dsage.errors.exceptions import NoJobException
48from sage.dsage.twisted.pb import PBClientFactory
49from sage.dsage.misc.constants import DELIMITER
50from sage.dsage.misc.constants import DSAGE_DIR
51from sage.dsage.misc.constants import TMP_WORKER_FILES
52from sage.dsage.misc.misc import random_str
53
54START_MARKER = '___BEGIN___'
55END_MARKER = '___END___'
56LOG_PREFIX = "[Worker %s] "
57
58def unpickle(pickled_job):
59    return cPickle.loads(zlib.decompress(pickled_job))
60
61class Worker(object):
62    """
63    This class represents a worker object that does the actual calculation.
64   
65    Parameters:
66    remoteobj -- reference to the remote PB server
67   
68    """
69   
70    def __init__(self, remoteobj, id, log_level=0, poll=1.0):
71        self.remoteobj = remoteobj
72        self.id = id
73        self.free = True
74        self.job = None
75        self.log_level = log_level
76        self.poll = poll
77        self.checker_task = task.LoopingCall(self.check_work)
78        self.checker_timeout = 0.5
79        self.got_output = False
80        self.job_start_time = None
81        self.orig_poll = poll
82        self.start()
83           
84        # import some basic modules into our Sage() instance
85        self.sage.eval('import time')
86        self.sage.eval('import os')
87       
88    def _catch_failure(self, failure):
89        log.msg("Error: ", failure.getErrorMessage())
90        log.msg("Traceback: ", failure.printTraceback())
91       
92    def get_job(self):
93        try:
94            if self.log_level > 3:
95                log.msg(LOG_PREFIX % self.id +  'Getting job...') 
96            d = self.remoteobj.callRemote('get_job')
97        except Exception, msg:
98            log.msg(msg)
99            log.msg(LOG_PREFIX % self.id +  'Disconnected...')
100            reactor.callLater(5.0, self.get_job)
101            return
102        d.addCallback(self.gotJob)
103        d.addErrback(self.noJob)
104       
105        return d
106   
107    def gotJob(self, jdict):
108        """
109        gotJob is a callback for the remoteobj's get_job method.
110       
111        Parameters:
112        job -- Job object returned by remote's 'get_job' method
113       
114        """
115       
116        if self.log_level > 1:
117            if jdict is None:
118                log.msg(LOG_PREFIX % self.id + 'No new job.')
119        if self.log_level > 3:
120            if jdict is not None:
121                log.msg(LOG_PREFIX % self.id + 'Got Job: %s' % jdict)
122        self.job = expand_job(jdict)
123        if not isinstance(self.job, Job):
124            raise NoJobException
125        try:
126            self.poll = self.orig_poll
127            self.doJob(self.job)
128        except Exception, msg:
129            log.msg(msg)
130            self.report_failure(msg)
131            self.restart()
132   
133    def job_done(self, output, result, completed, cpu_time):
134        """
135        job_done is a callback for doJob.  Called when a job completes.
136       
137        Parameters:
138        output -- the output of the command
139        result -- the result of processing the job, a pickled object
140        completed -- whether or not the job is completely finished (bool)
141       
142        """
143       
144        try:
145            d = self.remoteobj.callRemote('job_done',
146                                          self.job.job_id,
147                                          output,
148                                          result,
149                                          completed,
150                                          cpu_time)
151        except Exception, msg:
152            log.msg(msg)
153            log.msg('[Worker: %s, job_done] Disconnected, reconnecting in %s'\
154                    % (self.id, 5.0))
155            reactor.callLater(5.0, self.job_done, 
156                              output, result, completed)
157            d = defer.Deferred()
158            d.errback(error.ConnectionLost())
159           
160            return d
161       
162        if completed:
163            log.msg('[Worker %s] Finished job %s' % (self.id,
164                                                     self.job.job_id))
165            self.restart()
166       
167        return d
168   
169    def noJob(self, failure):
170        """
171        noJob is an errback that catches the NoJobException.
172       
173        Parameters:
174        failure -- a twisted.python.failure object (twisted.python.failure)
175       
176        """
177       
178        if failure.check(NoJobException):
179            if self.poll >= 15:
180                self.poll = 15
181            if self.log_level > 1:
182                msg = 'Sleeping for %s seconds' % self.poll
183                log.msg(LOG_PREFIX % self.id + msg)
184            self.poll = ceil(self.poll * 1.5)
185            reactor.callLater(self.poll, self.get_job)
186        else:
187            log.msg("Error: ", failure.getErrorMessage())
188            log.msg("Traceback: ", failure.printTraceback())
189   
190    def setup_tmp_dir(self, job):
191        """
192        Creates the temporary directory for the worker.
193       
194        """
195       
196        cur_dir = os.getcwd() # keep a reference to the current directory
197        tmp_job_dir = os.path.join(TMP_WORKER_FILES, job.job_id)
198        if not os.path.isdir(TMP_WORKER_FILES):
199            os.mkdir(TMP_WORKER_FILES)
200        if not os.path.isdir(tmp_job_dir):
201            os.mkdir(tmp_job_dir)
202        os.chdir(tmp_job_dir)
203        self.sage.eval("os.chdir('%s')" % tmp_job_dir)
204       
205        return tmp_job_dir
206       
207    def extract_and_load_job_data(self, job):
208        """
209        Extracts all the data that is in a job object.
210       
211        """
212       
213        if isinstance(job.data, list):
214            if self.log_level > 2:
215                msg = 'Extracting job data...'
216                log.msg(LOG_PREFIX % self.id + msg)
217            try:
218                for var, data, kind in job.data:
219                    try:
220                        data = zlib.decompress(data)
221                    except Exception, msg:
222                        log.msg(msg)
223                        continue
224                    if kind == 'file':
225                        data = preparse_file(data, magic=True, do_time=False,
226                                             ignore_prompts=False)
227                        f = open(var, 'wb')
228                        f.write(data)
229                        f.close()
230                        if self.log_level > 2:
231                            msg = 'Extracted %s' % f
232                            log.msg(LOG_PREFIX % self.id + msg)
233                        self.sage.eval("execfile('%s')" % var)
234                    if kind == 'object':
235                        fname = var + '.sobj'
236                        if self.log_level > 2:
237                            log.msg('Object to be loaded: %s' % fname)
238                        f = open(fname, 'wb')
239                        f.write(data)
240                        f.close()
241                        self.sage.eval("%s = load('%s')" % (var, fname))
242                        if self.log_level > 2:
243                            msg = 'Loaded %s' % fname
244                            log.msg(LOG_PREFIX % self.id + msg)
245            except Exception, msg:
246                log.msg(LOG_PREFIX % self.id + msg)
247
248    def write_job_file(self, job):
249        """
250        Writes out the job file to be executed to disk.
251       
252        """
253       
254        parsed_file = preparse_file(job.code, magic=True, 
255                                    do_time=False, ignore_prompts=False)
256
257        job_filename = str(job.name) + '.py'
258        job_file = open(job_filename, 'w')
259        BEGIN = "print '%s'\n\n" % (START_MARKER)
260        END = "print '%s'\n\n" % (END_MARKER)
261        GO_TO_TMP_DIR = """os.chdir('%s')\n""" % self.tmp_job_dir
262        SAVE_TIME = """save((time.time()-dsage_start_time), 'cpu_time.sobj', compress=False)\n"""
263        SAVE_RESULT = """try:
264    save(DSAGE_RESULT, 'result.sobj', compress=True)
265except:
266    save('No DSAGE_RESULT', 'result.sobj', compress=True)
267"""
268        job_file.write("alarm(%s)\n\n" % (job.timeout))
269        job_file.write("import time\n\n")
270        job_file.write(BEGIN)
271        job_file.write('dsage_start_time = time.time()\n')
272        job_file.write(parsed_file)
273        job_file.write("\n\n")
274        job_file.write(END)
275        job_file.write("\n")
276        job_file.write(GO_TO_TMP_DIR)
277        job_file.write(SAVE_RESULT)
278        job_file.write(SAVE_TIME)
279        job_file.close()
280        if self.log_level > 2:
281            log.msg('[Worker: %s] Wrote job file. ' % (self.id))
282           
283        return job_filename
284       
285    def doJob(self, job):
286        """
287        doJob is the method that drives the execution of a job.
288       
289        Parameters:
290        job -- a Job object (dsage.database.Job)
291       
292        """
293       
294        log.msg(LOG_PREFIX % self.id + 'Starting job %s ' % job.job_id)
295           
296        self.free = False
297        self.got_output = False
298        d = defer.Deferred()
299       
300        try:
301            self.checker_task.start(self.checker_timeout, now=False)
302        except AssertionError:
303            self.checker_task.stop()
304            self.checker_task.start(self.checker_timeout, now=False)
305        if self.log_level > 2:
306            log.msg(LOG_PREFIX % self.id + 'Starting checker task...')
307       
308        self.tmp_job_dir = self.setup_tmp_dir(job)
309        self.extract_and_load_job_data(job)
310       
311        job_filename = self.write_job_file(job)
312
313        f = os.path.join(self.tmp_job_dir, job_filename)
314        self.sage._send("execfile('%s')" % (f))
315        self.job_start_time = datetime.datetime.now()
316        if self.log_level > 2:
317            msg = 'File to execute: %s' % f
318            log.msg(LOG_PREFIX % self.id + msg)
319       
320        d.callback(True)
321           
322    def reset_checker(self):
323        """
324        Resets the output/result checker for the worker.
325       
326        """
327       
328        if self.checker_task.running:
329            self.checker_task.stop()
330        self.checker_timeout = 1.0
331        self.checker_task = task.LoopingCall(self.check_work)
332       
333    def check_work(self):
334        """
335        check_work periodically polls workers for new output. The period is
336        determined by an exponential back off algorithm.
337       
338        This figures out whether or not there is anything new output that we
339        should submit to the server.
340       
341        """
342       
343        if self.sage == None:
344            return
345        if self.job == None or self.free == True:
346            if self.checker_task.running:
347                self.checker_task.stop()
348            return
349        if self.log_level > 1:
350            msg = 'Checking job %s' % self.job.job_id
351            log.msg(LOG_PREFIX % self.id + msg)
352        os.chdir(self.tmp_job_dir)
353        try:
354            # foo, output, new = self.sage._so_far()
355            # This sucks and is a very bad way to tell when a calculation is
356            # finished           
357            done, new = self.sage._get()
358            # If result.sobj exists, our calculation is done
359            result = open('result.sobj', 'rb').read()                                               
360            done = True
361        except RuntimeError, msg: # Error in calling worker.sage._so_far()
362            done = False
363            if self.log_level > 1:
364                log.msg(LOG_PREFIX % self.id + 'RuntimeError: %s' % msg)
365                log.msg("Don't worry, the RuntimeError above " + 
366                        "is a non-fatal SAGE failure")
367            self.increase_checker_task_timeout()
368            return
369        except IOError, msg: # File does not exist yet
370            done = False
371        if done:
372                        try:
373                                cpu_time = cPickle.loads(open('cpu_time.sobj', 'rb').read())
374                        except IOError:
375                                cpu_time = -1 # This means that we could not get a cpu_time.
376                        self.free = True
377                        self.reset_checker()
378        else:
379            result = cPickle.dumps('Job not done yet.', 2)
380            cpu_time = None
381        if self.check_failure(new):
382            self.report_failure(new)
383            self.restart()
384            return
385        sanitized_output = self.clean_output(new)   
386        if self.log_level > 3:
387            print 'Output before sanitizing: \n' , sanitized_output
388        if self.log_level > 3:
389            print 'Output after sanitizing: \n', sanitized_output
390        if sanitized_output == '' and not done:
391            self.increase_checker_task_timeout()
392        else:
393            d = self.job_done(sanitized_output, result, done, cpu_time)
394            d.addErrback(self._catch_failure)
395   
396    def report_failure(self, failure):
397        """
398        Reports failure of a job.
399       
400        """
401       
402        msg = 'Job %s failed!' % (self.job.job_id)
403        import shutil
404        failed_dir = self.tmp_job_dir + '_failed'
405        if os.path.exists(failed_dir):
406            shutil.rmtree(failed_dir)
407        shutil.move(self.tmp_job_dir, failed_dir)
408        log.msg(LOG_PREFIX % self.id + msg)
409        log.msg('Traceback: \n%s' % failure)
410        d = self.remoteobj.callRemote('job_failed', self.job.job_id, failure)
411        d.addErrback(self._catch_failure)
412       
413        return d
414       
415    def increase_checker_task_timeout(self):
416        """
417        Quickly decreases the number of times a worker checks for output
418       
419        """
420       
421        if self.checker_task.running:
422            self.checker_task.stop()
423       
424        self.checker_timeout = self.checker_timeout * 1.5
425        if self.checker_timeout > 300.0:
426            self.checker_timeout = 300.0
427        self.checker_task = task.LoopingCall(self.check_work)
428        self.checker_task.start(self.checker_timeout, now=False)
429        if self.log_level > 0:
430            msg = 'Checking output again in %s' % self.checker_timeout
431            log.msg(LOG_PREFIX % self.id + msg)
432       
433    def clean_output(self, sage_output):
434        """
435        clean_output attempts to clean up the output string from sage.
436
437        """
438       
439        begin = sage_output.find(START_MARKER)
440        if begin != -1:
441            self.got_output = True
442            begin += len(START_MARKER)
443        else:
444            begin = 0
445        end = sage_output.find(END_MARKER)
446        if end != -1:
447            end -= 1
448        else:
449            if not self.got_output:
450                end = 0
451            else:
452                end = len(sage_output)
453        output = sage_output[begin:end]
454        output = output.strip()
455        output = output.replace('\r', '')
456       
457        if ('execfile' in output or 'load' in output) and self.got_output:
458            output = ''           
459           
460        return output
461       
462    def check_failure(self, sage_output):
463        """
464        Checks for signs of exceptions or errors in the output.
465
466        """
467
468        if sage_output == None:
469            return False
470        else:
471            sage_output = ''.join(sage_output)
472
473        if 'Traceback' in sage_output:
474            return True
475        elif 'Error' in sage_output:
476            return True
477        else:
478            return False
479   
480    def kill_sage(self):
481        """
482        Try to hard kill the SAGE instance.
483       
484        """
485       
486        try:
487            self.sage.quit()
488            del self.sage
489        except Exception, msg:
490            pid = self.sage.pid()
491            cmd = 'kill -9 %s' % pid
492            os.system(cmd)
493            log.msg(msg)
494           
495    def stop(self, hard_reset=False):
496        """
497        Stops the current worker and resets it's internal state.
498           
499        """
500       
501        # Set status to free and delete any current jobs we have
502        self.free = True
503        self.job = None
504       
505        if hard_reset:
506            log.msg(LOG_PREFIX % self.id + 'Performing hard reset.')
507            self.kill_sage()
508        else: # try for a soft reset
509            INTERRUPT_TRIES = 20
510            timeout = 0.3
511            e = self.sage._expect
512            try:
513                for i in range(INTERRUPT_TRIES):   
514                    self.sage._expect.sendline('q')
515                    self.sage._expect.sendline(chr(3))  # send ctrl-c
516                    try: 
517                        e.expect(self.sage._prompt, timeout=timeout)           
518                        success = True
519                        break
520                    except (pexpect.TIMEOUT, pexpect.EOF), msg:
521                        success = False
522                        if self.log_level > 3:
523                            msg = 'Interrupting SAGE (try %s)' % i
524                            log.msg(LOG_PREFIX % self.id + msg)
525            except Exception, msg:
526                success = False
527                log.msg(msg)
528                log.msg(LOG_PREFIX % self.id + "Performing hard reset.")
529       
530            if not success:
531                self.kill_sage()
532            else:
533                self.sage.reset()
534   
535    def start(self):
536        """
537        Starts a new worker if it does not exist already.
538       
539        """
540       
541        if not hasattr(self, 'sage'):
542            if self.log_level > 3:
543                logfile = DSAGE_DIR + '/%s-pexpect.log' % self.id
544                self.sage = Sage(maxread=1, logfile=logfile, python=True)
545            else:
546                self.sage = Sage(maxread=1, python=True)
547            try:
548                self.sage._start(block_during_init=True)
549            except RuntimeError, msg: # Could not start SAGE
550                print msg
551                print 'Failed to start a worker, probably Expect issues.'
552                reactor.stop()
553                sys.exit(-1)
554        E = self.sage.expect()
555        E.sendline('\n')
556        E.expect('>>>')
557        cmd = 'from sage.all import *;'
558        cmd += 'from sage.all_notebook import *;'
559        cmd += 'import sage.server.support as _support_; '
560        E.sendline(cmd)
561        self.get_job()
562   
563    def restart(self):
564        """
565        Restarts the current worker.
566       
567        """
568       
569        try:
570            delta = datetime.datetime.now() - self.job_start_time
571            if delta.seconds >= (3*60): # more than 3 minutes, do a hard reset
572                self.stop(hard_reset=True)
573            else:
574                self.stop(hard_reset=False)
575        except TypeError:
576            self.stop(hard_reset=True)
577        self.job_start_time = None
578        self.start()
579        self.reset_checker()
580        log.msg('[Worker: %s] Restarting...' % (self.id))
581       
582class Monitor(object):
583    """
584    This class represents a monitor that controls workers.
585   
586    It monitors the workers and checks on their status
587   
588    Parameters:
589    hostname -- the hostname of the server we want to connect to (str)
590    port -- the port of the server we want to connect to (int)
591
592    """
593   
594    def __init__(self, server='localhost', port=8081, 
595                 username=getuser(),
596                 ssl=True, 
597                 workers=2, 
598                 anonymous=False, 
599                 priority=20, 
600                 poll=1.0,
601                 log_level=0, 
602                 log_file=os.path.join(DSAGE_DIR, 'worker.log'),
603                 pubkey_file=None,
604                 privkey_file=None):
605        self.server = server
606        self.port = port
607        self.username = username
608        self.ssl = ssl
609        self.workers = workers
610        self.anonymous = anonymous
611        self.priority = priority
612        self.poll = poll
613        self.log_level = log_level
614        self.log_file = log_file
615        self.pubkey_file = pubkey_file
616        self.privkey_file = privkey_file
617       
618        self.remoteobj = None
619        self.connected = False
620        self.reconnecting = False
621        self.worker_pool = None
622        self.sleep_time = 1.0
623       
624        self.host_info = ClassicHostInfo().host_info
625       
626        uuid_ = str(uuid.uuid1())
627        self.host_info['uuid'] = uuid_
628        self.host_info['workers'] = self.workers
629       
630        self._startLogging(self.log_file)
631       
632        try:
633            os.nice(self.priority)
634        except OSError, msg:
635            log.msg('Error setting priority: %s' % (self.priority))
636            pass       
637        if not self.anonymous:
638            from twisted.cred import credentials
639            from twisted.conch.ssh import keys
640            self.DATA =  random_str(500)
641            # public key authentication information
642            self.pubkey_str =keys.getPublicKeyString(self.pubkey_file)
643            # try getting the private key object without a passphrase first
644            try:
645                self.priv_key = keys.getPrivateKeyObject(self.privkey_file)
646            except keys.BadKeyError:
647                pphrase = self._getpassphrase()
648                self.priv_key = keys.getPrivateKeyObject(self.privkey_file,
649                                                         pphrase)
650            self.pub_key = keys.getPublicKeyObject(self.pubkey_str)
651            self.algorithm = 'rsa'
652            self.blob = keys.makePublicKeyBlob(self.pub_key)
653            self.data = self.DATA
654            self.signature = keys.signData(self.priv_key, self.data)
655            self.creds = credentials.SSHPrivateKey(self.username,
656                                                   self.algorithm,
657                                                   self.blob, 
658                                                   self.data,
659                                                   self.signature)
660   
661    def _startLogging(self, log_file):
662        if log_file == 'stdout':
663            log.startLogging(sys.stdout)
664            log.msg('WARNING: Only loggint to stdout!')
665        else:
666            worker_log = open(log_file, 'a')
667            log.startLogging(sys.stdout)
668            log.startLogging(worker_log)
669            log.msg("Logging to file: ", log_file)
670           
671    def _getpassphrase(self):
672        import getpass
673        passphrase = getpass.getpass('Passphrase (Hit enter for None): ')
674       
675        return passphrase
676       
677    def _connected(self, remoteobj):
678        self.remoteobj = remoteobj
679        self.remoteobj.notifyOnDisconnect(self._disconnected)
680        self.connected = True
681        self.reconnecting = False
682       
683        if self.worker_pool == None: # Only pool workers the first time
684            self.pool_workers(self.remoteobj)
685        else:
686            for worker in self.worker_pool:
687                worker.remoteobj = self.remoteobj # Update workers
688   
689    def _disconnected(self, remoteobj):
690        log.msg('Lost connection to the server.')
691        self.connected = False
692        self._retryConnect()
693   
694    def _got_killed_jobs(self, killed_jobs):
695        if killed_jobs == None:
696            return
697        # reconstruct the Job objects from the jdicts
698        killed_jobs = [expand_job(jdict) for jdict in killed_jobs]
699        for worker in self.worker_pool:
700            if worker.job is None:
701                continue
702            if worker.free:
703                continue
704            for job in killed_jobs:
705                if job is None or worker.job is None:
706                    continue
707                if worker.job.job_id == job.job_id:
708                    msg = 'Processing killed job, restarting...'
709                    log.msg(LOG_PREFIX % worker.id + msg)
710                    worker.restart()
711   
712    def _retryConnect(self):
713        log.msg('[Monitor] Disconnected, reconnecting in %s' % (5.0))
714        if not self.connected:
715            reactor.callLater(5.0, self.connect)
716   
717    def _catchConnectionFailure(self, failure):               
718        log.msg("Error: ", failure.getErrorMessage())
719        log.msg("Traceback: ", failure.printTraceback())
720        self._disconnected(None)
721   
722    def _catch_failure(self, failure):
723        log.msg("Error: ", failure.getErrorMessage())
724        log.msg("Traceback: ", failure.printTraceback())
725       
726    def connect(self):
727        """
728        This method connects the monitor to a remote PB server.
729       
730        """
731       
732        if self.connected: # Don't connect multiple times
733            return
734   
735        self.factory = PBClientFactory()
736        try:
737            if self.ssl:
738                # For OpenSSL, SAGE uses GNUTLS now
739                # from twisted.internet import ssl
740                # contextFactory = ssl.ClientContextFactory()
741                # reactor.connectSSL(self.server, self.port,
742                #                    self.factory, contextFactory)
743                cred = X509Credentials()
744                reactor.connectTLS(self.server, self.port, self.factory,
745                                   cred)
746            else:
747                reactor.connectTCP(self.server, self.port, self.factory)
748        except Exception, msg:
749            self._retryConnect()
750       
751        log.msg(DELIMITER)
752        log.msg('DSAGE Worker')
753        log.msg('Started with PID: %s' % (os.getpid()))
754        log.msg('Connecting to %s:%s' % (self.server, self.port))
755        if self.ssl:
756            log.msg('Using SSL: True')
757        else:
758            log.msg('Using SSL: False')
759        log.msg(DELIMITER)
760       
761        if not self.anonymous:
762            log.msg('Connecting as authenticated worker...\n')
763            d = self.factory.login(self.creds, 
764                                   (pb.Referenceable(), self.host_info))
765        else:
766            from twisted.cred.credentials import Anonymous
767            log.msg('Connecting as anonymous worker...\n')
768            d = self.factory.login(Anonymous(), 
769                                   (pb.Referenceable(), self.host_info))
770        d.addCallback(self._connected)
771        d.addErrback(self._catchConnectionFailure)
772           
773        return d
774   
775    def pool_workers(self, remoteobj):
776        """
777        pool_workers creates as many workers as specified in worker.conf.
778       
779        """
780
781        log.msg('[Monitor] Starting %s workers...' % (self.workers))
782        self.worker_pool = [Worker(remoteobj, x, self.log_level, self.poll)
783                            for x in range(self.workers)]
784   
785    def check_killed_jobs(self):
786        """
787        check_killed_jobs retrieves a list of killed job ids.
788       
789        """
790       
791        if not self.connected:
792            return
793           
794        killed_jobs = self.remoteobj.callRemote('get_killed_jobs_list')
795        killed_jobs.addCallback(self._got_killed_jobs)
796        killed_jobs.addErrback(self._catch_failure)
797
798    def start_looping_calls(self):
799        """
800        start_looping_calls prepares and starts our periodic checking methods.
801       
802        """
803        #
804        # self.check_output_timeout = 1
805        # self.tsk1 = task.LoopingCall(self.check_output)
806        # self.tsk1.start(self.check_output_timeout, now=False)
807       
808        interval = 5.0
809        self.tsk2 = task.LoopingCall(self.check_killed_jobs)
810        self.tsk2.start(interval, now=False)
811   
812    def stop_looping_calls(self):
813        """
814        stops the looping calls.
815       
816        """
817       
818        # self.tsk1.stop()
819        self.tsk2.stop()
820
821def usage():
822    """
823    Prints usage help.
824
825    """
826   
827    from optparse import OptionParser
828   
829    usage = ['usage: %prog [options]\n',
830              'Bug reports to <yqiang@gmail.com>']
831    parser = OptionParser(usage=''.join(usage))
832    parser.add_option('-s', '--server',
833                      dest='server',
834                      default='localhost',
835                      help='hostname. Default is localhost')
836    parser.add_option('-p', '--port', 
837                      dest='port', 
838                      type='int',
839                      default=8081,
840                      help='port to connect to. default=8081')
841    parser.add_option('--poll',
842                      dest='poll',
843                      type='float',
844                      default=5.0,
845                      help='poll rate before checking for new job. default=5')
846    parser.add_option('-a', '--anonymous',
847                      dest='anonymous',
848                      default=False,
849                      action='store_true',
850                      help='Connect as anonymous worker. default=False')
851    parser.add_option('-f', '--logfile',
852                      dest='logfile',
853                      default=os.path.join(DSAGE_DIR, 'worker.log'),
854                      help='log file')
855    parser.add_option('-l', '--loglevel',
856                      dest='loglevel',
857                      type='int',
858                      default=0,
859                      help='log level. default=0')
860    parser.add_option('--ssl',
861                      dest='ssl',
862                      action='store_true',
863                      default=False,
864                      help='enable or disable ssl')
865    parser.add_option('--privkey',
866                      dest='privkey_file',
867                      default=os.path.join(DSAGE_DIR, 'dsage_key'),
868                      help='private key file. default = ' + 
869                           '~/.sage/dsage/dsage_key')
870    parser.add_option('--pubkey',
871                      dest='pubkey_file',
872                      default=os.path.join(DSAGE_DIR, 'dsage_key.pub'),
873                      help='public key file. default = ' +
874                           '~/.sage/dsage/dsage_key.pub')
875    parser.add_option('-w', '--workers',
876                      dest='workers',
877                      type='int',
878                      default=2,
879                      help='number of workers. default=2')
880    parser.add_option('--priority',
881                      dest='priority',
882                      type='int',
883                      default=20,
884                      help='priority of workers. default=20')
885    parser.add_option('-u', '--username',
886                      dest='username',
887                      default=getuser(),
888                      help='username')
889    parser.add_option('--noblock',
890                      dest='noblock',
891                      action='store_true',
892                      default=False,
893                      help='tells that the server was ' + 
894                           'started in blocking mode')
895    (options, args) = parser.parse_args()
896
897    return options
898       
899def main():
900    options = usage()
901    SSL = options.ssl
902    monitor = Monitor(server=options.server, 
903                      port=options.port, 
904                      username=options.username,
905                      ssl=SSL, 
906                      workers=options.workers,
907                      anonymous=options.anonymous, 
908                      priority=options.priority,
909                      poll=options.poll, 
910                      log_file=options.logfile,
911                      log_level=options.loglevel,
912                      pubkey_file=options.pubkey_file,
913                      privkey_file=options.privkey_file)
914    monitor.connect()
915    monitor.start_looping_calls()
916    try:
917        if options.noblock:
918            reactor.run(installSignalHandlers=0)
919        else:
920            reactor.run(installSignalHandlers=1)
921    except:
922        log.msg('Error starting the twisted reactor, exiting...')
923        sys.exit()
924
925if __name__ == '__main__':
926    usage()
927    main()
Note: See TracBrowser for help on using the repository browser.