Event Processor and Executor

Overview

This module provides:

  • Processor: An asynchronous queue manager for Event objects, supporting capacity-limited batching and optional permission checks.

  • Executor: A coordinating class that stores events in a Pile and forwards them to an internal Processor for execution.

Processor

class lionagi.protocols.generic.processor.Processor(Observer)

Inherits from: Observer

Manages a queue of events that can be processed asynchronously in batches, respecting a capacity limit. Subclasses can override request_permission() to add permission checks, or override other methods for custom handling logic. The processor can be started and stopped, and it refreshes available capacity after each batch of events.

Attributes

event_type: ClassVar[type[Event]]

Declares the type of Event this processor is meant to handle. Subclasses should set this to a specific Event subclass if needed.

queue_capacity: int

Maximum number of events that can be processed in one batch (e.g., 10).

capacity_refresh_time: float

Number of seconds after which the queue capacity is reset (e.g., 5.0).

queue: asyncio.Queue

Internal async queue holding incoming events.

_available_capacity: int

Tracks the current capacity (resets to queue_capacity after processing).

_execution_mode: bool

Indicates if the processor is currently in a continuous execute loop.

_stop_event: asyncio.Event

An event used to signal that the execute loop should stop.

Initialization

__init__(queue_capacity: int, capacity_refresh_time: float) -> None

Args

queue_capacity: int

The maximum number of events allowed in a single processing batch.

capacity_refresh_time: float

After this many seconds, the capacity is refreshed.

Raises

ValueError

If queue_capacity < 1 or capacity_refresh_time <= 0.
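The argument checks listed under Raises can be pictured with a small standalone class. This is only a sketch of the documented rule, not the library's source; the class name CapacityConfig is hypothetical.

```python
class CapacityConfig:
    """Hypothetical stand-in that mirrors the documented argument checks."""

    def __init__(self, queue_capacity: int, capacity_refresh_time: float) -> None:
        # At least one event must fit in a batch.
        if queue_capacity < 1:
            raise ValueError("queue_capacity must be >= 1")
        # The refresh interval must be strictly positive.
        if capacity_refresh_time <= 0:
            raise ValueError("capacity_refresh_time must be > 0")
        self.queue_capacity = queue_capacity
        self.capacity_refresh_time = capacity_refresh_time
```

With this sketch, CapacityConfig(10, 5.0) succeeds, while CapacityConfig(0, 5.0) and CapacityConfig(10, 0) both raise ValueError.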

Properties

available_capacity

Tracks how many more events can be processed in the current batch.

execution_mode

A boolean indicating if the processor’s execute() method is actively running.

Methods

async enqueue(event: Event) -> None

Adds an Event to the internal async queue.

async dequeue() -> Event

Retrieves the next Event from the queue.

async join() -> None

Blocks until the queue is fully processed.
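Since the queue attribute is an asyncio.Queue, the three queue methods above plausibly correspond to put(), get(), and join() on that class. The sketch below uses only the standard library and shows how join() returns once task_done() has been called for every item; the demo() and worker() names are hypothetical, and the mapping to the Processor methods is an assumption.

```python
import asyncio


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []

    async def worker() -> None:
        while True:
            item = await queue.get()   # analogous to dequeue()
            handled.append(item)
            queue.task_done()          # lets join() account for this item

    task = asyncio.create_task(worker())
    for name in ("a", "b", "c"):
        await queue.put(name)          # analogous to enqueue()
    await queue.join()                 # returns once every item is marked done
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```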

async stop() -> None

Signals this processor to stop (sets the internal stop event).

async start() -> None

Clears the stop signal, allowing the processor to continue.

is_stopped() -> bool

Checks whether the processor is in a stopped state.

async process() -> None

Retrieves and processes events from the queue, up to available_capacity. Each event’s status is set to PROCESSING if it passes a permission check (request_permission()). After processing these events, the capacity is reset if any were processed.
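The batch behaviour described for process() can be sketched with toy classes. ToyEvent and ToyProcessor are hypothetical stand-ins, statuses are plain strings, and events that fail the permission check are simply dropped here, which is a simplification rather than a statement about the library.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyEvent:
    """Hypothetical stand-in for an Event."""
    name: str
    status: str = "PENDING"


class ToyProcessor:
    """Hypothetical stand-in showing capacity-limited batching."""

    def __init__(self, queue_capacity: int) -> None:
        self.queue_capacity = queue_capacity
        self.available_capacity = queue_capacity
        self.queue: asyncio.Queue = asyncio.Queue()

    async def request_permission(self, event: ToyEvent) -> bool:
        return True  # permit everything in this sketch

    async def process(self) -> list:
        started = []
        # Take events only while capacity remains and the queue is non-empty.
        while self.available_capacity > 0 and not self.queue.empty():
            event = self.queue.get_nowait()
            if await self.request_permission(event):
                event.status = "PROCESSING"
                self.available_capacity -= 1
                started.append(event)
        # Reset capacity if anything was processed in this batch.
        if started:
            self.available_capacity = self.queue_capacity
        return started
```

With a capacity of 2 and five queued events, one call to process() marks the first two as PROCESSING and leaves three in the queue for later batches.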

async request_permission(**kwargs: Any) -> bool

Determines whether an event is allowed to proceed. Subclasses can override to implement rate limiting, user permissions, or other custom checks.
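An override of this hook might look like the following sketch. BaseChecker and AllowListChecker are hypothetical classes that only reuse the documented method signature, and the user keyword is an assumed example key, not something the library defines.

```python
import asyncio
from typing import Any


class BaseChecker:
    """Hypothetical base with the documented hook signature; permits everything."""

    async def request_permission(self, **kwargs: Any) -> bool:
        return True


class AllowListChecker(BaseChecker):
    """Permits only requests whose `user` keyword is on an allow-list."""

    def __init__(self, allowed_users: set) -> None:
        self.allowed_users = allowed_users

    async def request_permission(self, **kwargs: Any) -> bool:
        # A missing `user` keyword yields None, which is never on the list.
        return kwargs.get("user") in self.allowed_users


if __name__ == "__main__":
    checker = AllowListChecker({"alice"})
    print(asyncio.run(checker.request_permission(user="alice")))
    print(asyncio.run(checker.request_permission(user="bob")))
```

The same pattern could carry a rate limiter or quota check instead of an allow-list.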

async execute() -> None

Continuously processes events until stop() is called, sleeping for capacity_refresh_time between cycles to refresh capacity.
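A loop of this shape can be sketched with an asyncio.Event as the stop flag. ToyLoop is a hypothetical stand-in, its process() method only counts cycles, and the exact ordering of work and sleep inside the real execute() is an assumption.

```python
import asyncio


class ToyLoop:
    """Hypothetical stand-in for the process / sleep / check-stop cycle."""

    def __init__(self, capacity_refresh_time: float) -> None:
        self.capacity_refresh_time = capacity_refresh_time
        self.cycles = 0
        self._stop_event = asyncio.Event()

    async def process(self) -> None:
        self.cycles += 1  # placeholder for real batch work

    async def stop(self) -> None:
        self._stop_event.set()

    async def execute(self) -> None:
        # Keep cycling until stop() sets the flag.
        while not self._stop_event.is_set():
            await self.process()
            await asyncio.sleep(self.capacity_refresh_time)


async def main() -> int:
    proc = ToyLoop(capacity_refresh_time=0.01)
    runner = asyncio.create_task(proc.execute())
    await asyncio.sleep(0.05)   # let a few cycles run
    await proc.stop()
    await runner                # the loop exits after its current sleep
    return proc.cycles


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the flag is only checked between cycles, a stop request takes effect after at most one more capacity_refresh_time interval in this sketch.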

Executor

class lionagi.protocols.generic.processor.Executor(Observer)

Inherits from: Observer

A higher-level manager that maintains a pile of events and forwards them to an internal Processor for asynchronous execution. This allows classes to store events locally, possibly filter or arrange them, and then dispatch them in batches.

Attributes

processor_type: ClassVar[type[Processor]]

Declares which Processor subclass to use for event handling.

processor_config: dict[str, Any]

A dictionary of initialization arguments for the processor.

pending: Progression

Tracks IDs of events that are ready to be forwarded to the processor.

processor: Processor | None

The internal processor instance; created lazily if None.

pile: Pile[Event]

A pile storing all events, enabling concurrency-safe access and type constraints.

Initialization

__init__(processor_config: dict[str, Any] | None = None, strict_event_type: bool = False) -> None

Args

processor_config: dict[str, Any] | None

Configuration parameters for creating the processor (e.g., capacity, refresh time).

strict_event_type: bool

If True, the underlying Pile enforces exact type matching for Event objects.
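The difference between strict and non-strict checking can be illustrated as follows, assuming that "exact type matching" means comparing type(item) directly rather than using isinstance. BaseEvent, SubEvent, and accepts() are hypothetical names for this sketch.

```python
class BaseEvent:
    """Hypothetical base event type."""


class SubEvent(BaseEvent):
    """Hypothetical subclass of the base event type."""


def accepts(item: object, item_type: type, strict: bool) -> bool:
    if strict:
        # Exact match: subclasses are rejected.
        return type(item) is item_type
    # Non-strict: subclasses are accepted as well.
    return isinstance(item, item_type)
```

Under this reading, a SubEvent is accepted by a non-strict container typed on BaseEvent but rejected by a strict one.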

Properties

event_type

The type of event the processor handles (mirrors processor_type.event_type).

strict_event_type

Whether the Pile requires exact type checks for added events.

Methods

async forward() -> None

Sends all pending events from the pile to the processor, then calls processor.process() immediately.

async start() -> None

Creates the processor with _create_processor() if it does not exist yet, then calls processor.start() to enable event processing.
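The lazy-creation pattern in start() can be sketched like this. ToyProcessor and ToyExecutor are hypothetical stand-ins; only the attribute and method names taken from this page (processor_type, processor_config, processor, _create_processor, start) follow the documentation.

```python
import asyncio
from typing import Any, Optional


class ToyProcessor:
    """Hypothetical processor that records whether start() was called."""

    def __init__(self, queue_capacity: int, capacity_refresh_time: float) -> None:
        self.queue_capacity = queue_capacity
        self.capacity_refresh_time = capacity_refresh_time
        self.started = False

    async def start(self) -> None:
        self.started = True


class ToyExecutor:
    processor_type = ToyProcessor

    def __init__(self, processor_config: Optional[dict] = None) -> None:
        self.processor_config: dict = processor_config or {}
        self.processor: Optional[ToyProcessor] = None

    async def _create_processor(self) -> None:
        # Build the processor from the stored keyword arguments.
        self.processor = self.processor_type(**self.processor_config)

    async def start(self) -> None:
        if self.processor is None:       # create only on first use
            await self._create_processor()
        await self.processor.start()
```

Calling start() twice reuses the same processor instance, since creation happens only while processor is None.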

async stop() -> None

Stops the processor if it is active.

async append(event: Event) -> None

Adds a new Event to the pile and queues its ID in pending.
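The interplay of append() and forward() can be sketched with plain containers: a dict stands in for the Pile and a list for the Progression. All class names here are hypothetical, and passing an explicit event_id to append() is a simplification of this sketch (the documented method takes only the event).

```python
import asyncio


class ToyProcessor:
    """Hypothetical processor that only records what it receives."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.process_calls = 0

    async def enqueue(self, event: object) -> None:
        await self.queue.put(event)

    async def process(self) -> None:
        self.process_calls += 1


class ToyExecutor:
    def __init__(self, processor: ToyProcessor) -> None:
        self.pile: dict = {}      # id -> event; stand-in for Pile
        self.pending: list = []   # ordered ids; stand-in for Progression
        self.processor = processor

    async def append(self, event_id: str, event: object) -> None:
        self.pile[event_id] = event
        self.pending.append(event_id)

    async def forward(self) -> None:
        # Hand every pending event to the processor in arrival order.
        while self.pending:
            event_id = self.pending.pop(0)
            await self.processor.enqueue(self.pile[event_id])
        # Then trigger one processing pass immediately.
        await self.processor.process()
```

Events remain in the pile after forwarding; only their IDs leave the pending list.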

completed_events -> Pile[Event]

Returns a new pile containing all events marked EventStatus.COMPLETED.

pending_events -> Pile[Event]

Returns a new pile containing all events still at EventStatus.PENDING.

failed_events -> Pile[Event]

Returns a new pile containing all events at EventStatus.FAILED.

__contains__(ref: ID.Ref) -> bool

Checks if a given event or event ID reference is stored in pile.
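The status properties and the membership check can be pictured as filters and lookups over a keyed collection. The sketch below is standalone: Status, ToyEvent, and ToyStore are hypothetical, a dict replaces the Pile, and the properties return lists rather than piles.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class ToyEvent:
    id: str
    status: Status = Status.PENDING


class ToyStore:
    def __init__(self) -> None:
        self.pile: dict = {}  # id -> ToyEvent; stand-in for Pile

    def add(self, event: ToyEvent) -> None:
        self.pile[event.id] = event

    def _with_status(self, status: Status) -> list:
        return [e for e in self.pile.values() if e.status is status]

    @property
    def completed_events(self) -> list:
        return self._with_status(Status.COMPLETED)

    @property
    def pending_events(self) -> list:
        return self._with_status(Status.PENDING)

    @property
    def failed_events(self) -> list:
        return self._with_status(Status.FAILED)

    def __contains__(self, ref: object) -> bool:
        # Accept either an event object or a bare ID.
        key = ref.id if isinstance(ref, ToyEvent) else ref
        return key in self.pile
```

Each property builds a fresh collection on access, so mutating the result does not affect the underlying store in this sketch.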

Private Methods

async _create_processor() -> None

Instantiates the processor from processor_type using the stored processor_config.

File Location

Source File: lionagi/protocols/generic/processor.py

Copyright (c) 2023 - 2024, HaiyangLi <quantocean.li at gmail dot com> SPDX-License-Identifier: Apache-2.0