Ticket #3600: ppool_unit_tests.patch
| File ppool_unit_tests.patch, 13.7 KB (added by yi, 20 months ago) |
|---|
-
sage/dsage/database/job.py
# HG changeset patch # User Yi Qiang <yqiang@gmail.com> # Date 1215729995 25200 # Node ID 514b71ad00b416aea50d23f249c64da9e0a1e450 # Parent 2f22db0464042948c3e0dd1beacb0e71fe0cf297 [mq]: ppool_unit_tests.patch diff --git a/sage/dsage/database/job.py b/sage/dsage/database/job.py
a b 92 92 def __repr__(self): 93 93 return self.__str__() 94 94 95 def attach(self, var, obj , file_name=None):95 def attach(self, var, obj=None, file_name=None): 96 96 """ 97 97 Attaches an object to a job. 98 98 … … 107 107 try: 108 108 s = open(file_name, 'rb').read() 109 109 s = zlib.compress(s) 110 kind = 'file' 110 111 except: 111 112 print 'Unable to load %s. ' % file_name 113 kind = None 112 114 return 113 115 else: 114 116 try: 115 117 s = cPickle.dumps(obj, 2) 116 118 s = zlib.compress(s) 119 kind = 'object' 117 120 except cPickle.PicklingError: 118 121 print 'Unable to attach your object.' 119 self.data.append((var, s, 'object')) 122 kind = None 123 self.data.append((var, s, kind)) 120 124 121 125 def attach_file(self, file_name): 122 126 """ -
sage/dsage/misc/hostinfo.py
diff --git a/sage/dsage/misc/hostinfo.py b/sage/dsage/misc/hostinfo.py
a b 147 147 148 148 if __name__ == '__main__': 149 149 h = HostInfo() 150 print h 151 No newline at end of file 150 print h -
sage/dsage/worker/process.py
diff --git a/sage/dsage/worker/process.py b/sage/dsage/worker/process.py
a b 1 import pdb 1 2 from twisted.spread import pb 2 3 from twisted.internet import defer 3 4 from twisted.internet import protocol … … 23 24 24 25 sage = SAGE_BIN 25 26 args = ['sage', '-q'] 27 RESPAWN = True 26 28 LOG_PREFIX = "ProcessPool: " 27 29 28 30 def __init__(self, monitor, num_workers=None, proto=None): … … 48 50 49 51 def start(self): 50 52 """ 51 Call me to start the process es.53 Call me to start the process pool. 52 54 """ 53 55 54 56 self.started = True 55 57 self._start_processes() 56 58 59 def stop(self): 60 """ 61 Call me to stop the process pool. 62 """ 63 64 for p in self.processes: 65 p.transport.loseConnection() 66 67 self.started = False 68 57 69 def _err_handler(self, failure): 58 70 """ 59 71 Generic error handler for Deferreds. … … 75 87 log.msg(self.LOG_PREFIX + "_process_ready") 76 88 self.processes.add(proto) 77 89 self.ready.add(proto) 78 jdict = yield self.monitor.get_job() 90 try: 91 jdict = yield self.monitor.get_job() 92 except AttributeError: 93 jdict = None 94 log.msg(self.LOG_PREFIX + "Not connected to server yet.") 79 95 if jdict: 80 96 self.do_job(jdict) 81 97 else: 82 98 log.msg(self.LOG_PREFIX + 'jdict is None') 83 99 100 defer.returnValue(proto) 101 102 def _job_failed(self, result): 103 """ 104 I am called when a job has failed. 105 """ 106 107 if self.monitor: 108 return self.monitor._job_failed(result) 109 else: 110 log.msg(self.LOG_PREFIX + "I don't have a valid monitor") 111 84 112 def _job_finished(self, result): 85 113 """ 86 114 I am called when a job is finished as handled by outReceived. 87 115 88 116 INPUT: 89 117 result -- a tuple (proto, job) 118 119 OUTPUT: 120 job -- completed job 90 121 """ 91 122 92 123 proto, job = result 93 124 log.msg(self.LOG_PREFIX + "Job %s finished" % job.job_id) 94 125 self.processes.discard(proto) 95 126 self.busy.discard(proto) 96 d = self.monitor.job_done(job.job_id, proto.data, job.result, 97 job.cpu_time) 127 try: 128 d = self.monitor.job_done(job.job_id, proto.data, 129 job.result, job.cpu_time) 130 except: 131 log.msg("Error submitting job result") # TODO: wrap this in a 132 # decorator 98 133 self._recycle_process(proto) 99 134 100 def _process_finished(self, proto): 135 return job 136 137 def _process_ended(self, proto): 101 138 """ 102 139 I am called when a process has exited. This will be called after 103 140 processEnded gets called. … … 106 143 proto -- a ProcessProtocol 107 144 """ 108 145 146 log.msg(self.LOG_PREFIX + "Process ended, restarting...") 109 147 self.processes.discard(proto) 110 148 self.busy.discard(proto) 149 self.ready.discard(proto) 111 150 p = SageProcessProtocol() 112 ready, finished, failed = self._start_process(p) 151 152 if self.RESPAWN: 153 ready, finished, failed = self._start_process(p) 154 155 return proto 113 156 114 157 def _recycle_process(self, proto): 115 158 """ … … 123 166 proto._soft_reset() 124 167 proto.ready.addCallback(self._process_ready) 125 168 proto.finished.addCallback(self._job_finished) 126 proto.failed.addCallback(self. monitor._job_failed)169 proto.failed.addCallback(self._job_failed) 127 170 self.busy.discard(proto) 128 171 129 172 return proto.ready, proto.finished … … 136 179 proto - a ProcessProtocol 137 180 """ 138 181 182 log.msg(self.LOG_PREFIX + "Starting protocol %s" % proto) 139 183 reactor.spawnProcess(proto, self.sage, self.args, 140 184 env=os.environ, usePTY=True) 141 185 self.processes.add(proto) 142 186 proto.ready.addCallback(self._process_ready) 143 187 proto.finished.addCallback(self._job_finished) 144 proto.failed.addCallback(self. monitor._job_failed)145 proto.process_ended.addCallback(self._process_ finished)188 proto.failed.addCallback(self._job_failed) 189 proto.process_ended.addCallback(self._process_ended) 146 190 147 191 return proto.ready, proto.finished, proto.failed 148 192 … … 168 212 log.msg('Popped child: %s' % child.name) 169 213 self.busy.add(child) 170 214 d = child.run(jdict) 215 216 return child, child.finished 171 217 172 218 173 219 class SageProcessProtocol(protocol.ProcessProtocol): … … 241 287 if self.log_level > 2: 242 288 msg = 'Extracted %s' % f 243 289 log.msg(self.LOG_PREFIX % self.name + msg) 244 self.transport.writeSequence("execfile('%s')\r" % var) 245 self.transport.writeSequence("exit()\r") 290 if var.endswith(('.py', '.sage', '.pyx')): 291 self.transport.writeSequence("execfile('%s')\r" % var) 292 self.transport.writeSequence("exit()\r") 246 293 if kind == 'object': 247 294 fname = os.path.join(self.tmp_job_dir, var + '.sobj') 248 295 if self.log_level > 2: … … 344 391 jdict -- a dict which gets expanded to a Job object 345 392 """ 346 393 347 #if jdict is None:348 # log.msg(self.LOG_PREFIX % self.name + "jdict is None!")349 # return False350 351 394 self.job = expand_job(jdict) 352 395 353 396 log.msg(self.LOG_PREFIX % self.name + … … 360 403 f = os.path.join(self.tmp_job_dir, job_filename) 361 404 log.msg(self.LOG_PREFIX % self.name + "Executing %s" % f) 362 405 self.transport.writeSequence("execfile('%s')\r" % f) 363 364 return True365 406 366 407 def connectionMade(self): 367 408 log.msg(self.LOG_PREFIX % self.name + "Connection made...") … … 411 452 pgid = os.getpgid(self.transport.pid) 412 453 os.killpg(pgid, 9) 413 454 455 return self, pgid 456 -
sage/dsage/worker/tests/test_process.py
diff --git a/sage/dsage/worker/tests/test_process.py b/sage/dsage/worker/tests/test_process.py
a b 1 import unittest2 1 import tempfile 2 import cPickle 3 import zlib 4 import os 5 6 from twisted.internet import defer 7 from twisted.internet import reactor 8 from twisted.python import log 9 from twisted.trial import unittest 3 10 4 11 from sage.dsage.worker.process import SageProcessProtocol 5 12 from sage.dsage.worker.process import SageProcessPool 13 from sage.dsage.worker.monitor import DSageMonitor 14 from sage.dsage.database.job import Job 15 from sage.dsage.misc.misc import random_str 6 16 7 17 class SageProcessPoolTest(unittest.TestCase): 8 18 """ 9 19 I test the implementation of the SageProcessPool 10 20 """ 11 21 22 NUM_WORKERS = 2 23 12 24 def setUp(self): 13 self.pool = SageProcessPool(None) 25 log.msg('TEST: Creating pool with %s workers' % self.NUM_WORKERS) 26 self.pool = SageProcessPool(None, num_workers=self.NUM_WORKERS) 27 self.pool.RESPAWN = False 28 29 def tearDown(self): 30 self.pool.stop() 31 32 def test_start(self): 33 self.pool.start() 34 self.assertEquals(self.pool.started, True) 35 self.assert_(isinstance(self.pool.ready, set)) 36 self.assert_(isinstance(self.pool.processes, set)) 37 self.assert_(isinstance(self.pool.busy, set)) 38 39 def test_start_processs(self): 40 self.pool.start() 41 self.assertEquals(self.pool.num_workers, self.NUM_WORKERS) 42 self.assertEquals(len(self.pool.processes), self.NUM_WORKERS) 43 44 def test_stop(self): 45 self.pool.start() 46 self.pool.stop() 47 48 self.assertEquals(self.pool.started, False) 49 50 @defer.inlineCallbacks 51 def test_process_ready(self): 52 proto = SageProcessProtocol() 53 ready, finished, failed = self.pool._start_process(proto) 54 r = yield ready 55 56 self.assert_(r in self.pool.ready) 57 58 @defer.inlineCallbacks 59 def test_process_ended(self): 60 proto = SageProcessProtocol() 61 ready, finished, failed = self.pool._start_process(proto) 62 r = yield ready 63 proto, pgid = proto.kill_job() 64 f = yield proto.process_ended 65 self.assert_(proto not in self.pool.ready) 66 self.assert_(proto not in self.pool.processes) 67 68 @defer.inlineCallbacks 69 def test_do_job(self): 70 proto = SageProcessProtocol() 71 ready, finished, failed = self.pool._start_process(proto) 72 yield ready 73 job = Job() 74 job.job_id = "testing123" 75 job.code = "DSAGE_RESULT = int(2+2)" 76 jdict = job._reduce() 77 p, j = self.pool.do_job(jdict) 78 j = yield j # actually wait for the job to complete 79 self.assertEquals(j.output, '') 80 81 import cPickle, zlib 82 self.assertEquals(cPickle.loads(zlib.decompress(j.result)), 4) 83 self.assertEquals(p, proto) 84 self.assertEquals(job.job_id, j.job_id) 85 proto.transport.loseConnection() 86 87 def test_recycle_process(self): 88 """ 89 I test recycling of Sage processes. 90 """ 91 92 self.pool.start() 93 job = Job() 94 job.job_id = "testing123" 95 job.code = "DSAGE_RESULT = int(2+2)" 96 jdict = job._reduce() 97 98 proto, j = yield self.pool.do_job(jdict) 99 self.assertEquals(proto.reset, True) 100 self.assertEquals(proto.data, "") 101 self.assertEquals(proto.job, None) 102 self.assertEquals(proto.ready.called, 0) 103 self.assertEquals(proto.finished.called, 0) 104 self.assertEquals(proto.failed.called, 0) 14 105 15 106 16 107 class SageProcessProtocolTest(unittest.TestCase): … … 19 110 """ 20 111 21 112 def setUp(self): 113 sage = SageProcessPool.sage 114 args = SageProcessPool.args 22 115 self.proto = SageProcessProtocol() 116 reactor.spawnProcess(self.proto, sage, args, env=os.environ, usePTY=True) 117 118 def tearDown(self): 119 self.proto.transport.loseConnection() 120 121 def test_name(self): 122 self.proto.name = 'Worker1' 123 self.assertEquals(self.proto.name, 'Worker1') 124 125 def test_ready(self): 126 from twisted.internet import defer 127 self.assert_(isinstance(self.proto.ready, defer.Deferred)) 128 129 def test_finished(self): 130 from twisted.internet import defer 131 self.assert_(isinstance(self.proto.finished, defer.Deferred)) 132 133 def test_failed(self): 134 from twisted.internet import defer 135 self.assert_(isinstance(self.proto.failed, defer.Deferred)) 136 137 def test_process_ended(self): 138 from twisted.internet import defer 139 self.assert_(isinstance(self.proto.process_ended, defer.Deferred)) 140 141 def test_started(self): 142 self.assertEquals(self.proto.started, False) 143 144 def test_reset(self): 145 self.assertEquals(self.proto.reset, False) 146 147 def test_job(self): 148 self.assertEquals(self.proto.job, None) 149 150 def test_check_failure(self): 151 output = "Traceback: foo" 152 self.assertEquals(self.proto.check_failure(output), True) 153 154 output = "Error: foo" 155 self.assertEquals(self.proto.check_failure(output), True) 156 157 output = None 158 self.assertEquals(self.proto.check_failure(output), False) 159 160 def test_kill_job(self): 161 pass 162 163 def test_extract_and_load_job_data(self): 164 job = Job() 165 job.attach('foo', 1) 166 job.attach('bar', 2) 167 job.attach('README.txt', file_name=os.path.join(os.environ['SAGE_ROOT'], 168 'README.txt')) 169 job.job_id = random_str(10) 170 171 self.proto.tmp_job_dir = os.environ['SAGE_TESTDIR'] 172 self.proto.extract_and_load_job_data(job) 173 self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir, 174 'foo.sobj')), True) 175 self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir, 176 'bar.sobj')), True) 177 self.assertEquals(os.path.isfile(os.path.join(self.proto.tmp_job_dir, 178 'README.txt')), True) 179
