You have a program that performs a lot of CPU-intensive work, and you want to make it run faster by having it take advantage of multiple CPUs.
The concurrent.futures library provides a ProcessPoolExecutor class that can be used to execute computationally intensive functions in a separately running instance of the Python interpreter. However, in order to use it, you first need some computationally intensive work to perform. Let's illustrate with a simple yet practical example. Suppose you have an entire directory of gzip-compressed Apache web server logs.
Further suppose each log file contains lines like this:
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...
Here is a simple script that takes this data and identifies all hosts that have accessed the robots.txt file:
# findrobots.py

import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir + '/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
The preceding program is written in the commonly used map-reduce style. The function find_robots() is mapped across a collection of filenames and the results are combined into a single result (the all_robots set in the find_all_robots() function). Now, suppose you want to modify this program to use multiple CPUs. It turns out to be easy—simply replace the map() operation with a similar operation carried out on a process pool from the concurrent.futures library. Here is a slightly modified version of the code:
# findrobots.py

import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir + '/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
With this modification, the script produces the same result but runs about 3.5 times faster on our quad-core machine. The actual performance will vary according to the number of CPUs available on your machine.
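If you want to verify the speedup on your own machine, a simple timing harness can be wrapped around the call. This is a minimal sketch; the helper script name is hypothetical, and it assumes the findrobots.py module above is importable and that the logs directory exists:

# time_findrobots.py (hypothetical helper script)
import time
import findrobots

start = time.perf_counter()
robots = findrobots.find_all_robots('logs')
elapsed = time.perf_counter() - start
print('Found %d hosts in %.2f seconds' % (len(robots), elapsed))

Run it once against the sequential version of findrobots.py and once against the pool-based version to compare.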
Typical usage of a ProcessPoolExecutor is as follows:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    # do work in parallel using pool
    ...
Under the covers, a ProcessPoolExecutor creates N independent running Python interpreters, where N is the number of available CPUs detected on the system. You can change the number of processes created by supplying an optional argument, as in ProcessPoolExecutor(N). The pool runs until the last statement in the with block is executed, at which point the process pool is shut down. However, the program will wait until all submitted work has been processed. Work to be submitted to a pool must be defined in a function. There are two methods for submission. If you are trying to parallelize a list comprehension or a map() operation, you use pool.map():
# A function that performs a lot of work
def work(x):
    ...
    return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)
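Note that pool.map() yields results lazily and in the same order as the inputs, so it is usually best to collect them while the pool is still alive. Here is a self-contained sketch with a trivial placeholder for work():

from concurrent.futures import ProcessPoolExecutor

def work(x):
    return x * x        # placeholder for real CPU-intensive work

if __name__ == '__main__':
    data = range(10)
    with ProcessPoolExecutor() as pool:
        # Collect results inside the with block, before the pool shuts down
        results = list(pool.map(work, data))
    print(results)      # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]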
Alternatively, you can manually submit single tasks using the pool.submit() method:
# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
    ...
If you manually submit a job, the result is an instance of Future. To obtain the actual result, you call its result() method. This blocks until the result is computed and returned by the pool. Instead of blocking, you can also arrange to have a callback function triggered upon completion. For example:
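A minimal sketch of this callback pattern, reusing the hypothetical work() function and arg from the previous example:

def when_done(r):
    # r is the completed Future; its result() method returns the value
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
    future_result = pool.submit(work, arg)
    future_result.add_done_callback(when_done)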
The user-supplied callback function receives an instance of Future that must be used to obtain the actual result (i.e., by calling its result() method). Although process pools can be easy to use, there are a number of important considerations to be made in designing larger programs. In no particular order:
- This technique for parallelization only works well for problems that can be trivially decomposed into independent parts.

- Work must be submitted in the form of simple functions. Parallel execution of instance methods, closures, or other kinds of constructs is not supported.

- Function arguments and return values must be compatible with pickle. Work is carried out in a separate interpreter using interprocess communication. Thus, data exchanged between interpreters has to be serialized.

- Functions submitted for work should not maintain persistent state or have side effects. With the exception of simple things such as logging, you don't really have any control over the behavior of child processes once started. Thus, to preserve your sanity, it is probably best to keep things simple and carry out work in pure functions that don't alter their environment.

- Process pools are created by calling the fork() system call on Unix. This makes a clone of the Python interpreter, including all of the program state at the time of the fork. On Windows, an independent copy of the interpreter that does not clone state is launched. The actual forking process does not occur until the first pool.map() or pool.submit() method is called.

- Great care should be taken when combining process pools and programs that use threads. In particular, you should probably create and launch process pools prior to the creation of any threads (e.g., create the pool in the main thread at program startup); a skeleton illustrating this is sketched after this list.
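Putting several of these points together, here is a sketch of a safe overall structure. The work() and main() names are placeholders; the important detail is that pool creation happens in the main thread, behind an if __name__ == '__main__' guard so that the child interpreters launched on Windows can safely re-import the module:

from concurrent.futures import ProcessPoolExecutor

def work(x):
    # A pure function: picklable argument and result, no side effects
    return x * x

def main():
    with ProcessPoolExecutor() as pool:
        results = list(pool.map(work, range(100)))
    print(sum(results))

if __name__ == '__main__':
    # Create the pool from the main thread, before any threads exist
    main()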