| Title: | Simple Multi-Threaded Task Queuing |
|---|---|
| Description: | Implements a simple multi-threaded task queue using R6 classes. |
| Authors: | Danielle Navarro [aut, cre] (ORCID: <https://orcid.org/0000-0001-7648-6578>) |
| Maintainer: | Danielle Navarro <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 0.0.2 |
| Built: | 2026-05-20 08:08:04 UTC |
| Source: | https://github.com/djnavarro/queue |
A Queue executes tasks concurrently using multiple workers.
The Queue class is the primary interface provided by the queue package. It
allows users to execute an arbitrary collection of tasks in parallel across
multiple R sessions, managed automatically in the background. Once a new
queue is initialised, tasks can be added to the queue using the add()
method. Once all tasks are added, they are executed in parallel
by calling the run() method. When completed, run() returns a
tibble that contains the results for all tasks, and some additional
metadata besides.
Internally, a Queue uses a TaskList object as its data store and a
WorkerPool object to execute the tasks in parallel. These objects can be
accessed by calling the get_tasks() and get_workers() methods.
Usually you would not need to do this, but occasionally it can be useful
because those objects have some handy methods that allow finer-grained
control (see the documentation for TaskList and WorkerPool respectively).
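As a minimal sketch of how these accessors might be used, the following inspects the TaskList and WorkerPool behind a small queue before anything is run. It relies only on methods documented on this page; the variable names are illustrative, and the pool is shut down by hand on the assumption that worker sessions are already open once the queue has been created.

```r
library(queue)

queue <- Queue$new(workers = 2L)
queue$add(function() 1 + 1)
queue$add(function() 2 + 2)

# The TaskList that stores the tasks and the WorkerPool that will run them
tasks <- queue$get_tasks()
pool <- queue$get_workers()

n_tasks <- tasks$length()             # number of tasks added so far
task_states <- tasks$get_state()      # one state string per task
pool_states <- pool$get_pool_state()  # one state string per worker

print(task_states)
print(pool_states)

# run() is never called here, so close the worker sessions by hand
pool$shutdown_pool()
```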
new()
Create a task queue
Queue$new(workers = 4L)
workers: Either the number of workers to employ in the task queue,
or a WorkerPool object to use when deploying the tasks.
A new Queue object
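The second form of the workers argument can be sketched as follows: a WorkerPool is created first and handed to the queue, and run() is called with shutdown = FALSE so that the pool can still be inspected afterwards. This is an illustration under the documented signatures rather than a recommended pattern.

```r
library(queue)

# Create the pool up front and pass it to the queue
pool <- WorkerPool$new(workers = 2L)
queue <- Queue$new(workers = pool)

queue$add(function() "hello")

# Keep the worker sessions open after the tasks complete
out <- queue$run(message = "none", shutdown = FALSE)

# The pool is still available for inspection, and must be closed by hand
print(pool$get_pool_state())
pool$shutdown_pool()
```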
add()
Adds a task to the queue
Queue$add(fun, args = list(), id = NULL)
fun: The function to be called when the task is scheduled
args: A list of arguments to be passed to the task function (optional)
id: A string specifying a unique identifier for the task (optional: tasks will be named "task_1", "task_2", etc. if this is unspecified)
Invisibly returns the Task object
run()
Execute tasks in parallel using the worker pool, assigning tasks to workers in the same order in which they were added to the queue
Queue$run(timelimit = 60, message = "minimal", interval = 0.05, shutdown = TRUE)
timelimit: How long (in seconds) should the worker pool wait for a task to complete before terminating the child process and moving on to the next task? (default is 60 seconds, but this is fairly arbitrary)
message: What messages should be reported by the queue while it is running? Options are "none" (no messages), "minimal" (a spinner is shown alongside counts of waiting, running, and completed tasks), and "verbose" (in addition to the spinner, each task is summarized as it completes). Default is "minimal".
interval: How often should the task queue poll the workers to see if they have finished their assigned tasks? Specified in seconds.
shutdown: Should the workers in the pool be shut down (i.e., all
R sessions closed) once the tasks are completed? Defaults to TRUE.
Returns a tibble containing the results of all tasks and
various other useful metadata. Contains one row per task in the
Queue, and the following columns:
task_id A character string specifying the task identifiers
worker_id An integer specifying the worker process ids (pid)
state A character string indicating the status of each task
("created", "waiting", "assigned", "running", or "done")
result A list containing the function outputs, or NULL
runtime Completion time for the task (NA if the task is not done)
fun A list containing the functions
args A list containing the arguments passed to each function
created The time at which each task was created
queued The time at which each task was added to a Queue
assigned The time at which each task was assigned to a Worker
started The time at which a Worker called each function
finished The time at which a Worker output was returned for the task
code The status code returned by the callr R session (integer)
message The message returned by the callr R session (character)
stdout List column containing the contents of stdout during function execution
stderr List column containing the contents of stderr during function execution
error List column containing NULL values
Note: at present there is one field from the callr r_session read() method
that isn't captured here, and that's the error field. I'll add that after
I've finished wrapping my head around what that actually does. The error
column, at present, is included only as a placeholder.
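To make the shape of the returned tibble concrete, here is a small sketch that passes arguments to two tasks, runs them silently, and pulls out a few of the columns listed above. The task ids "square" and "sum" are made up for the example.

```r
library(queue)

queue <- Queue$new(workers = 2L)

# Arguments are supplied as a named list; ids are optional
queue$add(function(x) x^2, args = list(x = 3), id = "square")
queue$add(function(x, y) x + y, args = list(x = 1, y = 2), id = "sum")

out <- queue$run(message = "none")

# One row per task; pick out a few of the documented columns
print(out[, c("task_id", "state", "runtime")])

# result is a list column, so individual outputs are extracted with [[ ]]
square_result <- out$result[[1]]
print(square_result)
```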
get_workers()
Retrieve the workers
Queue$get_workers()
A WorkerPool object
get_tasks()
Retrieve the tasks
Queue$get_tasks()
A TaskList object
clone()
The objects of this class are cloneable with this method.
Queue$clone(deep = FALSE)
deep: Whether to make a deep clone.
queue <- Queue$new(workers = 4L)
wait <- function(x) Sys.sleep(runif(1))
for(i in 1:6) queue$add(wait)
queue$run()
A Task stores a function, arguments, output, and metadata.
A Task object is used as a storage class. It is a container used to hold an
R function and any arguments to be passed to the function. It can also hold
any output returned by the function, anything printed to stdout or stderr
when the function is called, and various other metadata such as the process
id of the worker that executed the function, timestamps, and so on.
The methods for Task objects fall into two groups, roughly speaking. The
get_*() methods are used to return information about the Task, and the
register_*() methods are used to register information related to events
relevant to the Task status.
The retrieve() method is special, and returns a tibble containing all
information stored about the task. Objects further up the hierarchy use this
method to return nicely organised output that summarises the results from
many tasks.
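A Task can be created and inspected on its own, without a Worker. The sketch below uses only the get_*() and retrieve() methods documented for this class; the function and the id "increment" are invented for illustration.

```r
library(queue)

# A Task is only a container: creating it does not run the function
task <- Task$new(function(x) x + 1, args = list(x = 1), id = "increment")

task_id <- task$get_task_id()
task_fun <- task$get_task_fun()
task_args <- task$get_task_args()

print(task$get_task_state())    # current state of the task
print(task$get_task_runtime())  # NA until the task has completed

# retrieve() gathers everything known about the task into a one-row tibble
summary_row <- task$retrieve()
print(summary_row)
```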
new()
Create a new task. Conceptually, a Task is viewed as a
function that will be executed by the Worker to which it is assigned,
and it is generally expected that any resources the function requires
are passed through the arguments since the execution context will be a
different R session to the one in which the function is defined.
Task$new(fun, args = list(), id = NULL)
fun: The function to be called when the task executes.
args: A list of arguments to be passed to the function (optional).
id: A string specifying a unique task identifier (optional).
A new Task object.
retrieve()
Retrieve a tidy summary of the task state.
Task$retrieve()
A tibble containing a single row, and the following columns:
task_id A character string specifying the task identifier
worker_id An integer specifying the worker process id (pid)
state A character string indicating the task status ("created",
"waiting", "assigned", "running", or "done")
result A list containing the function output, or NULL
runtime Completion time for the task (NA if the task is not done)
fun A list containing the function
args A list containing the arguments
created The time at which the task was created
queued The time at which the task was added to a Queue
assigned The time at which the task was assigned to a Worker
started The time at which the Worker called the function
finished The time at which the Worker output was returned
code The status code returned by the callr R session (integer)
message The message returned by the callr R session (character)
stdout List containing the contents of stdout during function execution
stderr List containing the contents of stderr during function execution
error List containing NULL
Note: at present there is one field from the callr r_session read() method
that isn't captured here, and that's the error field. I'll add that after
I've finished wrapping my head around what that actually does. The error
column, at present, is included only as a placeholder.
get_task_fun()
Retrieve the task function.
Task$get_task_fun()
A function.
get_task_args()
Retrieve the task arguments
Task$get_task_args()
A list.
get_task_state()
Retrieve the task state.
Task$get_task_state()
A string specifying the current state of the task. Possible values are "created" (task exists), "waiting" (task exists and is waiting in a queue), "assigned" (task has been assigned to a worker but has not yet started), "running" (task is running on a worker), or "done" (task is completed and results have been assigned back to the task object).
get_task_id()
Retrieve the task id.
Task$get_task_id()
A string containing the task identifier.
get_task_runtime()
Retrieve the task runtime.
Task$get_task_runtime()
If the task has completed, a difftime value. If the task has
yet to complete, an NA value is returned.
register_task_created()
Register the task creation by updating internal storage.
When this method is called, the state of the Task is set to "created"
and a timestamp is recorded, registering the creation time for the task.
This method is intended to be called by Worker objects. Users should
not need to call it.
Task$register_task_created()
Returns NULL invisibly.
register_task_waiting()
Register the addition of the task to a queue by updating
internal storage. When this method is called, the state of the Task
is set to "waiting" and a timestamp is recorded, registering the time
at which the task was added to a queue. This method is intended to be
called by Worker objects. Users should not need to call it.
Task$register_task_waiting()
Returns NULL invisibly.
register_task_assigned()
Register the assignment of a task to a worker by updating
internal storage. When this method is called, the state of the Task
is set to "assigned" and a timestamp is recorded, registering the time
at which the task was assigned to a Worker. In addition, the
worker_id of the worker object (which is also its pid) is registered
with the task. This method is intended to be called by Worker objects.
Users should not need to call it.
Task$register_task_assigned(worker_id)
worker_id: Identifier for the worker to which the task is assigned.
Returns NULL invisibly.
register_task_running()
Register the commencement of a task on a worker by updating
internal storage. When this method is called, the state of the Task is
set to "running" and a timestamp is recorded, registering the time at
which the Worker called the task function. In addition, the worker_id
is recorded, albeit somewhat unnecessarily since this information is
likely already stored when register_task_assigned() is called. This
method is intended to be called by Worker objects. Users should not
need to call it.
Task$register_task_running(worker_id)
worker_id: Identifier for the worker on which the task is starting.
Returns NULL invisibly.
register_task_done()
Register that a worker has finished a task by updating
internal storage. When this method is called, the state of the Task is
set to "done" and a timestamp is recorded, registering the time at which
the Worker returned results to the Task. The results object is
read from the R session, and is stored locally by the Task at this time.
This method is intended to be called by Worker objects. Users should
not need to call it.
Task$register_task_done(results)
results: Results read from the R session.
Returns NULL invisibly.
clone()
The objects of this class are cloneable with this method.
Task$clone(deep = FALSE)
deep: Whether to make a deep clone.
A TaskList stores and retrieves one or more tasks.
The TaskList class is used as a storage class. It provides a container that
holds a collection of Task objects, along with a collection of methods for
adding, removing, and getting Tasks. It can also report on the status of the
Tasks contained within the list and retrieve results from those Tasks. What
it cannot do is manage interactions with Workers or arrange for the Tasks to
be executed. That's the job of the Queue.
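The container behaviour described above can be sketched with two hand-made tasks. Only methods documented for TaskList and Task are used; the ids "first" and "second" are illustrative, and nothing is executed because no Worker is involved.

```r
library(queue)

tasks <- TaskList$new()
tasks$add_task(Task$new(function() "a", id = "first"))
tasks$add_task(Task$new(function() "b", id = "second"))

n_before <- tasks$length()
print(tasks$get_state())  # one state string per stored task

# get_task() returns the stored Task itself, not a copy
first <- tasks$get_task(1)
first_id <- first$get_task_id()

# Remove the second task by index
tasks$remove_task(2)
n_after <- tasks$length()
```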
new()
Create a new task list
TaskList$new()
length()
Return the number of tasks in the list
TaskList$length()
Integer
add_task()
Add a task to the TaskList
TaskList$add_task(task)
task: The Task object to be added
remove_task()
This method removes one or more tasks from the TaskList.
TaskList$remove_task(x)
x: Indices of the tasks to be removed
get_task()
Return a single Task contained in the TaskList. The
Task is not removed from the TaskList, and has reference semantics:
if the listed task is completed by a Worker, then the status of any
Task returned by this method will update automatically
TaskList$get_task(x)
x: The index of the task to return
A Task object
get_state()
Return the status of all tasks in the TaskList.
TaskList$get_state()
A character vector specifying the completion status for all listed tasks
get_tasks_in_state()
Return a list of tasks in a given state
TaskList$get_tasks_in_state(x)
x: The name of the state (e.g., "waiting")
A TaskList object
retrieve()
Retrieves the current state of all tasks.
TaskList$retrieve()
Returns a tibble containing the results of all tasks and
various other useful metadata. Contains one row per task in the
TaskList, and the following columns:
task_id A character string specifying the task identifiers
worker_id An integer specifying the worker process ids (pid)
state A character string indicating the status of each task
("created", "waiting", "assigned", "running", or "done")
result A list containing the function outputs, or NULL
runtime Completion time for the task (NA if the task is not done)
fun A list containing the functions
args A list containing the arguments passed to each function
created The time at which each task was created
queued The time at which each task was added to a Queue
assigned The time at which each task was assigned to a Worker
started The time at which a Worker called each function
finished The time at which a Worker output was returned for the task
code The status code returned by the callr R session (integer)
message The message returned by the callr R session (character)
stdout List column containing the contents of stdout during function execution
stderr List column containing the contents of stderr during function execution
error List column containing NULL values
If all tasks have completed, this output is the same as the output of the
run() method for a Queue object.
Note: at present there is one field from the callr r_session read() method
that isn't captured here, and that's the error field. I'll add that after
I've finished wrapping my head around what that actually does. The error
column, at present, is included only as a placeholder.
clone()
The objects of this class are cloneable with this method.
TaskList$clone(deep = FALSE)
deep: Whether to make a deep clone.
A Worker manages an external R session and completes tasks.
The Worker class interacts with an external R session, and possesses
methods that allow it to work with Task objects. At its core, the
class is a thin wrapper around a callr::r_session object, and in fact
the session object itself can be obtained by calling the
get_worker_session() method. In most cases this shouldn't be necessary
however, because Worker objects are typically created as part of a
WorkerPool that is managed by a Queue, and those higher level structures
use the methods exposed by the Worker object.
new()
Create a new worker object.
Worker$new()
A new Worker object.
get_worker_id()
Retrieve the worker identifier.
Worker$get_worker_id()
The worker identifier, which is also the process id for the R session
get_worker_state()
Retrieve the worker state.
Worker$get_worker_state()
A string specifying the current state of the R session. Possible values are:
"starting": the R session is starting up.
"idle": the R session is ready to compute.
"busy": the R session is computing.
"finished": the R session has terminated.
Importantly, note that a task
function that is still running and a task function that is essentially
finished and waiting to return will both be reported as "busy". To distinguish
between these two cases you need to use the poll_process() method of
a callr::r_session, as returned by get_worker_session().
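The distinction can be sketched by polling the underlying session directly. This example assumes that try_assign() only accepts a Task in the "waiting" state, which is why register_task_waiting() is called by hand here (a Queue would normally do this), and that poll_process() takes a timeout in milliseconds and reports either "ready" or "timeout", as in callr. The task and its id are invented.

```r
library(queue)

worker <- Worker$new()
task <- Task$new(function() { Sys.sleep(0.2); "finished" }, id = "nap")

# Assumption: a task must be "waiting" before a worker will accept it
task$register_task_waiting()

if (worker$try_assign(task)) worker$try_start()

session <- worker$get_worker_session()

# The worker state alone does not say whether results are ready yet
print(worker$get_worker_state())

# Poll without waiting, then poll again allowing up to five seconds
poll_now <- session$poll_process(timeout = 0)
poll_later <- session$poll_process(timeout = 5000)
print(c(poll_now, poll_later))

# Collect the results and close the session
worker$try_finish()
worker$shutdown_worker()
```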
get_worker_runtime()
Return the total length of time the worker session
has been running, and the length of the time that the current task
has been running. If the session is finished both values are NA.
If the session is idle (no task running) the total session time will
return a value but the current task time will be NA.
Worker$get_worker_runtime()
A vector of two difftimes.
get_worker_task()
Retrieve the task assigned to the worker.
Worker$get_worker_task()
The Task object currently assigned to this Worker, or NULL.
get_worker_session()
Retrieve the R session associated with a Worker
Worker$get_worker_session()
An R session object, see callr::r_session
try_assign()
Attempt to assign a task to this worker. This method checks
that the task and the worker are both in an appropriate state. If they
are, both objects register their connection to the other. This method is
intended to be called by a WorkerPool or a Queue.
Worker$try_assign(task)
task: A Task object corresponding to the to-be-assigned task.
Invisibly returns TRUE or FALSE, depending on whether the
attempt was successful.
try_start()
Attempt to start the task. This method checks to see if the
worker has an assigned task, and if so starts it running within the
R session. It also registers the change of status within the Task
object itself. This method is intended to be called by a WorkerPool
or a Queue.
Worker$try_start()
Invisibly returns TRUE or FALSE, depending on whether the
attempt was successful.
try_finish()
Attempt to finish a running task politely. This method checks
to see if the worker has a running task, and if so polls the R session to
determine if the R process claims to be ready to return. If there is a
ready-to-return task the results are read from the R process and returned
to the Task object. The task status is updated, and the task is then unassigned
from the Worker. This method is intended to be called by a WorkerPool
or a Queue.
Worker$try_finish(timeout = 0)
timeout: Length of time to wait when process is polled (default = 0)
Invisibly returns TRUE or FALSE, depending on whether the
attempt was successful.
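Taken together, try_assign(), try_start(), and try_finish() describe the life cycle of one task on one worker. The following is a rough sketch of driving that cycle by hand, which a WorkerPool or Queue would normally do. It assumes that try_assign() requires the Task to be in the "waiting" state, so register_task_waiting() is called explicitly; the ten second deadline is an arbitrary safeguard for the example.

```r
library(queue)

worker <- Worker$new()
task <- Task$new(function() Sys.getpid(), id = "pid")

# Assumption: a task must be "waiting" before a worker will accept it
task$register_task_waiting()

assigned <- worker$try_assign(task)  # task becomes "assigned" on success
started <- worker$try_start()        # task becomes "running" on success

# Poll politely until the session is ready to return, or give up
deadline <- Sys.time() + 10
finished <- FALSE
while (!finished && Sys.time() < deadline) {
  finished <- worker$try_finish()
  if (!finished) Sys.sleep(0.05)
}

final_state <- task$get_task_state()
print(final_state)

worker$shutdown_worker()
```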
shutdown_worker()
Attempt to shut down the R session gracefully, after making
an attempt to salvage any task that the worker believes it has been
assigned. The salvage operation depends on the state of the task. If the
Task has been assigned but not started, the Worker will return it
to a "waiting" state in the hope that the Queue will assign it to
another worker later, and unassign it. If the Task is running, the
Worker will attempt to read from the R session and then register the
Task as "done" regardless of the outcome. (The reason for this is to
ensure that tasks that crash or freeze the R session don't get returned
to the Queue).
Worker$shutdown_worker(grace = 1000)
grace: Grace period in milliseconds. If the process is still running after this period, it will be killed.
clone()
The objects of this class are cloneable with this method.
Worker$clone(deep = FALSE)
deep: Whether to make a deep clone.
A WorkerPool manages multiple workers.
The implementation for a WorkerPool is essentially a container that holds
one or more Worker objects, and possesses methods that allow it to instruct
them to assign, start, and complete Tasks. It can also check to
see if any of the R sessions associated with the Workers have crashed or
stalled, and replace them as needed.
new()
Create a new worker pool
WorkerPool$new(workers = 4L)
workers: The number of workers in the pool.
A new WorkerPool object.
get_pool_worker()
Return a specific Worker
WorkerPool$get_pool_worker(x)
x: An integer specifying the index of the worker in the pool.
The corresponding Worker object.
get_pool_state()
Return a summary of the worker pool
WorkerPool$get_pool_state()
A named character vector specifying the current state of each worker ("starting", "idle", "busy", or "finished"). Names denote worker ids, and the interpretation of each return value is as follows:
"starting": the R session is starting up.
"idle": the R session is ready to compute.
"busy": the R session is computing.
"finished": the R session has terminated.
try_assign()
Attempt to assign tasks to workers. This method is
intended to be called by Queue objects. When called, this method
will iterate over tasks in the list and workers in the pool, assigning
tasks to workers as long as there are both idle workers and waiting
tasks. It returns once it runs out of one resource or the other. Note
that this method assigns tasks to workers: it does not instruct the
workers to start working on the tasks. That is the job of
try_start().
WorkerPool$try_assign(tasks)
tasks: A TaskList object
Invisibly returns NULL
try_start()
Iterates over Workers in the pool and asks them to
start any jobs that they have been assigned but have not yet started.
This method is intended to be called by Queue objects.
WorkerPool$try_start()
Invisibly returns NULL
try_finish()
Iterates over Workers in the pool and checks to see if
any of the busy sessions are ready to return results. For those that
are, it finishes the tasks and ensures those results are returned to
the Task object. This method is intended to be called by Queue
objects.
WorkerPool$try_finish()
Invisibly returns NULL
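The three try_*() methods above are meant to be called repeatedly by a Queue. As a rough illustration of that cycle, and not a reproduction of how run() is actually implemented, the loop below drives a pool by hand. Tasks are moved to the "waiting" state explicitly with register_task_waiting(), on the assumption that only waiting tasks are assigned; the 30 second deadline and the 0.05 second pause are arbitrary choices for the sketch.

```r
library(queue)

pool <- WorkerPool$new(workers = 2L)
tasks <- TaskList$new()

for (i in 1:4) {
  task <- Task$new(function(x) x * 10, args = list(x = i), id = paste0("task_", i))
  task$register_task_waiting()  # a Queue would normally do this in add()
  tasks$add_task(task)
}

# Repeatedly collect finished work, hand out waiting tasks, and start them
deadline <- Sys.time() + 30
while (any(tasks$get_state() != "done") && Sys.time() < deadline) {
  pool$try_finish()
  pool$try_assign(tasks)
  pool$try_start()
  Sys.sleep(0.05)
}

pool$shutdown_pool()

results <- tasks$retrieve()
print(results[, c("task_id", "worker_id", "state")])
```

With two workers and four tasks, the worker_id column should show the same sessions being reused as earlier tasks finish.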
refill_pool()
Check the WorkerPool looking for Workers that
have crashed or been shut down, and replace them with fresh workers.
WorkerPool$refill_pool()
This function is called primarily for its side effect. It returns a named character vector giving the current state of each worker, which should not be "finished" for any worker. Names denote worker ids.
shutdown_pool()
Terminate all workers in the pool.
WorkerPool$shutdown_pool(grace = 1000)
grace: Grace period in milliseconds. If a worker process is still running after this period, it will be killed.
This function is called primarily for its side effect. It returns a named character vector giving the current state of each worker, which should be "finished" for all workers. Names denote worker ids.
shutdown_overdue_workers()
Terminate workers that have worked on their current task for longer than a pre-specified time limit.
WorkerPool$shutdown_overdue_workers(timelimit, grace = 1000)
timelimit: Pre-specified time limit for the task, in seconds.
grace: Grace period for the shutdown, in milliseconds. If a worker process is still running after this period, it will be killed.
This function is called primarily for its side effect. It returns a named character vector giving the current state of each worker, which should be "finished" for all workers. Names denote worker ids.
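In ordinary use a time limit would presumably be supplied through the timelimit argument of Queue$run() rather than by calling this method directly. The sketch below takes that route with one quick task and one deliberately slow one; the two second limit and the task ids are chosen only for the example, and how the result of the terminated task is recorded is left to inspection rather than asserted here.

```r
library(queue)

queue <- Queue$new(workers = 2L)
queue$add(function() "quick", id = "quick")
queue$add(function() Sys.sleep(30), id = "slow")  # far exceeds the limit below

# Workers whose current task runs past two seconds are terminated
out <- queue$run(timelimit = 2, message = "none")

print(out[, c("task_id", "state", "runtime")])
quick_result <- out$result[[1]]
```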
clone()
The objects of this class are cloneable with this method.
WorkerPool$clone(deep = FALSE)
deep: Whether to make a deep clone.