source: sage/dsage/server/server.py @ 3831:62161b21fb93

Revision 3831:62161b21fb93, 19.2 KB checked in by Yi Qiang <yqiang@…>, 6 years ago (diff)

Use twisted logging instead of printing whenever possible.

Line 
1##############################################################################
2#                                                                     
3#  DSAGE: Distributed SAGE                     
4#                                                                             
5#       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>               
6#                                                                           
7#  Distributed under the terms of the GNU General Public License (GPL)       
8#
9#    This code is distributed in the hope that it will be useful,
10#    but WITHOUT ANY WARRANTY; without even the implied warranty of
11#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12#    General Public License for more details.
13#
14#  The full text of the GPL is available at:
15#
16#                  http://www.gnu.org/licenses/
17##############################################################################
18
19import zlib
20import cPickle
21import datetime
22import xml.dom.minidom
23import cStringIO
24
25from twisted.spread import pb
26from twisted.python import log
27
28from sage.dsage.misc.hostinfo import HostInfo
29import sage.dsage.server.client_tracker as client_tracker
30from sage.dsage.server.hostinfo_tracker import hostinfo_list
31from sage.dsage.errors.exceptions import BadTypeError
32from sage.dsage.database.job import expand_job
33
34pb.setUnjellyableForClass(HostInfo, HostInfo)
35
36class DSageServer(pb.Root):
37    r"""
38    This class represents Distributed Sage server which does all the
39    coordination of distributing jobs, creating new jobs and accepting job
40    submissions.
41       
42    """
43    def __init__(self, jobdb, monitordb, clientdb, log_level=0):
44        r"""
45        Initializes the Distributed Sage PB Server.
46
47        Parameters:
48        jobdb -- pass in the Database object
49        log_level -- specifies the amount of logging to be done (default=0)
50       
51        """
52
53        self.jobdb = jobdb
54        self.monitordb = monitordb
55        self.clientdb = clientdb
56        self.LOG_LEVEL = log_level
57
58    def unpickle(self, pickled_job):
59        return cPickle.loads(zlib.decompress(pickled_job))
60
61    def get_job(self, anonymous=False, uuid=None):
62        r"""
63        Returns a job to the client.
64       
65        This method returns the first job that has not been completed
66        in our job database.
67
68        """
69       
70        if anonymous:
71            jdict = self.jobdb.get_job(anonymous=True)
72        else:
73            jdict = self.jobdb.get_job(anonymous=False)
74        if jdict == None:
75            if self.LOG_LEVEL > 3:
76                log.msg('[DSage, get_job]' + ' Job db is empty.')
77            return None
78        else:
79            if self.LOG_LEVEL > 3:
80                log.msg('[DSage, get_job]' + ' Returning Job %s to client' 
81                        % (jdict['job_id']))
82            jdict['status'] = 'processing'
83            self.jobdb.store_job(jdict)
84   
85        return jdict
86   
87    def set_job_uuid(self, job_id, uuid):
88        return self.jobdb.set_job_uuid(job_id, uuid)
89   
90    def set_busy(self, uuid, busy):
91        return self.monitordb.set_busy(uuid, busy=busy)
92       
93    def get_job_by_id(self, job_id):
94        r"""
95        Returns a job by the job id.
96       
97        Parameters:
98        id -- the job id
99       
100        """
101
102        job = self.jobdb.get_job_by_id(job_id)
103        return job
104
105    def get_job_result_by_id(self, job_id):
106        """Returns the job result.
107
108        Parameters:
109        id -- the job id (str)
110
111        """
112       
113        jdict = self.jobdb.get_job_by_id(job_id)
114        job = expand_job(jdict)
115        return job.result
116
117    def get_job_output_by_id(self, job_id):
118        """Returns the job output.
119
120        Parameters:
121        id -- the job id (str)
122
123        """
124
125        job = self.jobdb.get_job_by_id(job_id)
126
127        return job.output
128
129    def sync_job(self, job_id):
130        raise NotImplementedError
131        # job = self.jobdb.get_job_by_id(job_id)
132        # new_job = copy.deepcopy(job)
133        # print new_job
134        # # Set file, data to 'Omitted' so we don't need to transfer it
135        # new_job.code = 'Omitted...'
136        # new_job.data = 'Omitted...'
137
138        # return job.pickle()
139
140    def get_jobs_by_username(self, username):
141        r"""
142        Returns jobs created by username.
143
144        Parameters:
145        username -- the username (str)
146        is_active -- when set to True, only return active jobs (bool)
147        job_name -- the job name (optional)
148
149        """
150
151        jobs = self.jobdb.get_jobs_by_username(username)
152       
153        if self.LOG_LEVEL > 3:
154            log.msg(jobs)
155        return jobs
156
157    def submit_job(self, jdict):
158        r"""
159        Submits a job to the job database.
160       
161        Parameters:
162        jdict -- the internal dictionary of a Job object
163       
164        """ 
165       
166
167        if self.LOG_LEVEL > 3:
168            log.msg('[DSage, submit_job] %s' % (jdict))
169       
170        if jdict['code'] is None:
171            return False
172        if jdict['name'] is None:
173            jdict['name'] = 'No name specified'
174        jdict['update_time'] = datetime.datetime.now()
175       
176        return self.jobdb.store_job(jdict)
177       
178    def get_all_jobs(self):
179        r"""
180        Returns a list of all jobs in the database.
181       
182        """
183        return self.jobdb.get_all_jobs()
184       
185    def get_active_jobs(self):
186        r"""
187        Returns a list of active jobs"""
188
189        return self.jobdb.get_active_jobs()
190
191    def get_active_clients_list(self):
192        r"""
193        Returns a list of active clients.
194       
195        """
196       
197        raise NotImplementedError
198
199    def get_killed_jobs_list(self):
200        r"""
201        Returns a list of killed job jdicts.
202        """
203       
204        killed_jobs = self.jobdb.get_killed_jobs_list()
205        return killed_jobs
206
207    def get_next_job_id(self):
208        r"""
209        Returns the next job id.
210       
211        """
212       
213        if self.LOG_LEVEL > 0:
214            log.msg('[DSage, get_next_job_id] Returning next job ID')
215           
216        return self.jobdb.get_next_job_id()
217
218    def job_done(self, job_id, output, result, completed, worker_info):
219        r"""
220        job_done is called by the workers check_output method.
221
222        Parameters:
223        job_id -- job id (str)
224        output -- the stdout from the worker (string)
225        result -- the result from the client (compressed pickle string)
226                  result could be 'None'
227        completed -- whether or not the job is completed (bool)
228        worker_info -- ''.join(os.uname())
229
230        """
231
232        if self.LOG_LEVEL > 0:
233            log.msg('[DSage, job_done] Job %s called back' % (job_id))
234        if self.LOG_LEVEL > 3:
235            log.msg('[DSage, job_done] Output: %s ' % output)
236            # log.msg('[DSage, job_done] Result: Some binary data...')
237            log.msg('[DSage, job_done] completed: %s ' % completed)
238            log.msg('[DSage, job_done] worker_info: %s ' % str(worker_info))
239
240        jdict = self.get_job_by_id(job_id)
241
242        if self.LOG_LEVEL > 3:
243            log.msg('[DSage, job_done] result type' , type(result))
244           
245        output = str(output)
246        if jdict['output'] is not None: # Append new output to existing output
247            jdict['output'] += output
248        else:
249            jdict['output'] = output
250        if completed:
251            jdict['result'] = result
252            jdict['status'] = 'completed'
253            jdict['worker_info'] = str(worker_info)
254       
255        jdict['update_time'] = datetime.datetime.now()
256
257        return self.jobdb.store_job(jdict)
258
259    def job_failed(self, job_id, traceback):
260        r"""
261        job_failed is called when a remote job fails.
262
263        Parameters:
264        job_id -- the job id (str)
265       
266        """
267   
268        job = expand_job(self.jobdb.get_job_by_id(job_id))
269        job.failures += 1
270        job.output = traceback
271       
272        if job.failures > self.jobdb.JOB_FAILURE_THRESHOLD:
273            job.status = 'failed'
274        else:
275            job.status = 'new' # Put job back in the queue
276       
277        if self.LOG_LEVEL > 1:
278            s = ['[DSage, job_failed] Job %s failed ' % (job_id),
279                 '%s times. ' % (job.failures)]
280            log.msg(''.join(s))
281           
282        return self.jobdb.store_job(job.reduce())
283
284    def kill_job(self, job_id, reason):
285        r"""
286        Kills a job. 
287
288        Marks as job as killed and moves it to the killed jobs database.
289       
290        """
291
292        if job_id == None:
293            if self.LOG_LEVEL > 0:
294                log.msg('[DSage, kill_job] No such job id %s' % job_id)
295            return None
296        else:
297            self.jobdb.set_killed(job_id, killed=True)
298            if self.LOG_LEVEL > 0:
299                log.msg('Killed job %s' % (job_id))
300               
301        return job_id
302
303    def get_monitor_list(self):
304        r"""
305        Returns a list of workers as a 3 tuple.
306
307        tuple[0] = broker object
308        tuple[1] = ip
309        tuple[2] = port
310
311        """
312        return self.monitordb.get_monitor_list()
313   
314    def get_client_list(self):
315        r"""
316        Returns a list of clients.
317       
318        """
319       
320        return self.clientdb.get_client_list()
321
322    def get_cluster_speed(self):
323        r"""
324        Returns an approximation of the total CPU speed of the cluster.
325
326        """
327       
328        cluster_speed = 0
329        if self.LOG_LEVEL > 3:
330            log.msg(hostinfo_list)
331            log.msg(len(hostinfo_list))
332        for h in hostinfo_list:
333            speed_multiplier = int(h['cpus'])
334            for k,v in h.iteritems():
335                if k == 'cpu_speed':
336                    cluster_speed += float(v) * speed_multiplier
337       
338        return cluster_speed
339
340    def submit_host_info(self, h):
341        r"""
342        Takes a dict of workers machine specs.
343       
344        """
345       
346        if self.LOG_LEVEL > 0:
347            log.msg(h)
348        if len(hostinfo_list) == 0:
349            hostinfo_list.append(h)
350        else:
351            for h in hostinfo_list:
352                if h['uuid'] not in h.values():
353                    hostinfo_list.append(h)
354   
355    def generate_xml_stats(self):
356        r"""
357        This method returns a an XML document to be consumed by the Dashboard
358        widget
359       
360        """
361       
362        def create_gauge(doc):
363            gauge = doc.createElement('gauge')
364            doc.appendChild(gauge)
365
366            return doc, gauge
367
368        def add_totalAgentCount(doc, gauge):
369            totalAgentCount = doc.createElement('totalAgentCount')
370            gauge.appendChild(totalAgentCount)
371            working_workers = self.monitordb.get_worker_count(connected=True,
372                                                              busy=True)
373            free_workers = self.monitordb.get_worker_count(connected=True,
374                                                           busy=False)
375            disconnected_workers = self.monitordb.get_worker_count(
376                                   connected=False,
377                                   busy=False)
378            total_workers = (working_workers + 
379                             free_workers + 
380                             disconnected_workers)
381            count = doc.createTextNode(str(total_workers))
382            totalAgentCount.appendChild(count)
383
384            return doc, totalAgentCount
385       
386        def add_onlineAgentCount(doc, gauge):
387            onlineAgentCount = doc.createElement('onlineAgentCount')
388            gauge.appendChild(onlineAgentCount)
389            free_workers = self.monitordb.get_worker_count(connected=True,
390                                                           busy=False)
391            busy_workers = self.monitordb.get_worker_count(connected=True,
392                                                           busy=True)
393            count = doc.createTextNode(str(free_workers + busy_workers))
394            onlineAgentCount.appendChild(count)
395           
396            return doc, onlineAgentCount
397           
398        def add_offlineAgentCount(doc, gauge):
399            offlineAgentCount = doc.createElement('offlineAgentCount')
400            gauge.appendChild(offlineAgentCount)
401            worker_count = self.monitordb.get_worker_count(connected=False,
402                                                           busy=False)
403            count = doc.createTextNode(str(worker_count))
404            offlineAgentCount.appendChild(count)
405           
406            return doc, offlineAgentCount
407           
408        def add_workingAgentCount(doc, gauge):
409            workingAgentCount = doc.createElement('workingAgentCount')
410            gauge.appendChild(workingAgentCount)
411            worker_count = self.monitordb.get_worker_count(connected=True,
412                                                           busy=True)
413            count = doc.createTextNode(str(worker_count))
414            workingAgentCount.appendChild(count)
415           
416            return doc, workingAgentCount
417       
418        def add_availableAgentCount(doc, gauge):
419            availableAgentCount = doc.createElement('availableAgentCount')
420            gauge.appendChild(availableAgentCount)
421            worker_count = self.monitordb.get_worker_count(connected=True,
422                                                           busy=False)
423            count = doc.createTextNode(str(worker_count))
424            availableAgentCount.appendChild(count)
425           
426            return doc, availableAgentCount
427       
428        def add_unavailableAgentCount(doc, gauge):
429            unavailableAgentCount = doc.createElement('unavailableAgentCount')
430            gauge.appendChild(unavailableAgentCount)
431            worker_count = self.monitordb.get_worker_count(connected=True,
432                                                           busy=True)
433            count = doc.createTextNode(str(worker_count))
434            unavailableAgentCount.appendChild(count)
435           
436            return doc, unavailableAgentCount
437           
438        def add_workingMegaHertz(doc, gauge):
439            workingMegaHertz = doc.createElement('workingMegaHertz')
440            gauge.appendChild(workingMegaHertz)
441            cpu_speed = self.monitordb.get_cpu_speed(connected=True, busy=True)
442            mhz = doc.createTextNode(str(cpu_speed))
443            workingMegaHertz.appendChild(mhz)
444
445            return doc, workingMegaHertz
446
447        def add_availableProcessorCount(doc, gauge):
448            pass
449           
450        def add_unavailableProcessorCount(doc, gauge):
451            pass
452           
453        def add_onlineProcessorCount(doc, gauge):
454            onlineProcessorCount = doc.createElement('onlineProcessorCount')
455            gauge.appendChild(onlineProcessorCount)
456            cpu_count = self.monitordb.get_cpu_count(connected=True)
457            c = doc.createTextNode(str(cpu_count))
458            onlineProcessorCount.appendChild(c)
459           
460            return doc, onlineProcessorCount
461       
462        def add_offlineProcessorCount(doc, gauge):
463            offlineProcessorCount = doc.createElement('offlineProcessorCount')
464            gauge.appendChild(offlineProcessorCount)
465            cpu_count = self.monitordb.get_cpu_count(connected=False)
466            c = doc.createTextNode(str(cpu_count))
467            offlineProcessorCount.appendChild(c)
468           
469            return doc, offlineProcessorCount
470           
471        def add_workingProcessorCount(doc, gauge):
472            workingProcessorCount = doc.createElement('workingProcessorCount')
473            gauge.appendChild(workingProcessorCount)
474            worker_count = self.monitordb.get_cpu_count(connected=True)
475            pcount = doc.createTextNode(str(worker_count))
476            workingProcessorCount.appendChild(pcount)
477
478            return doc, workingProcessorCount
479       
480        def add_workingAgentPercentage(doc, gauge):
481            workingAgentPercentage = doc.createElement(
482                                                    'workingAgentPercentage')
483            gauge.appendChild(workingAgentPercentage)
484            working_workers = self.monitordb.get_worker_count(connected=True,
485                                                              busy=True)
486            free_workers = self.monitordb.get_worker_count(connected=True,
487                                                           busy=False)
488            disconnected_workers = self.monitordb.get_worker_count(
489                                   connected=False,
490                                   busy=False)
491            total_workers = (working_workers + 
492                             free_workers + 
493                             disconnected_workers)
494           
495            if total_workers != 0:
496                worker_percentage = float(working_workers / total_workers) * 100
497            else:
498                worker_percentage = 0.0
499            percentage = doc.createTextNode(str(worker_percentage))
500            workingAgentPercentage.appendChild(percentage)
501           
502            return doc, workingAgentPercentage
503           
504        def add_date(doc, gauge):
505            date = datetime.datetime.now()
506
507            year = doc.createElement('Year')
508            gauge.appendChild(year)
509            year.appendChild(doc.createTextNode(str(date.year)))
510
511            seconds = doc.createElement('Seconds')
512            gauge.appendChild(seconds)
513            seconds.appendChild(doc.createTextNode(str(date.second)))
514
515            minutes = doc.createElement('Minutes')
516            gauge.appendChild(minutes)
517            minutes.appendChild(doc.createTextNode(str(date.minute)))
518
519            return doc, year, seconds, minutes
520       
521        doc = xml.dom.minidom.Document()
522        doc, gauge = create_gauge(doc)
523       
524        add_totalAgentCount(doc, gauge)
525        add_onlineAgentCount(doc, gauge)
526        add_offlineAgentCount(doc, gauge)
527        add_availableAgentCount(doc, gauge)
528        add_unavailableAgentCount(doc, gauge)
529        add_workingAgentCount(doc, gauge)
530        add_workingAgentPercentage(doc, gauge)
531
532        add_onlineProcessorCount(doc, gauge)
533        add_offlineAgentCount(doc, gauge)
534        add_availableProcessorCount(doc, gauge)
535        add_unavailableProcessorCount(doc, gauge)
536        add_workingProcessorCount(doc, gauge)
537        add_workingMegaHertz(doc, gauge)
538       
539        add_date(doc, gauge)
540        s = cStringIO.StringIO()   
541        doc.writexml(s, newl='\n')
542       
543        return s.getvalue()
544
545class DSageWorkerServer(DSageServer):
546    r"""
547    Exposes methods to workers.
548    """
549   
550    def remote_get_job(self):
551        return DSageServer.get_job(self)
552
553    def remote_job_done(self, job_id, output, result, completed, worker_info):
554        if not (isinstance(job_id, str) or isinstance(completed, bool)):
555            log.msg('BadType in remote_job_done')
556            raise BadTypeError()
557
558        return DSageServer.job_done(self, job_id, output, result, 
559                             completed, worker_info)
560
561    def remote_job_failed(self, job_id, traceback):
562        if not isinstance(job_id, str):
563            log.msg('BadType in remote_job_failed')
564            raise BadTypeError()
565           
566        return DSageServer.job_failed(self, job_id, traceback)
567
568    def remote_get_killed_jobs_list(self):
569        return DSageServer.get_killed_jobs_list(self)       
570
571    def remote_submit_host_info(self, hostinfo):
572        if not isinstance(hostinfo, dict):
573            log.msg('BadType in remote_submit_host_info')
574            raise BadTypeError()
575        return DSageServer.submit_host_info(self, hostinfo)
Note: See TracBrowser for help on using the repository browser.