Title: | Simple Multi-Threaded Task Queuing |
---|---|
Description: | Implements a simple multi-threaded task queue using R6 classes. |
Authors: | Danielle Navarro [aut, cre] |
Maintainer: | Danielle Navarro <[email protected]> |
License: | MIT + file LICENSE |
Version: | 0.0.2 |
Built: | 2025-03-11 03:15:09 UTC |
Source: | https://github.com/djnavarro/queue |
A Queue executes tasks concurrently using multiple workers.
The Queue class is the primary interface provided by the queue package. It
allows users to execute an arbitrary collection of tasks in parallel across
multiple R sessions, managed automatically in the background. Once a new
queue is initialised, tasks can be added to the queue using the add()
method. Once all tasks are added, they are executed in parallel
by calling the run() method. When completed, run() returns a
tibble that contains the results for all tasks, along with some additional
metadata.
Internally, a Queue uses a TaskList object as its data store and a
WorkerPool object to execute the tasks in parallel. These objects can be
accessed by calling the get_tasks() and get_workers() methods.
Usually you would not need to do this, but occasionally it can be useful
because those objects have some handy methods that allow finer-grained
control (see the documentation for TaskList and WorkerPool respectively).
new()
Create a task queue
Queue$new(workers = 4L)
workers
Either the number of workers to employ in the task queue,
or a WorkerPool object to use when deploying the tasks.
A new Queue object
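As a hedged illustration of the two forms the workers argument can take, the sketch below assumes the queue package is installed and attached. The task function square and all variable names are made up for the example. It hands an existing WorkerPool to one queue, keeps the sessions alive with shutdown = FALSE, and then reuses the same pool in a second queue.

```r
library(queue)

# Hypothetical task function used only for this illustration
square <- function(x) x^2

# Create a WorkerPool directly and hand it to a Queue
pool <- WorkerPool$new(workers = 2L)
q1 <- Queue$new(workers = pool)
q1$add(square, args = list(x = 3))

# shutdown = FALSE leaves the R sessions open so the pool can be reused
out1 <- q1$run(message = "none", shutdown = FALSE)

# A second queue backed by the same pool
q2 <- Queue$new(workers = pool)
q2$add(square, args = list(x = 4))

# The default shutdown = TRUE closes the sessions once the tasks are done
out2 <- q2$run(message = "none")

out1$result[[1]]
out2$result[[1]]
```

Passing a number instead (for example Queue$new(workers = 2L)) leaves pool creation to the Queue, which is the simpler choice when the pool does not need to outlive a single run.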
add()
Adds a task to the queue
Queue$add(fun, args = list(), id = NULL)
fun
The function to be called when the task is scheduled
args
A list of arguments to be passed to the task function (optional)
id
A string specifying a unique identifier for the task (optional: tasks will be named "task_1", "task_2", etc. if this is unspecified)
Invisibly returns the Task object
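A minimal sketch of add() in use, assuming the queue package is installed and attached. The function add_numbers and the ids are hypothetical names chosen for the example. It shows arguments supplied through args, explicit ids, and the invisibly returned Task being captured.

```r
library(queue)

queue <- Queue$new(workers = 2L)

# Hypothetical task function: everything it needs arrives via its arguments
add_numbers <- function(a, b) a + b

# Explicit id, with arguments supplied as a named list
queue$add(add_numbers, args = list(a = 1, b = 2), id = "sum_small")

# add() returns the Task invisibly, so it can be captured if needed
task <- queue$add(add_numbers, args = list(a = 10, b = 20), id = "sum_large")
task$get_task_id()

# Run the queue quietly and look at the ids and results
out <- queue$run(message = "none")
out[, c("task_id", "result")]
```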
run()
Execute tasks in parallel using the worker pool, assigning tasks to workers in the same order in which they were added to the queue
Queue$run( timelimit = 60, message = "minimal", interval = 0.05, shutdown = TRUE )
timelimit
How long (in seconds) should the worker pool wait for a task to complete before terminating the child process and moving on to the next task? (default is 60 seconds, but this is fairly arbitrary)
message
What messages should be reported by the queue while it is running? Options are "none" (no messages), "minimal" (a spinner is shown alongside counts of waiting, running, and completed tasks), and "verbose" (in addition to the spinner, each task is summarized as it completes). Default is "minimal".
interval
How often should the task queue poll the workers to see if they have finished their assigned tasks? Specified in seconds.
shutdown
Should the workers in the pool be shut down (i.e., all
R sessions closed) once the tasks are completed? Defaults to TRUE.
Returns a tibble containing the results of all tasks and
various other useful metadata. Contains one row per task in the
Queue, and the following columns:
task_id
A character string specifying the task identifiers
worker_id
An integer specifying the worker process ids (pid)
state
A character string indicating the status of each task
("created", "waiting", "assigned", "running", or "done")
result
A list containing the function outputs, or NULL
runtime
Completion time for the task (NA if the task is not done)
fun
A list containing the functions
args
A list containing the arguments passed to each function
created
The time at which each task was created
queued
The time at which each task was added to a Queue
assigned
The time at which each task was assigned to a Worker
started
The time at which a Worker called each function
finished
The time at which a Worker returned the output for the task
code
The status code returned by the callr R session (integer)
message
The message returned by the callr R session (character)
stdout
List column containing the contents of stdout during function execution
stderr
List column containing the contents of stderr during function execution
error
List column containing NULL values
Note: at present there is one field from the callr r_session read() method
that isn't captured here, and that's the error field. I'll add that after
I've finished wrapping my head around what that actually does. The error
column, at present, is included only as a placeholder.
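The following sketch shows one way to run a queue and read the returned tibble, assuming the queue package is installed and attached. The task function report is hypothetical, and the argument values are arbitrary choices for the example rather than recommendations.

```r
library(queue)

queue <- Queue$new(workers = 2L)

# Hypothetical task: prints to stdout and returns a value
report <- function(n) {
  cat("working on", n, "\n")
  n * 2
}
for (n in 1:3) queue$add(report, args = list(n = n))

# Allow 30 seconds per task, poll every 0.1 seconds, print no messages
out <- queue$run(timelimit = 30, message = "none", interval = 0.1)

# One row per task; pick out a few of the documented columns
out[, c("task_id", "worker_id", "state", "runtime")]

# result is a list column, so the values are extracted element by element
unlist(out$result)
```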
get_workers()
Retrieve the workers
Queue$get_workers()
A WorkerPool object
get_tasks()
Retrieve the tasks
Queue$get_tasks()
A TaskList object
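A short sketch of how the two accessors might be used, assuming the queue package is installed and attached. The variable names are made up for the example. It looks at the TaskList and WorkerPool before the run and checks the task states afterwards.

```r
library(queue)

queue <- Queue$new(workers = 2L)
for (i in 1:3) queue$add(function() Sys.getpid())

# The underlying data store and worker pool
tasks <- queue$get_tasks()
workers <- queue$get_workers()

n_tasks <- tasks$length()     # number of tasks held by the TaskList
workers$get_pool_state()      # named character vector of worker states

out <- queue$run(message = "none")

# After the run, the TaskList reports the state of every task
states <- queue$get_tasks()$get_state()
states
```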
clone()
The objects of this class are cloneable with this method.
Queue$clone(deep = FALSE)
deep
Whether to make a deep clone.
queue <- Queue$new(workers = 4L)
wait <- function(x) Sys.sleep(runif(1))
for(i in 1:6) queue$add(wait)
queue$run()
A Task stores a function, arguments, output, and metadata.
A Task object is used as a storage class. It is a container used to hold an
R function and any arguments to be passed to the function. It can also hold
any output returned by the function, anything printed to stdout or stderr
when the function is called, and various other metadata such as the process
id of the worker that executed the function, timestamps, and so on.
The methods for Task objects fall into two groups, roughly speaking. The
get_*() methods are used to return information about the Task, and the
register_*() methods are used to register information related to events
relevant to the Task status.
The retrieve() method is special, and returns a tibble containing all
information stored about the task. Objects further up the hierarchy use this
method to return nicely organised output that summarises the results from
many tasks.
new()
Create a new task. Conceptually, a Task is viewed as a
function that will be executed by the Worker to which it is assigned,
and it is generally expected that any resources the function requires
are passed through the arguments since the execution context will be a
different R session to the one in which the function is defined.
Task$new(fun, args = list(), id = NULL)
fun
The function to be called when the task executes.
args
A list of arguments to be passed to the function (optional).
id
A string specifying a unique task identifier (optional).
A new Task object.
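As a hedged sketch of the point about passing resources through arguments, the example below assumes the queue package is installed and attached. The function scale_by and the id are hypothetical. The multiplier is supplied via args rather than read from a global variable, because the function will eventually run in a different R session.

```r
library(queue)

# Hypothetical function: k is passed in as an argument instead of being
# looked up in the calling session, which the worker cannot see
scale_by <- function(x, k) x * k

task <- Task$new(scale_by, args = list(x = 1:3, k = 10), id = "scale_task")

# Inspect what the Task is holding before it has been run
task$get_task_id()
task$get_task_args()
task$get_task_state()

# A one-row tibble summarising everything stored about the task
task$retrieve()
```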
retrieve()
Retrieve a tidy summary of the task state.
Task$retrieve()
A tibble containing a single row, and the following columns:
task_id
A character string specifying the task identifier
worker_id
An integer specifying the worker process id (pid)
state
A character string indicating the task status ("created",
"waiting", "assigned", "running", or "done")
result
A list containing the function output, or NULL
runtime
Completion time for the task (NA if the task is not done)
fun
A list containing the function
args
A list containing the arguments
created
The time at which the task was created
queued
The time at which the task was added to a Queue
assigned
The time at which the task was assigned to a Worker
started
The time at which the Worker called the function
finished
The time at which the Worker returned the output
code
The status code returned by the callr R session (integer)
message
The message returned by the callr R session (character)
stdout
List containing the contents of stdout during function execution
stderr
List containing the contents of stderr during function execution
error
List containing NULL
Note: at present there is one field from the callr r_session read() method
that isn't captured here, and that's the error field. I'll add that after
I've finished wrapping my head around what that actually does. The error
column, at present, is included only as a placeholder.
get_task_fun()
Retrieve the task function.
Task$get_task_fun()
A function.
get_task_args()
Retrieve the task arguments
Task$get_task_args()
A list.
get_task_state()
Retrieve the task state.
Task$get_task_state()
A string specifying the current state of the task. Possible values are "created" (task exists), "waiting" (task exists and is waiting in a queue), "assigned" (task has been assigned to a worker but has not yet started), "running" (task is running on a worker), or "done" (task is completed and results have been assigned back to the task object)
get_task_id()
Retrieve the task id.
Task$get_task_id()
A string containing the task identifier.
get_task_runtime()
Retrieve the task runtime.
Task$get_task_runtime()
If the task has completed, a difftime value. If the task has
yet to complete, an NA value is returned.
register_task_created()
Register the task creation by updating internal storage.
When this method is called, the state of the Task is set to "created"
and a timestamp is recorded, registering the creation time for the task.
This method is intended to be called by Worker objects. Users should
not need to call it.
Task$register_task_created()
Returns NULL invisibly.
register_task_waiting()
Register the addition of the task to a queue by updating
internal storage. When this method is called, the state of the Task
is set to "waiting" and a timestamp is recorded, registering the time
at which the task was added to a queue. This method is intended to be
called by Worker objects. Users should not need to call it.
Task$register_task_waiting()
Returns NULL invisibly.
register_task_assigned()
Register the assignment of a task to a worker by updating
internal storage. When this method is called, the state of the Task
is set to "assigned" and a timestamp is recorded, registering the time
at which the task was assigned to a Worker. In addition, the
worker_id of the worker object (which is also its pid) is registered
with the task. This method is intended to be called by Worker objects.
Users should not need to call it.
Task$register_task_assigned(worker_id)
worker_id
Identifier for the worker to which the task is assigned.
Returns NULL invisibly.
register_task_running()
Register the commencement of a task on a worker by updating
internal storage. When this method is called, the state of the Task is
set to "running" and a timestamp is recorded, registering the time at
which the Worker called the task function. In addition, the worker_id
is recorded, albeit somewhat unnecessarily since this information is
likely already stored when register_task_assigned() is called. This
method is intended to be called by Worker objects. Users should not
need to call it.
Task$register_task_running(worker_id)
worker_id
Identifier for the worker on which the task is starting.
Returns NULL invisibly.
register_task_done()
Register the completion of a task by a worker by updating
internal storage. When this method is called, the state of the Task is
set to "done" and a timestamp is recorded, registering the time at which
the Worker returned results to the Task. The results object is
read from the R session, and is stored locally by the Task at this time.
This method is intended to be called by Worker objects. Users should
not need to call it.
Task$register_task_done(results)
results
Results read from the R session.
Returns NULL invisibly.
clone()
The objects of this class are cloneable with this method.
Task$clone(deep = FALSE)
deep
Whether to make a deep clone.
A TaskList stores and retrieves one or more tasks.
The TaskList class is used as a storage class. It provides a container that
holds a collection of Task objects, along with a collection of methods for
adding, removing, and getting Tasks. It can also report on the status of the
Tasks contained within the list and retrieve results from those Tasks. What
it cannot do is manage interactions with Workers or arrange for the Tasks to
be executed. That's the job of the Queue.
new()
Create a new task list
TaskList$new()
length()
Return the number of tasks in the list
TaskList$length()
Integer
add_task()
Add a task to the TaskList
TaskList$add_task(task)
task
The Task object to be added
remove_task()
This method removes one or more tasks from the TaskList.
TaskList$remove_task(x)
x
Indices of the tasks to be removed
get_task()
Return a single Task contained in the TaskList. The
Task is not removed from the TaskList, and has reference semantics:
if the listed task is completed by a Worker, then the status of any
Task returned by this method will update automatically.
TaskList$get_task(x)
x
The index of the task to return
A Task object
get_state()
Return the status of all tasks in the TaskList.
TaskList$get_state()
A character vector specifying the completion status for all listed tasks
get_tasks_in_state()
Return a list of tasks in a given state
TaskList$get_tasks_in_state(x)
x
The name of the state (e.g., "waiting")
A TaskList object
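The container methods described so far can be sketched as follows, assuming the queue package is installed and attached. The ids and anonymous functions are hypothetical. Nothing is executed here: a TaskList only stores tasks, so the example is limited to adding, inspecting, and removing them.

```r
library(queue)

tasks <- TaskList$new()

# Add two hypothetical tasks with explicit ids
tasks$add_task(Task$new(function() "first", id = "task_a"))
tasks$add_task(Task$new(function() "second", id = "task_b"))

n_before <- tasks$length()
tasks$get_state()

# get_task() returns the stored Task itself, which stays in the list
first <- tasks$get_task(1)
first$get_task_id()

# Remove the first task by index, leaving only the second
tasks$remove_task(1)
n_after <- tasks$length()
remaining_id <- tasks$get_task(1)$get_task_id()
```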
retrieve()
Retrieves the current state of all tasks.
TaskList$retrieve()
Returns a tibble containing the results of all tasks and
various other useful metadata. Contains one row per task in the
TaskList, and the following columns:
task_id
A character string specifying the task identifiers
worker_id
An integer specifying the worker process ids (pid)
state
A character string indicating the status of each task
("created", "waiting", "assigned", "running", or "done")
result
A list containing the function outputs, or NULL
runtime
Completion time for the task (NA if the task is not done)
fun
A list containing the functions
args
A list containing the arguments passed to each function
created
The time at which each task was created
queued
The time at which each task was added to a Queue
assigned
The time at which each task was assigned to a Worker
started
The time at which a Worker called each function
finished
The time at which a Worker returned the output for the task
code
The status code returned by the callr R session (integer)
message
The message returned by the callr R session (character)
stdout
List column containing the contents of stdout during function execution
stderr
List column containing the contents of stderr during function execution
error
List column containing NULL values
If all tasks have completed, this output is the same as the output of the
run() method for a Queue object.
Note: at present there is one field from the callr r_session read() method
that isn't captured here, and that's the error field. I'll add that after
I've finished wrapping my head around what that actually does. The error
column, at present, is included only as a placeholder.
clone()
The objects of this class are cloneable with this method.
TaskList$clone(deep = FALSE)
deep
Whether to make a deep clone.
A Worker manages an external R session and completes tasks.
The Worker class interacts with an external R session, and possesses
methods that allow it to work with Task objects. At its core, the
class is a thin wrapper around a callr::r_session object, and in fact
the session object itself can be obtained by calling the
get_worker_session() method. In most cases this shouldn't be necessary,
however, because Worker objects are typically created as part of a
WorkerPool that is managed by a Queue, and those higher level structures
use the methods exposed by the Worker object.
new()
Create a new worker object.
Worker$new()
A new Worker object.
get_worker_id()
Retrieve the worker identifier.
Worker$get_worker_id()
The worker identifier, which is also the process id for the R session
get_worker_state()
Retrieve the worker state.
Worker$get_worker_state()
A string specifying the current state of the R session. Possible values are:
"starting": the R session is starting up.
"idle": the R session is ready to compute.
"busy": the R session is computing.
"finished": the R session has terminated.
Importantly, note that a task
function that is still running and a task function that is essentially
finished and waiting to return will both return "busy". To distinguish
between these two cases you need to use the poll_process() method of
a callr::r_session, as returned by get_worker_session().
get_worker_runtime()
Return the total length of time the worker session
has been running, and the length of the time that the current task
has been running. If the session is finished both values are NA.
If the session is idle (no task running) the total session time will
return a value but the current task time will be NA.
Worker$get_worker_runtime()
A vector of two difftimes.
get_worker_task()
Retrieve the task assigned to the worker.
Worker$get_worker_task()
The Task object currently assigned to this Worker, or NULL.
get_worker_session()
Retrieve the R session associated with a Worker
Worker$get_worker_session()
An R session object, see callr::r_session
try_assign()
Attempt to assign a task to this worker. This method checks
that the task and the worker are both in an appropriate state. If they
are, both objects register their connection to the other. This method is
intended to be called by a WorkerPool or a Queue.
Worker$try_assign(task)
task
A Task object corresponding to the to-be-assigned task.
Invisibly returns TRUE or FALSE, depending on whether the
attempt was successful.
try_start()
Attempt to start the task. This method checks to see if the
worker has an assigned task, and if so starts it running within the
R session. It also registers the change of status within the Task
object itself. This method is intended to be called by a WorkerPool
or a Queue.
Worker$try_start()
Invisibly returns TRUE or FALSE, depending on whether the
attempt was successful.
try_finish()
Attempt to finish a running task politely. This method checks
to see if the worker has a running task, and if so polls the R session to
determine if the R process claims to be ready to return. If there is a
ready-to-return task the results are read from the R process and returned
to the Task object. The task status is updated, and the task is then
unassigned from the Worker. This method is intended to be called by a
WorkerPool or a Queue.
Worker$try_finish(timeout = 0)
timeout
Length of time to wait when process is polled (default = 0)
Invisibly returns TRUE or FALSE, depending on whether the
attempt was successful.
shutdown_worker()
Attempt to shut down the R session gracefully, after making
an attempt to salvage any task that the worker believes it has been
assigned. The salvage operation depends on the state of the task. If the
Task has been assigned but not started, the Worker will unassign it and
return it to a "waiting" state in the hope that the Queue will assign it
to another worker later. If the Task is running, the
Worker will attempt to read from the R session and then register the
Task as "done" regardless of the outcome. (The reason for this is to
ensure that tasks that crash or freeze the R session don't get returned
to the Queue.)
Worker$shutdown_worker(grace = 1000)
grace
Grace period in milliseconds. If the process is still running after this period, it will be killed.
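A small sketch of working with a single Worker directly, assuming the queue package is installed and attached. This is not something the documentation suggests doing routinely, since workers are normally managed by a WorkerPool. The example only inspects the worker and shuts it down, without assigning a task.

```r
library(queue)

# Starts an external R session wrapped by the Worker
worker <- Worker$new()

# The identifier doubles as the process id of the R session
worker$get_worker_id()
worker$get_worker_state()

# The underlying callr session object
session <- worker$get_worker_session()
class(session)

# Close the session, allowing up to 1000 milliseconds before it is killed
worker$shutdown_worker(grace = 1000)
final_state <- worker$get_worker_state()
final_state
```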
clone()
The objects of this class are cloneable with this method.
Worker$clone(deep = FALSE)
deep
Whether to make a deep clone.
A WorkerPool manages multiple workers.
The implementation for a WorkerPool is essentially a container that holds
one or more Worker objects, and possesses methods that allow it to instruct
them to assign, start, and complete Tasks. It can also check to
see if any of the R sessions associated with the Workers have crashed or
stalled, and replace them as needed.
new()
Create a new worker pool
WorkerPool$new(workers = 4L)
workers
The number of workers in the pool.
A new WorkerPool object.
get_pool_worker()
Return a specific Worker
WorkerPool$get_pool_worker(x)
x
An integer specifying the index of the worker in the pool.
The corresponding Worker object.
get_pool_state()
Return a summary of the worker pool
WorkerPool$get_pool_state()
A named character vector specifying the current state of each worker ("starting", "idle", "busy", or "finished"). Names denote worker ids, and the interpretation of each return value is as follows:
"starting": the R session is starting up.
"idle": the R session is ready to compute.
"busy": the R session is computing.
"finished": the R session has terminated.
try_assign()
Attempt to assign tasks to workers. This method is
intended to be called by Queue objects. When called, this method
will iterate over tasks in the list and workers in the pool, assigning
tasks to workers as long as there are both idle workers and waiting
tasks. It returns once it runs out of one resource or the other. Note
that this method assigns tasks to workers: it does not instruct the
workers to start working on the tasks. That is the job of try_start().
WorkerPool$try_assign(tasks)
tasks
A TaskList object
Invisibly returns NULL
try_start()
Iterates over Workers in the pool and asks them to
start any jobs that they have been assigned but have not yet started.
This method is intended to be called by Queue objects.
WorkerPool$try_start()
Invisibly returns NULL
try_finish()
Iterates over Workers in the pool and checks to see if
any of the busy sessions are ready to return results. For those that
are, it finishes the tasks and ensures those results are returned to
the Task object. This method is intended to be called by Queue objects.
WorkerPool$try_finish()
Invisibly returns NULL
refill_pool()
Check the WorkerPool looking for Workers that
have crashed or been shut down, and replace them with fresh workers.
WorkerPool$refill_pool()
This function is called primarily for its side effect. It returns a named character vector documenting the outcome, indicating the current state of each worker, which should not be "finished" for any worker. Names denote worker ids.
shutdown_pool()
Terminate all workers in the pool.
WorkerPool$shutdown_pool(grace = 1000)
grace
Grace period in milliseconds. If a worker process is still running after this period, it will be killed.
This function is called primarily for its side effect. It returns a named character vector documenting the outcome, indicating the current state of each worker, which should be "finished" for all workers. Names denote worker ids.
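A minimal sketch of creating, inspecting, and closing a pool by hand, assuming the queue package is installed and attached. The variable names are made up for the example. In ordinary use a Queue does all of this on the user's behalf.

```r
library(queue)

# Start a pool of two external R sessions
pool <- WorkerPool$new(workers = 2L)

# Named character vector of worker states, with worker ids as names
pool$get_pool_state()

# Individual workers can be retrieved by index
first <- pool$get_pool_worker(1)
first$get_worker_id()

# Terminate every worker, giving each up to 1000 milliseconds to exit
final <- pool$shutdown_pool(grace = 1000)
final
```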
shutdown_overdue_workers()
Terminate workers that have worked on their current task for longer than a pre-specified time limit.
WorkerPool$shutdown_overdue_workers(timelimit, grace = 1000)
timelimit
Pre-specified time limit for the task, in seconds.
grace
Grace period for the shutdown, in milliseconds. If a worker process is still running after this period, it will be killed.
This function is called primarily for its side effect. It returns a named character vector documenting the outcome, indicating the current state of each worker, which should be "finished" for all workers. Names denote worker ids.
clone()
The objects of this class are cloneable with this method.
WorkerPool$clone(deep = FALSE)
deep
Whether to make a deep clone.