This tutorial shows how to create and run a Disco job that counts words in a large text file. To start with, you need nothing but a single large text file. Let’s call the file bigfile.txt. If you don’t happen to have a suitable file at hand, you can download one from here.
Disco can distribute computation only if the data is distributed as well. Thus our first step is to split bigfile.txt into small chunks. The standard Unix command split can break a file into many pieces, which is exactly what we want. We also need a directory in which to store the chunks. Let’s call it bigtxt:
mkdir bigtxt
split -l 100000 bigfile.txt bigtxt/bigtxt-
After running these lines, the directory bigtxt contains many files, named bigtxt-aa, bigtxt-ab, etc., each of which contains 100,000 lines (except the last chunk, which might contain fewer).
If your bigfile.txt contains fewer than 100,000 lines, pass a smaller value to -l so that you get more than one chunk. The more chunks you have, the more processes you can run in parallel. However, since launching a new process is not free, you shouldn’t make the chunks too small.
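If you want to see what split does, or you are on a system without it, the following Python sketch does roughly the same job: it writes consecutive runs of N lines to files named with a prefix plus a two-letter suffix (aa, ab, ...). The function name split_lines is hypothetical, and this sketch simply stops with an error after 676 chunks (aa to zz).

```python
import itertools
import string


def split_lines(src_path, out_prefix, lines_per_chunk=100000):
    """Write runs of lines_per_chunk lines to out_prefix + 'aa', 'ab', ...

    Returns the list of chunk paths written. As with `split -l`, the last
    chunk may contain fewer lines than the others.
    """
    # Two-letter suffixes aa, ab, ..., zz: 26 * 26 = 676 names in total.
    suffixes = ("".join(pair)
                for pair in itertools.product(string.ascii_lowercase, repeat=2))
    written = []
    with open(src_path) as src:
        while True:
            # Take the next lines_per_chunk lines (fewer at end of file).
            chunk = list(itertools.islice(src, lines_per_chunk))
            if not chunk:
                break  # end of input
            try:
                suffix = next(suffixes)
            except StopIteration:
                raise ValueError("more than 676 chunks; use a larger lines_per_chunk")
            path = out_prefix + suffix
            with open(path, "w") as out:
                out.writelines(chunk)
            written.append(path)
    return written
```

For example, after creating the bigtxt directory, split_lines("bigfile.txt", "bigtxt/bigtxt-") produces the same kind of chunk files as the split command.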
In theory you could use the chunks in the bigtxt directory directly, but in practice it is a good idea to spread the I/O load across many separate servers.
We can push the data to the Disco Distributed Filesystem (DDFS), which takes care of distributing and replicating our data:
ddfs push -r data:bigtxt bigtxt
This pushes all the files under bigtxt to the tag data:bigtxt. You can check where the files are located:
ddfs blobs data:bigtxt
and make sure they contain what you think they do:
ddfs cat data:bigtxt | less
Next we need to write map and reduce functions to count the words in the chunks.
Start your favorite text editor and open a file called, say, count_words.py. First, let’s write our map function:
def fun_map(line, params):
    # Emit the pair (word, 1) for every whitespace-separated token.
    for word in line.split():
        yield word, 1
Quite compact, eh? The map function always takes two parameters; here they are called line and params. The first parameter contains an input entry, which by default is a line of input. An input entry can be anything, since you can define a custom function that extracts entries from an input stream; see the parameter map_reader in disco.core.Job() for more information. The second parameter, params, can be any object that you specify, in case you need some additional input for your functions.
However, here we can happily process input line by line. The map function needs to return an iterator over key-value pairs; a generator that uses yield, as here, does exactly that. We split a line into tokens with the standard str.split() method and output each token separately as a key, together with the value 1.
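Because fun_map is an ordinary Python generator, you can try it locally before submitting any job. The snippet below needs no Disco installation: it calls a copy of the map function on one line and collects the pairs it yields. params is passed as None because this map function ignores it.

```python
def fun_map(line, params):
    # Emit the pair (word, 1) for every whitespace-separated token.
    for word in line.split():
        yield word, 1


pairs = list(fun_map("the quick brown fox jumps over the lazy dog", None))
print(pairs[:3])  # [('the', 1), ('quick', 1), ('brown', 1)]
```

Note that str.split() only splits on whitespace, so punctuation stays attached: "dog." and "dog" are counted as different words. Lowercasing or stripping punctuation inside fun_map is an easy refinement if you need it.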
Now, let’s write the corresponding reduce function:
def fun_reduce(iter, out, params):
    stats = {}
    for word, count in iter:
        # Add this occurrence to the running total (starting from 0).
        stats[word] = stats.get(word, 0) + int(count)
    for word, total in stats.items():
        out.add(word, total)
The reduce function takes three parameters. The first parameter, iter, is an iterator over the intermediate key-value pairs produced by the map function that belong to this reduce instance, or partition.
In this case, words are assigned to reduce instances essentially at random. Again, this can be changed; see the parameter partition in disco.core.Job() for more information. What matters is that all occurrences of the same word go to the same reduce instance; as long as that holds, we can be sure that the final counts are correct.
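The idea behind partitioning can be sketched in a few lines: map each key to a partition number with a deterministic hash, so that every occurrence of a word ends up in the same reduce instance. The function partition below is a hypothetical illustration, not Disco’s own partition function. It uses zlib.crc32 because Python 3’s built-in hash() of strings is randomized per process and would therefore not agree across workers.

```python
import zlib


def partition(key, nr_partitions):
    """Return the reduce partition (0 .. nr_partitions - 1) for a key."""
    # crc32 gives the same value in every process and on every machine,
    # unlike hash() on str in Python 3, which is randomized per run.
    return zlib.crc32(str(key).encode("utf-8")) % nr_partitions


words = ["apple", "banana", "apple", "cherry", "banana", "apple"]
buckets = {}
for word in words:
    buckets.setdefault(partition(word, 3), []).append(word)

# Every occurrence of a given word landed in exactly one bucket.
for bucket in buckets.values():
    print(sorted(bucket))
```

Which bucket a word lands in is arbitrary; the only property the reduce step relies on is that the choice is the same for every occurrence of that word.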
So we iterate through all the words and increment a counter in the dictionary stats for each word. Once the iterator is exhausted, we know the final counts, which are then sent to the output stream using out, a disco.func.OutputStream object. This object has a method, out.add(key, value), that takes a key-value pair and saves it to a result file.
The third parameter, params, contains the same additional input as in the map function.
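To check the reduce logic without a cluster, you can feed a copy of fun_reduce a list of pairs together with a stand-in for the output stream. ListOutput below is a hypothetical class that only mimics the add(key, value) method described above; it is not part of Disco.

```python
class ListOutput(object):
    """Hypothetical stand-in for disco.func.OutputStream: records pairs."""

    def __init__(self):
        self.pairs = []

    def add(self, key, value):
        self.pairs.append((key, value))


def fun_reduce(iter, out, params):
    stats = {}
    for word, count in iter:
        # Add this occurrence to the running total (starting from 0).
        stats[word] = stats.get(word, 0) + int(count)
    for word, total in stats.items():
        out.add(word, total)


out = ListOutput()
# One count is a string on purpose, to show why int(count) is used.
fun_reduce([("the", 1), ("fox", 1), ("the", "1")], out, None)
print(sorted(out.pairs))  # [('fox', 1), ('the', 2)]
```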
We could also write our reduce function without out, using disco.util.kvgroup():
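def fun_reduce(iter, params):
    from disco.util import kvgroup
    # Sort so that all pairs for the same word are adjacent, then group them.
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)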
In this case, all the key-value pairs yielded by the reduce function are added to the disco.func.OutputStream automatically.
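kvgroup() is only available where Disco is installed. Assuming it groups a sorted stream of (key, value) pairs into (key, values) pairs the way itertools.groupby does, you can try the same reduce logic locally with the stand-in below. kvgroup_standin is a hypothetical name used for illustration only.

```python
from itertools import groupby
from operator import itemgetter


def kvgroup_standin(kvpairs):
    """Yield (key, iterator of values) for each run of equal keys."""
    # groupby merges only adjacent equal keys, hence sorted() below.
    for key, group in groupby(kvpairs, key=itemgetter(0)):
        yield key, (value for _, value in group)


def fun_reduce(iter, params):
    for word, counts in kvgroup_standin(sorted(iter)):
        yield word, sum(counts)


print(list(fun_reduce([("the", 1), ("fox", 1), ("the", 1)], None)))
# [('fox', 1), ('the', 2)]
```

Compared with the dictionary version, this style never holds all totals in memory at once, but it pays for that with a sort of the partition’s pairs.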
That’s it. Now we have written map and reduce functions for counting words in parallel.
Now the only thing missing is a command for running the job. First, we establish a connection to the Disco master by instantiating a disco.core.Disco object. After that, we can start the job by calling disco.core.Disco.new_job(). There are many parameters you can use to specify your job, but only three of them (name, input and map) are required; we also pass reduce, since our job needs one.
In addition to starting the job, we want to print out the results. First, however, we have to wait until the job has finished. This is done with the disco.core.Disco.wait() call, which returns the results of the job once it has finished. For convenience, disco.core.Disco.wait(), like other methods related to a job, can also be called through the disco.core.Job object returned by disco.core.Disco.new_job().
The function disco.core.result_iterator() takes the list of result file addresses returned by disco.core.Disco.wait() and iterates through all the key-value pairs in the results.
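Conceptually, the job runs fun_map over every input line, routes each intermediate pair to a partition, and runs fun_reduce once per partition; result_iterator() then walks the combined output. The toy model below does all of this in a single process so you can follow the data flow. run_local_job is a hypothetical helper, not how Disco actually schedules work; the reduce function uses itertools.groupby in place of disco.util.kvgroup(), and the default of 2 partitions is arbitrary.

```python
import zlib
from itertools import groupby
from operator import itemgetter


def fun_map(line, params):
    for word in line.split():
        yield word, 1


def fun_reduce(iter, params):
    for word, group in groupby(sorted(iter), key=itemgetter(0)):
        yield word, sum(count for _, count in group)


def run_local_job(lines, map_fn, reduce_fn, nr_partitions=2, params=None):
    """Hypothetical single-process model of a map/reduce job."""
    partitions = [[] for _ in range(nr_partitions)]
    # Map phase: each line yields key-value pairs, routed by a stable hash.
    for line in lines:
        for key, value in map_fn(line, params):
            p = zlib.crc32(str(key).encode("utf-8")) % nr_partitions
            partitions[p].append((key, value))
    # Reduce phase: each partition is reduced independently.
    results = []
    for pairs in partitions:
        results.extend(reduce_fn(iter(pairs), params))
    return results


lines = ["a rose is a rose", "is a rose"]
for word, total in sorted(run_local_job(lines, fun_map, fun_reduce)):
    print(word, total)
```

The final counts do not depend on the number of partitions, which is exactly the guarantee the partitioning discussion relies on.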
The following lines run the job and print out the results. Write them to the end of your file:
import sys
from disco.core import Disco, result_iterator

# Usage: python count_words.py <master address> <input> [<input> ...]
results = Disco(sys.argv[1]).new_job(
    name='disco_tut',
    input=sys.argv[2:],
    map=fun_map,
    reduce=fun_reduce).wait()

for word, total in result_iterator(results):
    print word, total
Here we read the address of the Disco master and the input files from the command line. Note how the map and reduce functions are passed to disco.core.Disco.new_job() simply as ordinary keyword arguments, map and reduce.
Now comes the moment of truth.
Run the script as follows:
python count_words.py disco://localhost tag://data:bigtxt > bigtxt.results
If you run the Disco master on a non-standard port, replace disco://localhost with the correct address to the master.
If everything goes well, the script pauses for some time while the job executes. The inputs are read from the tag data:bigtxt, which we created earlier. Finally, the output is written to bigtxt.results. While the job is running, you can point your web browser at http://localhost:8989 (or whichever port your Disco master runs on) to follow the progress of your job in real time.
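Once bigtxt.results exists, each line holds a word and its total separated by a space, as written by the print statement in the script. A short follow-up script can, for example, list the most frequent words. top_words is a hypothetical helper; it assumes words never contain spaces, which holds because fun_map splits on whitespace.

```python
import heapq


def top_words(lines, n=10):
    """Return the n (word, total) pairs with the highest totals."""
    pairs = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # Split on the last space: "word total" -> ("word", "total").
        word, total = line.rsplit(" ", 1)
        pairs.append((word, int(total)))
    return heapq.nlargest(n, pairs, key=lambda pair: pair[1])


sample = ["the 120\n", "fox 3\n", "a 97\n", "dog 3\n"]
print(top_words(sample, 2))  # [('the', 120), ('a', 97)]
```

To use it on the real output, open bigtxt.results and pass the file object as lines.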
You can also set the environment variable DISCO_EVENTS=1 to see job events on your console instead of the web UI.
As you saw, creating a new Disco job is pretty straightforward. Next, you could write functions for a slightly more complex job, one that, for instance, counts only the words provided as a parameter to the map function.
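As a starting point for that exercise, here is one possible map function that counts only the words listed in params. It assumes you pass a set of words as params (the tutorial only says that params can be any object you specify), and it is tried locally here rather than in a Disco job. fun_map_selected is a hypothetical name.

```python
def fun_map_selected(line, params):
    # params is assumed to be a set (or other container) of words to count.
    for word in line.split():
        if word in params:
            yield word, 1


wanted = {"rose", "is"}
print(list(fun_map_selected("a rose is a rose", wanted)))
# [('rose', 1), ('is', 1), ('rose', 1)]
```

A set is a natural choice for params here because membership tests are constant time on average, however many words you want to count.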
You can also experiment with custom partitioning and reader functions. They are written in the same way as map and reduce functions; see the examples in the disco.func module. After that, you could try chaining several map/reduce jobs together, so that the outputs of one job are used as the inputs of the next. In that case you need to use disco.func.chain_reader().
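The idea of chaining can be modeled locally as well: the output pairs of one job become the input of the next. In the sketch below, a second hypothetical job takes the (word, total) pairs of a word count and counts how many distinct words occur each number of times. Both functions are plain Python stand-ins that only illustrate the data flow; in a real Disco chain, disco.func.chain_reader() would read the previous job’s results instead of a Python list.

```python
from collections import Counter


def word_count(lines):
    """First job: (word, total) pairs, sorted by word."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return sorted(counts.items())


def frequency_histogram(word_totals):
    """Second job: (total, number of distinct words with that total)."""
    return sorted(Counter(total for _, total in word_totals).items())


first = word_count(["a rose is a rose", "is a rose"])
second = frequency_histogram(first)  # the first job's output is the input
print(second)  # [(2, 1), (3, 2)]
```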
The best way to learn is to pick a problem or algorithm that you know well, and implement it with Disco. After all, Disco was designed to be as simple as possible so you can concentrate on your own problems, not on the framework.