Ticket #3600: ppool_unit_tests.patch

File ppool_unit_tests.patch, 13.7 KB (added by yi, 13 years ago)

New version which fixes the reactor left in an unclean state errors.

  • sage/dsage/database/job.py

    # HG changeset patch
    # User Yi Qiang <yqiang@gmail.com>
    # Date 1215729995 25200
    # Node ID 514b71ad00b416aea50d23f249c64da9e0a1e450
    # Parent  2f22db0464042948c3e0dd1beacb0e71fe0cf297
    [mq]: ppool_unit_tests.patch
    
    diff --git a/sage/dsage/database/job.py b/sage/dsage/database/job.py
    a b  
    9292    def __repr__(self):
    9393        return self.__str__()
    9494
    95     def attach(self, var, obj, file_name=None):
     95    def attach(self, var, obj=None, file_name=None):
    9696        """
    9797        Attaches an object to a job.
    9898
     
    107107            try:
    108108                s = open(file_name, 'rb').read()
    109109                s = zlib.compress(s)
     110                kind = 'file'
    110111            except:
    111112                print 'Unable to load %s. ' % file_name
     113                kind = None
    112114                return
    113115        else:
    114116            try:
    115117                s = cPickle.dumps(obj, 2)
    116118                s = zlib.compress(s)
     119                kind = 'object'
    117120            except cPickle.PicklingError:
    118121                print 'Unable to attach your object.'
    119         self.data.append((var, s, 'object'))
     122                kind = None
     123        self.data.append((var, s, kind))
    120124
    121125    def attach_file(self, file_name):
    122126        """
  • sage/dsage/misc/hostinfo.py

    diff --git a/sage/dsage/misc/hostinfo.py b/sage/dsage/misc/hostinfo.py
    a b  
    147147
    148148if __name__ == '__main__':
    149149    h = HostInfo()
    150     print h
    151  No newline at end of file
     150    print h
  • sage/dsage/worker/process.py

    diff --git a/sage/dsage/worker/process.py b/sage/dsage/worker/process.py
    a b  
     1import pdb
    12from twisted.spread import pb
    23from twisted.internet import defer
    34from twisted.internet import protocol
     
    2324
    2425    sage = SAGE_BIN
    2526    args = ['sage', '-q']
     27    RESPAWN = True
    2628    LOG_PREFIX = "ProcessPool: "
    2729
    2830    def __init__(self, monitor, num_workers=None, proto=None):
     
    4850
    4951    def start(self):
    5052        """
    51         Call me to start the processes.
     53        Call me to start the process pool.
    5254        """
    5355
    5456        self.started = True
    5557        self._start_processes()
    5658
     59    def stop(self):
     60        """
     61        Call me to stop the process pool.
     62        """
     63
     64        for p in self.processes:
     65            p.transport.loseConnection()
     66
     67        self.started = False
     68       
    5769    def _err_handler(self, failure):
    5870        """
    5971        Generic error handler for Deferreds.
     
    7587        log.msg(self.LOG_PREFIX + "_process_ready")
    7688        self.processes.add(proto)
    7789        self.ready.add(proto)
    78         jdict = yield self.monitor.get_job()
     90        try:
     91            jdict = yield self.monitor.get_job()
     92        except AttributeError:
     93            jdict = None
     94            log.msg(self.LOG_PREFIX + "Not connected to server yet.")
    7995        if jdict:
    8096            self.do_job(jdict)
    8197        else:
    8298            log.msg(self.LOG_PREFIX + 'jdict is None')
    8399
     100        defer.returnValue(proto)
     101
     102    def _job_failed(self, result):
     103        """
     104        I am called when a job has failed.
     105        """
     106       
     107        if self.monitor:
     108            return self.monitor._job_failed(result)
     109        else:
     110            log.msg(self.LOG_PREFIX + "I don't have a valid monitor")
     111       
    84112    def _job_finished(self, result):
    85113        """
    86114        I am called when a job is finished as handled by outReceived.
    87115
    88116        INPUT:
    89117            result  -- a tuple (proto, job)
     118
     119        OUTPUT:
     120            job     -- completed job
    90121        """
    91122
    92123        proto, job = result
    93124        log.msg(self.LOG_PREFIX + "Job %s finished" % job.job_id)
    94125        self.processes.discard(proto)
    95126        self.busy.discard(proto)
    96         d = self.monitor.job_done(job.job_id, proto.data, job.result,
    97                               job.cpu_time)
     127        try:
     128            d = self.monitor.job_done(job.job_id, proto.data,
     129                                      job.result, job.cpu_time)
     130        except:
     131            log.msg("Error submitting job result") # TODO: wrap this in a
     132                                                   # decorator
    98133        self._recycle_process(proto)
    99134
    100     def _process_finished(self, proto):
     135        return job
     136
     137    def _process_ended(self, proto):
    101138        """
    102139        I am called when a process has exited. This will be called after
    103140        processEnded gets called.
     
    106143            proto -- a ProcessProtocol
    107144        """
    108145
     146        log.msg(self.LOG_PREFIX + "Process ended, restarting...")
    109147        self.processes.discard(proto)
    110148        self.busy.discard(proto)
     149        self.ready.discard(proto)
    111150        p = SageProcessProtocol()
    112         ready, finished, failed = self._start_process(p)
     151
     152        if self.RESPAWN:
     153            ready, finished, failed = self._start_process(p)
     154
     155        return proto
    113156
    114157    def _recycle_process(self, proto):
    115158        """
     
    123166        proto._soft_reset()
    124167        proto.ready.addCallback(self._process_ready)
    125168        proto.finished.addCallback(self._job_finished)
    126         proto.failed.addCallback(self.monitor._job_failed)
     169        proto.failed.addCallback(self._job_failed)
    127170        self.busy.discard(proto)
    128171
    129172        return proto.ready, proto.finished
     
    136179            proto - a ProcessProtocol
    137180        """
    138181
     182        log.msg(self.LOG_PREFIX + "Starting protocol %s" % proto)
    139183        reactor.spawnProcess(proto, self.sage, self.args,
    140184                             env=os.environ, usePTY=True)
    141185        self.processes.add(proto)
    142186        proto.ready.addCallback(self._process_ready)
    143187        proto.finished.addCallback(self._job_finished)
    144         proto.failed.addCallback(self.monitor._job_failed)
    145         proto.process_ended.addCallback(self._process_finished)
     188        proto.failed.addCallback(self._job_failed)
     189        proto.process_ended.addCallback(self._process_ended)
    146190
    147191        return proto.ready, proto.finished, proto.failed
    148192
     
    168212        log.msg('Popped child: %s' % child.name)
    169213        self.busy.add(child)
    170214        d = child.run(jdict)
     215
     216        return child, child.finished
    171217
    172218
    173219class SageProcessProtocol(protocol.ProcessProtocol):
     
    241287                        if self.log_level > 2:
    242288                            msg = 'Extracted %s' % f
    243289                            log.msg(self.LOG_PREFIX % self.name + msg)
    244                         self.transport.writeSequence("execfile('%s')\r" % var)
    245                         self.transport.writeSequence("exit()\r")
     290                        if var.endswith(('.py', '.sage', '.pyx')):
     291                            self.transport.writeSequence("execfile('%s')\r" % var)
     292                            self.transport.writeSequence("exit()\r")
    246293                    if kind == 'object':
    247294                        fname = os.path.join(self.tmp_job_dir, var + '.sobj')
    248295                        if self.log_level > 2:
     
    344391            jdict -- a dict which gets expanded to a Job object
    345392        """
    346393
    347         #if jdict is None:
    348         #    log.msg(self.LOG_PREFIX % self.name + "jdict is None!")
    349         #    return False
    350 
    351394        self.job = expand_job(jdict)
    352395
    353396        log.msg(self.LOG_PREFIX % self.name +
     
    360403        f = os.path.join(self.tmp_job_dir, job_filename)
    361404        log.msg(self.LOG_PREFIX % self.name + "Executing %s" % f)
    362405        self.transport.writeSequence("execfile('%s')\r" % f)
    363 
    364         return True
    365406
    366407    def connectionMade(self):
    367408        log.msg(self.LOG_PREFIX % self.name + "Connection made...")
     
    411452        pgid = os.getpgid(self.transport.pid)
    412453        os.killpg(pgid, 9)
    413454
     455        return self, pgid
     456
  • sage/dsage/worker/tests/test_process.py

    diff --git a/sage/dsage/worker/tests/test_process.py b/sage/dsage/worker/tests/test_process.py
    a b  
    1 import unittest
    21import tempfile
     2import cPickle
     3import zlib
     4import os
     5
     6from twisted.internet import defer
     7from twisted.internet import reactor
     8from twisted.python import log
     9from twisted.trial import unittest
    310
    411from sage.dsage.worker.process import SageProcessProtocol
    512from sage.dsage.worker.process import SageProcessPool
     13from sage.dsage.worker.monitor import DSageMonitor
     14from sage.dsage.database.job import Job
     15from sage.dsage.misc.misc import random_str
    616
    717class SageProcessPoolTest(unittest.TestCase):
    818    """
    919    I test the implementation of the SageProcessPool
    1020    """
    1121   
     22    NUM_WORKERS = 2
     23
    1224    def setUp(self):
    13         self.pool = SageProcessPool(None)
     25        log.msg('TEST: Creating pool with %s workers' % self.NUM_WORKERS)
     26        self.pool = SageProcessPool(None, num_workers=self.NUM_WORKERS)
     27        self.pool.RESPAWN = False
     28
     29    def tearDown(self):
     30        self.pool.stop()
     31
     32    def test_start(self):
     33        self.pool.start()
     34        self.assertEquals(self.pool.started, True)
     35        self.assert_(isinstance(self.pool.ready, set))
     36        self.assert_(isinstance(self.pool.processes, set))
     37        self.assert_(isinstance(self.pool.busy, set))
     38       
     39    def test_start_processs(self):
     40        self.pool.start()
     41        self.assertEquals(self.pool.num_workers, self.NUM_WORKERS)
     42        self.assertEquals(len(self.pool.processes), self.NUM_WORKERS)
     43
     44    def test_stop(self):
     45        self.pool.start()
     46        self.pool.stop()
     47
     48        self.assertEquals(self.pool.started, False)
     49
     50    @defer.inlineCallbacks
     51    def test_process_ready(self):
     52        proto = SageProcessProtocol()
     53        ready, finished, failed = self.pool._start_process(proto)
     54        r = yield ready
     55       
     56        self.assert_(r in self.pool.ready)
     57
     58    @defer.inlineCallbacks
     59    def test_process_ended(self):
     60        proto = SageProcessProtocol()
     61        ready, finished, failed = self.pool._start_process(proto)
     62        r = yield ready
     63        proto, pgid = proto.kill_job()
     64        f = yield proto.process_ended
     65        self.assert_(proto not in self.pool.ready)
     66        self.assert_(proto not in self.pool.processes)
     67
     68    @defer.inlineCallbacks
     69    def test_do_job(self):
     70        proto = SageProcessProtocol()
     71        ready, finished, failed = self.pool._start_process(proto)
     72        yield ready
     73        job = Job()
     74        job.job_id = "testing123"
     75        job.code = "DSAGE_RESULT = int(2+2)"
     76        jdict = job._reduce()
     77        p, j = self.pool.do_job(jdict)
     78        j = yield j # actually wait for the job to complete
     79        self.assertEquals(j.output, '')
     80
     81        import cPickle, zlib
     82        self.assertEquals(cPickle.loads(zlib.decompress(j.result)), 4)
     83        self.assertEquals(p, proto)
     84        self.assertEquals(job.job_id, j.job_id)
     85        proto.transport.loseConnection()
     86
     87    def test_recycle_process(self):
     88        """
     89        I test recycling of Sage processes.
     90        """
     91       
     92        self.pool.start()
     93        job = Job()
     94        job.job_id = "testing123"
     95        job.code = "DSAGE_RESULT = int(2+2)"
     96        jdict = job._reduce()
     97
     98        proto, j = yield self.pool.do_job(jdict)
     99        self.assertEquals(proto.reset, True)
     100        self.assertEquals(proto.data, "")
     101        self.assertEquals(proto.job, None)
     102        self.assertEquals(proto.ready.called, 0)
     103        self.assertEquals(proto.finished.called, 0)
     104        self.assertEquals(proto.failed.called, 0)
    14105
    15106
    16107class SageProcessProtocolTest(unittest.TestCase):
     
    19110    """
    20111
    21112    def setUp(self):
     113        sage = SageProcessPool.sage
     114        args = SageProcessPool.args
    22115        self.proto = SageProcessProtocol()
     116        reactor.spawnProcess(self.proto, sage, args, env=os.environ, usePTY=True)
     117
     118    def tearDown(self):
     119        self.proto.transport.loseConnection()
     120
     121    def test_name(self):
     122        self.proto.name = 'Worker1'
     123        self.assertEquals(self.proto.name, 'Worker1')
     124
     125    def test_ready(self):
     126        from twisted.internet import defer
     127        self.assert_(isinstance(self.proto.ready, defer.Deferred))
     128
     129    def test_finished(self):
     130        from twisted.internet import defer
     131        self.assert_(isinstance(self.proto.finished, defer.Deferred))
     132
     133    def test_failed(self):
     134        from twisted.internet import defer
     135        self.assert_(isinstance(self.proto.failed, defer.Deferred))
     136
     137    def test_process_ended(self):
     138        from twisted.internet import defer
     139        self.assert_(isinstance(self.proto.process_ended, defer.Deferred))
     140
     141    def test_started(self):
     142        self.assertEquals(self.proto.started, False)
     143
     144    def test_reset(self):
     145        self.assertEquals(self.proto.reset, False)
     146
     147    def test_job(self):
     148        self.assertEquals(self.proto.job, None)
     149   
     150    def test_check_failure(self):
     151        output = "Traceback: foo"
     152        self.assertEquals(self.proto.check_failure(output), True)
     153       
     154        output = "Error: foo"
     155        self.assertEquals(self.proto.check_failure(output), True)
     156
     157        output = None
     158        self.assertEquals(self.proto.check_failure(output), False)
     159   
     160    def test_kill_job(self):
     161        pass
     162
     163    def test_extract_and_load_job_data(self):
     164        job = Job()
     165        job.attach('foo', 1)
     166        job.attach('bar', 2)
     167        job.attach('README.txt', file_name=os.path.join(os.environ['SAGE_ROOT'],
     168                   'README.txt'))
     169        job.job_id = random_str(10)
     170
     171        self.proto.tmp_job_dir = os.environ['SAGE_TESTDIR']
     172        self.proto.extract_and_load_job_data(job)
     173        self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir,
     174                          'foo.sobj')), True)
     175        self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir,
     176                          'bar.sobj')), True)
     177        self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir,
     178                          'README.txt')), True)
     179