| 1 | | #!/usr/bin/env python |
| 2 | | ############################################################################ |
| 3 | | # |
| 4 | | # DSAGE: Distributed SAGE |
| 5 | | # |
| 6 | | # Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com> |
| 7 | | # |
| 8 | | # Distributed under the terms of the GNU General Public License (GPL) |
| 9 | | # |
| 10 | | # This code is distributed in the hope that it will be useful, |
| 11 | | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 13 | | # General Public License for more details. |
| 14 | | # |
| 15 | | # The full text of the GPL is available at: |
| 16 | | # |
| 17 | | # http://www.gnu.org/licenses/ |
| 18 | | # |
| 19 | | ############################################################################ |
| 20 | | __docformat__ = "restructuredtext en" |
| 21 | | |
| 22 | | import sys |
| 23 | | import os |
| 24 | | import cPickle |
| 25 | | import zlib |
| 26 | | import pexpect |
| 27 | | import datetime |
| 28 | | from math import ceil |
| 29 | | from getpass import getuser |
| 30 | | |
| 31 | | from twisted.spread import pb |
| 32 | | from twisted.internet import reactor, defer, error, task |
| 33 | | from twisted.python import log |
| 34 | | from twisted.spread import banana |
| 35 | | banana.SIZE_LIMIT = 100*1024*1024 # 100 MegaBytes |
| 36 | | |
| 37 | | from gnutls.constants import * |
| 38 | | from gnutls.crypto import * |
| 39 | | from gnutls.errors import * |
| 40 | | from gnutls.interfaces.twisted import X509Credentials |
| 41 | | |
| 42 | | from sage.interfaces.sage0 import Sage |
| 43 | | from sage.misc.preparser import preparse_file |
| 44 | | |
| 45 | | from sage.dsage.database.job import Job, expand_job |
| 46 | | from sage.dsage.misc.hostinfo import HostInfo |
| 47 | | from sage.dsage.errors.exceptions import NoJobException |
| 48 | | from sage.dsage.twisted.pb import ClientFactory |
| 49 | | from sage.dsage.misc.constants import DELIMITER |
| 50 | | from sage.dsage.misc.constants import DSAGE_DIR |
| 51 | | from sage.dsage.misc.constants import TMP_WORKER_FILES |
| 52 | | from sage.dsage.misc.misc import random_str, get_uuid |
| 53 | | |
| 54 | | START_MARKER = '\x01r\x01e' |
| 55 | | END_MARKER = '\x01r\x01b' |
| 56 | | LOG_PREFIX = "[Worker %s] " |
| 57 | | |
| 58 | | class Worker(object): |
| 59 | | """ |
| 60 | | Workers perform the computation of dsage jobs. |
| 61 | | |
| 62 | | """ |
| 63 | | |
| 64 | | def __init__(self, remoteobj, id, log_level=0, poll=1.0): |
| 65 | | """ |
| 66 | | :type remoteobj: remoteobj |
| 67 | | :param remoteobj: Reference to the remote dsage server |
| 68 | | |
| 69 | | :type id: integer |
| 70 | | :param id: numerical identifier of worker |
| 71 | | |
| 72 | | :type log_level: integer |
| 73 | | :param log_level: log level, higher means more verbose |
| 74 | | |
| 75 | | :type poll: integer |
| 76 | | :param poll: rate (in seconds) a worker talks to the server |
| 77 | | |
| 78 | | """ |
| 79 | | |
| 80 | | self.remoteobj = remoteobj |
| 81 | | self.id = id |
| 82 | | self.free = True |
| 83 | | self.job = None |
| 84 | | self.log_level = log_level |
| 85 | | self.poll_rate = poll |
| 86 | | self.checker_task = task.LoopingCall(self.check_work) |
| 87 | | self.checker_timeout = 0.5 |
| 88 | | self.got_output = False |
| 89 | | self.job_start_time = None |
| 90 | | self.orig_poll = poll |
| 91 | | self.start() |
| 92 | | |
| 93 | | def _catch_failure(self, failure): |
| 94 | | log.msg("Error: ", failure.getErrorMessage()) |
| 95 | | log.msg("Traceback: ", failure.printTraceback()) |
| 96 | | |
| 97 | | def _increase_poll_rate(self): |
| 98 | | if self.poll_rate >= 15: # Cap the polling interval to 15 seconds |
| 99 | | self.poll_rate = 15 |
| 100 | | if self.log_level > 3: |
| 101 | | log.msg('[Worker %s] Capping poll rate to %s' |
| 102 | | % (self.id, self.poll_rate)) |
| 103 | | else: |
| 104 | | self.poll_rate = ceil(self.poll_rate * 1.5) |
| 105 | | if self.log_level > 3: |
| 106 | | log.msg('[Worker %s] Increased polling rate to %s' |
| 107 | | % (self.id, self.poll_rate)) |
| 108 | | |
| 109 | | def get_job(self): |
| 110 | | try: |
| 111 | | if self.log_level > 3: |
| 112 | | log.msg(LOG_PREFIX % self.id + 'Getting job...') |
| 113 | | d = self.remoteobj.callRemote('get_job') |
| 114 | | except Exception, msg: |
| 115 | | log.msg(msg) |
| 116 | | log.msg(LOG_PREFIX % self.id + 'Disconnected...') |
| 117 | | self._increase_poll_rate() |
| 118 | | reactor.callLater(self.poll_rate, self.get_job) |
| 119 | | return |
| 120 | | d.addCallback(self.gotJob) |
| 121 | | d.addErrback(self.noJob) |
| 122 | | |
| 123 | | return d |
| 124 | | |
| 125 | | def gotJob(self, jdict): |
| 126 | | """ |
| 127 | | callback for the remoteobj's get_job method. |
| 128 | | |
| 129 | | :type jdict: dict |
| 130 | | :param jdict: job dictionary |
| 131 | | |
| 132 | | """ |
| 133 | | |
| 134 | | if self.log_level > 1: |
| 135 | | if jdict is None: |
| 136 | | log.msg(LOG_PREFIX % self.id + 'No new job.') |
| 137 | | if self.log_level > 3: |
| 138 | | if jdict is not None: |
| 139 | | log.msg(LOG_PREFIX % self.id + 'Got Job: %s' % jdict) |
| 140 | | self.job = expand_job(jdict) |
| 141 | | if not isinstance(self.job, Job): |
| 142 | | raise NoJobException |
| 143 | | try: |
| 144 | | self.poll_rate = self.orig_poll |
| 145 | | self.doJob(self.job) |
| 146 | | except Exception, msg: |
| 147 | | log.msg(msg) |
| 148 | | self.report_failure(msg) |
| 149 | | self.restart() |
| 150 | | |
| 151 | | def job_done(self, output, result, completed, cpu_time): |
| 152 | | """ |
| 153 | | Reports to the server that a job has finished. It also reports partial |
| 154 | | completeness by presenting the server with new output. |
| 155 | | |
| 156 | | Parameters: |
| 157 | | :type output: string |
| 158 | | :param output: output of command (to sys.stdout) |
| 159 | | |
| 160 | | :type result: python pickle |
| 161 | | :param result: result of the job |
| 162 | | |
| 163 | | :type completed: bool |
| 164 | | :param completed: whether or not the job is finished |
| 165 | | |
| 166 | | :type cpu_time: string |
| 167 | | :param cpu_time: how long the job took |
| 168 | | |
| 169 | | """ |
| 170 | | |
| 171 | | job_id = self.job.job_id |
| 172 | | wait = 5.0 |
| 173 | | try: |
| 174 | | d = self.remoteobj.callRemote('job_done', job_id, output, result, |
| 175 | | completed, cpu_time) |
| 176 | | except Exception, msg: |
| 177 | | log.msg('Error trying to submit job status...') |
| 178 | | log.msg('Retrying to submit again in %s seconds...' % wait) |
| 179 | | log.err(msg) |
| 180 | | reactor.callLater(wait, self.job_done, output, result, |
| 181 | | completed, cpu_time) |
| 182 | | d = defer.Deferred() |
| 183 | | d.errback(error.ConnectionLost()) |
| 184 | | return d |
| 185 | | |
| 186 | | if completed: |
| 187 | | log.msg('[Worker %s] Finished job %s' % (self.id, job_id)) |
| 188 | | self.restart() |
| 189 | | |
| 190 | | return d |
| 191 | | |
| 192 | | |
| 193 | | def noJob(self, failure): |
| 194 | | """ |
| 195 | | Errback that catches the NoJobException. |
| 196 | | |
| 197 | | :type failure: twisted.python.failure |
| 198 | | :param failure: a twisted failure object |
| 199 | | |
| 200 | | """ |
| 201 | | |
| 202 | | if failure.check(NoJobException): |
| 203 | | if self.log_level > 1: |
| 204 | | msg = 'Sleeping for %s seconds' % self.poll_rate |
| 205 | | log.msg(LOG_PREFIX % self.id + msg) |
| 206 | | self._increase_poll_rate() |
| 207 | | reactor.callLater(self.poll_rate, self.get_job) |
| 208 | | else: |
| 209 | | log.msg("Error: ", failure.getErrorMessage()) |
| 210 | | log.msg("Traceback: ", failure.printTraceback()) |
| 211 | | |
| 212 | | def setup_tmp_dir(self, job): |
| 213 | | """ |
| 214 | | Creates the temporary directory for the worker. |
| 215 | | |
| 216 | | :type job: sage.dsage.database.job.Job |
| 217 | | :param job: a Job object |
| 218 | | |
| 219 | | """ |
| 220 | | |
| 221 | | cur_dir = os.getcwd() # keep a reference to the current directory |
| 222 | | tmp_job_dir = os.path.join(TMP_WORKER_FILES, job.job_id) |
| 223 | | if not os.path.isdir(TMP_WORKER_FILES): |
| 224 | | os.mkdir(TMP_WORKER_FILES) |
| 225 | | if not os.path.isdir(tmp_job_dir): |
| 226 | | os.mkdir(tmp_job_dir) |
| 227 | | os.chdir(tmp_job_dir) |
| 228 | | self.sage.eval("os.chdir('%s')" % tmp_job_dir) |
| 229 | | |
| 230 | | return tmp_job_dir |
| 231 | | |
| 232 | | def extract_and_load_job_data(self, job): |
| 233 | | """ |
| 234 | | Extracts all the data that is in a job object. |
| 235 | | |
| 236 | | :type job: sage.dsage.database.job.Job |
| 237 | | :param job: a Job object |
| 238 | | |
| 239 | | """ |
| 240 | | |
| 241 | | if isinstance(job.data, list): |
| 242 | | if self.log_level > 2: |
| 243 | | msg = 'Extracting job data...' |
| 244 | | log.msg(LOG_PREFIX % self.id + msg) |
| 245 | | try: |
| 246 | | for var, data, kind in job.data: |
| 247 | | try: |
| 248 | | data = zlib.decompress(data) |
| 249 | | except Exception, msg: |
| 250 | | log.msg(msg) |
| 251 | | continue |
| 252 | | if kind == 'file': |
| 253 | | data = preparse_file(data, magic=True, do_time=False, |
| 254 | | ignore_prompts=False) |
| 255 | | f = open(var, 'wb') |
| 256 | | f.write(data) |
| 257 | | f.close() |
| 258 | | if self.log_level > 2: |
| 259 | | msg = 'Extracted %s' % f |
| 260 | | log.msg(LOG_PREFIX % self.id + msg) |
| 261 | | self.sage.eval("execfile('%s')" % var) |
| 262 | | if kind == 'object': |
| 263 | | fname = var + '.sobj' |
| 264 | | if self.log_level > 2: |
| 265 | | log.msg('Object to be loaded: %s' % fname) |
| 266 | | f = open(fname, 'wb') |
| 267 | | f.write(data) |
| 268 | | f.close() |
| 269 | | self.sage.eval("%s = load('%s')" % (var, fname)) |
| 270 | | if self.log_level > 2: |
| 271 | | msg = 'Loaded %s' % fname |
| 272 | | log.msg(LOG_PREFIX % self.id + msg) |
| 273 | | except Exception, msg: |
| 274 | | log.msg(LOG_PREFIX % self.id + msg) |
| 275 | | |
| 276 | | def write_job_file(self, job): |
| 277 | | """ |
| 278 | | Writes out the job file to be executed to disk. |
| 279 | | |
| 280 | | :type job: sage.dsage.database.job.Job |
| 281 | | :param job: A Job object |
| 282 | | |
| 283 | | """ |
| 284 | | |
| 285 | | parsed_file = preparse_file(job.code, magic=True, |
| 286 | | do_time=False, ignore_prompts=False) |
| 287 | | |
| 288 | | job_filename = str(job.name) + '.py' |
| 289 | | job_file = open(job_filename, 'w') |
| 290 | | BEGIN = "print '%s'\n\n" % (START_MARKER) |
| 291 | | END = "print '%s'\n\n" % (END_MARKER) |
| 292 | | GO_TO_TMP_DIR = """os.chdir('%s')\n""" % self.tmp_job_dir |
| 293 | | SAVE_TIME = """save((time.time()-dsage_start_time), 'cpu_time.sobj', compress=False)\n""" |
| 294 | | SAVE_RESULT = """try: |
| 295 | | save(DSAGE_RESULT, 'result.sobj', compress=True) |
| 296 | | except: |
| 297 | | save('No DSAGE_RESULT', 'result.sobj', compress=True) |
| 298 | | """ |
| 299 | | job_file.write("alarm(%s)\n\n" % (job.timeout)) |
| 300 | | job_file.write("import time\n\n") |
| 301 | | job_file.write(BEGIN) |
| 302 | | job_file.write('dsage_start_time = time.time()\n') |
| 303 | | job_file.write(parsed_file) |
| 304 | | job_file.write("\n\n") |
| 305 | | job_file.write(END) |
| 306 | | job_file.write("\n") |
| 307 | | job_file.write(GO_TO_TMP_DIR) |
| 308 | | job_file.write(SAVE_RESULT) |
| 309 | | job_file.write(SAVE_TIME) |
| 310 | | job_file.close() |
| 311 | | if self.log_level > 2: |
| 312 | | log.msg('[Worker: %s] Wrote job file. ' % (self.id)) |
| 313 | | |
| 314 | | return job_filename |
| 315 | | |
| 316 | | def doJob(self, job): |
| 317 | | """ |
| 318 | | Executes a job |
| 319 | | |
| 320 | | :type job: sage.dsage.database.job.Job |
| 321 | | :param job: A Job object |
| 322 | | |
| 323 | | """ |
| 324 | | |
| 325 | | log.msg(LOG_PREFIX % self.id + 'Starting job %s ' % job.job_id) |
| 326 | | |
| 327 | | self.free = False |
| 328 | | self.got_output = False |
| 329 | | d = defer.Deferred() |
| 330 | | |
| 331 | | try: |
| 332 | | self.checker_task.start(self.checker_timeout, now=False) |
| 333 | | except AssertionError: |
| 334 | | self.checker_task.stop() |
| 335 | | self.checker_task.start(self.checker_timeout, now=False) |
| 336 | | if self.log_level > 2: |
| 337 | | log.msg(LOG_PREFIX % self.id + 'Starting checker task...') |
| 338 | | |
| 339 | | self.tmp_job_dir = self.setup_tmp_dir(job) |
| 340 | | self.extract_and_load_job_data(job) |
| 341 | | |
| 342 | | job_filename = self.write_job_file(job) |
| 343 | | |
| 344 | | f = os.path.join(self.tmp_job_dir, job_filename) |
| 345 | | self.sage._send("execfile('%s')" % (f)) |
| 346 | | self.job_start_time = datetime.datetime.now() |
| 347 | | if self.log_level > 2: |
| 348 | | msg = 'File to execute: %s' % f |
| 349 | | log.msg(LOG_PREFIX % self.id + msg) |
| 350 | | |
| 351 | | d.callback(True) |
| 352 | | |
| 353 | | def reset_checker(self): |
| 354 | | """ |
| 355 | | Resets the output/result checker for the worker. |
| 356 | | |
| 357 | | """ |
| 358 | | |
| 359 | | if self.checker_task.running: |
| 360 | | self.checker_task.stop() |
| 361 | | self.checker_timeout = 1.0 |
| 362 | | self.checker_task = task.LoopingCall(self.check_work) |
| 363 | | |
| 364 | | def check_work(self): |
| 365 | | """ |
| 366 | | check_work periodically polls workers for new output. The period is |
| 367 | | determined by an exponential back off algorithm. |
| 368 | | |
| 369 | | This figures out whether or not there is anything new output that we |
| 370 | | should submit to the server. |
| 371 | | |
| 372 | | """ |
| 373 | | |
| 374 | | if self.sage == None: |
| 375 | | return |
| 376 | | if self.job == None or self.free == True: |
| 377 | | if self.checker_task.running: |
| 378 | | self.checker_task.stop() |
| 379 | | return |
| 380 | | if self.log_level > 1: |
| 381 | | msg = 'Checking job %s' % self.job.job_id |
| 382 | | log.msg(LOG_PREFIX % self.id + msg) |
| 383 | | os.chdir(self.tmp_job_dir) |
| 384 | | try: |
| 385 | | # foo, output, new = self.sage._so_far() |
| 386 | | # This sucks and is a very bad way to tell when a calculation is |
| 387 | | # finished |
| 388 | | done, new = self.sage._get() |
| 389 | | # If result.sobj exists, our calculation is done |
| 390 | | result = open('result.sobj', 'rb').read() |
| 391 | | done = True |
| 392 | | except RuntimeError, msg: # Error in calling worker.sage._so_far() |
| 393 | | done = False |
| 394 | | if self.log_level > 1: |
| 395 | | log.msg(LOG_PREFIX % self.id + 'RuntimeError: %s' % msg) |
| 396 | | log.msg("Don't worry, the RuntimeError above " + |
| 397 | | "is a non-fatal SAGE failure") |
| 398 | | self.increase_checker_task_timeout() |
| 399 | | return |
| 400 | | except IOError, msg: # File does not exist yet |
| 401 | | done = False |
| 402 | | |
| 403 | | if done: |
| 404 | | try: |
| 405 | | cpu_time = cPickle.loads(open('cpu_time.sobj', 'rb').read()) |
| 406 | | except IOError: |
| 407 | | cpu_time = -1 |
| 408 | | self.free = True |
| 409 | | self.reset_checker() |
| 410 | | else: |
| 411 | | result = cPickle.dumps('Job not done yet.', 2) |
| 412 | | cpu_time = None |
| 413 | | |
| 414 | | if self.check_failure(new): |
| 415 | | self.report_failure(new) |
| 416 | | self.restart() |
| 417 | | return |
| 418 | | |
| 419 | | sanitized_output = self.clean_output(new) |
| 420 | | if self.log_level > 3: |
| 421 | | print 'Output before sanitizing: \n' , sanitized_output |
| 422 | | if self.log_level > 3: |
| 423 | | print 'Output after sanitizing: \n', sanitized_output |
| 424 | | if sanitized_output == '' and not done: |
| 425 | | self.increase_checker_task_timeout() |
| 426 | | else: |
| 427 | | d = self.job_done(sanitized_output, result, done, cpu_time) |
| 428 | | d.addErrback(self._catch_failure) |
| 429 | | |
| 430 | | def report_failure(self, failure): |
| 431 | | """ |
| 432 | | Reports failure of a job. |
| 433 | | |
| 434 | | :type failure: twisted.python.failure |
| 435 | | :param failure: A twisted failure object |
| 436 | | |
| 437 | | """ |
| 438 | | |
| 439 | | msg = 'Job %s failed!' % (self.job.job_id) |
| 440 | | import shutil |
| 441 | | failed_dir = self.tmp_job_dir + '_failed' |
| 442 | | if os.path.exists(failed_dir): |
| 443 | | shutil.rmtree(failed_dir) |
| 444 | | shutil.move(self.tmp_job_dir, failed_dir) |
| 445 | | log.msg(LOG_PREFIX % self.id + msg) |
| 446 | | log.msg('Traceback: \n%s' % failure) |
| 447 | | d = self.remoteobj.callRemote('job_failed', self.job.job_id, failure) |
| 448 | | d.addErrback(self._catch_failure) |
| 449 | | |
| 450 | | return d |
| 451 | | |
| 452 | | def increase_checker_task_timeout(self): |
| 453 | | """ |
| 454 | | Quickly decreases the number of times a worker checks for output |
| 455 | | |
| 456 | | """ |
| 457 | | |
| 458 | | if self.checker_task.running: |
| 459 | | self.checker_task.stop() |
| 460 | | |
| 461 | | self.checker_timeout = self.checker_timeout * 1.5 |
| 462 | | if self.checker_timeout > 300.0: |
| 463 | | self.checker_timeout = 300.0 |
| 464 | | self.checker_task = task.LoopingCall(self.check_work) |
| 465 | | self.checker_task.start(self.checker_timeout, now=False) |
| 466 | | if self.log_level > 0: |
| 467 | | msg = 'Checking output again in %s' % self.checker_timeout |
| 468 | | log.msg(LOG_PREFIX % self.id + msg) |
| 469 | | |
| 470 | | def clean_output(self, sage_output): |
| 471 | | """ |
| 472 | | clean_output attempts to clean up the output string from sage. |
| 473 | | |
| 474 | | :type sage_output: string |
| 475 | | :param sage_output: sys.stdout output from the child sage instance |
| 476 | | |
| 477 | | """ |
| 478 | | |
| 479 | | begin = sage_output.find(START_MARKER) |
| 480 | | if begin != -1: |
| 481 | | self.got_output = True |
| 482 | | begin += len(START_MARKER) |
| 483 | | else: |
| 484 | | begin = 0 |
| 485 | | end = sage_output.find(END_MARKER) |
| 486 | | if end != -1: |
| 487 | | end -= 1 |
| 488 | | else: |
| 489 | | if not self.got_output: |
| 490 | | end = 0 |
| 491 | | else: |
| 492 | | end = len(sage_output) |
| 493 | | output = sage_output[begin:end] |
| 494 | | output = output.strip() |
| 495 | | output = output.replace('\r', '') |
| 496 | | |
| 497 | | if ('execfile' in output or 'load' in output) and self.got_output: |
| 498 | | output = '' |
| 499 | | |
| 500 | | return output |
| 501 | | |
| 502 | | def check_failure(self, sage_output): |
| 503 | | """ |
| 504 | | Checks for signs of exceptions or errors in the output. |
| 505 | | |
| 506 | | :type sage_output: string |
| 507 | | :param sage_output: output from the sage instance |
| 508 | | |
| 509 | | """ |
| 510 | | |
| 511 | | if sage_output == None: |
| 512 | | return False |
| 513 | | else: |
| 514 | | sage_output = ''.join(sage_output) |
| 515 | | |
| 516 | | if 'Traceback' in sage_output: |
| 517 | | return True |
| 518 | | elif 'Error' in sage_output: |
| 519 | | return True |
| 520 | | else: |
| 521 | | return False |
| 522 | | |
| 523 | | def kill_sage(self): |
| 524 | | """ |
| 525 | | Try to hard kill the SAGE instance. |
| 526 | | |
| 527 | | """ |
| 528 | | |
| 529 | | try: |
| 530 | | self.sage.quit() |
| 531 | | del self.sage |
| 532 | | except Exception, msg: |
| 533 | | pid = self.sage.pid() |
| 534 | | cmd = 'kill -9 %s' % pid |
| 535 | | os.system(cmd) |
| 536 | | log.msg(msg) |
| 537 | | |
| 538 | | def stop(self, hard_reset=False): |
| 539 | | """ |
| 540 | | Stops the current worker and resets it's internal state. |
| 541 | | |
| 542 | | :type hard_reset: boolean |
| 543 | | :param hard_reset: Specifies whether to kill -9 the sage instances |
| 544 | | |
| 545 | | """ |
| 546 | | |
| 547 | | # Set status to free and delete any current jobs we have |
| 548 | | self.free = True |
| 549 | | self.job = None |
| 550 | | |
| 551 | | if hard_reset: |
| 552 | | log.msg(LOG_PREFIX % self.id + 'Performing hard reset.') |
| 553 | | self.kill_sage() |
| 554 | | else: # try for a soft reset |
| 555 | | INTERRUPT_TRIES = 20 |
| 556 | | timeout = 0.3 |
| 557 | | e = self.sage._expect |
| 558 | | try: |
| 559 | | for i in range(INTERRUPT_TRIES): |
| 560 | | self.sage._expect.sendline('q') |
| 561 | | self.sage._expect.sendline(chr(3)) # send ctrl-c |
| 562 | | try: |
| 563 | | e.expect(self.sage._prompt, timeout=timeout) |
| 564 | | success = True |
| 565 | | break |
| 566 | | except (pexpect.TIMEOUT, pexpect.EOF), msg: |
| 567 | | success = False |
| 568 | | if self.log_level > 3: |
| 569 | | msg = 'Interrupting SAGE (try %s)' % i |
| 570 | | log.msg(LOG_PREFIX % self.id + msg) |
| 571 | | except Exception, msg: |
| 572 | | success = False |
| 573 | | log.msg(msg) |
| 574 | | log.msg(LOG_PREFIX % self.id + "Performing hard reset.") |
| 575 | | |
| 576 | | if not success: |
| 577 | | self.kill_sage() |
| 578 | | else: |
| 579 | | self.sage.reset() |
| 580 | | |
| 581 | | def start(self): |
| 582 | | """ |
| 583 | | Starts a new worker if it does not exist already. |
| 584 | | |
| 585 | | """ |
| 586 | | |
| 587 | | log.msg('[Worker %s] Started...' % (self.id)) |
| 588 | | if not hasattr(self, 'sage'): |
| 589 | | if self.log_level > 3: |
| 590 | | logfile = DSAGE_DIR + '/%s-pexpect.log' % self.id |
| 591 | | self.sage = Sage(maxread=1, logfile=logfile, python=True) |
| 592 | | else: |
| 593 | | self.sage = Sage(maxread=1, python=True) |
| 594 | | try: |
| 595 | | self.sage._start(block_during_init=True) |
| 596 | | except RuntimeError, msg: # Could not start SAGE |
| 597 | | print msg |
| 598 | | print 'Failed to start a worker, probably Expect issues.' |
| 599 | | reactor.stop() |
| 600 | | sys.exit(-1) |
| 601 | | E = self.sage.expect() |
| 602 | | E.sendline('\n') |
| 603 | | E.expect('>>>') |
| 604 | | cmd = 'from sage.all import *;' |
| 605 | | cmd += 'from sage.all_notebook import *;' |
| 606 | | cmd += 'import sage.server.support as _support_; ' |
| 607 | | cmd += 'import time;' |
| 608 | | cmd += 'import os;' |
| 609 | | E.sendline(cmd) |
| 610 | | |
| 611 | | if os.uname()[0].lower() == 'linux': |
| 612 | | try: |
| 613 | | self.base_mem = int(self.sage.get_memory_usage()) |
| 614 | | except: |
| 615 | | pass |
| 616 | | |
| 617 | | self.get_job() |
| 618 | | |
| 619 | | def restart(self): |
| 620 | | """ |
| 621 | | Restarts the current worker. |
| 622 | | |
| 623 | | """ |
| 624 | | |
| 625 | | log.msg('[Worker: %s] Restarting...' % (self.id)) |
| 626 | | |
| 627 | | if hasattr(self, 'base_mem'): |
| 628 | | try: |
| 629 | | cur_mem = int(self.sage.get_memory_usage()) |
| 630 | | except: |
| 631 | | cur_mem = 0 |
| 632 | | try: |
| 633 | | if hasattr(self, 'base_mem'): |
| 634 | | if cur_mem >= (2 * self.base_mem): |
| 635 | | self.stop(hard_reset=True) |
| 636 | | else: |
| 637 | | from sage.dsage.misc.misc import timedelta_to_seconds |
| 638 | | delta = datetime.datetime.now() - self.job_start_time |
| 639 | | secs = timedelta_to_seconds(delta) |
| 640 | | if secs >= (3*60): # more than 3 minutes, do a hard reset |
| 641 | | self.stop(hard_reset=True) |
| 642 | | else: |
| 643 | | self.stop(hard_reset=False) |
| 644 | | except TypeError: |
| 645 | | self.stop(hard_reset=True) |
| 646 | | self.job_start_time = None |
| 647 | | self.start() |
| 648 | | self.reset_checker() |
| 649 | | |
| 650 | | |
| 651 | | class Monitor(pb.Referenceable): |
| 652 | | """ |
| 653 | | Monitors control workers. |
| 654 | | They are able to shutdown workers and spawn them, as well as check on |
| 655 | | their status. |
| 656 | | |
| 657 | | """ |
| 658 | | |
| 659 | | def __init__(self, server='localhost', port=8081, username=getuser(), |
| 660 | | ssl=True, workers=2, authenticate=False, priority=20, |
| 661 | | poll=1.0, log_level=0, |
| 662 | | log_file=os.path.join(DSAGE_DIR, 'worker.log'), |
| 663 | | pubkey_file=None, privkey_file=None): |
| 664 | | """ |
| 665 | | :type server: string |
| 666 | | :param server: hostname of remote server |
| 667 | | |
| 668 | | :type port: integer |
| 669 | | :param port: port of remote server |
| 670 | | |
| 671 | | :type username: string |
| 672 | | :param username: username to use for authentication |
| 673 | | |
| 674 | | :type ssl: boolean |
| 675 | | :param ssl: specify whether or not to use SSL for the connection |
| 676 | | |
| 677 | | :type workers: integer |
| 678 | | :param workers: specifies how many workers to launch |
| 679 | | |
| 680 | | :type authenticate: boolean |
| 681 | | :param authenticate: specifies whether or not to authenticate |
| 682 | | |
| 683 | | :type priority: integer |
| 684 | | :param priority: specifies the UNIX priority of the workers |
| 685 | | |
| 686 | | :type poll: float |
| 687 | | :param poll: specifies how fast workers talk to the server in seconds |
| 688 | | |
| 689 | | :type log_level: integer |
| 690 | | :param log_level: specifies verbosity of logging, higher equals more |
| 691 | | |
| 692 | | :type log_file: string |
| 693 | | :param log_file: specifies the location of the log_file |
| 694 | | |
| 695 | | """ |
| 696 | | |
| 697 | | self.server = server |
| 698 | | self.port = port |
| 699 | | self.username = username |
| 700 | | self.ssl = ssl |
| 701 | | self.workers = workers |
| 702 | | self.authenticate = authenticate |
| 703 | | self.priority = priority |
| 704 | | self.poll_rate = poll |
| 705 | | self.log_level = log_level |
| 706 | | self.log_file = log_file |
| 707 | | self.pubkey_file = pubkey_file |
| 708 | | self.privkey_file = privkey_file |
| 709 | | |
| 710 | | self.remoteobj = None |
| 711 | | self.connected = False |
| 712 | | self.reconnecting = False |
| 713 | | self.worker_pool = None |
| 714 | | self.sleep_time = 1.0 |
| 715 | | |
| 716 | | self.host_info = HostInfo().host_info |
| 717 | | |
| 718 | | self.host_info['uuid'] = get_uuid() |
| 719 | | self.host_info['workers'] = self.workers |
| 720 | | self.host_info['username'] = self.username |
| 721 | | |
| 722 | | self._startLogging(self.log_file) |
| 723 | | |
| 724 | | try: |
| 725 | | os.nice(self.priority) |
| 726 | | except OSError, msg: |
| 727 | | log.msg('Error setting priority: %s' % (self.priority)) |
| 728 | | pass |
| 729 | | if self.authenticate: |
| 730 | | from twisted.cred import credentials |
| 731 | | from twisted.conch.ssh import keys |
| 732 | | self.DATA = random_str(500) |
| 733 | | # public key authentication information |
| 734 | | self.pubkey = keys.Key.fromFile(self.pubkey_file) |
| 735 | | # try getting the private key object without a passphrase first |
| 736 | | try: |
| 737 | | self.privkey = keys.Key.fromFile(self.privkey_file) |
| 738 | | except keys.BadKeyError: |
| 739 | | pphrase = self._getpassphrase() |
| 740 | | self.privkey = keys.Key.fromFile(self.privkey_file, |
| 741 | | passphrase=pphrase) |
| 742 | | self.algorithm = 'rsa' |
| 743 | | self.blob = self.pubkey.blob() |
| 744 | | self.data = self.DATA |
| 745 | | self.signature = self.privkey.sign(self.data) |
| 746 | | self.creds = credentials.SSHPrivateKey(self.username, |
| 747 | | self.algorithm, |
| 748 | | self.blob, |
| 749 | | self.data, |
| 750 | | self.signature) |
| 751 | | |
| 752 | | def _startLogging(self, log_file): |
| 753 | | """ |
| 754 | | :type log_file: string |
| 755 | | :param log_file: file name to log to |
| 756 | | |
| 757 | | """ |
| 758 | | |
| 759 | | if log_file == 'stdout': |
| 760 | | log.startLogging(sys.stdout) |
| 761 | | log.msg('WARNING: Only loggint to stdout!') |
| 762 | | else: |
| 763 | | worker_log = open(log_file, 'a') |
| 764 | | log.startLogging(sys.stdout) |
| 765 | | log.startLogging(worker_log) |
| 766 | | log.msg("Logging to file: ", log_file) |
| 767 | | |
| 768 | | def _getpassphrase(self): |
| 769 | | import getpass |
| 770 | | passphrase = getpass.getpass('Passphrase (Hit enter for None): ') |
| 771 | | |
| 772 | | return passphrase |
| 773 | | |
| 774 | | def _connected(self, remoteobj): |
| 775 | | """ |
| 776 | | Callback for connect. |
| 777 | | |
| 778 | | :type remoteobj: remote object |
| 779 | | :param remoteobj: remote obj |
| 780 | | |
| 781 | | """ |
| 782 | | |
| 783 | | self.remoteobj = remoteobj |
| 784 | | self.remoteobj.notifyOnDisconnect(self._disconnected) |
| 785 | | self.connected = True |
| 786 | | |
| 787 | | if self.worker_pool == None: # Only pool workers the first time |
| 788 | | self.pool_workers(self.remoteobj) |
| 789 | | else: |
| 790 | | for worker in self.worker_pool: |
| 791 | | worker.remoteobj = self.remoteobj # Update workers |
| 792 | | if worker.job == None: |
| 793 | | worker.restart() |
| 794 | | |
| 795 | | def _disconnected(self, remoteobj): |
| 796 | | """ |
| 797 | | :type remoteobj: remote object |
| 798 | | :param remoteobj: remote obj |
| 799 | | |
| 800 | | """ |
| 801 | | |
| 802 | | log.msg('Closed connection to the server.') |
| 803 | | self.connected = False |
| 804 | | |
| 805 | | def _got_killed_jobs(self, killed_jobs): |
| 806 | | """ |
| 807 | | Callback for check_killed_jobs. |
| 808 | | |
| 809 | | :type killed_jobs: dict |
| 810 | | :param killed_jobs: dict of job jdicts which were killed |
| 811 | | |
| 812 | | """ |
| 813 | | |
| 814 | | if killed_jobs == None: |
| 815 | | return |
| 816 | | killed_jobs = [expand_job(jdict) for jdict in killed_jobs] |
| 817 | | for worker in self.worker_pool: |
| 818 | | if worker.job is None: |
| 819 | | continue |
| 820 | | if worker.free: |
| 821 | | continue |
| 822 | | for job in killed_jobs: |
| 823 | | if job is None or worker.job is None: |
| 824 | | continue |
| 825 | | if worker.job.job_id == job.job_id: |
| 826 | | msg = 'Processing killed job, restarting...' |
| 827 | | log.msg(LOG_PREFIX % worker.id + msg) |
| 828 | | worker.restart() |
| 829 | | |
| 830 | | def _retryConnect(self): |
| 831 | | log.msg('[Monitor] Disconnected, reconnecting in %s' % (5.0)) |
| 832 | | if not self.connected: |
| 833 | | reactor.callLater(5.0, self.connect) |
| 834 | | |
| 835 | | def _catchConnectionFailure(self, failure): |
| 836 | | log.msg("Error: ", failure.getErrorMessage()) |
| 837 | | log.msg("Traceback: ", failure.printTraceback()) |
| 838 | | self._disconnected(None) |
| 839 | | |
| 840 | | def _catch_failure(self, failure): |
| 841 | | log.msg("Error: ", failure.getErrorMessage()) |
| 842 | | log.msg("Traceback: ", failure.printTraceback()) |
| 843 | | |
| 844 | | def connect(self): |
| 845 | | """ |
| 846 | | This method connects the monitor to a remote PB server. |
| 847 | | |
| 848 | | """ |
| 849 | | |
| 850 | | if self.connected: # Don't connect multiple times |
| 851 | | return |
| 852 | | |
| 853 | | self.factory = ClientFactory(self._login, (), {}) |
| 854 | | cred = None |
| 855 | | if self.ssl: |
| 856 | | cred = X509Credentials() |
| 857 | | reactor.connectTLS(self.server, self.port, self.factory, cred) |
| 858 | | else: |
| 859 | | reactor.connectTCP(self.server, self.port, self.factory) |
| 860 | | |
| 861 | | log.msg(DELIMITER) |
| 862 | | log.msg('DSAGE Worker') |
| 863 | | log.msg('Started with PID: %s' % (os.getpid())) |
| 864 | | log.msg('Connecting to %s:%s' % (self.server, self.port)) |
| 865 | | if cred is not None: |
| 866 | | log.msg('Using SSL: True') |
| 867 | | else: |
| 868 | | log.msg('Using SSL: False') |
| 869 | | log.msg(DELIMITER) |
| 870 | | |
| 871 | | def _login(self, *args, **kwargs): |
| 872 | | if self.authenticate: |
| 873 | | log.msg('Connecting as authenticated worker...\n') |
| 874 | | d = self.factory.login(self.creds, (self, self.host_info)) |
| 875 | | else: |
| 876 | | from twisted.cred.credentials import Anonymous |
| 877 | | log.msg('Connecting as unauthenticated worker...\n') |
| 878 | | d = self.factory.login(Anonymous(), (self, self.host_info)) |
| 879 | | d.addCallback(self._connected) |
| 880 | | d.addErrback(self._catchConnectionFailure) |
| 881 | | |
| 882 | | return d |
| 883 | | |
| 884 | | def pool_workers(self, remoteobj): |
| 885 | | """ |
| 886 | | Creates the worker pool. |
| 887 | | |
| 888 | | """ |
| 889 | | |
| 890 | | log.msg('[Monitor] Starting %s workers...' % (self.workers)) |
| 891 | | self.worker_pool = [Worker(remoteobj, x, self.log_level, |
| 892 | | self.poll_rate) |
| 893 | | for x in range(self.workers)] |
| 894 | | |
| 895 | | |
| 896 | | def remote_set_uuid(self, uuid): |
| 897 | | """ |
| 898 | | Sets the workers uuid. |
| 899 | | This is called by the server. |
| 900 | | |
| 901 | | """ |
| 902 | | |
| 903 | | from sage.dsage.misc.misc import set_uuid |
| 904 | | set_uuid(uuid) |
| 905 | | |
| 906 | | |
| 907 | | def remote_calc_score(self, script): |
| 908 | | """ |
| 909 | | Calculuates the worker score. |
| 910 | | |
| 911 | | :type script: string |
| 912 | | :param script: script to score the worker |
| 913 | | |
| 914 | | """ |
| 915 | | |
| 916 | | from sage.dsage.misc.misc import exec_wrs |
| 917 | | |
| 918 | | return exec_wrs(script) |
| 919 | | |
| 920 | | |
| 921 | | def remote_kill_job(self, job_id): |
| 922 | | """ |
| 923 | | Kills the job given the job id. |
| 924 | | |
| 925 | | :type job_id: string |
| 926 | | :param job_id: the unique job identifier. |
| 927 | | |
| 928 | | """ |
| 929 | | |
| 930 | | print 'Killing %s' % (job_id) |
| 931 | | for worker in self.worker_pool: |
| 932 | | if worker.job != None: |
| 933 | | if worker.job.job_id == job_id: |
| 934 | | worker.restart() |
| 935 | | |
| 936 | | |
| 937 | | def usage(): |
| 938 | | """ |
| 939 | | Prints usage help. |
| 940 | | |
| 941 | | """ |
| 942 | | |
| 943 | | from optparse import OptionParser |
| 944 | | |
| 945 | | usage = ['usage: %prog [options]\n', |
| 946 | | 'Bug reports to <yqiang@gmail.com>'] |
| 947 | | parser = OptionParser(usage=''.join(usage)) |
| 948 | | parser.add_option('-s', '--server', |
| 949 | | dest='server', |
| 950 | | default='localhost', |
| 951 | | help='hostname. Default is localhost') |
| 952 | | parser.add_option('-p', '--port', |
| 953 | | dest='port', |
| 954 | | type='int', |
| 955 | | default=8081, |
| 956 | | help='port to connect to. default=8081') |
| 957 | | parser.add_option('--poll', |
| 958 | | dest='poll', |
| 959 | | type='float', |
| 960 | | default=5.0, |
| 961 | | help='poll rate before checking for new job. default=5') |
| 962 | | parser.add_option('-a', '--authenticate', |
| 963 | | dest='authenticate', |
| 964 | | default=False, |
| 965 | | action='store_true', |
| 966 | | help='Connect as authenticate worker. default=True') |
| 967 | | parser.add_option('-f', '--logfile', |
| 968 | | dest='logfile', |
| 969 | | default=os.path.join(DSAGE_DIR, 'worker.log'), |
| 970 | | help='log file') |
| 971 | | parser.add_option('-l', '--loglevel', |
| 972 | | dest='loglevel', |
| 973 | | type='int', |
| 974 | | default=0, |
| 975 | | help='log level. default=0') |
| 976 | | parser.add_option('--ssl', |
| 977 | | dest='ssl', |
| 978 | | action='store_true', |
| 979 | | default=False, |
| 980 | | help='enable or disable ssl') |
| 981 | | parser.add_option('--privkey', |
| 982 | | dest='privkey_file', |
| 983 | | default=os.path.join(DSAGE_DIR, 'dsage_key'), |
| 984 | | help='private key file. default = ' + |
| 985 | | '~/.sage/dsage/dsage_key') |
| 986 | | parser.add_option('--pubkey', |
| 987 | | dest='pubkey_file', |
| 988 | | default=os.path.join(DSAGE_DIR, 'dsage_key.pub'), |
| 989 | | help='public key file. default = ' + |
| 990 | | '~/.sage/dsage/dsage_key.pub') |
| 991 | | parser.add_option('-w', '--workers', |
| 992 | | dest='workers', |
| 993 | | type='int', |
| 994 | | default=2, |
| 995 | | help='number of workers. default=2') |
| 996 | | parser.add_option('--priority', |
| 997 | | dest='priority', |
| 998 | | type='int', |
| 999 | | default=20, |
| 1000 | | help='priority of workers. default=20') |
| 1001 | | parser.add_option('-u', '--username', |
| 1002 | | dest='username', |
| 1003 | | default=getuser(), |
| 1004 | | help='username') |
| 1005 | | parser.add_option('--noblock', |
| 1006 | | dest='noblock', |
| 1007 | | action='store_true', |
| 1008 | | default=False, |
| 1009 | | help='tells that the server was ' + |
| 1010 | | 'started in blocking mode') |
| 1011 | | (options, args) = parser.parse_args() |
| 1012 | | |
| 1013 | | return options |
| 1014 | | |
| 1015 | | def main(): |
| 1016 | | options = usage() |
| 1017 | | SSL = options.ssl |
| 1018 | | monitor = Monitor(server=options.server, port=options.port, |
| 1019 | | username=options.username, ssl=SSL, |
| 1020 | | workers=options.workers, |
| 1021 | | authenticate=options.authenticate, |
| 1022 | | priority=options.priority, poll=options.poll, |
| 1023 | | log_file=options.logfile, |
| 1024 | | log_level=options.loglevel, |
| 1025 | | pubkey_file=options.pubkey_file, |
| 1026 | | privkey_file=options.privkey_file) |
| 1027 | | monitor.connect() |
| 1028 | | try: |
| 1029 | | if options.noblock: |
| 1030 | | reactor.run(installSignalHandlers=0) |
| 1031 | | else: |
| 1032 | | reactor.run(installSignalHandlers=1) |
| 1033 | | except: |
| 1034 | | log.msg('Error starting the twisted reactor, exiting...') |
| 1035 | | sys.exit() |
| 1036 | | |
| 1037 | | if __name__ == '__main__': |
| 1038 | | usage() |
| 1039 | | main() |