Event Processor and Executor


This module provides:

  • Processor: An asynchronous queue manager for Event objects, supporting capacity-limited batching and optional permission checks.

  • Executor: A coordinating class that stores events in a Pile and forwards them to an internal Processor for execution.


class lionagi.protocols.generic.processor.Processor(Observer)

Inherits from: Observer

Manages a queue of events that are processed asynchronously in batches, respecting a capacity limit. Subclasses can override request_permission() to add permission checks, or override other methods for custom handling logic. The processor can be started and stopped, and it refreshes its available capacity after each batch of events.



event_type

Declares the type of Event this processor is meant to handle. Subclasses should set this to a specific Event subclass if needed.


queue_capacity: int

Maximum number of events that can be processed in one batch (e.g., 10).


capacity_refresh_time: float

Number of seconds after which the queue capacity is reset (e.g., 5.0).


Internal async queue holding incoming events.


Tracks the current capacity (resets to queue_capacity after processing).


Indicates if the processor is currently in a continuous execute loop.


An event used to signal a requested stop in the loop.


__init__(queue_capacity: int, capacity_refresh_time: float) -> None



queue_capacity: int

The maximum number of events allowed in a single processing batch.

capacity_refresh_time: float

After this many seconds, the capacity is refreshed.



Raises an error if queue_capacity < 1 or capacity_refresh_time <= 0.
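The argument checks described above can be sketched as follows. This is a minimal illustration, not the library's code: MiniProcessor is a hypothetical stand-in, and the use of ValueError is an assumption, since the text here does not name the exception type.

```python
class MiniProcessor:
    """Hypothetical stand-in that shows only the constructor checks."""

    def __init__(self, queue_capacity: int, capacity_refresh_time: float) -> None:
        # Assumption: invalid arguments raise ValueError.
        if queue_capacity < 1:
            raise ValueError("queue_capacity must be at least 1")
        if capacity_refresh_time <= 0:
            raise ValueError("capacity_refresh_time must be positive")
        self.queue_capacity = queue_capacity
        self.capacity_refresh_time = capacity_refresh_time
        # A fresh processor starts with its full batch allowance.
        self.available_capacity = queue_capacity
```

Rejecting bad values at construction time keeps the batch loop free of defensive checks later on.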



available_capacity

Tracks how many more events can be processed in the current batch.


A boolean indicating if the processor’s execute() method is actively running.


async enqueue(event: Event) -> None

Adds an Event to the internal async queue.

async dequeue() -> Event

Retrieves the next Event from the queue.

async join() -> None

Blocks until the queue is fully processed.

async stop() -> None

Signals this processor to stop (sets the internal stop event).

async start() -> None

Clears the stop signal, allowing the processor to continue.

is_stopped() -> bool

Checks whether the processor is in a stopped state.

async process() -> None

Retrieves and processes events from the queue, up to available_capacity. Each event’s status is set to PROCESSING if it passes a permission check (request_permission()). After processing these events, the capacity is reset if any were processed.
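A rough sketch of one batch, built only on asyncio, may make the capacity bookkeeping easier to follow. Status, SketchEvent, BatchSketch and invoke() are hypothetical stand-ins rather than the library's classes, and the handling of denied events (dropped from the queue and left PENDING) is a simplifying assumption.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum


class Status(Enum):  # hypothetical stand-in for EventStatus
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"


@dataclass
class SketchEvent:  # hypothetical stand-in for Event
    name: str
    status: Status = Status.PENDING

    async def invoke(self) -> None:
        await asyncio.sleep(0)  # placeholder for real work
        self.status = Status.COMPLETED


class BatchSketch:
    def __init__(self, queue_capacity: int) -> None:
        self.queue_capacity = queue_capacity
        self.available_capacity = queue_capacity
        self.queue = asyncio.Queue()  # holds SketchEvent objects

    async def request_permission(self, **kwargs) -> bool:
        return True  # default: allow everything

    async def process(self) -> None:
        tasks = []
        # Take events only while both the queue and the allowance last.
        while self.available_capacity > 0 and not self.queue.empty():
            event = await self.queue.get()
            if await self.request_permission():
                event.status = Status.PROCESSING
                tasks.append(asyncio.create_task(event.invoke()))
            self.available_capacity -= 1
        if tasks:
            await asyncio.gather(*tasks)
            # Reset the allowance only if something was processed.
            self.available_capacity = self.queue_capacity
```

With queue_capacity=2 and three queued events, a single process() call handles the first two and leaves the third in the queue for the next cycle.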

async request_permission(**kwargs: Any) -> bool

Determines whether an event is allowed to proceed. Subclasses can override to implement rate limiting, user permissions, or other custom checks.
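As one illustration of such an override, the sketch below applies a fixed-window rate limit. BaseSketch and RateLimitedSketch are hypothetical classes, and the max_calls and interval parameters are invented for the example; only the request_permission(**kwargs) -> bool shape comes from the description above.

```python
import asyncio
import time


class BaseSketch:
    """Hypothetical base with the permissive default."""

    async def request_permission(self, **kwargs) -> bool:
        return True


class RateLimitedSketch(BaseSketch):
    """Approves at most max_calls requests per interval seconds."""

    def __init__(self, max_calls: int, interval: float) -> None:
        self.max_calls = max_calls
        self.interval = interval
        self._window_start = time.monotonic()
        self._calls = 0

    async def request_permission(self, **kwargs) -> bool:
        now = time.monotonic()
        # Open a new window once the current one has expired.
        if now - self._window_start >= self.interval:
            self._window_start = now
            self._calls = 0
        if self._calls >= self.max_calls:
            return False  # budget for this window is spent
        self._calls += 1
        return True
```

A fixed window is the simplest choice; a token bucket or sliding window would smooth out bursts at the window boundary.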

async execute() -> None

Continuously processes events until stop() is called, sleeping for capacity_refresh_time between cycles to refresh capacity.


class lionagi.protocols.generic.processor.Executor(Observer)

Inherits from: Observer

A higher-level manager that maintains a pile of events and forwards them to an internal Processor for asynchronous execution. This allows classes to store events locally, possibly filter or arrange them, and then dispatch them in batches.



processor_type

Declares which Processor subclass to use for event handling.

processor_config: dict[str, Any]

A dictionary of initialization arguments for the processor.


pending

Tracks IDs of events that are ready to be forwarded to the processor.

processor: Processor | None

The internal processor instance; created lazily if None.


pile

A pile storing all events, enabling concurrency-safe access and type constraints.


__init__(processor_config: dict[str, Any] | None = None, strict_event_type: bool = False) -> None


processor_config: dict[str, Any] | None

Configuration parameters for creating the processor (e.g., capacity, refresh time).

strict_event_type: bool

If True, the underlying Pile enforces exact type matching for Event objects.



event_type

The type of event the processor handles (mirrors processor_type.event_type).

strict_event_type

Whether the Pile requires exact type checks for added events.


async forward() -> None

Sends all pending events from the pile to the processor, then calls processor.process() immediately.

async start() -> None

Creates the processor with _create_processor() if it does not exist yet, then calls processor.start() to enable event processing.

async stop() -> None

Stops the processor if it is active.

async append(event: Event) -> None

Adds a new Event to the pile and queues its ID in pending.
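The append-then-forward flow might look roughly like the sketch below. All three classes are hypothetical stand-ins: a plain dict plays the role of the Pile, a list plays the role of pending, and the processor simply marks events as done.

```python
import asyncio
import uuid
from dataclasses import dataclass, field


@dataclass
class SketchEvent:  # hypothetical stand-in for Event
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    done: bool = False


class SketchProcessor:
    def __init__(self) -> None:
        self.queue = asyncio.Queue()

    async def enqueue(self, event) -> None:
        await self.queue.put(event)

    async def process(self) -> None:
        while not self.queue.empty():
            event = await self.queue.get()
            event.done = True  # placeholder for real execution


class SketchExecutor:
    def __init__(self) -> None:
        self.pile = {}  # event id -> event, stands in for Pile
        self.pending = []  # ids waiting to be forwarded
        self.processor = None  # created lazily in start()

    async def start(self) -> None:
        if self.processor is None:
            self.processor = SketchProcessor()

    async def append(self, event) -> None:
        self.pile[event.id] = event
        self.pending.append(event.id)

    async def forward(self) -> None:
        # Hand every pending event to the processor, oldest first.
        while self.pending:
            event_id = self.pending.pop(0)
            await self.processor.enqueue(self.pile[event_id])
        await self.processor.process()


async def demo():
    executor = SketchExecutor()
    await executor.start()
    events = [SketchEvent() for _ in range(3)]
    for event in events:
        await executor.append(event)
    await executor.forward()
    return executor, events
```

Keeping only IDs in pending means the pile stays the single owner of the event objects, which is one plausible reason for the two-collection design.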

completed_events -> Pile[Event]

Returns a new pile containing all events marked EventStatus.COMPLETED.

pending_events -> Pile[Event]

Returns a new pile containing all events still at EventStatus.PENDING.

failed_events -> Pile[Event]

Returns a new pile containing all events at EventStatus.FAILED.

__contains__(ref: ID.Ref) -> bool

Checks if a given event or event ID reference is stored in pile.
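The status views and the membership check can be approximated as below. Status, SketchEvent and StatusViews are hypothetical, plain lists are returned where the real properties return a Pile, and string IDs stand in for ID.Ref.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):  # hypothetical stand-in for EventStatus
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SketchEvent:  # hypothetical stand-in for Event
    id: str
    status: Status = Status.PENDING


class StatusViews:
    def __init__(self, events) -> None:
        self.pile = {e.id: e for e in events}  # stands in for Pile

    def _with_status(self, status):
        # Build a fresh list each time, so callers cannot mutate the pile.
        return [e for e in self.pile.values() if e.status is status]

    @property
    def completed_events(self):
        return self._with_status(Status.COMPLETED)

    @property
    def pending_events(self):
        return self._with_status(Status.PENDING)

    @property
    def failed_events(self):
        return self._with_status(Status.FAILED)

    def __contains__(self, ref) -> bool:
        # Accept either an event object or a bare id.
        key = ref.id if isinstance(ref, SketchEvent) else ref
        return key in self.pile
```

Each property builds a new collection on access, so the result is a snapshot and does not track later status changes.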

Private Methods

async _create_processor() -> None

Private instance method that instantiates the processor using the stored processor_config.

File Location

Source File: lionagi/protocols/generic/processor.py

Copyright (c) 2023 - 2024, HaiyangLi <quantocean.li at gmail dot com> SPDX-License-Identifier: Apache-2.0