Configuration

class desipipe.config.Config(user=None)

Bases: BaseDict

desipipe configuration (‘config.yaml’) is saved in the directory given by the ‘DESIPIPE_CONFIG_DIR’ environment variable if defined, else in ‘~/.desipipe’.

Read configuration.

Parameters:

user (str, default=None) – User. Defaults to getuser().
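The lookup rule above can be sketched as follows. This is a minimal illustration, not desipipe's implementation: config_dir and config_fn are hypothetical helpers, and only the environment variable name, the fallback directory and the file name come from the description above.

```python
import os
from pathlib import Path


def config_dir(environ=None):
    """Hypothetical helper: return the directory holding 'config.yaml'."""
    environ = os.environ if environ is None else environ
    # Use DESIPIPE_CONFIG_DIR when defined, else fall back to ~/.desipipe.
    if 'DESIPIPE_CONFIG_DIR' in environ:
        return Path(environ['DESIPIPE_CONFIG_DIR'])
    return Path.home() / '.desipipe'


def config_fn(environ=None):
    """Hypothetical helper: full path to the YAML configuration file."""
    return config_dir(environ) / 'config.yaml'
```

Passing a plain dictionary instead of os.environ makes the rule easy to check without touching the real process environment.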

__class__

alias of DictMetaClass

clear() None.  Remove all items from D.
property config_fn

Path to .yaml configuration file.

get(k[, d]) D[k] if k in D, else d.  d defaults to None.
items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

popitem() (k, v), remove and return some (key, value) pair

as a 2-tuple; but raise KeyError if D is empty.

property queue_dir

Queue directory.

setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
update([E, ]**F) None.  Update D from mapping/iterable E and F.

If E present and has a .keys() method, does: for k in E: D[k] = E[k]

If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v

In either case, this is followed by: for k, v in F.items(): D[k] = v

values() an object providing a view on D's values

Environment

desipipe.environment.Environment(environ=None, data=None, **kwargs)

Convenience function that returns an environment.

Parameters:
  • environ (BaseEnvironment, str, dict, default=None) – A BaseEnvironment instance, which is then returned directly, a string specifying the name of the environment (e.g. ‘base’, ‘nersc-cosmodesi’) or a dictionary of environment variables. If not specified, the default environment in desipipe’s configuration (see Config) is used if provided, else ‘base’.

  • data (dict, default=None) – Optionally, additional environment variables.

  • **kwargs (dict) – Optionally, bash command for the environment, see BaseEnvironment.

Returns:

environ

Return type:

BaseEnvironment
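The dispatch on the type of environ (instance, name, or dictionary) can be pictured with a small registry. The sketch below is an assumption-laden illustration, not desipipe's code: RegisteredEnv, BaseEnv, NerscEnv, get_env and the 'name' class attribute are all hypothetical stand-ins, loosely modelled on the Registered* metaclasses listed in this reference.

```python
class RegisteredEnv(type):
    """Hypothetical registry metaclass: records each new class under its 'name'."""
    registry = {}

    def __new__(meta, name, bases, class_dict):
        cls = super().__new__(meta, name, bases, class_dict)
        meta.registry[cls.name] = cls
        return cls


class BaseEnv(metaclass=RegisteredEnv):
    """Hypothetical environment: a named bag of environment variables."""
    name = 'base'

    def __init__(self, data=None):
        self.data = dict(data or {})


class NerscEnv(BaseEnv):
    name = 'nersc-cosmodesi'


def get_env(environ=None, data=None):
    """Hypothetical dispatcher mirroring the three accepted input types."""
    if isinstance(environ, BaseEnv):
        return environ  # an instance is returned directly
    if environ is None:
        environ = 'base'  # assumed default when the configuration provides none
    if isinstance(environ, str):
        return RegisteredEnv.registry[environ](data)  # look up by name
    # Otherwise treat environ as a dictionary of variables, extended with data.
    return BaseEnv({**environ, **(data or {})})
```

With this pattern, adding a new named environment only requires defining a subclass; the lookup by name needs no further change.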

class desipipe.environment.RegisteredEnvironment(name, bases, class_dict)

Bases: DictMetaClass

Metaclass registering BaseEnvironment-derived classes.

__base__

alias of DictMetaClass

mro()

Return a type’s method resolution order.

register(subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

set_logger()

Add attributes for logging:

  • logger

  • methods log_debug, log_info, log_warning, log_error, log_critical

desipipe.environment.bash_env(cmd)

Run input command in a subprocess and collect environment variables.

desipipe.environment.change_environ(environ)

Temporarily set the process environment variables.

>>> import os
>>> with change_environ({'PLUGINS_DIR': 'test/plugins'}):
...     "PLUGINS_DIR" in os.environ
True
>>> "PLUGINS_DIR" in os.environ
False

Parameters:

environ (dict[str, str]) – Environment variables to set.
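A context manager with this behaviour can be written with the standard library alone. The sketch below is one possible way to do it, under the assumption that variables are restored (or removed) on exit; change_environ_sketch is a hypothetical name, not the desipipe function.

```python
import os
from contextlib import contextmanager


@contextmanager
def change_environ_sketch(environ):
    """Temporarily set the variables in the dict environ (hypothetical helper)."""
    # Remember previous values; None marks variables that were not set.
    saved = {name: os.environ.get(name) for name in environ}
    os.environ.update(environ)
    try:
        yield
    finally:
        # Restore the previous state, even if the body raised.
        for name, value in saved.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value
```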

desipipe.environment.get_environ(environ=None, data=None, **kwargs)

Convenience function that returns an environment.

Parameters:
  • environ (BaseEnvironment, str, dict, default=None) – A BaseEnvironment instance, which is then returned directly, a string specifying the name of the environment (e.g. ‘base’, ‘nersc-cosmodesi’) or a dictionary of environment variables. If not specified, the default environment in desipipe’s configuration (see Config) is used if provided, else ‘base’.

  • data (dict, default=None) – Optionally, additional environment variables.

  • **kwargs (dict) – Optionally, bash command for the environment, see BaseEnvironment.

Returns:

environ

Return type:

BaseEnvironment

File manager

class desipipe.file_manager.FileManager(database=(), environ=None)

Bases: FileEntryCollection

BaseFile manager, main class to be used to get paths to / load / save files.

Initialize FileEntryCollection.

Parameters:
  • data (dict, string, default=None) – Dictionary or path to a data base yaml file.

  • string (str, default=None) – If not None, yaml format string to decode. Added on top of data.

  • parser (callable, default=None) – Function that parses yaml string into a dictionary. Used when data is string, or string is not None.

  • **kwargs (dict) – Arguments for parser().

__class__

alias of BaseMetaClass

append(entry)

Append an input file entry, which may be e.g. a dictionary, or a BaseFileEntry instance.

chgdir(newdir, olddir=None)

Replace a directory in the file paths with newdir. By default, olddir (the directory to be replaced) is taken to be the directory that is common to all entries and does not contain replacement options.

clone(**kwargs)

Return an updated copy.

classmethod databases(keywords=None)

List of paths to available data bases following desipipe’s configuration (see Config).

exists(return_type='dict')

Return summary description of which files exist or not.

Parameters:

return_type (str, default='dict') – If ‘dict’, return a dictionary mapping exists to the file paths. If ‘str’, return a string.

Returns:

summary

Return type:

dict, str

property filepaths

All file paths in file collection.

get(*args, check_exists=False, raise_error=True, **kwargs)

Return the BaseFile instance that matches input arguments, see select().

Parameters:
  • *args (dict) – If select() returns several file entries, and / or file entries with multiple files, a ValueError is raised.

  • **kwargs (dict) – If select() returns several file entries, and / or file entries with multiple files, a ValueError is raised.

  • check_exists (bool, default=False) – If True, check whether file exists; if not, raise a FileNotFoundError.

  • raise_error (bool, default=True) – If False and an error is raised, catch it and return None.

Returns:

file

Return type:

BaseFile

index(id=None, filetype=None, keywords=None, return_entry=False, empty_error=False, ignore=False, **kwargs)

Return indices for input identifiers, keywords, or options, e.g.

>>> db.index(keywords=['power cutsky', 'fiber'], options={'tracer': 'ELG'})

selects the index of data base entries whose description contains ‘power’ and ‘cutsky’ or ‘fiber’, and option ‘tracer’ is ‘ELG’.

Parameters:
  • id (list, str, default=None) – List of file entry identifiers. Defaults to all identifiers (no selection).

  • filetype (list, str, default=None) – List of file types. Defaults to all file types (no selection).

  • keywords (list, str, default=None) – List of keywords to search for in the file entry descriptions. If a string contains several words, all of them must be in the description for the corresponding file entry to be selected. If a list of strings is provided, any of the strings must be in the description for the corresponding file entry to be selected. e.g. ['power cutsky', 'fiber'] selects the data base entries whose description contains ‘power’ and ‘cutsky’ or ‘fiber’.

  • empty_error (bool, default=False) – If True, raise a (hopefully meaningful) error if no match is found.

  • **kwargs (dict) – Restrict to these options, see BaseFileEntry.select().

Returns:

index – List of indices.

Return type:

list
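The keyword rule (all words of one string, any string of the list) can be sketched as a small predicate. This is an illustration only: matches is a hypothetical helper, and substring, case-insensitive matching is an assumption, since the text above only says that the description must "contain" the words.

```python
def matches(description, keywords):
    """Return True if description satisfies the keyword selection (hypothetical helper)."""
    if isinstance(keywords, str):
        keywords = [keywords]
    text = description.lower()
    # any(...) over the list of strings, all(...) over the words of each string.
    return any(all(word in text for word in keyword.lower().split())
               for keyword in keywords)
```

With keywords=['power cutsky', 'fiber'], a description mentioning both "power" and "cutsky" is selected, as is one mentioning "fiber".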

iter(include=None, exclude=None, get=None, intersection=False)

Iterate over options that are common to all file entries (options), and return the list of the (selected) FileManager instances.

Parameters:
  • include (str, list, default=None) – List of options to include in the iteration. None to include all options.

  • exclude (str, list, default=None) – List of options to exclude in the iteration. None to not exclude any option.

  • get (bool, default=None) – If False, return a list of FileManager instances. If True, call FileManager.get() to return a list of BaseFile instances. If None, exclude is None, and there are single options in FileManager instances, call FileManager.get() to return a list of BaseFile instances.

  • intersection (bool, default=True) – If False, iterate over all file entry options, i.e. not only those which are common to all file entries.

Returns:

fms

Return type:

list of the FileManager instances.

iter_options(include=None, exclude=None, intersection=False)

Iterate over options that are common to all file entries (options).

Parameters:
  • include (str, list, default=None) – List of options to include in the iteration. None to include all options.

  • exclude (str, list, default=None) – List of options to exclude in the iteration. None to not exclude any option.

  • intersection (bool, default=True) – If False, iterate over all file entry options, i.e. not only those which are common to all file entries.

Returns:

options

Return type:

list of the options.
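One way to picture the iteration is as a Cartesian product over the values of each option. The sketch below assumes that options map names to lists of values and that each iteration step is a dictionary with one value per option; iter_options_sketch is a hypothetical helper and may differ from desipipe's actual behaviour.

```python
import itertools


def iter_options_sketch(options, include=None, exclude=None):
    """Yield one dict per combination of option values (hypothetical helper)."""
    if isinstance(include, str):
        include = [include]
    if isinstance(exclude, str):
        exclude = [exclude]
    # Keep the options requested by include and not dropped by exclude.
    names = [name for name in options
             if (include is None or name in include)
             and (exclude is None or name not in exclude)]
    for values in itertools.product(*(options[name] for name in names)):
        yield dict(zip(names, values))
```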

property options

Return intersection of all options, i.e. options that are common to all file entries.

save(fn, replace_environ=False)

Save data base to yaml file fn.

Parameters:
  • fn (str, Path) – Where to save file data base.

  • replace_environ (bool, default=False) – If True, replace environment variables in entry’s path by their values.

select(id=None, filetype=None, keywords=None, empty_error=False, **kwargs)

Restrict to input identifiers, keywords, or options, e.g.

>>> db.select(keywords=['power cutsky', 'fiber'], options={'tracer': 'ELG'})

selects the data base entries whose description contains ‘power’ and ‘cutsky’ or ‘fiber’, and option ‘tracer’ is ‘ELG’.

Parameters:
  • id (list, str, default=None) – List of file entry identifiers. Defaults to all identifiers (no selection).

  • filetype (list, str, default=None) – List of file types. Defaults to all file types (no selection).

  • keywords (list, str, default=None) – List of keywords to search for in the file entry descriptions. If a string contains several words, all of them must be in the description for the corresponding file entry to be selected. If a list of strings is provided, any of the strings must be in the description for the corresponding file entry to be selected. e.g. ['power cutsky', 'fiber'] selects the data base entries whose description contains ‘power’ and ‘cutsky’ or ‘fiber’.

  • empty_error (bool, default=False) – If True, raise a (hopefully meaningful) error if no match is found.

  • ignore (bool, list, default=False) – If True, also keep file entries that do not take the input options. If list, do not cut file entries to these input options.

  • **kwargs (dict) – Restrict to these options, see BaseFileEntry.select().

Returns:

new – Selected data base.

Return type:

FileEntryCollection

Create symlink for all files in the collection.

update(**kwargs)

Update data (list of BaseFileEntry) or environ (dict).

class desipipe.file_manager.JointMetaClass(name, bases, namespace, **kwargs)

Bases: ABCMeta, BaseMetaClass

mro()

Return a type’s method resolution order.

register(subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

set_logger()

Add attributes for logging:

  • logger

  • methods log_debug, log_info, log_warning, log_error, log_critical

class desipipe.file_manager.RegisteredFileEntry(name, bases, class_dict)

Bases: BaseMetaClass

Metaclass registering BaseFileEntry-derived classes.

__base__

alias of BaseMetaClass

mro()

Return a type’s method resolution order.

set_logger()

Add attributes for logging:

  • logger

  • methods log_debug, log_info, log_warning, log_error, log_critical

class desipipe.file_manager.YamlLoader(stream)

Bases: SafeLoader

yaml loader that correctly parses numbers. Taken from https://stackoverflow.com/questions/30458977/yaml-loads-5e-6-as-string-and-not-a-number.

Initialize the scanner.

check_state_key(key)

Block special attributes/methods from being set in a newly created object, to prevent user-controlled methods from being called during deserialization.

desipipe.file_manager.common_options(list_options, intersection=True)

Return the common options.
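As a rough illustration of what "common options" can mean, the sketch below merges several option dictionaries, keeping either the values shared by all of them (intersection=True) or every value seen (intersection=False). The data layout (name mapped to a list of values) and the helper name common_options_sketch are assumptions, not desipipe's implementation.

```python
def common_options_sketch(list_options, intersection=True):
    """Merge option dicts by intersection or union (hypothetical helper)."""
    if not list_options:
        return {}
    if intersection:
        # Only names present in every dictionary survive.
        names = set(list_options[0])
        for options in list_options[1:]:
            names &= set(options)
    else:
        names = set().union(*list_options)
    result = {}
    for name in sorted(names):
        merged = None
        for options in list_options:
            if name not in options:
                continue
            values = list(options[name])
            if merged is None:
                merged = values
            elif intersection:
                merged = [value for value in merged if value in values]
            else:
                merged += [value for value in values if value not in merged]
        result[name] = merged
    return result
```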

desipipe.file_manager.get_file_entry(file_entry=None, file_entry_collection=None, **kwargs)

Convenience function that returns the file entry.

Parameters:
  • file_entry (BaseFileEntry, str, dict, default=None) – A BaseFileEntry instance, which is then returned directly, a string specifying the name of the file entry (e.g. ‘base’) or a dictionary of file entry attributes. If not specified, the default file entry in desipipe’s configuration (see Config) is used if provided, else ‘base’.

  • **kwargs (dict) – Optionally, additional file entry attributes.

Returns:

file_entry

Return type:

BaseFileEntry

desipipe.file_manager.in_options(values, options, return_index=False)

Return input values that are in options.

desipipe.file_manager.yaml_parser(string)

Parse string in yaml format.

I/Os

To implement a new file format, just subclass BaseFile.

class desipipe.io.RegisteredFile(name, bases, class_dict)

Bases: BaseMetaClass

Metaclass registering BaseFile-derived classes.

__base__

alias of BaseMetaClass

mro()

Return a type’s method resolution order.

set_logger()

Add attributes for logging:

  • logger

  • methods log_debug, log_info, log_warning, log_error, log_critical

desipipe.io.get_filetype(filetype, path, *args, **kwargs)

Convenience function that returns a BaseFile instance.

Parameters:
  • filetype (str, BaseFile) – Name of BaseFile, or BaseFile instance.

  • path (str) – Path to file.

  • *args (tuple) – Other arguments for BaseFile.

  • **kwargs (dict) – Other optional arguments for BaseFile.

Returns:

file

Return type:

BaseFile

Task manager

exception desipipe.task_manager.DeserializationError

Bases: Exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class desipipe.task_manager.MyStream(stream, callback=None)

Bases: object

class desipipe.task_manager.Queue(name, base_dir=None, create=None)

Bases: BaseClass

Queue keeping track of all tasks that have been run and to be run, with sqlite backend.

Initialize queue.

Parameters:
  • name (str) – Name of queue; can contain alphanumeric characters, underscores and hyphens. “this/queue” saves the queue as base_dir/this/queue.sqlite. “/this/queue” saves the queue as “/this/queue.sqlite” (starting from root).

  • base_dir (str, default=None) – Base directory where to save queue. If None, defaults to Config.queue_dir.

  • create (bool, default=None) – If True, create a new queue; a ValueError is raised if the queue already exists. If False, do not create the queue; a ValueError is raised if no queue exists. If None (default), create the queue if it does not exist.
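The name-to-file mapping and the create policy described above can be sketched as follows. queue_filename and must_create are hypothetical helpers written for illustration; they rely only on the rules stated in the parameter descriptions.

```python
from pathlib import PurePosixPath


def queue_filename(name, base_dir):
    """Hypothetical helper: map a queue name to its sqlite file."""
    # Joining with an absolute name discards base_dir, as for "/this/queue".
    return PurePosixPath(base_dir) / (name + '.sqlite')


def must_create(exists, create=None):
    """Hypothetical helper: apply the create policy; True if the file must be created."""
    if create is True and exists:
        raise ValueError('queue already exists')
    if create is False and not exists:
        raise ValueError('queue does not exist')
    return not exists
```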

__class__

alias of BaseMetaClass

add(tasks, replace=False, lock=True)

Add input task(s) to the queue.

Parameters:
  • tasks (Task, list) – Task or list of tasks.

  • replace (bool, default=False) – If True, replace task(s) (as identified by their IDs) with the input ones. If False, an error is raised if input tasks (as identified by their IDs) are already in the queue. If None, do not add task if already in queue, but update task manager if task state is “PENDING” or “WAITING”.

  • lock (bool, default=True) – Ask for lock.

Returns:

futures – List of Future corresponding to input tasks.

Return type:

list

clear(kill=True)

Clear queue: delete and recreate.

counts(state=None, mid=None)

Count the number of tasks.

Parameters:
  • state (str, list, default=None) – If not None, select tasks with given state.

  • mid (str, list, default=None) – If not None, select tasks with given task manager ID.

Returns:

counts

Return type:

int

delete(kill=True)

Delete data base db from both this instance and the disk (and delete associated jobs).

managers(mid=None, property=None)

List managers.

Parameters:
  • mid (str, default=None) – If not None, return the task manager with given ID.

  • property (str, default=None) – If not None, instead of returning task manager(s), return this property (one of “mid”).

Returns:

tm – Task manager or property or list of such objects.

Return type:

TaskManager, list

pause()

Pause queue, i.e. set state to “PAUSED”.

property paused

Is queue paused?

pop(state='PENDING', **kwargs)

Retrieve a task to be run (i.e. in “PENDING” state).

Parameters:
  • state (str, list, default="PENDING") – Select a task with this state.

  • **kwargs (dict) – Optional arguments to select task: tid, mid, name, etc.

Returns:

task

Return type:

Task

processes()

List processes that have been launched.

resume()

Resume queue, i.e. set state to “ACTIVE”.

set_task_state(tid, state, jobid=None, t0=None)

Set the state of task with input ID tid to state.

property state

Get queue state (“ACTIVE” or “PAUSED”).

summary(mid=None, return_type='dict')

Return summary description of queue, i.e. number of tasks in all states TaskState.ALL.

Parameters:
  • mid (str, list, default=None) – If not None, select tasks with given task manager ID.

  • return_type (str, default="dict") – If “dict”, return a dictionary mapping task state to the task counts. If “str”, return a string.

Returns:

summary

Return type:

dict, str
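Since the queue uses a sqlite backend, a per-state summary amounts to a grouped count. The sketch below uses an in-memory database with a hypothetical two-column table (tid, state); desipipe's actual schema is not documented here and is likely different.

```python
import sqlite3

# Task states as listed for TaskState.ALL in this reference.
ALL_STATES = ('WAITING', 'PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED', 'KILLED', 'UNKNOWN')


def summary_sketch(connection):
    """Return a dict mapping each task state to its count (hypothetical helper)."""
    counts = dict.fromkeys(ALL_STATES, 0)
    query = 'SELECT state, COUNT(*) FROM tasks GROUP BY state'
    for state, count in connection.execute(query):
        counts[state] = count
    return counts


# Hypothetical table layout, for illustration only.
db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE tasks (tid TEXT PRIMARY KEY, state TEXT)')
db.executemany('INSERT INTO tasks VALUES (?, ?)',
               [('a', 'PENDING'), ('b', 'PENDING'), ('c', 'SUCCEEDED')])
```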

tasks(tid=None, state=None, mid=None, jobid=None, name=None, index=None, one=None, property=None)

List tasks in queue.

Parameters:
  • tid (str, list, default=None) – If not None, select task with given ID.

  • state (str, list, default=None) – If not None, select tasks with given state.

  • mid (str, list, default=None) – If not None, select tasks with given task manager ID.

  • jobid (str, list, default=None) – If not None, select tasks with given job ID.

  • name (str, list, default=None) – If not None, select tasks with input application name.

  • index (int, list, default=None) – If not None, select tasks with input index.

  • one (bool, default=None) – If True, return only one task. If False, return list. If None, and tid is not None, return a single task; else a list of tasks.

  • property (str, list, default=None) – If not None, instead of returning task(s), return this property (one of “tid”, “state”, “mid”, “jobid”, “t0”, “task_manager”).

Returns:

tasks – Task or property or list of such objects.

Return type:

Task, list

class desipipe.task_manager.QueueState

Bases: object

exception desipipe.task_manager.SerializationError

Bases: Exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class desipipe.task_manager.Task(app, kwargs=None, state=None)

Bases: BaseClass

Class representing a task, i.e. an application (app) and the arguments to call it with (args and kwargs).

id

Task unique identifier. It is built from the pickle representation of (app, args, kwargs), such that different tasks have different identifiers (with extremely high probability).

Type:

str

app

Application.

Type:

BaseApp

kwargs

Dictionary of arguments to be passed to BaseApp.run().

Type:

dict

require_ids

List of task IDs that are required for this task to be run, inferred from Future instances passed to args and kwargs.

Type:

list

state

Task state, one of TaskState.ALL, i.e. (“WAITING”, “PENDING”, “RUNNING”, “SUCCEEDED”, “FAILED”, “KILLED”, “UNKNOWN”).

Type:

str

jobid

Job identifier (not used for now).

Type:

str

errno

0 if no error, else error number (defaults to 42).

Type:

int

err

Error message; empty if no error.

Type:

str

out

Standard output message.

Type:

str

versions

Module versions.

Type:

dict

result

Value returned by BaseApp.func.

Type:

object

t0

Start time of BaseApp.run.

Type:

float

dtime

Running time of BaseApp.run.

Type:

float

Initialize Task.

Parameters:
  • app (BaseApp) – Application.

  • kwargs (dict, default=None) – Dictionary of arguments to be passed to BaseApp.run().

  • state (str, default=None) – Task state. Defaults to “WAITING” if this task requires others to be run, else to “PENDING”.

__class__

alias of BaseMetaClass

clone(*args, **kwargs)

Return an updated copy.

run(**kwargs)

Run task:

  • call BaseApp.run, saving main script where the task is defined and package versions in a folder “.desipipe” located in the directory where files are saved (if any).

  • set errno, result, err, out, versions and dtime.

  • set state: “KILLED” if termination signal, “FAILED” if BaseApp.run raised an exception, else “SUCCEEDED”.
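The bookkeeping listed above can be sketched with a plain function wrapper. This is a simplified illustration: run_sketch is hypothetical, it records only errno, err, result, dtime and state, and it leaves out the "KILLED" case, output capture and version tracking.

```python
import time
import traceback


def run_sketch(func, **kwargs):
    """Call func(**kwargs) and record the outcome (hypothetical helper)."""
    record = dict(errno=0, err='', result=None)
    t0 = time.time()
    try:
        record['result'] = func(**kwargs)
        record['state'] = 'SUCCEEDED'
    except Exception:
        record['errno'] = 42  # default error number quoted for Task.errno
        record['err'] = traceback.format_exc()
        record['state'] = 'FAILED'
    record['dtime'] = time.time() - t0
    return record
```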

update(**kwargs)

Update task with input attributes.

class desipipe.task_manager.TaskManager(queue, environ=None, scheduler=None, provider=None)

Bases: BaseClass

Task manager, main class to be used in scripts, e.g.:

from desipipe.task_manager import Queue, TaskManager
from desipipe.environment import Environment

queue = Queue("test", base_dir="_tests")
tm = TaskManager(queue, environ=Environment(), scheduler=dict(max_workers=10))

@tm.python_app
def test(n):
    print("hello" * n)

queue

Queue, see Queue.

Type:

Queue

tid

Task manager unique identifier. It is built from the pickle representation of (environ, scheduler, provider), such that different task managers have different identifiers (with extremely high probability).

Type:

str

environ

Tasks are run within this environment.

Type:

BaseEnvironment

scheduler

Tasks are distributed to workers with this scheduler.

Type:

BaseScheduler

provider

Tasks are executed on the machine with this provider.

Type:

BaseProvider

Initialize TaskManager.

Parameters:
  • queue (str, Queue) – Queue, see get_queue().

  • environ (BaseEnvironment, str, dict, default=None) – Tasks are run within this environment. See get_environ().

  • scheduler (BaseScheduler, str, dict, default=None) – Tasks are distributed to workers with this scheduler. See get_scheduler().

  • provider (BaseProvider, str, dict, default=None) – Tasks are executed on the machine with this provider. See get_provider().

__class__

alias of BaseMetaClass

clone(**kwargs)

Return an updated copy.

spawn(*args, **kwargs)

Distribute tasks to workers.

update(**kwargs)

Update task manager attributes.

class desipipe.task_manager.TaskManagerPickler(file, protocol=None, fix_imports=True, buffer_callback=None)

Bases: Pickler

Special pickler for tasks, handling BaseApp and Future instances.

clear_memo()

Clears the pickler’s “memo”.

The memo is the data structure that remembers which objects the pickler has already seen, so that shared or recursive objects are pickled by reference and not by value. This method is useful when re-using picklers.

dump(obj, /)

Write a pickled representation of the given object to the open file.

classmethod dumps(obj, *args, **kwargs)

Dump input obj to string. args and kwargs are passed to __init__().

class desipipe.task_manager.TaskManagerUnpickler(file, queue=None)

Bases: Unpickler

Unpickler that corresponds to TaskPickler, replacing Future instances by the actual result of the computation.

find_class(module_name, global_name, /)

Return an object from a specified module.

If necessary, the module will be imported. Subclasses may override this method (e.g. to restrict unpickling of arbitrary classes and functions).

This method is called whenever a class or a function object is needed. Both arguments passed are str objects.

load()

Load a pickle.

Read a pickled object representation from the open file object given in the constructor, and return the reconstituted object hierarchy specified therein.

classmethod loads(s, *args, **kwargs)

Load input string s. args and kwargs are passed to __init__().

class desipipe.task_manager.TaskPickler(*args, reduce_app=<function reduce_app>, **kwargs)

Bases: Pickler

Special pickler for tasks, handling BaseApp and Future instances.

Initialize pickler and add the special reduce method for BaseApp to the dispatch_table.

clear_memo()

Clears the pickler’s “memo”.

The memo is the data structure that remembers which objects the pickler has already seen, so that shared or recursive objects are pickled by reference and not by value. This method is useful when re-using picklers.

dump(obj, /)

Write a pickled representation of the given object to the open file.

classmethod dumps(obj, *args, **kwargs)

Dump input obj to string. args and kwargs are passed to __init__().

class desipipe.task_manager.TaskState

Bases: object

class desipipe.task_manager.TaskUnpickler(file, queue=None)

Bases: Unpickler

Unpickler that corresponds to TaskPickler, replacing Future instances by the actual result of the computation.

find_class(module_name, global_name, /)

Return an object from a specified module.

If necessary, the module will be imported. Subclasses may override this method (e.g. to restrict unpickling of arbitrary classes and functions).

This method is called whenever a class or a function object is needed. Both arguments passed are str objects.

load()

Load a pickle.

Read a pickled object representation from the open file object given in the constructor, and return the reconstituted object hierarchy specified therein.

classmethod loads(s, *args, **kwargs)

Load input string s. args and kwargs are passed to __init__().

class desipipe.task_manager.VarType

Bases: object

desipipe.task_manager.action_from_args(action='work', args=None)

Function called when using desipipe from the command line.

desipipe.task_manager.decorator(func)

Decorator to deal with decorator arguments, e.g.:

@tm.python_app
def test(n):
    print("hello" * n)

and

@tm.python_app(skip=False)
def test(n):
    print("hello" * n)

are equivalent.
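A common way to let a decorator be used both bare and with keyword arguments is sketched below. It is a generic pattern, not necessarily how desipipe implements it; optional_arguments and the python_app stand-in are hypothetical and only illustrate the equivalence of the two forms.

```python
import functools


def optional_arguments(decorator):
    """Let decorator be applied both bare and with keyword arguments."""
    @functools.wraps(decorator)
    def wrapper(func=None, **kwargs):
        if func is None:
            # Called as @deco(key=value): return the actual decorator.
            return lambda f: decorator(f, **kwargs)
        # Called as bare @deco: func is the decorated function.
        return decorator(func, **kwargs)
    return wrapper


@optional_arguments
def python_app(func, skip=False):
    """Hypothetical stand-in for tm.python_app: tag the function with its options."""
    func.skip = skip
    return func


@python_app
def greet_bare(n):
    return "hello" * n


@python_app(skip=False)
def greet_called(n):
    return "hello" * n
```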

desipipe.task_manager.get_queue(queue, create=None, one=True)

Return queue.

Parameters:
  • queue (str, Queue, list) – Queue names, Queue, or list of such objects.

  • create (bool, default=None) – If True, create a new queue; a ValueError is raised if the queue already exists. If False, do not create the queue; a ValueError is raised if no queue exists. If None (default), create the queue if it does not exist.

  • one (bool, default=True) – If True, return one queue. Raise a ValueError if queue corresponds to several queues. If False, ensure a list of queues is returned.

Returns:

queue – Queue or list of queues.

Return type:

Queue, list

desipipe.task_manager.kill(queue=None, provider=None, jobid=None, state=None, **kwargs)

Kill launched jobs.

Parameters:
  • queue (Queue, list, default=None) – Queue or list of queues to process. If None, jobid must be provided.

  • provider (BaseProvider, str, dict, default=None) – To access BaseProvider.kill() method. See get_provider().

  • jobid (str, list, default=None) – IDs of jobs to kill, defaults to all.

  • state (str, default=None) – State of tasks to kill, defaults to all.

  • **kwargs (dict) – Optional arguments to select tasks in queue to be killed: tid, mid, name. See Queue.tasks().

desipipe.task_manager.reduce_app(self)

Special reduce method for BaseApp, dropping task_manager.

desipipe.task_manager.retry(queue, state='KILLED', **kwargs)

Move (by default killed) tasks into PENDING state, so they are rerun.

Parameters:
  • queue (Queue, list) – Queue or list of queues to process.

  • state (str, default="KILLED") – State of tasks to move to PENDING state.

  • **kwargs (dict) – Optional arguments to select tasks in queue to be moved to PENDING state: tid, mid, name, etc.

desipipe.task_manager.select_modules(modules)

Select “standard” top-level modules: those which do not start with an underscore.

desipipe.task_manager.spawn(queue, timeout=86400, timestep=10.0, mode='', max_workers=None, spawn=False)

Distribute tasks to workers. If all queues are paused, the function terminates.

Parameters:
  • queue (Queue, list) – Queue or list of queues to process.

  • timeout (float, default=3600 * 24) – Time out after this delay (in seconds).

  • timestep (float, default=10.) – Period (in seconds) at which the queue is queried for new tasks, and running / pending jobs are checked (with a refreshment time step of timestep). Increase in case the provider (e.g. Slurm) cannot handle too frequent calls to the state of the queue.

  • mode (str, default="") – Processing mode. “stop_at_error” to stop as soon as a task fails. “retry_at_timeout” to retry on timeout. “no_stream” to not stream stderr/stdout during the tasks (helps when many jobs run in parallel). “no_out” to not stream stderr/stdout and not save stdout.

  • spawn (bool, default=False) – If True, spawn a new manager process and exit this one.
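The roles of timeout and timestep, and the early exit when all queues are paused, can be pictured with a simple polling loop. The sketch below is an assumption about the general shape of such a loop: poll_queues is hypothetical, queues are modelled as plain dictionaries with a 'paused' flag, and launch stands for whatever submits work.

```python
import time


def poll_queues(queues, launch, timeout=3600 * 24, timestep=10.):
    """Call launch(queue) on each non-paused queue every timestep seconds."""
    t0 = time.time()
    niterations = 0
    while time.time() - t0 < timeout:
        active = [queue for queue in queues if not queue['paused']]
        if not active:
            break  # all queues are paused: terminate
        for queue in active:
            launch(queue)
        niterations += 1
        time.sleep(timestep)  # wait before querying the queues again
    return niterations
```

A larger timestep reduces how often the provider is queried, at the cost of reacting more slowly to new tasks.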

desipipe.task_manager.unique_id(uid)

Return ~ unique ID from input object.
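Identifiers built from a pickle representation, as described for Task.id, can be sketched by hashing the pickled bytes. The choice of SHA-256 and the helper name unique_id_sketch are assumptions; the hash actually used by desipipe is not stated here.

```python
import hashlib
import pickle


def unique_id_sketch(obj):
    """Return a hexadecimal digest of the pickled object (hypothetical helper)."""
    # Equal pickles give equal IDs; different pickles collide with negligible probability.
    return hashlib.sha256(pickle.dumps(obj)).hexdigest()
```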

desipipe.task_manager.work(queue, mid=None, tid=None, name=None, provider=None, mode='', mpicomm=None, mpisplits=None, timestep=120.0)

Do the actual work: pop tasks from the input queue, and run them.

Parameters:
  • queue (Queue) – Queue to pop tasks from.

  • mid (str, list, default=None) – If not None, take tasks with this task manager ID.

  • tid (str, list, default=None) – If not None, take a task with this task ID.

  • name (str, list, default=None) – If not None, take a task with this name.

  • provider (str, default=None) – Name of provider, to get process ID. Defaults to the manager’s provider.

  • mode (str, default="") – Processing mode. “stop_at_error” to stop as soon as a task fails. “retry_at_timeout” to retry on timeout. “no_stream” to not stream stderr/stdout during the tasks (helps when many jobs run in parallel). “no_out” to not stream stderr/stdout and not save stdout.

  • timestep (float, default=120) – Time step for streaming output.

  • mpicomm (MPI communicator, default=mpi.COMM_WORLD) – The MPI communicator.

  • mpisplits (int, default=None) – Split MPI communicator in this number of subcommunicators to run mpisplits tasks in parallel.

Provider

desipipe.provider.Provider(provider=None, **kwargs)

Convenience function that returns the provider.

Parameters:
  • provider (BaseProvider, str, dict, default=None) – A BaseProvider instance, which is then returned directly, a string specifying the name of the provider (e.g. ‘local’) or a dictionary of provider attributes. If not specified, the default provider in desipipe’s configuration (see Config) is used if provided, else ‘local’.

  • **kwargs (dict) – Optionally, additional provider attributes.

Returns:

provider

Return type:

BaseProvider

class desipipe.provider.RegisteredProvider(name, bases, class_dict)

Bases: BaseMetaClass

Metaclass registering BaseProvider-derived classes.

__base__

alias of BaseMetaClass

mro()

Return a type’s method resolution order.

set_logger()

Add attributes for logging:

  • logger

  • methods log_debug, log_info, log_warning, log_error, log_critical

desipipe.provider.decode_slurm_time(s)

Decode Slurm time.
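For reference, Slurm time strings take forms such as "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds". The sketch below converts them to seconds; decode_slurm_time_sketch is a hypothetical helper, and returning seconds is an assumption about what a decoder would produce.

```python
def decode_slurm_time_sketch(s):
    """Convert a Slurm time string to a number of seconds (hypothetical helper)."""
    days = 0
    if '-' in s:
        # "days-hours[:minutes[:seconds]]"
        day_part, s = s.split('-', 1)
        days = int(day_part)
        parts = [int(part) for part in s.split(':')]
        parts += [0] * (3 - len(parts))
        hours, minutes, seconds = parts
    else:
        parts = [int(part) for part in s.split(':')]
        if len(parts) == 1:  # "minutes"
            hours, minutes, seconds = 0, parts[0], 0
        elif len(parts) == 2:  # "minutes:seconds"
            hours = 0
            minutes, seconds = parts
        else:  # "hours:minutes:seconds"
            hours, minutes, seconds = parts
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
```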

desipipe.provider.get_provider(provider=None, **kwargs)

Convenience function that returns the provider.

Parameters:
  • provider (BaseProvider, str, dict, default=None) – A BaseProvider instance, which is then returned directly, a string specifying the name of the provider (e.g. ‘local’) or a dictionary of provider attributes. If not specified, the default provider in desipipe’s configuration (see Config) is used if provided, else ‘local’.

  • **kwargs (dict) – Optionally, additional provider attributes.

Returns:

provider

Return type:

BaseProvider

desipipe.provider.time_lru_cache(func)

Wrapper that caches the function result for at most dt seconds. Idea taken from https://stackoverflow.com/questions/31771286/python-in-memory-cache-with-time-to-live
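A time-limited cache of this kind can be sketched as follows. How dt is supplied in desipipe is not stated above, so this sketch takes it as a decorator argument; time_cache and its clock parameter are hypothetical, the latter being there only to make the expiry testable.

```python
import functools
import time


def time_cache(dt=1., clock=time.monotonic):
    """Cache results per positional arguments for at most dt seconds (hypothetical helper)."""
    def decorator(func):
        cache = {}

        @functools.wraps(func)
        def wrapper(*args):
            now = clock()
            if args in cache and now - cache[args][0] < dt:
                return cache[args][1]  # entry is still fresh
            result = func(*args)
            cache[args] = (now, result)  # store with its timestamp
            return result
        return wrapper
    return decorator
```

Such a cache is useful when the wrapped call is expensive or rate-limited, for example querying a job scheduler for its state.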

Scheduler

class desipipe.scheduler.RegisteredScheduler(name, bases, class_dict)

Bases: BaseMetaClass

Metaclass registering BaseScheduler-derived classes.

__base__

alias of BaseMetaClass

mro()

Return a type’s method resolution order.

set_logger()

Add attributes for logging:

  • logger

  • methods log_debug, log_info, log_warning, log_error, log_critical

desipipe.scheduler.Scheduler(scheduler=None, **kwargs)

Convenience function that returns the scheduler.

Parameters:
  • scheduler (BaseScheduler, str, dict, default=None) – A BaseScheduler instance, which is then returned directly, a string specifying the name of the scheduler (e.g. ‘simple’) or a dictionary of scheduler attributes. If not specified, the default scheduler in desipipe’s configuration (see Config) is used if provided, else ‘simple’.

  • **kwargs (dict) – Optionally, additional scheduler attributes.

Returns:

scheduler

Return type:

BaseScheduler

desipipe.scheduler.get_scheduler(scheduler=None, **kwargs)

Convenience function that returns a scheduler.

Parameters:
  • scheduler (BaseScheduler, str, dict, default=None) – A BaseScheduler instance, which is then returned directly, a string specifying the name of the scheduler (e.g. ‘simple’) or a dictionary of scheduler attributes. If not specified, the default scheduler in desipipe’s configuration (see Config) is used if provided, else ‘simple’.

  • **kwargs (dict) – Optionally, additional scheduler attributes.

Returns:

scheduler

Return type:

BaseScheduler

Utilities

A few utilities.

class desipipe.utils.BaseMetaClass(name, bases, class_dict)

Bases: type

Metaclass to add logging attributes to BaseClass-derived classes.

mro()

Return a type’s method resolution order.

set_logger()

Add attributes for logging:

  • logger

  • methods log_debug, log_info, log_warning, log_error, log_critical
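One way a metaclass can attach these logging attributes is sketched below. LoggingMeta and Task are hypothetical names, and naming the logger after the class is an assumption; the sketch is not desipipe's implementation.

```python
import logging


class LoggingMeta(type):
    """Metaclass giving each class a ``logger`` and ``log_*`` convenience methods."""

    def __new__(meta, name, bases, class_dict):
        cls = super().__new__(meta, name, bases, class_dict)
        cls.set_logger()
        return cls

    def set_logger(cls):
        # Assumption: one logger per class, named after the class
        cls.logger = logging.getLogger(cls.__name__)
        for level in ['debug', 'info', 'warning', 'error', 'critical']:

            def make_method(level):
                def method(self, *args, **kwargs):
                    getattr(self.logger, level)(*args, **kwargs)
                return method

            setattr(cls, 'log_{}'.format(level), make_method(level))


class Task(metaclass=LoggingMeta):
    pass
```

With this in place, Task().log_info('message') forwards to the logger named 'Task'.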

class desipipe.utils.DictMetaClass(name, bases, class_dict)

Bases: BaseMetaClass, ABCMeta

__base__

alias of BaseMetaClass

mro()

Return a type’s method resolution order.

register(subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

set_logger()

Add attributes for logging:

  • logger

  • methods log_debug, log_info, log_warning, log_error, log_critical

class desipipe.utils.LoggingContext(level=None)

Bases: object

Class to locally update logging level:

>>> import logging
>>> with LoggingContext('warning') as mem:
...     # Logging level is warning
...     logger = logging.getLogger('Logger')
...     logger.info('This should not be printed')  # not logged
...     logger.warning('This should be printed')  # logged

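
For readers curious how such a context manager can work, here is a minimal sketch built on the standard logging module. TemporaryLevel is a hypothetical stand-in, and acting on the root logger is an assumption; LoggingContext itself may be implemented differently.

```python
import logging


class TemporaryLevel:
    """Set the root logger level inside a ``with`` block, then restore it."""

    def __init__(self, level=None):
        if isinstance(level, str):
            level = getattr(logging, level.upper())  # 'warning' -> logging.WARNING
        self.level = level

    def __enter__(self):
        root = logging.getLogger()
        self.previous = root.level
        if self.level is not None:
            root.setLevel(self.level)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Restore the previous level even if the block raised
        logging.getLogger().setLevel(self.previous)


logging.getLogger().setLevel(logging.INFO)
with TemporaryLevel('warning'):
    inside = logging.getLogger().level
outside = logging.getLogger().level
```
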
desipipe.utils.copytree(src, dst, symlinks=False, ignore=None, copy_function=<function copy2>, ignore_dangling_symlinks=False, dirs_exist_ok=False, dirs_copystat=True)

Slight modification of shutil.copytree, adding a dirs_copystat flag: if false, shutil.copystat is not applied to directories. Motivation: shutil.copystat can fail if the owner of the destination directory or file is different from the current user. shutil.copy2 calls shutil.copystat as well. Thus, to avoid this operation completely, use copy_function=shutil.copyfile and dirs_copystat=False.

Recursively copy a directory tree and return the destination directory.

If exception(s) occur, an Error is raised with a list of reasons.

If the optional symlinks flag is true, symbolic links in the source tree result in symbolic links in the destination tree; if it is false, the contents of the files pointed to by symbolic links are copied. If the file pointed by the symlink doesn’t exist, an exception will be added in the list of errors raised in an Error exception at the end of the copy process.

You can set the optional ignore_dangling_symlinks flag to true if you want to silence this exception. Notice that this has no effect on platforms that don’t support os.symlink.

The optional ignore argument is a callable. If given, it is called with the src parameter, which is the directory being visited by copytree(), and names which is the list of src contents, as returned by os.listdir():

callable(src, names) -> ignored_names

Since copytree() is called recursively, the callable will be called once for each directory that is copied. It returns a list of names relative to the src directory that should not be copied.
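As an illustration of the ignore callable, the sketch below uses the standard shutil.copytree, whose ignore argument follows the same convention, to skip files ending in '.log'. The file names and the ignore_logs helper are made up for the example.

```python
import os
import shutil
import tempfile


def ignore_logs(src, names):
    """Return the entries of directory ``src`` that should not be copied."""
    return [name for name in names if name.endswith('.log')]


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'src')
    dst = os.path.join(tmp, 'dst')
    os.makedirs(os.path.join(src, 'sub'))
    for relpath in ['keep.txt', 'skip.log', os.path.join('sub', 'nested.log')]:
        with open(os.path.join(src, relpath), 'w') as file:
            file.write('content')
    # ignore_logs is called once per visited directory: src, then src/sub
    shutil.copytree(src, dst, ignore=ignore_logs)
    copied = sorted(os.listdir(dst))
    nested = os.listdir(os.path.join(dst, 'sub'))
```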

The optional copy_function argument is a callable that will be used to copy each file. It will be called with the source path and the destination path as arguments. By default, copy2() is used, but any function that supports the same signature (like copy()) can be used.

If dirs_exist_ok is false (the default) and dst already exists, a FileExistsError is raised. If dirs_exist_ok is true, the copying operation will continue if it encounters existing directories, and files within the dst tree will be overwritten by corresponding files from the src tree.

desipipe.utils.dict_to_yaml(d)

Recursively cast the objects of input dictionary d to Python base types, so that they can be understood by base yaml.
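Which types dict_to_yaml converts is not listed here. The sketch below shows the general idea of a recursive cast under stated assumptions: tuples become lists, pathlib.Path objects become strings, and objects exposing a tolist() method (as numpy arrays and scalars do) are converted through it. The to_base_types helper is hypothetical.

```python
from pathlib import Path


def to_base_types(obj):
    """Recursively replace values by plain dicts, lists, strings and numbers."""
    if isinstance(obj, dict):
        return {key: to_base_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_base_types(value) for value in obj]
    if isinstance(obj, Path):
        return str(obj)
    if hasattr(obj, 'tolist'):  # e.g. numpy arrays and scalars
        return obj.tolist()
    return obj


converted = to_base_types({'shape': (1, 2), 'io': {'dir': Path('data')}})
```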

desipipe.utils.exception_handler(exc_type, exc_value, exc_traceback, mpicomm=None)

Print exception with a logger.

desipipe.utils.is_path(item)

Whether input item is a path.

desipipe.utils.is_sequence(item)

Whether input item is a tuple or list.

desipipe.utils.mkdir(dirname, **kwargs)

Try to create dirname and catch OSError.

desipipe.utils.setup_logging(level=20, stream=None, filename=None, filemode='w', **kwargs)

Set up logging.

Note

You may find it useful to have different logging levels depending on the process; e.g. setup_logging(level=(logging.INFO if mpicomm.rank == 0 else logging.ERROR)) will set INFO level on rank 0, and ERROR level on all other ranks of the MPI communicator mpicomm.

Parameters:
  • level (string, int, default=logging.INFO) – Logging level.

  • stream (_io.TextIOWrapper, default=sys.stdout) – Stream to write log messages to.

  • filename (string, default=None) – If not None, write log messages to this file.

  • filemode (string, default='w') – Mode to open file, only used if filename is not None.

  • kwargs (dict) – Other arguments for logging.basicConfig().
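The rank-dependent level from the note above can be tried without MPI by using the standard logging.basicConfig directly. In this sketch the integer rank stands in for mpicomm.rank, and level_for_rank and configure are hypothetical helpers, not part of desipipe.

```python
import logging
import sys


def level_for_rank(rank, root=0):
    """INFO on the root rank, ERROR on every other rank."""
    return logging.INFO if rank == root else logging.ERROR


def configure(rank, stream=sys.stdout):
    # force=True (Python >= 3.8) replaces any handlers installed earlier
    logging.basicConfig(level=level_for_rank(rank), stream=stream, force=True)


configure(rank=3)  # pretend this process is rank 3
logging.getLogger('demo').info('not shown on rank 3')
logging.getLogger('demo').error('shown on every rank')
```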