prefactor_core.queue package

Queue infrastructure layer for prefactor-core.

This module provides the foundation for asynchronous queue-based processing:

  • Queue interface for different queue implementations
  • InMemoryQueue for simple use cases
  • TaskExecutor for managing worker pools

Future implementations can provide persistent queues (Redis, PostgreSQL, etc.) by implementing the Queue interface.

class prefactor_core.queue.InMemoryQueue

Bases: Queue[T]

Unbounded in-memory queue implementation.

This is the default queue implementation. It’s simple, fast, and suitable for most use cases. All data is stored in memory and will be lost if the process terminates before processing completes.

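The queue uses asyncio.Queue internally, so it is safe for concurrent use by coroutines running on the same event loop. Note that asyncio.Queue itself is not thread-safe, so the queue should not be shared across threads without additional synchronization.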

    queue = InMemoryQueue()
    await queue.put("operation")
    item = await queue.get()
    await queue.close()

async close(num_waiters: int = 1) → None

Close the queue.

After closing, no new items can be added. Workers will continue to process existing items until the queue is empty, then exit.

  • Parameters: num_waiters – Number of sentinel values to enqueue to wake up that many workers currently blocked in get().
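The sentinel mechanism described above can be sketched as follows. This is a minimal illustration, not the actual InMemoryQueue source: SketchQueue, _SENTINEL, and the local QueueClosedError stand-in are hypothetical names introduced here, and the real implementation may differ in its details.

```python
import asyncio
from typing import Any


class QueueClosedError(Exception):
    """Local stand-in for prefactor_core.queue.QueueClosedError."""


_SENTINEL = object()  # hypothetical wake-up marker, used only in this sketch


class SketchQueue:
    """Illustrative only; NOT the real InMemoryQueue implementation."""

    def __init__(self) -> None:
        self._items = asyncio.Queue()
        self._closed = False

    async def put(self, item: Any) -> None:
        if self._closed:
            raise QueueClosedError("queue is closed")
        self._items.put_nowait(item)

    async def get(self) -> Any:
        if self._closed and self._items.empty():
            raise QueueClosedError("queue is closed and empty")
        item = await self._items.get()
        if item is _SENTINEL:
            # A sentinel sits behind all real items, so reaching it
            # means the queue has been drained.
            raise QueueClosedError("queue is closed and empty")
        return item

    async def close(self, num_waiters: int = 1) -> None:
        self._closed = True
        # One sentinel per blocked worker wakes each pending get().
        for _ in range(num_waiters):
            self._items.put_nowait(_SENTINEL)


async def demo() -> list:
    queue = SketchQueue()
    await queue.put("a")
    await queue.put("b")
    await queue.close()
    drained = []
    try:
        while True:
            drained.append(await queue.get())
    except QueueClosedError:
        pass  # existing items were delivered before the closed signal
    return drained


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the sentinels are enqueued behind any remaining items, a consumer in this sketch sees every queued item before it sees the closed signal, which matches the documented "process existing items, then exit" behaviour.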

Check if the queue has been closed.

  • Returns: True if the queue is closed.

Remove and return an item from the queue.

  • Returns: The next item from the queue.
  • Raises: QueueClosedError – If the queue is closed and empty.

Add an item to the queue.

  • Parameters: item – The item to add.
  • Raises: QueueClosedError – If the queue has been closed.

Return the current number of items in the queue.

  • Returns: The queue size.

class prefactor_core.queue.Queue

Bases: ABC, Generic[T]

Abstract base class for all queue implementations.

This interface defines the contract for queue operations used by the TaskExecutor. Implementations must be thread-safe and support async operations.

abstractmethod async close(num_waiters: int = 1) → None

Close the queue and signal workers to stop.

After closing, no new items can be added. Workers should finish processing remaining items and then exit.

  • Parameters: num_waiters – Number of workers currently blocked in get() that need to be woken up so they can observe the closed state.

Check if the queue has been closed.

  • Returns: True if close() has been called, False otherwise.

Remove and return an item from the queue.

This method blocks until an item is available or the queue is closed.

  • Returns: The next item from the queue.
  • Raises: QueueClosedError – If the queue is closed and empty.

abstractmethod async put(item: T) → None

Add an item to the queue.

This method should return immediately without blocking the caller. The item will be processed asynchronously by workers.

  • Parameters: item – The item to add to the queue.
  • Raises: QueueClosedError – If the queue has been closed.

Return the current number of items in the queue.

  • Returns: The queue size (non-negative integer).

exception prefactor_core.queue.QueueClosedError

Bases: Exception

Raised when attempting to use a closed queue.

class prefactor_core.queue.TaskExecutor(queue: Queue[Any], handler: Callable[[Any], Awaitable[None]], num_workers: int = 3, max_retries: int = 3, *, is_retryable: Callable[[Exception], bool] | None = None)

Bases: object

Manages async workers that process queue items.

The executor runs a configurable number of worker tasks that continuously pull items from the queue and process them. If processing fails, items are retried with exponential backoff.

    async def handler(item: str) -> None:
        print(f"Processing: {item}")

    queue = InMemoryQueue()
    executor = TaskExecutor(queue, handler, num_workers=3)
    executor.start()

    await queue.put("item1")
    await queue.put("item2")

    await executor.stop()
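The "retried with exponential backoff" behaviour can be illustrated with a small standalone helper. This is a sketch under stated assumptions, not the TaskExecutor source: process_with_retry and base_delay are hypothetical, max_retries is assumed to count retries after the first attempt, and the delay schedule (doubling each time) is one common choice that the real executor may not share.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def process_with_retry(
    handler: Callable[[Any], Awaitable[None]],
    item: Any,
    max_retries: int = 3,
    is_retryable: Optional[Callable[[Exception], bool]] = None,
    base_delay: float = 0.1,  # hypothetical; not a documented parameter
) -> bool:
    """Return True if the handler succeeded, False if the item was given up on."""
    for attempt in range(max_retries + 1):
        try:
            await handler(item)
            return True
        except Exception as exc:
            out_of_retries = attempt == max_retries
            not_retryable = is_retryable is not None and not is_retryable(exc)
            if out_of_retries or not_retryable:
                return False
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * (2 ** attempt))
    return False
```

In this sketch an is_retryable callback that returns False for an exception stops retrying at once, which is one plausible reading of the constructor parameter of the same name.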

Start the worker tasks.

Workers will begin pulling items from the queue immediately.

Stop all workers gracefully.

Closes the queue (so no new items can be added), wakes any workers blocked in get(), and waits for them to drain the remaining items and exit on their own. Workers are never cancelled, because cancellation would discard already-queued items.
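The drain-then-exit shutdown described above can be sketched with a plain asyncio.Queue. This is an illustration of the general pattern only: worker, run_and_stop, and _STOP are hypothetical names, and the real TaskExecutor goes through the Queue interface and QueueClosedError instead of handling a sentinel directly.

```python
import asyncio

_STOP = object()  # hypothetical sentinel used only in this sketch


async def worker(queue: asyncio.Queue, processed: list) -> None:
    while True:
        item = await queue.get()
        if item is _STOP:
            return  # exit voluntarily; no task cancellation involved
        processed.append(item)


async def run_and_stop(items: list, num_workers: int = 3) -> list:
    queue = asyncio.Queue()
    processed = []
    workers = [
        asyncio.create_task(worker(queue, processed))
        for _ in range(num_workers)
    ]
    for item in items:
        queue.put_nowait(item)
    # "Stop": place one sentinel per worker behind the real items,
    # then wait for every worker to finish on its own.
    for _ in range(num_workers):
        queue.put_nowait(_STOP)
    await asyncio.gather(*workers)
    return processed


if __name__ == "__main__":
    print(asyncio.run(run_and_stop(["item1", "item2", "item3"])))
```

Since the queue is first-in, first-out, every real item is dequeued before any sentinel, so nothing already queued is lost. Cancelling the worker tasks instead would abandon whatever was still waiting in the queue.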