source: sage/dsage/scripts/dsage_worker.py @ 6726:6b9d0040e60a

Revision 6726:6b9d0040e60a, 31.7 KB checked in by Yi Qiang <yqiang@…>, 6 years ago (diff)

Misc. fixes.

  • Property exe set to *
Line 
1#!/usr/bin/env python
2############################################################################
3#                                                                     
4#   DSAGE: Distributed SAGE                     
5#                                                                             
6#       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>               
7#                                                                           
8#  Distributed under the terms of the GNU General Public License (GPL)       
9#
10#    This code is distributed in the hope that it will be useful,
11#    but WITHOUT ANY WARRANTY; without even the implied warranty of
12#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13#    General Public License for more details.
14#
15#  The full text of the GPL is available at:
16#
17#                  http://www.gnu.org/licenses/
18#
19############################################################################
20
21import sys
22import os
23import cPickle
24import zlib
25import pexpect
26import datetime
27import uuid
28from math import ceil
29from getpass import getuser
30
31from twisted.spread import pb
32from twisted.internet import reactor, defer, error, task
33from twisted.python import log
34from twisted.spread import banana
35banana.SIZE_LIMIT = 100*1024*1024 # 100 MegaBytes
36
37from gnutls.constants import *
38from gnutls.crypto import *
39from gnutls.errors import *
40from gnutls.interfaces.twisted import X509Credentials
41
42from sage.interfaces.sage0 import Sage
43from sage.misc.preparser import preparse_file
44
45from sage.dsage.database.job import Job, expand_job
46from sage.dsage.misc.hostinfo import ClassicHostInfo
47from sage.dsage.errors.exceptions import NoJobException
48from sage.dsage.twisted.pb import PBClientFactory
49from sage.dsage.misc.constants import DELIMITER
50from sage.dsage.misc.constants import DSAGE_DIR
51from sage.dsage.misc.misc import random_str
52
53START_MARKER = '___BEGIN___'
54END_MARKER = '___END___'
55LOG_PREFIX = "[Worker %s] "
56
57def unpickle(pickled_job):
58    return cPickle.loads(zlib.decompress(pickled_job))
59
60class Worker(object):
61    """
62    This class represents a worker object that does the actual calculation.
63   
64    Parameters:
65    remoteobj -- reference to the remote PB server
66   
67    """
68   
69    def __init__(self, remoteobj, id, log_level=0, poll=1.0):
70        self.remoteobj = remoteobj
71        self.id = id
72        self.free = True
73        self.job = None
74        self.log_level = log_level
75        self.poll = poll
76        self.checker_task = task.LoopingCall(self.check_work)
77        self.checker_timeout = 0.5
78        self.got_output = False
79        self.job_start_time = None
80        self.orig_poll = poll
81        self.start()
82           
83        # import some basic modules into our Sage() instance
84        self.sage.eval('import time')
85        self.sage.eval('import os')
86       
87    def _catch_failure(self, failure):
88        log.msg("Error: ", failure.getErrorMessage())
89        log.msg("Traceback: ", failure.printTraceback())
90       
91    def get_job(self):
92        try:
93            if self.log_level > 3:
94                log.msg(LOG_PREFIX % self.id +  'Getting job...') 
95            d = self.remoteobj.callRemote('get_job')
96        except Exception, msg:
97            log.msg(msg)
98            log.msg(LOG_PREFIX % self.id +  'Disconnected...')
99            reactor.callLater(5.0, self.get_job)
100            return
101        d.addCallback(self.gotJob)
102        d.addErrback(self.noJob)
103       
104        return d
105   
106    def gotJob(self, jdict):
107        """
108        gotJob is a callback for the remoteobj's get_job method.
109       
110        Parameters:
111        job -- Job object returned by remote's 'get_job' method
112       
113        """
114       
115        if self.log_level > 1:
116            if jdict is None:
117                log.msg(LOG_PREFIX % self.id + 'No new job.')
118        if self.log_level > 3:
119            if jdict is not None:
120                log.msg(LOG_PREFIX % self.id + 'Got Job: %s' % jdict)
121        self.job = expand_job(jdict)
122        if not isinstance(self.job, Job):
123            raise NoJobException
124        try:
125            self.poll = self.orig_poll
126            self.doJob(self.job)
127        except Exception, msg:
128            log.msg(msg)
129            self.report_failure(msg)
130            self.restart()
131   
132    def job_done(self, output, result, completed, cpu_time):
133        """
134        job_done is a callback for doJob.  Called when a job completes.
135       
136        Parameters:
137        output -- the output of the command
138        result -- the result of processing the job, a pickled object
139        completed -- whether or not the job is completely finished (bool)
140       
141        """
142       
143        try:
144            d = self.remoteobj.callRemote('job_done',
145                                          self.job.job_id,
146                                          output,
147                                          result,
148                                          completed,
149                                          cpu_time)
150        except Exception, msg:
151            log.msg(msg)
152            log.msg('[Worker: %s, job_done] Disconnected, reconnecting in %s'\
153                    % (self.id, 5.0))
154            reactor.callLater(5.0, self.job_done, 
155                              output, result, completed)
156            d = defer.Deferred()
157            d.errback(error.ConnectionLost())
158           
159            return d
160       
161        if completed:
162            self.restart()
163       
164        return d
165   
166    def noJob(self, failure):
167        """
168        noJob is an errback that catches the NoJobException.
169       
170        Parameters:
171        failure -- a twisted.python.failure object (twisted.python.failure)
172       
173        """
174       
175        if failure.check(NoJobException):
176            if self.poll >= 15:
177                self.poll = 15
178            if self.log_level > 1:
179                msg = 'Sleeping for %s seconds' % self.poll
180                log.msg(LOG_PREFIX % self.id + msg)
181            self.poll = ceil(self.poll * 1.5)
182            reactor.callLater(self.poll, self.get_job)
183        else:
184            log.msg("Error: ", failure.getErrorMessage())
185            log.msg("Traceback: ", failure.printTraceback())
186   
187    def setup_tmp_dir(self, job):
188        """
189        Creates the temporary directory for the worker.
190       
191        """
192       
193        cur_dir = os.getcwd() # keep a reference to the current directory
194        tmp_dir = os.path.join(DSAGE_DIR, 'tmp_worker_files')
195        tmp_job_dir = os.path.join(tmp_dir, job.job_id)
196        if not os.path.isdir(tmp_dir):
197            os.mkdir(tmp_dir)
198        if not os.path.isdir(tmp_job_dir):
199            os.mkdir(tmp_job_dir)
200        os.chdir(tmp_job_dir)
201        self.sage.eval("os.chdir('%s')" % tmp_job_dir)
202       
203        return tmp_job_dir
204       
205    def extract_job_data(self, job):
206        """
207        Extracts all the data that is in a job object.
208       
209        """
210       
211        if isinstance(job.data, list):
212            if self.log_level > 2:
213                msg = 'Extracting job data...'
214                log.msg(LOG_PREFIX % self.id + msg)
215            try:
216                for var, data, kind in job.data:
217                    try:
218                        data = zlib.decompress(data)
219                    except Exception, msg:
220                        log.msg(msg)
221                        continue
222                    if kind == 'file':
223                        f = open(var, 'wb')
224                        f.write(data)
225                        f.close()
226                        if self.log_level > 2:
227                            msg = 'Extracted %s' % f
228                            log.msg(LOG_PREFIX % self.id + msg)
229                    if kind == 'object':
230                        fname = var + '.sobj'
231                        if self.log_level > 2:
232                            log.msg('Object to be loaded: %s' % fname)
233                        f = open(fname, 'wb')
234                        f.write(data)
235                        f.close()
236                        self.sage.eval("%s = load('%s')" % (var, fname))
237                        if self.log_level > 2:
238                            msg = 'Loaded %s' % fname
239                            log.msg(LOG_PREFIX % self.id + msg)
240            except Exception, msg:
241                log.msg(LOG_PREFIX % self.id + msg)
242
243    def write_job_file(self, job):
244        """
245        Writes out the job file to be executed to disk.
246       
247        """
248       
249        parsed_file = preparse_file(job.code, magic=True, 
250                                    do_time=False, ignore_prompts=False)
251
252        job_filename = str(job.name) + '.py'
253        job_file = open(job_filename, 'w')
254        timeout = job.timeout
255        BEGIN = "print '%s'\n\n" % (START_MARKER)
256        END = "print '%s'\n\n" % (END_MARKER)
257        GO_TO_TMP_DIR = """os.chdir('%s')\n""" % self.tmp_job_dir
258        SAVE_TIME = """save((time.time()-dsage_start_time), 'cpu_time.sobj', compress=False)\n"""
259        SAVE_RESULT = """try:
260    save(DSAGE_RESULT, 'result.sobj', compress=True)
261except:
262    save('No DSAGE_RESULT', 'result.sobj', compress=True)
263"""
264        job_file.write("alarm(%s)\n\n" % (timeout))
265        job_file.write("import time\n\n")
266        job_file.write(BEGIN)
267        job_file.write('dsage_start_time = time.time()\n')
268        job_file.write(parsed_file)
269        job_file.write("\n\n")
270        job_file.write(END)
271        job_file.write("\n")
272        job_file.write(GO_TO_TMP_DIR)
273        job_file.write(SAVE_RESULT)
274        job_file.write(SAVE_TIME)
275        job_file.close()
276        if self.log_level > 2:
277            log.msg('[Worker: %s] Wrote job file. ' % (self.id))
278           
279        return job_filename
280       
281    def doJob(self, job):
282        """
283        doJob is the method that drives the execution of a job.
284       
285        Parameters:
286        job -- a Job object (dsage.database.Job)
287       
288        """
289       
290        log.msg(LOG_PREFIX % self.id + 'Starting job %s ' % job.job_id)
291           
292        self.free = False
293        self.got_output = False
294        d = defer.Deferred()
295       
296        try:
297            self.checker_task.start(self.checker_timeout, now=False)
298        except AssertionError:
299            self.checker_task.stop()
300            self.checker_task.start(self.checker_timeout, now=False)
301        if self.log_level > 2:
302            log.msg(LOG_PREFIX % self.id + 'Starting checker task...')
303       
304        self.tmp_job_dir = self.setup_tmp_dir(job)
305        self.extract_job_data(job)
306       
307        job_filename = self.write_job_file(job)
308
309        f = os.path.join(self.tmp_job_dir, job_filename)
310        self.sage._send("execfile('%s')" % (f))
311        self.job_start_time = datetime.datetime.now()
312        if self.log_level > 2:
313            msg = 'File to execute: %s' % f
314            log.msg(LOG_PREFIX % self.id + msg)
315       
316        d.callback(True)
317           
318    def reset_checker(self):
319        """
320        Resets the output/result checker for the worker.
321       
322        """
323       
324        if self.checker_task.running:
325            self.checker_task.stop()
326        self.checker_timeout = 1.0
327        self.checker_task = task.LoopingCall(self.check_work)
328       
329    def check_work(self):
330        """
331        check_work periodically polls workers for new output. The period is
332        determined by an exponential back off algorithm.
333       
334        This figures out whether or not there is anything new output that we
335        should submit to the server.
336       
337        """
338       
339        if self.sage == None:
340            return
341        if self.job == None or self.free == True:
342            if self.checker_task.running:
343                self.checker_task.stop()
344            return
345        if self.log_level > 1:
346            msg = 'Checking job %s' % self.job.job_id
347            log.msg(LOG_PREFIX % self.id + msg)
348        try:
349            os.chdir(self.tmp_job_dir)
350            # foo, output, new = self.sage._so_far()
351            # This sucks and is a very bad way to tell when a calculation is
352            # finished           
353            done, new = self.sage._get()
354            # If result.sobj exists, our calculation is done
355            result = open('result.sobj', 'rb').read()                                               
356            done = True
357        except RuntimeError, msg: # Error in calling worker.sage._so_far()
358            done = False
359            if self.log_level > 1:
360                log.msg(LOG_PREFIX % self.id + 'RuntimeError: %s' % msg)
361                log.msg("Don't worry, the RuntimeError above " + 
362                        "is a non-fatal SAGE failure")
363            self.increase_checker_task_timeout()
364            return
365        except IOError, msg: # File does not exist yet
366            done = False
367        if done:
368            cpu_time = cPickle.loads(open('cpu_time.sobj', 'rb').read())
369            self.free = True
370            self.reset_checker()
371        else:
372            result = cPickle.dumps('Job not done yet.', 2)
373            cpu_time = None
374        if self.check_failure(new):
375            self.report_failure(new)
376            self.restart()
377            return
378        sanitized_output = self.clean_output(new)   
379        if self.log_level > 3:
380            print 'Output before sanitizing: \n' , sanitized_output
381        if self.log_level > 3:
382            print 'Output after sanitizing: \n', sanitized_output
383        if sanitized_output == '' and not done:
384            self.increase_checker_task_timeout()
385        else:
386            d = self.job_done(sanitized_output, result, done, cpu_time)
387            d.addErrback(self._catch_failure)
388   
389    def report_failure(self, failure):
390        """
391        Reports failure of a job.
392       
393        """
394       
395        msg = 'Job %s failed!' % (self.job.job_id)
396        import shutil
397        failed_dir = self.tmp_job_dir + '_failed'
398        if os.path.exists(failed_dir):
399            shutil.rmtree(failed_dir)
400        shutil.move(self.tmp_job_dir, failed_dir)
401        log.msg(LOG_PREFIX % self.id + msg)
402        log.msg('Traceback: \n%s' % failure)
403        d = self.remoteobj.callRemote('job_failed', self.job.job_id, failure)
404        d.addErrback(self._catch_failure)
405       
406        return d
407       
408    def increase_checker_task_timeout(self):
409        """
410        Quickly decreases the number of times a worker checks for output
411       
412        """
413       
414        if self.checker_task.running:
415            self.checker_task.stop()
416       
417        self.checker_timeout = self.checker_timeout * 1.5
418        if self.checker_timeout > 300.0:
419            self.checker_timeout = 300.0
420        self.checker_task = task.LoopingCall(self.check_work)
421        self.checker_task.start(self.checker_timeout, now=False)
422        if self.log_level > 0:
423            msg = 'Checking output again in %s' % self.checker_timeout
424            log.msg(LOG_PREFIX % self.id + msg)
425       
426    def clean_output(self, sage_output):
427        """
428        clean_output attempts to clean up the output string from sage.
429
430        """
431       
432        begin = sage_output.find(START_MARKER)
433        if begin != -1:
434            self.got_output = True
435            begin += len(START_MARKER)
436        else:
437            begin = 0
438        end = sage_output.find(END_MARKER)
439        if end != -1:
440            end -= 1
441        else:
442            if not self.got_output:
443                end = 0
444            else:
445                end = len(sage_output)
446        output = sage_output[begin:end]
447        output = output.strip()
448        output = output.replace('\r', '')
449       
450        if ('execfile' in output or 'load' in output) and self.got_output:
451            output = ''           
452           
453        return output
454       
455    def check_failure(self, sage_output):
456        """
457        Checks for signs of exceptions or errors in the output.
458
459        """
460
461        if sage_output == None:
462            return False
463        else:
464            sage_output = ''.join(sage_output)
465
466        if 'Traceback' in sage_output:
467            return True
468        elif 'Error' in sage_output:
469            return True
470        else:
471            return False
472   
473    def kill_sage(self):
474        """
475        Try to hard kill the SAGE instance.
476       
477        """
478       
479        try:
480            pid = self.sage.pid()
481            cmd = 'kill -9 -%s'%pid
482            os.system(cmd)
483            del self.sage
484        except Exception, msg:
485            log.msg(msg)
486           
487    def stop(self, hard_reset=False):
488        """
489        Stops the current worker and resets it's internal state.
490           
491        """
492       
493        if hard_reset:
494            log.msg(LOG_PREFIX % self.id + 'Performing hard reset.')
495            self.kill_sage()
496            self.start()
497        else: # try for a soft reset
498            INTERRUPT_TRIES = 20
499            timeout = 0.3
500            e = self.sage._expect
501            try:
502                for i in range(INTERRUPT_TRIES):   
503                    self.sage._expect.sendline('q')
504                    self.sage._expect.sendline(chr(3))  # send ctrl-c
505                    try: 
506                        e.expect(self.sage._prompt, timeout=timeout)           
507                        success = True
508                        break
509                    except (pexpect.TIMEOUT, pexpect.EOF), msg:
510                        success = False
511                        if self.log_level > 3:
512                            msg = 'Interrupting SAGE (try %s)' % i
513                            log.msg(LOG_PREFIX % self.id + msg)
514            except Exception, msg:
515                success = False
516                log.msg(msg)
517                log.msg(LOG_PREFIX % self.id + "Performing hard reset.")
518       
519            if not success:
520                self.kill_sage()
521                self.start()
522            else:
523                self.sage.reset()
524        self.free = True
525        self.job = None
526   
527    def start(self):
528        """
529        Starts a new worker if it does not exist already.
530       
531        """
532       
533        if not hasattr(self, 'sage'):
534            if self.log_level > 3:
535                logfile = DSAGE_DIR + '/%s-pexpect.log' % self.id
536                self.sage = Sage(maxread=1, logfile=logfile, python=True)
537            else:
538                self.sage = Sage(maxread=1, python=True)
539            try:
540                self.sage._start(block_during_init=True)
541            except RuntimeError, msg: # Could not start SAGE
542                print msg
543                print 'Failed to start a worker, probably Expect issues.'
544                reactor.stop()
545                sys.exit(-1)
546        E = self.sage.expect()
547        E.sendline('\n')
548        E.expect('>>>')
549        cmd = 'from sage.all import *;'
550        cmd += 'from sage.all_notebook import *;'
551        cmd += 'import sage.server.support as _support_; '
552        E.sendline(cmd)
553        self.get_job()
554   
555    def restart(self):
556        """
557        Restarts the current worker.
558       
559        """
560       
561        try:
562            delta = datetime.datetime.now() - self.job_start_time
563            if delta.seconds >= (3*60): # more than 3 minutes, do a hard reset
564                self.stop(hard_reset=True)
565            else:
566                self.stop(hard_reset=False)
567        except TypeError:
568            self.stop(hard_reset=True)
569        self.job_start_time = None
570        self.start()
571        self.reset_checker()
572        log.msg('[Worker: %s] Restarting...' % (self.id))
573       
574class Monitor(object):
575    """
576    This class represents a monitor that controls workers.
577   
578    It monitors the workers and checks on their status
579   
580    Parameters:
581    hostname -- the hostname of the server we want to connect to (str)
582    port -- the port of the server we want to connect to (int)
583
584    """
585   
586    def __init__(self, server='localhost', port=8081, 
587                 username=getuser(),
588                 ssl=True, 
589                 workers=2, 
590                 anonymous=False, 
591                 priority=20, 
592                 poll=1.0,
593                 log_level=0, 
594                 log_file=os.path.join(DSAGE_DIR, 'worker.log'),
595                 pubkey_file=None,
596                 privkey_file=None):
597        self.server = server
598        self.port = port
599        self.username = username
600        self.ssl = ssl
601        self.workers = workers
602        self.anonymous = anonymous
603        self.priority = priority
604        self.poll = poll
605        self.log_level = log_level
606        self.log_file = log_file
607        self.pubkey_file = pubkey_file
608        self.privkey_file = privkey_file
609       
610        self.remoteobj = None
611        self.connected = False
612        self.reconnecting = False
613        self.worker_pool = None
614        self.sleep_time = 1.0
615       
616        self.host_info = ClassicHostInfo().host_info
617        try:
618            uuid_file = os.path.join(DSAGE_DIR, 'worker.uuid')
619            uuid_ = open(uuid_file).readlines()[0]
620        except Exception, e:
621            uuid_ = str(uuid.uuid1())
622            f = open(uuid_file, 'w')
623            f.write(uuid_)
624            f.close()
625        self.host_info['uuid'] = uuid_
626        self.host_info['workers'] = self.workers
627       
628        self._startLogging(self.log_file)
629       
630        try:
631            os.nice(self.priority)
632        except OSError, msg:
633            log.msg('Error setting priority: %s' % (self.priority))
634            pass       
635        if not self.anonymous:
636            from twisted.cred import credentials
637            from twisted.conch.ssh import keys
638            self.DATA =  random_str(500)
639            # public key authentication information
640            self.pubkey_str =keys.getPublicKeyString(self.pubkey_file)
641            # try getting the private key object without a passphrase first
642            try:
643                self.priv_key = keys.getPrivateKeyObject(self.privkey_file)
644            except keys.BadKeyError:
645                pphrase = self._getpassphrase()
646                self.priv_key = keys.getPrivateKeyObject(self.privkey_file,
647                                                         pphrase)
648            self.pub_key = keys.getPublicKeyObject(self.pubkey_str)
649            self.algorithm = 'rsa'
650            self.blob = keys.makePublicKeyBlob(self.pub_key)
651            self.data = self.DATA
652            self.signature = keys.signData(self.priv_key, self.data)
653            self.creds = credentials.SSHPrivateKey(self.username,
654                                                   self.algorithm,
655                                                   self.blob, 
656                                                   self.data,
657                                                   self.signature)
658   
659    def _startLogging(self, log_file):
660        if log_file == 'stdout':
661            log.startLogging(sys.stdout)
662            log.msg('WARNING: Only loggint to stdout!')
663        else:
664            worker_log = open(log_file, 'a')
665            log.startLogging(sys.stdout)
666            log.startLogging(worker_log)
667            log.msg("Logging to file: ", log_file)
668           
669    def _getpassphrase(self):
670        import getpass
671        passphrase = getpass.getpass('Passphrase (Hit enter for None): ')
672       
673        return passphrase
674       
675    def _connected(self, remoteobj):
676        self.remoteobj = remoteobj
677        self.remoteobj.notifyOnDisconnect(self._disconnected)
678        self.connected = True
679        self.reconnecting = False
680       
681        if self.worker_pool == None: # Only pool workers the first time
682            self.pool_workers(self.remoteobj)
683        else:
684            for worker in self.worker_pool:
685                worker.remoteobj = self.remoteobj # Update workers
686   
687    def _disconnected(self, remoteobj):
688        log.msg('Lost connection to the server.')
689        self.connected = False
690        self._retryConnect()
691   
692    def _got_killed_jobs(self, killed_jobs):
693        if killed_jobs == None:
694            return
695        # reconstruct the Job objects from the jdicts
696        killed_jobs = [expand_job(jdict) for jdict in killed_jobs]
697        for worker in self.worker_pool:
698            if worker.job is None:
699                continue
700            if worker.free:
701                continue
702            for job in killed_jobs:
703                if job is None or worker.job is None:
704                    continue
705                if worker.job.job_id == job.job_id:
706                    msg = 'Processing killed job, restarting...'
707                    log.msg(LOG_PREFIX % worker.id + msg)
708                    worker.restart()
709   
710    def _retryConnect(self):
711        log.msg('[Monitor] Disconnected, reconnecting in %s' % (5.0))
712        if not self.connected:
713            reactor.callLater(5.0, self.connect)
714   
715    def _catchConnectionFailure(self, failure):               
716        log.msg("Error: ", failure.getErrorMessage())
717        log.msg("Traceback: ", failure.printTraceback())
718        self._disconnected(None)
719   
720    def _catch_failure(self, failure):
721        log.msg("Error: ", failure.getErrorMessage())
722        log.msg("Traceback: ", failure.printTraceback())
723       
724    def connect(self):
725        """
726        This method connects the monitor to a remote PB server.
727       
728        """
729       
730        if self.connected: # Don't connect multiple times
731            return
732   
733        factory = pb.PBClientFactory()       
734        self.factory = PBClientFactory()
735        try:
736            if self.ssl:
737                # For OpenSSL, SAGE uses GNUTLS now
738                # from twisted.internet import ssl
739                # contextFactory = ssl.ClientContextFactory()
740                # reactor.connectSSL(self.server, self.port,
741                #                    self.factory, contextFactory)
742                cred = X509Credentials()
743                reactor.connectTLS(self.server, self.port, self.factory,
744                                   cred)
745            else:
746                reactor.connectTCP(self.server, self.port, self.factory)
747        except Exception, msg:
748            self._retryConnect()
749       
750        log.msg(DELIMITER)
751        log.msg('DSAGE Worker')
752        log.msg('Started with PID: %s' % (os.getpid()))
753        log.msg('Connecting to %s:%s' % (self.server, self.port))
754        if self.ssl:
755            log.msg('Using SSL: True')
756        else:
757            log.msg('Using SSL: False')
758        log.msg(DELIMITER)
759       
760        if not self.anonymous:
761            log.msg('Connecting as authenticated worker...\n')
762            d = self.factory.login(self.creds, 
763                                   (pb.Referenceable(), self.host_info))
764        else:
765            from twisted.cred.credentials import Anonymous
766            log.msg('Connecting as anonymous worker...\n')
767            d = self.factory.login(Anonymous(), 
768                                   (pb.Referenceable(), self.host_info))
769        d.addCallback(self._connected)
770        d.addErrback(self._catchConnectionFailure)
771           
772        return d
773   
774    def pool_workers(self, remoteobj):
775        """
776        pool_workers creates as many workers as specified in worker.conf.
777       
778        """
779
780        log.msg('[Monitor] Starting %s workers...' % (self.workers))
781        self.worker_pool = [Worker(remoteobj, x, self.log_level, self.poll)
782                            for x in range(self.workers)]
783   
784    def check_killed_jobs(self):
785        """
786        check_killed_jobs retrieves a list of killed job ids.
787       
788        """
789       
790        if not self.connected:
791            return
792           
793        killed_jobs = self.remoteobj.callRemote('get_killed_jobs_list')
794        killed_jobs.addCallback(self._got_killed_jobs)
795        killed_jobs.addErrback(self._catch_failure)
796
797    def start_looping_calls(self):
798        """
799        start_looping_calls prepares and starts our periodic checking methods.
800       
801        """
802        #
803        # self.check_output_timeout = 1
804        # self.tsk1 = task.LoopingCall(self.check_output)
805        # self.tsk1.start(self.check_output_timeout, now=False)
806       
807        interval = 5.0
808        self.tsk2 = task.LoopingCall(self.check_killed_jobs)
809        self.tsk2.start(interval, now=False)
810   
811    def stop_looping_calls(self):
812        """
813        stops the looping calls.
814       
815        """
816       
817        # self.tsk1.stop()
818        self.tsk2.stop()
819
820def usage():
821    """
822    Prints usage help.
823
824    """
825   
826    from optparse import OptionParser
827   
828    usage = ['usage: %prog [options]\n',
829              'Bug reports to <yqiang@gmail.com>']
830    parser = OptionParser(usage=''.join(usage))
831    parser.add_option('-s', '--server',
832                      dest='server',
833                      default='localhost',
834                      help='hostname. Default is localhost')
835    parser.add_option('-p', '--port', 
836                      dest='port', 
837                      type='int',
838                      default=8081,
839                      help='port to connect to. default=8081')
840    parser.add_option('--poll',
841                      dest='poll',
842                      type='float',
843                      default=5.0,
844                      help='poll rate before checking for new job. default=5')
845    parser.add_option('-a', '--anonymous',
846                      dest='anonymous',
847                      default=False,
848                      action='store_true',
849                      help='Connect as anonymous worker. default=False')
850    parser.add_option('-f', '--logfile',
851                      dest='logfile',
852                      default=os.path.join(DSAGE_DIR, 'worker.log'),
853                      help='log file')
854    parser.add_option('-l', '--loglevel',
855                      dest='loglevel',
856                      type='int',
857                      default=0,
858                      help='log level. default=0')
859    parser.add_option('--ssl',
860                      dest='ssl',
861                      action='store_true',
862                      default=False,
863                      help='enable or disable ssl')
864    parser.add_option('--privkey',
865                      dest='privkey_file',
866                      default=os.path.join(DSAGE_DIR, 'dsage_key'),
867                      help='private key file. default = ' + 
868                           '~/.sage/dsage/dsage_key')
869    parser.add_option('--pubkey',
870                      dest='pubkey_file',
871                      default=os.path.join(DSAGE_DIR, 'dsage_key.pub'),
872                      help='public key file. default = ' +
873                           '~/.sage/dsage/dsage_key.pub')
874    parser.add_option('-w', '--workers',
875                      dest='workers',
876                      type='int',
877                      default=2,
878                      help='number of workers. default=2')
879    parser.add_option('--priority',
880                      dest='priority',
881                      type='int',
882                      default=20,
883                      help='priority of workers. default=20')
884    parser.add_option('-u', '--username',
885                      dest='username',
886                      default=getuser(),
887                      help='username')
888    parser.add_option('--noblock',
889                      dest='noblock',
890                      action='store_true',
891                      default=False,
892                      help='tells that the server was ' + 
893                           'started in blocking mode')
894    (options, args) = parser.parse_args()
895
896    return options
897       
898def main():
899    options = usage()
900    SSL = options.ssl
901    monitor = Monitor(server=options.server, 
902                      port=options.port, 
903                      username=options.username,
904                      ssl=SSL, 
905                      workers=options.workers,
906                      anonymous=options.anonymous, 
907                      priority=options.priority,
908                      poll=options.poll, 
909                      log_file=options.logfile,
910                      log_level=options.loglevel,
911                      pubkey_file=options.pubkey_file,
912                      privkey_file=options.privkey_file)
913    monitor.connect()
914    monitor.start_looping_calls()
915    try:
916        if options.noblock:
917            reactor.run(installSignalHandlers=0)
918        else:
919            reactor.run(installSignalHandlers=1)
920    except:
921        log.msg('Error starting the twisted reactor, exiting...')
922        sys.exit()
923
924if __name__ == '__main__':
925    usage()
926    main()
Note: See TracBrowser for help on using the repository browser.