Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

""" 

Parallel iterator built using the ``fork()`` system call 

""" 

 

#***************************************************************************** 

# Copyright (C) 2010 William Stein <wstein@gmail.com> 

# 

# This program is free software: you can redistribute it and/or modify 

# it under the terms of the GNU General Public License as published by 

# the Free Software Foundation, either version 2 of the License, or 

# (at your option) any later version. 

# http://www.gnu.org/licenses/ 

#***************************************************************************** 

 

from __future__ import absolute_import, print_function 

 

from shutil import rmtree 

from cysignals.alarm import AlarmInterrupt, alarm, cancel_alarm 

 

from sage.interfaces.process import ContainChildren 

from sage.misc.misc import walltime 

 

 

class WorkerData(object):
    """
    Simple class which stores data about a running ``p_iter_fork``
    worker.

    This just stores three attributes:

    - ``input``: the input value used by this worker

    - ``starttime``: the walltime when this worker started; when it is
      not given (``None``), the current walltime is used

    - ``failure``: an optional message indicating the kind of failure

    EXAMPLES::

        sage: from sage.parallel.use_fork import WorkerData
        sage: W = WorkerData(42); W
        <sage.parallel.use_fork.WorkerData object at ...>
        sage: W.starttime # random
        1499330252.463206

    TESTS:

    An explicitly given start time is kept, even when it is zero::

        sage: WorkerData(42, starttime=0).starttime
        0
    """
    def __init__(self, input, starttime=None, failure=""):
        r"""
        See the class documentation for description of the inputs.

        EXAMPLES::

            sage: from sage.parallel.use_fork import WorkerData
            sage: W = WorkerData(42)
        """
        self.input = input
        # Compare against None explicitly: a falsy start time such as 0
        # is still a real value and must not be replaced by "now".
        self.starttime = walltime() if starttime is None else starttime
        self.failure = failure

 

 

class p_iter_fork(object):
    """
    A parallel iterator implemented using ``fork()``.

    INPUT:

    - ``ncpus`` -- the maximal number of simultaneous
      subprocesses to spawn; must be a positive integer

    - ``timeout`` -- (float, default: 0) wall time in seconds until
      a subprocess is automatically killed; ``0`` means no timeout

    - ``verbose`` -- (default: False) whether to print
      anything about what the iterator does (e.g., killing
      subprocesses)

    - ``reset_interfaces`` -- (default: True) whether to reset
      all pexpect interfaces

    EXAMPLES::

        sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X
        <sage.parallel.use_fork.p_iter_fork object at ...>
        sage: X.ncpus
        2
        sage: X.timeout
        3.0
        sage: X.verbose
        False
    """
    def __init__(self, ncpus, timeout=0, verbose=False, reset_interfaces=True):
        """
        Create a ``fork()``-based parallel iterator.

        See the class documentation for description of the inputs.

        EXAMPLES::

            sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X
            <sage.parallel.use_fork.p_iter_fork object at ...>
            sage: X.ncpus
            2
            sage: X.timeout
            3.0
            sage: X.verbose
            False

        TESTS::

            sage: sage.parallel.use_fork.p_iter_fork(0)
            Traceback (most recent call last):
            ...
            ValueError: ncpus must be positive
        """
        self.ncpus = int(ncpus)
        if self.ncpus != ncpus:  # check that there wasn't a roundoff
            raise TypeError("ncpus must be an integer")
        if self.ncpus < 1:
            # Without at least one worker slot __call__ would never spawn
            # a subprocess and would spin forever on non-empty input.
            raise ValueError("ncpus must be positive")
        self.timeout = float(timeout)  # require a float
        self.verbose = verbose
        self.reset_interfaces = reset_interfaces

    def __call__(self, f, inputs):
        """
        Parallel iterator using ``fork()``.

        INPUT:

        - ``f`` -- a function (or more general, any callable)

        - ``inputs`` -- a list of pairs ``(args, kwds)`` to be used as
          arguments to ``f``, where ``args`` is a tuple and ``kwds`` is
          a dictionary.

        OUTPUT:

        An iterator over pairs ``(input, answer)``, yielded in the order
        in which the subprocesses finish. ``input`` is the ``(args, kwds)``
        pair and ``answer`` is ``f(*args, **kwds)``. If the subprocess
        stored no result (for instance because ``f`` raised or the worker
        was killed after ``timeout``), ``answer`` is a string starting
        with ``"NO DATA"``. If the stored result cannot be unpickled,
        ``answer`` is a string starting with ``"INVALID DATA"``.

        EXAMPLES::

            sage: F = sage.parallel.use_fork.p_iter_fork(2,3)
            sage: sorted(list( F( (lambda x: x^2), [([10],{}), ([20],{})])))
            [(([10], {}), 100), (([20], {}), 400)]
            sage: sorted(list( F( (lambda x, y: x^2+y), [([10],{'y':1}), ([20],{'y':2})])))
            [(([10], {'y': 1}), 101), (([20], {'y': 2}), 402)]

        TESTS:

        The output of functions decorated with :func:`parallel` is read
        as a pickle by the parent process. We intentionally break the
        unpickling and demonstrate that this failure is handled
        gracefully (the exception is put in the list instead of the
        answer)::

            sage: Polygen = parallel(polygen)
            sage: list(Polygen([QQ]))
            [(((Rational Field,), {}), x)]
            sage: from sage.structure.sage_object import unpickle_override, register_unpickle_override
            sage: register_unpickle_override('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint', Integer)
            sage: L = list(Polygen([QQ]))
            sage: L
            [(((Rational Field,), {}),
              'INVALID DATA __init__() takes at most 2 positional arguments (4 given)')]

        Fix the unpickling::

            sage: del unpickle_override[('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint')]
            sage: list(Polygen([QQ,QQ]))
            [(((Rational Field,), {}), x), (((Rational Field,), {}), x)]
        """
        from collections import deque
        import os, sys, signal
        from sage.structure.sage_object import loads
        from sage.misc.temporary_file import tmp_dir

        n = self.ncpus
        # Pending inputs, consumed from the left (O(1) per pop).
        pending = deque(inputs)
        tmpdir = tmp_dir()
        timeout = self.timeout

        workers = {}
        try:
            while pending or workers:
                # Spawn up to n subprocesses
                while pending and len(workers) < n:
                    v0 = pending.popleft()  # Input value for the next subprocess
                    with ContainChildren():
                        pid = os.fork()
                        # os.fork() returns the nonzero pid of the child in
                        # the parent process and 0 in the child itself.
                        if not pid:
                            # This is the subprocess.  NOTE(review): leaving
                            # the ``with`` block is presumably what ends the
                            # child (ContainChildren); it must never reach
                            # the bookkeeping below.
                            self._subprocess(f, tmpdir, *v0)

                    workers[pid] = WorkerData(v0)

                if workers:
                    # Now wait for one subprocess to finish and report the
                    # result.  However, wait at most until the oldest worker
                    # runs out of time.
                    now = walltime()
                    if timeout:
                        oldest = min(W.starttime for W in workers.values())
                        alarm(max(timeout - (now - oldest), 0.1))

                    try:
                        pid = os.wait()[0]
                        cancel_alarm()
                        W = workers.pop(pid)
                    except AlarmInterrupt:
                        # Kill workers that are too old.  Their age must be
                        # measured *now*: the time taken before os.wait()
                        # understates it, so the worker that triggered the
                        # alarm would otherwise escape until the next round.
                        now = walltime()
                        for wpid, wdata in workers.items():
                            if now - wdata.starttime >= timeout:
                                if self.verbose:
                                    print(
                                        "Killing subprocess %s with input %s which took too long"
                                        % (wpid, wdata.input))
                                os.kill(wpid, signal.SIGKILL)
                                # The killed worker is reaped by a later
                                # os.wait() and reported as "NO DATA (timed out)".
                                wdata.failure = " (timed out)"
                    except KeyError:
                        # Some other process exited, not our problem...
                        pass
                    else:
                        # collect data from process that successfully terminated
                        sobj = os.path.join(tmpdir, '%s.sobj' % pid)
                        try:
                            # The result is a binary pickle: read raw bytes.
                            with open(sobj, 'rb') as fobj:
                                data = fobj.read()
                        except IOError:
                            answer = "NO DATA" + W.failure
                        else:
                            os.unlink(sobj)
                            try:
                                answer = loads(data, compress=False)
                            except Exception as E:
                                answer = "INVALID DATA {}".format(E)

                        # Replay whatever the worker printed to its stdout file.
                        out = os.path.join(tmpdir, '%s.out' % pid)
                        try:
                            with open(out) as fobj:
                                sys.stdout.write(fobj.read())
                            os.unlink(out)
                        except IOError:
                            pass

                        yield (W.input, answer)
        finally:
            # Never leave an alarm armed for the caller, e.g. when os.wait()
            # was left by an exception other than AlarmInterrupt.
            if timeout:
                cancel_alarm()

            # Send SIGKILL signal to workers that are left.
            if workers:
                if self.verbose:
                    print("Killing any remaining workers...")
                sys.stdout.flush()
                for pid in workers:
                    try:
                        os.kill(pid, signal.SIGKILL)
                    except OSError:
                        # If kill() failed, it is most likely because
                        # the process already exited.
                        pass
                    else:
                        try:
                            os.waitpid(pid, 0)
                        except OSError as msg:
                            if self.verbose:
                                print(msg)

            # Clean up all temporary files.
            rmtree(tmpdir)

    def _subprocess(self, f, dir, args, kwds=None):
        """
        Setup and run evaluation of ``f(*args, **kwds)``, storing the
        result in the given directory ``dir``.

        This method is called by each forked subprocess.

        INPUT:

        - ``f`` -- a function

        - ``dir`` -- name of a directory

        - ``args`` -- a tuple with positional arguments for ``f``

        - ``kwds`` -- (optional) a dict with keyword arguments for ``f``

        TESTS:

        The method ``_subprocess`` is really meant to be run only in a
        subprocess. It neither prints nor returns anything; the output is
        saved in pickles. It redirects stdout, so we save and later
        restore stdout in order not to break the doctester::

            sage: saved_stdout = sys.stdout
            sage: F = sage.parallel.use_fork.p_iter_fork(2,3)
            sage: F._subprocess(operator.add, tmp_dir(), (1, 2))
            sage: sys.stdout = saved_stdout
        """
        import os, sys
        try:
            from importlib import reload
        except ImportError:  # Python 2: importlib has no reload()
            from imp import reload
        from sage.structure.sage_object import save

        if kwds is None:
            kwds = {}

        # Make it so all stdout is sent to a file so it can
        # be displayed (the parent replays '<pid>.out').
        out = os.path.join(dir, '%s.out' % os.getpid())
        sys.stdout = open(out, 'w')

        # Run some commands to tell Sage that its
        # pid has changed (forcing a reload of
        # misc).
        import sage.misc.misc
        reload(sage.misc.misc)

        # The pexpect interfaces (and objects defined in them) are
        # not valid.
        if self.reset_interfaces:
            # Import explicitly rather than relying on the submodule having
            # been loaded (and attached to the package) elsewhere.
            import sage.interfaces.quit
            sage.interfaces.quit.invalidate_all()

        # Now evaluate the function f.
        value = f(*args, **kwds)

        # And save the result to disk; the parent reads '<pid>.sobj'.
        sobj = os.path.join(dir, '%s.sobj' % os.getpid())
        save(value, sobj, compress=False)