Ticket #14150: 14150_use_fork.patch

File 14150_use_fork.patch, 8.1 KB (added by jdemeyer, 7 years ago)
  • sage/homology/simplicial_complex.py

    # HG changeset patch
    # User Jeroen Demeyer <jdemeyer@cage.ugent.be>
    # Date 1361633777 -3600
    # Node ID add47f1fdcaef7d4272dd8641744725824b7ef7a
    # Parent  0c1f23063394bc714ec5c5f97bb9dce483b2929c
    Fix wait() in @parallel
    
    diff --git a/sage/homology/simplicial_complex.py b/sage/homology/simplicial_complex.py
    a b  
    21642164        statement that the Stanley-Reisner ring of ``self`` is
    21652165        Cohen-Macaulay.
    21662166
    2167         .. NOTE ::
    2168 
    2169             This method, especially when it returns ``False``, may
    2170             print a message like ::
    2171 
    2172                 Exception OSError: (10, 'No child processes') in <generator object __call__ at 0x10c8e3af0>     ignored
    2173 
    2174             This may be ignored.
    2175 
    21762167        EXAMPLES:
    21772168
    21782169        Spheres are Cohen-Macaulay::
  • sage/parallel/use_fork.py

    diff --git a/sage/parallel/use_fork.py b/sage/parallel/use_fork.py
    a b  
    6666
    6767        INPUT:
    6868
    69             - ``f`` -- a Python function that need not be pickleable or anything else!
    70             - ``inputs`` -- a list of pickleable pairs ``(args, kwds)``, where
    71               ``args`` is a tuple and ``kwds`` is a dictionary.
     69        - ``f`` -- a Python function that need not be pickleable or anything else!
     70
     71        - ``inputs`` -- a list of pickleable pairs ``(args, kwds)``, where
     72          ``args`` is a tuple and ``kwds`` is a dictionary.
    7273
    7374        OUTPUT:
    7475
     
    7980            [(([10], {}), 100), (([20], {}), 400)]
    8081            sage: sorted(list( F( (lambda x, y: x^2+y), [([10],{'y':1}), ([20],{'y':2})])))
    8182            [(([10], {'y': 1}), 101), (([20], {'y': 2}), 402)]
     83
     84        TESTS:
     85
     86        The output of functions decorated with :func:parallel is read
     87        as a pickle by the parent process. We intentionally break the
     88        unpickling and demonstrate that this failure is handled
     89        gracefully (an exception is displayed and an empty list is
     90        returned)::
     91
     92            sage: Polygen = parallel(polygen)
     93            sage: list(Polygen([QQ]))
     94            [(((Rational Field,), {}), x)]
     95            sage: from sage.structure.sage_object import unpickle_override, register_unpickle_override
     96            sage: register_unpickle_override('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint', Integer)
     97            sage: L = list(Polygen([QQ]))
     98            ('__init__() takes at most 2 positional arguments (4 given)', <type 'sage.rings.integer.Integer'>, (Univariate Polynomial Ring in x over Rational Field, [0, 1], False, True))
     99            sage: L
     100            []
     101
     102        Fix the unpickling::
     103
     104            sage: del unpickle_override[('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint')]
     105            sage: list(Polygen([QQ,QQ]))
     106            [(((Rational Field,), {}), x), (((Rational Field,), {}), x)]
    82107        """
    83108        n = self.ncpus
    84109        v = list(inputs)
     
    87112        from sage.misc.all import tmp_dir, walltime
    88113        dir = tmp_dir()
    89114        timeout = self.timeout
    90         # Subprocesses shouldn't inherit unflushed buffers (cf. #11778):
    91         sys.stdout.flush()
    92         sys.stderr.flush()
    93115
    94116        workers = {}
    95117        try:
    96118            while len(v) > 0 or len(workers) > 0:
    97119                # Spawn up to n subprocesses
    98120                while len(v) > 0 and len(workers) < n:
     121                    # Subprocesses shouldn't inherit unflushed buffers (cf. #11778):
     122                    sys.stdout.flush()
     123                    sys.stderr.flush()
     124
    99125                    pid = os.fork()
    100126                    # The way fork works is that pid returns the
    101127                    # nonzero pid of the subprocess for the master
     
    117143                        oldest = min([X[1] for X in workers.values()])
    118144                        signal.signal(signal.SIGALRM, mysig)
    119145                        signal.alarm(max(int(timeout - (walltime()-oldest)), 1))
     146
    120147                    try:
    121148                        pid = os.wait()[0]
    122149                        signal.signal(signal.SIGALRM, signal.SIG_IGN)
     150                        w = workers.pop(pid)
    123151                    except RuntimeError:
    124152                        signal.signal(signal.SIGALRM, signal.SIG_IGN)
    125153                        # Kill workers that are too old
     
    129157                                    print(
    130158                                        "Killing subprocess %s with input %s which took too long"
    131159                                         % (pid, X[0]) )
    132                                     sys.stdout.flush()
    133                                 os.kill(pid,9)
     160                                os.kill(pid, signal.SIGKILL)
    134161                                X[-1] = ' (timed out)'
     162                    except KeyError:
     163                        # Some other process exited, not our problem...
     164                        pass
    135165                    else:
    136                         # If the computation was interrupted the pid
    137                         # might not be in the workers list, in which
    138                         # case we skip this.
    139                         if pid in workers:
    140                             # collect data from process that successfully terminated
    141                             sobj = os.path.join(dir, '%s.sobj'%pid)
    142                             if not os.path.exists(sobj):
    143                                 X = "NO DATA" + workers[pid][-1]  # the message field
    144                             else:
    145                                 X = load(sobj, compress=False)
    146                                 os.unlink(sobj)
    147                             out = os.path.join(dir, '%s.out'%pid)
    148                             if not os.path.exists(out):
    149                                 output = "NO OUTPUT"
    150                             else:
    151                                 output = open(out).read()
    152                                 os.unlink(out)
     166                        # collect data from process that successfully terminated
     167                        sobj = os.path.join(dir, '%s.sobj'%pid)
     168                        if not os.path.exists(sobj):
     169                            X = "NO DATA" + w[-1]  # the message field
     170                        else:
     171                            X = load(sobj, compress=False)
     172                            os.unlink(sobj)
     173                        out = os.path.join(dir, '%s.out'%pid)
     174                        if not os.path.exists(out):
     175                            output = "NO OUTPUT"
     176                        else:
     177                            output = open(out).read()
     178                            os.unlink(out)
    153179
    154                             if output.strip():
    155                                 print output,
    156                                 sys.stdout.flush()
     180                        if output.strip():
     181                            print output,
    157182
    158                             yield (workers[pid][0], X)
    159                             del workers[pid]
     183                        yield (w[0], X)
    160184
    161         except Exception, msg:
     185        except Exception as msg:
    162186            print msg
    163             sys.stdout.flush()
    164187
    165188        finally:
    166 
    167189            # Clean up all temporary files.
    168190            try:
    169191                for X in os.listdir(dir):
    170192                    os.unlink(os.path.join(dir, X))
    171193                os.rmdir(dir)
    172             except OSError, msg:
     194            except OSError as msg:
    173195                if self.verbose:
    174196                    print msg
    175                     sys.stdout.flush()
    176197
    177198            # Send "kill -9" signal to workers that are left.
    178199            if len(workers) > 0:
     
    181202                sys.stdout.flush()
    182203                for pid in workers.keys():
    183204                    try:
    184                         os.kill(pid, 9)
    185                     except OSError, msg:
     205                        os.kill(pid, signal.SIGKILL)
     206                        os.waitpid(pid, 0)
     207                    except OSError as msg:
    186208                        if self.verbose:
    187209                            print msg
    188                             sys.stdout.flush()
    189                 os.wait()
    190210
    191211    def _subprocess(self, f, dir, x):
    192212        """