Ticket #14150: 14150_use_fork.patch
File 14150_use_fork.patch, 8.1 KB (added by , 9 years ago) |
---|
-
sage/homology/simplicial_complex.py
# HG changeset patch
# User Jeroen Demeyer <jdemeyer@cage.ugent.be>
# Date 1361633777 -3600
# Node ID add47f1fdcaef7d4272dd8641744725824b7ef7a
# Parent 0c1f23063394bc714ec5c5f97bb9dce483b2929c
Fix wait() in @parallel

diff --git a/sage/homology/simplicial_complex.py b/sage/homology/simplicial_complex.py
a     b
2164  2164  statement that the Stanley-Reisner ring of ``self`` is
2165  2165  Cohen-Macaulay.
2166  2166
2167        .. NOTE ::
2168
2169        This method, especially when it returns ``False``, may
2170        print a message like ::
2171
2172        Exception OSError: (10, 'No child processes') in <generator object __call__ at 0x10c8e3af0> ignored
2173
2174        This may be ignored.
2175
2176  2167  EXAMPLES:
2177  2168
2178  2169  Spheres are Cohen-Macaulay::
-
sage/parallel/use_fork.py
diff --git a/sage/parallel/use_fork.py b/sage/parallel/use_fork.py
a b 66 66 67 67 INPUT: 68 68 69 - ``f`` -- a Python function that need not be pickleable or anything else! 70 - ``inputs`` -- a list of pickleable pairs ``(args, kwds)``, where 71 ``args`` is a tuple and ``kwds`` is a dictionary. 69 - ``f`` -- a Python function that need not be pickleable or anything else! 70 71 - ``inputs`` -- a list of pickleable pairs ``(args, kwds)``, where 72 ``args`` is a tuple and ``kwds`` is a dictionary. 72 73 73 74 OUTPUT: 74 75 … … 79 80 [(([10], {}), 100), (([20], {}), 400)] 80 81 sage: sorted(list( F( (lambda x, y: x^2+y), [([10],{'y':1}), ([20],{'y':2})]))) 81 82 [(([10], {'y': 1}), 101), (([20], {'y': 2}), 402)] 83 84 TESTS: 85 86 The output of functions decorated with :func:parallel is read 87 as a pickle by the parent process. We intentionally break the 88 unpickling and demonstrate that this failure is handled 89 gracefully (an exception is displayed and an empty list is 90 returned):: 91 92 sage: Polygen = parallel(polygen) 93 sage: list(Polygen([QQ])) 94 [(((Rational Field,), {}), x)] 95 sage: from sage.structure.sage_object import unpickle_override, register_unpickle_override 96 sage: register_unpickle_override('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint', Integer) 97 sage: L = list(Polygen([QQ])) 98 ('__init__() takes at most 2 positional arguments (4 given)', <type 'sage.rings.integer.Integer'>, (Univariate Polynomial Ring in x over Rational Field, [0, 1], False, True)) 99 sage: L 100 [] 101 102 Fix the unpickling:: 103 104 sage: del unpickle_override[('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint')] 105 sage: list(Polygen([QQ,QQ])) 106 [(((Rational Field,), {}), x), (((Rational Field,), {}), x)] 82 107 """ 83 108 n = self.ncpus 84 109 v = list(inputs) … … 87 112 from sage.misc.all import tmp_dir, walltime 88 113 dir = tmp_dir() 89 114 timeout = self.timeout 90 # Subprocesses shouldn't inherit unflushed buffers (cf. 
#11778):91 sys.stdout.flush()92 sys.stderr.flush()93 115 94 116 workers = {} 95 117 try: 96 118 while len(v) > 0 or len(workers) > 0: 97 119 # Spawn up to n subprocesses 98 120 while len(v) > 0 and len(workers) < n: 121 # Subprocesses shouldn't inherit unflushed buffers (cf. #11778): 122 sys.stdout.flush() 123 sys.stderr.flush() 124 99 125 pid = os.fork() 100 126 # The way fork works is that pid returns the 101 127 # nonzero pid of the subprocess for the master … … 117 143 oldest = min([X[1] for X in workers.values()]) 118 144 signal.signal(signal.SIGALRM, mysig) 119 145 signal.alarm(max(int(timeout - (walltime()-oldest)), 1)) 146 120 147 try: 121 148 pid = os.wait()[0] 122 149 signal.signal(signal.SIGALRM, signal.SIG_IGN) 150 w = workers.pop(pid) 123 151 except RuntimeError: 124 152 signal.signal(signal.SIGALRM, signal.SIG_IGN) 125 153 # Kill workers that are too old … … 129 157 print( 130 158 "Killing subprocess %s with input %s which took too long" 131 159 % (pid, X[0]) ) 132 sys.stdout.flush() 133 os.kill(pid,9) 160 os.kill(pid, signal.SIGKILL) 134 161 X[-1] = ' (timed out)' 162 except KeyError: 163 # Some other process exited, not our problem... 164 pass 135 165 else: 136 # If the computation was interrupted the pid 137 # might not be in the workers list, in which 138 # case we skip this. 
139 if pid in workers: 140 # collect data from process that successfully terminated 141 sobj = os.path.join(dir, '%s.sobj'%pid) 142 if not os.path.exists(sobj): 143 X = "NO DATA" + workers[pid][-1] # the message field 144 else: 145 X = load(sobj, compress=False) 146 os.unlink(sobj) 147 out = os.path.join(dir, '%s.out'%pid) 148 if not os.path.exists(out): 149 output = "NO OUTPUT" 150 else: 151 output = open(out).read() 152 os.unlink(out) 166 # collect data from process that successfully terminated 167 sobj = os.path.join(dir, '%s.sobj'%pid) 168 if not os.path.exists(sobj): 169 X = "NO DATA" + w[-1] # the message field 170 else: 171 X = load(sobj, compress=False) 172 os.unlink(sobj) 173 out = os.path.join(dir, '%s.out'%pid) 174 if not os.path.exists(out): 175 output = "NO OUTPUT" 176 else: 177 output = open(out).read() 178 os.unlink(out) 153 179 154 if output.strip(): 155 print output, 156 sys.stdout.flush() 180 if output.strip(): 181 print output, 157 182 158 yield (workers[pid][0], X) 159 del workers[pid] 183 yield (w[0], X) 160 184 161 except Exception ,msg:185 except Exception as msg: 162 186 print msg 163 sys.stdout.flush()164 187 165 188 finally: 166 167 189 # Clean up all temporary files. 168 190 try: 169 191 for X in os.listdir(dir): 170 192 os.unlink(os.path.join(dir, X)) 171 193 os.rmdir(dir) 172 except OSError ,msg:194 except OSError as msg: 173 195 if self.verbose: 174 196 print msg 175 sys.stdout.flush()176 197 177 198 # Send "kill -9" signal to workers that are left. 178 199 if len(workers) > 0: … … 181 202 sys.stdout.flush() 182 203 for pid in workers.keys(): 183 204 try: 184 os.kill(pid, 9) 185 except OSError, msg: 205 os.kill(pid, signal.SIGKILL) 206 os.waitpid(pid, 0) 207 except OSError as msg: 186 208 if self.verbose: 187 209 print msg 188 sys.stdout.flush()189 os.wait()190 210 191 211 def _subprocess(self, f, dir, x): 192 212 """