disco.core — Client interface for Disco

The disco.core module provides a high-level interface for communication with the Disco master. It provides functions for submitting new jobs, querying status of the system, and getting results of jobs.

The Disco object encapsulates a connection to the Disco master. Once a connection has been established, you can use the object to query the status of the system, or submit a new job with the Disco.new_job() method. See the disco.func module for more information about constructing Disco jobs.

Disco.new_job() is provided with all information needed to run a job, which it packages and sends to the master. The method returns immediately with a Job object that corresponds to the newly started job.

class disco.core.Disco(master)

Opens and encapsulates a connection to the Disco master.

Parameter: master – address of the Disco master, for instance disco://localhost.
blacklist(node)

Blacklists node so that tasks are no longer run on it.

(Added in version 0.2.4)

clean(name)

Cleans records of the job name.

Note that after the job records have been cleaned, there is no way to obtain addresses to the result files from the master. However, no data is actually deleted by Disco.clean(), in contrast to Disco.purge().

If you won’t need the results, use Disco.purge().

events(name, offset=0)

Returns an iterator that iterates over job events, ordered by time.

It is safe to call this function while the job is running.

The iterator returns tuples (offset, event). You can pass an offset value to this function, to make the iterator skip over the events before the specified offset. This provides an efficient way to monitor job events continuously. See DISCO_EVENTS in disco.settings for more information on how to enable the console output of job events.

(Added in version 0.2.3)
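
The offset-based monitoring described above can be sketched roughly as follows. No live master is assumed: fake_events is a hypothetical stand-in for Disco.events() that reads from an in-memory list, and treating the offset as a simple list index is an assumption of this sketch, not a statement about how Disco numbers its events.

```python
# Hypothetical in-memory event log standing in for a Disco master.
event_log = ["job started", "map:0 done"]

def fake_events(name, offset=0):
    """Stand-in for Disco.events(): yield (offset, event) tuples,
    skipping the events that come before the given offset."""
    for index in range(offset, len(event_log)):
        # Assumption: the yielded offset points just past the event, so it
        # can be passed back in to resume where the last poll stopped.
        yield index + 1, event_log[index]

def poll_once(name, offset):
    """Collect the events available from offset; return them and the new offset."""
    seen = []
    new_offset = offset
    for new_offset, event in fake_events(name, offset):
        seen.append(event)
    return seen, new_offset

first, offset = poll_once("myjob", 0)
event_log.extend(["reduce:0 done", "READY"])  # the job makes progress
later, offset = poll_once("myjob", offset)

print(first)
print(later)
```

Because the second poll starts from the offset returned by the first, events that were already seen are not fetched again.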

jobinfo(name)
Returns a dictionary containing information about the job name.
joblist()
Returns a list of jobs and their statuses.
jobspec(name)
Returns the raw job request package, as constructed by Disco.new_job(), for the job name.
kill(name)
Kills the job name.
new_job(name, **kwargs)

Submits a new job request to the master.

This method accepts the same set of keyword args as Job.run(). The master argument for the Job constructor is provided by this method. Returns a Job object that corresponds to the newly submitted job request.

nodeinfo()
Returns a dictionary describing status of the nodes that are managed by this Disco master.
oob_get(name, key)

Returns an out-of-band value assigned to key for the job name.

See disco.node.worker for more information on using OOB.

oob_list(name)

Returns all out-of-band keys for the job name.

OOB data is stored by the tasks of job name, using the disco_worker.put() function.

profile_stats(name, mode='')

Returns results of profiling of the given job name.

The job must have been run with the profile flag enabled.

You can restrict results specifically to the map or reduce task by setting mode either to "map" or "reduce". By default results include both the map and the reduce phases. Results are accumulated from all nodes.

The function returns a pstats.Stats object. You can print out results as follows:

job.profile_stats().print_stats()

(Added in version 0.2.1)
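
Since the return value is a standard pstats.Stats object, the usual pstats calls should apply to it. The sketch below builds such an object locally with cProfile so that it runs without a Disco cluster; the busy function is a made-up workload, not part of Disco.

```python
import cProfile
import io
import pstats

def busy(n):
    """Toy workload standing in for a map or reduce task."""
    return sum(i * i for i in range(n))

# Profile the toy workload locally.
profiler = cProfile.Profile()
profiler.enable()
busy(10000)
profiler.disable()

# Build a Stats object of the same type that profile_stats() is documented to return.
out = io.StringIO()
stats = pstats.Stats(profiler, stream=out)

# Standard pstats calls: strip long paths, sort by cumulative time, show the top 5.
stats.strip_dirs().sort_stats("cumulative").print_stats(5)
report = out.getvalue()
print(report)
```

Sorting by cumulative time is usually a convenient first view, since it surfaces the functions in which the task spent most of its time overall.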

purge(name)
Deletes all records and files related to the job name.
request(url, data=None, offset=0)

Requests url at the master.

If a string data is specified, a POST request is made with data as the request payload.

A string is returned that contains the reply for the request. This method is mostly used by other methods in this class internally.
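
The GET versus POST behaviour described above mirrors how Python's standard urllib treats a request body. The sketch below only constructs request objects and sends nothing; the master address, port, and URL paths are hypothetical placeholders, and this is not a description of how Disco.request() is implemented.

```python
from urllib.request import Request

def build_request(master_url, path, data=None):
    """Build (but do not send) a request: GET without data, POST with data."""
    payload = data.encode("utf-8") if data is not None else None
    return Request(master_url + path, data=payload)

# Hypothetical master address and paths, for illustration only.
get_req = build_request("http://localhost:8989", "/hypothetical/status")
post_req = build_request("http://localhost:8989", "/hypothetical/submit",
                         data="payload")

print(get_req.get_method())
print(post_req.get_method())
```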

results(jobspec, timeout=2000)

Returns a list of results for a single job or for many concurrently running jobs, depending on the type of jobspec.

If jobspec is a string (job name) or the function is called through the job object (job.results()), this function returns a list of results for the job if the results become available within timeout milliseconds. If not, it returns an empty list.

(Added in version 0.2.1) If jobspec is a list of jobs, the function waits at most timeout milliseconds for at least one of the jobs to finish. In this mode, jobspec can be a list of strings (job names), a list of job objects, or a list of result entries as returned by this function. Two lists are returned: a list of finished jobs and a list of still active jobs. Both lists contain elements of the following type:

["job name", ["status", [results]]]

where status is either unknown_job, dead, active or ready.

You can use the latter mode as an efficient way to wait for several jobs to finish. Consider the following example that prints out results of jobs as soon as they finish. Here jobs is initially a list of jobs, produced by several calls to Disco.new_job():

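while jobs:
    # ready: finished jobs; jobs: still active jobs, fed back into the next call
    ready, jobs = disco.results(jobs)
    for name, results in ready:
        # results is ["status", [results]], so results[1] holds the result list
        for k, v in result_iterator(results[1]):
            print k, v
        disco.purge(name)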

Note how the list of active jobs, jobs, returned by Disco.results() can be used as the input to the function itself.
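
To make the entry format concrete, here is a small sketch that groups entries of the form ["job name", ["status", [results]]] by status. The job names and result addresses are made up, and split_by_status is a hypothetical helper, not part of Disco.

```python
def split_by_status(entries):
    """Group entries shaped like ["job name", ["status", [results]]] by status."""
    grouped = {"ready": [], "active": [], "dead": [], "unknown_job": []}
    for name, (status, results) in entries:
        grouped[status].append((name, results))
    return grouped

# Hypothetical entries; job names and result addresses are made up.
entries = [
    ["wordcount_1", ["ready", ["dir://cnode03/wordcount_1/"]]],
    ["wordcount_2", ["active", []]],
    ["wordcount_3", ["dead", []]],
]

grouped = split_by_status(entries)
for name, results in grouped["ready"]:
    print("%s finished with %d result(s)" % (name, len(results)))
```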

wait(name, poll_interval=2, timeout=None, clean=False, show='')

Blocks until the job name has finished. Returns a list of URLs to the result files, which is typically processed with result_iterator().

Disco.wait() polls the server for the job status every poll_interval seconds. It raises a disco.JobError if the job hasn’t finished in timeout seconds, if specified.

Parameters:
  • clean

    if set to True, calls Disco.clean() when the job has finished.

    Note that this only removes records from the master, but not the actual result files. Once you are done with the results, call:

    disco.purge(disco.util.jobname(results[0]))
    

    to delete the actual result files.

  • show – enables console output of job events. You can control this parameter also using the environment variable DISCO_EVENTS, which provides the default. See DISCO_EVENTS in disco.settings. (Added in version 0.2.3)
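
The polling behaviour of Disco.wait() can be pictured with the following sketch. It is not Disco's implementation: get_status is a hypothetical callable returning (status, results), SketchJobError stands in for disco.JobError, and raising on any status other than active or ready is an assumption made here for illustration.

```python
import time

class SketchJobError(Exception):
    """Hypothetical stand-in for disco.JobError."""

def wait_for(get_status, poll_interval=2, timeout=None):
    """Poll get_status() every poll_interval seconds until the job is ready.

    Raises SketchJobError if timeout seconds pass first, or (an assumption
    of this sketch) if the job reports a status other than active or ready.
    """
    start = time.time()
    while True:
        status, results = get_status()
        if status == "ready":
            return results
        if status != "active":
            raise SketchJobError("job status: %s" % status)
        if timeout is not None and time.time() - start > timeout:
            raise SketchJobError("job did not finish in %s seconds" % timeout)
        time.sleep(poll_interval)

# Simulated job: active for two polls, then ready with one made-up result address.
statuses = iter([("active", []), ("active", []),
                 ("ready", ["dir://cnode03/myjob/"])])
results = wait_for(lambda: next(statuses), poll_interval=0.01)
print(results)
```

A short poll_interval is used here only to keep the simulation fast; with a real master the default of 2 seconds avoids flooding it with status requests.
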
whitelist(node)

Whitelists node so that the master may submit tasks to it.

(Added in version 0.2.4)

class disco.core.JobDict(*args, **kwargs)

Disco.new_job() and Job.run() accept the same set of keyword arguments as specified below.

Note

All arguments that are required are marked as such. All other arguments are optional.

Parameters:
  • input (required, list of inputs or list of list of inputs) –

    Each input must be specified in one of the following ways:

    • http://www.example.com/data - any HTTP address
    • disco://cnode03/bigtxt/file_name - Disco address. Refers to cnode03:/var/disco/bigtxt/file_name. Currently this is an alias for http://cnode03:[DISCO_PORT]/bigtxt/file_name.
    • dir://cnode03/jobname/ - Result directory. This format is used by Disco internally.
    • /home/bob/bigfile.txt - a local file. Note that the file must either exist on all the nodes or you must make sure that the job is run only on the nodes where the file exists. Due to these restrictions, this form has only limited use.
    • raw://some_string - pseudo-address; instead of fetching data from a remote source, use some_string in the address as data. Useful for specifying dummy inputs for generator maps.
    • tag://tagname - a tag stored in Disco Distributed Filesystem (Added in version 0.3)

    (Added in version 0.2.2): An input entry can itself be a list of inputs, which lets you specify redundant versions of an input file. If a list of redundant inputs is specified, the scheduler chooses the input that is located on the node with the lowest load at the time of scheduling. Redundant inputs are tried one by one until the task succeeds. Redundant inputs require that the map function is specified.

  • map (disco.func.map()) – a pure function that defines the map task.
  • map_init (disco.func.init()) – initialization function for the map task. This function is called once before the task starts.
  • map_input_stream (list of disco.func.input_stream()) –

    The given functions are chained together and the final resulting disco.func.InputStream object is used to iterate over input entries.

    (Added in version 0.2.4)

  • map_output_stream (list of disco.func.output_stream()) – The given functions are chained together and the disco.func.OutputStream.add() method of the last returned disco.func.OutputStream object is used to serialize key, value pairs output by the map. (Added in version 0.2.4)
  • map_reader (disco.func.input_stream()) –

    Convenience function to define the last disco.func.input_stream() function in the map_input_stream chain.

    Disco worker provides a convenience function disco.func.re_reader() that can be used to create a reader using regular expressions.

    If you want to use outputs of an earlier job as inputs, use disco.func.chain_reader() as the map_reader.

    Default is disco.func.map_line_reader().

    (Changing after version 0.3.1) The default map_reader will become None. See the note in disco.func.map_line_reader() for information on how this might affect you.

  • map_writer

    (Deprecated in version 0.3) This function comes in handy e.g. when reduce is not specified and you want map output in a specific format. Another typical case is to use disco.func.object_writer() as map_writer and disco.func.object_reader() as reduce_reader so you can produce arbitrary Python objects in map.

    Remember to specify a reduce_reader that can read the format produced by map_writer. (Added in version 0.2)

  • reduce (disco.func.reduce()) –

    If no reduce function is specified, the job will quit after the map phase has finished.

    Added in version 0.3.1: Reduce now supports an alternative signature, disco.func.reduce2(), which uses an iterator instead of out.add() to output results.

    Changed in version 0.2: It is possible to define only reduce without map. For more information, see the FAQ entry Do I always have to provide a function for map and reduce?.

  • reduce_init (disco.func.init()) – initialization function for the reduce task. This function is called once before the task starts.
  • reduce_input_stream (list of disco.func.input_stream()) – The given functions are chained together and the last returned disco.func.InputStream object is given to reduce as its first argument. (Added in version 0.2.4)
  • reduce_output_stream (list of disco.func.output_stream()) – The given functions are chained together and the last returned disco.func.OutputStream object is given to reduce as its second argument. (Added in version 0.2.4)
  • reduce_reader (disco.func.input_stream()) –

    This function needs to match with map_writer, if map is specified. If map is not specified, you can read arbitrary inputs with this function, similar to map_reader. (Added in version 0.2)

    Default is disco.func.chain_reader().

  • reduce_writer – (Deprecated in version 0.3) You can use this function to output results in an arbitrary format from your map/reduce job. If you use result_iterator() to read results, set its reader parameter to a function that can read the format produced by reduce_writer. (Added in version 0.2)
  • combiner (disco.func.combiner()) – called after the partitioning function, for each partition.
  • partition (disco.func.partition()) –

    decides how the map output is distributed to reduce.

    Default is disco.func.default_partition().

  • partitions (int or None) –

    number of partitions, if any.

    Default is 1.

  • merge_partitions (bool) –

    whether or not to merge partitioned inputs during reduce.

    Default is False.

  • nr_reduces (integer) – (Deprecated in version 0.3) Use partitions instead.
  • scheduler (dict) –

    options for the job scheduler. The following keys are supported:

    • max_cores - use this many cores at most
      (applies to both map and reduce).

      Default is 2**31.

    • force_local - always run task on the node where
      input data is located; never use HTTP to access data remotely.
    • force_remote - never run task on the node where input
      data is located; always use HTTP to access data remotely.

    (Added in version 0.2.4)

  • sort (boolean) –

    flag specifying whether the intermediate results, that is, the input to the reduce function, should be sorted. Sorting is most useful for ensuring that equal keys are consecutive in the input for the reduce function.

    Besides grouping equal keys together, sorting ensures that keys are returned in ascending order. No other assumptions should be made about the comparison function.

    The external program sort is used to sort the input on disk. In-memory sort can easily be performed by the tasks themselves.

    Default is False.

  • params (Params) –

    object that is passed to worker tasks to store state. The object is serialized using the pickle module, so it should be pickleable.

    The convenience class Params provides an easy way to encapsulate a set of parameters. Params allows including pure functions in the parameters.

  • ext_params

    if either map or reduce function is an external program, typically specified using disco.util.external(), this object is used to deliver parameters to the program.

    The default C interface for external Disco functions uses netstring to encode the parameter dictionary. Hence the ext_params value must be a dictionary of string (key, value) pairs.

    However, if the external program doesn’t use the default C interface, it can receive parameters in any format. In this case, the ext_params value can be an arbitrary string which can be decoded by the program properly.

    For more information, see Disco External Interface.

  • required_files (list of paths or dict) –

    additional files that are required by the job. Either a list of paths to files to include, or a dictionary which contains items of the form (filename, filecontents).

    You can use this parameter to include custom modules or shared libraries in the job. (Added in version 0.2.3)

    Note

    All files will be saved in a flat directory on the worker. No subdirectories will be created.

    Note

    LD_LIBRARY_PATH is set so you can include a shared library foo.so in required_files and load it in the job directly as ctypes.cdll.LoadLibrary("foo.so"). For an example, see Disco External Interface.

  • required_modules

    required modules to send to the worker (Changed in version 0.2.3): Disco tries to guess which modules are needed by your job functions automatically. It sends any local dependencies (i.e. modules not included in the Python standard library) to nodes by default.

    If guessing fails, or you have other requirements, see disco.modutil for options.

  • status_interval (integer) –

    print “K items mapped / reduced” for every Nth item. Setting the value to 0 disables messages.

    Increase this value, or set it to zero, if you get a “Message rate limit exceeded” error due to system messages. This might happen if your tasks are really fast. Decrease the value if you want more messages or you don’t have that many data items.

    Default is 100000.

  • profile (boolean) –

    enable task profiling. Retrieve profiling results with Disco.profile_stats().

    Default is False.
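
As a rough illustration of the input forms listed under the input parameter, the sketch below labels addresses by scheme and normalizes the input list so that every entry is a list of redundant versions. Both helper functions are hypothetical and not part of Disco, and the second replica host (cnode04) is made up.

```python
def describe_input(address):
    """Label an input address according to the forms accepted by input."""
    if "://" not in address:
        return "local file"
    scheme = address.split("://", 1)[0]
    labels = {
        "http": "HTTP address",
        "disco": "Disco address",
        "dir": "result directory",
        "raw": "inline data",
        "tag": "DDFS tag",
    }
    return labels.get(scheme, "unrecognized")

def as_replica_lists(inputs):
    """Normalize inputs so that every entry is a list of redundant versions."""
    return [entry if isinstance(entry, list) else [entry] for entry in inputs]

inputs = [
    "raw://hello",
    # Two redundant versions of the same file (cnode04 is a made-up host).
    ["disco://cnode03/bigtxt/file_name", "disco://cnode04/bigtxt/file_name"],
    "/home/bob/bigfile.txt",
]

normalized = as_replica_lists(inputs)
kinds = [[describe_input(a) for a in replicas] for replicas in normalized]
print(kinds)
```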

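The interplay of partition, partitions, and sort can be pictured with a small local simulation. This is a sketch under stated assumptions: sketch_partition is a hypothetical partitioning function based on CRC32 (disco.func.default_partition() may hash keys differently), and the sorting is done in memory rather than with the external sort program.

```python
import zlib
from itertools import groupby
from operator import itemgetter

def sketch_partition(key, nr_partitions):
    """Hypothetical partition function: equal keys always map to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % nr_partitions

def sketch_reduce(pairs):
    """Sum values per key. groupby() only merges adjacent items, so the
    input is sorted first to make equal keys consecutive."""
    ordered = sorted(pairs, key=itemgetter(0))
    return [(key, sum(value for _, value in group))
            for key, group in groupby(ordered, key=itemgetter(0))]

map_output = [("b", 1), ("a", 1), ("b", 1), ("c", 1), ("a", 1)]

nr_partitions = 2
partitions = [[] for _ in range(nr_partitions)]
for key, value in map_output:
    partitions[sketch_partition(key, nr_partitions)].append((key, value))

# Each partition is reduced independently; the outputs are combined here
# only to display them.
totals = sorted(pair for part in partitions for pair in sketch_reduce(part))
print(totals)  # [('a', 2), ('b', 2), ('c', 1)]
```

Because every occurrence of a key lands in the same partition, the per-key totals do not depend on how the keys happen to be spread across partitions.
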
pack()
Pack up the JobDict for sending over the wire.
classmethod unpack(jobpack, globals={})
Unpack the previously packed JobDict.
class disco.core.Job(master, name)

Creates a Disco job with the given name.

Use Job.run() to start the job.

You need not instantiate this class directly. Instead, Disco.new_job() can be used to create and start a job.

Parameters:
  • master – An instance of the Disco class that identifies the Disco master that runs this job. This argument is required, but it is provided automatically when the job is started using Disco.new_job().
  • name

    The job name. When you create a handle for an existing job, the name is used as given. When you create a new job, the name given is used by Disco as a prefix to construct a unique name, which is then stored in the instance.

    Note

    Only characters in [a-zA-Z0-9_] are allowed in the job name.

All methods in Disco that are related to individual jobs, such as Disco.wait(), Disco.results(), Disco.profile_stats(), and Disco.purge(), are also accessible through the Job object, so you can say job.wait() instead of Disco.wait(job.name). However, the job methods in Disco come in handy if you want to manipulate a job that is identified by a job name (Job.name) instead of a Job object.

If you have access only to results of a job, you can extract the job name from an address with the disco.util.jobname() function. A typical case is that you are done with results of a job and they are not needed anymore. You can delete the unneeded job files as follows:

from disco.core import Job
from disco.util import jobname

Job(master, jobname(results[0])).purge()
run(**kwargs)

Returns the job immediately after the request has been submitted.

Accepts the same set of keyword arguments as JobDict.

A typical pattern in Disco scripts is to run a job synchronously, that is, to block the script until the job has finished. This is accomplished as follows:

from disco.core import Disco
results = Disco(master).new_job(...).wait()

Note that job methods of the Disco class are directly accessible through the Job object, such as Disco.wait() above.

A JobError is raised if an error occurs while starting the job.

class disco.core.Params(**kwargs)

Parameter container for map / reduce tasks.

This object provides a convenient way to contain custom parameters, or state, in your tasks.

This example shows a simple way of using Params:

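from disco.core import Disco, Params

def fun_map(e, params):
    # params is this task's own copy; c counts the entries seen so far
    params.c += 1
    if not params.c % 10:
        # apply the user-supplied function f to every tenth entry
        return [(params.f(e), params.c)]
    return [(e, params.c)]

Disco("disco://localhost").new_job(name="myjob",
                                   input=["disco://localhost/myjob/file1"],
                                   map=fun_map,
                                   params=Params(c=0, f=lambda x: x + "!"))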

You can specify any number of key-value pairs to the Params. The pairs will be available to task functions through the params argument. Each task receives its own copy of the initial params object. key must be a valid Python identifier. value can be any Python object. For instance, value can be an arbitrary pure function, such as params.f in the previous example.
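
The per-task copy semantics can be illustrated without a cluster. In the sketch below, SketchParams is a hypothetical stand-in for disco.core.Params, and copy.deepcopy stands in for the serialization Disco performs when it ships parameters to a task (plain pickle cannot serialize a lambda, which is one reason a dedicated Params class is useful).

```python
import copy

class SketchParams(object):
    """Hypothetical stand-in for disco.core.Params: keyword arguments become attributes."""
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

def run_task(entries, params):
    """Simulate one task, which works on a private copy of the initial params."""
    local = copy.deepcopy(params)
    out = []
    for e in entries:
        local.c += 1
        out.append((local.f(e), local.c))
    return out

initial = SketchParams(c=0, f=lambda x: x + "!")

task_a = run_task(["x", "y"], initial)
task_b = run_task(["z"], initial)

print(task_a)
print(task_b)
print(initial.c)
```

Each simulated task starts counting from the initial value of c, and the object held by the submitting script is left unchanged, so state stored in params is local to a task rather than shared across the job.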

disco.core.result_iterator(*args, **kwargs)