prefactor_core package

Public API for prefactor-core.

This module exports the main classes and functions for the prefactor-core SDK.

class prefactor_core.AgentInstance(id: str, agent_id: str, status: str = 'pending', created_at: datetime = <factory>, started_at: datetime | None = None, finished_at: datetime | None = None, metadata: dict[str, Any] = <factory>)

Bases: object

Represents an agent instance.

An agent instance is a single execution of an agent. It tracks the lifecycle from registration through completion.

id

Unique identifier for this instance.

  • Type: str

agent_id

ID of the agent this is an instance of.

  • Type: str

status

Current status (pending, active, complete).

  • Type: str

created_at

When the instance was registered.

  • Type: datetime.datetime

started_at

When the instance started executing (if started).

  • Type: datetime.datetime | None

finished_at

When the instance completed (if finished).

  • Type: datetime.datetime | None

metadata

Additional metadata about the instance.

  • Type: dict[str, Any]
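The constructor signature suggests a plain dataclass whose `created_at` and `metadata` fields use default factories. The following is a minimal sketch under that assumption. The name `AgentInstanceSketch` and the UTC default for `created_at` are illustrative choices, not the SDK's actual definition.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def _utc_now() -> datetime:
    """Timezone-aware 'now'. The SDK's real default factory is not documented."""
    return datetime.now(timezone.utc)


@dataclass
class AgentInstanceSketch:
    """Hypothetical stand-in that mirrors the documented AgentInstance fields."""

    id: str
    agent_id: str
    status: str = "pending"
    created_at: datetime = field(default_factory=_utc_now)
    started_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None
    # A factory gives every instance its own dict instead of a shared one.
    metadata: Dict[str, Any] = field(default_factory=dict)


instance = AgentInstanceSketch(id="inst-123", agent_id="agent-1")
```

A freshly built instance starts in the `pending` state with no start or finish timestamps, which matches the lifecycle described above.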

class prefactor_core.AgentInstanceHandle

Bases: object

Handle to an agent instance with convenience methods.

This class provides a clean interface for:

  • Starting and finishing the instance
  • Creating spans within the instance
  • Managing the instance lifecycle

    async with client.create_agent_instance(...) as instance:
        await instance.start()

        async with instance.span("agent:llm") as span:
            span.set_payload({"model": "gpt-4"})
            # ... do work ...

        await instance.finish()

async create_span(schema_name: str, parent_span_id: str | None = None, payload: dict[str, Any] | None = None) → str

Create a span within this instance and return its ID.

The span stays open until finish_span() is called.

  • Parameters:
    • schema_name – Name of the schema for this span.
    • parent_span_id – Optional explicit parent span ID.
    • payload – Optional initial payload (params/inputs) stored on creation.
  • Returns: The span ID.

Mark the instance as finished.

This queues a finish operation for the instance.

async finish_span(span_id: str, result_payload: dict[str, Any] | None = None) → None

Finish a previously created span.

  • Parameters:
    • span_id – The ID of the span to finish.
    • result_payload – Optional result data to store on the span.

Get the instance ID.

  • Returns: The unique identifier for this agent instance.

span(schema_name: str, parent_span_id: str | None = None, payload: dict[str, Any] | None = None)

Create a span within this instance.

This is a convenience method that delegates to the client.

  • Parameters:
    • schema_name – Name of the schema for this span.
    • parent_span_id – Optional explicit parent span ID.
    • payload – Optional initial payload (params/inputs) stored on creation.
  • Yields: SpanContext for the created span.

Mark the instance as started.

This queues a start operation for the instance.

exception prefactor_core.ClientAlreadyInitializedError

Bases: PrefactorCoreError

Raised when attempting to initialize a client that’s already initialized.

exception prefactor_core.ClientNotInitializedError

Bases: PrefactorCoreError

Raised when attempting to use a client that hasn’t been initialized.

class prefactor_core.InMemoryQueue

Bases: Queue[T]

Unbounded in-memory queue implementation.

This is the default queue implementation. It’s simple, fast, and suitable for most use cases. All data is stored in memory and will be lost if the process terminates before processing completes.

The queue uses asyncio.Queue internally, so operations are safe across coroutines running on one event loop. Note that asyncio.Queue itself is not thread-safe.

    queue = InMemoryQueue()
    await queue.put("operation")
    item = await queue.get()
    await queue.close()

async close(num_waiters: int = 1) → None

Close the queue.

After closing, no new items can be added. Workers will continue to process existing items until the queue is empty, then exit.

  • Parameters: num_waiters – Number of sentinel values to enqueue to wake up that many workers currently blocked in get().

Check if the queue has been closed.

  • Returns: True if the queue is closed.

Remove and return an item from the queue.

  • Returns: The next item from the queue.
  • Raises: QueueClosedError – If the queue is closed and empty.

Add an item to the queue.

  • Parameters: item – The item to add.
  • Raises: QueueClosedError – If the queue has been closed.

Return the current number of items in the queue.

  • Returns: The queue size.
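The close-and-drain behaviour described above can be sketched with a sentinel object placed on an `asyncio.Queue`. This is an illustration only: `ClosableQueue`, `QueueClosed`, and `_SENTINEL` are hypothetical names, and the SDK's actual implementation may differ.

```python
import asyncio
from typing import Any

_SENTINEL = object()  # marks "queue closed" for a blocked consumer


class QueueClosed(Exception):
    """Stand-in for the documented QueueClosedError."""


class ClosableQueue:
    """Hypothetical unbounded queue with sentinel-based close."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    async def put(self, item: Any) -> None:
        if self._closed:
            raise QueueClosed("queue is closed")
        self._queue.put_nowait(item)  # unbounded, so this never blocks

    async def get(self) -> Any:
        if self._closed and self._queue.empty():
            raise QueueClosed("queue is closed and empty")
        item = await self._queue.get()
        if item is _SENTINEL:
            raise QueueClosed("queue is closed and empty")
        return item

    async def close(self, num_waiters: int = 1) -> None:
        self._closed = True
        for _ in range(num_waiters):
            self._queue.put_nowait(_SENTINEL)  # wakes one blocked get()


async def demo() -> list:
    queue = ClosableQueue()  # created inside the running event loop
    await queue.put("a")
    await queue.put("b")
    await queue.close()
    seen = []
    try:
        while True:
            seen.append(await queue.get())
    except QueueClosed:
        seen.append("closed")
    return seen


result = asyncio.run(demo())
```

Items queued before `close()` are still delivered in order; only after they are drained does `get()` raise.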

exception prefactor_core.InstanceNotFoundError

Bases: PrefactorCoreError

Raised when an agent instance is not found.

class prefactor_core.Operation(type: OperationType, payload: dict[str, Any], timestamp: datetime, idempotency_key: str | None = None, metadata: dict[str, Any] = <factory>)

Bases: object

A single operation to be queued and processed.

Operations are immutable and contain all data needed for execution. They are created synchronously and processed asynchronously by workers.

type

The type of operation to perform.

  • Type: OperationType

payload

Dictionary containing operation-specific data.

  • Type: dict[str, Any]

timestamp

When the operation was created.

  • Type: datetime.datetime

idempotency_key

Optional key for idempotent operations.

  • Type: str | None

metadata

Optional additional metadata.

  • Type: dict[str, Any]

    from datetime import datetime, timezone

    from prefactor_core import Operation, OperationType

    operation = Operation(
        type=OperationType.CREATE_SPAN,
        payload={
            "instance_id": "inst-123",
            "schema_name": "agent:llm",
            "span_id": "span-456",
        },
        timestamp=datetime.now(timezone.utc),
        idempotency_key="span-456",
    )
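One common way to get the immutability described for operations is a frozen dataclass. The sketch below assumes that approach. `OperationSketch` and `OperationTypeSketch` are hypothetical names, and the enum's string value is invented for illustration (only the `CREATE_SPAN` member appears in this reference).

```python
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional


class OperationTypeSketch(Enum):
    # Only CREATE_SPAN is shown in the docs; the value here is an assumption.
    CREATE_SPAN = "create_span"


@dataclass(frozen=True)
class OperationSketch:
    """Hypothetical immutable operation record."""

    type: OperationTypeSketch
    payload: Dict[str, Any]
    timestamp: datetime
    idempotency_key: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


op = OperationSketch(
    type=OperationTypeSketch.CREATE_SPAN,
    payload={"instance_id": "inst-123", "schema_name": "agent:llm"},
    timestamp=datetime.now(timezone.utc),
    idempotency_key="span-456",
)

try:
    op.idempotency_key = "other"  # rebinding a field on a frozen dataclass fails
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Note that `frozen=True` only blocks attribute rebinding. The `payload` dictionary itself stays mutable unless it is copied or wrapped.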

exception prefactor_core.OperationError(message: str, operation_type: str | None = None)

Bases: PrefactorCoreError

Raised when an operation fails to process.

class prefactor_core.OperationType(*values)

Bases: Enum

Types of operations that can be performed.

Each operation type corresponds to a specific API endpoint action.

class prefactor_core.PrefactorCoreClient

Bases: object

Main entry point for the prefactor-core SDK.

This client provides a high-level interface for managing agent instances and spans. All operations are queued and processed asynchronously, ensuring minimal impact on agent execution flow.

The client must be initialized before use, either by calling initialize() or using it as an async context manager.

    config = PrefactorCoreConfig(http_config=...)

    async with PrefactorCoreClient(config) as client:
        instance = await client.create_agent_instance(...)
        await instance.start()

        async with instance.span("agent:llm") as span:
            span.set_payload({"model": "gpt-4"})
            # Your agent logic here

        await instance.finish()

Close the client and cleanup resources.

This method gracefully shuts down the executor and closes the HTTP client. It should be called when the client is no longer needed.

async create_agent_instance(agent_id: str, agent_version: dict[str, Any], agent_schema_version: dict[str, Any] | None = None, instance_id: str | None = None, external_schema_version_id: str | None = None) → AgentInstanceHandle

Create a new agent instance.

Returns immediately with a handle. The actual registration happens asynchronously via the queue.

If agent_schema_version is not provided but the client has a schema_registry, the registry’s schemas will be used automatically.

  • Parameters:
    • agent_id – ID of the agent to create an instance for.
    • agent_version – Version information (name, etc.).
    • agent_schema_version – Schema version. Uses registry if not provided and registry is configured.
    • instance_id – Optional custom ID for the instance.
    • external_schema_version_id – Optional external identifier for the schema version. Defaults to “auto-generated” when using registry.
  • Returns: AgentInstanceHandle for the created instance.
  • Raises:
    • ClientNotInitializedError – If the client is not initialized.
    • ValueError – If no schema version provided and registry not configured.
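The fallback order for the schema version (explicit argument first, then the registry, otherwise an error) can be sketched as a small helper. `resolve_schema_version` and `FakeRegistry` are hypothetical names used only for this illustration; the client's internal logic is not shown in this reference.

```python
from typing import Any, Dict, Optional


def resolve_schema_version(
    agent_schema_version: Optional[Dict[str, Any]],
    registry: Optional[Any],
    external_schema_version_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the documented fallback order."""
    if agent_schema_version is not None:
        return agent_schema_version  # an explicit version always wins
    if registry is None:
        raise ValueError("no schema version provided and no registry configured")
    # The docs state the identifier defaults to "auto-generated" with a registry.
    external_id = external_schema_version_id or "auto-generated"
    return registry.to_agent_schema_version(external_id)


class FakeRegistry:
    """Test double that exposes only to_agent_schema_version()."""

    def to_agent_schema_version(self, external_id: str) -> Dict[str, Any]:
        return {"external_identifier": external_id}


from_registry = resolve_schema_version(None, FakeRegistry())
```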

async create_span(instance_id: str, schema_name: str, parent_span_id: str | None = None, payload: dict[str, Any] | None = None) → str

Create a span and return its ID without finishing it.

Use this for spans that need to stay open across multiple operations. Call finish_span() when done.

  • Parameters:
    • instance_id – ID of the agent instance this span belongs to.
    • schema_name – Name of the schema for this span.
    • parent_span_id – Optional explicit parent span ID.
    • payload – Optional initial payload (params/inputs) stored on creation.
  • Returns: The span ID.

async finish_span(span_id: str, result_payload: dict[str, Any] | None = None) → None

Finish a previously created span.

  • Parameters:
    • span_id – The ID of the span to finish.
    • result_payload – Optional result data to store on the span.

Initialize the client and start processing.

This method:

  1. Initializes the HTTP client
  2. Starts the task executor
  3. Initializes managers

Public accessor for the agent instance manager.

span(instance_id: str, schema_name: str, parent_span_id: str | None = None, payload: dict[str, Any] | None = None)

Context manager for creating and finishing a span.

If parent_span_id is not provided, the current span from the SpanContextStack is used as the parent.

The returned SpanContext supports an explicit lifecycle:

  1. await span.start(payload) — POST the span to the API.
  2. Do work.
  3. await span.complete(result) / span.fail(result) / span.cancel() — finish with a specific status.

If start() or a finish method is not called explicitly, the context manager handles them automatically on exit.

  • Parameters:
    • instance_id – ID of the agent instance this span belongs to.
    • schema_name – Name of the schema for this span.
    • parent_span_id – Optional explicit parent span ID.
    • payload – Optional initial payload sent via auto-start on exit if start() is never called explicitly.
  • Yields: SpanContext for the created span.

class prefactor_core.PrefactorCoreConfig

Bases: BaseModel

Complete configuration for PrefactorCoreClient.

Configuration for the HTTP client.

Configuration for queue processing.

Optional schema registry for aggregating span type definitions.

  • Type: Any

    from prefactor_core import PrefactorCoreConfig
    from prefactor_core.schema_registry import SchemaRegistry

    registry = SchemaRegistry()
    registry.register("langchain:llm", {"type": "object"})

    config = PrefactorCoreConfig(
        http_config=HttpClientConfig(...),
        schema_registry=registry,
    )

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

exception prefactor_core.PrefactorCoreError

Bases: Exception

Base exception for all prefactor-core errors.

exception prefactor_core.PrefactorTelemetryFailureError(message: str, *, cause: Exception, operation_type: str | None = None, dropped_operations: int = 0)

Bases: PrefactorCoreError

Raised when telemetry enters a permanent failure state.

class prefactor_core.Queue

Bases: ABC, Generic[T]

Abstract base class for all queue implementations.

This interface defines the contract for queue operations used by the TaskExecutor. Implementations must be thread-safe and support async operations.

abstractmethod async close(num_waiters: int = 1) → None

Close the queue and signal workers to stop.

After closing, no new items can be added. Workers should finish processing remaining items and then exit.

  • Parameters: num_waiters – Number of workers currently blocked in get() that need to be woken up so they can observe the closed state.

Check if the queue has been closed.

  • Returns: True if close() has been called, False otherwise.

Remove and return an item from the queue.

This method blocks until an item is available or the queue is closed.

  • Returns: The next item from the queue.
  • Raises: QueueClosedError – If the queue is closed and empty.

abstractmethod async put(item: T) → None

Add an item to the queue.

This method should return immediately without blocking the caller. The item will be processed asynchronously by workers.

  • Parameters: item – The item to add to the queue.
  • Raises: QueueClosedError – If the queue has been closed.

Return the current number of items in the queue.

  • Returns: The queue size (non-negative integer).

exception prefactor_core.QueueClosedError

Bases: Exception

Raised when attempting to use a closed queue.

class prefactor_core.QueueConfig(*, num_workers: Annotated[int, Ge(ge=1), Le(le=20)] = 3, max_retries: Annotated[int, Ge(ge=0)] = 3, retry_delay_base: Annotated[float, Gt(gt=0)] = 1.0)

Bases: BaseModel

Configuration for queue processing.

num_workers

Number of concurrent worker tasks.

  • Type: int

max_retries

Maximum retry attempts per failed operation.

  • Type: int

retry_delay_base

Base delay for exponential backoff (seconds).

  • Type: float

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
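To make the interaction between `max_retries` and `retry_delay_base` concrete, here is a sketch of a retry schedule. It assumes the delay doubles on each attempt and has no jitter. The reference only says "exponential backoff", so the exact formula is an assumption, and `backoff_delays` is a hypothetical helper.

```python
from typing import List


def backoff_delays(max_retries: int = 3, retry_delay_base: float = 1.0) -> List[float]:
    """Delay in seconds before each retry, assuming doubling per attempt."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    if retry_delay_base <= 0:
        raise ValueError("retry_delay_base must be > 0")
    return [retry_delay_base * (2 ** attempt) for attempt in range(max_retries)]


delays = backoff_delays()  # documented defaults: 3 retries, 1.0 s base
```

Under this assumption the defaults would wait 1, 2, and 4 seconds before the first, second, and third retry.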

class prefactor_core.SchemaRegistry

Bases: object

Central registry for span schemas that allows pre-registration.

Multiple components can register their schemas independently before agent instance creation. The registry aggregates all into a single format suitable for the API.

The API supports three ways to define span schemas, in increasing order of expressiveness:

  • span_schemas: flat map of span name → params JSON schema
  • span_result_schemas: flat map of span name → result JSON schema
  • span_type_schemas: structured list with params, result, title, description, and template per span type

Use register() for simple payload schemas, register_result() to add a result schema for an existing entry, or register_type() for the full structured form. All three approaches can be mixed; to_agent_schema_version() emits whichever fields are populated.

    registry = SchemaRegistry()

    registry.register("langchain:agent", {"type": "object"})

    # Full structured schema with result and display metadata
    registry.register_type(
        name="agent:llm",
        params_schema={
            "type": "object",
            "properties": {
                "model": {"type": "string"},
                "prompt": {"type": "string"},
            },
            "required": ["model", "prompt"],
        },
        result_schema={
            "type": "object",
            "properties": {"response": {"type": "string"}},
        },
        title="LLM Call",
        description="A call to a language model",
        template="{{model}}: {{prompt}} → {{response}}",
    )

    version = registry.to_agent_schema_version("combined-1.0.0")

get(schema_name: str) → dict[str, Any] | None

Get a params schema by name.

  • Parameters: schema_name – The schema identifier to look up
  • Returns: The schema dict if found, None otherwise

Check if a params schema is registered for a span type.

  • Parameters: schema_name – The schema identifier to check
  • Returns: True if the schema is registered, False otherwise

List all registered span schema names (params schemas only).

  • Returns: List of registered schema names

Merge schemas from another registry into this one.

  • Parameters: other – Another SchemaRegistry to merge. Conflicting schemas from the other registry will be rejected.
  • Raises: ValueError – If there are conflicting schema names in any category.
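A conflict-rejecting merge can be sketched on plain dictionaries. Whether the real registry checks for conflicts before mutating anything is an assumption here, and `merge_schemas` is a hypothetical helper rather than part of the SDK.

```python
from typing import Any, Dict


def merge_schemas(
    target: Dict[str, Dict[str, Any]], other: Dict[str, Dict[str, Any]]
) -> None:
    """Merge `other` into `target`, rejecting any overlapping names."""
    conflicts = sorted(set(target) & set(other))
    if conflicts:
        # Check before mutating so a failed merge leaves `target` untouched.
        raise ValueError(f"conflicting schema names: {conflicts}")
    target.update(other)


schemas = {"agent:llm": {"type": "object"}}
merge_schemas(schemas, {"agent:tool": {"type": "object"}})
```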

register(schema_name: str, schema: dict[str, Any]) → None

Register a params schema for a span type.

Adds to span_schemas (the flat params-schema map). Use register_type() if you also need a result schema, title, description, or template.

  • Parameters:
    • schema_name – Unique identifier for this span type (e.g., “langchain:llm”)
    • schema – JSON Schema dict defining the span payload structure
  • Raises: ValueError – If schema_name is already registered.

register_result(schema_name: str, result_schema: dict[str, Any]) → None

Register a result schema for a span type.

Adds to span_result_schemas (the flat result-schema map). The span type does not need to have a params schema registered first.

  • Parameters:
    • schema_name – Span type identifier (e.g., “agent:llm”)
    • result_schema – JSON Schema dict defining the span result payload
  • Raises: ValueError – If a result schema for schema_name is already registered.

register_type(name: str, params_schema: dict[str, Any], result_schema: dict[str, Any] | None = None, title: str | None = None, description: str | None = None, template: str | None = None) → None

Register a full structured span type schema.

Adds to span_type_schemas. This is the richest form and supports all API fields: params schema, result schema, human-readable title, description, and a display template.

  • Parameters:
    • name – Span type name (e.g., “agent:llm”)
    • params_schema – JSON Schema for the span payload (params)
    • result_schema – Optional JSON Schema for the span result payload
    • title – Optional human-readable title (defaults to name on the API)
    • description – Optional description of the span type
    • template – Optional display template using {{field}} interpolation
  • Raises: ValueError – If name is already registered as a span type schema.

register_unsafe(schema_name: str, schema: dict[str, Any]) → None

Register a params schema, overwriting if it already exists.

  • Parameters:
    • schema_name – Unique identifier for this span type
    • schema – JSON Schema dict defining the span payload structure

to_agent_schema_version(external_id: str) → dict[str, Any]

Convert registry contents to API-compatible agent_schema_version format.

Emits span_schemas, span_result_schemas, and span_type_schemas for whichever have been populated.

  • Parameters: external_id – External identifier for this combined schema version
  • Returns: Dict with external_identifier and whichever schema fields are non-empty.
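The registration rules and the "emit only populated fields" behaviour can be sketched in a few lines. `RegistrySketch` is a hypothetical miniature that covers only the flat params and result maps. The structured `span_type_schemas` form is left out because its wire format is not spelled out in this reference.

```python
from typing import Any, Dict


class RegistrySketch:
    """Hypothetical miniature of the documented registry behaviour."""

    def __init__(self) -> None:
        self._params: Dict[str, Dict[str, Any]] = {}
        self._results: Dict[str, Dict[str, Any]] = {}

    def register(self, schema_name: str, schema: Dict[str, Any]) -> None:
        if schema_name in self._params:
            raise ValueError(f"{schema_name!r} is already registered")
        self._params[schema_name] = schema

    def register_unsafe(self, schema_name: str, schema: Dict[str, Any]) -> None:
        self._params[schema_name] = schema  # overwrites without complaint

    def register_result(self, schema_name: str, result_schema: Dict[str, Any]) -> None:
        if schema_name in self._results:
            raise ValueError(f"result schema for {schema_name!r} already registered")
        self._results[schema_name] = result_schema

    def to_agent_schema_version(self, external_id: str) -> Dict[str, Any]:
        version: Dict[str, Any] = {"external_identifier": external_id}
        # Only non-empty categories are emitted.
        if self._params:
            version["span_schemas"] = dict(self._params)
        if self._results:
            version["span_result_schemas"] = dict(self._results)
        return version


registry = RegistrySketch()
registry.register("agent:llm", {"type": "object"})
version = registry.to_agent_schema_version("combined-1.0.0")
```

Because no result schema was registered, `version` contains `external_identifier` and `span_schemas` only.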

class prefactor_core.Span(id: str, instance_id: str, schema_name: str, parent_span_id: str | None = None, status: str = 'pending', payload: dict[str, Any] = <factory>, created_at: datetime = <factory>, started_at: datetime | None = None, finished_at: datetime | None = None)

Bases: object

Represents a span within an agent instance.

Spans represent discrete units of work within an agent execution, such as LLM calls, tool executions, or processing steps.

id

Unique identifier for this span.

  • Type: str

instance_id

ID of the agent instance this span belongs to.

  • Type: str

parent_span_id

ID of the parent span (if nested).

  • Type: str | None

schema_name

Name of the schema defining this span type.

  • Type: str

status

Current status (pending, active, complete).

  • Type: str

payload

Arbitrary data associated with this span.

  • Type: dict[str, Any]

created_at

When the span was created.

  • Type: datetime.datetime

started_at

When the span started (defaults to created_at).

  • Type: datetime.datetime | None

finished_at

When the span completed (if finished).

  • Type: datetime.datetime | None

class prefactor_core.SpanContext(temp_id: str, span_manager: SpanManager, default_payload: dict[str, Any] | None = None)

Bases: object

Context for an active span.

Returned by instance.span() / client.span() context managers.

Spans follow a three-phase lifecycle:

  1. Enter context: the span is prepared locally (no HTTP call yet).
  2. await span.start(payload): POSTs the span to the API as active with the given params payload.
  3. await span.complete(result) (or .fail() / .cancel()): finishes the span with the appropriate terminal status.

Cancelling before start() is handled via pending → cancelled; once started, the span transitions from active to a terminal status.

If start() or a finish method is omitted, the context manager calls them automatically on exit (auto-start uses default_payload; the default finish status is complete), so explicit calls are opt-in.

Example:

    async with instance.span("agent:llm_call") as span:
        await span.start({"model": "claude-3-5-sonnet", "prompt": "Hi"})
        try:
            response = await call_llm(...)
            await span.complete({"response": response, "tokens": 42})
        except Exception as exc:
            await span.fail({"error": str(exc)})

    # Skip start entirely to cancel before any work begins:
    async with instance.span("agent:retrieval") as span:
        if not needed:
            await span.cancel()
        else:
            await span.start({"query": "..."})
            ...

Finish the span with cancelled status.

Can be called before or after start(). If start() has not been called yet, the span is posted as pending then immediately cancelled — the API only accepts cancellation from the pending state, so this is always a valid sequence.

async complete(result: dict[str, Any] | None = None) → None

Finish the span with complete status.

  • Parameters: result – Optional result payload to attach to the span.

async fail(result: dict[str, Any] | None = None) → None

Finish the span with failed status.

  • Parameters: result – Optional result payload (e.g. error details).

Finish the span using whichever status was last set (default: complete).

Called automatically when exiting the context manager. Can also be called manually; subsequent calls are no-ops.

Get the span ID.

Before start() is called this returns the temporary local ID. After start() it returns the API-generated ID.

  • Returns: The span identifier.

Store result data to be sent when the span finishes.

The data is merged and sent as result_payload when the span finishes. Calling this does not finish the span.

  • Parameters: data – Dictionary of result data for the span.

async start(payload: dict[str, Any] | None = None) → None

Post the span to the API as active with the given params payload.

This triggers POST /api/v1/agent_spans. The span is created as pending so that any terminal status (complete, failed, cancelled) is a valid transition via the finish endpoint. Must be called at most once; subsequent calls are no-ops.

  • Parameters: payload – Optional params/inputs for the span (e.g. model name, prompt text, tool input). Stored as the span’s payload field in the API.
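The auto-start and auto-finish rules can be modelled locally without any HTTP calls. `SpanSketch` below is a hypothetical state holder written for illustration. It assumes that an exception in the body is simply propagated after the default finish, a detail this reference does not specify.

```python
import asyncio
from typing import Any, Dict, Optional


class SpanSketch:
    """Hypothetical local model of the documented span lifecycle."""

    def __init__(self, default_payload: Optional[Dict[str, Any]] = None) -> None:
        self.status = "pending"
        self.payload: Optional[Dict[str, Any]] = None
        self.result: Optional[Dict[str, Any]] = None
        self._default_payload = default_payload
        self._started = False
        self._finished = False

    async def start(self, payload: Optional[Dict[str, Any]] = None) -> None:
        if self._started or self._finished:
            return  # at most one effective start
        self._started = True
        self.payload = payload
        self.status = "active"

    async def _finish(self, status: str, result: Optional[Dict[str, Any]]) -> None:
        if self._finished:
            return  # later finish calls are no-ops
        self._finished = True
        self.status = status
        self.result = result

    async def complete(self, result: Optional[Dict[str, Any]] = None) -> None:
        await self._finish("complete", result)

    async def fail(self, result: Optional[Dict[str, Any]] = None) -> None:
        await self._finish("failed", result)

    async def cancel(self) -> None:
        await self._finish("cancelled", None)

    async def __aenter__(self) -> "SpanSketch":
        return self  # prepared locally; nothing has been sent

    async def __aexit__(self, exc_type, exc, tb) -> None:
        if not self._finished:
            await self.start(self._default_payload)  # no-op if already started
            await self._finish("complete", None)  # default terminal status


async def demo() -> list:
    async with SpanSketch(default_payload={"model": "m"}) as implicit:
        pass  # neither start() nor a finish method is called
    async with SpanSketch() as failed:
        await failed.start({"query": "q"})
        await failed.fail({"error": "boom"})
    async with SpanSketch() as cancelled:
        await cancelled.cancel()  # cancelled before start
    return [
        (implicit.status, implicit.payload),
        (failed.status, failed.result),
        (cancelled.status, cancelled.payload),
    ]


statuses = asyncio.run(demo())
```

The three cases cover the implicit path (auto-start with the default payload, then complete), an explicit failure, and cancellation before start.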

class prefactor_core.SpanContextStack

Bases: object

Manages a stack of active span IDs within an async context.

The stack tracks the hierarchy of nested spans. The top of the stack is always the current (innermost) active span, which serves as the default parent for any new spans created within the same context.

This class uses contextvars to ensure that each async task maintains its own independent stack, preventing interference between concurrent operations.

    SpanContextStack.push("span-1")
    assert SpanContextStack.peek() == "span-1"

    SpanContextStack.push("span-2")
    assert SpanContextStack.peek() == "span-2"

    SpanContextStack.pop()
    assert SpanContextStack.peek() == "span-1"

    SpanContextStack.pop()
    assert SpanContextStack.peek() is None

Get the current nesting depth.

  • Returns: The number of active spans in the stack (0 if empty).

Get the current span stack for this async context.

  • Returns: A list of span IDs, from outermost to innermost. Returns an empty list if no spans are active.

Check if the stack is empty.

  • Returns: True if no spans are currently active.

Get the current span ID without removing it from the stack.

  • Returns: The ID of the current (innermost) span, or None if no spans are active in this context.

Pop and return the current span ID from the stack.

  • Returns: The span ID that was removed from the stack, or None if the stack was empty.

Push a span ID onto the stack.

This marks the span as the current (innermost) active span.

  • Parameters: span_id – The ID of the span to push onto the stack.
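The per-task isolation that `contextvars` provides can be shown with a small stand-alone stack. The module-level functions below are a hypothetical sketch, not the SDK's class. Storing an immutable tuple is a design choice made here so that copied contexts never share a mutable list.

```python
import asyncio
import contextvars
from typing import Optional

# Each context sees its own tuple; the default is an empty stack.
_stack = contextvars.ContextVar("span_stack", default=())


def push(span_id: str) -> None:
    _stack.set(_stack.get() + (span_id,))


def pop() -> Optional[str]:
    stack = _stack.get()
    if not stack:
        return None
    _stack.set(stack[:-1])
    return stack[-1]


def peek() -> Optional[str]:
    stack = _stack.get()
    return stack[-1] if stack else None


async def worker(name: str) -> Optional[str]:
    push(name)
    await asyncio.sleep(0)  # yield so the other task runs in between
    return peek()


async def demo() -> list:
    push("root")
    # Each task created by gather() runs in a copy of the current context,
    # so pushes inside one worker are invisible to the other and to the parent.
    seen = await asyncio.gather(worker("span-a"), worker("span-b"))
    return [seen, peek()]


result = asyncio.run(demo())
```

Both workers inherit `root` as their parent, push their own span, and still see only their own top of stack after yielding to each other.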

exception prefactor_core.SpanNotFoundError

Bases: PrefactorCoreError

Raised when a span is not found.

class prefactor_core.TaskExecutor(queue: Queue[Any], handler: Callable[[Any], Awaitable[None]], num_workers: int = 3, max_retries: int = 3, *, is_retryable: Callable[[Exception], bool] | None = None)

Bases: object

Manages async workers that process queue items.

The executor runs a configurable number of worker tasks that continuously pull items from the queue and process them. If processing fails, items are retried with exponential backoff.

    async def handler(item: str) -> None:
        print(f"Processing: {item}")

    queue = InMemoryQueue()
    executor = TaskExecutor(queue, handler, num_workers=3)
    executor.start()

    await queue.put("item1")
    await queue.put("item2")

    await executor.stop()

Start the worker tasks.

Workers will begin pulling items from the queue immediately.

Stop all workers gracefully.

Closes the queue (so no new items can be added), wakes any workers blocked in get(), and waits for them to drain the remaining items and exit on their own. Workers are never cancelled — that would discard already-queued items.
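The worker loop, retry handling, and drain-on-stop behaviour can be sketched with a plain `asyncio.Queue` and one sentinel per worker. `run_workers` and `_STOP` are hypothetical names, and the doubling delay is an assumption; the executor's real internals are not shown in this reference.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

_STOP = object()  # sentinel that tells exactly one worker to exit


async def run_workers(
    items: List[Any],
    handler: Callable[[Any], Awaitable[None]],
    num_workers: int = 3,
    max_retries: int = 3,
    retry_delay_base: float = 0.01,
) -> List[Any]:
    """Process `items` with retrying workers; return the items that were dropped."""
    queue: asyncio.Queue = asyncio.Queue()
    dropped: List[Any] = []

    async def worker() -> None:
        while True:
            item = await queue.get()
            if item is _STOP:
                return  # everything queued ahead of the sentinel is done
            for attempt in range(max_retries + 1):
                try:
                    await handler(item)
                    break
                except Exception:
                    if attempt == max_retries:
                        dropped.append(item)  # retries exhausted
                    else:
                        await asyncio.sleep(retry_delay_base * 2 ** attempt)

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    for item in items:
        queue.put_nowait(item)
    # "Stop": enqueue one sentinel per worker, then wait instead of cancelling.
    for _ in range(num_workers):
        queue.put_nowait(_STOP)
    await asyncio.gather(*workers)
    return dropped


attempts: Dict[str, int] = {}
processed: List[str] = []


async def flaky_handler(item: str) -> None:
    attempts[item] = attempts.get(item, 0) + 1
    if item == "flaky" and attempts[item] < 3:
        raise RuntimeError("transient failure")
    if item == "broken":
        raise RuntimeError("permanent failure")
    processed.append(item)


dropped = asyncio.run(run_workers(["ok", "flaky", "broken"], flaky_handler))
```

Because the sentinels are queued behind the real items, every item is attempted before any worker exits, which is the reason for waiting on the workers rather than cancelling them.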

prefactor_core.generate_idempotency_key() → str

Generate a new UUID-based idempotency key.

The returned key is a UUID4 string (36 characters), always within the 64-character API limit.

  • Returns: A unique idempotency key string.

prefactor_core.validate_idempotency_key(key: str) → str

Validate that an idempotency key is a non-empty string of at most 64 characters.

  • Parameters: key – The idempotency key to validate.
  • Returns: The key unchanged if valid.
  • Raises: ValueError – If the key is empty or exceeds 64 characters.
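Both helpers are small enough to sketch with the standard library. `generate_key` and `validate_key` below are hypothetical re-implementations based only on the behaviour stated above (UUID4 string, non-empty, at most 64 characters).

```python
import uuid

MAX_KEY_LENGTH = 64  # limit stated in this reference


def generate_key() -> str:
    """Return a UUID4 string, which is always 36 characters long."""
    return str(uuid.uuid4())


def validate_key(key: str) -> str:
    """Return the key unchanged, or raise ValueError if it is unusable."""
    if not isinstance(key, str) or not key:
        raise ValueError("idempotency key must be a non-empty string")
    if len(key) > MAX_KEY_LENGTH:
        raise ValueError(f"idempotency key exceeds {MAX_KEY_LENGTH} characters")
    return key


key = validate_key(generate_key())
```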