Event Processor and Executor¶
Overview¶
This module provides:
Processor¶
- class lionagi.protocols.generic.processor.Processor(Observer)¶
Inherits from:
Observer
Manages a queue of events that can be processed asynchronously in batches, respecting a capacity limit. Subclasses can override methods for request_permission checks or custom handling logic. The processor can be started and stopped, and it refreshes available capacity after each batch of events.
Attributes¶
- event_typeClassVar[type[Event]]
Declares the type of Event this processor is meant to handle. Subclasses should set this to a specific Event subclass if needed.
- queue_capacityint
Maximum number of events that can be processed in one batch (e.g., 10).
- capacity_refresh_timefloat
Number of seconds after which the queue capacity is reset (e.g., 5.0).
- queueasyncio.Queue
Internal async queue holding incoming events.
- _available_capacityint
Tracks the current capacity (resets to
queue_capacity
after processing).- _execution_modebool
Indicates if the processor is currently in a continuous execute loop.
- _stop_eventasyncio.Event
An event used to signal a requested stop in the loop.
Initialization¶
- __init__(queue_capacity: int, capacity_refresh_time: float) None ¶
Args¶
- queue_capacityint
The maximum number of events allowed in a single processing batch.
- capacity_refresh_timefloat
After this many seconds, the capacity is refreshed.
Raises¶
- ValueError
If
queue_capacity < 1
orcapacity_refresh_time <= 0
.
Properties¶
- available_capacity¶
Tracks how many more events can be processed in the current batch.
Methods¶
- async enqueue(event: Event) None ¶
Adds an Event to the internal async queue.
- async dequeue() Event ¶
Retrieves the next Event from the queue.
- async join() None ¶
Blocks until the queue is fully processed.
- async stop() None ¶
Signals this processor to stop (sets the internal stop event).
- async start() None ¶
Clears the stop signal, allowing the processor to continue.
- is_stopped() bool ¶
Checks whether the processor is in a stopped state.
- async process() None ¶
Retrieves and processes events from the queue, up to
available_capacity
. Each event’sstatus
is set toPROCESSING
if it passes a permission check (request_permission()
). After processing these events, the capacity is reset if any were processed.
- async request_permission(**kwargs: Any) bool ¶
Determines whether an event is allowed to proceed. Subclasses can override to implement rate limiting, user permissions, or other custom checks.
Executor¶
- class lionagi.protocols.generic.processor.Executor(Observer)¶
Inherits from:
Observer
A higher-level manager that maintains a pile of events and forwards them to an internal
Processor
for asynchronous execution. This allows classes to store events locally, possibly filter or arrange them, and then dispatch them in batches.Attributes¶
- processor_typeClassVar[type[Processor]]
Declares which Processor subclass to use for event handling.
- processor_configdict[str, Any]
A dictionary of initialization arguments for
processor
.- pendingProgression
Tracks IDs of events that are ready to be forwarded to the processor.
- processorProcessor | None
The internal processor instance; created lazily if None.
- pilePile[Event]
A pile storing all events, enabling concurrency-safe access and type constraints.
Initialization¶
- __init__(processor_config: dict[str, Any] | None = None, strict_event_type: bool = False) None ¶
Args¶
- processor_configdict[str, Any] | None
Configuration parameters for creating the processor (e.g., capacity, refresh time).
- strict_event_typebool
If True, the underlying
Pile
enforces exact type matching forEvent
objects.
Properties¶
- event_type¶
The type of event the processor handles (mirrors
processor_type.event_type
).
- strict_event_type¶
Whether the Pile requires exact type checks for added events.
Methods¶
- async forward() None ¶
Sends all pending events from the
pile
to theprocessor
, then callsprocessor.process()
immediately.
- async start() None ¶
If the processor does not exist, create it with
_create_processor()
. Then callsprocessor.start()
to enable event processing.
- async stop() None ¶
Stops the processor if it is active.
- async append(event: Event) None ¶
Adds a new
Event
to the pile and queues its ID inpending
.
- completed_events -> Pile[Event]
- Returns a new pile containing all events marked :attr:`~lionagi.protocols.generic.event.EventStatus.COMPLETED`.
- pending_events -> Pile[Event]
- Returns a new pile containing all events still at :attr:`~lionagi.protocols.generic.event.EventStatus.PENDING`.
- failed_events -> Pile[Event]
- Returns a new pile containing all events at :attr:`~lionagi.protocols.generic.event.EventStatus.FAILED`.
- __contains__(ref: ID.Ref) bool ¶
Checks if a given event or event ID reference is stored in
pile
.
Private Methods¶
- async _create_processor() None ¶
Private instance method that instantiates the
processor
using the storedprocessor_config
.
File Location¶
Source File:
lionagi/protocols/generic/processor.py
Copyright (c) 2023 - 2024, HaiyangLi <quantocean.li at gmail dot com>
SPDX-License-Identifier: Apache-2.0