Ticket #3600: ppool_unit_tests.patch

File ppool_unit_tests.patch, 13.7 KB (added by yi, 20 months ago)

New version which fixes the reactor left in an unclean state errors.

  • sage/dsage/database/job.py

    # HG changeset patch
    # User Yi Qiang <yqiang@gmail.com>
    # Date 1215729995 25200
    # Node ID 514b71ad00b416aea50d23f249c64da9e0a1e450
    # Parent  2f22db0464042948c3e0dd1beacb0e71fe0cf297
    [mq]: ppool_unit_tests.patch
    
    diff --git a/sage/dsage/database/job.py b/sage/dsage/database/job.py
    a b  
    9292    def __repr__(self): 
    9393        return self.__str__() 
    9494 
    95     def attach(self, var, obj, file_name=None): 
     95    def attach(self, var, obj=None, file_name=None): 
    9696        """ 
    9797        Attaches an object to a job. 
    9898 
     
    107107            try: 
    108108                s = open(file_name, 'rb').read() 
    109109                s = zlib.compress(s) 
     110                kind = 'file' 
    110111            except: 
    111112                print 'Unable to load %s. ' % file_name 
     113                kind = None 
    112114                return 
    113115        else: 
    114116            try: 
    115117                s = cPickle.dumps(obj, 2) 
    116118                s = zlib.compress(s) 
     119                kind = 'object' 
    117120            except cPickle.PicklingError: 
    118121                print 'Unable to attach your object.' 
    119         self.data.append((var, s, 'object')) 
     122                kind = None 
     123        self.data.append((var, s, kind)) 
    120124 
    121125    def attach_file(self, file_name): 
    122126        """ 
  • sage/dsage/misc/hostinfo.py

    diff --git a/sage/dsage/misc/hostinfo.py b/sage/dsage/misc/hostinfo.py
    a b  
    147147 
    148148if __name__ == '__main__': 
    149149    h = HostInfo() 
    150     print h 
    151  No newline at end of file 
     150    print h 
  • sage/dsage/worker/process.py

    diff --git a/sage/dsage/worker/process.py b/sage/dsage/worker/process.py
    a b  
     1import pdb 
    12from twisted.spread import pb 
    23from twisted.internet import defer 
    34from twisted.internet import protocol 
     
    2324 
    2425    sage = SAGE_BIN 
    2526    args = ['sage', '-q'] 
     27    RESPAWN = True 
    2628    LOG_PREFIX = "ProcessPool: " 
    2729 
    2830    def __init__(self, monitor, num_workers=None, proto=None): 
     
    4850 
    4951    def start(self): 
    5052        """ 
    51         Call me to start the processes. 
     53        Call me to start the process pool. 
    5254        """ 
    5355 
    5456        self.started = True 
    5557        self._start_processes() 
    5658 
     59    def stop(self): 
     60        """ 
     61        Call me to stop the process pool. 
     62        """ 
     63 
     64        for p in self.processes: 
     65            p.transport.loseConnection() 
     66 
     67        self.started = False 
     68         
    5769    def _err_handler(self, failure): 
    5870        """ 
    5971        Generic error handler for Deferreds. 
     
    7587        log.msg(self.LOG_PREFIX + "_process_ready") 
    7688        self.processes.add(proto) 
    7789        self.ready.add(proto) 
    78         jdict = yield self.monitor.get_job() 
     90        try: 
     91            jdict = yield self.monitor.get_job() 
     92        except AttributeError: 
     93            jdict = None 
     94            log.msg(self.LOG_PREFIX + "Not connected to server yet.") 
    7995        if jdict: 
    8096            self.do_job(jdict) 
    8197        else: 
    8298            log.msg(self.LOG_PREFIX + 'jdict is None') 
    8399 
     100        defer.returnValue(proto) 
     101 
     102    def _job_failed(self, result): 
     103        """ 
     104        I am called when a job has failed. 
     105        """ 
     106         
     107        if self.monitor: 
     108            return self.monitor._job_failed(result) 
     109        else: 
     110            log.msg(self.LOG_PREFIX + "I don't have a valid monitor") 
     111         
    84112    def _job_finished(self, result): 
    85113        """ 
    86114        I am called when a job is finished as handled by outReceived. 
    87115 
    88116        INPUT: 
    89117            result  -- a tuple (proto, job) 
     118 
     119        OUTPUT: 
     120            job     -- completed job 
    90121        """ 
    91122 
    92123        proto, job = result 
    93124        log.msg(self.LOG_PREFIX + "Job %s finished" % job.job_id) 
    94125        self.processes.discard(proto) 
    95126        self.busy.discard(proto) 
    96         d = self.monitor.job_done(job.job_id, proto.data, job.result, 
    97                               job.cpu_time) 
     127        try: 
     128            d = self.monitor.job_done(job.job_id, proto.data,  
     129                                      job.result, job.cpu_time) 
     130        except: 
     131            log.msg("Error submitting job result") # TODO: wrap this in a 
     132                                                   # decorator 
    98133        self._recycle_process(proto) 
    99134 
    100     def _process_finished(self, proto): 
     135        return job 
     136 
     137    def _process_ended(self, proto): 
    101138        """ 
    102139        I am called when a process has exited. This will be called after 
    103140        processEnded gets called. 
     
    106143            proto -- a ProcessProtocol 
    107144        """ 
    108145 
     146        log.msg(self.LOG_PREFIX + "Process ended, restarting...") 
    109147        self.processes.discard(proto) 
    110148        self.busy.discard(proto) 
     149        self.ready.discard(proto) 
    111150        p = SageProcessProtocol() 
    112         ready, finished, failed = self._start_process(p) 
     151 
     152        if self.RESPAWN: 
     153            ready, finished, failed = self._start_process(p) 
     154 
     155        return proto 
    113156 
    114157    def _recycle_process(self, proto): 
    115158        """ 
     
    123166        proto._soft_reset() 
    124167        proto.ready.addCallback(self._process_ready) 
    125168        proto.finished.addCallback(self._job_finished) 
    126         proto.failed.addCallback(self.monitor._job_failed) 
     169        proto.failed.addCallback(self._job_failed) 
    127170        self.busy.discard(proto) 
    128171 
    129172        return proto.ready, proto.finished 
     
    136179            proto - a ProcessProtocol 
    137180        """ 
    138181 
     182        log.msg(self.LOG_PREFIX + "Starting protocol %s" % proto) 
    139183        reactor.spawnProcess(proto, self.sage, self.args, 
    140184                             env=os.environ, usePTY=True) 
    141185        self.processes.add(proto) 
    142186        proto.ready.addCallback(self._process_ready) 
    143187        proto.finished.addCallback(self._job_finished) 
    144         proto.failed.addCallback(self.monitor._job_failed) 
    145         proto.process_ended.addCallback(self._process_finished) 
     188        proto.failed.addCallback(self._job_failed) 
     189        proto.process_ended.addCallback(self._process_ended) 
    146190 
    147191        return proto.ready, proto.finished, proto.failed 
    148192 
     
    168212        log.msg('Popped child: %s' % child.name) 
    169213        self.busy.add(child) 
    170214        d = child.run(jdict) 
     215 
     216        return child, child.finished 
    171217 
    172218 
    173219class SageProcessProtocol(protocol.ProcessProtocol): 
     
    241287                        if self.log_level > 2: 
    242288                            msg = 'Extracted %s' % f 
    243289                            log.msg(self.LOG_PREFIX % self.name + msg) 
    244                         self.transport.writeSequence("execfile('%s')\r" % var) 
    245                         self.transport.writeSequence("exit()\r") 
     290                        if var.endswith(('.py', '.sage', '.pyx')): 
     291                            self.transport.writeSequence("execfile('%s')\r" % var) 
     292                            self.transport.writeSequence("exit()\r") 
    246293                    if kind == 'object': 
    247294                        fname = os.path.join(self.tmp_job_dir, var + '.sobj') 
    248295                        if self.log_level > 2: 
     
    344391            jdict -- a dict which gets expanded to a Job object 
    345392        """ 
    346393 
    347         #if jdict is None: 
    348         #    log.msg(self.LOG_PREFIX % self.name + "jdict is None!") 
    349         #    return False 
    350  
    351394        self.job = expand_job(jdict) 
    352395 
    353396        log.msg(self.LOG_PREFIX % self.name + 
     
    360403        f = os.path.join(self.tmp_job_dir, job_filename) 
    361404        log.msg(self.LOG_PREFIX % self.name + "Executing %s" % f) 
    362405        self.transport.writeSequence("execfile('%s')\r" % f) 
    363  
    364         return True 
    365406 
    366407    def connectionMade(self): 
    367408        log.msg(self.LOG_PREFIX % self.name + "Connection made...") 
     
    411452        pgid = os.getpgid(self.transport.pid) 
    412453        os.killpg(pgid, 9) 
    413454 
     455        return self, pgid 
     456 
  • sage/dsage/worker/tests/test_process.py

    diff --git a/sage/dsage/worker/tests/test_process.py b/sage/dsage/worker/tests/test_process.py
    a b  
    1 import unittest 
    21import tempfile 
     2import cPickle 
     3import zlib 
     4import os 
     5 
     6from twisted.internet import defer 
     7from twisted.internet import reactor 
     8from twisted.python import log 
     9from twisted.trial import unittest 
    310 
    411from sage.dsage.worker.process import SageProcessProtocol 
    512from sage.dsage.worker.process import SageProcessPool 
     13from sage.dsage.worker.monitor import DSageMonitor 
     14from sage.dsage.database.job import Job 
     15from sage.dsage.misc.misc import random_str 
    616 
    717class SageProcessPoolTest(unittest.TestCase): 
    818    """ 
    919    I test the implementation of the SageProcessPool 
    1020    """ 
    1121     
     22    NUM_WORKERS = 2 
     23 
    1224    def setUp(self): 
    13         self.pool = SageProcessPool(None) 
     25        log.msg('TEST: Creating pool with %s workers' % self.NUM_WORKERS) 
     26        self.pool = SageProcessPool(None, num_workers=self.NUM_WORKERS) 
     27        self.pool.RESPAWN = False 
     28 
     29    def tearDown(self): 
     30        self.pool.stop() 
     31 
     32    def test_start(self): 
     33        self.pool.start() 
     34        self.assertEquals(self.pool.started, True) 
     35        self.assert_(isinstance(self.pool.ready, set)) 
     36        self.assert_(isinstance(self.pool.processes, set)) 
     37        self.assert_(isinstance(self.pool.busy, set)) 
     38         
     39    def test_start_processs(self): 
     40        self.pool.start() 
     41        self.assertEquals(self.pool.num_workers, self.NUM_WORKERS) 
     42        self.assertEquals(len(self.pool.processes), self.NUM_WORKERS) 
     43 
     44    def test_stop(self): 
     45        self.pool.start() 
     46        self.pool.stop() 
     47 
     48        self.assertEquals(self.pool.started, False) 
     49 
     50    @defer.inlineCallbacks 
     51    def test_process_ready(self): 
     52        proto = SageProcessProtocol() 
     53        ready, finished, failed = self.pool._start_process(proto) 
     54        r = yield ready 
     55         
     56        self.assert_(r in self.pool.ready) 
     57 
     58    @defer.inlineCallbacks 
     59    def test_process_ended(self): 
     60        proto = SageProcessProtocol() 
     61        ready, finished, failed = self.pool._start_process(proto) 
     62        r = yield ready 
     63        proto, pgid = proto.kill_job() 
     64        f = yield proto.process_ended 
     65        self.assert_(proto not in self.pool.ready) 
     66        self.assert_(proto not in self.pool.processes) 
     67 
     68    @defer.inlineCallbacks 
     69    def test_do_job(self): 
     70        proto = SageProcessProtocol() 
     71        ready, finished, failed = self.pool._start_process(proto) 
     72        yield ready 
     73        job = Job() 
     74        job.job_id = "testing123" 
     75        job.code = "DSAGE_RESULT = int(2+2)" 
     76        jdict = job._reduce() 
     77        p, j = self.pool.do_job(jdict) 
     78        j = yield j # actually wait for the job to complete 
     79        self.assertEquals(j.output, '') 
     80 
     81        import cPickle, zlib 
     82        self.assertEquals(cPickle.loads(zlib.decompress(j.result)), 4) 
     83        self.assertEquals(p, proto) 
     84        self.assertEquals(job.job_id, j.job_id) 
     85        proto.transport.loseConnection() 
     86 
     87    def test_recycle_process(self): 
     88        """ 
     89        I test recycling of Sage processes. 
     90        """ 
     91         
     92        self.pool.start() 
     93        job = Job() 
     94        job.job_id = "testing123" 
     95        job.code = "DSAGE_RESULT = int(2+2)" 
     96        jdict = job._reduce() 
     97 
     98        proto, j = yield self.pool.do_job(jdict) 
     99        self.assertEquals(proto.reset, True) 
     100        self.assertEquals(proto.data, "") 
     101        self.assertEquals(proto.job, None) 
     102        self.assertEquals(proto.ready.called, 0) 
     103        self.assertEquals(proto.finished.called, 0) 
     104        self.assertEquals(proto.failed.called, 0) 
    14105 
    15106 
    16107class SageProcessProtocolTest(unittest.TestCase): 
     
    19110    """ 
    20111 
    21112    def setUp(self): 
     113        sage = SageProcessPool.sage 
     114        args = SageProcessPool.args 
    22115        self.proto = SageProcessProtocol() 
     116        reactor.spawnProcess(self.proto, sage, args, env=os.environ, usePTY=True) 
     117 
     118    def tearDown(self): 
     119        self.proto.transport.loseConnection() 
     120 
     121    def test_name(self): 
     122        self.proto.name = 'Worker1' 
     123        self.assertEquals(self.proto.name, 'Worker1') 
     124 
     125    def test_ready(self): 
     126        from twisted.internet import defer 
     127        self.assert_(isinstance(self.proto.ready, defer.Deferred)) 
     128 
     129    def test_finished(self): 
     130        from twisted.internet import defer 
     131        self.assert_(isinstance(self.proto.finished, defer.Deferred)) 
     132 
     133    def test_failed(self): 
     134        from twisted.internet import defer 
     135        self.assert_(isinstance(self.proto.failed, defer.Deferred)) 
     136 
     137    def test_process_ended(self): 
     138        from twisted.internet import defer 
     139        self.assert_(isinstance(self.proto.process_ended, defer.Deferred)) 
     140 
     141    def test_started(self): 
     142        self.assertEquals(self.proto.started, False) 
     143 
     144    def test_reset(self): 
     145        self.assertEquals(self.proto.reset, False) 
     146 
     147    def test_job(self): 
     148        self.assertEquals(self.proto.job, None) 
     149     
     150    def test_check_failure(self): 
     151        output = "Traceback: foo" 
     152        self.assertEquals(self.proto.check_failure(output), True) 
     153         
     154        output = "Error: foo" 
     155        self.assertEquals(self.proto.check_failure(output), True) 
     156 
     157        output = None 
     158        self.assertEquals(self.proto.check_failure(output), False) 
     159     
     160    def test_kill_job(self): 
     161        pass 
     162 
     163    def test_extract_and_load_job_data(self): 
     164        job = Job() 
     165        job.attach('foo', 1) 
     166        job.attach('bar', 2) 
     167        job.attach('README.txt', file_name=os.path.join(os.environ['SAGE_ROOT'], 
     168                   'README.txt')) 
     169        job.job_id = random_str(10) 
     170 
     171        self.proto.tmp_job_dir = os.environ['SAGE_TESTDIR'] 
     172        self.proto.extract_and_load_job_data(job) 
     173        self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir, 
     174                          'foo.sobj')), True) 
     175        self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir, 
     176                          'bar.sobj')), True) 
     177        self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir, 
     178                          'README.txt')), True) 
     179