Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
""" Parallel iterator built using the ``fork()`` system call """
#***************************************************************************** # Copyright (C) 2010 William Stein <wstein@gmail.com> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # http://www.gnu.org/licenses/ #*****************************************************************************
from __future__ import absolute_import, print_function
from shutil import rmtree from cysignals.alarm import AlarmInterrupt, alarm, cancel_alarm
from sage.interfaces.process import ContainChildren from sage.misc.misc import walltime
class WorkerData(object): """ Simple class which stores data about a running ``p_iter_fork`` worker.
This just stores three attributes:
- ``input``: the input value used by this worker
- ``starttime``: the walltime when this worker started
- ``failure``: an optional message indicating the kind of failure
EXAMPLES::
sage: from sage.parallel.use_fork import WorkerData sage: W = WorkerData(42); W <sage.parallel.use_fork.WorkerData object at ...> sage: W.starttime # random 1499330252.463206 """ def __init__(self, input, starttime=None, failure=""): r""" See the class documentation for description of the inputs.
EXAMPLES::
sage: from sage.parallel.use_fork import WorkerData sage: W = WorkerData(42) """
class p_iter_fork(object): """ A parallel iterator implemented using ``fork()``.
INPUT:
- ``ncpus`` -- the maximal number of simultaneous subprocesses to spawn
- ``timeout`` -- (float, default: 0) wall time in seconds until a subprocess is automatically killed
- ``verbose`` -- (default: False) whether to print anything about what the iterator does (e.g., killing subprocesses)
- ``reset_interfaces`` -- (default: True) whether to reset all pexpect interfaces
EXAMPLES::
sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X <sage.parallel.use_fork.p_iter_fork object at ...> sage: X.ncpus 2 sage: X.timeout 3.0 sage: X.verbose False """ def __init__(self, ncpus, timeout=0, verbose=False, reset_interfaces=True): """ Create a ``fork()``-based parallel iterator.
See the class documentation for a description of the inputs.
EXAMPLES::
sage: X = sage.parallel.use_fork.p_iter_fork(2,3, False); X <sage.parallel.use_fork.p_iter_fork object at ...> sage: X.ncpus 2 sage: X.timeout 3.0 sage: X.verbose False """ raise TypeError("ncpus must be an integer")
def __call__(self, f, inputs): """ Parallel iterator using ``fork()``.
INPUT:
- ``f`` -- a function (or, more generally, any callable)
- ``inputs`` -- a list of pairs ``(args, kwds)`` to be used as arguments to ``f``, where ``args`` is a tuple and ``kwds`` is a dictionary.
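OUTPUT: an iterable of pairs ``((args, kwds), value)``, one for each element of ``inputs``, where ``value`` is the result of ``f(*args, **kwds)``; as the tests below show, if the result cannot be unpickled by the parent process, ``value`` is instead a string starting with ``INVALID DATA``.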
EXAMPLES::
sage: F = sage.parallel.use_fork.p_iter_fork(2,3) sage: sorted(list( F( (lambda x: x^2), [([10],{}), ([20],{})]))) [(([10], {}), 100), (([20], {}), 400)] sage: sorted(list( F( (lambda x, y: x^2+y), [([10],{'y':1}), ([20],{'y':2})]))) [(([10], {'y': 1}), 101), (([20], {'y': 2}), 402)]
TESTS:
The output of functions decorated with :func:`parallel` is read as a pickle by the parent process. We intentionally break the unpickling and demonstrate that this failure is handled gracefully (the exception is put in the list instead of the answer)::
sage: Polygen = parallel(polygen) sage: list(Polygen([QQ])) [(((Rational Field,), {}), x)] sage: from sage.structure.sage_object import unpickle_override, register_unpickle_override sage: register_unpickle_override('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint', Integer) sage: L = list(Polygen([QQ])) sage: L [(((Rational Field,), {}), 'INVALID DATA __init__() takes at most 2 positional arguments (4 given)')]
Fix the unpickling::
sage: del unpickle_override[('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint')] sage: list(Polygen([QQ,QQ])) [(((Rational Field,), {}), x), (((Rational Field,), {}), x)] """
# Spawn up to n subprocesses # The way fork works is that fork() returns the # nonzero pid of the subprocess in the master # process and returns 0 in the subprocess. # This is the subprocess. self._subprocess(f, dir, *v0)
# Now wait for one subprocess to finish and report the result. # However, wait at most the time since the oldest process started.
# Kill workers that are too old "Killing subprocess %s with input %s which took too long" % (pid, W.input) ) # Some other process exited, not our problem... else: # collect data from process that successfully terminated else:
except IOError: pass
finally: # Send SIGKILL signal to workers that are left. print("Killing any remaining workers...") except OSError: # If kill() failed, it is most likely because # the process already exited. pass else: except OSError as msg: if self.verbose: print(msg)
# Clean up all temporary files.
def _subprocess(self, f, dir, args, kwds={}): """ Setup and run evaluation of ``f(*args, **kwds)``, storing the result in the given directory ``dir``.
This method is called by each forked subprocess.
INPUT:
- ``f`` -- a function
- ``dir`` -- name of a directory
- ``args`` -- a tuple with positional arguments for ``f``
- ``kwds`` -- (optional) a dict with keyword arguments for ``f``
TESTS:
The method ``_subprocess`` is really meant to be run only in a subprocess. It doesn't print nor return anything, the output is saved in pickles. It redirects stdout, so we save and later restore stdout in order not to break the doctester::
sage: saved_stdout = sys.stdout sage: F = sage.parallel.use_fork.p_iter_fork(2,3) sage: F._subprocess(operator.add, tmp_dir(), (1, 2)) sage: sys.stdout = saved_stdout """
# Make it so all stdout is sent to a file so it can # be displayed.
# Run some commands to tell Sage that its # pid has changed (forcing a reload of # misc).
# The pexpect interfaces (and objects defined in them) are # not valid.
# Now evaluate the function f.
# And save the result to disk.