Ticket #3600: dsage_process_pool.patch

File dsage_process_pool.patch, 168.1 KB (added by yi, 21 months ago)
  • sage/dsage/database/client.py

    # HG changeset patch
    # User Yi Qiang <yqiang@gmail.com>
    # Date 1215477042 25200
    # Node ID 00c15d56813e4783a5cbedad933a13769dadd6c7
    # Parent  9fe444b1d4ac3c714604409462eae7421ee44013
    [mq]: dsage_process_pool.patch
    
    diff --git a/sage/dsage/database/client.py b/sage/dsage/database/client.py
    a b  
    11class Client(object): 
    22    """ 
    33    A client of the dsage server. 
    4      
     4 
    55    """ 
    6      
     6 
    77    def __init__(self, username, public_key): 
    88        """ 
    99        :type username: string 
    1010        :param username: username 
    11          
     11 
    1212        :type public_key: string 
    1313        :param public_key: public key of user 
    1414 
    1515        """ 
    16          
     16 
    1717        self.username = username 
    1818        self.public_key = public_key 
    1919        self.creation_time = None 
     
    2121        self.last_login = None 
    2222        self.connected = None 
    2323        self.enabled = None 
    24          
     24 
    2525    def get_username(self): 
    2626        return self.username 
    27      
     27 
    2828    def get_public_key(self): 
    2929        return self.public_key 
    30      
     30 
    3131    def is_connected(self): 
    3232        return self.connected 
    33      
     33 
    3434    def is_enabled(self): 
    35         return self.enabled 
    36  No newline at end of file 
     35        return self.enabled 
  • sage/dsage/database/job.py

    diff --git a/sage/dsage/database/job.py b/sage/dsage/database/job.py
    a b  
    11############################################################################ 
    2 #                                                                      
    3 #   DSAGE: Distributed SAGE                      
    4 #                                                                              
    5 #       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>                
    6 #                                                                             
    7 #  Distributed under the terms of the GNU General Public License (GPL)         
     2# 
     3#   DSAGE: Distributed SAGE 
     4# 
     5#       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com> 
     6# 
     7#  Distributed under the terms of the GNU General Public License (GPL) 
    88# 
    99#    This code is distributed in the hope that it will be useful, 
    1010#    but WITHOUT ANY WARRANTY; without even the implied warranty of 
     
    2929class Job(object): 
    3030    """ 
    3131    Defines a Job that gets distributed to clients. 
    32      
     32 
    3333    """ 
    3434 
    35     def __init__(self, job_id=None, name='Unamed', username=getuser(), 
     35    def __init__(self, job_id=None, name=None, username=getuser(), 
    3636                 code='', timeout=0, kind='sage', priority=5): 
    3737        """ 
    3838        Represents a job. 
    39          
     39 
    4040        :type job_id: string 
    4141        :param job_id: unique identifier for a job 
    42          
     42 
    4343        :type name: string 
    4444        :param name: name given to a job, not unique 
    45          
     45 
    4646        :type code: string 
    4747        :param code: the code that needs to be executed 
    48          
     48 
    4949        :type parent: string 
    50         :param parent: the job_id of another job  
    51          
     50        :param parent: the job_id of another job 
     51 
    5252        :type username: string 
    5353        :param username: username of person who created job 
    54          
     54 
    5555        :type timeout: integer 
    5656        :param timeout: upper bound for number of seconds this job takes 
    57          
     57 
    5858        :type priority: integer 
    5959        :param priority: a jobs priority from 1-5, 1 being the highest 
    60          
     60 
    6161        :type kind: string 
    62         :param kind: kind of the job (file, string, generator)  
     62        :param kind: kind of the job (file, string, generator) 
    6363 
    6464        """ 
    65          
     65 
    6666        self.job_id = job_id 
    67         self.name = name 
     67        if name is None: 
     68            self.name = 'Unamed' 
     69        else: 
     70            self.name = name 
    6871        self.username = username 
    6972        self.code = code 
    7073        self.timeout = timeout 
     
    8588 
    8689    def __str__(self): 
    8790        return "<Job('%s', %s)>" % (self.job_id, self.username) 
    88      
     91 
    8992    def __repr__(self): 
    9093        return self.__str__() 
    91      
     94 
    9295    def attach(self, var, obj, file_name=None): 
    9396        """ 
    9497        Attaches an object to a job. 
    95          
     98 
    9699        Parameters: 
    97100        var -- the variable name you'd like the worker to use 
    98101        obj -- the object you want to attach 
    99102        filename -- optional, if your object is a saved sobj 
    100          
     103 
    101104        """ 
    102          
     105 
    103106        if file_name is not None: 
    104107            try: 
    105108                s = open(file_name, 'rb').read() 
     
    114117            except cPickle.PicklingError: 
    115118                print 'Unable to attach your object.' 
    116119        self.data.append((var, s, 'object')) 
    117          
     120 
    118121    def attach_file(self, file_name): 
    119122        """ 
    120123        Attach a file to a job. 
    121          
     124 
    122125        Parameters: 
    123126        file_name -- obvious 
    124          
     127 
    125128        """ 
    126          
     129 
    127130        f = open(file_name, 'rb').read() 
    128131        f = zlib.compress(f) 
    129          
     132 
    130133        # Strip out any hard coded path in the file name 
    131134        file_name = os.path.split(file_name)[1] 
    132135        self.data.append((file_name, f, 'file')) 
    133      
     136 
    134137    def _reduce(self): 
    135138        """ 
    136139        Returns a _reduced form of Job.jdict to be sent over the network. 
    137          
     140 
    138141        """ 
    139          
     142 
    140143        # dump and compress the data of the job 
    141144        jdict = copy.deepcopy(self.__dict__) 
    142145        jdict['data'] = cPickle.dumps(self.data, 2) 
    143146        jdict['data'] = zlib.compress(jdict['data']) 
    144147        # We do not compress jdict['result'] since it's already compressed 
    145         jdict['result'] = self.result  
    146          
     148        jdict['result'] = self.result 
     149 
    147150        # Remove attributes that sqlalchemy put there for us 
    148151        for key in jdict.keys(): 
    149152            if key.startswith('_'): 
    150153                del jdict[key] 
    151                  
     154 
    152155        return jdict 
     156 
    153157 
    154158def expand_job(jdict): 
    155159    """ 
    156160    This method recreates a Job object given a jdict. 
    157      
     161 
    158162    :type jdict: dictionary 
    159163    :param jdict: the job dictionary 
    160      
    161164    """ 
    162      
     165 
    163166    if jdict is None: 
    164167        return None 
    165      
     168 
    166169    job_id = jdict['job_id'] 
    167170    job = Job(job_id=job_id) 
    168      
     171 
    169172    # decompress and load data 
    170173    try: 
    171174        jdict['data'] = zlib.decompress(jdict['data']) 
    172175        jdict['data'] = cPickle.loads(jdict['data']) 
    173     except (KeyError, TypeError): 
     176    except Exception, msg: 
     177        print Exception, msg 
    174178        jdict['data'] = None 
    175179 
    176180    try: 
    177181        jdict['result'] = zlib.decompress(jdict['result']) 
    178182        jdict['result'] = cPickle.loads(jdict['result']) 
    179     except (KeyError, TypeError): 
     183    # except (KeyError, TypeError): 
     184    except: 
    180185        jdict['result'] = None 
    181          
     186 
    182187    for k, v in jdict.iteritems(): 
    183188        setattr(job, k, v) 
    184189 
    185     return job 
    186  No newline at end of file 
     190    return job 
  • sage/dsage/database/jobdb.py

    diff --git a/sage/dsage/database/jobdb.py b/sage/dsage/database/jobdb.py
    a b  
    9393        """ 
    9494         
    9595 
     96class JobDatabaseSA(object): 
     97    """ 
     98    I implement the JobDatabase using SQLAlchemy. 
     99    """ 
    96100 
    97 class JobDatabaseSA(object): 
    98101    implements(IJobDatabase) 
    99102     
    100103    def __init__(self, Session): 
    101104        self.sess = Session() 
    102         self.failure_threshold = 5 
     105        self.failure_threshold = 3 
    103106         
    104107    def _shutdown(self): 
    105108        self.sess.close() 
     
    137140    def update_job(self, job): 
    138141        """ 
    139142        Takes a job object and updates it in the database. 
    140          
    141143        """ 
    142144         
    143145        self.sess.save_or_update(job) 
    144146        self.sess.commit() 
    145147         
    146148    def store_jdict(self, jdict): 
     149        """ 
     150        I store the jdict to the database. 
     151        """ 
     152 
    147153        try: 
    148154            job_id = jdict['job_id'] 
    149             assert job_id != None 
    150155            job = self.sess.query(Job).filter_by(job_id=job_id).first() 
    151156            for k,v in jdict.iteritems(): 
    152157                setattr(job, k, v) 
     
    328333         
    329334        Parameters: 
    330335        jdict -- sage.dsage.database.Job.jdict 
    331          
    332336        """ 
    333          
     337 
    334338        try: 
    335339            job_id = jdict['job_id'] 
    336340        except KeyError, msg: 
     
    475479         
    476480        """ 
    477481         
    478         return self._get_jobs_by_parameter('status', 'processing') 
    479  No newline at end of file 
     482        return self._get_jobs_by_parameter('status', 'processing') 
  • sage/dsage/database/worker.py

    diff --git a/sage/dsage/database/worker.py b/sage/dsage/database/worker.py
    a b  
     1""" 
     2Worker 
     3""" 
     4 
     5 
    16class Worker(object): 
     7 
    28    def __init__(self, host_info): 
    39        for k, v in host_info.iteritems(): 
    410            setattr(self, k, v) 
    5          
    6  No newline at end of file 
  • sage/dsage/database/workerdb.py

    diff --git a/sage/dsage/database/workerdb.py b/sage/dsage/database/workerdb.py
    a b  
    2222from sage.dsage.misc.constants import SERVER_LOG 
    2323from sage.dsage.database.worker import Worker 
    2424 
     25 
    2526class WorkerDatabaseSA(object): 
     27    """ 
     28    I implement the WorkerDatabase using SQLAlchemy. 
     29    """ 
     30 
    2631    def __init__(self, Session): 
    2732        self.sess = Session() 
    2833        self._set_initial_state() 
    29          
     34 
    3035    def _set_initial_state(self): 
    3136        w = self.sess.query(Worker).all() 
    3237        for _w in w: 
    3338            _w.connected = False 
    3439            self.sess.save_or_update(_w) 
    3540        self.sess.commit() 
    36          
     41 
    3742    def set_authenticated(self, uuid, authenticated): 
    3843        w = self.sess.query(Worker).filter_by(uuid=uuid).first() 
    3944        w.authenticated = authenticated 
    4045        self.sess.save_or_update(w) 
    4146        self.sess.commit() 
    42      
     47 
    4348    def set_busy(self, uuid, busy): 
    4449        w = self.sess.query(Worker).filter_by(uuid=uuid).first() 
    4550        w.busy = busy 
    4651        self.sess.save_or_update(w) 
    4752        self.sess.commit() 
    48      
     53 
    4954    def add_worker(self, host_info): 
    5055        w = Worker(host_info) 
    5156        self.sess.save(w) 
    5257        self.sess.commit() 
    53      
     58 
    5459    def update_worker(self, host_info): 
    5560        uuid = host_info['uuid'] 
    5661        w = self.sess.query(Worker).filter_by(uuid=uuid).first() 
     
    5863            setattr(w, k, v) 
    5964        self.sess.save_or_update(w) 
    6065        self.sess.commit() 
    61      
     66 
    6267    def get_worker(self, uuid): 
    6368        w = self.sess.query(Worker).filter_by(uuid=uuid).first() 
    64          
     69 
    6570        return w 
    66      
    67     def get_worker_list(self): 
    68         w = self.sess.query(Worker).all() 
    69          
     71 
     72    def get_worker_list(self, filter={}): 
     73        w = self.sess.query(Worker).filter_by(**filter).all() 
     74 
    7075        return w 
    71      
     76 
    7277    def get_worker_by_job_id(self, job_id): 
    7378        w = self.sess.query(Worker).filter_by(job_id=job_id) 
    7479 
    7580    def get_online_workers(self): 
    7681        w = self.sess.query(Worker).filter_by(connected=True).all() 
     82 
     83        return w 
     84     
     85    def get_avail_workers(self): 
     86        w = self.sess.query(Worker).filter_by(busy=False, connected=True).all() 
    7787         
    7888        return w 
    79      
     89         
    8090    def get_worker_count(self, connected, busy): 
    81         q = self.sess.query(Worker).filter_by(connected=connected,busy=busy) 
     91        q = self.sess.query(Worker).filter_by(connected=connected, busy=busy) 
    8292        workers = q.all() 
    83          
     93 
    8494        count = sum([w.workers for w in workers]) 
    85          
     95 
    8696        return count 
    87      
     97 
    8898    def get_cpu_speed(self, connected, busy): 
    8999        w = self.sess.query(Worker).all() 
    90          
     100 
    91101        return sum([_w.cpu_speed * _w.cpus for _w in w]) 
    92          
     102 
    93103    def set_connected(self, uuid, connected): 
    94104        w = self.sess.query(Worker).filter_by(uuid=uuid).first() 
    95105        w.connected = connected 
    96106        self.sess.save_or_update(w) 
    97107        self.sess.commit() 
    98      
     108 
    99109 
    100110class WorkerDatabase(object): 
    101111    """ 
    102     This table keeps track of workers. 
    103      
     112    I implement the WorkerDatabase using raw sqlite.  
    104113    """ 
    105      
     114 
    106115    def __init__(self, db_conn, log_file=SERVER_LOG, log_level=0): 
    107116        self.log_file = log_file 
    108117        self.log_level = log_level 
    109118        self.con = db_conn 
    110119        self.tablename = 'monitors' 
    111      
     120 
    112121    def _set_parameter(self, uuid, key, value): 
    113122        query = """UPDATE monitors 
    114123        SET %s=? 
     
    116125        cur = self.con.cursor() 
    117126        cur.execute(query, (value, uuid)) 
    118127        self.con.commit() 
    119      
     128 
    120129    def set_authenticated(self, uuid, authenticated): 
    121130        return self._set_parameter(uuid, 'authenticated', authenticated) 
    122      
     131 
    123132    def add_worker(self, host_info): 
    124133        query = """INSERT INTO monitors 
    125134        (uuid, 
     
    137146         mem_free) 
    138147        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
    139148        """ 
    140          
     149 
    141150        uuid = host_info['uuid'] 
    142151        username = host_info['username'] 
    143152        hostname = host_info['hostname'] 
     
    151160        cpu_model = host_info['cpu_model'] 
    152161        mem_total = host_info['mem_total'] 
    153162        mem_free = host_info['mem_free'] 
    154          
     163 
    155164        cur = self.con.cursor() 
    156165        cur.execute(query, (uuid, username, hostname, ip, workers, 
    157166                            sage_version, os_, kernel_version, cpus, 
    158167                            cpu_speed, cpu_model, mem_total, mem_free)) 
    159168        self.con.commit() 
    160      
     169 
    161170    def update_worker(self, host_info): 
    162171        query = """UPDATE monitors 
    163172        SET hostname = ?, username = ?, ip = ?, workers = ?, sage_version = ?, 
    164173        os = ?, kernel_version = ?, cpus = ?, cpu_speed = ?, cpu_model = ?, 
    165174        mem_total = ?, mem_free = ? WHERE uuid = ? 
    166175        """ 
    167          
     176 
    168177        uuid = host_info['uuid'] 
    169178        username = host_info['username'] 
    170179        hostname = host_info['hostname'] 
     
    178187        cpu_model = host_info['cpu_model'] 
    179188        mem_total = host_info['mem_total'] 
    180189        mem_free = host_info['mem_free'] 
    181          
     190 
    182191        cur = self.con.cursor() 
    183192        cur.execute(query, (hostname, username, ip, workers, sage_version, 
    184193                            os_, kernel_version, cpus, cpu_speed, cpu_model, 
    185194                            mem_total, mem_free, uuid)) 
    186      
     195 
    187196    def get_worker(self, uuid): 
    188197        query = """SELECT 
    189198        uuid, 
     
    195204        os 
    196205        FROM monitors 
    197206        WHERE uuid = ?""" 
    198          
     207 
    199208        cur = self.con.cursor() 
    200         cur.execute(query, (uuid,)) 
     209        cur.execute(query, (uuid, )) 
    201210        result = cur.fetchone() 
    202211        if result is None: 
    203212            return result 
     
    206215        for k, v in monitor.iteritems(): 
    207216            if k == 'authenticated': 
    208217                monitor[k] = bool(v) 
    209          
     218 
    210219        return monitor 
    211      
     220 
    212221    def get_worker_list(self): 
    213222        """ 
    214223        Returns a list of connected monitors. 
    215          
     224 
    216225        """ 
    217          
     226 
    218227        query = """SELECT * FROM monitors""" 
    219228        cur = self.con.cursor() 
    220229        cur.execute(query) 
     
    226235                 # Convert from 1/0 to python bool 
    227236                if k in ('authenticated', 'connected', 'busy'): 
    228237                    monitor[k] = bool(v) 
    229          
     238 
    230239        return monitors 
    231      
     240 
    232241    def set_connected(self, uuid, connected=True): 
    233242        """ 
    234243        Sets the connected status of a monitor. 
    235          
     244 
    236245        Parameters: 
    237246        uuid -- string 
    238247        connected -- bool 
    239          
     248 
    240249        """ 
    241          
     250 
    242251        cur = self.con.cursor() 
    243252        if connected: 
    244253            query = """UPDATE monitors SET connected=1, last_connection=? 
     
    246255            cur.execute(query, (datetime.datetime.now(), uuid)) 
    247256        else: 
    248257            query = """UPDATE monitors SET connected=0 WHERE uuid=?""" 
    249             cur.execute(query, (uuid,)) 
    250          
     258            cur.execute(query, (uuid, )) 
     259 
    251260        self.con.commit() 
    252      
     261 
    253262    def is_connected(self, uuid): 
    254263        """ 
    255264        Returns whether the monitor is connected. 
    256          
     265 
    257266        """ 
    258          
     267 
    259268        query = """SELECT connected FROM monitors WHERE uuid = ?""" 
    260269        cur = self.con.cursor() 
    261         cur.execute(query, (uuid,)) 
     270        cur.execute(query, (uuid, )) 
    262271        result = cur.fetchone()[0] 
    263          
     272 
    264273        return result 
    265      
     274 
    266275    def set_busy(self, uuid, busy): 
    267276        """ 
    268277        Sets whether or not a worker is doing a job. 
    269          
     278 
    270279        """ 
    271          
     280 
    272281        if busy: 
    273282            query = """UPDATE monitors SET busy=1 WHERE uuid=?""" 
    274283        else: 
    275284            query = """UPDATE monitors SET busy=0 WHERE uuid=?""" 
    276          
     285 
    277286        cur = self.con.cursor() 
    278         cur.execute(query, (uuid,)) 
     287        cur.execute(query, (uuid, )) 
    279288        self.con.commit() 
    280      
     289 
    281290    def get_worker_count(self, connected, busy=False): 
    282291        """ 
    283292        Returns the number of workers. 
    284          
     293 
    285294        Parameters: 
    286295        connected -- bool 
    287296        busy -- bool 
    288          
     297 
    289298        """ 
    290          
     299 
    291300        if connected and not busy: 
    292301            query = """ 
    293302            SELECT workers FROM monitors WHERE connected AND NOT busy 
     
    304313            query = """ 
    305314            SELECT workers FROM monitors WHERE NOT connected AND busy 
    306315            """ 
    307          
     316 
    308317        cur = self.con.cursor() 
    309318        cur.execute(query) 
    310          
     319 
    311320        result = cur.fetchall() 
    312          
     321 
    313322        return sum(w[0] for w in result) 
    314      
     323 
    315324    def get_cpu_speed(self, connected=True, busy=False): 
    316325        """ 
    317326        Returns the aggregate cpu speed in Mhz. 
    318          
     327 
    319328        Parameters: 
    320329        connected -- bool 
    321          
     330 
    322331        """ 
    323          
     332 
    324333        if connected and busy: 
    325334            query = """SELECT cpu_speed, workers FROM monitors 
    326335            WHERE connected AND busy""" 
     
    329338            WHERE connected""" 
    330339        else: 
    331340            query = """SELECT cpu_speed, workers FROM monitors""" 
    332          
     341 
    333342        cur = self.con.cursor() 
    334343        cur.execute(query) 
    335          
     344 
    336345        result = cur.fetchall() 
    337          
     346 
    338347        cpu_speed = sum([s[0]*s[1] for s in result]) 
    339          
     348 
    340349        return cpu_speed 
    341      
     350 
    342351    def get_cpu_count(self, connected=True): 
    343352        """ 
    344353        Returns the number of cpus that are available. 
    345          
     354 
    346355        Parameters: 
    347356        connected -- bool 
    348          
     357 
    349358        """ 
    350          
     359 
    351360        if connected: 
    352361            query = """SELECT workers, cpus FROM monitors WHERE connected""" 
    353362        else: 
    354363            query = """SELECT workers, cpus FROM monitors""" 
    355          
     364 
    356365        cur = self.con.cursor() 
    357366        cur.execute(query) 
    358          
     367 
    359368        result = cur.fetchall() 
    360          
     369 
    361370        cpu_count = sum(min(s[0:2]) for s in result) 
    362          
    363         return cpu_count 
    364  No newline at end of file 
     371 
     372        return cpu_count 
  • sage/dsage/dist_functions/dist_factor.py

    diff --git a/sage/dsage/dist_functions/dist_factor.py b/sage/dsage/dist_functions/dist_factor.py
    a b  
    1616           Robert Bradshaw 
    1717           Yi Qiang 
    1818     
    19      
    20     sage: d = dsage.start_all(verbose=False, workers=4) # long time 
     19    sage: from sage.dsage.misc.misc import test_dsage  
     20    sage: d = test_dsage(dsage) 
    2121    Going into testing mode... 
    22     sage: sleep(5) # long time 
    23     sage: f = DistributedFactor(d, 2^125-1) # long time 
    24     sage: print f # long time  
     22    sage: f = DistributedFactor(d, 2^125-1)  
     23    sage: print f   
    2524    Factoring "42535295865117307932921825928971026431"  
    2625    Prime factors found so far: [31, 601, 1801] 
    27     sage: f.done # long time 
     26    sage: f.done  
    2827    False 
    29     sage: f.wait(timeout=60) # long time 
    30     sage: f.done # long time 
     28    sage: f.wait(timeout=60) 
     29    sage: f.done  
    3130    True 
    32     sage: print f # long time 
     31    sage: print f  
    3332    Factoring "42535295865117307932921825928971026431" 
    3433    Prime factors found so far: [31, 601, 1801, 269089806001, 4710883168879506001] 
    35      
    3634    """ 
    3735     
    3836    def __init__(self, dsage, n, concurrent=10, B1=2000, curves=50, 
  • sage/dsage/dist_functions/dist_function.py

    diff --git a/sage/dsage/dist_functions/dist_function.py b/sage/dsage/dist_functions/dist_function.py
    a b  
    2323 
    2424from sage.dsage.database.job import Job 
    2525from sage.dsage.interface.dsage_interface import (JobWrapper, 
    26                                                   BlockingJobWrapper, 
    27                                                   blockingCallFromThread) 
     26                                                  BlockingJobWrapper) 
     27                                                   
     28from twisted.internet import reactor 
     29from twisted.internet.threads import blockingCallFromThread 
    2830 
    2931class DistributedFunction(object): 
    3032    """ 
     
    112114        Reloads a distributed job from disk. 
    113115         
    114116        """ 
    115         from twisted.internet import reactor 
     117         
    116118        from twisted.internet import task 
    117119        if dsage.remoteobj is None: 
    118120            # XXX This is a hack because dsage.remoteobj is not set yet 
     
    164166               self.submit_job(job, job_name, async) 
    165167        self.outstanding_jobs = [] 
    166168 
    167     def wait(self, timeout=None): 
     169    def wait(self, t=0.5, timeout=None): 
    168170        """ 
    169171        Blocks until the job is completed. 
    170172         
     173        t -- the time to wait before polling again. 
    171174        """ 
    172175         
    173176        import signal 
    174177        if timeout == None: 
    175178            while not self.done: 
    176                     time.sleep(0.5) 
     179                    time.sleep(t) 
    177180        else: 
    178181            def handler(signum, frame): 
    179182                raise RuntimeError('Maximum wait time exceeded.') 
    180183            signal.signal(signal.SIGALRM, handler) 
    181184            signal.alarm(timeout) 
    182185            while not self.done: 
    183                 time.sleep(0.5) 
     186                time.sleep(t) 
    184187            signal.alarm(0) 
    185188         
    186     def start(self): 
     189    def start(self, ctime=1.0): 
    187190        """ 
    188191        Starts the Distributed Function. It will submit all jobs in the 
    189192        outstanding_jobs queue and also start a checker tasks that polls for 
     
    201204        self.checker_task = blockingCallFromThread(self.reactor, 
    202205                                                   task.LoopingCall, 
    203206                                                   self.check_waiting_jobs) 
    204         self.reactor.callFromThread(self.checker_task.start, 5.0, now=True) 
    205      
     207        self.reactor.callFromThread(self.checker_task.start, ctime, now=True) 
    206208     
    207209    def process_result(self): 
    208210        """ 
     
    216218     
    217219    def check_waiting_jobs(self): 
    218220        """ 
    219         Checks the status of jobs in the waiting queue. 
    220          
     221        I check the status of jobs in the waiting queue. 
    221222        """ 
    222223         
    223224        from twisted.internet import reactor 
    224225        from twisted.spread import pb 
     226 
    225227        for wrapped_job in self.waiting_jobs: 
    226228            if wrapped_job.killed == True: 
    227229                self.waiting_jobs.remove(wrapped_job) 
  • sage/dsage/dsage.py

    diff --git a/sage/dsage/dsage.py b/sage/dsage/dsage.py
    a b  
    3535                                       SERVER_TAC, DSAGE_DB) 
    3636from sage.dsage.misc.config import check_dsage_dir 
    3737from sage.dsage.misc.misc import find_open_port 
     38from sage.dsage.misc.misc import write_tac 
    3839import sage.plot.plot 
     40 
    3941 
    4042def spawn(cmd, verbose=True, stdout=None, stdin=None): 
    4143    """ 
    4244    Spawns a process and registers it with the SAGE. 
    4345    """ 
    44      
     46 
    4547    null = open('/dev/null', 'a') 
    4648    if stdout is None: 
    4749        stdout = null 
    4850    if stdin is None: 
    4951        stdin = null 
    5052    cmdl = cmd.split(' ') 
    51     process = subprocess.Popen(cmdl, shell=False, stdout=stdout, stdin=null) 
     53    process = subprocess.Popen(cmdl, shell=False, stdout=stdout, stdin=stdin) 
    5254    sage.interfaces.cleaner.cleaner(process.pid, cmd) 
    5355    if verbose: 
    5456        print 'Spawned %s (pid = %s)\n' % (' '.join(cmdl), process.pid) 
    55      
     57 
    5658    return process 
    5759 
    5860 
    5961class DistributedSage(object): 
    6062    r""" 
    6163    Distributed SAGE allows you to do distributed computing in SAGE. 
    62      
     64 
    6365    To get up and running quickly, run dsage.setup() to run the 
    6466    configuration utility. 
    65      
     67 
    6668    Note that configuration files will be stored in the 
    6769    directory \code{\$DOT\_SAGE/dsage}. 
    68      
     70 
    6971    QUICK-START 
    7072 
    7173    1.  Launch sage 
     
    9294        \code{sage: j} 
    9395        \code{4} 
    9496    """ 
    95          
    96     def start_all(self, port=None, workers=2, log_level=0, poll=1.0, 
    97                   authenticate=False, failure_threshold=3, 
    98                   verbose=True, testing=False): 
     97 
     98    def start_all(self, port=None, workers=2, log_level=0, authenticate=False,  
     99                  failure_threshold=3, verbose=True, testing=False): 
    99100        """ 
    100101        Start the server and worker and returns a connection to the server. 
    101          
     102 
    102103        """ 
    103          
     104 
    104105        from sage.dsage.interface.dsage_interface import BlockingDSage 
    105106        from sage.dsage.misc.misc import find_open_port 
    106          
     107 
    107108        if port is None: 
    108109            port = find_open_port().next() 
    109          
     110 
    110111        if testing or sage.plot.plot.DOCTEST_MODE: 
    111112            test_db = tempfile.NamedTemporaryFile() 
    112113            testing = True 
     
    122123                        log_level=5, 
    123124                        ssl=False, 
    124125                        blocking=False, 
    125                         poll=0.1, 
    126126                        authenticate=authenticate, 
    127127                        verbose=False) 
    128128        else: 
     
    131131                        blocking=False, 
    132132                        failure_threshold=failure_threshold, 
    133133                        verbose=verbose) 
    134          
     134 
    135135            self.worker(port=port, 
    136136                        workers=workers, 
    137137                        log_level=log_level, 
    138138                        blocking=False, 
    139                         poll=poll, 
    140139                        authenticate=authenticate, 
    141140                        verbose=verbose) 
    142          
     141 
    143142        # We want to establish a connection to the server 
    144143        tries = 10 
    145144        while(tries > 0): 
     
    158157            print 'Could not connect to the server.' 
    159158            print 'Error msg from last attempt: %s' % (msg) 
    160159            return 
    161          
     160 
    162161        if testing or sage.plot.plot.DOCTEST_MODE: 
    163162            d = BlockingDSage(server='localhost', port=port, testing=testing, 
    164163                              ssl=False) 
    165164        else: 
    166165            d = BlockingDSage(server='localhost', port=port) 
    167                  
     166 
    168167        return d 
    169      
     168 
    170169    def kill_all(self): 
    171170        """ 
    172171        Kills the server and worker. 
    173          
     172 
    174173        """ 
    175          
     174 
    176175        self.kill_worker() 
    177176        self.kill_server() 
    178      
     177 
    179178    def kill_worker(self): 
    180179        try: 
    181180            os.kill(self.worker_proc.pid, 9) 
     
    183182            del self.worker_proc 
    184183        except OSError, msg: 
    185184            print 'Error killing worker: %s' % msg 
    186      
     185 
    187186    def kill_server(self): 
    188187        try: 
    189188            os.kill(self.server_proc.pid, 9) 
     
    191190            del self.server_proc 
    192191        except OSError, msg: 
    193192            print 'Error killing server: %s' % msg 
    194      
     193 
     194 
    195195    def server(self, blocking=True, port=None, log_level=0, ssl=True, 
    196196               db_file=DSAGE_DB, 
    197197               log_file=SERVER_LOG, 
     
    201201               verbose=True, testing=False, profile=False): 
    202202        r""" 
    203203        Run the Distributed SAGE server. 
    204          
     204 
    205205        Doing \code{dsage.server()} will spawn a server process which 
    206206        listens by default on port 8081. 
    207207        """ 
     208 
    208209        open_ports = find_open_port() 
    209210        check_dsage_dir() 
    210211        cwd = os.getcwd() 
    211212        pid_file = 'server.pid' 
    212          
    213         def write_tac(tac): 
    214             os.chdir(DSAGE_DIR) 
    215             f = open('dsage_server.tac', 'w') 
    216             f.writelines(tac) 
    217             f.close() 
    218213 
    219214        if testing or sage.plot.plot.DOCTEST_MODE: 
    220215            test_db = tempfile.NamedTemporaryFile() 
     
    227222            except: 
    228223                pass 
    229224            db_file = test_db.name 
    230              
     225 
    231226        if port != None: 
    232227            server_port = port 
    233228        else: 
    234229            server_port = open_ports.next() 
    235230            open_ports.next() 
    236              
     231 
    237232        tac = SERVER_TAC % (db_file, failure_threshold, ssl, log_level, 
    238233                            log_file, privkey, cert, server_port, testing) 
    239         write_tac(tac) 
    240          
     234        write_tac(tac, 'dsage_server.tac') 
     235 
    241236        cmd = 'twistd -d %s --pidfile=%s ' % (DSAGE_DIR, pid_file) 
    242237        if profile: 
    243238            if verbose: 
    244239                print 'Launched with profiling enabled...' 
    245             cmd += '--nothotshot --profile=dsage_server.profile --savestats ' 
     240            cmd += '--profile=dsage_server.profile --savestats ' 
    246241        if blocking: 
    247242            cmd += '--nodaemon -y dsage_server.tac' 
    248243            cmd += ' | tee -a %s' % (log_file) 
     
    265260                    time.sleep(0.1) 
    266261                    continue 
    267262        os.chdir(cwd) 
    268          
    269      
    270     def worker(self, server='localhost', port=8081, workers=2, poll=1.0, 
     263 
     264    def worker(self, server='localhost', port=8081, workers=2, 
    271265               username=getuser(), blocking=True, ssl=True, log_level=0, 
    272266               authenticate=True, priority=20, 
    273267               privkey=os.path.join(DSAGE_DIR, 'dsage_key'), 
    274268               pubkey=os.path.join(DSAGE_DIR, 'dsage_key.pub'), 
    275                log_file=WORKER_LOG, 
    276                verbose=True): 
     269               log_file=WORKER_LOG, verbose=True, profile=False): 
    277270        r""" 
    278         Run the Distributed SAGE worker. 
    279          
     271        Run the Distributed Sage worker. 
     272 
    280273        Typing \code{sage.worker()} will launch a worker which by 
    281274        default connects to localhost on port 8081 to fetch jobs. 
    282275        """ 
    283          
     276 
     277        from sage.dsage.worker.monitor import MONITOR_TAC as tac 
    284278        check_dsage_dir() 
    285         cmd = ('dsage_worker.py -s %s -p %s -u %s -w %s --poll %s -l %s -f %s ' 
    286                + '--privkey=%s --pubkey=%s --priority=%s') 
    287         cmd = cmd % (server, port, username, workers, poll, log_level, 
    288                      log_file, privkey, pubkey, priority) 
    289         if ssl: 
    290             cmd += ' --ssl' 
    291         if authenticate: 
    292             cmd += ' -a' 
    293         if not blocking: 
    294             cmd += ' --noblock' 
    295             cmd = 'python ' + SAGE_ROOT + '/local/bin/' + cmd 
     279        cwd = os.getcwd() 
     280        pid_file = 'worker.pid' 
     281        tac = tac % (server, port, workers, username, ssl, authenticate,  
     282                     priority, log_level, log_file, privkey, pubkey) 
     283        write_tac(tac, 'dsage_worker.tac') 
     284        cmd = 'twistd -d %s --pidfile=%s ' % (DSAGE_DIR, pid_file) 
     285        if profile: 
     286            cmd += '--profile=dsage_worker.profile --savestats ' 
     287        if blocking: 
     288            cmd += '--nodaemon -y dsage_worker.tac' 
     289            cmd += ' | tee -a %s' % (log_file) 
     290            os.system(cmd) 
     291        else: 
     292            try: 
     293                os.remove(pid_file) 
     294            except: 
     295                pass 
     296            cmd += '--logfile=%s -y dsage_worker.tac' % (log_file) 
    296297            self.worker_proc = spawn(cmd, verbose=verbose) 
    297         else: 
    298             cmd = 'python ' + SAGE_ROOT + '/local/bin/' + cmd 
    299             os.system(cmd) 
    300      
     298            while True: 
     299                try: 
     300                    pid = int(open(pid_file).read()) 
     301                    sage.interfaces.cleaner.cleaner(pid, cmd) 
     302                    break 
     303                except: 
     304                    time.sleep(0.1) 
     305                    continue 
     306        os.chdir(cwd) 
    301307     
    302308    def setup(self, template=None): 
    303309        r""" 
    304310        This is the setup utility which helps you configure dsage. 
    305          
     311 
    306312        Type \code{dsage.setup()} to run the configuration for the server, 
    307313        worker and client.  Alternatively, if you want to run the 
    308314        configuration for just one parts, you can launch 
    309315        \code{dsage.setup_server()}, \code{dsage.setup\_worker()} 
    310316        or \code{dsage.setup()}. 
    311          
     317 
    312318        """ 
    313          
     319 
    314320        from sage.dsage.scripts.dsage_setup import setup 
    315321        setup(template=template) 
    316      
    317      
     322 
    318323    def setup_server(self, *args): 
    319324        """ 
    320325        This method runs the configuration utility for the server. 
    321          
     326 
    322327        """ 
    323          
     328 
    324329        from sage.dsage.scripts.dsage_setup import setup_server 
    325330        setup_server(*args) 
    326      
    327      
     331 
    328332    def setup_worker(self): 
    329333        """ 
    330334        This method runs the configuration utility for the worker. 
    331          
     335 
    332336        """ 
    333          
     337 
    334338        from sage.dsage.scripts.dsage_setup import setup_worker 
    335339        setup_worker() 
    336      
    337      
     340 
    338341    def setup_client(self): 
    339342        """ 
    340343        This method runs the configuration utility for the client. 
    341          
     344 
    342345        """ 
    343          
     346 
    344347        from sage.dsage.scripts.dsage_setup import setup_client 
    345348        setup_client() 
  • sage/dsage/interface/dsage_interface.py

    diff --git a/sage/dsage/interface/dsage_interface.py b/sage/dsage/interface/dsage_interface.py
    a b  
    2626import time 
    2727from getpass import getuser 
    2828 
    29 # This is a version of blockingCallFromThread that delays importing 
    30 # twisted.internet until it is first used. 
    31 def blockingCallFromThread(*args, **kwds): 
    32     from twisted.internet.threads import blockingCallFromThread 
    33     return blockingCallFromThread(*args, **kwds) 
     29from twisted.cred.credentials import Anonymous 
     30from twisted.internet.threads import blockingCallFromThread 
     31from twisted.internet import reactor 
    3432 
    3533from sage.dsage.database.job import Job, expand_job 
    3634from sage.dsage.misc.misc import random_str 
    3735from sage.dsage.misc.constants import DSAGE_DIR 
    3836 
     37 
    3938class DSageThread(threading.Thread): 
    4039    """ 
    4140    DSage thread 
    42      
     41 
    4342    """ 
    44      
     43 
    4544    def run(self): 
    46         from twisted.internet import reactor 
    4745        if not reactor.running: 
    4846            try: 
    4947                reactor.run(installSignalHandlers=0) 
     
    6664    log_level -- int (Default: 0) 
    6765    ssl -- int (Default: 1) 
    6866    """ 
    69      
     67 
    7068    def __init__(self, server='localhost', port=8081, 
    7169                 username=getuser(), 
    7270                 pubkey_file=os.path.join(DSAGE_DIR, 'dsage_key.pub'), 
    7371                 privkey_file=os.path.join(DSAGE_DIR, 'dsage_key'), 
    7472                 log_level=0, ssl=True, testing=False): 
    75          
     73 
    7674        from twisted.cred import credentials 
    7775        from twisted.conch.ssh import keys 
    7876        from twisted.spread import banana 
    7977        banana.SIZE_LIMIT = 100*1024*1024 # 100 MegaBytes 
    80          
     78 
    8179        self.server = server 
    8280        self.port = port 
    8381        self.username = username 
     
    9088        self.result = None 
    9189        self.info_str = 'Connected to: %s:%s' 
    9290        self._testing = testing 
    93          
     91 
    9492        if not self._testing: 
    9593            self._pubkey = keys.Key.fromFile(self._pubkey_file) 
    9694            try: 
     
    110108        else: 
    111109            self.username = 'tester' 
    112110        self.connect() 
    113      
    114      
     111 
    115112    def __repr__(self): 
    116113        return self.__str__() 
    117      
     114 
    118115    def __str__(self): 
    119116        if self.is_connected(): 
    120117            return self.info_str % (self.server, self.port) 
    121118        else: 
    122119            return 'Not connected.' 
    123      
     120 
    124121    def __call__(self, cmd, user_vars=None, load_files=[], job_name=None): 
    125122        cmd = ['ans = %s\n' % (cmd), 
    126123               'print ans\n', 
    127124               "DSAGE_RESULT = ans\n"] 
    128          
     125 
    129126        return self.eval(''.join(cmd), user_vars=user_vars, 
    130127                                       load_files=load_files, 
    131128                                       job_name=job_name) 
    132      
     129 
    133130    def __getstate__(self): 
    134131        d = copy.copy(self.__dict__) 
    135132        d['remoteobj'] = None 
    136          
     133 
    137134        return d 
    138      
     135 
    139136    def _getpassphrase(self): 
    140137        import getpass 
    141138        passphrase = getpass.getpass('Passphrase (Hit enter for None): ') 
    142          
     139 
    143140        return passphrase 
    144      
     141 
    145142    def _catch_failure(self, failure): 
    146143        print "Error connecting: %s" % failure.getErrorMessage() 
    147      
     144 
    148145    def _connected(self, remoteobj): 
    149146        if self._log_level > 0: 
    150147            print 'Connected to remote server.\r' 
    151148        self._remoteobj = remoteobj 
    152149        self._remoteobj.notifyOnDisconnect(self._disconnected) 
    153      
     150 
    154151    def _disconnected(self, remoteobj): 
    155152        print '[DSage] Closed connection to %s' % (self.server) 
    156153        self.info_str = 'Not connected.' 
    157      
     154 
    158155    def _got_my_jobs(self, jobs, job_name): 
    159156        from sage.dsage.errors.exceptions import NoJobException 
    160157        if jobs == None: 
     
    162159        if job_name: 
    163160            return [JobWrapper(self._remoteobj, job) 
    164161                    for job in jobs if job.name == job_name] 
    165      
     162 
    166163    def _killed_job(self, job_id): 
    167164        pass 
    168      
     165 
    169166    def restore(self, remoteobj): 
    170167        """ 
    171168        This method restores a connection to the server. 
    172          
     169 
    173170        """ 
    174          
     171 
    175172        self._remoteobj = remoteobj 
    176      
     173 
    177174    def connect(self): 
    178175        """ 
    179176        This methods establishes the conection to the remote server. 
    180          
     177 
    181178        """ 
    182          
     179 
    183180        from twisted.internet import reactor 
    184181        from sage.dsage.twisted.pb import ClientFactory 
    185182        factory = ClientFactory(self._login, (), {}) 
    186183        factory.continueTrying = False # Do not attempt to reconnect 
    187          
     184 
    188185        if self.ssl == 1: 
    189186            # Old, uses OpenSSL, SAGE uses GNUTLS now 
    190187            # from twisted.internet import ssl 
     
    198195            reactor.connectTLS(self.server, self.port, factory, cred) 
    199196        else: 
    200197            reactor.connectTCP(self.server, self.port, factory) 
    201      
     198 
    202199    def _login(self, *args, **kwargs): 
    203         from twisted.cred.credentials import Anonymous 
    204200        if self._testing: 
    205201            d = self.factory.login(Anonymous(), None) 
    206202        else: 
    207203            d = self.factory.login(self._creds, None) 
    208204        d.addCallback(self._connected) 
    209205        d.addErrback(self._catch_failure) 
    210          
     206 
    211207        return d 
    212      
     208 
    213209    def disconnect(self): 
    214210        print 'Disconnecting from server.' 
    215         self._remoteobj = None 
    216      
     211        t = self._remoteobj.broker.transport 
     212        d = blockingCallFromThread(reactor, t.loseConnection) 
     213 
    217214    def eval(self, cmd, timeout=0, user_vars=None, job_name=None): 
    218215        """ 
    219216        eval evaluates a command 
    220          
     217 
    221218        Parameters: 
    222219        cmd -- the sage command to be evaluated (str) 
    223220        globals -- a dict (see help for python's eval method) 
    224221        job_name -- an alphanumeric job name 
    225          
     222 
    226223        """ 
    227          
     224 
    228225        self.is_connected() 
    229226        if not job_name or not isinstance(job_name, str): 
    230227            job_name = 'default job' 
    231          
     228 
    232229        kind = 'sage' 
    233          
     230 
    234231        # We have to convert timeout to a python int so it will not cause 
    235232        # security exceptions with twisted. 
    236          
    237         job = Job(job_id=None, code=cmd, name=job_name,      
     233 
     234        job = Job(job_id=None, code=cmd, name=job_name, 
    238235                  username=self.username, timeout=timeout, kind=kind) 
    239          
     236 
    240237        wrapped_job = JobWrapper(self._remoteobj, job) 
    241238        if user_vars is not None: 
    242239            for k, v in user_vars.iteritems(): 
    243240                job.attach(k, v) 
    244          
     241 
    245242        return wrapped_job 
    246      
     243 
    247244    def eval_file(self, fname, job_name, async=False): 
    248245        """ 
    249246        eval_file allows you to evaluate the contents of an entire file. 
    250          
     247 
    251248        Parameters: 
    252249            fname -- file name of the file you wish to evaluate 
    253          
     250 
    254251        """ 
    255          
     252 
    256253        self.is_connected() 
    257          
     254 
    258255        kind = 'file' 
    259256        cmd = open(fname).read() 
    260257        job = Job(job_id=None, code=cmd, name=job_name, 
    261258                  username=self.username, kind=kind) 
    262          
     259 
    263260        if async: 
    264261            wrapped_job = JobWrapper(self._remoteobj, job) 
    265262        else: 
    266263            wrapped_job = BlockingJobWrapper(self._remoteobj, job) 
    267          
     264 
    268265        return wrapped_job 
    269      
     266 
    270267    def send_job(self, job): 
    271268        """ 
    272269        Sends a Job object to the server. 
    273          
     270 
    274271        """ 
    275          
     272 
    276273        if not isinstance(job, Job): 
    277274            raise TypeError 
    278275        wrapped_job = JobWrapper(self._remoteobj, job) 
    279276        return wrapped_job 
    280      
     277 
    281278    def _got_job_id(self, id_, job): 
    282279        job.job_id = id_ 
    283280        job.username = self.username 
     
    285282        d = self._remoteobj.callRemote('submit_job', pickled_job) 
    286283        d.addErrback(self._catch_failure) 
    287284        # d.addCallback(self._submitted, job) 
    288          
     285 
    289286        return JobWrapper(self._remoteobj, job) 
    290      
     287 
    291288    def eval_dir(self, dir_, job_name): 
    292289        from twisted.internet import defer 
    293290        self.is_connected() 
     
    303300            deferreds.append(d) 
    304301        d_list = defer.DeferredList(deferreds) 
    305302        return d_list 
    306      
     303 
    307304    def kill(self, job_id, async=False): 
    308305        """ 
    309306        Kills a job given the job id. 
    310          
     307 
    311308        Parameters: 
    312309        job_id -- job id 
    313          
     310 
    314311        """ 
    315          
     312 
    316313        if async: 
    317314            d = self._remoteobj.callRemote('kill_job', job_id) 
    318315            d.addCallback(self._killed_job) 
     
    321318            job_id = blockingCallFromThread(self._remoteobj.callRemote, 
    322319                                               'kill_job', 
    323320                                               job_id) 
    324                                                 
    325      
     321 
    326322    def get_my_jobs(self, is_active=False, job_name=None): 
    327323        """ 
    328324        This method returns a list of jobs that belong to you. 
    329          
     325 
    330326        Parameters: 
    331327        is_active -- set to true to get only active jobs (bool) 
    332          
     328 
    333329        Use this method if you get disconnected from the server and wish to 
    334330        retrieve your old jobs back. 
    335          
     331 
    336332        """ 
    337          
     333 
    338334        self.is_connected() 
    339          
     335 
    340336        d = self._remoteobj.callRemote('get_jobs_by_username', 
    341337                                      self.username, 
    342338                                      is_active, 
    343339                                      job_name) 
    344340        d.addCallback(self._got_my_jobs, job_name) 
    345341        d.addErrback(self._catch_failure) 
    346          
     342 
    347343        return d 
    348      
     344 
    349345    def cluster_speed(self): 
    350346        """ 
    351347        Returns the speed of the cluster. 
    352          
     348 
    353349        """ 
    354          
     350 
    355351        self.is_connected() 
    356          
     352 
    357353        return self._remoteobj.callRemote('get_cluster_speed') 
    358      
    359     def is_connected(self):         
    360         if self._remoteobj == None: 
    361             return False 
    362         if self._remoteobj.broker.disconnected: 
    363             raise False 
    364         return True 
     354 
     355    def is_connected(self): 
     356        return self._remoteobj or self._remoteobj.broker.disconnected 
    365357 
    366358 
    367359class BlockingDSage(DSage): 
    368360    """ 
    369361    This is the blocking version of the DSage interface. 
    370      
    371362    """ 
    372     def __init__(self, server='localhost', port=8081, 
    373                 username=getuser(), 
     363 
     364    def __init__(self, server='localhost', port=8081, username=getuser(), 
    374365                 pubkey_file=os.path.join(DSAGE_DIR, 'dsage_key.pub'), 
    375366                 privkey_file=os.path.join(DSAGE_DIR, 'dsage_key'), 
    376367                 log_level=0, ssl=True, testing=False): 
     
    380371        DSage.__init__(self, server=server, port=port, username=username, 
    381372                       pubkey_file=pubkey_file, privkey_file=privkey_file, 
    382373                       log_level=log_level, ssl=ssl, testing=testing) 
    383      
    384      
     374 
    385375    def connect(self): 
    386376        """ 
    387377        This methods establishes the conection to the remote server. 
    388          
     378 
    389379        """ 
    390          
     380 
    391381        from twisted.internet import reactor 
    392382        from sage.dsage.twisted.pb import ClientFactory 
    393          
     383 
    394384        self.factory = ClientFactory(self._login, (), {}) 
    395385        self.factory.continueTrying = False 
    396          
     386 
    397387        if self.ssl: 
    398388            from gnutls.interfaces.twisted import X509Credentials 
    399389            cred = X509Credentials() 
    400             blockingCallFromThread(reactor, reactor.connectTLS, self.server,  
     390            blockingCallFromThread(reactor, reactor.connectTLS, self.server, 
    401391                                   self.port, self.factory, cred) 
    402392        else: 
    403393            blockingCallFromThread(reactor, reactor.connectTCP, self.server, 
    404394                                   self.port, self.factory) 
    405      
     395 
    406396    def _login(self, *args, **kwargs): 
    407         from twisted.cred.credentials import Anonymous 
    408397        if self._testing: 
    409398            d = self.factory.login(Anonymous(), None) 
    410399        else: 
    411400            d = self.factory.login(self._creds, None) 
    412401        d.addCallback(self._connected) 
    413402        d.addErrback(self._catch_failure) 
    414          
     403 
    415404        return d 
    416405 
    417406    def job_results_iter(self, jobs): 
    418407        """ 
    419408        Returns an iterator that yields results of jobs as they come in. 
    420          
    421         INPUT:  
     409 
     410        INPUT: 
    422411            jobs -- a list of tuples (x, j) where x is (args, kwds) and j is 
    423412                    a job object 
    424          
     413 
    425414        OUTPUT: 
    426415            (x, job) 
    427416 
    428417        """ 
    429          
     418 
    430419        import time 
    431420        out_list = [] 
    432          
     421 
    433422        while len(out_list) != len(jobs): 
    434             for x,j in jobs: 
     423            for x, j in jobs: 
    435424                if j not in out_list: 
    436425                    j.get_job() 
    437426                    if j.status in ('completed', 'killed'): 
    438427                        out_list.append(j) 
    439428                        yield (x, j) 
    440429                    time.sleep(0.2) 
    441      
     430 
    442431    def block_on_jobs(self, jobs): 
    443432        """ 
    444433        Blocks on a list of jobs until all the jobs are completed. 
    445          
     434 
    446435        INPUT: 
    447436            jobs -- a list of jobs which are not completed 
    448          
     437 
    449438        OUTPUT: 
    450439            jobs -- a list of completed jobs 
    451              
     440 
    452441        EXAMPLE: 
    453442            sage: from sage.dsage.misc.misc import find_open_port 
    454443            sage: port = find_open_port().next() 
     
    462451            True 
    463452            sage: def f(n): 
    464453            ...     return n*n 
    465             ...  
     454            ... 
    466455            sage: j = d.block_on_jobs(d.map(f, [25,12,25,32,12])) 
    467             sage: j # random 
     456            sage: j 
    468457            [625, 144, 625, 1024, 144] 
    469458        """ 
    470          
     459 
    471460        out_list = [] 
    472          
     461 
    473462        while len(out_list) != len(jobs): 
    474463            for j in jobs: 
    475464                if j not in out_list: 
     
    477466                    if j.status in ('completed', 'killed'): 
    478467                        out_list.append(j) 
    479468        return out_list 
    480          
     469 
    481470    def map(self, f, *args): 
    482471        """ 
    483472        Apply function to every item of iterable and return a list of the 
    484473        results. If additional iterable arguments are passed, function must 
    485474        take that many arguments and is applied to the items from all 
    486475        iterables in parallel. 
    487          
     476 
    488477        INPUT: 
    489478            f -- a function 
    490479            *args -- iterables containing the parameters to the function 
    491              
     480 
    492481        EXAMPLE: 
    493482            sage: from sage.dsage.misc.misc import find_open_port 
    494483            sage: port = find_open_port().next() 
     
    502491            True 
    503492            sage: def f(n): 
    504493            ...     return n*n 
    505             ...  
     494            ... 
    506495            sage: j = d.map(f, [25,12,25,32,12]) 
    507496            sage: j 
    508497            [No output yet., 
     
    513502        """ 
    514503 
    515504        from itertools import izip 
    516          
    517         jobs = [self.eval_function(f, (a, {}), job_name=f.__name__)  
     505 
     506        jobs = [self.eval_function(f, (a, {}), job_name=f.__name__) 
    518507                for a in izip(*args)] 
    519                      
     508 
    520509        return jobs 
    521      
    522     def parallel_iter(self, f, inputs):         
     510 
     511    def parallel_iter(self, f, inputs): 
    523512        """ 
    524513        dsage parallel iterator implementation. 
    525514 
     
    532521        OUTPUT: 
    533522            iterator over 2-tuples (inputs[i], f(inputs[i])), 
    534523            where the order may be completely random 
    535          
     524 
    536525        EXAMPLE: 
    537526            sage: from sage.dsage.misc.misc import find_open_port 
    538527            sage: port = find_open_port().next() 
     
    549538            ... def f(n,m): 
    550539            ...     return n+m 
    551540            ... 
    552             sage: list(f([(1,2), (5, 10/3)])) # random 
    553             [(((5, 10/3), {}), 25/3), (((1, 2), {}), 3)] 
     541            sage: f([(1,2), (5, 10/3)]) 
     542            [((1, 2), 3), ((5, 10/3), 25/3)] 
    554543        """ 
    555          
     544 
    556545        jobs = [] 
    557546        for x in inputs: 
    558547            job = self.eval_function(f, x, job_name=f.__name__) 
    559548            jobs.append((x, job)) 
    560          
    561         return self.job_results_iter(jobs)   
    562      
     549 
     550        return self.job_results_iter(jobs) 
     551 
    563552    def eval_function(self, f, arguments, job_name=None): 
    564553        """ 
    565554        Takes a function and it's arguments, pickles it, and creates a job 
    566555        which executes the function with the arguments. 
    567          
     556 
    568557        INPUT: 
    569558            f -- function 
    570559            arguments -- tuple(tuple, dict) --> *args, **kwds 
    571              
     560 
    572561        OUTPUT: 
    573562            job wrapper representing the function evaluated at input. 
    574              
     563 
    575564        EXAMPLE: 
    576565            sage: from sage.dsage.misc.misc import find_open_port 
    577566            sage: port = find_open_port().next() 
     
    590579            sage: j.wait() 
    591580            sage: j 
    592581            625 
    593              
     582 
    594583        """ 
    595584 
    596585        from sage.misc.fpickle import pickle_function 
     
    607596        job.attach('args', arguments[0]) 
    608597        job.attach('kwds', arguments[1]) 
    609598        wrapped_job = BlockingJobWrapper(self._remoteobj, job) 
    610          
     599 
    611600        return wrapped_job 
    612      
     601 
    613602    def eval(self, cmd, user_vars=None, job_name=None, timeout=600, 
    614603             load_files=[], priority=5, async=False): 
    615604        """ 
    616605        eval evaluates a command 
    617          
     606 
    618607        Parameters: 
    619608        cmd -- the sage command to be evaluated (str) 
    620609        user_vars -- a dict of predefined variables you want to use. 
     
    624613        load_files -- list of files to load before executing the job 
    625614        priority -- priority of the job created (0-5) 
    626615        async -- whether to use the async implementation of the method 
    627          
     616 
    628617        """ 
    629          
    630         self.is_connected()         
     618 
     619        self.is_connected() 
    631620        kind = 'sage' 
    632          
    633         job = Job(job_id=None, code=cmd, name=job_name,      
     621 
     622        job = Job(job_id=None, code=cmd, name=job_name, 
    634623                  username=self.username, timeout=timeout, priority=priority, 
    635624                  kind=kind) 
    636          
     625 
    637626        for fname in load_files: 
    638627            if os.path.exists(fname): 
    639628                job.attach_file(fname) 
    640          
     629 
    641630        if user_vars is not None: 
    642631            for k, v in user_vars.iteritems(): 
    643632                job.attach(k, v) 
    644          
     633 
    645634        if async: 
    646635            wrapped_job = JobWrapper(self._remoteobj, job) 
    647636        else: 
    648637            wrapped_job = BlockingJobWrapper(self._remoteobj, job) 
    649          
     638 
    650639        return wrapped_job 
    651      
     640 
    652641    def send_job(self, job, async=False): 
    653642        """ 
    654643        Sends a Job object to the server. 
    655          
     644 
    656645        Parameters: 
    657646        job -- a Job object to send to the remote server 
    658647        async -- if True, use async method of doing remote task 
    659          
     648 
    660649        """ 
    661          
     650 
    662651        if not isinstance(job, Job): 
    663652            raise TypeError 
    664653        if async: 
    665654            wrapped_job = JobWrapper(self._remoteobj, job) 
    666655        else: 
    667656            wrapped_job = BlockingJobWrapper(self._remoteobj, job) 
    668          
     657 
    669658        return wrapped_job 
    670      
     659 
    671660    def get_my_jobs(self, status='new'): 
    672661        """ 
    673662        This method returns a list of jobs that belong to you. 
    674          
     663 
    675664        Parameters: 
    676665        active -- set to true to get only active jobs (bool) 
    677          
     666 
    678667        Use this method if you get disconnected from the server and wish to 
    679668        retrieve your old jobs back. 
    680          
     669 
    681670        """ 
    682          
     671 
    683672        self.is_connected() 
    684         from twisted.internet import reactor         
    685673        jdicts = blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    686674                                        'get_jobs_by_username', 
    687675                                        self.username, status) 
    688          
     676 
    689677        return [expand_job(jdict) for jdict in jdicts] 
    690      
    691      
     678 
    692679    def kill_all(self): 
    693680        """ 
    694681        Kills all of your active jobs. 
    695          
     682 
    696683        """ 
    697          
     684 
    698685        active_jobs = self.get_my_jobs(active=True) 
    699          
     686 
    700687        for job in active_jobs: 
    701688            self.kill(job.job_id) 
    702      
     689 
    703690    def cluster_speed(self): 
    704691        """ 
    705692        Returns the speed of the cluster. 
    706          
     693 
    707694        """ 
    708          
     695 
    709696        self.is_connected() 
    710         from twisted.internet import reactor         
     697 
    711698        return blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    712699                                         'get_cluster_speed') 
    713      
     700 
    714701    def get_workers_list(self): 
    715702        """Returns a list of monitors connected to the server. 
    716          
     703 
    717704        """ 
    718          
     705 
    719706        self.is_connected() 
    720         from twisted.internet import reactor         
     707 
    721708        return blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    722709                                         'get_worker_list') 
    723      
     710 
    724711    def get_client_list(self): 
    725712        """ 
    726713        Returns a list of clients connected to the server. 
    727714        """ 
    728          
     715 
    729716        self.is_connected() 
    730         from twisted.internet import reactor         
     717 
    731718        return blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    732719                                         'get_client_list') 
    733      
     720 
    734721    def get_worker_count(self): 
    735722        """ 
    736723        Returns the number of busy and free workers. 
    737          
     724 
    738725        """ 
    739          
     726 
    740727        self.is_connected() 
    741         from twisted.internet import reactor         
     728 
    742729        return blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    743730                                         'get_worker_count') 
    744      
     731 
    745732    def web_server_url(self): 
    746733        """ 
    747734        Returns the web server url. 
    748735        """ 
    749          
     736 
    750737        self.is_connected() 
    751         from twisted.internet import reactor         
     738 
    752739        return blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    753740                                      'web_server_url') 
    754      
     741 
    755742    def web_view(self): 
    756743        """ 
    757744        Opens the dsage server's web interface in a browser. 
    758          
     745 
    759746        """ 
    760          
     747 
    761748        from sage.server.misc import open_page 
    762749        url = self.web_server_url() 
    763750        address = url.split(':')[1].strip('/') 
    764751        port = int(url.split(':')[2].strip('/')) 
    765752        open_page(address, port, False) 
    766          
     753 
    767754    def server_log(self, n=50): 
    768         from twisted.internet import reactor 
    769755        return blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    770756                                      'read_log', n, 'server') 
    771      
     757 
    772758    def worker_log(self, n=50): 
    773         from twisted.internet import reactor 
    774759        return blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    775760                                      'read_log', n, 'worker') 
    776                                        
    777                                        
     761 
     762 
    778763class JobWrapper(object): 
    779764    """ 
    780765    Represents a remote job. 
    781      
     766 
    782767    Parameters: 
    783768        remoteobj -- the PB server's remoteobj 
    784769        job -- a Job object (job) 
    785      
     770 
    786771    """ 
    787      
     772 
    788773    def __init__(self, remoteobj, job): 
    789774        self._remoteobj = remoteobj 
    790775        self._update_job(job._reduce()) 
     
    796781        d.addCallback(self._got_job_id) 
    797782        d.addCallback(self._got_jdict) 
    798783        d.addErrback(self._catch_failure) 
    799      
     784 
    800785    def __repr__(self): 
    801786        return self.job_id 
    802          
     787 
    803788    def __str__(self): 
    804789        if self.status == 'completed' and not self.output: 
    805790            return 'No output. (Done)' 
    806791        elif not self.output: 
    807792            return 'No output yet. (Not done)' 
    808          
     793 
    809794        return self.output 
    810      
     795 
    811796    def __getstate__(self): 
    812797        d = copy.copy(self.__dict__) 
    813798        d['remoteobj'] = None 
    814799        d['sync_job_task'] = None 
    815          
     800 
    816801        return d 
    817      
     802 
    818803    def _update_job(self, jdict): 
    819804        self._jdict = jdict 
    820805        job = expand_job(jdict) 
     
    826811        timeout = 0.5 
    827812        while self._job.result is None: 
    828813            reactor.iterate(timeout) 
    829      
     814 
    830815    def save(self, filename=None): 
    831816        if filename is None: 
    832817            filename = str(self._job.name) 
    833818        filename += '.sobj' 
    834819        f = open(filename, 'w') 
    835820        cPickle.dump(self, f, 2) 
    836          
     821 
    837822        return filename 
    838      
     823 
    839824    def restore(self, dsage): 
    840825        self._remoteobj = dsage.remoteobj 
    841      
     826 
    842827    def _catch_failure(self, failure): 
    843828        from twisted.internet import error 
    844829        from twisted.spread import pb 
     
    848833            pass 
    849834            # print "Error: ", failure.getErrorMessage() 
    850835            # print "Traceback: ", failure.printTraceback() 
    851      
     836 
    852837    def _got_job_id(self, job_id): 
    853838        self.job_id = job_id 
    854839        try: 
    855840            d = self._remoteobj.callRemote('get_job_by_id', job_id) 
    856841        except Exception, msg: 
    857842            raise 
    858          
     843 
    859844        return d 
    860      
     845 
    861846    def _got_jdict(self, jdict): 
    862847        self.job_id = jdict['job_id'] 
    863848        self._update_job(jdict) 
    864      
     849 
    865850    def get_job(self): 
    866851        from sage.dsage.errors.exceptions import NotConnectedException 
    867          
     852 
    868853        if self._remoteobj is None: 
    869854            raise NotConnectedException 
    870855        if self.job_id is None: 
     
    873858            d = self._remoteobj.callRemote('get_job_by_id', self.job_id) 
    874859        except Exception, msg: 
    875860            raise 
    876          
     861 
    877862        d.addCallback(self._got_jdict) 
    878863        d.addErrback(self._catch_failure) 
    879          
     864 
    880865        return d 
    881      
     866 
    882867    def get_job_output(self): 
    883868        if self._remoteobj == None: 
    884869            return 
     
    887872                                           self.job_id) 
    888873        except Exception, msg: 
    889874            raise 
    890          
     875 
    891876        d.addCallback(self._got_job_output) 
    892877        d.addErrback(self._catch_failure) 
    893          
     878 
    894879        return d 
    895      
     880 
    896881    def _got_job_output(self, output): 
    897882        self.output = output 
    898      
     883 
    899884    def get_job_result(self): 
    900885        if self._remoteobj == None: 
    901886            return 
     
    904889                                           self.job_id) 
    905890        except Exception, msg: 
    906891            raise 
    907          
     892 
    908893        d.addCallback(self._got_job_result) 
    909894        d.addErrback(self._catch_failure) 
    910          
     895 
    911896        return d 
    912      
     897 
    913898    def _got_job_result(self, result): 
    914899        self.result = result 
    915      
     900 
    916901    def sync_job(self): 
    917902        from twisted.spread import pb 
    918903        if self._remoteobj == None: 
     
    926911                if self.sync_job_task.running: 
    927912                    self.sync_job_task.stop() 
    928913            return 
    929          
     914 
    930915        try: 
    931916            d = self._remoteobj.callRemote('sync_job', self.job_id) 
    932917        except pb.DeadReferenceError: 
     
    934919                if self.sync_job_task.running: 
    935920                    self.sync_job_task.stop() 
    936921            return 
    937          
     922 
    938923        d.addCallback(self._got_jdict) 
    939924        d.addErrback(self._catch_failure) 
    940      
     925 
    941926    def write_result(self, filename): 
    942927        result_file = open(filename, 'w') 
    943          
     928 
    944929        # skip the first element since that is not the actual result 
    945930        for line in self.result: 
    946931            line = str(line) 
    947932            result_file.write(line) 
    948933        result_file.close() 
    949      
     934 
    950935    def kill(self): 
    951936        """ 
    952937        Kills the current job. 
    953          
     938 
    954939        """ 
    955          
     940 
    956941        if self.job_id is not None: 
    957942            try: 
    958943                d = self._remoteobj.callRemote('kill_job', self.job_id) 
    959944            except Exception, msg: 
    960                 print 'Unable to kill %s because %s'  % (self.job_id, msg) 
     945                print 'Unable to kill %s because %s' % (self.job_id, msg) 
    961946                return 
    962947            d.addCallback(self._killed_job) 
    963948            d.addErrback(self._catch_failure) 
    964949            return d 
    965950        else: 
    966951            return 
    967          
     952 
    968953    def _killed_job(self, job_id): 
    969         return 
     954        self.status = 'killed' 
     955 
    970956 
    971957class BlockingJobWrapper(JobWrapper): 
    972958    """ 
    973959    Blocking version of the JobWrapper object.  This is to be used 
    974960    interactively. 
    975      
     961 
    976962    """ 
    977      
     963 
    978964    def __init__(self, remoteobj, job): 
    979965        self._update_job(job._reduce()) 
    980966        self._remoteobj = remoteobj 
    981         from twisted.internet import reactor 
    982967        self.job_id = blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    983968                                           'submit_job', job._reduce()) 
    984      
     969 
    985970    def __repr__(self): 
    986971        if self.killed: 
    987972            return 'Job %s was killed' % (self.job_id) 
     
    989974            self.get_job() 
    990975        if self.status == 'completed' and not self.output: 
    991976            return 'No output.' 
     977        if self.result: 
     978            return str(self.result) 
    992979        if not self.output: 
    993980            return 'No output yet.' 
    994981        else: 
    995982            return self.output 
    996      
     983 
    997984    def get_job(self): 
    998985        from sage.dsage.errors.exceptions import NotConnectedException 
    999          
     986 
    1000987        if self._remoteobj == None: 
    1001988            raise NotConnectedException 
    1002989        if self.status == 'completed': 
    1003990            return 
    1004          
    1005         from twisted.internet import reactor         
     991 
    1006992        jdict = blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    1007993                                        'get_job_by_id', self.job_id) 
    1008          
     994 
    1009995        self._update_job(jdict) 
    1010      
     996 
    1011997    def async_get_job(self): 
    1012998        return JobWrapper.get_job(self) 
    1013      
     999 
    10141000    def rerun(self): 
    10151001        """ 
    10161002        Resubmits the current job. 
     1003 
    10171004        """ 
    1018         from twisted.internet import reactor 
    10191005        self.job_id = blockingCallFromThread(reactor, 
    10201006                                             self._remoteobj.callRemote, 
    10211007                                             'submit_job', self._jdict) 
     1008 
    10221009    def kill(self): 
    10231010        """ 
    10241011        Kills the current job. 
    1025          
     1012 
    10261013        """ 
    1027         from twisted.internet import reactor         
     1014 
    10281015        job_id = blockingCallFromThread(reactor, self._remoteobj.callRemote, 
    10291016                                           'kill_job', self.job_id) 
    10301017        self.job_id = job_id 
    10311018        self.killed = True 
    1032          
     1019 
    10331020        return job_id 
    1034      
    1035      
     1021 
    10361022    def async_kill(self): 
    10371023        """ 
    10381024        async version of kill 
    1039          
     1025 
    10401026        """ 
    1041          
     1027 
    10421028        d = self._remoteobj.callRemote('kill_job', self.job_id) 
    10431029        d.addCallback(self._killed_job) 
    10441030        d.addErrback(self._catch_failure) 
    1045          
     1031 
    10461032        return d 
    1047      
    1048      
     1033 
    10491034    def wait(self, timeout=None): 
    10501035        """ 
    10511036        Waits on a job until it is completed. 
    1052          
     1037 
    10531038        Parameters: 
    10541039        timeout -- number of seconds to wait, if it has not completed by then 
    10551040                   it will raise RunTimeError if it is set to None, 
    10561041                   it will wait indefinitely until the job is completed 
    10571042        """ 
    1058          
     1043 
    10591044        import signal 
    1060          
     1045 
    10611046        if timeout is None: 
    10621047            while self.status != 'completed': 
    10631048                # print 'Wating...' 
    1064                 time.sleep(1.0) 
     1049                time.sleep(0.5) 
    10651050                self.get_job() 
    10661051        else: 
    10671052            def handler(signum, frame): 
    10681053                raise RuntimeError('Maximum wait time exceeded.') 
     1054 
    10691055            signal.signal(signal.SIGALRM, handler) 
    10701056            signal.alarm(timeout) 
    10711057            while self.status != 'completed': 
    1072                 time.sleep(1.0) 
     1058                time.sleep(0.5) 
    10731059                self.get_job() 
    10741060            signal.alarm(0) 
  • sage/dsage/misc/config.py

    diff --git a/sage/dsage/interface/nodoctest.py b/sage/dsage/interface/nodoctest.py
    new file mode 100644
    diff --git a/sage/dsage/misc/config.py b/sage/dsage/misc/config.py
    a b  
    2424 
    2525import os 
    2626import ConfigParser 
     27import uuid 
    2728 
    2829from sage.dsage.misc.constants import DSAGE_DIR 
    2930     
     
    8889        elif type == 'monitor': 
    8990            conf_file = os.path.join(DSAGE_DIR, 'worker.conf') 
    9091            config.read(conf_file) 
    91             import uuid 
    9292            if len(config.get('uuid', 'id')) != 36: 
    9393                config.set('uuid', 'id', str(uuid.uuid1())) 
    9494                f = open(conf_file, 'w') 
     
    121121    if value.lower() not in boolean_states: 
    122122        raise ValueError('Not a boolean: %s' % value) 
    123123     
    124     return boolean_states[value.lower()] 
     124    return boolean_states[value.lower()] 
     125 No newline at end of file 
  • sage/dsage/misc/constants.py

    diff --git a/sage/dsage/misc/constants.py b/sage/dsage/misc/constants.py
    a b  
    88DSAGE_LOCAL = os.path.join(os.getenv('SAGE_ROOT'), 'local/dsage') 
    99DSAGE_DB_DIR = os.path.join(DSAGE_DIR, 'db') 
    1010DSAGE_DB = os.path.join(DSAGE_DB_DIR, 'dsage.db') 
     11SAGE_BIN = os.path.join(os.getenv('SAGE_ROOT'), 'sage') 
    1112 
    1213# These are the twisted tac files to be used with twistd 
    1314SERVER_TAC = """import sys 
     
    144145dsage_service.setServiceParent(application) 
    145146 
    146147print_info(dsage_server)""" 
    147  
    148 WORKER_TAC = """""" 
    149  No newline at end of file 
  • sage/dsage/misc/misc.py

    diff --git a/sage/dsage/misc/misc.py b/sage/dsage/misc/misc.py
    a b  
    2828 
    2929from sage.dsage.misc.constants import DSAGE_DIR 
    3030from sage.dsage.misc.config import check_dsage_dir 
     31 
     32 
     33def test_dsage(dsage): 
     34    from time import sleep 
     35    port = find_open_port().next() 
     36    dsage.server(blocking=False, port=port, verbose=False, ssl=False, 
     37                 log_level=5, testing=True) 
     38    dsage.worker(blocking=False, port=port, verbose=False, ssl=False, 
     39                 log_level=5,authenticate=False) 
     40    sleep(0.5) 
     41    d = dsage.connect(username='tester', port=port, ssl=False, testing=True) 
     42    sleep(0.5) 
     43 
     44    return d 
     45 
     46def write_tac(tac, fname): 
     47    os.chdir(DSAGE_DIR) 
     48    f = open(fname, 'w') 
     49    f.writelines(tac) 
     50    f.close() 
    3151 
    3252def exec_wrs(script): 
    3353    """ 
     
    135155                yield port 
    136156                port += 1 
    137157            else: 
    138                 port += 1 
    139  No newline at end of file 
     158                port += 1 
  • sage/dsage/scripts/dsage_setup.py

    diff --git a/sage/dsage/scripts/dsage_setup.py b/sage/dsage/scripts/dsage_setup.py
    a b  
    11############################################################################ 
    2 #                                                                      
    3 #   DSAGE: Distributed SAGE                      
    4 #                                                                              
    5 #       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>                
    6 #                                                                             
    7 #  Distributed under the terms of the GNU General Public License (GPL)         
     2# 
     3#   DSAGE: Distributed SAGE 
     4# 
     5#       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com> 
     6# 
     7#  Distributed under the terms of the GNU General Public License (GPL) 
    88# 
    99#    This code is distributed in the hope that it will be useful, 
    1010#    but WITHOUT ANY WARRANTY; without even the implied warranty of 
     
    3737SAGE_ROOT = os.getenv('SAGE_ROOT') 
    3838DSAGE_VERSION = version 
    3939 
     40 
    4041def get_config(type): 
    4142    config = ConfigParser.ConfigParser() 
    4243    config.add_section('general') 
     
    5455        config.add_section('server_log') 
    5556        config.add_section('db') 
    5657        config.add_section('db_log') 
     58 
    5759    return config 
     60 
    5861 
    5962def add_default_client(Session): 
    6063    """ 
    6164    Adds the default client. 
    62      
     65 
    6366    """ 
    64      
     67 
    6568    from twisted.conch.ssh import keys 
    6669    from getpass import getuser 
    67      
     70 
    6871    clientdb = ClientDatabase(Session) 
    69      
     72 
    7073    username = getuser() 
    7174    pubkey_file = os.path.join(DSAGE_DIR, 'dsage_key.pub') 
    7275    pubkey = keys.Key.fromFile(pubkey_file) 
     
    8386        else: 
    8487            print 'User %s already exists.' % (username) 
    8588 
     89 
    8690def setup_client(testing=False): 
    8791    check_dsage_dir() 
    8892    key_file = os.path.join(DSAGE_DIR, 'dsage_key') 
    8993    if testing: 
    9094        cmd = ["ssh-keygen", "-q", "-trsa", "-P ''", "-f%s" % key_file] 
    9195        return 
    92      
     96 
    9397    if not cmd_exists('ssh-keygen'): 
    9498        print DELIMITER 
    9599        print "Could NOT find ssh-keygen." 
    96100        print "Aborting." 
    97101        return 
    98          
     102 
    99103    print DELIMITER 
    100104    print "Generating public/private key pair for authentication..." 
    101105    print "Your key will be stored in %s/dsage_key" % DSAGE_DIR 
    102106    print "Just hit enter when prompted for a passphrase" 
    103107    print DELIMITER 
    104      
    105     cmd = ["ssh-keygen", "-q", "-trsa", "-f%s" % key_file]     
     108 
     109    cmd = ["ssh-keygen", "-q", "-trsa", "-f%s" % key_file] 
    106110    ld = os.environ['LD_LIBRARY_PATH'] 
    107111    try: 
    108112        del os.environ['LD_LIBRARY_PATH'] 
    109113        p = subprocess.call(cmd) 
    110114    finally: 
    111115        os.environ['LD_LIBRARY_PATH'] = ld 
    112          
     116 
    113117    print "\n" 
    114118    print "Client configuration finished.\n" 
     119 
    115120 
    116121def setup_worker(): 
    117122    check_dsage_dir() 
    118123    print "Worker configuration finished.\n" 
     124 
    119125 
    120126def setup_server(template=None): 
    121127    check_dsage_dir() 
     
    125131    if dn == '': 
    126132        print "Using default localhost" 
    127133        dn = 'localhost' 
    128      
     134 
    129135    template_dict = {'organization': 'SAGE (at %s)' % (dn), 
    130136                'unit': '389', 
    131137                'locality': None, 
     
    134140                'cn': dn, 
    135141                'uid': 'sage_user', 
    136142                'dn_oid': None, 
    137                 'serial': str(random.randint(1,2**31)), 
     143                'serial': str(random.randint(1, 2**31)), 
    138144                'dns_name': None, 
    139145                'crl_dist_points': None, 
    140146                'ip_address': None, 
     
    146152                'signing_key': True, 
    147153                'encryption_key': True, 
    148154                } 
    149                  
     155 
    150156    if isinstance(template, dict): 
    151157        template_dict.update(template) 
    152      
     158 
    153159    s = "" 
    154160    for key, val in template_dict.iteritems(): 
    155161        if val is None: 
     
    160166            w = ' '.join(['"%s"' % x for x in val]) 
    161167        else: 
    162168            w = '"%s"' % val 
    163         s += '%s = %s \n' % (key, w)  
    164      
     169        s += '%s = %s \n' % (key, w) 
     170 
    165171    template_file = os.path.join(DSAGE_DIR, 'cert.cfg') 
    166172    f = open(template_file, 'w') 
    167173    f.write(s) 
    168174    f.close() 
    169      
     175 
    170176    # Disable certificate generation -- not used right now anyways 
    171177    privkey_file = os.path.join(DSAGE_DIR, 'cacert.pem') 
    172178    pubkey_file = os.path.join(DSAGE_DIR, 'pubcert.pem') 
    173      
     179 
    174180    print DELIMITER 
    175181    print "Generating SSL certificate for server..." 
    176      
     182 
    177183    if False and os.uname()[0] != 'Darwin' and cmd_exists('openssl'): 
    178184        # We use openssl by default if it exists, since it is *vastly* 
    179185        # faster on Linux. 
     
    187193        print cmd[0] 
    188194        # cmd = ['openssl genrsa > %s' % privkey_file] 
    189195        subprocess.call(cmd, shell=True) 
    190          
     196 
    191197    cmd = ['certtool --generate-self-signed --template %s --load-privkey %s \ 
    192198           --outfile %s' % (template_file, privkey_file, pubkey_file)] 
    193199    subprocess.call(cmd, shell=True) 
    194200    print DELIMITER 
    195      
     201 
    196202    # Set read only permissions on cert 
    197203    os.chmod(os.path.join(DSAGE_DIR, 'cacert.pem'), 0600) 
    198      
     204 
    199205    # create database schemas 
    200206    from sage.dsage.database.db_config import init_db_sa as init_db 
    201207    Session = init_db(DSAGE_DB) 
    202      
     208 
    203209    # add default user 
    204210    add_default_client(Session) 
    205              
     211 
    206212    print "Server configuration finished.\n\n" 
    207      
     213 
     214 
    208215def setup(template=None): 
    209216    setup_client() 
    210217    setup_worker() 
     
    221228            setup_worker() 
    222229        elif sys.argv[1] == 'client': 
    223230            setup_client() 
    224  
  • (a) a/sage/dsage/scripts/dsage_worker.py vs. (b) /dev/null

    diff --git a/sage/dsage/scripts/dsage_worker.py b/sage/dsage/scripts/dsage_worker.py
    deleted file mode 100755
    <
    a b  
    1 #!/usr/bin/env python 
    2 ############################################################################ 
    3 #                                                                      
    4 #   DSAGE: Distributed SAGE                      
    5 #                                                                              
    6 #       Copyright (C) 2006, 2007 Yi Qiang <yqiang@gmail.com>                
    7 #                                                                             
    8 #  Distributed under the terms of the GNU General Public License (GPL)         
    9 # 
    10 #    This code is distributed in the hope that it will be useful, 
    11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
    12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
    13 #    General Public License for more details. 
    14 # 
    15 #  The full text of the GPL is available at: 
    16 # 
    17 #                  http://www.gnu.org/licenses/ 
    18 # 
    19 ############################################################################ 
    20 __docformat__ = "restructuredtext en" 
    21  
    22 import sys 
    23 import os 
    24 import cPickle 
    25 import zlib 
    26 import pexpect 
    27 import datetime 
    28 from math import ceil 
    29 from getpass import getuser 
    30  
    31 from twisted.spread import pb 
    32 from twisted.internet import reactor, defer, error, task 
    33 from twisted.python import log 
    34 from twisted.spread import banana 
    35 banana.SIZE_LIMIT = 100*1024*1024 # 100 MegaBytes 
    36  
    37 from gnutls.constants import * 
    38 from gnutls.crypto import * 
    39 from gnutls.errors import * 
    40 from gnutls.interfaces.twisted import X509Credentials 
    41  
    42 from sage.interfaces.sage0 import Sage 
    43 from sage.misc.preparser import preparse_file 
    44  
    45 from sage.dsage.database.job import Job, expand_job 
    46 from sage.dsage.misc.hostinfo import HostInfo 
    47 from sage.dsage.errors.exceptions import NoJobException 
    48 from sage.dsage.twisted.pb import ClientFactory 
    49 from sage.dsage.misc.constants import DELIMITER 
    50 from sage.dsage.misc.constants import DSAGE_DIR 
    51 from sage.dsage.misc.constants import TMP_WORKER_FILES 
    52 from sage.dsage.misc.misc import random_str, get_uuid 
    53  
    54 START_MARKER = '\x01r\x01e'  
    55 END_MARKER = '\x01r\x01b' 
    56 LOG_PREFIX = "[Worker %s] " 
    57  
    58 class Worker(object): 
    59     """ 
    60     Workers perform the computation of dsage jobs. 
    61      
    62     """ 
    63      
    64     def __init__(self, remoteobj, id, log_level=0, poll=1.0): 
    65         """ 
    66         :type remoteobj: remoteobj 
    67         :param remoteobj: Reference to the remote dsage server 
    68          
    69         :type id: integer 
    70         :param id: numerical identifier of worker 
    71          
    72         :type log_level: integer 
    73         :param log_level: log level, higher means more verbose 
    74          
    75         :type poll: integer 
    76         :param poll: rate (in seconds) a worker talks to the server 
    77          
    78         """ 
    79          
    80         self.remoteobj = remoteobj 
    81         self.id = id 
    82         self.free = True 
    83         self.job = None 
    84         self.log_level = log_level 
    85         self.poll_rate = poll 
    86         self.checker_task = task.LoopingCall(self.check_work)