Configuration
- class desipipe.config.Config(user=None)
Bases:
BaseDict
desipipe configuration (‘config.yaml’) is saved in the directory given by the ‘DESIPIPE_CONFIG_DIR’ environment variable if defined, else in ‘~/.desipipe’.
Read configuration.
- Parameters:
user (str, default=None) – User. Defaults to getuser().
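The lookup rule described above can be sketched as follows. This is a minimal illustration, not desipipe's implementation: the helper names `config_dir` and `config_fn` are hypothetical, and only the environment variable name, the ‘~/.desipipe’ fallback and the ‘config.yaml’ file name come from the description.

```python
import os


def config_dir(environ=None):
    """Return the configuration directory (hypothetical helper, for illustration only)."""
    environ = os.environ if environ is None else environ
    # Use DESIPIPE_CONFIG_DIR when it is defined, else fall back to ~/.desipipe
    default = os.path.join(os.path.expanduser('~'), '.desipipe')
    return environ.get('DESIPIPE_CONFIG_DIR', default)


def config_fn(environ=None):
    """Return the path to 'config.yaml' inside the configuration directory."""
    return os.path.join(config_dir(environ), 'config.yaml')


print(config_fn({'DESIPIPE_CONFIG_DIR': '/tmp/my_config'}))
```

Passing a plain dictionary instead of `os.environ` keeps the sketch easy to try without touching the real process environment.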
- __class__
alias of
DictMetaClass
- clear() None. Remove all items from D.
- property config_fn
Path to .yaml configuration file.
- get(k[, d]) D[k] if k in D, else d. d defaults to None.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
- popitem() (k, v), remove and return some (key, value) pair
as a 2-tuple; but raise KeyError if D is empty.
- property queue_dir
Queue directory.
- setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
- update([E, ]**F) None. Update D from mapping/iterable E and F.
If E present and has a .keys() method, does: for k in E: D[k] = E[k]
If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v
- values() an object providing a view on D's values
Environment
- desipipe.environment.Environment(environ=None, data=None, **kwargs)
Convenient function that returns an environment.
- Parameters:
environ (BaseEnvironment, str, dict, default=None) – A BaseEnvironment instance, which is then returned directly, a string specifying the name of the environment (e.g. ‘base’, ‘nersc-cosmodesi’) or a dictionary of environment variables. If not specified, the default environment in desipipe’s configuration (see Config) is used if provided, else ‘base’.
data (dict, default=None) – Optionally, additional environment variables.
**kwargs (dict) – Optionally, bash command for the environment, see BaseEnvironment.
- Returns:
environ
- Return type:
BaseEnvironment
- class desipipe.environment.RegisteredEnvironment(name, bases, class_dict)
Bases:
DictMetaClass
Metaclass registering BaseEnvironment-derived classes.
- __base__
alias of
DictMetaClass
- mro()
Return a type’s method resolution order.
- register(subclass)
Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
- set_logger()
Add attributes for logging:
logger
methods log_debug, log_info, log_warning, log_error, log_critical
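A metaclass that registers derived classes usually follows the pattern sketched below. This is a generic illustration, not desipipe's code: the `_registry` dictionary, the `name` class attribute used as key, and the example classes are all assumptions.

```python
class RegisteredSketch(type):
    """Metaclass that records every class it creates (illustrative names only)."""

    _registry = {}

    def __new__(meta, name, bases, class_dict):
        cls = super().__new__(meta, name, bases, class_dict)
        # Register the class under its 'name' attribute, assumed here to identify it
        meta._registry[cls.name] = cls
        return cls


class BaseThing(metaclass=RegisteredSketch):
    name = 'base'


class OtherThing(BaseThing):
    name = 'other'


# Classes can then be looked up from a string, e.g. one provided by the user
print(sorted(RegisteredSketch._registry))
```

With such a registry, a factory function can turn a string like ‘base’ into the matching class without an explicit if/else chain.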
- desipipe.environment.bash_env(cmd)
Run input command in a subprocess and collect environment variables.
- desipipe.environment.change_environ(environ)
Temporarily set the process environment variables.
>>> with change_environ({'PLUGINS_DIR': 'test/plugins'}):
...     "PLUGINS_DIR" in os.environ
True
>>> "PLUGINS_DIR" in os.environ
False
- Parameters:
environ (dict[str, str]) – Environment variables to set.
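A context manager with this behaviour could be written along the following lines. This is a sketch under the assumption that the variables are passed as a single dictionary, as in the signature above; `change_environ_sketch` is a hypothetical name and not the library's function.

```python
import os
from contextlib import contextmanager


@contextmanager
def change_environ_sketch(environ):
    """Temporarily set the environment variables in ``environ`` (a dict)."""
    saved = {name: os.environ.get(name) for name in environ}
    os.environ.update(environ)
    try:
        yield
    finally:
        # Restore previous values, deleting variables that did not exist before
        for name, value in saved.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value


os.environ.pop('PLUGINS_DIR', None)
with change_environ_sketch({'PLUGINS_DIR': 'test/plugins'}):
    inside = 'PLUGINS_DIR' in os.environ
outside = 'PLUGINS_DIR' in os.environ
print(inside, outside)
```

The `try` / `finally` block ensures the previous environment is restored even if the body of the `with` statement raises.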
- desipipe.environment.get_environ(environ=None, data=None, **kwargs)
Convenient function that returns an environment.
- Parameters:
environ (BaseEnvironment, str, dict, default=None) – A BaseEnvironment instance, which is then returned directly, a string specifying the name of the environment (e.g. ‘base’, ‘nersc-cosmodesi’) or a dictionary of environment variables. If not specified, the default environment in desipipe’s configuration (see Config) is used if provided, else ‘base’.
data (dict, default=None) – Optionally, additional environment variables.
**kwargs (dict) – Optionally, bash command for the environment, see BaseEnvironment.
- Returns:
environ
- Return type:
BaseEnvironment
File manager
- class desipipe.file_manager.FileManager(database=(), environ=None)
Bases:
FileEntryCollectionBaseFile manager, main class to be used to get paths to / load / save files.
Initialize FileEntryCollection.
- Parameters:
data (dict, string, default=None) – Dictionary or path to a data base yaml file.
string (str, default=None) – If not None, yaml format string to decode. Added on top of data.
parser (callable, default=None) – Function that parses yaml string into a dictionary. Used when data is string, or string is not None.
**kwargs (dict) – Arguments for parser().
- __class__
alias of
BaseMetaClass
- append(entry)
Append an input file entry, which may be e.g. a dictionary, or a BaseFileEntry instance.
- chgdir(newdir, olddir=None)
Replace the directory in path with a new one. By default, olddir (directory to be replaced) is taken to be the directory common to all entries that does not contain replacement options.
- clone(**kwargs)
Return an updated copy.
- classmethod databases(keywords=None)
List of paths to available data bases following desipipe’s configuration (see Config).
- exists(return_type='dict')
Return summary description of which files exist or not.
- Parameters:
return_type (str, default='dict') – If ‘dict’, return a dictionary mapping exists to the file paths. If ‘str’, return a string.
- Returns:
summary
- Return type:
dict, str
- property filepaths
All file paths in file collection.
- get(*args, check_exists=False, raise_error=True, **kwargs)
Return the BaseFile instance that matches input arguments, see select().
- Parameters:
*args (dict) – If select() returns several file entries, and / or file entries with multiple files, a ValueError is raised.
**kwargs (dict) – If select() returns several file entries, and / or file entries with multiple files, a ValueError is raised.
check_exists (bool, default=False) – If True, check whether file exists; if not, raise a FileNotFoundError.
raise_error (bool, default=True) – If False and an error is raised, catch it and return None.
- Returns:
file
- Return type:
BaseFile
- index(id=None, filetype=None, keywords=None, return_entry=False, empty_error=False, ignore=False, **kwargs)
Return indices for input identifiers, keywords, or options, e.g.
>>> db.index(keywords=['power cutsky', 'fiber'], options={'tracer': 'ELG'})
selects the index of data base entries whose description contains ‘power’ and ‘cutsky’ or ‘fiber’, and option ‘tracer’ is ‘ELG’.
- Parameters:
id (list, str, default=None) – List of file entry identifiers. Defaults to all identifiers (no selection).
filetype (list, str, default=None) – List of file types. Defaults to all file types (no selection).
keywords (list, str, default=None) – List of keywords to search for in the file entry descriptions. If a string contains several words, all of them must be in the description for the corresponding file entry to be selected. If a list of strings is provided, any of the strings must be in the description for the corresponding file entry to be selected, e.g. ['power cutsky', 'fiber'] selects the data base entries whose description contains ‘power’ and ‘cutsky’ or ‘fiber’.
empty_error (bool, default=False) – If True, raise a (hopefully meaningful) error if no match is found.
**kwargs (dict) – Restrict to these options, see BaseFileEntry.select().
- Returns:
index – List of indices.
- Return type:
list
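The keyword rule described for `keywords` (all words of one string, any string of the list) can be sketched as below. The helper `match_keywords` is hypothetical, and matching whole words case-insensitively is an assumption; the library may compare descriptions differently.

```python
def match_keywords(description, keywords):
    """Return True if ``description`` matches ``keywords`` (hypothetical helper)."""
    if isinstance(keywords, str):
        keywords = [keywords]
    words = description.lower().split()
    # Any string may match (OR); within a string, every word must be present (AND)
    return any(all(word in words for word in keyword.lower().split()) for keyword in keywords)


keywords = ['power cutsky', 'fiber']
print(match_keywords('power spectrum of cutsky mocks', keywords))
print(match_keywords('power spectrum of cubic mocks', keywords))
```

Here the first description contains both ‘power’ and ‘cutsky’, while the second contains neither ‘cutsky’ nor ‘fiber’.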
- iter(include=None, exclude=None, get=None, intersection=False)
Iterate over options that are common to all file entries (options), and return the list of the (selected) FileManager instances.
- Parameters:
include (str, list, default=None) – List of options to include in the iteration. None to include all options.
exclude (str, list, default=None) – List of options to exclude in the iteration. None to not exclude any option.
get (bool, default=None) – If False, return a list of FileManager instances. If True, call FileManager.get() to return a list of BaseFile instances. If None, exclude is None, and there are single options in FileManager instances, call FileManager.get() to return a list of BaseFile instances.
intersection (bool, default=False) – If False, iterate over all file entry options, i.e. not only those which are common to all file entries.
- Returns:
fms
- Return type:
list of FileManager instances
- iter_options(include=None, exclude=None, intersection=False)
Iterate over options that are common to all file entries (options).
- Parameters:
include (str, list, default=None) – List of options to include in the iteration. None to include all options.
exclude (str, list, default=None) – List of options to exclude in the iteration. None to not exclude any option.
intersection (bool, default=False) – If False, iterate over all file entry options, i.e. not only those which are common to all file entries.
- Returns:
options
- Return type:
list of the options.
- property options
Return intersection of all options, i.e. options that are common to all file entries.
- save(fn, replace_environ=False)
Save data base to yaml file fn.
- Parameters:
fn (str, Path) – Where to save file data base.
replace_environ (bool, default=False) – If True, replace environment variables in entry’s path by their values.
- select(id=None, filetype=None, keywords=None, empty_error=False, **kwargs)
Restrict to input identifiers, keywords, or options, e.g.
>>> db.select(keywords=['power cutsky', 'fiber'], options={'tracer': 'ELG'})
selects the data base entries whose description contains ‘power’ and ‘cutsky’ or ‘fiber’, and option ‘tracer’ is ‘ELG’.
- Parameters:
id (list, str, default=None) – List of file entry identifiers. Defaults to all identifiers (no selection).
filetype (list, str, default=None) – List of file types. Defaults to all file types (no selection).
keywords (list, str, default=None) – List of keywords to search for in the file entry descriptions. If a string contains several words, all of them must be in the description for the corresponding file entry to be selected. If a list of strings is provided, any of the strings must be in the description for the corresponding file entry to be selected, e.g. ['power cutsky', 'fiber'] selects the data base entries whose description contains ‘power’ and ‘cutsky’ or ‘fiber’.
empty_error (bool, default=False) – If True, raise a (hopefully meaningful) error if no match is found.
ignore (bool, list, default=False) – If True, also keep file entries that do not take the input options. If list, do not cut file entries to these input options.
**kwargs (dict) – Restrict to these options, see BaseFileEntry.select().
- Returns:
new – Selected data base.
- Return type:
FileEntryCollection
- symlink(raise_error=True)
Create symlink for all files in the collection.
- update(**kwargs)
Update data (list of BaseFileEntry) or environ (dict).
- class desipipe.file_manager.JointMetaClass(name, bases, namespace, **kwargs)
Bases:
ABCMeta, BaseMetaClass
- mro()
Return a type’s method resolution order.
- register(subclass)
Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
- set_logger()
Add attributes for logging:
logger
methods log_debug, log_info, log_warning, log_error, log_critical
- class desipipe.file_manager.RegisteredFileEntry(name, bases, class_dict)
Bases:
BaseMetaClass
Metaclass registering BaseFileEntry-derived classes.
- __base__
alias of
BaseMetaClass
- mro()
Return a type’s method resolution order.
- set_logger()
Add attributes for logging:
logger
methods log_debug, log_info, log_warning, log_error, log_critical
- class desipipe.file_manager.YamlLoader(stream)
Bases:
SafeLoader
yaml loader that correctly parses numbers. Taken from https://stackoverflow.com/questions/30458977/yaml-loads-5e-6-as-string-and-not-a-number.
Initialize the scanner.
- check_state_key(key)
Block special attributes/methods from being set in a newly created object, to prevent user-controlled methods from being called during deserialization
- desipipe.file_manager.common_options(list_options, intersection=True)
Return the common options.
- desipipe.file_manager.get_file_entry(file_entry=None, file_entry_collection=None, **kwargs)
Convenient function that returns the file entry.
- Parameters:
file_entry (BaseFileEntry, str, dict, default=None) – A BaseFileEntry instance, which is then returned directly, a string specifying the name of the file entry (e.g. ‘base’) or a dictionary of file entry attributes. If not specified, the default file entry in desipipe’s configuration (see Config) is used if provided, else ‘base’.
**kwargs (dict) – Optionally, additional file entry attributes.
- Returns:
file_entry
- Return type:
BaseFileEntry
- desipipe.file_manager.in_options(values, options, return_index=False)
Return input values that are in options.
- desipipe.file_manager.yaml_parser(string)
Parse string in yaml format.
I/Os
To implement a new file format, just subclass BaseFile.
- class desipipe.io.RegisteredFile(name, bases, class_dict)
Bases:
BaseMetaClass
Metaclass registering BaseFile-derived classes.
- __base__
alias of
BaseMetaClass
- mro()
Return a type’s method resolution order.
- set_logger()
Add attributes for logging:
logger
methods log_debug, log_info, log_warning, log_error, log_critical
- desipipe.io.get_filetype(filetype, path, *args, **kwargs)
Convenient function that returns a BaseFile instance.
- Parameters:
filetype (str, BaseFile) – Name of BaseFile, or BaseFile instance.
path (str) – Path to file.
*args (tuple) – Other arguments for BaseFile.
**kwargs (dict) – Other optional arguments for BaseFile.
- Returns:
file
- Return type:
BaseFile
Task manager
- exception desipipe.task_manager.DeserializationError
Bases:
Exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class desipipe.task_manager.MyStream(stream, callback=None)
Bases:
object
- class desipipe.task_manager.Queue(name, base_dir=None, create=None)
Bases:
BaseClass
Queue keeping track of all tasks that have been run and are to be run, with an sqlite backend.
Initialize queue.
- Parameters:
name (str) – Name of queue; can contain alphanumeric characters, underscores and hyphens. “this/queue” saves the queue as base_dir/this/queue.sqlite. “/this/queue” saves the queue as “/this/queue.sqlite” (starting from root).
base_dir (str, default=None) – Base directory where to save queue. If None, defaults to Config.queue_dir.
create (bool, default=None) – If True, create a new queue; a ValueError is raised if the queue already exists. If False, do not create the queue; a ValueError is raised if no queue exists. If None (default), create the queue if it does not exist.
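The mapping from queue name to file name described for `name` can be sketched with a path join, which discards the base directory when the name is absolute. The helper `queue_filename` is hypothetical, and `posixpath` is used only to make the illustration behave identically on every platform.

```python
import posixpath


def queue_filename(name, base_dir):
    """Return the sqlite file name for queue ``name`` (hypothetical helper)."""
    # posixpath.join drops base_dir when name starts from root ('/')
    return posixpath.join(base_dir, name) + '.sqlite'


print(queue_filename('this/queue', '/base_dir'))
print(queue_filename('/this/queue', '/base_dir'))
```

The first call yields a path below the base directory, the second a path starting from root, matching the two cases in the parameter description.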
- __class__
alias of
BaseMetaClass
- add(tasks, replace=False, lock=True)
Add input task(s) to the queue.
- Parameters:
tasks (Task, list) – Task or list of tasks.
replace (bool, default=False) – If True, replace task(s) (as identified by their IDs) with the input ones. If False, an error is raised if input tasks (as identified by their IDs) are already in the queue. If None, do not add task if already in queue, but update task manager if task state is “PENDING” or “WAITING”.
lock (bool, default=True) – Ask for lock.
- Returns:
futures – List of Future instances corresponding to input tasks.
- Return type:
list
- clear(kill=True)
Clear queue: delete and recreate.
- counts(state=None, mid=None)
Count the number of tasks.
- Parameters:
state (str, list, default=None) – If not None, select tasks with given state.
mid (str, list, default=None) – If not None, select tasks with given task manager ID.
- Returns:
counts
- Return type:
int
- delete(kill=True)
Delete data base db from both this instance and the disk (and delete associated jobs).
- managers(mid=None, property=None)
List managers.
- Parameters:
mid (str, default=None) – If not None, return the task manager with given ID.
property (str, default=None) – If not None, instead of returning task manager(s), return this property (one of “mid”).
- Returns:
tm – Task manager or property or list of such objects.
- Return type:
TaskManager, list
- pause()
Pause queue, i.e. set state to “PAUSED”.
- property paused
Is queue paused?
- pop(state='PENDING', **kwargs)
Retrieve a task to be run (i.e. in “PENDING” state).
- Parameters:
state (str, list, default="PENDING") – Select a task with this state.
**kwargs (dict) – Optional arguments to select task: tid, mid, name, etc.
- Returns:
task
- Return type:
Task
- processes()
List processes that have been launched.
- resume()
Resume queue, i.e. set state to “ACTIVE”.
- set_task_state(tid, state, jobid=None, t0=None)
Set the state of task with input ID tid to state.
- property state
Get queue state (“ACTIVE” or “PAUSED”).
- summary(mid=None, return_type='dict')
Return summary description of queue, i.e. number of tasks in all states TaskState.ALL.
- Parameters:
mid (str, list, default=None) – If not None, select tasks with given task manager ID.
return_type (str, default="dict") – If “dict”, return a dictionary mapping task state to the task counts. If “str”, return a string.
- Returns:
summary
- Return type:
dict, str
- tasks(tid=None, state=None, mid=None, jobid=None, name=None, index=None, one=None, property=None)
List tasks in queue.
- Parameters:
tid (str, list, default=None) – If not None, select task with given ID.
state (str, list, default=None) – If not None, select tasks with given state.
mid (str, list, default=None) – If not None, select tasks with given task manager ID.
jobid (str, list, default=None) – If not None, select tasks with given job ID.
name (str, list, default=None) – If not None, select tasks with input application name.
index (int, list, default=None) – If not None, select tasks with input index.
one (bool, default=None) – If True, return only one task. If False, return list. If None, and tid is not None, return a single task; else a list of tasks.
property (str, list, default=None) – If not None, instead of returning task(s), return this property (one of “tid”, “state”, “mid”, “jobid”, “t0”, “task_manager”).
- Returns:
tasks – Task or property or list of such objects.
- Return type:
Task, list
- class desipipe.task_manager.QueueState
Bases:
object
- exception desipipe.task_manager.SerializationError
Bases:
Exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class desipipe.task_manager.Task(app, kwargs=None, state=None)
Bases:
BaseClass
Class representing a task, i.e. an application (app) and the arguments to call it with (args and kwargs).
- id
Task unique identifier. It is built from the pickle representation of (app, args, kwargs), such that different tasks have different identifiers (with extremely high probability).
- Type:
str
- app
Application.
- Type:
BaseApp
- kwargs
Dictionary of arguments to be passed to BaseApp.run().
- Type:
dict
- require_ids
List of task IDs that are required for this task to be run, inferred from Future instances passed to args and kwargs.
- Type:
list
- state
Task state, one of TaskState.ALL, i.e. (“WAITING”, “PENDING”, “RUNNING”, “SUCCEEDED”, “FAILED”, “KILLED”, “UNKNOWN”).
- Type:
str
- jobid
Job identifier (not used for now).
- Type:
str
- errno
0 if no error, else error number (defaults to 42).
- Type:
int
- err
Error message; empty if no error.
- Type:
str
- out
Standard output message.
- Type:
str
- versions
Module versions.
- Type:
dict
- result
Value returned by BaseApp.func.
- Type:
object
- t0
Start time of BaseApp.run.
- Type:
float
- dtime
Running time of BaseApp.run.
- Type:
float
Initialize Task.
- Parameters:
app (BaseApp) – Application.
kwargs (dict, default=None) – Dictionary of arguments to be passed to BaseApp.run().
state (str, default=None) – Task state. Defaults to “WAITING” if this task requires others to be run, else to “PENDING”.
- __class__
alias of
BaseMetaClass
- clone(*args, **kwargs)
Return an updated copy.
- run(**kwargs)
Run task:
call BaseApp.run, saving main script where the task is defined and package versions in a folder “.desipipe” located in the directory where files are saved (if any).
set state: “KILLED” if termination signal, “FAILED” if BaseApp.run raised an exception, else “SUCCEEDED”.
- update(**kwargs)
Update task with input attributes.
- class desipipe.task_manager.TaskManager(queue, environ=None, scheduler=None, provider=None)
Bases:
BaseClass
Task manager, main class to be used in scripts, e.g.:
    from desipipe.environment import Environment
    from desipipe.task_manager import Queue, TaskManager

    queue = Queue("test", base_dir="_tests")
    tm = TaskManager(queue, environ=Environment(), scheduler=dict(max_workers=10))

    @tm.python_app
    def test(n):
        print("hello" * n)
- tid
Task manager unique identifier. It is built from the pickle representation of (environ, scheduler, provider), such that different task managers have different identifiers (with extremely high probability).
- Type:
str
- environ
Tasks are run within this environment.
- Type:
BaseEnvironment
- scheduler
Tasks are distributed to workers with this scheduler.
- Type:
BaseScheduler
- provider
Tasks are executed on the machine with this provider.
- Type:
BaseProvider
Initialize TaskManager.
- Parameters:
queue (str, Queue) – Queue, see get_queue().
environ (BaseEnvironment, str, dict, default=None) – Tasks are run within this environment. See get_environ().
scheduler (BaseScheduler, str, dict, default=None) – Tasks are distributed to workers with this scheduler. See get_scheduler().
provider (BaseProvider, str, dict, default=None) – Tasks are executed on the machine with this provider. See get_provider().
- __class__
alias of
BaseMetaClass
- clone(**kwargs)
Return an updated copy.
- spawn(*args, **kwargs)
Distribute tasks to workers.
- update(**kwargs)
Update task manager attributes.
- class desipipe.task_manager.TaskManagerPickler(file, protocol=None, fix_imports=True, buffer_callback=None)
Bases:
Pickler
Special pickler for tasks, handling BaseApp and Future instances.
- clear_memo()
Clears the pickler’s “memo”.
The memo is the data structure that remembers which objects the pickler has already seen, so that shared or recursive objects are pickled by reference and not by value. This method is useful when re-using picklers.
- dump(obj, /)
Write a pickled representation of the given object to the open file.
- classmethod dumps(obj, *args, **kwargs)
Dump input obj to string. args and kwargs are passed to __init__().
- class desipipe.task_manager.TaskManagerUnpickler(file, queue=None)
Bases:
Unpickler
Unpickler that corresponds to TaskPickler, replacing Future instances by the actual result of the computation.
- find_class(module_name, global_name, /)
Return an object from a specified module.
If necessary, the module will be imported. Subclasses may override this method (e.g. to restrict unpickling of arbitrary classes and functions).
This method is called whenever a class or a function object is needed. Both arguments passed are str objects.
- load()
Load a pickle.
Read a pickled object representation from the open file object given in the constructor, and return the reconstituted object hierarchy specified therein.
- classmethod loads(s, *args, **kwargs)
Load input string s. args and kwargs are passed to __init__().
- class desipipe.task_manager.TaskPickler(*args, reduce_app=<function reduce_app>, **kwargs)
Bases:
Pickler
Special pickler for tasks, handling BaseApp and Future instances.
Initialize pickler and add the special reduce method for BaseApp to the dispatch_table.
- clear_memo()
Clears the pickler’s “memo”.
The memo is the data structure that remembers which objects the pickler has already seen, so that shared or recursive objects are pickled by reference and not by value. This method is useful when re-using picklers.
- dump(obj, /)
Write a pickled representation of the given object to the open file.
- classmethod dumps(obj, *args, **kwargs)
Dump input obj to string. args and kwargs are passed to __init__().
- class desipipe.task_manager.TaskState
Bases:
object
- class desipipe.task_manager.TaskUnpickler(file, queue=None)
Bases:
Unpickler
Unpickler that corresponds to TaskPickler, replacing Future instances by the actual result of the computation.
- find_class(module_name, global_name, /)
Return an object from a specified module.
If necessary, the module will be imported. Subclasses may override this method (e.g. to restrict unpickling of arbitrary classes and functions).
This method is called whenever a class or a function object is needed. Both arguments passed are str objects.
- load()
Load a pickle.
Read a pickled object representation from the open file object given in the constructor, and return the reconstituted object hierarchy specified therein.
- classmethod loads(s, *args, **kwargs)
Load input string s. args and kwargs are passed to __init__().
- class desipipe.task_manager.VarType
Bases:
object
- desipipe.task_manager.action_from_args(action='work', args=None)
Function called when using desipipe from the command line.
- desipipe.task_manager.decorator(func)
Decorator to deal with decorator arguments, e.g.:
    @tm.python_app
    def test(n):
        print("hello" * n)
and
    @tm.python_app(skip=False)
    def test(n):
        print("hello" * n)
are equivalent.
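A helper that lets a decorator be used both bare and with keyword arguments can be sketched as follows. This is a generic version of the pattern for plain functions, not desipipe's implementation (which decorates a method of the task manager); `decorator_sketch` and the toy `tagged` decorator are hypothetical.

```python
import functools


def decorator_sketch(func):
    """Let the decorator ``func(target, **kwargs)`` be used with or without arguments."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if len(args) == 1 and not kwargs and callable(args[0]):
            # Bare use: @deco
            return func(args[0])
        # Use with arguments: @deco(option=value); return the actual decorator
        return lambda target: func(target, **kwargs)

    return wrapper


@decorator_sketch
def tagged(target, tag='default'):
    """Toy decorator (hypothetical) that attaches a tag to the decorated function."""
    target.tag = tag
    return target


@tagged
def first():
    return 1


@tagged(tag='default')
def second():
    return 2


print(first.tag, second.tag)
```

Both forms end up calling the same underlying function, the bare one with default options and the other with the options given explicitly.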
- desipipe.task_manager.get_queue(queue, create=None, one=True)
Return queue.
- Parameters:
queue (str, Queue, list) – Queue names, Queue, or list of such objects.
create (bool, default=None) – If True, create a new queue; a ValueError is raised if the queue already exists. If False, do not create the queue; a ValueError is raised if no queue exists. If None (default), create the queue if it does not exist.
one (bool, default=True) – If True, return one queue. Raise a ValueError if queue corresponds to several queues. If False, ensure a list of queues is returned.
- Returns:
queue – Queue or list of queues.
- Return type:
Queue, list
- desipipe.task_manager.kill(queue=None, provider=None, jobid=None, state=None, **kwargs)
Kill launched jobs.
- Parameters:
queue (Queue, list, default=None) – Queue or list of queues to process. If None, jobid must be provided.
provider (BaseProvider, str, dict, default=None) – To access BaseProvider.kill() method. See get_provider().
jobid (str, list, default=None) – IDs of jobs to kill, defaults to all.
state (str, default=None) – State of tasks to kill, defaults to all.
**kwargs (dict) – Optional arguments to select tasks in queue to be killed: tid, mid, name. See Queue.tasks().
- desipipe.task_manager.reduce_app(self)
Special reduce method for BaseApp, dropping task_manager.
- desipipe.task_manager.retry(queue, state='KILLED', **kwargs)
Move (by default killed) tasks into PENDING state, so they are rerun.
- Parameters:
queue (Queue, list) – Queue or list of queues to process.
state (str, default="KILLED") – State of tasks to move to PENDING state.
**kwargs (dict) – Optional arguments to select tasks in queue to be moved to PENDING state: tid, mid, name, etc.
- desipipe.task_manager.select_modules(modules)
Select “standard” top-level modules: those which do not start with an underscore.
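One possible reading of this selection rule is sketched below. It assumes that "top-level" means a module name without a dot and that the result is a sorted list; both points, and the name `select_modules_sketch`, are assumptions rather than the library's documented behaviour.

```python
def select_modules_sketch(modules):
    """Return sorted top-level module names that do not start with an underscore."""
    # Keep names without a dot (top-level) and without a leading underscore
    return sorted(name for name in modules if '.' not in name and not name.startswith('_'))


print(select_modules_sketch(['os', 'os.path', '_thread', 'json']))
```

Such a filter could for instance be applied to the keys of `sys.modules` before recording package versions.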
- desipipe.task_manager.spawn(queue, timeout=86400, timestep=10.0, mode='', max_workers=None, spawn=False)
Distribute tasks to workers. If all queues are paused, the function terminates.
- Parameters:
queue (Queue, list) – Queue or list of queues to process.
timeout (float, default=3600 * 24) – Time out after this delay (in seconds).
timestep (float, default=10.) – Period (in seconds) at which the queue is queried for new tasks, and running / pending jobs are checked (with a refreshment time step of timestep). Increase in case the provider (e.g. Slurm) cannot handle too frequent calls to the state of the queue.
mode (str, default="") – Processing mode. “stop_at_error” to stop as soon as a task fails. “retry_at_timeout” to retry when timed out. “no_stream” to not stream stderr/stdout during the tasks (helps when many jobs run in parallel). “no_out” to not stream stderr/stdout and not save stdout.
spawn (bool, default=False) – If True, spawn a new manager process and exit this one.
- desipipe.task_manager.unique_id(uid)
Return ~ unique ID from input object.
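An identifier of this kind can be obtained by hashing the pickle representation of the object, in line with how task and task manager identifiers are described above. The sketch below is an illustration only: the choice of SHA-256 and the name `unique_id_sketch` are assumptions.

```python
import hashlib
import pickle


def unique_id_sketch(obj):
    """Return a hexadecimal digest of the pickle representation of ``obj``."""
    # Equal pickle byte strings give equal IDs; different ones almost surely differ
    return hashlib.sha256(pickle.dumps(obj)).hexdigest()


id_a = unique_id_sketch(('test', (), {'n': 1}))
id_b = unique_id_sketch(('test', (), {'n': 2}))
print(id_a != id_b)
```

Because the digest depends only on the pickled bytes, the same (application, arguments) combination yields the same identifier from one run to the next, which is what allows a queue to recognise a task it has already seen.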
- desipipe.task_manager.work(queue, mid=None, tid=None, name=None, provider=None, mode='', mpicomm=None, mpisplits=None, timestep=120.0)
Do the actual work: pop tasks from the input queue, and run them.
- Parameters:
queue (Queue) – Queue to pop tasks from.
mid (str, list, default=None) – If not None, take tasks with this task manager ID.
tid (str, list, default=None) – If not None, take a task with this task ID.
name (str, list, default=None) – If not None, take a task with this name.
provider (str, default=None) – Name of provider, to get process ID. Defaults to the manager’s provider.
mode (str, default="") – Processing mode. “stop_at_error” to stop as soon as a task fails. “retry_at_timeout” to retry when timed out. “no_stream” to not stream stderr/stdout during the tasks (helps when many jobs run in parallel). “no_out” to not stream stderr/stdout and not save stdout.
timestep (float, default=120) – Time step for streaming output.
mpicomm (MPI communicator, default=mpi.COMM_WORLD) – The MPI communicator.
mpisplits (int, default=None) – Split MPI communicator in this number of subcommunicators to run mpisplits tasks in parallel.
Provider
- desipipe.provider.Provider(provider=None, **kwargs)
Convenient function that returns the provider.
- Parameters:
provider (BaseProvider, str, dict, default=None) – A BaseProvider instance, which is then returned directly, a string specifying the name of the provider (e.g. ‘local’) or a dictionary of provider attributes. If not specified, the default provider in desipipe’s configuration (see Config) is used if provided, else ‘local’.
**kwargs (dict) – Optionally, additional provider attributes.
- Returns:
provider
- Return type:
BaseProvider
- class desipipe.provider.RegisteredProvider(name, bases, class_dict)
Bases:
BaseMetaClass
Metaclass registering BaseProvider-derived classes.
- __base__
alias of
BaseMetaClass
- mro()
Return a type’s method resolution order.
- set_logger()
Add attributes for logging:
logger
methods log_debug, log_info, log_warning, log_error, log_critical
- desipipe.provider.decode_slurm_time(s)
Decode Slurm time.
- desipipe.provider.get_provider(provider=None, **kwargs)
Convenient function that returns the provider.
- Parameters:
provider (BaseProvider, str, dict, default=None) – A BaseProvider instance, which is then returned directly, a string specifying the name of the provider (e.g. ‘local’) or a dictionary of provider attributes. If not specified, the default provider in desipipe’s configuration (see Config) is used if provided, else ‘local’.
**kwargs (dict) – Optionally, additional provider attributes.
- Returns:
provider
- Return type:
BaseProvider
- desipipe.provider.time_lru_cache(func)
Wrapper that caches the function result for at most dt seconds. Idea taken from https://stackoverflow.com/questions/31771286/python-in-memory-cache-with-time-to-live
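A time-limited cache of this kind is commonly built on `functools.lru_cache` by adding a time bucket to the cache key. The sketch below illustrates that idea only; the `dt`, `maxsize` and `clock` parameters and the name `time_lru_cache_sketch` are assumptions, not the library's signature.

```python
import functools
import time


def time_lru_cache_sketch(dt=10., maxsize=128, clock=time.monotonic):
    """Cache results of the decorated function for at most ``dt`` seconds."""

    def decorator(func):

        @functools.lru_cache(maxsize=maxsize)
        def cached(time_bucket, *args, **kwargs):
            # time_bucket is only part of the cache key; it changes every dt seconds
            return func(*args, **kwargs)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return cached(int(clock() / dt), *args, **kwargs)

        return wrapper

    return decorator


now = [0.]   # fake clock, so that the example does not need to sleep
calls = []


@time_lru_cache_sketch(dt=10., clock=lambda: now[0])
def query(x):
    calls.append(x)
    return 2 * x


query(1); query(1)       # second call is served from the cache
calls_before = len(calls)
now[0] = 11.             # move to the next time bucket
query(1)                 # recomputed
calls_after = len(calls)
print(calls_before, calls_after)
```

Such a wrapper is useful when the wrapped function queries a slow external service, since repeated calls within the time window reuse the stored answer.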
Scheduler
- class desipipe.scheduler.RegisteredScheduler(name, bases, class_dict)
Bases:
BaseMetaClass
Metaclass registering BaseScheduler-derived classes.
- __base__
alias of
BaseMetaClass
- mro()
Return a type’s method resolution order.
- set_logger()
Add attributes for logging:
logger
methods log_debug, log_info, log_warning, log_error, log_critical
- desipipe.scheduler.Scheduler(scheduler=None, **kwargs)
Convenient function that returns the scheduler.
- Parameters:
scheduler (BaseScheduler, str, dict, default=None) – A BaseScheduler instance, which is then returned directly, a string specifying the name of the scheduler (e.g. ‘simple’) or a dictionary of scheduler attributes. If not specified, the default scheduler in desipipe’s configuration (see Config) is used if provided, else ‘simple’.
**kwargs (dict) – Optionally, additional scheduler attributes.
- Returns:
scheduler
- Return type:
BaseScheduler
- desipipe.scheduler.get_scheduler(scheduler=None, **kwargs)
Convenient function that returns the scheduler.
- Parameters:
scheduler (BaseScheduler, str, dict, default=None) – A BaseScheduler instance, which is then returned directly, a string specifying the name of the scheduler (e.g. ‘simple’) or a dictionary of scheduler attributes. If not specified, the default scheduler in desipipe’s configuration (see Config) is used if provided, else ‘simple’.
**kwargs (dict) – Optionally, additional scheduler attributes.
- Returns:
scheduler
- Return type:
BaseScheduler
Utilities
A few utilities.
- class desipipe.utils.BaseMetaClass(name, bases, class_dict)
Bases:
type
Metaclass to add logging attributes to BaseClass derived classes.
- mro()
Return a type’s method resolution order.
- set_logger()
Add attributes for logging:
logger
methods log_debug, log_info, log_warning, log_error, log_critical
- class desipipe.utils.DictMetaClass(name, bases, class_dict)
Bases: BaseMetaClass, ABCMeta
- __base__
alias of BaseMetaClass
- mro()
Return a type’s method resolution order.
- register(subclass)
Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
- set_logger()
Add attributes for logging:
logger
methods log_debug, log_info, log_warning, log_error, log_critical
- class desipipe.utils.LoggingContext(level=None)
Bases: object
Class to locally update logging level:
>>> with LoggingContext('warning') as mem:
        ...
        # Logging level is warning
        logger = logging.getLogger('Logger')
        logger.info('This should not be printed')  # not logged
        logger.warning('This should be printed')  # logged
        ...
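A context manager of this kind can be sketched with the standard logging module alone. The class below is an illustration under assumptions, not the desipipe implementation: the name TemporaryLogLevel is hypothetical, and it is assumed that the level is set on the root logger and restored on exit.

```python
import logging


class TemporaryLogLevel:
    """Hypothetical context manager: change the root logging level inside a ``with`` block."""

    def __init__(self, level=None):
        if isinstance(level, str):
            level = getattr(logging, level.upper())  # 'warning' -> logging.WARNING
        self.level = level
        self.logger = logging.getLogger()  # root logger (assumption)

    def __enter__(self):
        self.previous = self.logger.level
        if self.level is not None:
            self.logger.setLevel(self.level)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Restore the previous level, whether or not an exception occurred.
        self.logger.setLevel(self.previous)
        return False
```

Loggers that do not set their own level inherit the root level, which is why a logger obtained with logging.getLogger('Logger') is affected inside the block.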
- desipipe.utils.copytree(src, dst, symlinks=False, ignore=None, copy_function=<function copy2>, ignore_dangling_symlinks=False, dirs_exist_ok=False, dirs_copystat=True)
Slight modification of shutil.copytree. Added dirs_copystat flag: if false, shutil.copystat is not applied to directories. Motivation: shutil.copystat can fail if the owner of the destination directory or file is different from the current user. shutil.copy2 involves shutil.copystat as well. Thus to avoid this operation completely, use copy_function=shutil.copyfile and dirs_copystat=False.
Recursively copy a directory tree and return the destination directory.
If exception(s) occur, an Error is raised with a list of reasons.
If the optional symlinks flag is true, symbolic links in the source tree result in symbolic links in the destination tree; if it is false, the contents of the files pointed to by symbolic links are copied. If the file pointed to by the symlink doesn’t exist, an exception will be added to the list of errors raised in an Error exception at the end of the copy process.
You can set the optional ignore_dangling_symlinks flag to true if you want to silence this exception. Notice that this has no effect on platforms that don’t support os.symlink.
The optional ignore argument is a callable. If given, it is called with the src parameter, which is the directory being visited by copytree(), and names which is the list of src contents, as returned by os.listdir():
callable(src, names) -> ignored_names
Since copytree() is called recursively, the callable will be called once for each directory that is copied. It returns a list of names relative to the src directory that should not be copied.
The optional copy_function argument is a callable that will be used to copy each file. It will be called with the source path and the destination path as arguments. By default, copy2() is used, but any function that supports the same signature (like copy()) can be used.
If dirs_exist_ok is false (the default) and dst already exists, a FileExistsError is raised. If dirs_exist_ok is true, the copying operation will continue if it encounters existing directories, and files within the dst tree will be overwritten by corresponding files from the src tree.
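The ignore callable and the copy_function argument described above behave the same way in the standard library's shutil.copytree, so they can be tried without desipipe. The sketch below uses shutil.copytree directly; the dirs_copystat flag is specific to desipipe's variant and is therefore not passed here, and the helper name ignore_logs and the file names are made up for the example.

```python
import os
import shutil
import tempfile


def ignore_logs(src, names):
    """Return the entries of ``names`` (the contents of directory ``src``) to skip."""
    return [name for name in names if name.endswith('.log')]


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'src')
    dst = os.path.join(tmp, 'dst')
    os.makedirs(os.path.join(src, 'sub'))
    for relpath in ['keep.txt', 'skip.log', os.path.join('sub', 'nested.log')]:
        with open(os.path.join(src, relpath), 'w') as file:
            file.write('content')
    # copyfile copies file contents only, without permissions or timestamps.
    shutil.copytree(src, dst, ignore=ignore_logs, copy_function=shutil.copyfile)
    copied = sorted(os.listdir(dst))
    copied_sub = os.listdir(os.path.join(dst, 'sub'))
```

Because the callable is invoked once per visited directory, the .log file inside sub is filtered as well, while the sub directory itself is still created.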
- desipipe.utils.dict_to_yaml(d)
(Recursively) cast objects of input dictionary d to Python base types, such that they can be understood by the base yaml.
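As an illustration of what a recursive cast to base types can look like, here is a minimal standard-library sketch. Which object types desipipe actually converts is not stated here, so the choices below (tuples and sets to lists, paths to strings, anything unknown to its string form) and the function name to_base_types are assumptions.

```python
from pathlib import PurePath


def to_base_types(obj):
    """Hypothetical helper: recursively convert ``obj`` to dict/list/str/number/None."""
    if isinstance(obj, dict):
        # Keys are kept unchanged; only values are converted.
        return {key: to_base_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [to_base_types(value) for value in obj]
    if isinstance(obj, PurePath):
        return str(obj)
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    return str(obj)  # fallback for unknown types (assumption)
```

The result contains only plain dictionaries, lists, strings, numbers, booleans and None, which are the types a basic YAML dumper can serialize without custom representers.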
- desipipe.utils.exception_handler(exc_type, exc_value, exc_traceback, mpicomm=None)
Print exception with a logger.
- desipipe.utils.is_path(item)
Whether input item is a path.
- desipipe.utils.is_sequence(item)
Whether input item is a tuple or list.
- desipipe.utils.mkdir(dirname, **kwargs)
Try to create dirname and catch OSError.
- desipipe.utils.setup_logging(level=20, stream=None, filename=None, filemode='w', **kwargs)
Set up logging.
Note
You may find it useful to have a different logging level depending on the process; e.g. setup_logging(level=(logging.INFO if mpicomm.rank == 0 else logging.ERROR)) will set INFO level on rank 0, and ERROR level on all other ranks of the MPI communicator mpicomm.
- Parameters:
level (string, int, default=logging.INFO) – Logging level.
stream (_io.TextIOWrapper, default=sys.stdout) – Where to stream.
filename (string, default=None) – If not None, stream to this file.
filemode (string, default='w') – Mode to open file, only used if filename is not None.
kwargs (dict) – Other arguments for logging.basicConfig().
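The parameters above map naturally onto logging.basicConfig(). The following sketch shows one way such a wrapper could be written, including the per-rank level from the note; it is not desipipe's implementation. The function name sketch_setup_logging is hypothetical, force=True (Python 3.8+) is an assumption added so that repeated calls take effect, and rank is a plain integer standing in for mpicomm.rank.

```python
import logging
import sys


def sketch_setup_logging(level=logging.INFO, stream=sys.stdout, filename=None, filemode='w', **kwargs):
    """Hypothetical wrapper around ``logging.basicConfig``."""
    if isinstance(level, str):
        level = getattr(logging, level.upper())  # 'info' -> logging.INFO
    if filename is not None:
        # basicConfig does not accept stream and filename together.
        kwargs.update(filename=filename, filemode=filemode)
    else:
        kwargs.update(stream=stream)
    logging.basicConfig(level=level, force=True, **kwargs)


rank = 0  # stand-in for mpicomm.rank; no MPI is needed for this sketch
sketch_setup_logging(level=(logging.INFO if rank == 0 else logging.ERROR))
```

With rank set to any non-zero value, the same call would configure the root logger at ERROR level instead.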