Ticket #3600: dsage_process_pool.patch
| File dsage_process_pool.patch, 168.1 KB (added by yi, 21 months ago) |
|---|
-
sage/dsage/database/client.py
# HG changeset patch # User Yi Qiang <yqiang@gmail.com> # Date 1215477042 25200 # Node ID 00c15d56813e4783a5cbedad933a13769dadd6c7 # Parent 9fe444b1d4ac3c714604409462eae7421ee44013 [mq]: dsage_process_pool.patch diff --git a/sage/dsage/database/client.py b/sage/dsage/database/client.py
a b 1 1 class Client(object): 2 2 """ 3 3 A client of the dsage server. 4 4 5 5 """ 6 6 7 7 def __init__(self, username, public_key): 8 8 """ 9 9 :type username: string 10 10 :param username: username 11 11 12 12 :type public_key: string 13 13 :param public_key: public key of user 14 14 15 15 """ 16 16 17 17 self.username = username 18 18 self.public_key = public_key 19 19 self.creation_time = None … … 21 21 self.last_login = None 22 22 self.connected = None 23 23 self.enabled = None 24 24 25 25 def get_username(self): 26 26 return self.username 27 27 28 28 def get_public_key(self): 29 29 return self.public_key 30 30 31 31 def is_connected(self): 32 32 return self.connected 33 33 34 34 def is_enabled(self): 35 return self.enabled 36 No newline at end of file 35 return self.enabled -
sage/dsage/database/job.py
diff --git a/sage/dsage/database/job.py b/sage/dsage/database/job.py
a b 1 1 ############################################################################ 2 # 3 # DSAGE: Distributed SAGE 4 # 5 # Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com> 6 # 7 # Distributed under the terms of the GNU General Public License (GPL) 2 # 3 # DSAGE: Distributed SAGE 4 # 5 # Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com> 6 # 7 # Distributed under the terms of the GNU General Public License (GPL) 8 8 # 9 9 # This code is distributed in the hope that it will be useful, 10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of … … 29 29 class Job(object): 30 30 """ 31 31 Defines a Job that gets distributed to clients. 32 32 33 33 """ 34 34 35 def __init__(self, job_id=None, name= 'Unamed', username=getuser(),35 def __init__(self, job_id=None, name=None, username=getuser(), 36 36 code='', timeout=0, kind='sage', priority=5): 37 37 """ 38 38 Represents a job. 39 39 40 40 :type job_id: string 41 41 :param job_id: unique identifier for a job 42 42 43 43 :type name: string 44 44 :param name: name given to a job, not unique 45 45 46 46 :type code: string 47 47 :param code: the code that needs to be executed 48 48 49 49 :type parent: string 50 :param parent: the job_id of another job 51 50 :param parent: the job_id of another job 51 52 52 :type username: string 53 53 :param username: username of person who created job 54 54 55 55 :type timeout: integer 56 56 :param timeout: upper bound for number of seconds this job takes 57 57 58 58 :type priority: integer 59 59 :param priority: a jobs priority from 1-5, 1 being the highest 60 60 61 61 :type kind: string 62 :param kind: kind of the job (file, string, generator) 62 :param kind: kind of the job (file, string, generator) 63 63 64 64 """ 65 65 66 66 self.job_id = job_id 67 self.name = name 67 if name is None: 68 self.name = 'Unamed' 69 else: 70 self.name = name 68 71 self.username = username 69 72 self.code = code 70 73 self.timeout = timeout … … 85 88 86 89 def __str__(self): 87 90 return "<Job('%s', %s)>" % (self.job_id, self.username) 88 91 89 92 def __repr__(self): 90 93 return self.__str__() 91 94 92 95 def attach(self, var, obj, file_name=None): 93 96 """ 94 97 Attaches an object to a job. 95 98 96 99 Parameters: 97 100 var -- the variable name you'd like the worker to use 98 101 obj -- the object you want to attach 99 102 filename -- optional, if your object is a saved sobj 100 103 101 104 """ 102 105 103 106 if file_name is not None: 104 107 try: 105 108 s = open(file_name, 'rb').read() … … 114 117 except cPickle.PicklingError: 115 118 print 'Unable to attach your object.' 116 119 self.data.append((var, s, 'object')) 117 120 118 121 def attach_file(self, file_name): 119 122 """ 120 123 Attach a file to a job. 121 124 122 125 Parameters: 123 126 file_name -- obvious 124 127 125 128 """ 126 129 127 130 f = open(file_name, 'rb').read() 128 131 f = zlib.compress(f) 129 132 130 133 # Strip out any hard coded path in the file name 131 134 file_name = os.path.split(file_name)[1] 132 135 self.data.append((file_name, f, 'file')) 133 136 134 137 def _reduce(self): 135 138 """ 136 139 Returns a _reduced form of Job.jdict to be sent over the network. 137 140 138 141 """ 139 142 140 143 # dump and compress the data of the job 141 144 jdict = copy.deepcopy(self.__dict__) 142 145 jdict['data'] = cPickle.dumps(self.data, 2) 143 146 jdict['data'] = zlib.compress(jdict['data']) 144 147 # We do not compress jdict['result'] since it's already compressed 145 jdict['result'] = self.result 146 148 jdict['result'] = self.result 149 147 150 # Remove attributes that sqlalchemy put there for us 148 151 for key in jdict.keys(): 149 152 if key.startswith('_'): 150 153 del jdict[key] 151 154 152 155 return jdict 156 153 157 154 158 def expand_job(jdict): 155 159 """ 156 160 This method recreates a Job object given a jdict. 157 161 158 162 :type jdict: dictionary 159 163 :param jdict: the job dictionary 160 161 164 """ 162 165 163 166 if jdict is None: 164 167 return None 165 168 166 169 job_id = jdict['job_id'] 167 170 job = Job(job_id=job_id) 168 171 169 172 # decompress and load data 170 173 try: 171 174 jdict['data'] = zlib.decompress(jdict['data']) 172 175 jdict['data'] = cPickle.loads(jdict['data']) 173 except (KeyError, TypeError): 176 except Exception, msg: 177 print Exception, msg 174 178 jdict['data'] = None 175 179 176 180 try: 177 181 jdict['result'] = zlib.decompress(jdict['result']) 178 182 jdict['result'] = cPickle.loads(jdict['result']) 179 except (KeyError, TypeError): 183 # except (KeyError, TypeError): 184 except: 180 185 jdict['result'] = None 181 186 182 187 for k, v in jdict.iteritems(): 183 188 setattr(job, k, v) 184 189 185 return job 186 No newline at end of file 190 return job -
sage/dsage/database/jobdb.py
diff --git a/sage/dsage/database/jobdb.py b/sage/dsage/database/jobdb.py
a b 93 93 """ 94 94 95 95 96 class JobDatabaseSA(object): 97 """ 98 I implement the JobDatabase using SQLAlchemy. 99 """ 96 100 97 class JobDatabaseSA(object):98 101 implements(IJobDatabase) 99 102 100 103 def __init__(self, Session): 101 104 self.sess = Session() 102 self.failure_threshold = 5105 self.failure_threshold = 3 103 106 104 107 def _shutdown(self): 105 108 self.sess.close() … … 137 140 def update_job(self, job): 138 141 """ 139 142 Takes a job object and updates it in the database. 140 141 143 """ 142 144 143 145 self.sess.save_or_update(job) 144 146 self.sess.commit() 145 147 146 148 def store_jdict(self, jdict): 149 """ 150 I store the jdict to the database. 151 """ 152 147 153 try: 148 154 job_id = jdict['job_id'] 149 assert job_id != None150 155 job = self.sess.query(Job).filter_by(job_id=job_id).first() 151 156 for k,v in jdict.iteritems(): 152 157 setattr(job, k, v) … … 328 333 329 334 Parameters: 330 335 jdict -- sage.dsage.database.Job.jdict 331 332 336 """ 333 337 334 338 try: 335 339 job_id = jdict['job_id'] 336 340 except KeyError, msg: … … 475 479 476 480 """ 477 481 478 return self._get_jobs_by_parameter('status', 'processing') 479 No newline at end of file 482 return self._get_jobs_by_parameter('status', 'processing') -
sage/dsage/database/worker.py
diff --git a/sage/dsage/database/worker.py b/sage/dsage/database/worker.py
a b 1 """ 2 Worker 3 """ 4 5 1 6 class Worker(object): 7 2 8 def __init__(self, host_info): 3 9 for k, v in host_info.iteritems(): 4 10 setattr(self, k, v) 5 6 No newline at end of file -
sage/dsage/database/workerdb.py
diff --git a/sage/dsage/database/workerdb.py b/sage/dsage/database/workerdb.py
a b 22 22 from sage.dsage.misc.constants import SERVER_LOG 23 23 from sage.dsage.database.worker import Worker 24 24 25 25 26 class WorkerDatabaseSA(object): 27 """ 28 I implement the WorkerDatabase using SQLAlchemy. 29 """ 30 26 31 def __init__(self, Session): 27 32 self.sess = Session() 28 33 self._set_initial_state() 29 34 30 35 def _set_initial_state(self): 31 36 w = self.sess.query(Worker).all() 32 37 for _w in w: 33 38 _w.connected = False 34 39 self.sess.save_or_update(_w) 35 40 self.sess.commit() 36 41 37 42 def set_authenticated(self, uuid, authenticated): 38 43 w = self.sess.query(Worker).filter_by(uuid=uuid).first() 39 44 w.authenticated = authenticated 40 45 self.sess.save_or_update(w) 41 46 self.sess.commit() 42 47 43 48 def set_busy(self, uuid, busy): 44 49 w = self.sess.query(Worker).filter_by(uuid=uuid).first() 45 50 w.busy = busy 46 51 self.sess.save_or_update(w) 47 52 self.sess.commit() 48 53 49 54 def add_worker(self, host_info): 50 55 w = Worker(host_info) 51 56 self.sess.save(w) 52 57 self.sess.commit() 53 58 54 59 def update_worker(self, host_info): 55 60 uuid = host_info['uuid'] 56 61 w = self.sess.query(Worker).filter_by(uuid=uuid).first() … … 58 63 setattr(w, k, v) 59 64 self.sess.save_or_update(w) 60 65 self.sess.commit() 61 66 62 67 def get_worker(self, uuid): 63 68 w = self.sess.query(Worker).filter_by(uuid=uuid).first() 64 69 65 70 return w 66 67 def get_worker_list(self ):68 w = self.sess.query(Worker). all()69 71 72 def get_worker_list(self, filter={}): 73 w = self.sess.query(Worker).filter_by(**filter).all() 74 70 75 return w 71 76 72 77 def get_worker_by_job_id(self, job_id): 73 78 w = self.sess.query(Worker).filter_by(job_id=job_id) 74 79 75 80 def get_online_workers(self): 76 81 w = self.sess.query(Worker).filter_by(connected=True).all() 82 83 return w 84 85 def get_avail_workers(self): 86 w = self.sess.query(Worker).filter_by(busy=False, connected=True).all() 77 87 78 88 return w 79 89 80 90 def get_worker_count(self, connected, busy): 81 q = self.sess.query(Worker).filter_by(connected=connected, busy=busy)91 q = self.sess.query(Worker).filter_by(connected=connected, busy=busy) 82 92 workers = q.all() 83 93 84 94 count = sum([w.workers for w in workers]) 85 95 86 96 return count 87 97 88 98 def get_cpu_speed(self, connected, busy): 89 99 w = self.sess.query(Worker).all() 90 100 91 101 return sum([_w.cpu_speed * _w.cpus for _w in w]) 92 102 93 103 def set_connected(self, uuid, connected): 94 104 w = self.sess.query(Worker).filter_by(uuid=uuid).first() 95 105 w.connected = connected 96 106 self.sess.save_or_update(w) 97 107 self.sess.commit() 98 108 99 109 100 110 class WorkerDatabase(object): 101 111 """ 102 This table keeps track of workers. 103 112 I implement the WorkerDatabase using raw sqlite. 104 113 """ 105 114 106 115 def __init__(self, db_conn, log_file=SERVER_LOG, log_level=0): 107 116 self.log_file = log_file 108 117 self.log_level = log_level 109 118 self.con = db_conn 110 119 self.tablename = 'monitors' 111 120 112 121 def _set_parameter(self, uuid, key, value): 113 122 query = """UPDATE monitors 114 123 SET %s=? … … 116 125 cur = self.con.cursor() 117 126 cur.execute(query, (value, uuid)) 118 127 self.con.commit() 119 128 120 129 def set_authenticated(self, uuid, authenticated): 121 130 return self._set_parameter(uuid, 'authenticated', authenticated) 122 131 123 132 def add_worker(self, host_info): 124 133 query = """INSERT INTO monitors 125 134 (uuid, … … 137 146 mem_free) 138 147 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 139 148 """ 140 149 141 150 uuid = host_info['uuid'] 142 151 username = host_info['username'] 143 152 hostname = host_info['hostname'] … … 151 160 cpu_model = host_info['cpu_model'] 152 161 mem_total = host_info['mem_total'] 153 162 mem_free = host_info['mem_free'] 154 163 155 164 cur = self.con.cursor() 156 165 cur.execute(query, (uuid, username, hostname, ip, workers, 157 166 sage_version, os_, kernel_version, cpus, 158 167 cpu_speed, cpu_model, mem_total, mem_free)) 159 168 self.con.commit() 160 169 161 170 def update_worker(self, host_info): 162 171 query = """UPDATE monitors 163 172 SET hostname = ?, username = ?, ip = ?, workers = ?, sage_version = ?, 164 173 os = ?, kernel_version = ?, cpus = ?, cpu_speed = ?, cpu_model = ?, 165 174 mem_total = ?, mem_free = ? WHERE uuid = ? 166 175 """ 167 176 168 177 uuid = host_info['uuid'] 169 178 username = host_info['username'] 170 179 hostname = host_info['hostname'] … … 178 187 cpu_model = host_info['cpu_model'] 179 188 mem_total = host_info['mem_total'] 180 189 mem_free = host_info['mem_free'] 181 190 182 191 cur = self.con.cursor() 183 192 cur.execute(query, (hostname, username, ip, workers, sage_version, 184 193 os_, kernel_version, cpus, cpu_speed, cpu_model, 185 194 mem_total, mem_free, uuid)) 186 195 187 196 def get_worker(self, uuid): 188 197 query = """SELECT 189 198 uuid, … … 195 204 os 196 205 FROM monitors 197 206 WHERE uuid = ?""" 198 207 199 208 cur = self.con.cursor() 200 cur.execute(query, (uuid, ))209 cur.execute(query, (uuid, )) 201 210 result = cur.fetchone() 202 211 if result is None: 203 212 return result … … 206 215 for k, v in monitor.iteritems(): 207 216 if k == 'authenticated': 208 217 monitor[k] = bool(v) 209 218 210 219 return monitor 211 220 212 221 def get_worker_list(self): 213 222 """ 214 223 Returns a list of connected monitors. 215 224 216 225 """ 217 226 218 227 query = """SELECT * FROM monitors""" 219 228 cur = self.con.cursor() 220 229 cur.execute(query) … … 226 235 # Convert from 1/0 to python bool 227 236 if k in ('authenticated', 'connected', 'busy'): 228 237 monitor[k] = bool(v) 229 238 230 239 return monitors 231 240 232 241 def set_connected(self, uuid, connected=True): 233 242 """ 234 243 Sets the connected status of a monitor. 235 244 236 245 Parameters: 237 246 uuid -- string 238 247 connected -- bool 239 248 240 249 """ 241 250 242 251 cur = self.con.cursor() 243 252 if connected: 244 253 query = """UPDATE monitors SET connected=1, last_connection=? … … 246 255 cur.execute(query, (datetime.datetime.now(), uuid)) 247 256 else: 248 257 query = """UPDATE monitors SET connected=0 WHERE uuid=?""" 249 cur.execute(query, (uuid, ))250 258 cur.execute(query, (uuid, )) 259 251 260 self.con.commit() 252 261 253 262 def is_connected(self, uuid): 254 263 """ 255 264 Returns whether the monitor is connected. 256 265 257 266 """ 258 267 259 268 query = """SELECT connected FROM monitors WHERE uuid = ?""" 260 269 cur = self.con.cursor() 261 cur.execute(query, (uuid, ))270 cur.execute(query, (uuid, )) 262 271 result = cur.fetchone()[0] 263 272 264 273 return result 265 274 266 275 def set_busy(self, uuid, busy): 267 276 """ 268 277 Sets whether or not a worker is doing a job. 269 278 270 279 """ 271 280 272 281 if busy: 273 282 query = """UPDATE monitors SET busy=1 WHERE uuid=?""" 274 283 else: 275 284 query = """UPDATE monitors SET busy=0 WHERE uuid=?""" 276 285 277 286 cur = self.con.cursor() 278 cur.execute(query, (uuid, ))287 cur.execute(query, (uuid, )) 279 288 self.con.commit() 280 289 281 290 def get_worker_count(self, connected, busy=False): 282 291 """ 283 292 Returns the number of workers. 284 293 285 294 Parameters: 286 295 connected -- bool 287 296 busy -- bool 288 297 289 298 """ 290 299 291 300 if connected and not busy: 292 301 query = """ 293 302 SELECT workers FROM monitors WHERE connected AND NOT busy … … 304 313 query = """ 305 314 SELECT workers FROM monitors WHERE NOT connected AND busy 306 315 """ 307 316 308 317 cur = self.con.cursor() 309 318 cur.execute(query) 310 319 311 320 result = cur.fetchall() 312 321 313 322 return sum(w[0] for w in result) 314 323 315 324 def get_cpu_speed(self, connected=True, busy=False): 316 325 """ 317 326 Returns the aggregate cpu speed in Mhz. 318 327 319 328 Parameters: 320 329 connected -- bool 321 330 322 331 """ 323 332 324 333 if connected and busy: 325 334 query = """SELECT cpu_speed, workers FROM monitors 326 335 WHERE connected AND busy""" … … 329 338 WHERE connected""" 330 339 else: 331 340 query = """SELECT cpu_speed, workers FROM monitors""" 332 341 333 342 cur = self.con.cursor() 334 343 cur.execute(query) 335 344 336 345 result = cur.fetchall() 337 346 338 347 cpu_speed = sum([s[0]*s[1] for s in result]) 339 348 340 349 return cpu_speed 341 350 342 351 def get_cpu_count(self, connected=True): 343 352 """ 344 353 Returns the number of cpus that are available. 345 354 346 355 Parameters: 347 356 connected -- bool 348 357 349 358 """ 350 359 351 360 if connected: 352 361 query = """SELECT workers, cpus FROM monitors WHERE connected""" 353 362 else: 354 363 query = """SELECT workers, cpus FROM monitors""" 355 364 356 365 cur = self.con.cursor() 357 366 cur.execute(query) 358 367 359 368 result = cur.fetchall() 360 369 361 370 cpu_count = sum(min(s[0:2]) for s in result) 362 363 return cpu_count 364 No newline at end of file 371 372 return cpu_count -
sage/dsage/dist_functions/dist_factor.py
diff --git a/sage/dsage/dist_functions/dist_factor.py b/sage/dsage/dist_functions/dist_factor.py
a b 16 16 Robert Bradshaw 17 17 Yi Qiang 18 18 19 20 sage: d = dsage.start_all(verbose=False, workers=4) # long time19 sage: from sage.dsage.misc.misc import test_dsage 20 sage: d = test_dsage(dsage) 21 21 Going into testing mode... 22 sage: sleep(5) # long time 23 sage: f = DistributedFactor(d, 2^125-1) # long time 24 sage: print f # long time 22 sage: f = DistributedFactor(d, 2^125-1) 23 sage: print f 25 24 Factoring "42535295865117307932921825928971026431" 26 25 Prime factors found so far: [31, 601, 1801] 27 sage: f.done # long time26 sage: f.done 28 27 False 29 sage: f.wait(timeout=60) # long time30 sage: f.done # long time28 sage: f.wait(timeout=60) 29 sage: f.done 31 30 True 32 sage: print f # long time31 sage: print f 33 32 Factoring "42535295865117307932921825928971026431" 34 33 Prime factors found so far: [31, 601, 1801, 269089806001, 4710883168879506001] 35 36 34 """ 37 35 38 36 def __init__(self, dsage, n, concurrent=10, B1=2000, curves=50, -
sage/dsage/dist_functions/dist_function.py
diff --git a/sage/dsage/dist_functions/dist_function.py b/sage/dsage/dist_functions/dist_function.py
a b 23 23 24 24 from sage.dsage.database.job import Job 25 25 from sage.dsage.interface.dsage_interface import (JobWrapper, 26 BlockingJobWrapper, 27 blockingCallFromThread) 26 BlockingJobWrapper) 27 28 from twisted.internet import reactor 29 from twisted.internet.threads import blockingCallFromThread 28 30 29 31 class DistributedFunction(object): 30 32 """ … … 112 114 Reloads a distributed job from disk. 113 115 114 116 """ 115 from twisted.internet import reactor117 116 118 from twisted.internet import task 117 119 if dsage.remoteobj is None: 118 120 # XXX This is a hack because dsage.remoteobj is not set yet … … 164 166 self.submit_job(job, job_name, async) 165 167 self.outstanding_jobs = [] 166 168 167 def wait(self, t imeout=None):169 def wait(self, t=0.5, timeout=None): 168 170 """ 169 171 Blocks until the job is completed. 170 172 173 t -- the time to wait before polling again. 171 174 """ 172 175 173 176 import signal 174 177 if timeout == None: 175 178 while not self.done: 176 time.sleep( 0.5)179 time.sleep(t) 177 180 else: 178 181 def handler(signum, frame): 179 182 raise RuntimeError('Maximum wait time exceeded.') 180 183 signal.signal(signal.SIGALRM, handler) 181 184 signal.alarm(timeout) 182 185 while not self.done: 183 time.sleep( 0.5)186 time.sleep(t) 184 187 signal.alarm(0) 185 188 186 def start(self ):189 def start(self, ctime=1.0): 187 190 """ 188 191 Starts the Distributed Function. It will submit all jobs in the 189 192 outstanding_jobs queue and also start a checker tasks that polls for … … 201 204 self.checker_task = blockingCallFromThread(self.reactor, 202 205 task.LoopingCall, 203 206 self.check_waiting_jobs) 204 self.reactor.callFromThread(self.checker_task.start, 5.0, now=True) 205 207 self.reactor.callFromThread(self.checker_task.start, ctime, now=True) 206 208 207 209 def process_result(self): 208 210 """ … … 216 218 217 219 def check_waiting_jobs(self): 218 220 """ 219 Checks the status of jobs in the waiting queue. 220 221 I check the status of jobs in the waiting queue. 221 222 """ 222 223 223 224 from twisted.internet import reactor 224 225 from twisted.spread import pb 226 225 227 for wrapped_job in self.waiting_jobs: 226 228 if wrapped_job.killed == True: 227 229 self.waiting_jobs.remove(wrapped_job) -
sage/dsage/dsage.py
diff --git a/sage/dsage/dsage.py b/sage/dsage/dsage.py
a b 35 35 SERVER_TAC, DSAGE_DB) 36 36 from sage.dsage.misc.config import check_dsage_dir 37 37 from sage.dsage.misc.misc import find_open_port 38 from sage.dsage.misc.misc import write_tac 38 39 import sage.plot.plot 40 39 41 40 42 def spawn(cmd, verbose=True, stdout=None, stdin=None): 41 43 """ 42 44 Spawns a process and registers it with the SAGE. 43 45 """ 44 46 45 47 null = open('/dev/null', 'a') 46 48 if stdout is None: 47 49 stdout = null 48 50 if stdin is None: 49 51 stdin = null 50 52 cmdl = cmd.split(' ') 51 process = subprocess.Popen(cmdl, shell=False, stdout=stdout, stdin= null)53 process = subprocess.Popen(cmdl, shell=False, stdout=stdout, stdin=stdin) 52 54 sage.interfaces.cleaner.cleaner(process.pid, cmd) 53 55 if verbose: 54 56 print 'Spawned %s (pid = %s)\n' % (' '.join(cmdl), process.pid) 55 57 56 58 return process 57 59 58 60 59 61 class DistributedSage(object): 60 62 r""" 61 63 Distributed SAGE allows you to do distributed computing in SAGE. 62 64 63 65 To get up and running quickly, run dsage.setup() to run the 64 66 configuration utility. 65 67 66 68 Note that configuration files will be stored in the 67 69 directory \code{\$DOT\_SAGE/dsage}. 68 70 69 71 QUICK-START 70 72 71 73 1. Launch sage … … 92 94 \code{sage: j} 93 95 \code{4} 94 96 """ 95 96 def start_all(self, port=None, workers=2, log_level=0, poll=1.0, 97 authenticate=False, failure_threshold=3, 98 verbose=True, testing=False): 97 98 def start_all(self, port=None, workers=2, log_level=0, authenticate=False, 99 failure_threshold=3, verbose=True, testing=False): 99 100 """ 100 101 Start the server and worker and returns a connection to the server. 101 102 102 103 """ 103 104 104 105 from sage.dsage.interface.dsage_interface import BlockingDSage 105 106 from sage.dsage.misc.misc import find_open_port 106 107 107 108 if port is None: 108 109 port = find_open_port().next() 109 110 110 111 if testing or sage.plot.plot.DOCTEST_MODE: 111 112 test_db = tempfile.NamedTemporaryFile() 112 113 testing = True … … 122 123 log_level=5, 123 124 ssl=False, 124 125 blocking=False, 125 poll=0.1,126 126 authenticate=authenticate, 127 127 verbose=False) 128 128 else: … … 131 131 blocking=False, 132 132 failure_threshold=failure_threshold, 133 133 verbose=verbose) 134 134 135 135 self.worker(port=port, 136 136 workers=workers, 137 137 log_level=log_level, 138 138 blocking=False, 139 poll=poll,140 139 authenticate=authenticate, 141 140 verbose=verbose) 142 141 143 142 # We want to establish a connection to the server 144 143 tries = 10 145 144 while(tries > 0): … … 158 157 print 'Could not connect to the server.' 159 158 print 'Error msg from last attempt: %s' % (msg) 160 159 return 161 160 162 161 if testing or sage.plot.plot.DOCTEST_MODE: 163 162 d = BlockingDSage(server='localhost', port=port, testing=testing, 164 163 ssl=False) 165 164 else: 166 165 d = BlockingDSage(server='localhost', port=port) 167 166 168 167 return d 169 168 170 169 def kill_all(self): 171 170 """ 172 171 Kills the server and worker. 173 172 174 173 """ 175 174 176 175 self.kill_worker() 177 176 self.kill_server() 178 177 179 178 def kill_worker(self): 180 179 try: 181 180 os.kill(self.worker_proc.pid, 9) … … 183 182 del self.worker_proc 184 183 except OSError, msg: 185 184 print 'Error killing worker: %s' % msg 186 185 187 186 def kill_server(self): 188 187 try: 189 188 os.kill(self.server_proc.pid, 9) … … 191 190 del self.server_proc 192 191 except OSError, msg: 193 192 print 'Error killing server: %s' % msg 194 193 194 195 195 def server(self, blocking=True, port=None, log_level=0, ssl=True, 196 196 db_file=DSAGE_DB, 197 197 log_file=SERVER_LOG, … … 201 201 verbose=True, testing=False, profile=False): 202 202 r""" 203 203 Run the Distributed SAGE server. 204 204 205 205 Doing \code{dsage.server()} will spawn a server process which 206 206 listens by default on port 8081. 207 207 """ 208 208 209 open_ports = find_open_port() 209 210 check_dsage_dir() 210 211 cwd = os.getcwd() 211 212 pid_file = 'server.pid' 212 213 def write_tac(tac):214 os.chdir(DSAGE_DIR)215 f = open('dsage_server.tac', 'w')216 f.writelines(tac)217 f.close()218 213 219 214 if testing or sage.plot.plot.DOCTEST_MODE: 220 215 test_db = tempfile.NamedTemporaryFile() … … 227 222 except: 228 223 pass 229 224 db_file = test_db.name 230 225 231 226 if port != None: 232 227 server_port = port 233 228 else: 234 229 server_port = open_ports.next() 235 230 open_ports.next() 236 231 237 232 tac = SERVER_TAC % (db_file, failure_threshold, ssl, log_level, 238 233 log_file, privkey, cert, server_port, testing) 239 write_tac(tac )240 234 write_tac(tac, 'dsage_server.tac') 235 241 236 cmd = 'twistd -d %s --pidfile=%s ' % (DSAGE_DIR, pid_file) 242 237 if profile: 243 238 if verbose: 244 239 print 'Launched with profiling enabled...' 245 cmd += '-- nothotshot --profile=dsage_server.profile --savestats '240 cmd += '--profile=dsage_server.profile --savestats ' 246 241 if blocking: 247 242 cmd += '--nodaemon -y dsage_server.tac' 248 243 cmd += ' | tee -a %s' % (log_file) … … 265 260 time.sleep(0.1) 266 261 continue 267 262 os.chdir(cwd) 268 269 270 def worker(self, server='localhost', port=8081, workers=2, poll=1.0, 263 264 def worker(self, server='localhost', port=8081, workers=2, 271 265 username=getuser(), blocking=True, ssl=True, log_level=0, 272 266 authenticate=True, priority=20, 273 267 privkey=os.path.join(DSAGE_DIR, 'dsage_key'), 274 268 pubkey=os.path.join(DSAGE_DIR, 'dsage_key.pub'), 275 log_file=WORKER_LOG, 276 verbose=True): 269 log_file=WORKER_LOG, verbose=True, profile=False): 277 270 r""" 278 Run the Distributed S AGEworker.279 271 Run the Distributed Sage worker. 272 280 273 Typing \code{sage.worker()} will launch a worker which by 281 274 default connects to localhost on port 8081 to fetch jobs. 282 275 """ 283 276 277 from sage.dsage.worker.monitor import MONITOR_TAC as tac 284 278 check_dsage_dir() 285 cmd = ('dsage_worker.py -s %s -p %s -u %s -w %s --poll %s -l %s -f %s ' 286 + '--privkey=%s --pubkey=%s --priority=%s') 287 cmd = cmd % (server, port, username, workers, poll, log_level, 288 log_file, privkey, pubkey, priority) 289 if ssl: 290 cmd += ' --ssl' 291 if authenticate: 292 cmd += ' -a' 293 if not blocking: 294 cmd += ' --noblock' 295 cmd = 'python ' + SAGE_ROOT + '/local/bin/' + cmd 279 cwd = os.getcwd() 280 pid_file = 'worker.pid' 281 tac = tac % (server, port, workers, username, ssl, authenticate, 282 priority, log_level, log_file, privkey, pubkey) 283 write_tac(tac, 'dsage_worker.tac') 284 cmd = 'twistd -d %s --pidfile=%s ' % (DSAGE_DIR, pid_file) 285 if profile: 286 cmd += '--profile=dsage_worker.profile --savestats ' 287 if blocking: 288 cmd += '--nodaemon -y dsage_worker.tac' 289 cmd += ' | tee -a %s' % (log_file) 290 os.system(cmd) 291 else: 292 try: 293 os.remove(pid_file) 294 except: 295 pass 296 cmd += '--logfile=%s -y dsage_worker.tac' % (log_file) 296 297 self.worker_proc = spawn(cmd, verbose=verbose) 297 else: 298 cmd = 'python ' + SAGE_ROOT + '/local/bin/' + cmd 299 os.system(cmd) 300 298 while True: 299 try: 300 pid = int(open(pid_file).read()) 301 sage.interfaces.cleaner.cleaner(pid, cmd) 302 break 303 except: 304 time.sleep(0.1) 305 continue 306 os.chdir(cwd) 301 307 302 308 def setup(self, template=None): 303 309 r""" 304 310 This is the setup utility which helps you configure dsage. 305 311 306 312 Type \code{dsage.setup()} to run the configuration for the server, 307 313 worker and client. Alternatively, if you want to run the 308 314 configuration for just one parts, you can launch 309 315 \code{dsage.setup_server()}, \code{dsage.setup\_worker()} 310 316 or \code{dsage.setup()}. 311 317 312 318 """ 313 319 314 320 from sage.dsage.scripts.dsage_setup import setup 315 321 setup(template=template) 316 317 322 318 323 def setup_server(self, *args): 319 324 """ 320 325 This method runs the configuration utility for the server. 321 326 322 327 """ 323 328 324 329 from sage.dsage.scripts.dsage_setup import setup_server 325 330 setup_server(*args) 326 327 331 328 332 def setup_worker(self): 329 333 """ 330 334 This method runs the configuration utility for the worker. 331 335 332 336 """ 333 337 334 338 from sage.dsage.scripts.dsage_setup import setup_worker 335 339 setup_worker() 336 337 340 338 341 def setup_client(self): 339 342 """ 340 343 This method runs the configuration utility for the client. 341 344 342 345 """ 343 346 344 347 from sage.dsage.scripts.dsage_setup import setup_client 345 348 setup_client() -
sage/dsage/interface/dsage_interface.py
diff --git a/sage/dsage/interface/dsage_interface.py b/sage/dsage/interface/dsage_interface.py
a b 26 26 import time 27 27 from getpass import getuser 28 28 29 # This is a version of blockingCallFromThread that delays importing 30 # twisted.internet until it is first used. 31 def blockingCallFromThread(*args, **kwds): 32 from twisted.internet.threads import blockingCallFromThread 33 return blockingCallFromThread(*args, **kwds) 29 from twisted.cred.credentials import Anonymous 30 from twisted.internet.threads import blockingCallFromThread 31 from twisted.internet import reactor 34 32 35 33 from sage.dsage.database.job import Job, expand_job 36 34 from sage.dsage.misc.misc import random_str 37 35 from sage.dsage.misc.constants import DSAGE_DIR 38 36 37 39 38 class DSageThread(threading.Thread): 40 39 """ 41 40 DSage thread 42 41 43 42 """ 44 43 45 44 def run(self): 46 from twisted.internet import reactor47 45 if not reactor.running: 48 46 try: 49 47 reactor.run(installSignalHandlers=0) … … 66 64 log_level -- int (Default: 0) 67 65 ssl -- int (Default: 1) 68 66 """ 69 67 70 68 def __init__(self, server='localhost', port=8081, 71 69 username=getuser(), 72 70 pubkey_file=os.path.join(DSAGE_DIR, 'dsage_key.pub'), 73 71 privkey_file=os.path.join(DSAGE_DIR, 'dsage_key'), 74 72 log_level=0, ssl=True, testing=False): 75 73 76 74 from twisted.cred import credentials 77 75 from twisted.conch.ssh import keys 78 76 from twisted.spread import banana 79 77 banana.SIZE_LIMIT = 100*1024*1024 # 100 MegaBytes 80 78 81 79 self.server = server 82 80 self.port = port 83 81 self.username = username … … 90 88 self.result = None 91 89 self.info_str = 'Connected to: %s:%s' 92 90 self._testing = testing 93 91 94 92 if not self._testing: 95 93 self._pubkey = keys.Key.fromFile(self._pubkey_file) 96 94 try: … … 110 108 else: 111 109 self.username = 'tester' 112 110 self.connect() 113 114 111 115 112 def __repr__(self): 116 113 return self.__str__() 117 114 118 115 def __str__(self): 119 116 if self.is_connected(): 120 117 return self.info_str % (self.server, self.port) 121 118 else: 122 119 return 'Not connected.' 123 120 124 121 def __call__(self, cmd, user_vars=None, load_files=[], job_name=None): 125 122 cmd = ['ans = %s\n' % (cmd), 126 123 'print ans\n', 127 124 "DSAGE_RESULT = ans\n"] 128 125 129 126 return self.eval(''.join(cmd), user_vars=user_vars, 130 127 load_files=load_files, 131 128 job_name=job_name) 132 129 133 130 def __getstate__(self): 134 131 d = copy.copy(self.__dict__) 135 132 d['remoteobj'] = None 136 133 137 134 return d 138 135 139 136 def _getpassphrase(self): 140 137 import getpass 141 138 passphrase = getpass.getpass('Passphrase (Hit enter for None): ') 142 139 143 140 return passphrase 144 141 145 142 def _catch_failure(self, failure): 146 143 print "Error connecting: %s" % failure.getErrorMessage() 147 144 148 145 def _connected(self, remoteobj): 149 146 if self._log_level > 0: 150 147 print 'Connected to remote server.\r' 151 148 self._remoteobj = remoteobj 152 149 self._remoteobj.notifyOnDisconnect(self._disconnected) 153 150 154 151 def _disconnected(self, remoteobj): 155 152 print '[DSage] Closed connection to %s' % (self.server) 156 153 self.info_str = 'Not connected.' 157 154 158 155 def _got_my_jobs(self, jobs, job_name): 159 156 from sage.dsage.errors.exceptions import NoJobException 160 157 if jobs == None: … … 162 159 if job_name: 163 160 return [JobWrapper(self._remoteobj, job) 164 161 for job in jobs if job.name == job_name] 165 162 166 163 def _killed_job(self, job_id): 167 164 pass 168 165 169 166 def restore(self, remoteobj): 170 167 """ 171 168 This method restores a connection to the server. 172 169 173 170 """ 174 171 175 172 self._remoteobj = remoteobj 176 173 177 174 def connect(self): 178 175 """ 179 176 This methods establishes the conection to the remote server. 180 177 181 178 """ 182 179 183 180 from twisted.internet import reactor 184 181 from sage.dsage.twisted.pb import ClientFactory 185 182 factory = ClientFactory(self._login, (), {}) 186 183 factory.continueTrying = False # Do not attempt to reconnect 187 184 188 185 if self.ssl == 1: 189 186 # Old, uses OpenSSL, SAGE uses GNUTLS now 190 187 # from twisted.internet import ssl … … 198 195 reactor.connectTLS(self.server, self.port, factory, cred) 199 196 else: 200 197 reactor.connectTCP(self.server, self.port, factory) 201 198 202 199 def _login(self, *args, **kwargs): 203 from twisted.cred.credentials import Anonymous204 200 if self._testing: 205 201 d = self.factory.login(Anonymous(), None) 206 202 else: 207 203 d = self.factory.login(self._creds, None) 208 204 d.addCallback(self._connected) 209 205 d.addErrback(self._catch_failure) 210 206 211 207 return d 212 208 213 209 def disconnect(self): 214 210 print 'Disconnecting from server.' 215 self._remoteobj = None 216 211 t = self._remoteobj.broker.transport 212 d = blockingCallFromThread(reactor, t.loseConnection) 213 217 214 def eval(self, cmd, timeout=0, user_vars=None, job_name=None): 218 215 """ 219 216 eval evaluates a command 220 217 221 218 Parameters: 222 219 cmd -- the sage command to be evaluated (str) 223 220 globals -- a dict (see help for python's eval method) 224 221 job_name -- an alphanumeric job name 225 222 226 223 """ 227 224 228 225 self.is_connected() 229 226 if not job_name or not isinstance(job_name, str): 230 227 job_name = 'default job' 231 228 232 229 kind = 'sage' 233 230 234 231 # We have to convert timeout to a python int so it will not cause 235 232 # security exceptions with twisted. 236 237 job = Job(job_id=None, code=cmd, name=job_name, 233 234 job = Job(job_id=None, code=cmd, name=job_name, 238 235 username=self.username, timeout=timeout, kind=kind) 239 236 240 237 wrapped_job = JobWrapper(self._remoteobj, job) 241 238 if user_vars is not None: 242 239 for k, v in user_vars.iteritems(): 243 240 job.attach(k, v) 244 241 245 242 return wrapped_job 246 243 247 244 def eval_file(self, fname, job_name, async=False): 248 245 """ 249 246 eval_file allows you to evaluate the contents of an entire file. 250 247 251 248 Parameters: 252 249 fname -- file name of the file you wish to evaluate 253 250 254 251 """ 255 252 256 253 self.is_connected() 257 254 258 255 kind = 'file' 259 256 cmd = open(fname).read() 260 257 job = Job(job_id=None, code=cmd, name=job_name, 261 258 username=self.username, kind=kind) 262 259 263 260 if async: 264 261 wrapped_job = JobWrapper(self._remoteobj, job) 265 262 else: 266 263 wrapped_job = BlockingJobWrapper(self._remoteobj, job) 267 264 268 265 return wrapped_job 269 266 270 267 def send_job(self, job): 271 268 """ 272 269 Sends a Job object to the server. 273 270 274 271 """ 275 272 276 273 if not isinstance(job, Job): 277 274 raise TypeError 278 275 wrapped_job = JobWrapper(self._remoteobj, job) 279 276 return wrapped_job 280 277 281 278 def _got_job_id(self, id_, job): 282 279 job.job_id = id_ 283 280 job.username = self.username … … 285 282 d = self._remoteobj.callRemote('submit_job', pickled_job) 286 283 d.addErrback(self._catch_failure) 287 284 # d.addCallback(self._submitted, job) 288 285 289 286 return JobWrapper(self._remoteobj, job) 290 287 291 288 def eval_dir(self, dir_, job_name): 292 289 from twisted.internet import defer 293 290 self.is_connected() … … 303 300 deferreds.append(d) 304 301 d_list = defer.DeferredList(deferreds) 305 302 return d_list 306 303 307 304 def kill(self, job_id, async=False): 308 305 """ 309 306 Kills a job given the job id. 310 307 311 308 Parameters: 312 309 job_id -- job id 313 310 314 311 """ 315 312 316 313 if async: 317 314 d = self._remoteobj.callRemote('kill_job', job_id) 318 315 d.addCallback(self._killed_job) … … 321 318 job_id = blockingCallFromThread(self._remoteobj.callRemote, 322 319 'kill_job', 323 320 job_id) 324 325 321 326 322 def get_my_jobs(self, is_active=False, job_name=None): 327 323 """ 328 324 This method returns a list of jobs that belong to you. 329 325 330 326 Parameters: 331 327 is_active -- set to true to get only active jobs (bool) 332 328 333 329 Use this method if you get disconnected from the server and wish to 334 330 retrieve your old jobs back. 335 331 336 332 """ 337 333 338 334 self.is_connected() 339 335 340 336 d = self._remoteobj.callRemote('get_jobs_by_username', 341 337 self.username, 342 338 is_active, 343 339 job_name) 344 340 d.addCallback(self._got_my_jobs, job_name) 345 341 d.addErrback(self._catch_failure) 346 342 347 343 return d 348 344 349 345 def cluster_speed(self): 350 346 """ 351 347 Returns the speed of the cluster. 352 348 353 349 """ 354 350 355 351 self.is_connected() 356 352 357 353 return self._remoteobj.callRemote('get_cluster_speed') 358 359 def is_connected(self): 360 if self._remoteobj == None: 361 return False 362 if self._remoteobj.broker.disconnected: 363 raise False 364 return True 354 355 def is_connected(self): 356 return self._remoteobj or self._remoteobj.broker.disconnected 365 357 366 358 367 359 class BlockingDSage(DSage): 368 360 """ 369 361 This is the blocking version of the DSage interface. 370 371 362 """ 372 def __init__(self, server='localhost', port=8081, 373 username=getuser(),363 364 def __init__(self, server='localhost', port=8081, username=getuser(), 374 365 pubkey_file=os.path.join(DSAGE_DIR, 'dsage_key.pub'), 375 366 privkey_file=os.path.join(DSAGE_DIR, 'dsage_key'), 376 367 log_level=0, ssl=True, testing=False): … … 380 371 DSage.__init__(self, server=server, port=port, username=username, 381 372 pubkey_file=pubkey_file, privkey_file=privkey_file, 382 373 log_level=log_level, ssl=ssl, testing=testing) 383 384 374 385 375 def connect(self): 386 376 """ 387 377 This methods establishes the conection to the remote server. 388 378 389 379 """ 390 380 391 381 from twisted.internet import reactor 392 382 from sage.dsage.twisted.pb import ClientFactory 393 383 394 384 self.factory = ClientFactory(self._login, (), {}) 395 385 self.factory.continueTrying = False 396 386 397 387 if self.ssl: 398 388 from gnutls.interfaces.twisted import X509Credentials 399 389 cred = X509Credentials() 400 blockingCallFromThread(reactor, reactor.connectTLS, self.server, 390 blockingCallFromThread(reactor, reactor.connectTLS, self.server, 401 391 self.port, self.factory, cred) 402 392 else: 403 393 blockingCallFromThread(reactor, reactor.connectTCP, self.server, 404 394 self.port, self.factory) 405 395 406 396 def _login(self, *args, **kwargs): 407 from twisted.cred.credentials import Anonymous408 397 if self._testing: 409 398 d = self.factory.login(Anonymous(), None) 410 399 else: 411 400 d = self.factory.login(self._creds, None) 412 401 d.addCallback(self._connected) 413 402 d.addErrback(self._catch_failure) 414 403 415 404 return d 416 405 417 406 def job_results_iter(self, jobs): 418 407 """ 419 408 Returns an iterator that yields results of jobs as they come in. 420 421 INPUT: 409 410 INPUT: 422 411 jobs -- a list of tuples (x, j) where x is (args, kwds) and j is 423 412 a job object 424 413 425 414 OUTPUT: 426 415 (x, job) 427 416 428 417 """ 429 418 430 419 import time 431 420 out_list = [] 432 421 433 422 while len(out_list) != len(jobs): 434 for x, j in jobs:423 for x, j in jobs: 435 424 if j not in out_list: 436 425 j.get_job() 437 426 if j.status in ('completed', 'killed'): 438 427 out_list.append(j) 439 428 yield (x, j) 440 429 time.sleep(0.2) 441 430 442 431 def block_on_jobs(self, jobs): 443 432 """ 444 433 Blocks on a list of jobs until all the jobs are completed. 445 434 446 435 INPUT: 447 436 jobs -- a list of jobs which are not completed 448 437 449 438 OUTPUT: 450 439 jobs -- a list of completed jobs 451 440 452 441 EXAMPLE: 453 442 sage: from sage.dsage.misc.misc import find_open_port 454 443 sage: port = find_open_port().next() … … 462 451 True 463 452 sage: def f(n): 464 453 ... return n*n 465 ... 454 ... 466 455 sage: j = d.block_on_jobs(d.map(f, [25,12,25,32,12])) 467 sage: j # random456 sage: j 468 457 [625, 144, 625, 1024, 144] 469 458 """ 470 459 471 460 out_list = [] 472 461 473 462 while len(out_list) != len(jobs): 474 463 for j in jobs: 475 464 if j not in out_list: … … 477 466 if j.status in ('completed', 'killed'): 478 467 out_list.append(j) 479 468 return out_list 480 469 481 470 def map(self, f, *args): 482 471 """ 483 472 Apply function to every item of iterable and return a list of the 484 473 results. If additional iterable arguments are passed, function must 485 474 take that many arguments and is applied to the items from all 486 475 iterables in parallel. 487 476 488 477 INPUT: 489 478 f -- a function 490 479 *args -- iterables containing the parameters to the function 491 480 492 481 EXAMPLE: 493 482 sage: from sage.dsage.misc.misc import find_open_port 494 483 sage: port = find_open_port().next() … … 502 491 True 503 492 sage: def f(n): 504 493 ... return n*n 505 ... 494 ... 506 495 sage: j = d.map(f, [25,12,25,32,12]) 507 496 sage: j 508 497 [No output yet., … … 513 502 """ 514 503 515 504 from itertools import izip 516 517 jobs = [self.eval_function(f, (a, {}), job_name=f.__name__) 505 506 jobs = [self.eval_function(f, (a, {}), job_name=f.__name__) 518 507 for a in izip(*args)] 519 508 520 509 return jobs 521 522 def parallel_iter(self, f, inputs): 510 511 def parallel_iter(self, f, inputs): 523 512 """ 524 513 dsage parallel iterator implementation. 525 514 … … 532 521 OUTPUT: 533 522 iterator over 2-tuples (inputs[i], f(inputs[i])), 534 523 where the order may be completely random 535 524 536 525 EXAMPLE: 537 526 sage: from sage.dsage.misc.misc import find_open_port 538 527 sage: port = find_open_port().next() … … 549 538 ... def f(n,m): 550 539 ... return n+m 551 540 ... 552 sage: list(f([(1,2), (5, 10/3)])) # random553 [(( (5, 10/3), {}), 25/3), (((1, 2), {}),3)]541 sage: f([(1,2), (5, 10/3)]) 542 [((1, 2), 3), ((5, 10/3), 25/3)] 554 543 """ 555 544 556 545 jobs = [] 557 546 for x in inputs: 558 547 job = self.eval_function(f, x, job_name=f.__name__) 559 548 jobs.append((x, job)) 560 561 return self.job_results_iter(jobs) 562 549 550 return self.job_results_iter(jobs) 551 563 552 def eval_function(self, f, arguments, job_name=None): 564 553 """ 565 554 Takes a function and it's arguments, pickles it, and creates a job 566 555 which executes the function with the arguments. 567 556 568 557 INPUT: 569 558 f -- function 570 559 arguments -- tuple(tuple, dict) --> *args, **kwds 571 560 572 561 OUTPUT: 573 562 job wrapper representing the function evaluated at input. 574 563 575 564 EXAMPLE: 576 565 sage: from sage.dsage.misc.misc import find_open_port 577 566 sage: port = find_open_port().next() … … 590 579 sage: j.wait() 591 580 sage: j 592 581 625 593 582 594 583 """ 595 584 596 585 from sage.misc.fpickle import pickle_function … … 607 596 job.attach('args', arguments[0]) 608 597 job.attach('kwds', arguments[1]) 609 598 wrapped_job = BlockingJobWrapper(self._remoteobj, job) 610 599 611 600 return wrapped_job 612 601 613 602 def eval(self, cmd, user_vars=None, job_name=None, timeout=600, 614 603 load_files=[], priority=5, async=False): 615 604 """ 616 605 eval evaluates a command 617 606 618 607 Parameters: 619 608 cmd -- the sage command to be evaluated (str) 620 609 user_vars -- a dict of predefined variables you want to use. … … 624 613 load_files -- list of files to load before executing the job 625 614 priority -- priority of the job created (0-5) 626 615 async -- whether to use the async implementation of the method 627 616 628 617 """ 629 630 self.is_connected() 618 619 self.is_connected() 631 620 kind = 'sage' 632 633 job = Job(job_id=None, code=cmd, name=job_name, 621 622 job = Job(job_id=None, code=cmd, name=job_name, 634 623 username=self.username, timeout=timeout, priority=priority, 635 624 kind=kind) 636 625 637 626 for fname in load_files: 638 627 if os.path.exists(fname): 639 628 job.attach_file(fname) 640 629 641 630 if user_vars is not None: 642 631 for k, v in user_vars.iteritems(): 643 632 job.attach(k, v) 644 633 645 634 if async: 646 635 wrapped_job = JobWrapper(self._remoteobj, job) 647 636 else: 648 637 wrapped_job = BlockingJobWrapper(self._remoteobj, job) 649 638 650 639 return wrapped_job 651 640 652 641 def send_job(self, job, async=False): 653 642 """ 654 643 Sends a Job object to the server. 655 644 656 645 Parameters: 657 646 job -- a Job object to send to the remote server 658 647 async -- if True, use async method of doing remote task 659 648 660 649 """ 661 650 662 651 if not isinstance(job, Job): 663 652 raise TypeError 664 653 if async: 665 654 wrapped_job = JobWrapper(self._remoteobj, job) 666 655 else: 667 656 wrapped_job = BlockingJobWrapper(self._remoteobj, job) 668 657 669 658 return wrapped_job 670 659 671 660 def get_my_jobs(self, status='new'): 672 661 """ 673 662 This method returns a list of jobs that belong to you. 674 663 675 664 Parameters: 676 665 active -- set to true to get only active jobs (bool) 677 666 678 667 Use this method if you get disconnected from the server and wish to 679 668 retrieve your old jobs back. 680 669 681 670 """ 682 671 683 672 self.is_connected() 684 from twisted.internet import reactor685 673 jdicts = blockingCallFromThread(reactor, self._remoteobj.callRemote, 686 674 'get_jobs_by_username', 687 675 self.username, status) 688 676 689 677 return [expand_job(jdict) for jdict in jdicts] 690 691 678 692 679 def kill_all(self): 693 680 """ 694 681 Kills all of your active jobs. 695 682 696 683 """ 697 684 698 685 active_jobs = self.get_my_jobs(active=True) 699 686 700 687 for job in active_jobs: 701 688 self.kill(job.job_id) 702 689 703 690 def cluster_speed(self): 704 691 """ 705 692 Returns the speed of the cluster. 706 693 707 694 """ 708 695 709 696 self.is_connected() 710 from twisted.internet import reactor 697 711 698 return blockingCallFromThread(reactor, self._remoteobj.callRemote, 712 699 'get_cluster_speed') 713 700 714 701 def get_workers_list(self): 715 702 """Returns a list of monitors connected to the server. 716 703 717 704 """ 718 705 719 706 self.is_connected() 720 from twisted.internet import reactor 707 721 708 return blockingCallFromThread(reactor, self._remoteobj.callRemote, 722 709 'get_worker_list') 723 710 724 711 def get_client_list(self): 725 712 """ 726 713 Returns a list of clients connected to the server. 727 714 """ 728 715 729 716 self.is_connected() 730 from twisted.internet import reactor 717 731 718 return blockingCallFromThread(reactor, self._remoteobj.callRemote, 732 719 'get_client_list') 733 720 734 721 def get_worker_count(self): 735 722 """ 736 723 Returns the number of busy and free workers. 737 724 738 725 """ 739 726 740 727 self.is_connected() 741 from twisted.internet import reactor 728 742 729 return blockingCallFromThread(reactor, self._remoteobj.callRemote, 743 730 'get_worker_count') 744 731 745 732 def web_server_url(self): 746 733 """ 747 734 Returns the web server url. 748 735 """ 749 736 750 737 self.is_connected() 751 from twisted.internet import reactor 738 752 739 return blockingCallFromThread(reactor, self._remoteobj.callRemote, 753 740 'web_server_url') 754 741 755 742 def web_view(self): 756 743 """ 757 744 Opens the dsage server's web interface in a browser. 758 745 759 746 """ 760 747 761 748 from sage.server.misc import open_page 762 749 url = self.web_server_url() 763 750 address = url.split(':')[1].strip('/') 764 751 port = int(url.split(':')[2].strip('/')) 765 752 open_page(address, port, False) 766 753 767 754 def server_log(self, n=50): 768 from twisted.internet import reactor769 755 return blockingCallFromThread(reactor, self._remoteobj.callRemote, 770 756 'read_log', n, 'server') 771 757 772 758 def worker_log(self, n=50): 773 from twisted.internet import reactor774 759 return blockingCallFromThread(reactor, self._remoteobj.callRemote, 775 760 'read_log', n, 'worker') 776 777 761 762 778 763 class JobWrapper(object): 779 764 """ 780 765 Represents a remote job. 781 766 782 767 Parameters: 783 768 remoteobj -- the PB server's remoteobj 784 769 job -- a Job object (job) 785 770 786 771 """ 787 772 788 773 def __init__(self, remoteobj, job): 789 774 self._remoteobj = remoteobj 790 775 self._update_job(job._reduce()) … … 796 781 d.addCallback(self._got_job_id) 797 782 d.addCallback(self._got_jdict) 798 783 d.addErrback(self._catch_failure) 799 784 800 785 def __repr__(self): 801 786 return self.job_id 802 787 803 788 def __str__(self): 804 789 if self.status == 'completed' and not self.output: 805 790 return 'No output. (Done)' 806 791 elif not self.output: 807 792 return 'No output yet. (Not done)' 808 793 809 794 return self.output 810 795 811 796 def __getstate__(self): 812 797 d = copy.copy(self.__dict__) 813 798 d['remoteobj'] = None 814 799 d['sync_job_task'] = None 815 800 816 801 return d 817 802 818 803 def _update_job(self, jdict): 819 804 self._jdict = jdict 820 805 job = expand_job(jdict) … … 826 811 timeout = 0.5 827 812 while self._job.result is None: 828 813 reactor.iterate(timeout) 829 814 830 815 def save(self, filename=None): 831 816 if filename is None: 832 817 filename = str(self._job.name) 833 818 filename += '.sobj' 834 819 f = open(filename, 'w') 835 820 cPickle.dump(self, f, 2) 836 821 837 822 return filename 838 823 839 824 def restore(self, dsage): 840 825 self._remoteobj = dsage.remoteobj 841 826 842 827 def _catch_failure(self, failure): 843 828 from twisted.internet import error 844 829 from twisted.spread import pb … … 848 833 pass 849 834 # print "Error: ", failure.getErrorMessage() 850 835 # print "Traceback: ", failure.printTraceback() 851 836 852 837 def _got_job_id(self, job_id): 853 838 self.job_id = job_id 854 839 try: 855 840 d = self._remoteobj.callRemote('get_job_by_id', job_id) 856 841 except Exception, msg: 857 842 raise 858 843 859 844 return d 860 845 861 846 def _got_jdict(self, jdict): 862 847 self.job_id = jdict['job_id'] 863 848 self._update_job(jdict) 864 849 865 850 def get_job(self): 866 851 from sage.dsage.errors.exceptions import NotConnectedException 867 852 868 853 if self._remoteobj is None: 869 854 raise NotConnectedException 870 855 if self.job_id is None: … … 873 858 d = self._remoteobj.callRemote('get_job_by_id', self.job_id) 874 859 except Exception, msg: 875 860 raise 876 861 877 862 d.addCallback(self._got_jdict) 878 863 d.addErrback(self._catch_failure) 879 864 880 865 return d 881 866 882 867 def get_job_output(self): 883 868 if self._remoteobj == None: 884 869 return … … 887 872 self.job_id) 888 873 except Exception, msg: 889 874 raise 890 875 891 876 d.addCallback(self._got_job_output) 892 877 d.addErrback(self._catch_failure) 893 878 894 879 return d 895 880 896 881 def _got_job_output(self, output): 897 882 self.output = output 898 883 899 884 def get_job_result(self): 900 885 if self._remoteobj == None: 901 886 return … … 904 889 self.job_id) 905 890 except Exception, msg: 906 891 raise 907 892 908 893 d.addCallback(self._got_job_result) 909 894 d.addErrback(self._catch_failure) 910 895 911 896 return d 912 897 913 898 def _got_job_result(self, result): 914 899 self.result = result 915 900 916 901 def sync_job(self): 917 902 from twisted.spread import pb 918 903 if self._remoteobj == None: … … 926 911 if self.sync_job_task.running: 927 912 self.sync_job_task.stop() 928 913 return 929 914 930 915 try: 931 916 d = self._remoteobj.callRemote('sync_job', self.job_id) 932 917 except pb.DeadReferenceError: … … 934 919 if self.sync_job_task.running: 935 920 self.sync_job_task.stop() 936 921 return 937 922 938 923 d.addCallback(self._got_jdict) 939 924 d.addErrback(self._catch_failure) 940 925 941 926 def write_result(self, filename): 942 927 result_file = open(filename, 'w') 943 928 944 929 # skip the first element since that is not the actual result 945 930 for line in self.result: 946 931 line = str(line) 947 932 result_file.write(line) 948 933 result_file.close() 949 934 950 935 def kill(self): 951 936 """ 952 937 Kills the current job. 953 938 954 939 """ 955 940 956 941 if self.job_id is not None: 957 942 try: 958 943 d = self._remoteobj.callRemote('kill_job', self.job_id) 959 944 except Exception, msg: 960 print 'Unable to kill %s because %s' % (self.job_id, msg)945 print 'Unable to kill %s because %s' % (self.job_id, msg) 961 946 return 962 947 d.addCallback(self._killed_job) 963 948 d.addErrback(self._catch_failure) 964 949 return d 965 950 else: 966 951 return 967 952 968 953 def _killed_job(self, job_id): 969 return 954 self.status = 'killed' 955 970 956 971 957 class BlockingJobWrapper(JobWrapper): 972 958 """ 973 959 Blocking version of the JobWrapper object. This is to be used 974 960 interactively. 975 961 976 962 """ 977 963 978 964 def __init__(self, remoteobj, job): 979 965 self._update_job(job._reduce()) 980 966 self._remoteobj = remoteobj 981 from twisted.internet import reactor982 967 self.job_id = blockingCallFromThread(reactor, self._remoteobj.callRemote, 983 968 'submit_job', job._reduce()) 984 969 985 970 def __repr__(self): 986 971 if self.killed: 987 972 return 'Job %s was killed' % (self.job_id) … … 989 974 self.get_job() 990 975 if self.status == 'completed' and not self.output: 991 976 return 'No output.' 977 if self.result: 978 return str(self.result) 992 979 if not self.output: 993 980 return 'No output yet.' 994 981 else: 995 982 return self.output 996 983 997 984 def get_job(self): 998 985 from sage.dsage.errors.exceptions import NotConnectedException 999 986 1000 987 if self._remoteobj == None: 1001 988 raise NotConnectedException 1002 989 if self.status == 'completed': 1003 990 return 1004 1005 from twisted.internet import reactor 991 1006 992 jdict = blockingCallFromThread(reactor, self._remoteobj.callRemote, 1007 993 'get_job_by_id', self.job_id) 1008 994 1009 995 self._update_job(jdict) 1010 996 1011 997 def async_get_job(self): 1012 998 return JobWrapper.get_job(self) 1013 999 1014 1000 def rerun(self): 1015 1001 """ 1016 1002 Resubmits the current job. 1003 1017 1004 """ 1018 from twisted.internet import reactor1019 1005 self.job_id = blockingCallFromThread(reactor, 1020 1006 self._remoteobj.callRemote, 1021 1007 'submit_job', self._jdict) 1008 1022 1009 def kill(self): 1023 1010 """ 1024 1011 Kills the current job. 1025 1012 1026 1013 """ 1027 from twisted.internet import reactor 1014 1028 1015 job_id = blockingCallFromThread(reactor, self._remoteobj.callRemote, 1029 1016 'kill_job', self.job_id) 1030 1017 self.job_id = job_id 1031 1018 self.killed = True 1032 1019 1033 1020 return job_id 1034 1035 1021 1036 1022 def async_kill(self): 1037 1023 """ 1038 1024 async version of kill 1039 1025 1040 1026 """ 1041 1027 1042 1028 d = self._remoteobj.callRemote('kill_job', self.job_id) 1043 1029 d.addCallback(self._killed_job) 1044 1030 d.addErrback(self._catch_failure) 1045 1031 1046 1032 return d 1047 1048 1033 1049 1034 def wait(self, timeout=None): 1050 1035 """ 1051 1036 Waits on a job until it is completed. 1052 1037 1053 1038 Parameters: 1054 1039 timeout -- number of seconds to wait, if it has not completed by then 1055 1040 it will raise RunTimeError if it is set to None, 1056 1041 it will wait indefinitely until the job is completed 1057 1042 """ 1058 1043 1059 1044 import signal 1060 1045 1061 1046 if timeout is None: 1062 1047 while self.status != 'completed': 1063 1048 # print 'Wating...' 1064 time.sleep( 1.0)1049 time.sleep(0.5) 1065 1050 self.get_job() 1066 1051 else: 1067 1052 def handler(signum, frame): 1068 1053 raise RuntimeError('Maximum wait time exceeded.') 1054 1069 1055 signal.signal(signal.SIGALRM, handler) 1070 1056 signal.alarm(timeout) 1071 1057 while self.status != 'completed': 1072 time.sleep( 1.0)1058 time.sleep(0.5) 1073 1059 self.get_job() 1074 1060 signal.alarm(0) -
sage/dsage/misc/config.py
diff --git a/sage/dsage/interface/nodoctest.py b/sage/dsage/interface/nodoctest.py new file mode 100644 diff --git a/sage/dsage/misc/config.py b/sage/dsage/misc/config.py
a b 24 24 25 25 import os 26 26 import ConfigParser 27 import uuid 27 28 28 29 from sage.dsage.misc.constants import DSAGE_DIR 29 30 … … 88 89 elif type == 'monitor': 89 90 conf_file = os.path.join(DSAGE_DIR, 'worker.conf') 90 91 config.read(conf_file) 91 import uuid92 92 if len(config.get('uuid', 'id')) != 36: 93 93 config.set('uuid', 'id', str(uuid.uuid1())) 94 94 f = open(conf_file, 'w') … … 121 121 if value.lower() not in boolean_states: 122 122 raise ValueError('Not a boolean: %s' % value) 123 123 124 return boolean_states[value.lower()] 124 return boolean_states[value.lower()] 125 No newline at end of file -
sage/dsage/misc/constants.py
diff --git a/sage/dsage/misc/constants.py b/sage/dsage/misc/constants.py
a b 8 8 DSAGE_LOCAL = os.path.join(os.getenv('SAGE_ROOT'), 'local/dsage') 9 9 DSAGE_DB_DIR = os.path.join(DSAGE_DIR, 'db') 10 10 DSAGE_DB = os.path.join(DSAGE_DB_DIR, 'dsage.db') 11 SAGE_BIN = os.path.join(os.getenv('SAGE_ROOT'), 'sage') 11 12 12 13 # These are the twisted tac files to be used with twistd 13 14 SERVER_TAC = """import sys … … 144 145 dsage_service.setServiceParent(application) 145 146 146 147 print_info(dsage_server)""" 147 148 WORKER_TAC = """"""149 No newline at end of file -
sage/dsage/misc/misc.py
diff --git a/sage/dsage/misc/misc.py b/sage/dsage/misc/misc.py
a b 28 28 29 29 from sage.dsage.misc.constants import DSAGE_DIR 30 30 from sage.dsage.misc.config import check_dsage_dir 31 32 33 def test_dsage(dsage): 34 from time import sleep 35 port = find_open_port().next() 36 dsage.server(blocking=False, port=port, verbose=False, ssl=False, 37 log_level=5, testing=True) 38 dsage.worker(blocking=False, port=port, verbose=False, ssl=False, 39 log_level=5,authenticate=False) 40 sleep(0.5) 41 d = dsage.connect(username='tester', port=port, ssl=False, testing=True) 42 sleep(0.5) 43 44 return d 45 46 def write_tac(tac, fname): 47 os.chdir(DSAGE_DIR) 48 f = open(fname, 'w') 49 f.writelines(tac) 50 f.close() 31 51 32 52 def exec_wrs(script): 33 53 """ … … 135 155 yield port 136 156 port += 1 137 157 else: 138 port += 1 139 No newline at end of file 158 port += 1 -
sage/dsage/scripts/dsage_setup.py
diff --git a/sage/dsage/scripts/dsage_setup.py b/sage/dsage/scripts/dsage_setup.py
a b 1 1 ############################################################################ 2 # 3 # DSAGE: Distributed SAGE 4 # 5 # Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com> 6 # 7 # Distributed under the terms of the GNU General Public License (GPL) 2 # 3 # DSAGE: Distributed SAGE 4 # 5 # Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com> 6 # 7 # Distributed under the terms of the GNU General Public License (GPL) 8 8 # 9 9 # This code is distributed in the hope that it will be useful, 10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of … … 37 37 SAGE_ROOT = os.getenv('SAGE_ROOT') 38 38 DSAGE_VERSION = version 39 39 40 40 41 def get_config(type): 41 42 config = ConfigParser.ConfigParser() 42 43 config.add_section('general') … … 54 55 config.add_section('server_log') 55 56 config.add_section('db') 56 57 config.add_section('db_log') 58 57 59 return config 60 58 61 59 62 def add_default_client(Session): 60 63 """ 61 64 Adds the default client. 62 65 63 66 """ 64 67 65 68 from twisted.conch.ssh import keys 66 69 from getpass import getuser 67 70 68 71 clientdb = ClientDatabase(Session) 69 72 70 73 username = getuser() 71 74 pubkey_file = os.path.join(DSAGE_DIR, 'dsage_key.pub') 72 75 pubkey = keys.Key.fromFile(pubkey_file) … … 83 86 else: 84 87 print 'User %s already exists.' % (username) 85 88 89 86 90 def setup_client(testing=False): 87 91 check_dsage_dir() 88 92 key_file = os.path.join(DSAGE_DIR, 'dsage_key') 89 93 if testing: 90 94 cmd = ["ssh-keygen", "-q", "-trsa", "-P ''", "-f%s" % key_file] 91 95 return 92 96 93 97 if not cmd_exists('ssh-keygen'): 94 98 print DELIMITER 95 99 print "Could NOT find ssh-keygen." 96 100 print "Aborting." 97 101 return 98 102 99 103 print DELIMITER 100 104 print "Generating public/private key pair for authentication..." 101 105 print "Your key will be stored in %s/dsage_key" % DSAGE_DIR 102 106 print "Just hit enter when prompted for a passphrase" 103 107 print DELIMITER 104 105 cmd = ["ssh-keygen", "-q", "-trsa", "-f%s" % key_file] 108 109 cmd = ["ssh-keygen", "-q", "-trsa", "-f%s" % key_file] 106 110 ld = os.environ['LD_LIBRARY_PATH'] 107 111 try: 108 112 del os.environ['LD_LIBRARY_PATH'] 109 113 p = subprocess.call(cmd) 110 114 finally: 111 115 os.environ['LD_LIBRARY_PATH'] = ld 112 116 113 117 print "\n" 114 118 print "Client configuration finished.\n" 119 115 120 116 121 def setup_worker(): 117 122 check_dsage_dir() 118 123 print "Worker configuration finished.\n" 124 119 125 120 126 def setup_server(template=None): 121 127 check_dsage_dir() … … 125 131 if dn == '': 126 132 print "Using default localhost" 127 133 dn = 'localhost' 128 134 129 135 template_dict = {'organization': 'SAGE (at %s)' % (dn), 130 136 'unit': '389', 131 137 'locality': None, … … 134 140 'cn': dn, 135 141 'uid': 'sage_user', 136 142 'dn_oid': None, 137 'serial': str(random.randint(1, 2**31)),143 'serial': str(random.randint(1, 2**31)), 138 144 'dns_name': None, 139 145 'crl_dist_points': None, 140 146 'ip_address': None, … … 146 152 'signing_key': True, 147 153 'encryption_key': True, 148 154 } 149 155 150 156 if isinstance(template, dict): 151 157 template_dict.update(template) 152 158 153 159 s = "" 154 160 for key, val in template_dict.iteritems(): 155 161 if val is None: … … 160 166 w = ' '.join(['"%s"' % x for x in val]) 161 167 else: 162 168 w = '"%s"' % val 163 s += '%s = %s \n' % (key, w) 164 169 s += '%s = %s \n' % (key, w) 170 165 171 template_file = os.path.join(DSAGE_DIR, 'cert.cfg') 166 172 f = open(template_file, 'w') 167 173 f.write(s) 168 174 f.close() 169 175 170 176 # Disable certificate generation -- not used right now anyways 171 177 privkey_file = os.path.join(DSAGE_DIR, 'cacert.pem') 172 178 pubkey_file = os.path.join(DSAGE_DIR, 'pubcert.pem') 173 179 174 180 print DELIMITER 175 181 print "Generating SSL certificate for server..." 176 182 177 183 if False and os.uname()[0] != 'Darwin' and cmd_exists('openssl'): 178 184 # We use openssl by default if it exists, since it is *vastly* 179 185 # faster on Linux. … … 187 193 print cmd[0] 188 194 # cmd = ['openssl genrsa > %s' % privkey_file] 189 195 subprocess.call(cmd, shell=True) 190 196 191 197 cmd = ['certtool --generate-self-signed --template %s --load-privkey %s \ 192 198 --outfile %s' % (template_file, privkey_file, pubkey_file)] 193 199 subprocess.call(cmd, shell=True) 194 200 print DELIMITER 195 201 196 202 # Set read only permissions on cert 197 203 os.chmod(os.path.join(DSAGE_DIR, 'cacert.pem'), 0600) 198 204 199 205 # create database schemas 200 206 from sage.dsage.database.db_config import init_db_sa as init_db 201 207 Session = init_db(DSAGE_DB) 202 208 203 209 # add default user 204 210 add_default_client(Session) 205 211 206 212 print "Server configuration finished.\n\n" 207 213 214 208 215 def setup(template=None): 209 216 setup_client() 210 217 setup_worker() … … 221 228 setup_worker() 222 229 elif sys.argv[1] == 'client': 223 230 setup_client() 224 -
(a) a/sage/dsage/scripts/dsage_worker.py vs. (b) /dev/null
diff --git a/sage/dsage/scripts/dsage_worker.py b/sage/dsage/scripts/dsage_worker.py deleted file mode 100755
a b 1 #!/usr/bin/env python2 ############################################################################3 #4 # DSAGE: Distributed SAGE5 #6 # Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>7 #8 # Distributed under the terms of the GNU General Public License (GPL)9 #10 # This code is distributed in the hope that it will be useful,11 # but WITHOUT ANY WARRANTY; without even the implied warranty of12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU13 # General Public License for more details.14 #15 # The full text of the GPL is available at:16 #17 # http://www.gnu.org/licenses/18 #19 ############################################################################20 __docformat__ = "restructuredtext en"21 22 import sys23 import os24 import cPickle25 import zlib26 import pexpect27 import datetime28 from math import ceil29 from getpass import getuser30 31 from twisted.spread import pb32 from twisted.internet import reactor, defer, error, task33 from twisted.python import log34 from twisted.spread import banana35 banana.SIZE_LIMIT = 100*1024*1024 # 100 MegaBytes36 37 from gnutls.constants import *38 from gnutls.crypto import *39 from gnutls.errors import *40 from gnutls.interfaces.twisted import X509Credentials41 42 from sage.interfaces.sage0 import Sage43 from sage.misc.preparser import preparse_file44 45 from sage.dsage.database.job import Job, expand_job46 from sage.dsage.misc.hostinfo import HostInfo47 from sage.dsage.errors.exceptions import NoJobException48 from sage.dsage.twisted.pb import ClientFactory49 from sage.dsage.misc.constants import DELIMITER50 from sage.dsage.misc.constants import DSAGE_DIR51 from sage.dsage.misc.constants import TMP_WORKER_FILES52 from sage.dsage.misc.misc import random_str, get_uuid53 54 START_MARKER = '\x01r\x01e'55 END_MARKER = '\x01r\x01b'56 LOG_PREFIX = "[Worker %s] "57 58 class Worker(object):59 """60 Workers perform the computation of dsage jobs.61 62 """63 64 def __init__(self, remoteobj, id, log_level=0, poll=1.0):65 """66 :type remoteobj: remoteobj67 :param remoteobj: Reference to the remote dsage server68 69 :type id: integer70 :param id: numerical identifier of worker71 72 :type log_level: integer73 :param log_level: log level, higher means more verbose74 75 :type poll: integer76 :param poll: rate (in seconds) a worker talks to the server77 78 """79 80 self.remoteobj = remoteobj81 self.id = id82 self.free = True83 self.job = None84 self.log_level = log_level85 self.poll_rate = poll86 self.checker_task = task.LoopingCall(self.check_work)<
