source: sage/dsage/twisted/pb.py @ 3829:adb48a8dd468

Revision 3829:adb48a8dd468, 14.1 KB checked in by Yi Qiang <yqiang@…>, 6 years ago (diff)

Added job failed to anonymous perspective.

Line 
1############################################################################
2#                                                                     
3#   DSAGE: Distributed SAGE                     
4#                                                                             
5#       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>               
6#                                                                           
7#  Distributed under the terms of the GNU General Public License (GPL)       
8#
9#    This code is distributed in the hope that it will be useful,
10#    but WITHOUT ANY WARRANTY; without even the implied warranty of
11#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12#    General Public License for more details.
13#
14#  The full text of the GPL is available at:
15#
16#                  http://www.gnu.org/licenses/
17############################################################################
18
19from twisted.spread import pb
20from zope.interface import implements
21from twisted.cred import portal, credentials
22from twisted.cred.credentials import ISSHPrivateKey
23from twisted.cred.credentials import Anonymous
24from twisted.spread.interfaces import IJellyable
25from twisted.spread.pb import IPerspective, AsReferenceable
26from twisted.python import log
27
28from sage.dsage.misc.hostinfo import HostInfo
29import sage.dsage.server.worker_tracker as worker_tracker
30from sage.dsage.errors.exceptions import BadTypeError, BadJobError
31
32pb.setUnjellyableForClass(HostInfo, HostInfo)
33
34class WorkerPBServerFactory(pb.PBServerFactory):
35    r"""
36    This factory serves workers requests.
37   
38    """
39   
40    def __init__(self, root):
41        pb.PBServerFactory.__init__(self, root)
42       
43    def clientConnectionMade(self, broker):
44        """Keeps a 3-tuple of connected workers.
45        tuple[0] - the broker
46        tuple[1] - the worker ip
47        tuple[2] - the worker port
48
49        """
50       
51        broker.notifyOnDisconnect(self.clientConnectionLost)
52        worker_tracker.add((broker,
53                            broker.transport.getPeer().host, 
54                            broker.transport.getPeer().port))
55   
56    def clientConnectionLost(self):
57        for broker, host, port in worker_tracker.worker_list:
58            if broker.transport.disconnected:
59                worker_tracker.remove((broker, host, port))
60       
61class PBClientFactory(pb.PBClientFactory):
62    r"""
63    Custom implementation of the PBClientFactory that supports logging in
64    with public key as well as anonymous credentials.
65   
66    """
67   
68    def login(self, creds, mind=None):
69        if ISSHPrivateKey.providedBy(creds):
70            d = self.getRootObject()
71            d.addCallback(self._cbSendUsername, 
72                          creds.username, 
73                          creds.algName, 
74                          creds.blob, 
75                          creds.sigData,
76                          creds.signature,
77                          mind)
78
79            return d
80        else:
81            d = self.getRootObject()
82            d.addCallback(self._cbAnonymousLogin, mind)
83            return d
84
85    def _cbSendUsername(self, root, username, alg_name, blob, sig_data,
86                        signature, mind):
87        d = root.callRemote("login", username, alg_name, blob, sig_data,
88                                signature, mind)
89        return d
90
91    def _cbAnonymousLogin(self, root, mind):
92        d = root.callRemote("login_anonymous", mind)
93       
94        return d
95       
96class _SSHKeyPortalRoot(pb._PortalRoot):
97    def rootObject(self, broker):
98        return _SSHKeyPortalWrapper(self.portal, broker)
99
100class _SSHKeyPortalWrapper(pb._PortalWrapper):
101    def remote_login(self, username, alg_name, blob, data, signature, mind):
102        pubkey_cred = credentials.SSHPrivateKey(username,
103                                                alg_name, 
104                                                blob,
105                                                data,
106                                                signature)
107
108        d = self.portal.login(pubkey_cred, mind, IPerspective)
109        d.addCallback(self._loggedIn)
110       
111        return d
112   
113    def remote_login_anonymous(self, mind):
114        d = self.portal.login(Anonymous(), mind, IPerspective)
115        d.addCallback(self._loggedIn)
116       
117        return d
118       
119    def _loggedIn(self, (interface, perspective, logout)):
120        if not IJellyable.providedBy(perspective):
121            perspective = AsReferenceable(perspective, "perspective")
122        self.broker.notifyOnDisconnect(logout)
123       
124        return perspective
125
126class DefaultPerspective(pb.Avatar):
127    r"""
128    Custom implementation of pb.Avatar so we can keep track of the broker.
129   
130    """
131
132    current_connections = 0
133   
134    def __init__(self, DSageServer, avatarID):
135        self.DSageServer = DSageServer
136        self.avatarID = avatarID
137       
138        log.msg('%s connected' % self.avatarID)
139       
140    def perspectiveMessageReceived(self, broker, message, args, kw):
141        self.broker = broker
142
143        return pb.Avatar.perspectiveMessageReceived(self, broker, 
144                                                    message, args, kw)
145
146    def attached(self, avatar, mind):
147        self.current_connections += 1
148        if isinstance(mind, tuple):
149            self.mind = mind
150            self.host_info = mind[1]
151            self.host_info['ip'] = mind[0].broker.transport.getPeer().host
152            self.host_info['port'] = mind[0].broker.transport.getPeer().port
153            uuid = self.host_info['uuid']
154            if self.DSageServer.monitordb.get_monitor(uuid) is None:
155                self.DSageServer.monitordb.add_monitor(self.host_info)
156            self.DSageServer.monitordb.set_connected(uuid, connected=True)
157        else:
158            self.DSageServer.clientdb.update_login_time(self.avatarID)
159            self.DSageServer.clientdb.set_connected(self.avatarID, connected=True)
160
161    def detached(self, avatar, mind):
162        self.current_connections -= 1
163        log.msg('%s disconnected' % (self.avatarID))
164        if isinstance(mind, tuple):
165            self.DSageServer.monitordb.set_connected(self.host_info['uuid'], connected=False)
166        else:
167            self.DSageServer.clientdb.set_connected(self.avatarID, connected=False)
168           
169class AnonymousMonitorPerspective(DefaultPerspective):
170    r"""
171    Defines the perspective of an anonymous worker.
172   
173    """
174   
175    def __init__(self, DSageServer, avatarID):
176        DefaultPerspective.__init__(self, DSageServer, avatarID)
177           
178    def perspective_get_job(self):
179        r"""
180        Returns jobs only marked as doable by anonymous workers.
181       
182        """
183       
184        uuid = self.mind[1]['uuid']
185        jdict = self.DSageServer.get_job(anonymous=True)
186        if jdict is not None:
187            self.DSageServer.set_job_uuid(jdict['job_id'], uuid)
188            self.DSageServer.set_busy(uuid, busy=True)
189        else:
190            self.DSageServer.set_busy(uuid, busy=False)
191           
192        return jdict
193       
194    def perspective_get_killed_jobs_list(self):
195        return self.DSageServer.get_killed_jobs_list()
196
197   
198    def perspective_job_failed(self, job_id):
199        if not isinstance(job_id, str):
200            print 'Bad job_id [%s] passed to perspective_job_failed' % (job_id)
201            raise BadTypeError()
202
203        uuid = self.mind[1]['uuid']
204        self.DSageServer.set_busy(uuid, busy=False)
205
206        return self.DSageServer.job_failed(job_id)
207   
208    def perspective_job_done(self, job_id, output, 
209                             result, completed, worker_info):
210        if not (isinstance(job_id, str) or isinstance(completed, bool)):
211            print 'Bad job_id passed to perspective_job_done'
212            raise BadTypeError()
213       
214        return self.DSageServer.job_done(job_id, output, result, 
215                                         completed, worker_info)
216                                 
217class MonitorPerspective(DefaultPerspective):
218    r"""
219    Defines the perspective of an authenticated worker to the server.
220   
221    """
222    def __init__(self, DSageServer, avatarID):
223        DefaultPerspective.__init__(self, DSageServer, avatarID)
224       
225    def perspective_get_job(self):
226        r"""
227        Returns jobs to authenticated workers.
228       
229        """
230       
231        uuid = self.mind[1]['uuid']
232        jdict = self.DSageServer.get_job(anonymous=False, uuid=uuid)
233        if jdict is not None:
234            self.DSageServer.set_job_uuid(jdict['job_id'], uuid)
235            self.DSageServer.set_busy(uuid, busy=True)
236        else:
237            self.DSageServer.set_busy(uuid, busy=False)
238           
239        return jdict
240
241    def perspective_job_done(self, job_id, output, result, 
242                            completed, worker_info):
243        if not (isinstance(job_id, str) or isinstance(completed, bool)):
244            print 'Bad job_id passed to perspective_job_done'
245            print 'job_id: %s' % (job_id)
246            print 'output: %s' % (output)
247            # print 'result: %s' % (result)
248            print 'completed: %s' % (completed)
249            print 'worker_info: %s' % (worker_info)
250            raise BadTypeError()
251        if completed:
252            uuid = self.mind[1]['uuid']
253            self.DSageServer.set_busy(uuid, busy=False)
254           
255        return self.DSageServer.job_done(job_id, output, result, 
256                                  completed, worker_info)
257
258    def perspective_job_failed(self, job_id):
259        if not isinstance(job_id, str):
260            print 'Bad job_id [%s] passed to perspective_job_failed' % (job_id)
261            raise BadTypeError()
262       
263        uuid = self.mind[1]['uuid']
264        self.DSageServer.set_busy(uuid, busy=False)
265       
266        return self.DSageServer.job_failed(job_id)
267   
268    def perspective_submit_host_info(self, hostinfo):
269        if not isinstance(hostinfo, dict):
270            raise BadTypeError()
271        return self.DSageServer.submit_host_info(hostinfo)
272
273    def perspective_get_killed_jobs_list(self):
274        return self.DSageServer.get_killed_jobs_list()   
275       
276class UserPerspective(DefaultPerspective):
277    r"""
278    Defines the perspective of a regular user to the server.
279   
280    """
281    def __init__(self, DSageServer, avatarID):
282        DefaultPerspective.__init__(self, DSageServer, avatarID)
283       
284    def perspective_get_job_by_id(self, job_id):
285        if not isinstance(job_id, str):
286            print 'Bad job_id [%s] passed to get_job_by_id' % (job_id)
287            raise BadTypeError()
288        # log.msg('Returning job %s to %s' % (job_id, self.avatarID))
289        job = self.DSageServer.get_job_by_id(job_id)
290       
291        return job
292
293    def perspective_get_jobs_by_username(self, username):
294        if not (isinstance(username, str)):
295            print 'Bad username [%s] passed to perspective_get_jobs_by_username' % (username)
296            raise BadTypeError()
297
298        jobs = self.DSageServer.get_jobs_by_username(username)
299
300        return jobs
301
302    def perspective_get_job_result_by_id(self, job_id):
303        if not isinstance(job_id, str):
304            print 'Bad job_id [%s] passed to perspective_get_job_result_by_id' % (job_id)
305            raise BadTypeError()
306           
307        return self.DSageServer.get_job_result_by_id(job_id)
308
309    def perspective_get_job_output_by_id(self, job_id):
310        if not isinstance(job_id, str):
311            print 'Bad job_id [%s] passed to get_job_output_by_id' % (job_id)
312            raise BadTypeError()
313           
314        return self.DSageServer.get_job_output_by_id(job_id)
315
316    def perspective_sync_job(self, job_id):
317        if not isinstance(job_id, str):
318            return None
319           
320        return self.DSageServer.sync_job(job_id)
321
322    def perspective_submit_job(self, jdict):
323        if jdict is None:
324            raise BadJobError()
325        if jdict['username'] != self.avatarID:
326            print 'username does not match credentials'
327            print 'claim: %s\n actual:%s' % (jdict['username'], self.avatarID)
328            raise BadJobError()
329           
330        return self.DSageServer.submit_job(jdict)
331
332    def perspective_kill_job(self, job_id, reason=None):
333        if not isinstance(job_id, str):
334            print 'Bad job_id [%s] passed to perspective_kill_job' % job_id
335            raise BadTypeError()
336
337        return self.DSageServer.kill_job(job_id, reason)
338
339    def perspective_get_cluster_speed(self):
340        return self.DSageServer.get_cluster_speed()
341       
342    def perspective_get_monitor_list(self):
343        # return [x[1] for x in self.DSageServer.get_worker_list()]
344        return self.DSageServer.get_monitor_list()
345       
346    def perspective_get_client_list(self):
347        return self.DSageServer.get_client_list()
348   
349    def perspective_submit_host_info(self, hostinfo):
350        if not isinstance(hostinfo, dict):
351            raise BadTypeError()
352        return self.DSageServer.submit_host_info(hostinfo)
353
354    def perspective_get_killed_jobs_list(self):
355        return self.DSageServer.get_killed_jobs_list()
356       
357class AdminPerspective(UserPerspective):
358    r"""
359    Defines the perspective of the admin.
360   
361    """
362
363    def __init__(self, DSageServer, avatarID):
364        UserPerspective.__init__(self, DSageServer, avatarID)
365               
366class Realm(object):
367    implements(portal.IRealm)
368
369    def __init__(self, DSageServer):
370        self.DSageServer = DSageServer
371
372    def requestAvatar(self, avatarID, mind, *interfaces):       
373        if not pb.IPerspective in interfaces:
374            raise NotImplementedError, "No supported avatar interface."
375        else:
376            if avatarID == 'admin':
377                avatar = AdminPerspective(self.DSageServer, avatarID)
378            elif avatarID == 'Anonymous' and mind:
379                avatar = AnonymousMonitorPerspective(self.DSageServer,
380                                                     avatarID)
381            elif mind:
382                avatar = MonitorPerspective(self.DSageServer, avatarID)
383            else:
384                avatar = UserPerspective(self.DSageServer, avatarID)
385        avatar.attached(avatar, mind)
386        return pb.IPerspective, avatar, lambda a=avatar:a.detached(avatar, 
387                                                                   mind)
Note: See TracBrowser for help on using the repository browser.