---
title: prefactor_core.queue package
editUrl: true
head: []
template: doc
sidebar:
  hidden: false
  attrs: {}
pagefind: true
draft: false
---

# prefactor_core.queue package

Queue infrastructure layer for prefactor-core.

This module provides the foundation for asynchronous queue-based processing:
- Queue interface for different queue implementations
- InMemoryQueue for simple use cases
- TaskExecutor for managing worker pools

Future implementations can provide persistent queues (Redis, PostgreSQL, etc.)
by implementing the Queue interface.

### *class* prefactor_core.queue.InMemoryQueue

Bases: [`Queue`](prefactor_core.queue.base.md#prefactor_core.queue.base.Queue)[`T`]

Unbounded in-memory queue implementation.

This is the default queue implementation. It’s simple, fast, and
suitable for most use cases. All data is stored in memory and will
be lost if the process terminates before processing completes.

The queue uses asyncio.Queue internally for thread-safe operations.

### Example

queue = InMemoryQueue()
await queue.put(“operation”)
item = await queue.get()
await queue.close()

#### *async* close(num_waiters: int = 1) → None

Close the queue.

After closing, no new items can be added. Workers will continue
to process existing items until the queue is empty, then exit.

* **Parameters:**
  **num_waiters** – Number of sentinel values to enqueue to wake up
  that many workers currently blocked in get().

#### *property* closed *: bool*

Check if the queue has been closed.

* **Returns:**
  True if the queue is closed.

#### *async* get() → T

Remove and return an item from the queue.

* **Returns:**
  The next item from the queue.
* **Raises:**
  [**QueueClosedError**](#prefactor_core.queue.QueueClosedError) – If the queue is closed and empty.

#### *async* put(item: T) → None

Add an item to the queue.

* **Parameters:**
  **item** – The item to add.
* **Raises:**
  [**QueueClosedError**](#prefactor_core.queue.QueueClosedError) – If the queue has been closed.

#### size() → int

Return the current number of items in the queue.

* **Returns:**
  The queue size.

### *class* prefactor_core.queue.Queue

Bases: `ABC`, `Generic`[`T`]

Abstract base class for all queue implementations.

This interface defines the contract for queue operations used by
the TaskExecutor. Implementations must be thread-safe and support
async operations.

#### *abstractmethod async* close(num_waiters: int = 1) → None

Close the queue and signal workers to stop.

After closing, no new items can be added. Workers should
finish processing remaining items and then exit.

* **Parameters:**
  **num_waiters** – Number of workers currently blocked in get() that
  need to be woken up so they can observe the closed state.

#### *abstract property* closed *: bool*

Check if the queue has been closed.

* **Returns:**
  True if close() has been called, False otherwise.

#### *abstractmethod async* get() → T

Remove and return an item from the queue.

This method blocks until an item is available or the queue
is closed.

* **Returns:**
  The next item from the queue.
* **Raises:**
  [**QueueClosedError**](#prefactor_core.queue.QueueClosedError) – If the queue is closed and empty.

#### *abstractmethod async* put(item: T) → None

Add an item to the queue.

This method should return immediately without blocking the caller.
The item will be processed asynchronously by workers.

* **Parameters:**
  **item** – The item to add to the queue.
* **Raises:**
  [**QueueClosedError**](#prefactor_core.queue.QueueClosedError) – If the queue has been closed.

#### *abstractmethod* size() → int

Return the current number of items in the queue.

* **Returns:**
  The queue size (non-negative integer).

### *exception* prefactor_core.queue.QueueClosedError

Bases: `Exception`

Raised when attempting to use a closed queue.

### *class* prefactor_core.queue.TaskExecutor(queue: [Queue](prefactor_core.queue.base.md#prefactor_core.queue.base.Queue)[Any], handler: Callable[[Any], Awaitable[None]], num_workers: int = 3, max_retries: int = 3, , is_retryable: Callable[[Exception], bool] | None = None)

Bases: `object`

Manages async workers that process queue items.

The executor runs a configurable number of worker tasks that
continuously pull items from the queue and process them. If
processing fails, items are retried with exponential backoff.

### Example

async def handler(item: str) -> None:
: print(f”Processing: {item}”)

queue = InMemoryQueue()
executor = TaskExecutor(queue, handler, num_workers=3)
executor.start()

await queue.put(“item1”)
await queue.put(“item2”)

# Later, when done
await executor.stop()

#### start() → None

Start the worker tasks.

Workers will begin pulling items from the queue immediately.

#### *async* stop() → None

Stop all workers gracefully.

Closes the queue (so no new items can be added), wakes any workers
blocked in get(), and waits for them to drain the remaining items
and exit on their own.  Workers are never cancelled — that would
discard already-queued items.

## Submodules

* [prefactor_core.queue.base module](prefactor_core.queue.base.md)
  * [`Queue`](prefactor_core.queue.base.md#prefactor_core.queue.base.Queue)
    * [`Queue.close()`](prefactor_core.queue.base.md#prefactor_core.queue.base.Queue.close)
    * [`Queue.closed`](prefactor_core.queue.base.md#prefactor_core.queue.base.Queue.closed)
    * [`Queue.get()`](prefactor_core.queue.base.md#prefactor_core.queue.base.Queue.get)
    * [`Queue.put()`](prefactor_core.queue.base.md#prefactor_core.queue.base.Queue.put)
    * [`Queue.size()`](prefactor_core.queue.base.md#prefactor_core.queue.base.Queue.size)
  * [`QueueClosedError`](prefactor_core.queue.base.md#prefactor_core.queue.base.QueueClosedError)
* [prefactor_core.queue.executor module](prefactor_core.queue.executor.md)
  * [`TaskExecutor`](prefactor_core.queue.executor.md#prefactor_core.queue.executor.TaskExecutor)
    * [`TaskExecutor.start()`](prefactor_core.queue.executor.md#prefactor_core.queue.executor.TaskExecutor.start)
    * [`TaskExecutor.stop()`](prefactor_core.queue.executor.md#prefactor_core.queue.executor.TaskExecutor.stop)
* [prefactor_core.queue.memory module](prefactor_core.queue.memory.md)
  * [`InMemoryQueue`](prefactor_core.queue.memory.md#prefactor_core.queue.memory.InMemoryQueue)
    * [`InMemoryQueue.close()`](prefactor_core.queue.memory.md#prefactor_core.queue.memory.InMemoryQueue.close)
    * [`InMemoryQueue.closed`](prefactor_core.queue.memory.md#prefactor_core.queue.memory.InMemoryQueue.closed)
    * [`InMemoryQueue.get()`](prefactor_core.queue.memory.md#prefactor_core.queue.memory.InMemoryQueue.get)
    * [`InMemoryQueue.put()`](prefactor_core.queue.memory.md#prefactor_core.queue.memory.InMemoryQueue.put)
    * [`InMemoryQueue.size()`](prefactor_core.queue.memory.md#prefactor_core.queue.memory.InMemoryQueue.size)