prefactor_core.queue package
Queue infrastructure layer for prefactor-core.
This module provides the foundation for asynchronous queue-based processing:
- Queue interface for different queue implementations
- InMemoryQueue for simple use cases
- TaskExecutor for managing worker pools
Future implementations can provide persistent queues (Redis, PostgreSQL, etc.) by implementing the Queue interface.
class prefactor_core.queue.InMemoryQueue
Bases: Queue[T]
Unbounded in-memory queue implementation.
This is the default queue implementation. It’s simple, fast, and suitable for most use cases. All data is stored in memory and will be lost if the process terminates before processing completes.
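The queue uses asyncio.Queue internally, so it is safe for concurrent use by coroutines running on a single event loop. Note that asyncio.Queue itself is not thread-safe.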
Example
    queue = InMemoryQueue()
    await queue.put("operation")
    item = await queue.get()
    await queue.close()
async close(num_waiters: int = 1) → None
Close the queue.
After closing, no new items can be added. Workers will continue to process existing items until the queue is empty, then exit.
- Parameters: num_waiters – Number of sentinel values to enqueue to wake up that many workers currently blocked in get().
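The close semantics described above can be illustrated with a minimal sketch. This is not the library's InMemoryQueue: the class SentinelQueueSketch, the _SENTINEL marker, and the local QueueClosedError stand-in are all hypothetical names, and the sketch only assumes that closing enqueues one wake-up marker per blocked worker, as the parameter description suggests.

```python
import asyncio
from typing import Any


class QueueClosedError(Exception):
    """Local stand-in for prefactor_core.queue.QueueClosedError."""


_SENTINEL = object()  # hypothetical wake-up marker, never returned to callers


class SentinelQueueSketch:
    """Illustrative only; the real InMemoryQueue may differ."""

    def __init__(self) -> None:
        self._items: asyncio.Queue = asyncio.Queue()
        self._closed = False

    async def put(self, item: Any) -> None:
        if self._closed:
            raise QueueClosedError("queue is closed")
        await self._items.put(item)

    async def get(self) -> Any:
        if self._closed and self._items.empty():
            raise QueueClosedError("queue is closed and empty")
        item = await self._items.get()
        if item is _SENTINEL:
            # A sentinel sits behind all real items, so the queue is drained.
            raise QueueClosedError("queue is closed and empty")
        return item

    async def close(self, num_waiters: int = 1) -> None:
        self._closed = True
        # One sentinel per blocked worker, so each of them wakes up.
        for _ in range(num_waiters):
            self._items.put_nowait(_SENTINEL)


async def demo() -> list:
    q = SentinelQueueSketch()
    await q.put("a")
    seen = []

    async def worker() -> None:
        try:
            while True:
                seen.append(await q.get())
        except QueueClosedError:
            pass  # normal shutdown path

    tasks = [asyncio.create_task(worker()) for _ in range(2)]
    await asyncio.sleep(0)  # give the workers a chance to start
    await q.close(num_waiters=2)
    await asyncio.gather(*tasks)
    return seen


print(asyncio.run(demo()))  # prints ['a']
```

If num_waiters is smaller than the number of blocked workers in this sketch, the extra workers stay blocked, which is why the caller passes the worker count.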
property closed : bool
Check if the queue has been closed.
- Returns: True if the queue is closed.
async get() → T
Remove and return an item from the queue.
- Returns: The next item from the queue.
- Raises: QueueClosedError – If the queue is closed and empty.
async put(item: T) → None
Add an item to the queue.
- Parameters: item – The item to add.
- Raises: QueueClosedError – If the queue has been closed.
size() → int
Return the current number of items in the queue.
- Returns: The queue size.
class prefactor_core.queue.Queue
Bases: ABC, Generic[T]
Abstract base class for all queue implementations.
This interface defines the contract for queue operations used by the TaskExecutor. Implementations must be thread-safe and support async operations.
abstractmethod async close(num_waiters: int = 1) → None
Close the queue and signal workers to stop.
After closing, no new items can be added. Workers should finish processing remaining items and then exit.
- Parameters: num_waiters – Number of workers currently blocked in get() that need to be woken up so they can observe the closed state.
abstract property closed : bool
Check if the queue has been closed.
- Returns: True if close() has been called, False otherwise.
abstractmethod async get() → T
Remove and return an item from the queue.
This method blocks until an item is available or the queue is closed.
- Returns: The next item from the queue.
- Raises: QueueClosedError – If the queue is closed and empty.
abstractmethod async put(item: T) → None
Add an item to the queue.
This method should return immediately without blocking the caller. The item will be processed asynchronously by workers.
- Parameters: item – The item to add to the queue.
- Raises: QueueClosedError – If the queue has been closed.
abstractmethod size() → int
Return the current number of items in the queue.
- Returns: The queue size (non-negative integer).
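As a rough illustration of what implementing this interface could look like, the sketch below defines a local stand-in for the abstract class (mirroring only the signatures documented above) and a hypothetical DequeQueue built on collections.deque and asyncio.Condition. In real code the base class and QueueClosedError would be imported from prefactor_core.queue. The stand-in definitions and every name in the subclass are assumptions made for the example.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import deque
from typing import Deque, Generic, TypeVar

T = TypeVar("T")


class QueueClosedError(Exception):
    """Stand-in for prefactor_core.queue.QueueClosedError."""


class Queue(ABC, Generic[T]):
    """Stand-in that mirrors the documented abstract interface."""

    @abstractmethod
    async def put(self, item: T) -> None: ...

    @abstractmethod
    async def get(self) -> T: ...

    @abstractmethod
    def size(self) -> int: ...

    @abstractmethod
    async def close(self, num_waiters: int = 1) -> None: ...

    @property
    @abstractmethod
    def closed(self) -> bool: ...


class DequeQueue(Queue[T]):
    """Hypothetical implementation; create it inside a running event loop."""

    def __init__(self) -> None:
        self._items: Deque[T] = deque()
        self._closed = False
        self._changed = asyncio.Condition()

    async def put(self, item: T) -> None:
        if self._closed:
            raise QueueClosedError("queue is closed")
        async with self._changed:
            self._items.append(item)
            self._changed.notify()  # wake one waiting consumer

    async def get(self) -> T:
        async with self._changed:
            # Block until there is an item or the queue has been closed.
            await self._changed.wait_for(
                lambda: bool(self._items) or self._closed
            )
            if self._items:
                return self._items.popleft()
            raise QueueClosedError("queue is closed and empty")

    def size(self) -> int:
        return len(self._items)

    async def close(self, num_waiters: int = 1) -> None:
        # num_waiters is accepted for interface compatibility only:
        # notify_all() wakes every waiter, so no count is needed here.
        async with self._changed:
            self._closed = True
            self._changed.notify_all()

    @property
    def closed(self) -> bool:
        return self._closed


async def demo() -> list:
    q: DequeQueue[str] = DequeQueue()
    await q.put("a")
    await q.put("b")
    await q.close()
    out = [await q.get(), await q.get()]  # remaining items still drain
    try:
        await q.get()
    except QueueClosedError:
        out.append("closed")
    return out


print(asyncio.run(demo()))  # prints ['a', 'b', 'closed']
```

A persistent backend would replace the deque with calls to its store, but would presumably keep the same closed-and-empty contract for get().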
exception prefactor_core.queue.QueueClosedError
Bases: Exception
Raised when attempting to use a closed queue.
class prefactor_core.queue.TaskExecutor(queue: Queue[Any], handler: Callable[[Any], Awaitable[None]], num_workers: int = 3, max_retries: int = 3, *, is_retryable: Callable[[Exception], bool] | None = None)
Bases: object
Manages async workers that process queue items.
The executor runs a configurable number of worker tasks that continuously pull items from the queue and process them. If processing fails, items are retried with exponential backoff.
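The retry behaviour can be pictured with the following sketch. It is not the executor's actual code. The helper process_with_retry and the base_delay parameter are hypothetical, the real delay schedule is not documented here, and the sketch assumes that max_retries counts retries after one initial attempt and that is_retryable, when given, decides whether a failure is worth retrying.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def process_with_retry(
    item: Any,
    handler: Callable[[Any], Awaitable[None]],
    max_retries: int = 3,
    is_retryable: Optional[Callable[[Exception], bool]] = None,
    base_delay: float = 0.01,  # hypothetical; real delays are not documented
) -> bool:
    """Return True if the handler eventually succeeded, False otherwise."""
    for attempt in range(max_retries + 1):  # 1 initial try + max_retries
        try:
            await handler(item)
            return True
        except Exception as exc:
            out_of_retries = attempt == max_retries
            not_retryable = is_retryable is not None and not is_retryable(exc)
            if out_of_retries or not_retryable:
                return False
            # Exponential backoff: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay * 2 ** attempt)
    return False


async def demo() -> tuple:
    calls = []

    async def flaky(item: str) -> None:
        calls.append(item)
        if len(calls) < 3:  # fail twice, then succeed
            raise ConnectionError("transient")

    ok = await process_with_retry("job", flaky, base_delay=0.001)
    return ok, len(calls)


print(asyncio.run(demo()))  # prints (True, 3)
```

A predicate such as `lambda exc: isinstance(exc, ConnectionError)` would, under the same assumptions, make the sketch give up immediately on errors that are not transient.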
Example
    async def handler(item: str) -> None:
        print(f"Processing: {item}")

    queue = InMemoryQueue()
    executor = TaskExecutor(queue, handler, num_workers=3)
    executor.start()

    await queue.put("item1")
    await queue.put("item2")

    # Later, when done
    await executor.stop()
start() → None
Start the worker tasks.
Workers will begin pulling items from the queue immediately.
async stop() → None
Stop all workers gracefully.
Closes the queue (so no new items can be added), wakes any workers blocked in get(), and waits for them to drain the remaining items and exit on their own. Workers are never cancelled, because cancellation would discard already-queued items.
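To show why draining matters, here is a small sketch of a graceful stop built directly on asyncio.Queue. It does not use TaskExecutor. The names _STOP, worker, and stop are hypothetical, and the sketch assumes one wake-up marker per worker is queued behind the pending items, so every item is handled before the workers exit.

```python
import asyncio

_STOP = object()  # hypothetical marker telling a worker to exit


async def worker(q: asyncio.Queue, done: list) -> None:
    while True:
        item = await q.get()
        if item is _STOP:
            return  # exit on our own instead of being cancelled
        await asyncio.sleep(0)  # stand-in for real handler work
        done.append(item)


async def stop(q: asyncio.Queue, workers: list) -> None:
    # One marker per worker, placed behind everything already queued.
    for _ in workers:
        q.put_nowait(_STOP)
    # Wait for the workers to drain the queue and return.
    await asyncio.gather(*workers)


async def demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    done: list = []
    workers = [asyncio.create_task(worker(q, done)) for _ in range(3)]
    for i in range(10):
        q.put_nowait(i)
    await stop(q, workers)
    return sorted(done)


print(asyncio.run(demo()))  # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Calling cancel() on the worker tasks instead would stop them at their next await, leaving any items still in the queue unprocessed.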