multiprocessing — Process-based “threading” interface

New in version 2.6.

Introduction

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

Warning
Some of this package’s functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the multiprocessing.synchronize module will be disabled, and attempts to import it will result in an ImportError. See issue 3770 for additional information.

The Process class

In multiprocessing, processes are spawned by creating a Process object and then calling its start() method. Process follows the API of threading.Thread. A trivial example of a multiprocess program is:

from multiprocessing import Process
def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
Here the function f is run in a child process. For an explanation of why (on Windows) the if __name__ == '__main__' part is necessary, see Programming guidelines.

Exchanging objects between processes

multiprocessing supports two types of communication channel between processes (a combined sketch of both follows this list):

Queues
    The Queue class is a near clone of Queue.Queue; queues are thread and process safe.
Pipes
    The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).
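As a minimal combined sketch of both channel types (the worker functions and the values sent here are illustrative, not part of the reference itself):

from multiprocessing import Process, Queue, Pipe

def queue_worker(q):
    # anything picklable can be put on the queue
    q.put([42, None, 'hello'])

def pipe_worker(conn):
    # each end of the pipe is a connection object with send()/recv()
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    q = Queue()
    p = Process(target=queue_worker, args=(q,))
    p.start()
    print q.get()               # prints "[42, None, 'hello']"
    p.join()

    parent_conn, child_conn = Pipe()
    p = Process(target=pipe_worker, args=(child_conn,))
    p.start()
    print parent_conn.recv()    # prints "[42, None, 'hello']"
    p.join()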
Synchronization between processes

multiprocessing contains equivalents of all the synchronization primitives from threading. For instance one can use a lock to ensure that only one process prints to standard output at a time:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
Without using the lock, output from the different processes is liable to get all mixed up.

Sharing state between processes

As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes. However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so (a combined sketch follows this list):

Shared memory
    Data can be stored in a shared memory map using Value or Array.
Server process
    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
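A minimal combined sketch of the two approaches (the function f, the typecodes and the values used are illustrative):

from multiprocessing import Process, Value, Array, Manager

def f(n, a, d, l):
    n.value = 3.1415927          # shared memory: a single double
    for i in range(len(a)):      # shared memory: a fixed-size array of ints
        a[i] = -a[i]
    d[1] = '1'                   # server process: a managed dict proxy
    l.reverse()                  # server process: a managed list proxy

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    manager = Manager()
    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(num, arr, d, l))
    p.start()
    p.join()

    print num.value              # prints "3.1415927"
    print arr[:]                 # prints "[0, -1, -2, ..., -9]"
    print d                      # prints "{1: '1'}"
    print l                      # prints "[9, 8, 7, ..., 0]"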
Using a pool of workers

The Pool class represents a pool of worker processes. It has methods which allow tasks to be offloaded to the worker processes in a few different ways. For example:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
Reference

The multiprocessing package mostly replicates the API of the threading module.

Process and exceptions
Pipes and Queues

When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks. For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).

The Queue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the Queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s Queue.Queue class. If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.

Note that one can also create a shared queue by using a manager object – see Managers.

Note
multiprocessing uses the usual Queue.Empty and Queue.Full exceptions to signal a timeout. They are not available in the multiprocessing namespace so you need to import them from Queue.

Warning
If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause other processes to get an exception when they try to use the queue later on.

Warning
As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread()), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue. See Programming guidelines.

For an example of the usage of queues for interprocess communication see Examples.
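A minimal sketch of the JoinableQueue protocol described above (the worker function and the items are illustrative):

from multiprocessing import Process, JoinableQueue

def worker(q):
    while True:
        item = q.get()
        print 'processed', item * item
        q.task_done()            # one call per item removed from the queue

if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=worker, args=(q,))
    p.daemon = True              # let the worker die with the main process
    p.start()
    for i in range(5):
        q.put(i)
    q.join()                     # blocks until every item is task_done()'d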
Miscellaneous

Note
multiprocessing contains no analogues of threading.active_count(), threading.enumerate(), threading.settrace(), threading.setprofile(), threading.Timer, or threading.local.

Connection Objects

Connection objects allow the sending and receiving of picklable objects or strings. They can be thought of as message oriented connected sockets. Connection objects are usually created using Pipe() – see also Listeners and Clients.
For example:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Warning
The Connection.recv() method automatically unpickles the data it receives, which can be a security risk unless you can trust the process which sent the message. Therefore, unless the connection object was produced using Pipe() you should only use the recv() and send() methods after performing some sort of authentication. See Authentication keys.

Warning
If a process is killed while it is trying to read or write to a pipe then the data in the pipe is likely to become corrupted, because it may become impossible to be sure where the message boundaries lie.

Synchronization primitives

Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program. See the documentation for the threading module. Note that one can also create synchronization primitives by using a manager object – see Managers.
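The acquire() methods of these primitives also accept a timeout, as detailed in the note below; a minimal sketch (the two-second value is arbitrary):

from multiprocessing import Lock

lock = Lock()
# block for at most 2 seconds; returns False on timeout instead of raising
if lock.acquire(block=True, timeout=2):
    try:
        print 'lock held'        # ... critical section ...
    finally:
        lock.release()
else:
    print 'could not acquire the lock within 2 seconds'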
Note
The acquire() method of BoundedSemaphore, Lock, RLock and Semaphore has a timeout parameter not supported by the equivalents in threading. The signature is acquire(block=True, timeout=None) with keyword parameters being acceptable. If block is True and timeout is not None then it specifies a timeout in seconds. If block is False then timeout is ignored.

Note
If the SIGINT signal generated by Ctrl-C arrives while the main thread is blocked by a call to BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() or Condition.wait() then the call will be immediately interrupted and KeyboardInterrupt will be raised. This differs from the behaviour of threading where SIGINT will be ignored while the equivalent blocking calls are in progress.

Managers

Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
Manager processes will be shut down as soon as they are garbage collected or their parent process exits. The manager classes are defined in the multiprocessing.managers module.
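A minimal sketch of a manager's explicit lifecycle (the typeid 'mydict' and the use of a plain dict are illustrative):

from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
    pass

# The auto-generated proxy exposes dict's public methods
# (update, get, keys, ...) but not special methods like __setitem__.
MyManager.register('mydict', dict)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()              # spawns the manager's server process
    d = manager.mydict()
    d.update({'key': 'value'})
    print d.get('key')           # prints "value"
    manager.shutdown()           # stops the server process explicitly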
Namespace objects

A namespace object has no public methods, but does have writable attributes. Its representation shows the values of its attributes. However, when using a proxy for a namespace object, an attribute beginning with '_' will be an attribute of the proxy and not an attribute of the referent:

>>> import multiprocessing
>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3 # this is an attribute of the proxy
>>> print Global
Namespace(x=10, y='hello')
Customized managers

To create one’s own manager, one creates a subclass of BaseManager and uses the register() classmethod to register new types or callables with the manager class. For example:

from multiprocessing.managers import BaseManager
class MathsClass(object):
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    maths = manager.Maths()
    print maths.add(4, 3)         # prints 7
    print maths.mul(7, 8)         # prints 56
Using a remote manager

It is possible to run a manager server on one machine and have clients use it from other machines (assuming that the firewalls involved allow it). Running the following commands creates a server for a single shared queue which remote clients can access:

>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('getQueue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
One client can access the server as follows:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('getQueue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.getQueue()
>>> queue.put('hello')
Another client can also use it:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('getQueue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.getQueue()
>>> queue.get()
'hello'
Proxy Objects

A proxy is an object which refers to a shared object which lives (presumably) in a different process. The shared object is said to be the referent of the proxy. Multiple proxy objects may have the same referent.

A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy). A proxy can usually be used in most of the same ways that its referent can:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0xb799974c>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Notice that applying str() to a proxy will return the representation of the referent, whereas applying repr() will return the representation of the proxy.

An important feature of proxy objects is that they are picklable so they can be passed between processes. Note, however, that if a proxy is sent to the corresponding manager’s process then unpickling it will produce the referent itself. This means, for example, that one shared object can contain a second:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']
Note
The proxy types in multiprocessing do nothing to support comparisons by value. So, for instance, manager.list([1,2,3]) == [1,2,3] will return False. One should just use a copy of the referent instead when making comparisons.
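A doctest-style sketch of this point, continuing the session with the manager created earlier (the values are illustrative):

>>> l = manager.list([1, 2, 3])
>>> l == [1, 2, 3]          # compares the proxy object itself, so False
False
>>> list(l) == [1, 2, 3]    # compare a local copy of the referent instead
True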
Cleanup

A proxy object uses a weakref callback so that when it gets garbage collected it deregisters itself from the manager which owns its referent. A shared object gets deleted from the manager process when there are no longer any proxies referring to it.

Process Pools

One can create a pool of processes which will carry out tasks submitted to it with the Pool class.
The following example demonstrates the use of a pool:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

    import time
    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)           # raises TimeoutError
Listeners and Clients

Usually message passing between processes is done using queues or by using Connection objects returned by Pipe(). However, the multiprocessing.connection module allows some extra flexibility. It basically gives a high-level message-oriented API for dealing with sockets or Windows named pipes, and also has support for digest authentication using the hmac module.
The module defines two exceptions:
Examples

The following server code creates a listener which uses 'secret password' as an authentication key. It then waits for a connection and sends some data to the client:

from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')
conn = listener.accept()
print 'connection accepted from', listener.last_accepted
conn.send([2.25, None, 'junk', float])
conn.send_bytes('hello')
conn.send_bytes(array('i', [42, 1729]))
conn.close()
listener.close()
The following code connects to the server and receives some data from the server:

from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
conn = Client(address, authkey='secret password')
print conn.recv() # => [2.25, None, 'junk', float]
print conn.recv_bytes() # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr) # => 8
print arr # => array('i', [42, 1729, 0, 0, 0])
conn.close()
Address Formats

An 'AF_INET' address is a tuple of the form (hostname, port), an 'AF_UNIX' address is a string giving a filename, and an 'AF_PIPE' address is a string of the form r'\\.\pipe\PipeName'.

Note that any string beginning with two backslashes is assumed by default to be an 'AF_PIPE' address rather than an 'AF_UNIX' address.

Authentication keys

When one uses Connection.recv(), the data received is automatically unpickled. Unfortunately unpickling data from an untrusted source is a security risk. Therefore Listener and Client() use the hmac module to provide digest authentication.

An authentication key is a string which can be thought of as a password: once a connection is established both ends will demand proof that the other knows the authentication key. (Demonstrating that both ends are using the same key does not involve sending the key over the connection.)

If authentication is requested but no authentication key is specified then the return value of current_process().authkey is used (see Process). This value will be automatically inherited by any Process object that the current process creates. This means that (by default) all processes of a multi-process program will share a single authentication key which can be used when setting up connections between themselves.

Suitable authentication keys can also be generated by using os.urandom().
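A minimal sketch of the default-key mechanism (the address and key length here are illustrative):

import os
from multiprocessing import current_process
from multiprocessing.connection import Listener

# Generate a strong key with os.urandom() and install it as the
# current process's authkey; child processes inherit it automatically.
current_process().authkey = os.urandom(16)

# Any Listener/Client pair using the same key will authenticate
# before exchanging picklable objects.
listener = Listener(('localhost', 6000), authkey=current_process().authkey)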
Logging

Some support for logging is available. Note, however, that the logging package does not use process shared locks so it is possible (depending on the handler type) for messages from different processes to get mixed up.

Below is an example session with logging turned on:

>>> import multiprocessing, logging
>>> logger = multiprocessing.get_logger()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-1] child process calling self.run()
[INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-2776-0-lj0tfa'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-1] manager exiting with exitcode 0
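Note that get_logger() does not itself attach a handler; to actually see messages on the console one can use log_to_stderr(), roughly as follows:

import multiprocessing, logging

# log_to_stderr() attaches a handler that writes formatted records
# (including the process name) to sys.stderr, and returns the logger
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)
logger.info('visible on stderr, tagged with the process name')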
Programming guidelines

There are certain guidelines and idioms which should be adhered to when using multiprocessing.

All platforms

Avoid shared state
    As far as possible one should try to avoid shifting large amounts of data between processes.

Picklability
    Ensure that the arguments to the methods of proxies are picklable.

Thread safety of proxies
    Do not use a proxy object from more than one thread unless you protect it with a lock.

Joining zombie processes
    On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’s Process.is_alive() will join the process. Even so it is probably good practice to explicitly join all the processes that you start.

Better to inherit than pickle/unpickle
    On Windows many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

Avoid terminating processes
    Using the Process.terminate() method to stop a process is liable to cause any shared resources (such as locks, semaphores, pipes and queues) currently being used by the process to become broken or unavailable to other processes.

Joining processes that use queues
    Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items have been fed to the underlying pipe. This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined.

Explicitly pass resources to child processes
    On Unix a child process can make use of a shared resource created in a parent process using a global variable. However, it is better to pass the object as an argument to the constructor for the child process.

Windows

Since Windows lacks os.fork() it has a few extra restrictions:

More picklability
    Ensure that all arguments to Process.__init__() are picklable.

Global variables
    Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start() was called.

Safe importing of main module
    Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process); a sketch of this idiom follows.
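A minimal sketch of the safe-import idiom: calling freeze_support() and creating processes only under the __main__ guard (the function foo is illustrative):

from multiprocessing import Process, freeze_support

def foo():
    print 'hello'

if __name__ == '__main__':
    freeze_support()   # needed for frozen Windows executables; otherwise a no-op
    p = Process(target=foo)
    p.start()
    p.join()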
Examples

Demonstration of how to create and use customized managers and proxies:

#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo(object):
    def f(self):
        print 'you called Foo.f()'
    def g(self):
        print 'you called Foo.g()'
    def _h(self):
        print 'you called Foo._h()'

# A simple generator function
def baz():
    for i in xrange(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print '-' * 20

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print '-' * 20

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print '-' * 20

    it = manager.baz()
    for i in it:
        print '<%d>' % i,
    print

    print '-' * 20

    op = manager.operator()
    print 'op.add(23, 45) =', op.add(23, 45)
    print 'op.pow(2, 94) =', op.pow(2, 94)
    print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
    print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
    print 'op._exposed_ =', op._exposed_

##

if __name__ == '__main__':
    freeze_support()
    test()
Using Pool:

#
# A test of `multiprocessing.Pool` class
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    return 1.0 / (x-5.0)

def pow3(x):
    return x**3

def noop(x):
    pass

#
# Test code
#

def test():
    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()

    #
    # Create pool
    #

    PROCESSES = 4
    print 'Creating pool with %d processes\n' % PROCESSES
    pool = multiprocessing.Pool(PROCESSES)
    print 'pool = %s' % pool
    print

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print 'Ordered results using pool.apply_async():'
    for r in results:
        print '\t', r.get()
    print

    print 'Ordered results using pool.imap():'
    for x in imap_it:
        print '\t', x
    print

    print 'Unordered results using pool.imap_unordered():'
    for x in imap_unordered_it:
        print '\t', x
    print

    print 'Ordered results using pool.map() --- will block till complete:'
    for x in pool.map(calculatestar, TASKS):
        print '\t', x
    print

    #
    # Simple benchmarks
    #

    N = 100000
    print 'def pow3(x): return x**3'

    t = time.time()
    A = map(pow3, xrange(N))
    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
          ' seconds' % (N, N//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    L = [None] * 1000000
    print 'def noop(x): pass'
    print 'L = [None] * 1000000'

    t = time.time()
    A = map(noop, L)
    print '\tmap(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    B = pool.map(noop, L)
    print '\tpool.map(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
          (len(L)//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    del A, B, C, L

    #
    # Test error handling
    #

    print 'Testing error handling:'

    try:
        print pool.apply(f, (5,))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.apply()'
    else:
        raise AssertionError, 'expected ZeroDivisionError'

    try:
        print pool.map(f, range(10))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.map()'
    else:
        raise AssertionError, 'expected ZeroDivisionError'

    try:
        print list(pool.imap(f, range(10)))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
    else:
        raise AssertionError, 'expected ZeroDivisionError'

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            x = it.next()
        except ZeroDivisionError:
            if i == 5:
                pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError, 'expected ZeroDivisionError'

    assert i == 9
    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
    print

    #
    # Testing timeouts
    #

    print 'Testing ApplyResult.get() with timeout:',
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    print 'Testing IMapIterator.next() with timeout:',
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    #
    # Testing callback
    #

    print 'Testing callback:'

    A = []
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print '\tcallbacks succeeded\n'
    else:
        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print 'Testing close():'

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tclose() succeeded\n'

    #
    # Check terminate() method
    #

    print 'Testing terminate():'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tterminate() succeeded\n'

    #
    # Check garbage collection
    #

    print 'Testing garbage collection:'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print '\tgarbage collection succeeded\n'

if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as multiprocessing
    else:
        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
        raise SystemExit(2)

    test()
Synchronization types like locks, conditions and queues:

#
# A test file for the `multiprocessing` package
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten


#### TEST_VALUE

def value_func(running, mutex):
    random.seed()
    time.sleep(random.random()*4)

    mutex.acquire()
    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
    running.value -= 1
    mutex.release()

def test_value():
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        mutex.acquire()
        print running.value,
        sys.stdout.flush()
        mutex.release()

    print
    print 'No more running processes'


#### TEST_QUEUE

def queue_func(queue):
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            print o,
            sys.stdout.flush()
        except Empty:
            print 'TIMEOUT'

    print


#### TEST_CONDITION

def condition_func(cond):
    cond.acquire()
    print '\t' + str(cond)
    time.sleep(2)
    print '\tchild is notifying'
    print '\t' + str(cond)
    cond.notify()
    cond.release()

def test_condition():
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    cond.wait()
    print 'main has woken up'

    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    sema.acquire()

    mutex.acquire()
    running.value += 1
    print running.value, 'tasks are running'
    mutex.release()

    random.seed()
    time.sleep(random.random()*2)

    mutex.acquire()
    running.value -= 1
    print '%s has finished' % multiprocessing.current_process()
    mutex.release()

    sema.release()

def test_semaphore():
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    print '\tchild sleeping'
    time.sleep(5.5)
    print '\n\tchild terminating'

def test_join_timeout():
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print 'waiting for process to finish'

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print '.',
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

def test_event():
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    for i in range(len(values)):
        v = values[i][1]
        sv = shared_values[i].value
        assert v == sv

    for i in range(len(values)):
        a = arrays[i][1]
        sa = list(shared_arrays[i][:])
        assert a == sa

    print 'Tests passed'

def test_sharedvalues():
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [multiprocessing.Value(id, v) for id, v in values]
    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0


####

def test(namespace=multiprocessing):
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print '\n\t######## %s\n' % func.__name__
        func()

    ignore = multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print info
            raise ValueError, 'there should be no positive refcounts left'

if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print ' Using processes and a manager '.center(79, '-')
        namespace = multiprocessing.Manager()
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as namespace
    else:
        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
        raise SystemExit, 2

    test(namespace)
An example showing how to use queues to feed tasks to a collection of worker processes and collect the results:

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same order as the corresponding tasks were put on the
# input queue.  If it is important to get the results back in the
# original order then consider using `Pool.map()` or `Pool.imap()`
# (which will save on the amount of code needed anyway).
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

if __name__ == '__main__':
    freeze_support()
    test()
An example of how a pool of worker processes can each run a BaseHTTPServer.HTTPServer instance while sharing a single listening socket:

#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object. (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#

import os
import sys

from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

if sys.platform == 'win32':
    import multiprocessing.reduction    # make sockets picklable/inheritable

def note(format, *args):
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))

class RequestHandler(SimpleHTTPRequestHandler):
    # we override log_message() to show which process is handling the request
    def log_message(self, format, *args):
        note(format, *args)

def serve_forever(server):
    note('starting server')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass

def runpool(address, number_of_processes):
    # create a single server object -- children will each inherit a copy
    server = HTTPServer(address, RequestHandler)

    # create child processes to act as workers
    for i in range(number_of_processes-1):
        Process(target=serve_forever, args=(server,)).start()

    # main process also acts as a worker
    serve_forever(server)

def test():
    DIR = os.path.join(os.path.dirname(__file__), '..')
    ADDRESS = ('localhost', 8000)
    NUMBER_OF_PROCESSES = 4

    print 'Serving at http://%s:%d using %d worker processes' % \
        (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
    print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']

    os.chdir(DIR)
    runpool(ADDRESS, NUMBER_OF_PROCESSES)

if __name__ == '__main__':
    freeze_support()
    test()
Some simple benchmarks comparing multiprocessing with threading:

#
# Simple benchmarks for the multiprocessing package
#

import time, sys, multiprocessing, threading, Queue, gc

if sys.platform == 'win32':
    _timer = time.clock
else:
    _timer = time.time

delta = 1


#### TEST_QUEUESPEED

def queuespeed_func(q, c, iterations):
    a = '0' * 256
    c.acquire()
    c.notify()
    c.release()

    for i in xrange(iterations):
        q.put(a)

    q.put('STOP')

def test_queuespeed(Process, q, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))
        c.acquire()
        p.start()
        c.wait()
        c.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print iterations, 'objects passed through the queue in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_PIPESPEED

def pipe_func(c, cond, iterations):
    a = '0' * 256
    cond.acquire()
    cond.notify()
    cond.release()

    for i in xrange(iterations):
        c.send(a)

    c.send('STOP')

def test_pipespeed():
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        cond.acquire()
        p.start()
        cond.wait()
        cond.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print iterations, 'objects passed through connection in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_SEQSPEED

def test_seqspeed(seq):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            a = seq[5]

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_LOCK

def test_lockspeed(l):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            l.acquire()
            l.release()

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_CONDITION

def conditionspeed_func(c, N):
    c.acquire()
    c.notify()

    for i in xrange(N):
        c.wait()
        c.notify()

    c.release()

def test_conditionspeed(Process, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        c.acquire()
        p = Process(target=conditionspeed_func, args=(c, iterations))
        p.start()

        c.wait()

        t = _timer()

        for i in xrange(iterations):
            c.notify()
            c.wait()

        elapsed = _timer()-t

        c.release()
        p.join()

    print iterations * 2, 'waits in', elapsed, 'seconds'
    print 'average number/sec:', iterations * 2 / elapsed


####

def test():
    manager = multiprocessing.Manager()

    gc.disable()

    print '\n\t######## testing Queue.Queue\n'
    test_queuespeed(threading.Thread, Queue.Queue(),
                    threading.Condition())
    print '\n\t######## testing multiprocessing.Queue\n'
    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                    multiprocessing.Condition())
    print '\n\t######## testing Queue managed by server process\n'
    test_queuespeed(multiprocessing.Process, manager.Queue(),
                    manager.Condition())
    print '\n\t######## testing multiprocessing.Pipe\n'
    test_pipespeed()

    print

    print '\n\t######## testing list\n'
    test_seqspeed(range(10))
    print '\n\t######## testing list managed by server process\n'
    test_seqspeed(manager.list(range(10)))
    print '\n\t######## testing Array("i", ..., lock=False)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
    print '\n\t######## testing Array("i", ..., lock=True)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

    print

    print '\n\t######## testing threading.Lock\n'
    test_lockspeed(threading.Lock())
    print '\n\t######## testing threading.RLock\n'
    test_lockspeed(threading.RLock())
    print '\n\t######## testing multiprocessing.Lock\n'
    test_lockspeed(multiprocessing.Lock())
    print '\n\t######## testing multiprocessing.RLock\n'
    test_lockspeed(multiprocessing.RLock())
    print '\n\t######## testing lock managed by server process\n'
    test_lockspeed(manager.Lock())
    print '\n\t######## testing rlock managed by server process\n'
    test_lockspeed(manager.RLock())

    print

    print '\n\t######## testing threading.Condition\n'
    test_conditionspeed(threading.Thread, threading.Condition())
    print '\n\t######## testing multiprocessing.Condition\n'
    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
    print '\n\t######## testing condition managed by a server process\n'
    test_conditionspeed(multiprocessing.Process, manager.Condition())

    gc.enable()

if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()
An example/demo of how to use the managers.SyncManager, Process and others to build a system which can distribute processes and work via a distributed queue to a “cluster” of machines on a network, accessible via SSH. You will need to have private key authentication for all hosts configured for this to work.

#
# Module to allow spawning of processes on foreign host
#
# Depends on `multiprocessing` package -- tested with `processing-0.60`
#

__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']

#
# Imports
#

import sys
import os
import tarfile
import shutil
import subprocess
import logging
import itertools
import Queue

try:
    import cPickle as pickle
except ImportError:
    import pickle

from multiprocessing import Process, current_process, cpu_count
from multiprocessing import util, managers, connection, forking, pool

#
# Logging
#

def get_logger():
    return _logger

_logger = logging.getLogger('distributing')
_logger.propagate = 0

util.fix_up_logger(_logger)
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

info = _logger.info
debug = _logger.debug

#
# Get number of cpus
#

try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1

#
# Manager type which spawns subprocesses
#

class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        if hasattr(sys.modules['__main__'], '__file__'):
            main_path = os.path.basename(sys.modules['__main__'].__file__)
        else:
            main_path = None
        data = pickle.dumps((target, args, kwargs))
        p = self._RemoteProcess(data, main_path)
        if name is None:
            temp = self._name.split('Host-')[-1] + '/Process-%s'
            name = temp % ':'.join(map(str, p.get_identity()))
        p.set_name(name)
        return p

    @classmethod
    def from_address(cls, address, authkey):
        manager = cls(address, authkey)
        managers.transact(address, authkey, 'dummy')
        manager._state.value = managers.State.STARTED
        manager._name = 'Host-%s:%s' % manager.address
        manager.shutdown = util.Finalize(
            manager, HostManager._finalize_host,
            args=(manager._address, manager._authkey, manager._name),
            exitpriority=-10
            )
        return manager

    @staticmethod
    def _finalize_host(address, authkey, name):
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name

#
# Process subclass representing a process on (possibly) a remote machine
#

class RemoteProcess(Process):
    '''
    Represents a process started on a remote host
    '''
    def __init__(self, data, main_path):
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        forking.prepare({'main_path': self._main_path})
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        return self._identity

HostManager.register('_RemoteProcess', RemoteProcess)

#
# A Pool class that uses a cluster
#

class DistributedPool(pool.Pool):

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        self.Process = cluster.Process
        pool.Pool.__init__(self, processes or len(cluster),
                           initializer, initargs)

    def _setup_queues(self):
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        inqueue.set_contents([None] * size)

#
# Manager type which starts host managers on other machines
#

def LocalProcess(**kwds):
    p = Process(**kwds)
    p.set_name('localhost/' + p.name)
    return p

class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        self._modules = modules
        if __name__ not in modules:
            modules.append(__name__)
        files = [sys.modules[name].__file__ for name in modules]
        for i, file in enumerate(files):
            if file.endswith('.pyc') or file.endswith('.pyo'):
                files[i] = file[:-4] + '.py'
        self._files = [os.path.abspath(file) for file in files]

    def start(self):
        managers.SyncManager.start(self)

        l = connection.Listener(family='AF_INET', authkey=self._authkey)

        for i, host in enumerate(self._hostlist):
            host._start_manager(i, self._authkey, l.address, self._files)

        for host in self._hostlist:
            if host.hostname != 'localhost':
                conn = l.accept()
                i, address, cpus = conn.recv()
                conn.close()
                other_host = self._hostlist[i]
                other_host.manager = HostManager.from_address(address,
                                                              self._authkey)
                other_host.slots = other_host.slots or cpus
                other_host.Process = other_host.manager.Process
            else:
                host.slots = host.slots or slot_count
                host.Process = LocalProcess

        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        slot = self._slot_iterator.next()
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)

#
# Queue subclass used by distributed pool
#

class SettableQueue(Queue.Queue):
    def empty(self):
        return not self.queue
    def full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize
    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        self.not_empty.acquire()
        try:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notifyAll()
        finally:
            self.not_empty.release()

Cluster.register('_SettableQueue', SettableQueue)

#
# Class representing a notional cpu in the cluster
#

class Slot(object):
    def __init__(self, host):
        self.host = host
        self.Process = host.Process

#
# Host
#

class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            data = dict(
                name='BootstrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=str(authkey), parent_address=address
                )
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()

#
# Copy files to remote directory, returning name of directory
#

unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''

def copy_to_remote_temporary_directory(host, files):
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        tf.add(name, os.path.basename(name))
    tf.close()
    p.stdin.close()
    return p.stdout.read().rstrip()

#
# Code which runs a host manager
#

def main():
    # get data from parent over stdin
    data = pickle.load(sys.stdin)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc
    current_process().set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # register a cleanup function
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')

    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()