Ticket #9501: trac_9501.2.patch

File trac_9501.2.patch, 10.0 KB (added by was, 11 years ago)

new version with fixed warnings.

  • sage/parallel/all.py

    # HG changeset patch
    # User William Stein <wstein@gmail.com>
    # Date 1279238019 -7200
    # Node ID bfe47c390b9417bf66e9024f61d57d061c3310a4
    # Parent  8f90c7549b515b7189a62fbe47cb5811b3de9997
    trac 9501: Make an @fork decorator
    
    diff -r 8f90c7549b51 -r bfe47c390b94 sage/parallel/all.py
    a b  
    1 from decorate import parallel
     1from decorate import parallel, fork
    22
  • sage/parallel/decorate.py

    diff -r 8f90c7549b51 -r bfe47c390b94 sage/parallel/decorate.py
    a b  
    1515    """
    1616    Convert a to a pair (args, kwds) using some rules:
    1717
    18         * if already of that form, leave that way.
    19         * if a is a tuple make (a,{})
    20         * if a is a dict make (tuple([]),a)
    21         * otherwise make ((a,),{})
     18        - if already of that form, leave that way.
     19        - if a is a tuple make (a,{})
     20        - if a is a dict make (tuple([]),a)
     21        - otherwise make ((a,),{})
    2222   
    2323    INPUT:
    24         a -- object
     24        - ``a`` -- object
    2525    OUTPUT:
    26         args -- tuple
    27         kwds -- dictionary
     26        - ``args`` -- tuple
     27        - ``kwds`` -- dictionary
    2828
    29     EXAMPLES:
     29    EXAMPLES::
     30   
    3031        sage: sage.parallel.decorate.normalize_input( (2, {3:4}) )
    3132        ((2, {3: 4}), {})
    3233        sage: sage.parallel.decorate.normalize_input( (2,3) )
     
    4950class Parallel:
    5051    """
    5152    Create parallel decorated function.
    52 
    5353    """
    5454    def __init__(self, p_iter = 'fork', ncpus=None, **kwds):
     55        """
     56        EXAMPLES::
     57
     58            sage: P = sage.parallel.decorate.Parallel(); P
     59            <sage.parallel.decorate.Parallel instance at 0x...>
     60        """
    5561        # The default p_iter is currently the reference implementation.
    5662        # This may change.
    5763        self.p_iter = None
     
    8490        normalized form (args, kwds) using normalize_inputs.
    8591
    8692        INPUT:
    87             f -- Python callable object or function
     93            - `f` -- Python callable object or function
     94           
    8895        OUTPUT:
    89             decorated version of f
     96            decorated version of `f`
    9097
    91         EXAMPLES:
     98        EXAMPLES::
    9299       
     100            sage: P = sage.parallel.decorate.Parallel()
     101            sage: def g(n,m): return n+m
     102            sage: h = P(g)          # indirect doctest
     103            sage: list(h([(2,3)]))
     104            [(((2, 3), {}), 5)]
    93105        """
    94106        # Construct the wrapper parallel version of the function we're wrapping.
    95107        # We may rework this so g is a class instance, which has the plus that
     
    106118    This is a decorator that gives a function a parallel interface,
    107119    allowing it to be called with a list of inputs, whose valuaes will
    108120    be computed in parallel.
    109    
     121
     122    .. warning::
     123
     124         The parallel subprocesses will not have access to data
     125         created in pexpect interfaces.  This behavior with respect to
     126         pexpect interfaces is very important to keep in mind when
     127         setting up certain computations. It's the one big limitation
     128         of this decorator.
     129
    110130    INPUT:
    111131   
    112132        - ``p_iter`` -- parallel iterator function or string:
    113             - 'fork'            -- (default) use a new fork for each input
    114             - 'multiprocessing' -- use multiprocessing library
    115             - 'reference'       -- use a fake serial reference implementation
     133            - ``fork``            -- (default) use a new fork for each input
     134            - ``multiprocessing`` -- use multiprocessing library
     135            - ``reference``       -- use a fake serial reference implementation
    116136        - ``ncpus`` -- integer, number of cpus
    117137        - ``timeout`` -- number of seconds until task is killed (only supported by 'fork')
    118138
     139    .. warning::
     140
     141         If you use anything but 'fork' above, then a whole new
     142         subprocess is spawned, so all your local state (variables,
     143         new functions, etc.,) is not available.
    119144
    120145    EXAMPLES::
    121146
     
    161186        return Parallel()(p_iter)
    162187    return Parallel(p_iter, ncpus, **kwds)
    163188   
     189
     190
     191###################################################################
     192# The @fork decorator -- run a function with no sideeffects in memory
     193# (so only side effects are on disk).
     194# We have both a function and a class below, so that the decorator
     195# can be used with or without options:
     196#   @fork
     197#   def f(...): ...
     198# and
     199#   @fork(...options...):
     200#   def f(...): ...
     201###################################################################
     202
     203class Fork:
     204    """
     205    A fork decorator class.
     206    """
     207    def __init__(self, timeout=0):
     208        """
     209        INPUT:
     210            - ``timeout`` -- (default: 0) kills subrocess after this many
     211              seconds, or if timeout=0, do not kill the subprocess.
     212
     213        EXAMPLES::
     214       
     215            sage: sage.parallel.decorate.Fork()
     216            <sage.parallel.decorate.Fork instance at 0x...>
     217            sage: sage.parallel.decorate.Fork(timeout=3)
     218            <sage.parallel.decorate.Fork instance at 0x...>
     219
     220        """
     221        self.timeout = timeout
     222       
     223    def __call__(self, f):
     224        """
     225        INPUT:
     226            - f -- a function
     227        OUTPUT:
     228            - a decorated function
     229
     230        EXAMPLES::
     231       
     232            sage: F = sage.parallel.decorate.Fork(timeout=3)
     233            sage: def g(n,m): return n+m
     234            sage: h = F(g)     # indirect doctest
     235            sage: h(2,3)
     236            5           
     237        """
     238        P = Parallel(p_iter='fork', ncpus=1, timeout=self.timeout)
     239        g = P(f)
     240        def h(*args, **kwds):
     241            return list(g([(args, kwds)]))[0][1]
     242        return h
     243
     244def fork(f=None, timeout=0):
     245    """
     246    Decorate a function so that when called it runs in a forked
     247    subprocess.  This means that it won't have any in-memory
     248    side-effects on the master Sage process.  The pexpect interfaces
     249    are all reset.
     250   
     251    INPUT:
     252        - ``f`` -- a function
     253        - ``timeout`` -- (default: 0) kills subrocess after this many
     254          seconds, or if timeout=0, do not kill the subprocess.
     255
     256    .. warning::
     257
     258        The forked subprocesses will not have access to data created
     259        in pexpect interfaces.  This behavior with respect to pexpect
     260        interfaces is very important to keep in mind when setting up
     261        certain computations. It's the one big limitation of this
     262        decorator.
     263
     264    EXAMPLES::
     265
     266    We create a function and run it with the fork decorator.  Note that
     267    it does not have a side effect.  Despite trying to change that global
     268    variable "a" below, it does not get changed.::
     269   
     270        sage: a = 5
     271        sage: @fork
     272        ... def g(n, m):
     273        ...       global a
     274        ...       a = 10
     275        ...       return factorial(n).ndigits() + m
     276        sage: g(5, m=5)
     277        8
     278        sage: a
     279        5
     280
     281    We use fork to make sure that the function dies after 1 second no matter what::
     282   
     283        sage: @fork(timeout=1)
     284        ... def g(n, m): return factorial(n).ndigits() + m
     285        sage: g(5, m=5)
     286        8
     287        sage: g(10^7, m=5)
     288        Killing subprocess ... with input ((10000000,), {'m': 5}) which took too long
     289        'NO DATA (timed out)'
     290
     291    We illustrate that pexpect interface state is not effected by
     292    forked functions (they get their own new pexpect interfaces!)::
     293   
     294        sage: gp.eval('a = 5')
     295        '5'
     296        sage: @fork()
     297        ... def g():
     298        ...       gp.eval('a = 10')
     299        ...       return gp.eval('a')
     300        sage: g()
     301        '10'
     302        sage: gp.eval('a')
     303        '5'
     304
     305    We illustrate that the forked function has its own pexpect interface.::
     306   
     307        sage: gp.eval('a = 15')
     308        '15'
     309        sage: @fork()
     310        ... def g(): return gp.eval('a')
     311        sage: g()
     312        'a'
     313
     314    We illustrate that segfaulting subprocesses are no trouble at all::
     315
     316        sage: cython('def f(): print <char*>0')
     317        sage: @fork
     318        ... def g(): f()
     319        sage: g()
     320        'NO DATA'
     321    """
     322    F = Fork(timeout=timeout)
     323    return F(f) if f else F
  • sage/parallel/use_fork.py

    diff -r 8f90c7549b51 -r bfe47c390b94 sage/parallel/use_fork.py
    a b  
    1515    """
    1616    A parallel iterator implemented using fork.
    1717    """
    18     def __init__(self, ncpus, timeout=0, verbose=True):
     18    def __init__(self, ncpus, timeout=0, verbose=True, reset_interfaces=True):
    1919        """
    2020        Create fork-based parallel iterator.
    2121       
    2222        INPUT:
    2323
    2424            - ``ncpus`` -- the maximal number of simultaneous processes to spawn
    25             - ``timeout`` -- (float) time in seconds until a subprocess is automatically killed
    26             - ``verbose`` -- whether to print anything about what the iterator does (e.g., killing subprocesses)
     25            - ``timeout`` -- (float) time in seconds until a
     26              subprocess is automatically killed
     27            - ``verbose`` -- whether to print anything about what the
     28              iterator does (e.g., killing subprocesses)
     29            - ``reset_interface`` -- if True (the default), all the
     30              pexpect interfaces are reset in the forked off
     31              subprocesses.  You definitely want this, since not doing
     32              this can lead to weird issues. 
    2733
    2834        EXAMPLES::
    2935
     
    4147            raise TypeError, "ncpus must be an integer"
    4248        self.timeout = float(timeout)  # require a float
    4349        self.verbose = verbose
     50        self.reset_interfaces = reset_interfaces
    4451
    4552    def __call__(self, f, inputs):
    4653        """
     
    191198
    192199            # The expect interfaces (and objects defined in them) are
    193200            # not valid.
    194             sage.interfaces.quit.invalidate_all()
     201            if self.reset_interfaces:
     202                sage.interfaces.quit.invalidate_all()
    195203
    196204            # Now evaluate the function f.
    197205            value = f(*x[0], **x[1])