source: sage/dsage/twisted/pb.py @ 6718:f9c9f93a1525

Revision 6718:f9c9f93a1525, 15.5 KB checked in by Yi Qiang <yqiang@…>, 6 years ago (diff)

Fixed job failures.

Line 
1############################################################################
2#                                                                     
3#   DSAGE: Distributed SAGE                     
4#                                                                             
5#       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>               
6#                                                                           
7#  Distributed under the terms of the GNU General Public License (GPL)       
8#
9#    This code is distributed in the hope that it will be useful,
10#    but WITHOUT ANY WARRANTY; without even the implied warranty of
11#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12#    General Public License for more details.
13#
14#  The full text of the GPL is available at:
15#
16#                  http://www.gnu.org/licenses/
17#
18############################################################################
19
20from twisted.spread import pb
21from twisted.spread import banana
22banana.SIZE_LIMIT = 100*1024*1024 # 100 MegaBytes
23
24from zope.interface import implements
25from twisted.cred import portal, credentials
26from twisted.cred.credentials import ISSHPrivateKey
27from twisted.cred.credentials import IAnonymous
28from twisted.cred.credentials import Anonymous
29from twisted.spread.interfaces import IJellyable
30from twisted.spread.pb import IPerspective, AsReferenceable
31from twisted.python import log
32
33from sage.dsage.misc.hostinfo import HostInfo
34from sage.dsage.errors.exceptions import BadTypeError, BadJobError
35
36pb.setUnjellyableForClass(HostInfo, HostInfo)
37
38class WorkerPBServerFactory(pb.PBServerFactory):
39    """
40    This factory serves workers requests.
41   
42    """
43   
44    def __init__(self, root):
45        pb.PBServerFactory.__init__(self, root)
46       
47    def clientConnectionMade(self, broker):
48        """Keeps a 3-tuple of connected workers.
49        tuple[0] - the broker
50        tuple[1] - the worker ip
51        tuple[2] - the worker port
52
53        """
54       
55        broker.notifyOnDisconnect(self.clientConnectionLost)
56   
57    def clientConnectionLost(self):
58        pass
59       
60class PBClientFactory(pb.PBClientFactory):
61    """
62    Custom implementation of the PBClientFactory that supports logging in
63    with public key as well as anonymous credentials.
64   
65    """
66   
67    def login(self, creds, mind=None):
68        if ISSHPrivateKey.providedBy(creds):
69            d = self.getRootObject()
70            d.addCallback(self._cbSendUsername, 
71                          creds.username, 
72                          creds.algName, 
73                          creds.blob, 
74                          creds.sigData,
75                          creds.signature,
76                          mind)
77
78            return d
79        elif IAnonymous.providedBy(creds):
80            d = self.getRootObject()
81            d.addCallback(self._cbAnonymousLogin, mind)
82            return d
83        else:
84            raise TypeError('Invalid credentials.')
85
86    def _cbSendUsername(self, root, username, algorithm, blob, sig_data,
87                        signature, mind):
88        d = root.callRemote("login", username, algorithm, blob, sig_data,
89                                signature, mind)
90        return d
91
92    def _cbAnonymousLogin(self, root, mind):
93        d = root.callRemote("login_anonymous", mind)
94       
95        return d
96   
97    # We should override this in the future to do something useful...
98    # def startedConnecting(self, connector):
99    #     pass
100       
101class _SSHKeyPortalRoot(pb._PortalRoot):
102    def rootObject(self, broker):
103        return _SSHKeyPortalWrapper(self.portal, broker)
104
105class _SSHKeyPortalWrapper(pb._PortalWrapper):
106    def remote_login(self, username, algorithm, blob, data, signature, mind):
107        pubkey_cred = credentials.SSHPrivateKey(username,
108                                                algorithm, 
109                                                blob,
110                                                data,
111                                                signature)
112
113        d = self.portal.login(pubkey_cred, mind, IPerspective)
114        d.addCallback(self._loggedIn)
115       
116        return d
117   
118    def remote_login_anonymous(self, mind):
119        d = self.portal.login(Anonymous(), mind, IPerspective)
120        d.addCallback(self._loggedIn)
121       
122        return d
123       
124    def _loggedIn(self, (interface, perspective, logout)):
125        if not IJellyable.providedBy(perspective):
126            perspective = AsReferenceable(perspective, "perspective")
127        self.broker.notifyOnDisconnect(logout)
128       
129        return perspective
130
131class DefaultPerspective(pb.Avatar):
132    """
133    Custom implementation of pb.Avatar.
134   
135    """
136   
137    def __init__(self, DSageServer, avatarID):
138        self.DSageServer = DSageServer
139        self.avatarID = avatarID
140        self.connections = 0
141               
142    def perspectiveMessageReceived(self, broker, message, args, kw):
143        self.broker = broker
144
145        return pb.Avatar.perspectiveMessageReceived(self, broker, 
146                                                    message, args, kw)
147
148    def attached(self, avatar, mind):
149        self.connections += 1
150        if isinstance(mind, tuple):
151            self.mind = mind
152            self.host_info = mind[1]
153            self.host_info['ip'] = mind[0].broker.transport.getPeer().host
154            self.host_info['port'] = mind[0].broker.transport.getPeer().port
155            uuid = self.host_info['uuid']
156            if self.DSageServer.monitordb.get_monitor(uuid) is None:
157                self.DSageServer.monitordb.add_monitor(self.host_info)
158            self.DSageServer.monitordb.set_connected(uuid, connected=True)
159        else:
160            self.DSageServer.clientdb.update_login_time(self.avatarID)
161            self.DSageServer.clientdb.set_connected(self.avatarID,
162                                                    connected=True)
163
164    def detached(self, avatar, mind):
165        self.connections -= 1
166        log.msg('%s disconnected' % (self.avatarID))
167        if isinstance(mind, tuple):
168            self.DSageServer.monitordb.set_connected(self.host_info['uuid'],
169                                                     connected=False)
170        else:
171            self.DSageServer.clientdb.set_connected(self.avatarID,
172                                                    connected=False)
173           
174class AnonymousMonitorPerspective(DefaultPerspective):
175    """
176    Defines the perspective of an anonymous worker.
177   
178    """
179   
180    def __init__(self, DSageServer, avatarID):
181        DefaultPerspective.__init__(self, DSageServer, avatarID)
182
183    def attached(self, avatar, mind):
184        self.connections += 1
185        self.mind = mind
186        self.host_info = mind[1]
187        self.host_info['ip'] = mind[0].broker.transport.getPeer().host
188        self.host_info['port'] = mind[0].broker.transport.getPeer().port
189        uuid = self.host_info['uuid']
190        if self.DSageServer.monitordb.get_monitor(uuid) is None:
191            self.DSageServer.monitordb.add_monitor(self.host_info)
192        self.DSageServer.monitordb.set_connected(uuid, connected=True)
193        self.DSageServer.monitordb.set_anonymous(uuid, anonymous=True)
194   
195    def detached(self, avatar, mind):
196        self.connections -= 1
197        log.msg('%s disconnected' % (self.avatarID))
198        self.DSageServer.monitordb.set_connected(self.host_info['uuid'],
199                                                 connected=False)
200           
201    def perspective_get_job(self):
202        """
203        Returns jobs only marked as doable by anonymous workers.
204       
205        """
206       
207        uuid = self.mind[1]['uuid']
208        jdict = self.DSageServer.get_job(anonymous=True)
209        if jdict is not None:
210            self.DSageServer.set_job_uuid(jdict['job_id'], uuid)
211            self.DSageServer.set_busy(uuid, busy=True)
212        else:
213            self.DSageServer.set_busy(uuid, busy=False)
214           
215        return jdict
216       
217    def perspective_get_killed_jobs_list(self):
218        return self.DSageServer.get_killed_jobs_list()
219
220   
221    def perspective_job_failed(self, job_id, traceback):
222        if not isinstance(job_id, str):
223            log.msg('Bad job_id %s' % (job_id))
224            raise BadTypeError()
225
226        uuid = self.mind[1]['uuid']
227        self.DSageServer.set_busy(uuid, busy=False)
228
229        return self.DSageServer.job_failed(job_id, traceback)
230       
231    def perspective_job_done(self, job_id, output, result, completed,
232                             cpu_time):
233        if not (isinstance(job_id, str) or isinstance(completed, bool)):
234            log.msg('Bad job_id passed to perspective_job_done')
235            log.msg('job_id: %s' % (job_id))
236            log.msg('output: %s' % (output))
237            log.msg('completed: %s' % (completed))
238            raise BadTypeError()
239        if completed:
240            uuid = self.mind[1]['uuid']
241            self.DSageServer.set_busy(uuid, busy=False)
242           
243        return self.DSageServer.job_done(job_id, output, result, completed,
244                                         cpu_time)
245
246class MonitorPerspective(AnonymousMonitorPerspective):
247    """
248    Defines the perspective of an authenticated worker to the server.
249   
250    """
251    def __init__(self, DSageServer, avatarID):
252        DefaultPerspective.__init__(self, DSageServer, avatarID)
253
254    def attached(self, avatar, mind):
255        self.connections += 1
256        self.mind = mind
257        self.host_info = mind[1]
258        self.host_info['ip'] = mind[0].broker.transport.getPeer().host
259        self.host_info['port'] = mind[0].broker.transport.getPeer().port
260        uuid = self.host_info['uuid']
261        if self.DSageServer.monitordb.get_monitor(uuid) is None:
262            self.DSageServer.monitordb.add_monitor(self.host_info)
263        self.DSageServer.monitordb.set_connected(uuid, connected=True)
264        self.DSageServer.monitordb.set_anonymous(uuid, anonymous=False)
265       
266    def perspective_get_job(self):
267        """
268        Returns jobs to authenticated workers.
269       
270        """
271       
272        try:
273            uuid = self.mind[1]['uuid']
274        except Exception, msg:
275            raise ValueError("Could not match a uuid to the monitor.")
276       
277        jdict = self.DSageServer.get_job(anonymous=False)
278        if jdict is not None:
279            self.DSageServer.set_job_uuid(jdict['job_id'], uuid)
280            self.DSageServer.set_busy(uuid, busy=True)
281        else:
282            self.DSageServer.set_busy(uuid, busy=False)
283           
284        return jdict
285
286class UserPerspective(DefaultPerspective):
287    """
288    Defines the perspective of a regular user to the server.
289   
290    """
291   
292    def __init__(self, DSageServer, avatarID):
293        DefaultPerspective.__init__(self, DSageServer, avatarID)
294       
295    def perspective_get_job_by_id(self, job_id):
296        if not isinstance(job_id, str):
297            log.msg('Bad job_id [%s] passed to get_job_by_id' % (job_id))
298            raise BadTypeError()
299        # log.msg('Returning job %s to %s' % (job_id, self.avatarID))
300        job = self.DSageServer.get_job_by_id(job_id)
301       
302        return job
303
304    def perspective_get_jobs_by_username(self, username, active=True):
305        if not (isinstance(username, str)):
306            log.msg('Bad username [%s] passed to ' +
307                    'perspective_get_jobs_by_username' % (username))
308            raise BadTypeError()
309
310        jobs = self.DSageServer.get_jobs_by_username(username, active)
311
312        return jobs
313
314    def perspective_get_job_result_by_id(self, job_id):
315        if not isinstance(job_id, str):
316            log.msg('Bad job_id [%s] passed to' +
317                    'perspective_get_job_result_by_id' % (job_id))
318            raise BadTypeError()
319           
320        return self.DSageServer.get_job_result_by_id(job_id)
321
322    def perspective_get_job_output_by_id(self, job_id):
323        if not isinstance(job_id, str):
324            log.msg('Bad job_id [%s] passed to ' + 
325                    'get_job_output_by_id' % (job_id))
326            raise BadTypeError()
327           
328        return self.DSageServer.get_job_output_by_id(job_id)
329
330    def perspective_sync_job(self, job_id):
331        if not isinstance(job_id, str):
332            return None
333           
334        return self.DSageServer.sync_job(job_id)
335
336    def perspective_submit_job(self, jdict):
337        if jdict is None:
338            raise BadJobError()
339        if jdict['username'] != self.avatarID:
340            log.msg('username does not match credentials')
341            log.msg('claim: %s' % jdict['username'])
342            log.msg('actual: %s' % self.avatarID)
343            raise BadJobError()
344           
345        return self.DSageServer.submit_job(jdict)
346
347    def perspective_kill_job(self, job_id, reason=None):
348        if not isinstance(job_id, str):
349            log.msg('Bad job_id [%s] passed to perspective_kill_job' % job_id)
350            raise BadTypeError()
351
352        return self.DSageServer.kill_job(job_id, reason)
353
354    def perspective_get_cluster_speed(self):
355        return self.DSageServer.get_cluster_speed()
356       
357    def perspective_get_monitor_list(self):
358        # return [x[1] for x in self.DSageServer.get_worker_list()]
359        return self.DSageServer.get_monitor_list()
360       
361    def perspective_get_client_list(self):
362        return self.DSageServer.get_client_list()
363   
364    def perspective_get_worker_count(self):
365        return self.DSageServer.get_worker_count()
366
367    def perspective_get_killed_jobs_list(self):
368        return self.DSageServer.get_killed_jobs_list()
369       
370class AdminPerspective(UserPerspective):
371    """
372    Defines the perspective of the admin.
373   
374    """
375
376    def __init__(self, DSageServer, avatarID):
377        UserPerspective.__init__(self, DSageServer, avatarID)
378               
379class Realm(object):
380    implements(portal.IRealm)
381
382    def __init__(self, DSageServer):
383        self.DSageServer = DSageServer
384        self.avatars = {}
385        self.max_connections = 100
386       
387    def requestAvatar(self, avatarID, mind, *interfaces):       
388        if not pb.IPerspective in interfaces:
389            raise NotImplementedError("No supported avatar interface.")
390        else:
391            if avatarID == 'admin':
392                kind = 'admin'
393            elif avatarID == 'Anonymous' and mind:
394                kind = 'anonymous_monitor'
395            elif mind:
396                kind = 'monitor'
397            else:
398                kind = 'client'
399               
400            if (avatarID, kind) in self.avatars.keys():
401                avatar = self.avatars[(avatarID, kind)]
402            else:
403                if kind == 'admin':
404                    avatar = AdminPerspective(self.DSageServer, avatarID)
405                elif kind == 'anonymous_monitor':
406                    avatar = AnonymousMonitorPerspective(self.DSageServer,
407                                                         avatarID)
408                elif kind == 'monitor':
409                    avatar = MonitorPerspective(self.DSageServer, avatarID)
410                elif kind == 'client':
411                    avatar = UserPerspective(self.DSageServer, avatarID)
412                self.avatars[(avatarID, kind)] = avatar
413        if avatar.connections >= self.max_connections:
414            raise ValueError('Too many connections for user %s' % avatarID)
415       
416        avatar.attached(avatar, mind)
417       
418        if kind == 'monitor':
419            log.msg('(%s, %s) id: %s connected' % (avatarID, kind,             
420                                                   mind[1]['uuid']))
421        else:
422            log.msg('(%s, %s) connected' % (avatarID, kind))
423   
424        return pb.IPerspective, avatar, lambda a=avatar:a.detached(avatar,
425                                                                   mind)
Note: See TracBrowser for help on using the repository browser.