disco.func — Functions for constructing Disco jobs

A Disco job is specified by one or more user-defined job functions, namely map, reduce, combiner and partitioner functions (see disco.core.JobDict for more information). Of these functions, only map is required.

Hint

When writing custom functions, take into account the following features of the disco worker environment:

  • Only the specified function is included in the request. The function can’t refer to anything outside of its local scope: it can’t call functions defined elsewhere in your source file, and it can’t refer to any global names, including imported modules. If you need a module, import it within the function body.

    In short, job functions must be pure.

  • The function should not print anything to stderr, because the task uses stderr to signal events to the master. You can raise a disco.error.DataError to abort the task on this node and retry it on another node. It is usually best to let the task fail if an exception occurs: do not catch exceptions you can’t recover from. When an exception occurs, the disco worker catches it and signals an appropriate event to the master.
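
A minimal sketch of a map function written to respect these constraints: the re module is imported inside the function body rather than at module level, and the function uses nothing but its arguments. The name fun_map_words and the tokenizing regular expression are illustrative assumptions, not part of Disco.

```python
def fun_map_words(e, params):
    # Import inside the body: only this function is shipped to the worker,
    # so module-level imports in your script would not be visible to it.
    import re
    # Lower-case the line and emit one (word, 1) pair per alphanumeric token.
    return [(w, 1) for w in re.findall(r"\w+", e.lower())]

# Local check outside Disco; params is unused here, so None is passed.
print(fun_map_words("The cat saw the dog", None))
# [('the', 1), ('cat', 1), ('saw', 1), ('the', 1), ('dog', 1)]
```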

User-defined Functions

The following types of functions can be provided by the user:

disco.func.map(entry, params)

Returns an iterable of (key, value) pairs given an entry.

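Parameters:
  • entry – an input entry, as produced by iterating over the input stream
  • params – the disco.core.Params object specified by the params parameter in disco.core.JobDict.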

For instance:

def fun_map(e, params):
    return [(w, 1) for w in e.split()]

This example takes a line of text as input in e, splits it into words, and returns a list of (word, 1) pairs as the output.

The map task can also be an external program. For more information, see Disco External Interface.

disco.func.partition(key, nr_partitions, params)

Returns an integer in range(0, nr_partitions).

Parameters:
  • key – a key object emitted by a task function
  • nr_partitions – the number of partitions
  • params – the disco.core.Params object specified by the params parameter in disco.core.JobDict.
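
As an illustration, here is a hypothetical partition function that sends all keys starting with the same character to the same partition. It follows the signature above and always returns an integer in range(0, nr_partitions); first_char_partition is not part of disco.func. It uses ord() rather than hash() so that the result does not depend on the interpreter's hashing of strings.

```python
def first_char_partition(key, nr_partitions, params):
    # Keys sharing a first character land in the same partition.
    s = str(key)
    if not s:
        return 0  # empty keys all go to partition 0
    return ord(s[0]) % nr_partitions

print(first_char_partition("apple", 4, None))  # 1
```
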
disco.func.combiner(key, value, buffer, done, params)

Returns an iterator of (key, value) pairs or None.

Parameters:
  • key – key object emitted by the map()
  • value – value object emitted by the map()
  • buffer – an accumulator object (a dictionary) that the combiner can use to save its state. The function must keep the buffer from consuming too much memory by calling buffer.clear() after each block of results.
  • done – flag indicating if this is the last call with a given buffer
  • params – the disco.core.Params object specified by the params parameter in disco.core.JobDict.

This function receives all output from disco.func.map() before it is saved to intermediate results. Only the output produced by this function is saved to the results.

After disco.func.map() has consumed all input entries, combiner is called for the last time with the done flag set to True. This is the last opportunity for the combiner to return something.
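
A minimal word-count combiner that follows the protocol described above. The flush threshold of 1000 keys is an arbitrary assumption, ignoring key and value on the final call is an assumption about what the worker passes, and run_combiner is a hypothetical stand-in for the worker loop, included only so the sketch can be run locally.

```python
def fun_combiner(key, value, buffer, done, params):
    if done:
        # Final call: return whatever is still buffered (key/value ignored).
        return list(buffer.items())
    # Accumulate counts for the current block.
    buffer[key] = buffer.get(key, 0) + value
    if len(buffer) >= 1000:
        # Emit this block and clear the buffer to bound memory use.
        block = list(buffer.items())
        buffer.clear()
        return block
    return None

def run_combiner(pairs, combiner, params=None):
    # Hypothetical local driver: feed each map output pair, then make the
    # final call with done=True, collecting everything the combiner returns.
    buffer, results = {}, []
    for k, v in pairs:
        results.extend(combiner(k, v, buffer, False, params) or [])
    results.extend(combiner(None, None, buffer, True, params) or [])
    return results

print(sorted(run_combiner([("a", 1), ("b", 1), ("a", 1)], fun_combiner)))
# [('a', 2), ('b', 1)]
```

Returning blocks as soon as the buffer reaches the threshold trades some combining opportunity (a key may be emitted in several blocks) for bounded memory.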

disco.func.reduce(input_stream, output_stream, params)

Takes three parameters and adds the reduced output to an output object.

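Parameters:
  • input_stream – a disco.func.InputStream used to iterate through the input (key, value) pairs
  • output_stream – a disco.func.OutputStream; call its add(key, value) method to emit results
  • params – the disco.core.Params object specified by the params parameter in disco.core.JobDict.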

For instance:

def fun_reduce(iter, out, params):
    d = {}
    for k, v in iter:
        d[k] = d.get(k, 0) + 1
    for k, c in d.iteritems():
        out.add(k, c)

This example counts how many times each key appears.

The reduce task can also be an external program. For more information, see Disco External Interface.

disco.func.reduce2(input_stream, params)

Alternative reduce signature which takes 2 parameters.

Reduce functions with this signature should return an iterator of key, value pairs, which will be implicitly added to the disco.func.OutputStream.

For instance:

def fun_reduce(iter, params):
    from disco.util import kvgroup
    for k, vs in kvgroup(sorted(iter)):
        yield k, sum(1 for v in vs)

This example counts the number of values for each key.
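
To make the relationship between the two signatures concrete, here is a sketch of how a reduce2-style function could be adapted to the three-parameter form: every pair it yields is passed to out.add(). The as_reduce adapter and the ListOutput stand-in are hypothetical, not Disco’s own code, and itertools.groupby replaces disco.util.kvgroup so the sketch runs without Disco installed. Because as_reduce returns a closure, it could not be shipped to workers as-is (see the hint above); treat it as a local model only.

```python
def as_reduce(reduce2_fun):
    # Wrap a two-parameter reduce so it matches reduce(iter, out, params).
    def reduce3(iter_, out, params):
        for k, v in reduce2_fun(iter_, params):
            out.add(k, v)
    return reduce3

class ListOutput(object):
    # Minimal stand-in for disco.func.OutputStream that collects pairs.
    def __init__(self):
        self.pairs = []

    def add(self, key, value):
        self.pairs.append((key, value))

def count_values(iter_, params):
    # Count the values for each key, as in the reduce2 example above.
    from itertools import groupby
    for k, group in groupby(sorted(iter_), key=lambda kv: kv[0]):
        yield k, sum(1 for _ in group)

out = ListOutput()
as_reduce(count_values)([("b", 1), ("a", 1), ("b", 1)], out, None)
print(out.pairs)  # [('a', 1), ('b', 2)]
```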

disco.func.init(input_iter, params)

Perform some task initialization.

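Parameters:
  • input_iter – an iterator over the input entries
  • params – the disco.core.Params object specified by the params parameter in disco.core.JobDict.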

Typically this function is used to initialize some modules in the worker environment (e.g. ctypes.cdll.LoadLibrary()), to initialize some values in params, or to skip unneeded entries in the beginning of the input stream.
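
For instance, a hypothetical init function can consume a header line from the input iterator before map sees any entries. The sketch assumes the entries are comma-separated lines whose first line is a header, and that params accepts new attributes; FakeParams is a plain attribute bag standing in for disco.core.Params.

```python
def fun_init(input_iter, params):
    # Consume the first entry (assumed to be a header) so map never sees it,
    # and keep its column names for later calls.
    header = next(input_iter)
    params.columns = header.strip().split(",")

class FakeParams(object):
    # Hypothetical stand-in for disco.core.Params.
    pass

params = FakeParams()
entries = iter(["id,name\n", "1,alice\n", "2,bob\n"])
fun_init(entries, params)
print(params.columns)  # ['id', 'name']
print(list(entries))   # ['1,alice\n', '2,bob\n']
```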

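disco.func.input_stream(stream, size, url, params)

Parameters:
  • stream – the stream returned by the previous input_stream function in the chain
  • size – the size of the input
  • url – the url of the input
  • params – the disco.core.Params object specified by the params parameter in disco.core.JobDict.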

Returns a triplet (disco.func.InputStream, size, url) that is passed to the next input_stream function in the chain. The last disco.func.InputStream object returned by the chain is used to iterate through input entries.

Using a disco.func.input_stream() function allows you to customize how input urls are opened.
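
The chaining rule can be sketched in a few lines: each function receives the previous function’s (stream, size, url) and returns a new triplet. Both utf8_text_stream and the apply_input_chain driver are hypothetical illustrations rather than parts of disco.func; io.BytesIO stands in for a real input source and the url is an arbitrary example.

```python
import io

def utf8_text_stream(stream, size, url, params):
    # Decode the raw byte stream as UTF-8 text; size and url pass through.
    import io
    return io.TextIOWrapper(stream, encoding="utf-8"), size, url

def apply_input_chain(chain, stream, size, url, params):
    # Feed each function's (stream, size, url) result into the next one.
    for fun in chain:
        stream, size, url = fun(stream, size, url, params)
    return stream, size, url

raw = io.BytesIO("caf\u00e9\nbar\n".encode("utf-8"))
stream, size, url = apply_input_chain(
    [utf8_text_stream], raw, None, "file:///tmp/example", None)
print(list(stream))  # ['café\n', 'bar\n']
```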

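disco.func.output_stream(stream, partition, url, params)

Parameters:
  • stream – the stream returned by the previous output_stream function in the chain
  • partition – the partition number of the output
  • url – the url returned by the previous output_stream function in the chain
  • params – the disco.core.Params object specified by the params parameter in disco.core.JobDict.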

Returns a pair (disco.func.OutputStream, url) that is passed to the next output_stream function in the chain. The disco.func.OutputStream.add() method of the last disco.func.OutputStream object returned by the chain is used to output entries from map or reduce.

Using an output_stream() allows you to customize where and how output is stored. The default should almost always be used.

Interfaces

class disco.func.InputStream

A file-like object returned by the map_input_stream or reduce_input_stream chain of disco.func.input_stream() functions. Used either to read bytes from the input source or to iterate through input entries.

__iter__()
Iterates through input entries. Typically calls self.read() to read bytes from the underlying file object, which are deserialized to the actual input entries.
read(num_bytes=None)
Reads at most num_bytes from the input source, or until EOF if num_bytes is not specified.
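
A sketch of a class satisfying this interface, assuming entries are newline-delimited records: read() delegates to the wrapped file object, and __iter__() builds entries out of successive read() calls. LineInputStream and its chunk_size parameter are hypothetical.

```python
import io

class LineInputStream(object):
    # Hypothetical InputStream: wraps a binary file object and yields
    # newline-delimited entries with the trailing newline removed.
    def __init__(self, fileobj, chunk_size=4096):
        self.fileobj = fileobj
        self.chunk_size = chunk_size

    def read(self, num_bytes=None):
        # At most num_bytes, or everything up to EOF when num_bytes is None.
        if num_bytes is None:
            return self.fileobj.read()
        return self.fileobj.read(num_bytes)

    def __iter__(self):
        buf = b""
        while True:
            chunk = self.read(self.chunk_size)
            if not chunk:
                break
            buf += chunk
            # Every piece except the last is a complete line.
            lines = buf.split(b"\n")
            buf = lines.pop()
            for line in lines:
                yield line
        if buf:
            yield buf  # final entry without a trailing newline

print(list(LineInputStream(io.BytesIO(b"a\nbb\nccc"), chunk_size=2)))
# [b'a', b'bb', b'ccc']
```
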
class disco.func.OutputStream

A file-like object returned by the map_output_stream or reduce_output_stream chain of disco.func.output_stream() functions. Used to encode key, value pairs and write them to the underlying file object.

add(key, value)
Adds a key, value pair to the output stream. This method typically calls self.write() to write a serialized pair to the actual file object.
write(data)
Writes serialized key, value pairs to the underlying file object.
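
Similarly, here is a hypothetical OutputStream that serializes each pair as a tab-separated line. The format is an assumption chosen for readability; Disco’s default format is the one written by netstr_writer().

```python
import io

class TabOutputStream(object):
    # Hypothetical OutputStream writing "key<TAB>value\n" records.
    def __init__(self, fileobj):
        self.fileobj = fileobj

    def add(self, key, value):
        # Serialize the pair, then hand the bytes to write().
        self.write(("%s\t%s\n" % (key, value)).encode("utf-8"))

    def write(self, data):
        self.fileobj.write(data)

buf = io.BytesIO()
out = TabOutputStream(buf)
out.add("apple", 3)
out.add("pear", 1)
print(buf.getvalue())  # b'apple\t3\npear\t1\n'
```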

Default/Utility Functions

These functions are provided by Disco to help with disco.core.Job creation:

disco.func.default_partition(key, nr_partitions, params)
Returns hash(str(key)) % nr_partitions.
disco.func.make_range_partition(min_val, max_val)

Returns a new partitioning function that partitions keys in the range [min_val:max_val] into equal-sized partitions.

The number of partitions is defined by partitions in disco.core.JobDict.
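
The arithmetic behind equal-width range partitioning can be sketched as follows, assuming integer keys in the inclusive range [min_val, max_val]. This is an illustration of the technique, not the library’s implementation, and range_partition_sketch is a hypothetical name. The returned function is a closure over min_val and max_val, which the hint at the top of this page warns against for functions shipped to workers, so treat it as a local model of the calculation.

```python
def range_partition_sketch(min_val, max_val):
    def partition(key, nr_partitions, params):
        # Split the span of max_val - min_val + 1 integers into
        # nr_partitions equal-width slices and find the key's slice.
        span = max_val - min_val + 1
        p = (int(key) - min_val) * nr_partitions // span
        # Clamp keys outside the range to the first or last partition.
        return max(0, min(nr_partitions - 1, p))
    return partition

part = range_partition_sketch(0, 99)
print([part(k, 4, None) for k in (0, 24, 25, 99)])  # [0, 0, 1, 3]
```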

disco.func.nop_reduce(iter, out, params)

No-op reduce.

This function can be used to combine the per-partition results of many map functions into a single result file per partition.
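
Conceptually, a no-op reduce copies every input pair to the output unchanged. A minimal sketch with the three-parameter reduce signature, using a list-backed stand-in for the output object (copy_reduce and ListOutput are hypothetical names, not Disco’s code):

```python
def copy_reduce(iter_, out, params):
    # Pass every (key, value) pair through unchanged.
    for k, v in iter_:
        out.add(k, v)

class ListOutput(object):
    # Stand-in for disco.func.OutputStream that records added pairs.
    def __init__(self):
        self.pairs = []

    def add(self, key, value):
        self.pairs.append((key, value))

out = ListOutput()
copy_reduce([("a", 1), ("b", 2), ("a", 3)], out, None)
print(out.pairs)  # [('a', 1), ('b', 2), ('a', 3)]
```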

disco.func.gzip_reader(fd, size, url, params)
Wraps the input in a gzip.GzipFile object.
disco.func.map_line_reader(fd, sze, fname)

Yields each line of input.

(Deprecated in 0.3.1) This reader is deprecated in favor of the default Python file-like object iterator. Since 0.3, no reader is necessary for iterable objects returned from the input_stream(). For map() functions that previously relied on this reader, there is one small caveat: this reader has always stripped newline characters from the end of lines, whereas file-like object iterators leave lines intact. Whether this affects a job depends on how it uses the lines.
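
The caveat is easy to see with any Python file-like object: iterating yields lines with their newline characters, so a map function that relied on map_line_reader() stripping them can strip them itself. Here io.StringIO stands in for the input stream, and fun_map, which uses the whole line as a key, is illustrative.

```python
import io

# Iterating a file-like object keeps each line's trailing newline.
lines = list(io.StringIO("alpha beta\ngamma\n"))
print(lines)  # ['alpha beta\n', 'gamma\n']

def fun_map(e, params):
    # Use the whole line as the key, stripping the trailing newline
    # that map_line_reader() used to remove.
    return [(e.rstrip("\n"), 1)]

print([fun_map(e, None) for e in lines])
# [[('alpha beta', 1)], [('gamma', 1)]]
```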

disco.func.chain_reader(stream, size, url, ignore_corrupt=False)
disco.func.netstr_reader(stream, size, url, ignore_corrupt=False)
disco.func.netstr_writer(fd, key, value, params)
Writer for Disco’s default/internal key-value format.
disco.func.object_reader(fd, sze, fname)
(Deprecated in 0.3) A wrapper for netstr_reader() that uses Python’s cPickle module to deserialize strings to Python objects.
disco.func.object_writer(fd, key, value, params)
(Deprecated in 0.3) A wrapper for netstr_writer() that uses Python’s cPickle module to serialize arbitrary Python objects to strings.
disco.func.re_reader(item_re_str, fd, size, fname, output_tail=False, read_buffer_size=8192)

A map reader that uses an arbitrary regular expression to parse the input stream.

Parameter: item_re_str – regular expression for matching input items

The reader works as follows:

  1. X bytes are read from fd and appended to an internal buffer buf.
  2. m = regexp.match(buf) is executed.
  3. If the regular expression matches buf, m.groups() is yielded; it contains an input entry for the map function. The matched bytes are removed from buf and step 2 is executed for the remaining part of buf. If there is no match, go to step 1.
  4. If fd is exhausted before size bytes have been read, and size tests True, a disco.error.DataError is raised.
  5. When fd is exhausted but buf still contains unmatched bytes, two modes are available: if output_tail=True, the remaining buf is yielded as is; otherwise, a warning about the trailing bytes is sent and the remaining buf is discarded.

Note that re_reader() fails if the input stream contains unmatched bytes between matched entries. Make sure that your item_re_str is constructed so that it covers all bytes in the input stream.
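
The five steps above can be sketched as a small generator. This is a hypothetical re-implementation for illustration, not Disco’s code: it works on text from io.StringIO for simplicity, raises IOError where Disco would raise disco.error.DataError, uses warnings.warn() in place of Disco’s warning message, and yields the unmatched tail as a 1-tuple so callers can index it like m.groups().

```python
import re
import warnings

def simple_re_reader(item_re_str, fd, size, output_tail=False,
                     read_buffer_size=8192):
    item_re = re.compile(item_re_str)
    buf = ""
    total = 0
    while True:
        chunk = fd.read(read_buffer_size)   # step 1: read more input
        total += len(chunk)
        buf += chunk
        m = item_re.match(buf)              # step 2: match at buffer start
        # Step 3: yield entries; stop on empty matches to avoid looping forever.
        while m and m.end() > 0:
            yield m.groups()
            buf = buf[m.end():]
            m = item_re.match(buf)
        if not chunk:                       # fd is exhausted
            if size and total < size:       # step 4: truncated input
                raise IOError("read %d of %d bytes" % (total, size))
            if buf:                         # step 5: unmatched tail
                if output_tail:
                    yield (buf,)
                else:
                    warnings.warn("discarding %d trailing bytes" % len(buf))
            return
```

For example, list(simple_re_reader(r"(.*?)\n", io.StringIO("a\nb\nc"), 5, output_tail=True)) returns [('a',), ('b',), ('c',)]; without output_tail=True, the final "c" is discarded with a warning.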

re_reader() provides an easy way to construct parsers for textual input streams. For instance, the following reader produces full HTML documents as input entries:

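def html_reader(fd, size, fname):
    # Import inside the body, since the function can't see module-level names.
    from disco.func import re_reader
    # (?s) lets .*? match across newlines inside a document.
    for x in re_reader("(?s)<HTML>(.*?)</HTML>", fd, size, fname):
        yield x[0]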

Note that since map_line_reader() sets output_tail=True, an input file that lacks a final newline character is silently accepted.

disco.func.map_input_stream(stream, size, url, params)
An input_stream() which looks at the scheme of url and tries to import a function named input_stream from the module disco.schemes.scheme_SCHEME, where SCHEME is the parsed scheme. If no scheme is found in the url, the file scheme is used. The imported input_stream function is then applied to the input.
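
The scheme dispatch described above amounts to a small lookup. This sketch builds the module name from the url and imports it; scheme_module_name and load_input_stream are hypothetical helpers, and load_input_stream() only works where Disco and the relevant disco.schemes module are installed.

```python
def scheme_module_name(url):
    # Parse the scheme of url, falling back to "file" when there is none.
    try:
        from urllib.parse import urlsplit   # Python 3
    except ImportError:
        from urlparse import urlsplit       # Python 2
    scheme = urlsplit(url).scheme or "file"
    return "disco.schemes.scheme_" + scheme

def load_input_stream(url):
    # Import the scheme module and return its input_stream function.
    import importlib
    return importlib.import_module(scheme_module_name(url)).input_stream

print(scheme_module_name("http://example.com/data"))  # disco.schemes.scheme_http
print(scheme_module_name("/tmp/data"))                # disco.schemes.scheme_file
```
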
disco.func.map_output_stream(stream, partition, url, params)
An output_stream() which returns a handle to a partition output. The handle ensures that if a task fails, partially written data is ignored.
disco.func.reduce_input_stream(stream, size, url, params)
An input_stream() which looks at the scheme of url and tries to import a function named input_stream from the module disco.schemes.scheme_SCHEME, where SCHEME is the parsed scheme. If no scheme is found in the url, the file scheme is used. The imported input_stream function is then applied to the input.
disco.func.reduce_output_stream(stream, partition, url, params)
An output_stream() which returns a handle to a reduce output. The handle ensures that if a task fails, partially written data is ignored.
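
A common way to guarantee that partially written data is ignored is to write to a temporary file and rename it into place only after the write succeeds. The sketch below shows that general technique with tempfile.mkstemp() and os.replace() (Python 3.3+); AtomicOutput is a hypothetical class, and this is an assumption about the approach rather than a description of Disco’s handle.

```python
import os
import tempfile

class AtomicOutput(object):
    # Write to a temporary file next to path; publish it only on success.
    def __init__(self, path):
        self.path = path
        fd, self.tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
        self.fileobj = os.fdopen(fd, "wb")

    def write(self, data):
        self.fileobj.write(data)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.fileobj.close()
        if exc_type is None:
            # Same directory, so the rename is atomic on POSIX filesystems.
            os.replace(self.tmp_path, self.path)
        else:
            # The task failed: drop the partial data.
            os.remove(self.tmp_path)
        return False  # let any exception propagate
```

Keeping the temporary file in the same directory as the target matters: a rename is only atomic within a single filesystem.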