API Reference

Complete API documentation for LLMRateLimiter.

Main Module

The main module exports all public classes and functions.

LLM Rate Limiter - Client-side rate limiting for LLM API calls.

This library provides FIFO queue-based rate limiting to prevent hitting provider rate limits (TPM/RPM) when calling LLM APIs.

Basic usage (recommended: specify input and output tokens separately):

>>> from llmratelimiter import RateLimiter
>>>
>>> limiter = RateLimiter("redis://localhost:6379", "gpt-4", tpm=100_000, rpm=100)
>>> await limiter.acquire(input_tokens=3000, output_tokens=2000)
>>> response = await openai.chat.completions.create(...)

With existing Redis client:

>>> from llmratelimiter import RateLimiter
>>> from redis.asyncio import Redis
>>>
>>> redis = Redis(host="localhost", port=6379)
>>> limiter = RateLimiter(redis=redis, model="gpt-4", tpm=100_000, rpm=100)
>>> await limiter.acquire(input_tokens=3000, output_tokens=2000)

With connection manager (includes retry with exponential backoff):

>>> from llmratelimiter import RateLimiter, RedisConnectionManager, RetryConfig
>>>
>>> manager = RedisConnectionManager(
...     "redis://localhost:6379",
...     retry_config=RetryConfig(max_retries=3, base_delay=0.1),
... )
>>> limiter = RateLimiter(manager, "gpt-4", tpm=100_000, rpm=100)
>>> await limiter.acquire(input_tokens=3000, output_tokens=2000)

Split mode example (GCP Vertex AI):

>>> limiter = RateLimiter(
...     "redis://localhost:6379", "gemini-1.5-pro",
...     input_tpm=4_000_000, output_tpm=128_000, rpm=360
... )
>>> result = await limiter.acquire(input_tokens=5000, output_tokens=2048)
>>> response = await vertex_ai.generate(...)
>>> await limiter.adjust(result.record_id, actual_output=response.output_tokens)

AWS Bedrock with burndown rate (output tokens count 5x toward TPM):

>>> limiter = RateLimiter(
...     "redis://localhost:6379", "claude-sonnet",
...     tpm=100_000, rpm=100, burndown_rate=5.0
... )
>>> await limiter.acquire(input_tokens=3000, output_tokens=1000)
# TPM consumption: 3000 + (5.0 * 1000) = 8000 tokens

RateLimiter

Unified rate limiter for LLM API calls.

Supports combined TPM, split TPM, or both based on the configuration.

Simple URL example:

>>> limiter = RateLimiter("redis://localhost:6379", "gpt-4", tpm=100_000, rpm=100)
>>> await limiter.acquire(tokens=5000)

Split mode example (GCP Vertex AI):

>>> limiter = RateLimiter("redis://localhost", "gemini-1.5-pro",
...                       input_tpm=4_000_000, output_tpm=128_000, rpm=360)
>>> result = await limiter.acquire(input_tokens=5000, output_tokens=2048)
>>> await limiter.adjust(result.record_id, actual_output=1500)

With existing Redis client:

>>> limiter = RateLimiter(redis=existing_client, model="gpt-4", tpm=100_000, rpm=100)

With connection manager (includes retry support):

>>> manager = RedisConnectionManager("redis://localhost", retry_config=RetryConfig())
>>> limiter = RateLimiter(manager, "gpt-4", tpm=100_000, rpm=100)

With config object (advanced):

>>> config = RateLimitConfig(tpm=100_000, rpm=100, burst_multiplier=1.5)
>>> limiter = RateLimiter("redis://localhost", "gpt-4", config=config)

AWS Bedrock with burndown rate (output tokens count 5x):

>>> limiter = RateLimiter("redis://localhost", "claude-sonnet",
...                       tpm=100_000, rpm=100, burndown_rate=5.0)
>>> await limiter.acquire(input_tokens=3000, output_tokens=1000)
# TPM consumption: 3000 + (5.0 * 1000) = 8000 tokens

Azure OpenAI with RPS smoothing (burst prevention):

>>> limiter = RateLimiter("redis://localhost", "gpt-4",
...                       tpm=300_000, rpm=600, smooth_requests=True)
# Auto-calculates RPS = 600/60 = 10, enforces 100ms minimum gap

>>> limiter = RateLimiter("redis://localhost", "gpt-4",
...                       tpm=300_000, rpm=600, rps=8)
# Explicit RPS, auto-enables smoothing, enforces 125ms minimum gap

has_combined_limit property

has_combined_limit: bool

Whether this limiter has a combined TPM limit.

is_split_mode property

is_split_mode: bool

Whether this limiter uses split input/output TPM limits.

__init__

__init__(
    redis: RedisClient | None = None,
    model: str | None = None,
    config: RateLimitConfig | None = None,
    *,
    tpm: int = 0,
    rpm: int = 0,
    input_tpm: int = 0,
    output_tpm: int = 0,
    window_seconds: int = 60,
    burst_multiplier: float = 1.0,
    burndown_rate: float = 1.0,
    smooth_requests: bool = True,
    rps: int = 0,
    smoothing_interval: float = 1.0,
    password: str | None = None,
    db: int = 0,
    max_connections: int = 10,
    retry_config: RetryConfig | None = None,
    redis_client: Redis
    | RedisConnectionManager
    | None = None,
    model_name: str | None = None,
) -> None

Initialize the rate limiter.

Parameters:

  redis (RedisClient | None, default None)
      Redis URL string, async Redis client, or RedisConnectionManager.
  model (str | None, default None)
      Name of the model (used for Redis key namespace).
  config (RateLimitConfig | None, default None)
      Configuration for rate limits (optional if using kwargs).
  tpm (int, default 0)
      Combined tokens per minute limit.
  rpm (int, default 0)
      Requests per minute limit.
  input_tpm (int, default 0)
      Input tokens per minute limit (split mode).
  output_tpm (int, default 0)
      Output tokens per minute limit (split mode).
  window_seconds (int, default 60)
      Sliding window duration in seconds.
  burst_multiplier (float, default 1.0)
      Multiplier for burst capacity.
  burndown_rate (float, default 1.0)
      Output token multiplier for combined TPM. AWS Bedrock Claude models use 5.0.
  smooth_requests (bool, default True)
      Enable RPS smoothing to prevent burst-triggered rate limits. When True,
      auto-calculates RPS from RPM.
  rps (int, default 0)
      Explicit requests-per-second limit. When set > 0, auto-enables smoothing.
      Set to 0 to auto-calculate from RPM when smooth_requests=True.
  smoothing_interval (float, default 1.0)
      Evaluation window in seconds for RPS enforcement. Azure uses 1.0s intervals.
  password (str | None, default None)
      Redis password (for URL connections).
  db (int, default 0)
      Redis database number (for URL connections).
  max_connections (int, default 10)
      Maximum connections in pool (for URL connections).
  retry_config (RetryConfig | None, default None)
      Retry configuration for URL-based connections.
  redis_client (Redis | RedisConnectionManager | None, default None)
      Deprecated, use 'redis' parameter.
  model_name (str | None, default None)
      Deprecated, use 'model' parameter.

acquire async

acquire(*, tokens: int) -> AcquireResult
acquire(
    *, input_tokens: int, output_tokens: int = 0
) -> AcquireResult
acquire(
    *,
    tokens: int | None = None,
    input_tokens: int | None = None,
    output_tokens: int = 0,
) -> AcquireResult

Acquire rate limit capacity.

For combined mode with pre-calculated tokens, use the tokens parameter:

    await limiter.acquire(tokens=5000)
    # Burndown rate is NOT applied - value is used directly

For separate input/output tracking, use input_tokens/output_tokens:

    await limiter.acquire(input_tokens=5000, output_tokens=2048)
    # Burndown rate IS applied: effective = input + (burndown_rate * output)

With burndown rate (e.g., AWS Bedrock with burndown_rate=5.0):

    await limiter.acquire(input_tokens=3000, output_tokens=1000)
    # TPM consumption: 3000 + (5.0 * 1000) = 8000 tokens

Blocks until capacity is available (FIFO ordering), then returns. On Redis failure (after retries if configured), allows the request (graceful degradation).

Note: The burndown_rate is only applied when using input_tokens/output_tokens. When using the tokens= parameter, it is assumed the burndown calculation has already been done by the caller. Split input/output TPM limits are not affected by burndown_rate.

Parameters:

  tokens (int | None, default None)
      Pre-calculated total tokens (burndown already applied if needed).
  input_tokens (int | None, default None)
      Number of input tokens.
  output_tokens (int, default 0)
      Number of output tokens.

Returns:

  AcquireResult
      AcquireResult with slot time, wait time, queue position, and record ID.

adjust async

adjust(record_id: str, actual_output: int) -> None

Adjust the output tokens for a consumption record.

Use this when the actual output tokens differ from the estimate. This frees up capacity if actual < estimated, or uses more if actual > estimated.

Parameters:

  record_id (str, required)
      The record ID from the acquire() result.
  actual_output (int, required)
      The actual number of output tokens.
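
A minimal end-to-end sketch of the estimate-then-adjust pattern (the call_model coroutine below is a hypothetical stand-in for your provider client; the Redis URL is assumed):

import asyncio

from llmratelimiter import RateLimiter

async def call_model(prompt: str) -> int:
    # Hypothetical stand-in for a provider call; returns the output token count.
    await asyncio.sleep(0.1)
    return 1500

async def main() -> None:
    limiter = RateLimiter("redis://localhost:6379", "gemini-1.5-pro",
                          input_tpm=4_000_000, output_tpm=128_000, rpm=360)
    # Reserve capacity with an output-token estimate (blocks FIFO until a slot opens).
    result = await limiter.acquire(input_tokens=5000, output_tokens=2048)
    actual_output = await call_model("...")
    # Reconcile: frees capacity if actual < estimate, consumes more if actual > estimate.
    await limiter.adjust(result.record_id, actual_output=actual_output)

asyncio.run(main())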

get_status async

get_status() -> RateLimitStatus

Get current rate limit status.

Returns:

  RateLimitStatus
      RateLimitStatus with current usage and limits.
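
A short sketch (assuming a combined-mode limiter and a reachable local Redis) that reads the documented status fields:

import asyncio

from llmratelimiter import RateLimiter

async def main() -> None:
    limiter = RateLimiter("redis://localhost:6379", "gpt-4", tpm=100_000, rpm=100)
    status = await limiter.get_status()
    # Combined mode populates tokens_used/tokens_limit; split fields stay 0.
    print(f"tokens:   {status.tokens_used}/{status.tokens_limit}")
    print(f"requests: {status.requests_used}/{status.requests_limit}")
    print(f"queued:   {status.queue_depth}")

asyncio.run(main())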

RateLimitConfig dataclass

Unified configuration for rate limiting.

Supports combined TPM, split TPM, or both. Set unused limits to 0 to disable.

Combined mode only

RateLimitConfig(tpm=100_000, rpm=100)

Split mode only

RateLimitConfig(input_tpm=4_000_000, output_tpm=128_000, rpm=360)

Mixed mode (all three limits):

RateLimitConfig(tpm=100_000, input_tpm=80_000, output_tpm=20_000, rpm=100)
# Request must satisfy ALL constraints

Disabling limits
  • Set rpm=0 to disable request rate limiting
  • Set tpm=0 to disable combined token limiting
  • Set input_tpm=0 or output_tpm=0 to disable that specific limit
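
For example, a config that enforces only token throughput, with request-rate limiting disabled (a minimal sketch):

from llmratelimiter import RateLimitConfig

# rpm=0 disables the request-rate check; only the combined TPM limit applies.
config = RateLimitConfig(rpm=0, tpm=100_000)
assert config.has_combined_limit
assert not config.is_split_mode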

Burndown rate (AWS Bedrock):

RateLimitConfig(tpm=100_000, rpm=100, burndown_rate=5.0)
# TPM consumption = input_tokens + (burndown_rate * output_tokens)

RPS smoothing (Azure OpenAI burst prevention):

RateLimitConfig(tpm=300_000, rpm=600, smooth_requests=True)
# Auto-calculates RPS = 600/60 = 10, enforces 100ms minimum gap

RateLimitConfig(tpm=300_000, rpm=600, rps=8)
# Explicit RPS, auto-enables smoothing, enforces 125ms minimum gap

Parameters:

  rpm (int, required)
      Requests per minute limit. Set to 0 to disable.
  tpm (int, default 0)
      Combined tokens per minute limit (input + output). Set to 0 to disable.
  input_tpm (int, default 0)
      Input tokens per minute limit. Set to 0 to disable.
  output_tpm (int, default 0)
      Output tokens per minute limit. Set to 0 to disable.
  window_seconds (int, default 60)
      Sliding window duration in seconds.
  burst_multiplier (float, default 1.0)
      Multiplier for burst capacity above base limits.
  burndown_rate (float, default 1.0)
      Output token multiplier for combined TPM. AWS Bedrock Claude models use 5.0.
  smooth_requests (bool, default True)
      Enable RPS smoothing to prevent burst-triggered rate limits. When True,
      auto-calculates RPS from RPM.
  rps (int, default 0)
      Explicit requests-per-second limit. When set > 0, auto-enables smoothing.
      Set to 0 to auto-calculate from RPM when smooth_requests=True.
  smoothing_interval (float, default 1.0)
      Evaluation window in seconds for RPS enforcement. Azure uses 1.0s intervals.

effective_rps property

effective_rps: float

Calculate effective RPS limit.

Returns:

  float
      Explicit rps if set, otherwise rpm/60 if smoothing enabled, else 0.
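
A quick worked check of the property's rules (explicit rps wins, otherwise rpm/60 when smoothing is enabled, otherwise 0):

from llmratelimiter import RateLimitConfig

print(RateLimitConfig(rpm=600, tpm=300_000).effective_rps)         # 10.0 (600/60)
print(RateLimitConfig(rpm=600, tpm=300_000, rps=8).effective_rps)  # 8.0 (explicit wins)
print(RateLimitConfig(rpm=600, tpm=300_000,
                      smooth_requests=False).effective_rps)        # 0.0 (smoothing off)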

has_combined_limit property

has_combined_limit: bool

Whether this config has a combined TPM limit.

is_smoothing_enabled property

is_smoothing_enabled: bool

Whether RPS smoothing is active.

Smoothing is enabled when either:
  • smooth_requests=True (auto-calculate RPS from RPM)
  • rps > 0 (explicit RPS, auto-enables smoothing)

is_split_mode property

is_split_mode: bool

Whether this config uses split input/output TPM limits.

__post_init__

__post_init__() -> None

Validate configuration values.

RedisConnectionManager

Manages Redis connections with pooling and retry support.

Example with URL:

>>> async with RedisConnectionManager("redis://localhost:6379") as manager:
...     client = manager.client
...     await client.ping()

Example with host/port:

>>> manager = RedisConnectionManager(
...     host="localhost",
...     port=6379,
...     retry_config=RetryConfig(max_retries=5, base_delay=0.2),
... )
>>> limiter = RateLimiter(manager, "gpt-4", tpm=100_000, rpm=100)

client property

client: Redis

Get the Redis client, creating the pool if needed.

retry_config property

retry_config: RetryConfig

Get the retry configuration.

__aenter__ async

__aenter__() -> RedisConnectionManager

Enter async context manager.

__aexit__ async

__aexit__(*args: Any) -> None

Exit async context manager, closing connections.

__init__

__init__(
    url: str | None = None,
    *,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: str | None = None,
    max_connections: int = 10,
    retry_config: RetryConfig | None = None,
    decode_responses: bool = True,
    **redis_kwargs: Any,
) -> None

Initialize the connection manager.

Parameters:

  url (str | None, default None)
      Redis URL (e.g., "redis://localhost:6379/0", "rediss://..." for SSL).
  host (str, default 'localhost')
      Redis server hostname (used if url is not provided).
  port (int, default 6379)
      Redis server port (used if url is not provided).
  db (int, default 0)
      Redis database number.
  password (str | None, default None)
      Redis password.
  max_connections (int, default 10)
      Maximum connections in the pool.
  retry_config (RetryConfig | None, default None)
      Configuration for retry behavior. Defaults to RetryConfig().
  decode_responses (bool, default True)
      Whether to decode responses to strings.
  **redis_kwargs (Any)
      Additional arguments passed to Redis client.

close async

close() -> None

Close all connections in the pool.

RetryConfig dataclass

Configuration for retry behavior with exponential backoff.

Parameters:

  max_retries (int, default 3)
      Maximum number of retry attempts (0 = no retries).
  base_delay (float, default 0.1)
      Initial delay in seconds before first retry.
  max_delay (float, default 5.0)
      Maximum delay in seconds between retries.
  exponential_base (float, default 2.0)
      Multiplier for exponential backoff (delay * base^attempt).
  jitter (float, default 0.1)
      Random jitter factor (0.0 to 1.0) to prevent thundering herd.
Example

config = RetryConfig(max_retries=3, base_delay=0.1)

Retry delays: ~0.1s, ~0.2s, ~0.4s (with jitter)

__post_init__

__post_init__() -> None

Validate configuration values.

AcquireResult dataclass

Result from an acquire() call.

Attributes:

  slot_time (float)
      The timestamp when the request is scheduled to execute.
  wait_time (float)
      Time in seconds the caller waited (or will wait).
  queue_position (int)
      Position in the FIFO queue (0 if immediate).
  record_id (str)
      Unique ID for this consumption record (for adjust()).

RateLimitStatus dataclass

Current status of a rate limiter.

Unified status for both combined and split mode limiters. Unused fields are set to 0.

Combined mode (tpm > 0):
  • tokens_used/tokens_limit contain combined token usage
  • input_tokens_used/input_tokens_limit are 0
  • output_tokens_used/output_tokens_limit are 0

Split mode (input_tpm/output_tpm > 0):
  • tokens_used/tokens_limit are 0
  • input_tokens_used/input_tokens_limit contain input token usage
  • output_tokens_used/output_tokens_limit contain output token usage

Attributes:

  model (str)
      The model name this limiter is for.
  window_seconds (int)
      The sliding window duration.
  tokens_used (int)
      Current combined tokens consumed (combined mode).
  tokens_limit (int)
      Maximum combined tokens allowed (combined mode).
  input_tokens_used (int)
      Current input tokens consumed (split mode).
  input_tokens_limit (int)
      Maximum input tokens allowed (split mode).
  output_tokens_used (int)
      Current output tokens consumed (split mode).
  output_tokens_limit (int)
      Maximum output tokens allowed (split mode).
  requests_used (int)
      Current requests in the window.
  requests_limit (int)
      Maximum requests allowed per window.
  queue_depth (int)
      Number of pending requests (slot_time > now).
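
Because unused fields are zeroed, a consumer can branch on which limits are populated; a small sketch of computing token utilization either way:

from llmratelimiter import RateLimitStatus

def token_utilization(status: RateLimitStatus) -> float:
    # Combined mode: tokens_limit > 0; split mode: input/output limits > 0.
    if status.tokens_limit > 0:
        return status.tokens_used / status.tokens_limit
    if status.input_tokens_limit > 0:
        return status.input_tokens_used / status.input_tokens_limit
    return 0.0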

Configuration

Configuration dataclasses for rate limits and retry behavior.

RateLimitConfig dataclass

Unified configuration for rate limiting.

Supports combined TPM, split TPM, or both. Set unused limits to 0 to disable.

Combined mode only

RateLimitConfig(tpm=100_000, rpm=100)

Split mode only

RateLimitConfig(input_tpm=4_000_000, output_tpm=128_000, rpm=360)

Mixed mode (all three limits):

RateLimitConfig(tpm=100_000, input_tpm=80_000, output_tpm=20_000, rpm=100)
# Request must satisfy ALL constraints

Disabling limits
  • Set rpm=0 to disable request rate limiting
  • Set tpm=0 to disable combined token limiting
  • Set input_tpm=0 or output_tpm=0 to disable that specific limit

Burndown rate (AWS Bedrock):

RateLimitConfig(tpm=100_000, rpm=100, burndown_rate=5.0)
# TPM consumption = input_tokens + (burndown_rate * output_tokens)

RPS smoothing (Azure OpenAI burst prevention):

RateLimitConfig(tpm=300_000, rpm=600, smooth_requests=True)
# Auto-calculates RPS = 600/60 = 10, enforces 100ms minimum gap

RateLimitConfig(tpm=300_000, rpm=600, rps=8)
# Explicit RPS, auto-enables smoothing, enforces 125ms minimum gap

Parameters:

  rpm (int, required)
      Requests per minute limit. Set to 0 to disable.
  tpm (int, default 0)
      Combined tokens per minute limit (input + output). Set to 0 to disable.
  input_tpm (int, default 0)
      Input tokens per minute limit. Set to 0 to disable.
  output_tpm (int, default 0)
      Output tokens per minute limit. Set to 0 to disable.
  window_seconds (int, default 60)
      Sliding window duration in seconds.
  burst_multiplier (float, default 1.0)
      Multiplier for burst capacity above base limits.
  burndown_rate (float, default 1.0)
      Output token multiplier for combined TPM. AWS Bedrock Claude models use 5.0.
  smooth_requests (bool, default True)
      Enable RPS smoothing to prevent burst-triggered rate limits. When True,
      auto-calculates RPS from RPM.
  rps (int, default 0)
      Explicit requests-per-second limit. When set > 0, auto-enables smoothing.
      Set to 0 to auto-calculate from RPM when smooth_requests=True.
  smoothing_interval (float, default 1.0)
      Evaluation window in seconds for RPS enforcement. Azure uses 1.0s intervals.
Source code in src/llmratelimiter/config.py
@dataclass(frozen=True)
class RateLimitConfig:
    """Unified configuration for rate limiting.

    Supports combined TPM, split TPM, or both. Set unused limits to 0 to disable.

    Combined mode only:
        RateLimitConfig(tpm=100_000, rpm=100)

    Split mode only:
        RateLimitConfig(input_tpm=4_000_000, output_tpm=128_000, rpm=360)

    Mixed mode (all three limits):
        RateLimitConfig(tpm=100_000, input_tpm=80_000, output_tpm=20_000, rpm=100)
        # Request must satisfy ALL constraints

    Disabling limits:
        - Set rpm=0 to disable request rate limiting
        - Set tpm=0 to disable combined token limiting
        - Set input_tpm=0 or output_tpm=0 to disable that specific limit

    Burndown rate (AWS Bedrock):
        RateLimitConfig(tpm=100_000, rpm=100, burndown_rate=5.0)
        # TPM consumption = input_tokens + (burndown_rate * output_tokens)

    RPS smoothing (Azure OpenAI burst prevention):
        RateLimitConfig(tpm=300_000, rpm=600, smooth_requests=True)
        # Auto-calculates RPS = 600/60 = 10, enforces 100ms minimum gap

        RateLimitConfig(tpm=300_000, rpm=600, rps=8)
        # Explicit RPS, auto-enables smoothing, enforces 125ms minimum gap

    Args:
        rpm: Requests per minute limit. Set to 0 to disable.
        tpm: Combined tokens per minute limit (input + output). Set to 0 to disable.
        input_tpm: Input tokens per minute limit. Set to 0 to disable.
        output_tpm: Output tokens per minute limit. Set to 0 to disable.
        window_seconds: Sliding window duration in seconds.
        burst_multiplier: Multiplier for burst capacity above base limits.
        burndown_rate: Output token multiplier for combined TPM (default 1.0).
            AWS Bedrock Claude models use 5.0.
        smooth_requests: Enable RPS smoothing to prevent burst-triggered rate limits.
            When True, auto-calculates RPS from RPM. Default True.
        rps: Explicit requests-per-second limit. When set > 0, auto-enables smoothing.
            Set to 0 to auto-calculate from RPM when smooth_requests=True.
        smoothing_interval: Evaluation window in seconds for RPS enforcement.
            Azure uses 1.0s intervals. Default 1.0.
    """

    rpm: int
    tpm: int = 0
    input_tpm: int = 0
    output_tpm: int = 0
    window_seconds: int = 60
    burst_multiplier: float = 1.0
    burndown_rate: float = 1.0
    smooth_requests: bool = True
    rps: int = 0
    smoothing_interval: float = 1.0

    def __post_init__(self) -> None:
        """Validate configuration values."""
        if self.burndown_rate < 0:
            raise ValueError("burndown_rate must be >= 0")
        # Handle rps=None by treating it as 0 (disabled), and validate it's not negative
        if self.rps is not None and self.rps < 0:
            raise ValueError("rps must be >= 0")
        if self.smoothing_interval <= 0:
            raise ValueError("smoothing_interval must be > 0")

    @property
    def is_split_mode(self) -> bool:
        """Whether this config uses split input/output TPM limits."""
        return self.input_tpm > 0 or self.output_tpm > 0

    @property
    def has_combined_limit(self) -> bool:
        """Whether this config has a combined TPM limit."""
        return self.tpm > 0

    @property
    def is_smoothing_enabled(self) -> bool:
        """Whether RPS smoothing is active.

        Smoothing is enabled when either:
        - smooth_requests=True (auto-calculate RPS from RPM)
        - rps > 0 (explicit RPS, auto-enables smoothing)
        """
        # Handle rps=None as 0 (disabled)
        rps_val = self.rps if self.rps is not None else 0
        return rps_val > 0 or self.smooth_requests

    @property
    def effective_rps(self) -> float:
        """Calculate effective RPS limit.

        Returns:
            Explicit rps if set, otherwise rpm/60 if smoothing enabled, else 0.
        """
        # Handle rps=None as 0 (disabled)
        rps_val = self.rps if self.rps is not None else 0
        if rps_val > 0:
            return float(rps_val)
        if self.smooth_requests and self.rpm > 0:
            return self.rpm / 60.0
        return 0.0

effective_rps property

effective_rps: float

Calculate effective RPS limit.

Returns:

Type Description
float

Explicit rps if set, otherwise rpm/60 if smoothing enabled, else 0.

has_combined_limit property

has_combined_limit: bool

Whether this config has a combined TPM limit.

is_smoothing_enabled property

is_smoothing_enabled: bool

Whether RPS smoothing is active.

Smoothing is enabled when either:
  • smooth_requests=True (auto-calculate RPS from RPM)
  • rps > 0 (explicit RPS, auto-enables smoothing)

is_split_mode property

is_split_mode: bool

Whether this config uses split input/output TPM limits.

__post_init__

__post_init__() -> None

Validate configuration values.

Source code in src/llmratelimiter/config.py
def __post_init__(self) -> None:
    """Validate configuration values."""
    if self.burndown_rate < 0:
        raise ValueError("burndown_rate must be >= 0")
    # Handle rps=None by treating it as 0 (disabled), and validate it's not negative
    if self.rps is not None and self.rps < 0:
        raise ValueError("rps must be >= 0")
    if self.smoothing_interval <= 0:
        raise ValueError("smoothing_interval must be > 0")

RetryConfig dataclass

Configuration for retry behavior with exponential backoff.

Parameters:

  max_retries (int, default 3)
      Maximum number of retry attempts (0 = no retries).
  base_delay (float, default 0.1)
      Initial delay in seconds before first retry.
  max_delay (float, default 5.0)
      Maximum delay in seconds between retries.
  exponential_base (float, default 2.0)
      Multiplier for exponential backoff (delay * base^attempt).
  jitter (float, default 0.1)
      Random jitter factor (0.0 to 1.0) to prevent thundering herd.
Example

config = RetryConfig(max_retries=3, base_delay=0.1)

Retry delays: ~0.1s, ~0.2s, ~0.4s (with jitter)

Source code in src/llmratelimiter/config.py
@dataclass(frozen=True)
class RetryConfig:
    """Configuration for retry behavior with exponential backoff.

    Args:
        max_retries: Maximum number of retry attempts (0 = no retries).
        base_delay: Initial delay in seconds before first retry.
        max_delay: Maximum delay in seconds between retries.
        exponential_base: Multiplier for exponential backoff (delay * base^attempt).
        jitter: Random jitter factor (0.0 to 1.0) to prevent thundering herd.

    Example:
        >>> config = RetryConfig(max_retries=3, base_delay=0.1)
        # Retry delays: ~0.1s, ~0.2s, ~0.4s (with jitter)
    """

    max_retries: int = 3
    base_delay: float = 0.1
    max_delay: float = 5.0
    exponential_base: float = 2.0
    jitter: float = 0.1

    def __post_init__(self) -> None:
        """Validate configuration values."""
        if self.max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        if self.base_delay <= 0:
            raise ValueError("base_delay must be > 0")
        if self.max_delay < self.base_delay:
            raise ValueError("max_delay must be >= base_delay")
        if self.exponential_base < 1:
            raise ValueError("exponential_base must be >= 1")
        if not 0 <= self.jitter <= 1:
            raise ValueError("jitter must be between 0 and 1")

__post_init__

__post_init__() -> None

Validate configuration values.

Source code in src/llmratelimiter/config.py
def __post_init__(self) -> None:
    """Validate configuration values."""
    if self.max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    if self.base_delay <= 0:
        raise ValueError("base_delay must be > 0")
    if self.max_delay < self.base_delay:
        raise ValueError("max_delay must be >= base_delay")
    if self.exponential_base < 1:
        raise ValueError("exponential_base must be >= 1")
    if not 0 <= self.jitter <= 1:
        raise ValueError("jitter must be between 0 and 1")

Connection Management

Redis connection management with pooling and retry support.

RedisConnectionManager

Manages Redis connections with pooling and retry support.

Example with URL:

>>> async with RedisConnectionManager("redis://localhost:6379") as manager:
...     client = manager.client
...     await client.ping()

Example with host/port:

>>> manager = RedisConnectionManager(
...     host="localhost",
...     port=6379,
...     retry_config=RetryConfig(max_retries=5, base_delay=0.2),
... )
>>> limiter = RateLimiter(manager, "gpt-4", tpm=100_000, rpm=100)

Source code in src/llmratelimiter/connection.py
class RedisConnectionManager:
    """Manages Redis connections with pooling and retry support.

    Example with URL:
        >>> async with RedisConnectionManager("redis://localhost:6379") as manager:
        ...     client = manager.client
        ...     await client.ping()

    Example with host/port:
        >>> manager = RedisConnectionManager(
        ...     host="localhost",
        ...     port=6379,
        ...     retry_config=RetryConfig(max_retries=5, base_delay=0.2),
        ... )
        >>> limiter = RateLimiter(manager, "gpt-4", tpm=100_000, rpm=100)
    """

    def __init__(
        self,
        url: str | None = None,
        *,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: str | None = None,
        max_connections: int = 10,
        retry_config: RetryConfig | None = None,
        decode_responses: bool = True,
        **redis_kwargs: Any,
    ) -> None:
        """Initialize the connection manager.

        Args:
            url: Redis URL (e.g., "redis://localhost:6379/0", "rediss://..." for SSL).
            host: Redis server hostname (used if url is not provided).
            port: Redis server port (used if url is not provided).
            db: Redis database number.
            password: Redis password.
            max_connections: Maximum connections in the pool.
            retry_config: Configuration for retry behavior. Defaults to RetryConfig().
            decode_responses: Whether to decode responses to strings.
            **redis_kwargs: Additional arguments passed to Redis client.
        """
        self._url = url
        self._host = host
        self._port = port
        self._db = db
        self._password = password
        self._max_connections = max_connections
        self._retry_config = retry_config or RetryConfig()
        self._decode_responses = decode_responses
        self._redis_kwargs = redis_kwargs

        self._pool: ConnectionPool | None = None
        self._client: Redis | None = None

    @property
    def retry_config(self) -> RetryConfig:
        """Get the retry configuration."""
        return self._retry_config

    @property
    def client(self) -> Redis:
        """Get the Redis client, creating the pool if needed."""
        if self._client is None:
            # Build common kwargs
            pool_kwargs: dict[str, Any] = {
                "max_connections": self._max_connections,
                "decode_responses": self._decode_responses,
                **self._redis_kwargs,
            }

            if self._url is not None:
                # Use URL-based connection pool (use rediss:// for SSL)
                # Override db/password if explicitly provided
                if self._db != 0:
                    pool_kwargs["db"] = self._db
                if self._password is not None:
                    pool_kwargs["password"] = self._password

                self._pool = ConnectionPool.from_url(self._url, **pool_kwargs)
            else:
                # Use host/port-based connection pool
                self._pool = ConnectionPool(
                    host=self._host,
                    port=self._port,
                    db=self._db,
                    password=self._password,
                    **pool_kwargs,
                )
            self._client = Redis(connection_pool=self._pool)
        return self._client

    async def close(self) -> None:
        """Close all connections in the pool."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None
        if self._pool is not None:
            await self._pool.disconnect()
            self._pool = None

    async def __aenter__(self) -> "RedisConnectionManager":
        """Enter async context manager."""
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Exit async context manager, closing connections."""
        await self.close()

client property

client: Redis

Get the Redis client, creating the pool if needed.

retry_config property

retry_config: RetryConfig

Get the retry configuration.

__aenter__ async

__aenter__() -> RedisConnectionManager

Enter async context manager.

Source code in src/llmratelimiter/connection.py
async def __aenter__(self) -> "RedisConnectionManager":
    """Enter async context manager."""
    return self

__aexit__ async

__aexit__(*args: Any) -> None

Exit async context manager, closing connections.

Source code in src/llmratelimiter/connection.py
async def __aexit__(self, *args: Any) -> None:
    """Exit async context manager, closing connections."""
    await self.close()

__init__

__init__(
    url: str | None = None,
    *,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: str | None = None,
    max_connections: int = 10,
    retry_config: RetryConfig | None = None,
    decode_responses: bool = True,
    **redis_kwargs: Any,
) -> None

Initialize the connection manager.

Parameters:

  url (str | None, default None)
      Redis URL (e.g., "redis://localhost:6379/0", "rediss://..." for SSL).
  host (str, default 'localhost')
      Redis server hostname (used if url is not provided).
  port (int, default 6379)
      Redis server port (used if url is not provided).
  db (int, default 0)
      Redis database number.
  password (str | None, default None)
      Redis password.
  max_connections (int, default 10)
      Maximum connections in the pool.
  retry_config (RetryConfig | None, default None)
      Configuration for retry behavior. Defaults to RetryConfig().
  decode_responses (bool, default True)
      Whether to decode responses to strings.
  **redis_kwargs (Any)
      Additional arguments passed to Redis client.
Source code in src/llmratelimiter/connection.py
def __init__(
    self,
    url: str | None = None,
    *,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: str | None = None,
    max_connections: int = 10,
    retry_config: RetryConfig | None = None,
    decode_responses: bool = True,
    **redis_kwargs: Any,
) -> None:
    """Initialize the connection manager.

    Args:
        url: Redis URL (e.g., "redis://localhost:6379/0", "rediss://..." for SSL).
        host: Redis server hostname (used if url is not provided).
        port: Redis server port (used if url is not provided).
        db: Redis database number.
        password: Redis password.
        max_connections: Maximum connections in the pool.
        retry_config: Configuration for retry behavior. Defaults to RetryConfig().
        decode_responses: Whether to decode responses to strings.
        **redis_kwargs: Additional arguments passed to Redis client.
    """
    self._url = url
    self._host = host
    self._port = port
    self._db = db
    self._password = password
    self._max_connections = max_connections
    self._retry_config = retry_config or RetryConfig()
    self._decode_responses = decode_responses
    self._redis_kwargs = redis_kwargs

    self._pool: ConnectionPool | None = None
    self._client: Redis | None = None

close async

close() -> None

Close all connections in the pool.

Source code in src/llmratelimiter/connection.py
async def close(self) -> None:
    """Close all connections in the pool."""
    if self._client is not None:
        await self._client.aclose()
        self._client = None
    if self._pool is not None:
        await self._pool.disconnect()
        self._pool = None

calculate_delay

calculate_delay(attempt: int, config: RetryConfig) -> float

Calculate delay for a retry attempt with exponential backoff and jitter.

Parameters:

  attempt (int, required)
      The retry attempt number (0-indexed).
  config (RetryConfig, required)
      Retry configuration.

Returns:

  float
      Delay in seconds before the next retry.

Source code in src/llmratelimiter/connection.py
def calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Calculate delay for a retry attempt with exponential backoff and jitter.

    Args:
        attempt: The retry attempt number (0-indexed).
        config: Retry configuration.

    Returns:
        Delay in seconds before the next retry.
    """
    # Exponential backoff: base_delay * (exponential_base ** attempt)
    delay = config.base_delay * (config.exponential_base**attempt)

    # Cap at max_delay
    delay = min(delay, config.max_delay)

    # Add jitter: ±jitter% randomization
    if config.jitter > 0:
        jitter_range = delay * config.jitter
        delay += random.uniform(-jitter_range, jitter_range)

    return max(0, delay)  # Never negative
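
A quick sanity check of the schedule above with the default RetryConfig (the llmratelimiter.connection import path is an assumption based on the source location shown):

from llmratelimiter import RetryConfig
from llmratelimiter.connection import calculate_delay  # assumed import path

config = RetryConfig()  # max_retries=3, base_delay=0.1, exponential_base=2.0
for attempt in range(3):
    # base_delay * exponential_base**attempt, capped at max_delay, +/-10% jitter
    print(attempt, round(calculate_delay(attempt, config), 3))
# Roughly 0.1, 0.2, 0.4 seconds before jitter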

retry_with_backoff async

retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    config: RetryConfig,
    operation_name: str = "operation",
) -> T

Execute an async operation with exponential backoff retry.

Parameters:

  operation (Callable[[], Awaitable[T]], required)
      Async callable to execute.
  config (RetryConfig, required)
      Retry configuration.
  operation_name (str, default 'operation')
      Name for logging purposes.

Returns:

  T
      Result of the operation.

Raises:

  Exception
      The last exception if all retries are exhausted.

Source code in src/llmratelimiter/connection.py
async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    config: RetryConfig,
    operation_name: str = "operation",
) -> T:
    """Execute an async operation with exponential backoff retry.

    Args:
        operation: Async callable to execute.
        config: Retry configuration.
        operation_name: Name for logging purposes.

    Returns:
        Result of the operation.

    Raises:
        Exception: The last exception if all retries are exhausted.
    """
    last_exception: Exception | None = None

    for attempt in range(config.max_retries + 1):  # +1 for initial attempt
        try:
            return await operation()
        except NON_RETRYABLE_ERRORS:
            # Don't retry these - re-raise immediately
            raise
        except RETRYABLE_ERRORS as e:
            last_exception = e

            if attempt < config.max_retries:
                delay = calculate_delay(attempt, config)
                logger.warning(
                    "%s failed (attempt %d/%d), retrying in %.2fs: %s",
                    operation_name,
                    attempt + 1,
                    config.max_retries + 1,
                    delay,
                    e,
                )
                await asyncio.sleep(delay)
            else:
                logger.warning(
                    "%s failed after %d attempts: %s",
                    operation_name,
                    config.max_retries + 1,
                    e,
                )
        except Exception:
            # Unknown error - log and re-raise
            logger.exception("Unexpected error in %s", operation_name)
            raise

    # All retries exhausted
    if last_exception is not None:
        raise last_exception

    # Should never reach here, but satisfy type checker
    raise RuntimeError("Retry logic error")
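
A minimal usage sketch wrapping a Redis ping in the retry helper (again assuming the llmratelimiter.connection import path):

import asyncio

from redis.asyncio import Redis

from llmratelimiter import RetryConfig
from llmratelimiter.connection import retry_with_backoff  # assumed import path

async def main() -> None:
    redis = Redis(host="localhost", port=6379)
    config = RetryConfig(max_retries=3, base_delay=0.1)
    # Retries transient connection errors with backoff; non-retryable and
    # unknown exceptions are re-raised immediately.
    print(await retry_with_backoff(redis.ping, config, "ping"))
    await redis.aclose()

asyncio.run(main())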

Rate Limiter

The main rate limiter implementation.

RateLimiter

Unified rate limiter for LLM API calls.

Supports combined TPM, split TPM, or both based on the configuration.

Simple URL example:

>>> limiter = RateLimiter("redis://localhost:6379", "gpt-4", tpm=100_000, rpm=100)
>>> await limiter.acquire(tokens=5000)

Split mode example (GCP Vertex AI):

>>> limiter = RateLimiter("redis://localhost", "gemini-1.5-pro",
...                       input_tpm=4_000_000, output_tpm=128_000, rpm=360)
>>> result = await limiter.acquire(input_tokens=5000, output_tokens=2048)
>>> await limiter.adjust(result.record_id, actual_output=1500)

With existing Redis client:

>>> limiter = RateLimiter(redis=existing_client, model="gpt-4", tpm=100_000, rpm=100)

With connection manager (includes retry support):

>>> manager = RedisConnectionManager("redis://localhost", retry_config=RetryConfig())
>>> limiter = RateLimiter(manager, "gpt-4", tpm=100_000, rpm=100)

With config object (advanced):

>>> config = RateLimitConfig(tpm=100_000, rpm=100, burst_multiplier=1.5)
>>> limiter = RateLimiter("redis://localhost", "gpt-4", config=config)

AWS Bedrock with burndown rate (output tokens count 5x):

>>> limiter = RateLimiter("redis://localhost", "claude-sonnet",
...                       tpm=100_000, rpm=100, burndown_rate=5.0)
>>> await limiter.acquire(input_tokens=3000, output_tokens=1000)
# TPM consumption: 3000 + (5.0 * 1000) = 8000 tokens

Azure OpenAI with RPS smoothing (burst prevention):

>>> limiter = RateLimiter("redis://localhost", "gpt-4",
...                       tpm=300_000, rpm=600, smooth_requests=True)
# Auto-calculates RPS = 600/60 = 10, enforces 100ms minimum gap

>>> limiter = RateLimiter("redis://localhost", "gpt-4",
...                       tpm=300_000, rpm=600, rps=8)
# Explicit RPS, auto-enables smoothing, enforces 125ms minimum gap
Source code in src/llmratelimiter/limiter.py
class RateLimiter:
    """Unified rate limiter for LLM API calls.

    Supports combined TPM, split TPM, or both based on the configuration.

    Simple URL example:
        >>> limiter = RateLimiter("redis://localhost:6379", "gpt-4", tpm=100_000, rpm=100)
        >>> await limiter.acquire(tokens=5000)

    Split mode example (GCP Vertex AI):
        >>> limiter = RateLimiter("redis://localhost", "gemini-1.5-pro",
        ...                       input_tpm=4_000_000, output_tpm=128_000, rpm=360)
        >>> result = await limiter.acquire(input_tokens=5000, output_tokens=2048)
        >>> await limiter.adjust(result.record_id, actual_output=1500)

    With existing Redis client:
        >>> limiter = RateLimiter(redis=existing_client, model="gpt-4", tpm=100_000, rpm=100)

    With connection manager (includes retry support):
        >>> manager = RedisConnectionManager("redis://localhost", retry_config=RetryConfig())
        >>> limiter = RateLimiter(manager, "gpt-4", tpm=100_000, rpm=100)

    With config object (advanced):
        >>> config = RateLimitConfig(tpm=100_000, rpm=100, burst_multiplier=1.5)
        >>> limiter = RateLimiter("redis://localhost", "gpt-4", config=config)

    AWS Bedrock with burndown rate (output tokens count 5x):
        >>> limiter = RateLimiter("redis://localhost", "claude-sonnet",
        ...                       tpm=100_000, rpm=100, burndown_rate=5.0)
        >>> await limiter.acquire(input_tokens=3000, output_tokens=1000)
        # TPM consumption: 3000 + (5.0 * 1000) = 8000 tokens

    Azure OpenAI with RPS smoothing (burst prevention):
        >>> limiter = RateLimiter("redis://localhost", "gpt-4",
        ...                       tpm=300_000, rpm=600, smooth_requests=True)
        # Auto-calculates RPS = 600/60 = 10, enforces 100ms minimum gap

        >>> limiter = RateLimiter("redis://localhost", "gpt-4",
        ...                       tpm=300_000, rpm=600, rps=8)
        # Explicit RPS, auto-enables smoothing, enforces 125ms minimum gap
    """

    def __init__(
        self,
        redis: RedisClient | None = None,
        model: str | None = None,
        config: RateLimitConfig | None = None,
        *,
        # Rate limit kwargs (alternative to config)
        tpm: int = 0,
        rpm: int = 0,
        input_tpm: int = 0,
        output_tpm: int = 0,
        window_seconds: int = 60,
        burst_multiplier: float = 1.0,
        burndown_rate: float = 1.0,
        smooth_requests: bool = True,
        rps: int = 0,
        smoothing_interval: float = 1.0,
        # Redis connection kwargs (for URL connections)
        password: str | None = None,
        db: int = 0,
        max_connections: int = 10,
        retry_config: RetryConfig | None = None,
        # Legacy positional support
        redis_client: Redis | RedisConnectionManager | None = None,
        model_name: str | None = None,
    ) -> None:
        """Initialize the rate limiter.

        Args:
            redis: Redis URL string, async Redis client, or RedisConnectionManager.
            model: Name of the model (used for Redis key namespace).
            config: Configuration for rate limits (optional if using kwargs).
            tpm: Combined tokens per minute limit.
            rpm: Requests per minute limit.
            input_tpm: Input tokens per minute limit (split mode).
            output_tpm: Output tokens per minute limit (split mode).
            window_seconds: Sliding window duration in seconds.
            burst_multiplier: Multiplier for burst capacity.
            burndown_rate: Output token multiplier for combined TPM (default 1.0).
                AWS Bedrock Claude models use 5.0.
            smooth_requests: Enable RPS smoothing to prevent burst-triggered rate limits.
                When True, auto-calculates RPS from RPM. Default True.
            rps: Explicit requests-per-second limit. When set > 0, auto-enables smoothing.
                Set to 0 to auto-calculate from RPM when smooth_requests=True.
            smoothing_interval: Evaluation window in seconds for RPS enforcement.
                Azure uses 1.0s intervals. Default 1.0.
            password: Redis password (for URL connections).
            db: Redis database number (for URL connections).
            max_connections: Maximum connections in pool (for URL connections).
            retry_config: Retry configuration for URL-based connections.
            redis_client: Deprecated, use 'redis' parameter.
            model_name: Deprecated, use 'model' parameter.
        """
        # Handle legacy parameter names for backward compatibility
        if redis_client is not None and redis is None:
            redis = redis_client
        if model_name is not None and model is None:
            model = model_name

        if redis is None:
            raise ValueError("redis parameter is required (URL string, Redis client, or RedisConnectionManager)")
        if model is None:
            raise ValueError("model parameter is required")

        # Handle different redis parameter types
        if isinstance(redis, str):
            # URL string - create a connection manager
            self._manager: RedisConnectionManager | None = RedisConnectionManager(
                url=redis,
                password=password,
                db=db,
                max_connections=max_connections,
                retry_config=retry_config,
            )
            self.redis = self._manager.client
            self._retry_config: RetryConfig | None = self._manager.retry_config
        elif isinstance(redis, RedisConnectionManager):
            self._manager = redis
            self.redis = redis.client
            self._retry_config = redis.retry_config
        else:
            # Raw Redis client
            self._manager = None
            self.redis = redis
            self._retry_config = retry_config

        self.model_name = model

        # Build config from kwargs if not provided
        if config is None:
            config = RateLimitConfig(
                tpm=tpm,
                rpm=rpm,
                input_tpm=input_tpm,
                output_tpm=output_tpm,
                window_seconds=window_seconds,
                burst_multiplier=burst_multiplier,
                burndown_rate=burndown_rate,
                smooth_requests=smooth_requests,
                rps=rps,
                smoothing_interval=smoothing_interval,
            )

        self.window_seconds = config.window_seconds
        self.burst_multiplier = config.burst_multiplier
        self._burndown_rate = config.burndown_rate
        self._config = config

        # Calculate effective limits with burst multiplier
        self.rpm_limit = int(config.rpm * config.burst_multiplier) if config.rpm > 0 else 0
        self.tpm_limit = int(config.tpm * config.burst_multiplier) if config.tpm > 0 else 0
        self.input_tpm_limit = int(config.input_tpm * config.burst_multiplier) if config.input_tpm > 0 else 0
        self.output_tpm_limit = int(config.output_tpm * config.burst_multiplier) if config.output_tpm > 0 else 0

        # RPS smoothing settings
        self._rps_limit = config.effective_rps
        self._smoothing_interval = config.smoothing_interval

        # Redis key for consumption records
        self.consumption_key = f"rate_limit:{model}:consumption"

        # Lua scripts
        self._acquire_script = ACQUIRE_SCRIPT
        self._adjust_script = ADJUST_SCRIPT
        self._status_script = STATUS_SCRIPT

        # For testing - can be set to False to skip actual waiting
        self._should_wait = True

    @property
    def is_split_mode(self) -> bool:
        """Whether this limiter uses split input/output TPM limits."""
        return self._config.is_split_mode

    @property
    def has_combined_limit(self) -> bool:
        """Whether this limiter has a combined TPM limit."""
        return self._config.has_combined_limit

    @overload
    async def acquire(self, *, tokens: int) -> AcquireResult:
        """Acquire for combined mode - tokens counted as input."""
        ...

    @overload
    async def acquire(self, *, input_tokens: int, output_tokens: int = 0) -> AcquireResult:
        """Acquire for split/mixed mode."""
        ...

    async def acquire(
        self,
        *,
        tokens: int | None = None,
        input_tokens: int | None = None,
        output_tokens: int = 0,
    ) -> AcquireResult:
        """Acquire rate limit capacity.

        For combined mode with pre-calculated tokens, use tokens parameter:
            await limiter.acquire(tokens=5000)
            # Burndown rate is NOT applied - value is used directly

        For separate input/output tracking, use input_tokens/output_tokens:
            await limiter.acquire(input_tokens=5000, output_tokens=2048)
            # Burndown rate IS applied: effective = input + (burndown_rate * output)

        With burndown rate (e.g., AWS Bedrock with burndown_rate=5.0):
            await limiter.acquire(input_tokens=3000, output_tokens=1000)
            # TPM consumption: 3000 + (5.0 * 1000) = 8000 tokens

        Blocks until capacity is available (FIFO ordering), then returns.
        On Redis failure (after retries if configured), allows the request
        (graceful degradation).

        Note: The burndown_rate is only applied when using input_tokens/output_tokens.
        When using the tokens= parameter, it is assumed the burndown calculation
        has already been done by the caller. Split input/output TPM limits
        are not affected by burndown_rate.

        Args:
            tokens: Pre-calculated total tokens (burndown already applied if needed).
            input_tokens: Number of input tokens.
            output_tokens: Number of output tokens (default 0).

        Returns:
            AcquireResult with slot time, wait time, queue position, and record ID.
        """
        # Resolve input tokens and determine if burndown rate should be applied
        if tokens is not None:
            if input_tokens is not None:
                raise ValueError("Cannot specify both tokens and input_tokens")
            # When tokens= is used, assume burndown is already applied
            # Use the value directly as effective_combined_tokens
            input_tokens = tokens
            effective_combined_tokens = float(tokens)
        else:
            if input_tokens is None:
                raise ValueError("Must specify either tokens or input_tokens")
            # When input_tokens/output_tokens are used, apply burndown rate
            effective_combined_tokens = input_tokens + (self._burndown_rate * output_tokens)

        return await self._execute_acquire(
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            effective_combined_tokens=effective_combined_tokens,
        )

    async def adjust(self, record_id: str, actual_output: int) -> None:
        """Adjust the output tokens for a consumption record.

        Use this when the actual output tokens differ from the estimate.
        This frees up capacity if actual < estimated, or uses more if actual > estimated.

        Args:
            record_id: The record ID from the acquire() result.
            actual_output: The actual number of output tokens.
        """

        async def do_adjust() -> None:
            result = await self.redis.eval(  # type: ignore[misc]
                self._adjust_script,
                1,
                self.consumption_key,
                record_id,
                actual_output,
            )
            if result[0] == 0:
                logger.warning("Record not found for adjustment: %s", record_id)

        try:
            if self._retry_config is not None:
                await retry_with_backoff(do_adjust, self._retry_config, "adjust")
            else:
                await do_adjust()
        except RETRYABLE_ERRORS as e:
            logger.warning("Failed to adjust record %s: %s", record_id, e)
        except Exception as e:
            logger.warning("Failed to adjust record %s: %s", record_id, e)

    async def get_status(self) -> RateLimitStatus:
        """Get current rate limit status.

        Returns:
            RateLimitStatus with current usage and limits.
        """
        current_time = time.time()

        async def do_get_status() -> tuple[int, int, int, int]:
            result = await self.redis.eval(  # type: ignore[misc]
                self._status_script,
                1,
                self.consumption_key,
                current_time,
                self.window_seconds,
            )
            return (
                int(result[0]),
                int(result[1]),
                int(result[2]),
                int(result[3]),
            )

        try:
            if self._retry_config is not None:
                total_input, total_output, total_requests, queue_depth = await retry_with_backoff(
                    do_get_status, self._retry_config, "get_status"
                )
            else:
                total_input, total_output, total_requests, queue_depth = await do_get_status()
        except Exception as e:
            logger.warning("Redis error getting status: %s", e)
            total_input = 0
            total_output = 0
            total_requests = 0
            queue_depth = 0

        return RateLimitStatus(
            model=self.model_name,
            window_seconds=self.window_seconds,
            tokens_used=total_input + total_output,
            tokens_limit=self.tpm_limit,
            input_tokens_used=total_input,
            input_tokens_limit=self.input_tpm_limit,
            output_tokens_used=total_output,
            output_tokens_limit=self.output_tpm_limit,
            requests_used=total_requests,
            requests_limit=self.rpm_limit,
            queue_depth=queue_depth,
        )

    async def _execute_acquire(
        self,
        input_tokens: int,
        output_tokens: int,
        effective_combined_tokens: float,
    ) -> AcquireResult:
        """Execute the acquire operation with the Lua script.

        Args:
            input_tokens: Number of input tokens.
            output_tokens: Number of output tokens.
            effective_combined_tokens: Pre-calculated combined tokens (with burndown rate if applicable).

        Returns:
            AcquireResult with slot time, wait time, queue position, and record ID.
        """
        current_time = time.time()
        record_id = str(uuid.uuid4())

        async def do_acquire() -> tuple[float, int, str, float]:
            result = await self.redis.eval(  # type: ignore[misc]
                self._acquire_script,
                1,  # number of keys
                self.consumption_key,
                input_tokens,
                output_tokens,
                self.tpm_limit,  # combined limit (0 = disabled)
                self.input_tpm_limit,  # input limit (0 = disabled)
                self.output_tpm_limit,  # output limit (0 = disabled)
                self.rpm_limit,  # request limit (0 = disabled)
                self.window_seconds,
                current_time,
                record_id,
                effective_combined_tokens,  # pre-calculated with burndown rate
                self._rps_limit,  # RPS limit (0 = disabled)
                self._smoothing_interval,  # smoothing interval in seconds
            )
            # Lua returns floats as strings to preserve precision (RESP2 truncates floats)
            # Handle both bytes and str types from Redis
            slot_time_val = result[0].decode() if isinstance(result[0], bytes) else result[0]
            wait_time_val = result[3].decode() if isinstance(result[3], bytes) else result[3]
            record_id_val = result[2].decode() if isinstance(result[2], bytes) else result[2]
            return (
                float(slot_time_val),
                int(result[1]),
                str(record_id_val),
                float(wait_time_val),
            )

        try:
            if self._retry_config is not None:
                slot_time, queue_position, returned_record_id, wait_time = await retry_with_backoff(
                    do_acquire, self._retry_config, "acquire"
                )
            else:
                slot_time, queue_position, returned_record_id, wait_time = await do_acquire()

            # Wait if needed
            if self._should_wait and wait_time > 0:
                logger.debug(
                    "Rate limited: waiting %.2fs (queue position %d)",
                    wait_time,
                    queue_position,
                )
                await asyncio.sleep(wait_time)

            return AcquireResult(
                slot_time=slot_time,
                wait_time=wait_time,
                queue_position=queue_position,
                record_id=returned_record_id,
            )

        except Exception as e:
            # Graceful degradation - allow request on Redis failure
            logger.warning("Redis error, allowing request: %s", e)
            return AcquireResult(
                slot_time=current_time,
                wait_time=0.0,
                queue_position=0,
                record_id=record_id,
            )
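
Note the graceful-degradation path at the end of _execute_acquire: on Redis failure, the limiter logs a warning and lets the request proceed rather than raising. A minimal sketch of the caller-visible behavior, assuming a limiter configured as in the examples above:

    # No try/except needed around acquire() for Redis outages: if Redis is
    # unreachable (after any configured retries), acquire() returns immediately
    # with wait_time=0.0 and queue_position=0 instead of raising.
    result = await limiter.acquire(input_tokens=3000, output_tokens=1000)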

has_combined_limit property

has_combined_limit: bool

Whether this limiter has a combined TPM limit.

is_split_mode property

is_split_mode: bool

Whether this limiter uses split input/output TPM limits.
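
A minimal sketch of how the two properties relate to the constructor arguments, assuming each reflects its docstring (combined TPM configured vs. split input/output TPM configured):

    combined = RateLimiter("redis://localhost:6379", "gpt-4", tpm=100_000, rpm=100)
    split = RateLimiter("redis://localhost:6379", "gemini-1.5-pro",
                        input_tpm=4_000_000, output_tpm=128_000, rpm=360)
    assert combined.has_combined_limit and not combined.is_split_mode
    assert split.is_split_mode and not split.has_combined_limit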

__init__

__init__(
    redis: RedisClient | None = None,
    model: str | None = None,
    config: RateLimitConfig | None = None,
    *,
    tpm: int = 0,
    rpm: int = 0,
    input_tpm: int = 0,
    output_tpm: int = 0,
    window_seconds: int = 60,
    burst_multiplier: float = 1.0,
    burndown_rate: float = 1.0,
    smooth_requests: bool = True,
    rps: int = 0,
    smoothing_interval: float = 1.0,
    password: str | None = None,
    db: int = 0,
    max_connections: int = 10,
    retry_config: RetryConfig | None = None,
    redis_client: Redis
    | RedisConnectionManager
    | None = None,
    model_name: str | None = None,
) -> None

Initialize the rate limiter.

Parameters:

redis (RedisClient | None, default None)
    Redis URL string, async Redis client, or RedisConnectionManager.

model (str | None, default None)
    Name of the model (used for Redis key namespace).

config (RateLimitConfig | None, default None)
    Configuration for rate limits (optional if using kwargs).

tpm (int, default 0)
    Combined tokens per minute limit.

rpm (int, default 0)
    Requests per minute limit.

input_tpm (int, default 0)
    Input tokens per minute limit (split mode).

output_tpm (int, default 0)
    Output tokens per minute limit (split mode).

window_seconds (int, default 60)
    Sliding window duration in seconds.

burst_multiplier (float, default 1.0)
    Multiplier for burst capacity.

burndown_rate (float, default 1.0)
    Output token multiplier for combined TPM. AWS Bedrock Claude models use 5.0.

smooth_requests (bool, default True)
    Enable RPS smoothing to prevent burst-triggered rate limits. When True,
    auto-calculates RPS from RPM.

rps (int, default 0)
    Explicit requests-per-second limit. When set > 0, auto-enables smoothing.
    Set to 0 to auto-calculate from RPM when smooth_requests=True.

smoothing_interval (float, default 1.0)
    Evaluation window in seconds for RPS enforcement. Azure uses 1.0s intervals.

password (str | None, default None)
    Redis password (for URL connections).

db (int, default 0)
    Redis database number (for URL connections).

max_connections (int, default 10)
    Maximum connections in pool (for URL connections).

retry_config (RetryConfig | None, default None)
    Retry configuration for URL-based connections.

redis_client (Redis | RedisConnectionManager | None, default None)
    Deprecated, use 'redis' parameter.

model_name (str | None, default None)
    Deprecated, use 'model' parameter.
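
The deprecated redis_client and model_name keywords are still accepted and mapped onto redis and model, as the compatibility shim at the top of the source below shows. A migration sketch, where my_redis stands in for any existing async Redis client:

    # Deprecated spelling (still works via the compatibility shim):
    limiter = RateLimiter(redis_client=my_redis, model_name="gpt-4", tpm=100_000, rpm=100)
    # Preferred spelling:
    limiter = RateLimiter(redis=my_redis, model="gpt-4", tpm=100_000, rpm=100)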
Source code in src/llmratelimiter/limiter.py
def __init__(
    self,
    redis: RedisClient | None = None,
    model: str | None = None,
    config: RateLimitConfig | None = None,
    *,
    # Rate limit kwargs (alternative to config)
    tpm: int = 0,
    rpm: int = 0,
    input_tpm: int = 0,
    output_tpm: int = 0,
    window_seconds: int = 60,
    burst_multiplier: float = 1.0,
    burndown_rate: float = 1.0,
    smooth_requests: bool = True,
    rps: int = 0,
    smoothing_interval: float = 1.0,
    # Redis connection kwargs (for URL connections)
    password: str | None = None,
    db: int = 0,
    max_connections: int = 10,
    retry_config: RetryConfig | None = None,
    # Legacy positional support
    redis_client: Redis | RedisConnectionManager | None = None,
    model_name: str | None = None,
) -> None:
    """Initialize the rate limiter.

    Args:
        redis: Redis URL string, async Redis client, or RedisConnectionManager.
        model: Name of the model (used for Redis key namespace).
        config: Configuration for rate limits (optional if using kwargs).
        tpm: Combined tokens per minute limit.
        rpm: Requests per minute limit.
        input_tpm: Input tokens per minute limit (split mode).
        output_tpm: Output tokens per minute limit (split mode).
        window_seconds: Sliding window duration in seconds.
        burst_multiplier: Multiplier for burst capacity.
        burndown_rate: Output token multiplier for combined TPM (default 1.0).
            AWS Bedrock Claude models use 5.0.
        smooth_requests: Enable RPS smoothing to prevent burst-triggered rate limits.
            When True, auto-calculates RPS from RPM. Default True.
        rps: Explicit requests-per-second limit. When set > 0, auto-enables smoothing.
            Set to 0 to auto-calculate from RPM when smooth_requests=True.
        smoothing_interval: Evaluation window in seconds for RPS enforcement.
            Azure uses 1.0s intervals. Default 1.0.
        password: Redis password (for URL connections).
        db: Redis database number (for URL connections).
        max_connections: Maximum connections in pool (for URL connections).
        retry_config: Retry configuration for URL-based connections.
        redis_client: Deprecated, use 'redis' parameter.
        model_name: Deprecated, use 'model' parameter.
    """
    # Handle legacy parameter names for backward compatibility
    if redis_client is not None and redis is None:
        redis = redis_client
    if model_name is not None and model is None:
        model = model_name

    if redis is None:
        raise ValueError("redis parameter is required (URL string, Redis client, or RedisConnectionManager)")
    if model is None:
        raise ValueError("model parameter is required")

    # Handle different redis parameter types
    if isinstance(redis, str):
        # URL string - create a connection manager
        self._manager: RedisConnectionManager | None = RedisConnectionManager(
            url=redis,
            password=password,
            db=db,
            max_connections=max_connections,
            retry_config=retry_config,
        )
        self.redis = self._manager.client
        self._retry_config: RetryConfig | None = self._manager.retry_config
    elif isinstance(redis, RedisConnectionManager):
        self._manager = redis
        self.redis = redis.client
        self._retry_config = redis.retry_config
    else:
        # Raw Redis client
        self._manager = None
        self.redis = redis
        self._retry_config = retry_config

    self.model_name = model

    # Build config from kwargs if not provided
    if config is None:
        config = RateLimitConfig(
            tpm=tpm,
            rpm=rpm,
            input_tpm=input_tpm,
            output_tpm=output_tpm,
            window_seconds=window_seconds,
            burst_multiplier=burst_multiplier,
            burndown_rate=burndown_rate,
            smooth_requests=smooth_requests,
            rps=rps,
            smoothing_interval=smoothing_interval,
        )

    self.window_seconds = config.window_seconds
    self.burst_multiplier = config.burst_multiplier
    self._burndown_rate = config.burndown_rate
    self._config = config

    # Calculate effective limits with burst multiplier
    self.rpm_limit = int(config.rpm * config.burst_multiplier) if config.rpm > 0 else 0
    self.tpm_limit = int(config.tpm * config.burst_multiplier) if config.tpm > 0 else 0
    self.input_tpm_limit = int(config.input_tpm * config.burst_multiplier) if config.input_tpm > 0 else 0
    self.output_tpm_limit = int(config.output_tpm * config.burst_multiplier) if config.output_tpm > 0 else 0

    # RPS smoothing settings
    self._rps_limit = config.effective_rps
    self._smoothing_interval = config.smoothing_interval

    # Redis key for consumption records
    self.consumption_key = f"rate_limit:{model}:consumption"

    # Lua scripts
    self._acquire_script = ACQUIRE_SCRIPT
    self._adjust_script = ADJUST_SCRIPT
    self._status_script = STATUS_SCRIPT

    # For testing - can be set to False to skip actual waiting
    self._should_wait = True
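
As the effective-limit calculation in the constructor shows, burst_multiplier simply scales each configured limit before enforcement. A minimal sketch of the resulting attributes (assuming a reachable Redis at the given URL):

    limiter = RateLimiter("redis://localhost:6379", "gpt-4",
                          tpm=100_000, rpm=100, burst_multiplier=1.5)
    # tpm_limit = int(100_000 * 1.5) = 150_000; rpm_limit = int(100 * 1.5) = 150
    assert limiter.tpm_limit == 150_000
    assert limiter.rpm_limit == 150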

acquire async

acquire(*, tokens: int) -> AcquireResult
acquire(
    *, input_tokens: int, output_tokens: int = 0
) -> AcquireResult
acquire(
    *,
    tokens: int | None = None,
    input_tokens: int | None = None,
    output_tokens: int = 0,
) -> AcquireResult

Acquire rate limit capacity.

For combined mode with pre-calculated tokens, use tokens parameter:

    await limiter.acquire(tokens=5000)
    # Burndown rate is NOT applied - value is used directly

For separate input/output tracking, use input_tokens/output_tokens:

    await limiter.acquire(input_tokens=5000, output_tokens=2048)
    # Burndown rate IS applied: effective = input + (burndown_rate * output)

With burndown rate (e.g., AWS Bedrock with burndown_rate=5.0):

    await limiter.acquire(input_tokens=3000, output_tokens=1000)
    # TPM consumption: 3000 + (5.0 * 1000) = 8000 tokens

Blocks until capacity is available (FIFO ordering), then returns. On Redis failure (after retries if configured), allows the request (graceful degradation).

Note: The burndown_rate is only applied when using input_tokens/output_tokens. When using the tokens= parameter, it is assumed the burndown calculation has already been done by the caller. Split input/output TPM limits are not affected by burndown_rate.
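
For a limiter constructed with burndown_rate=5.0, the two call styles below reserve the same combined-TPM capacity; a minimal sketch:

    # Style 1: let the limiter apply the burndown rate
    await limiter.acquire(input_tokens=3000, output_tokens=1000)
    # effective = 3000 + (5.0 * 1000) = 8000 tokens

    # Style 2: pre-apply the burndown yourself and pass the total
    await limiter.acquire(tokens=8000)  # used as-is, no burndown applied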

Parameters:

tokens (int | None, default None)
    Pre-calculated total tokens (burndown already applied if needed).

input_tokens (int | None, default None)
    Number of input tokens.

output_tokens (int, default 0)
    Number of output tokens.

Returns:

AcquireResult
    AcquireResult with slot time, wait time, queue position, and record ID.

Source code in src/llmratelimiter/limiter.py
async def acquire(
    self,
    *,
    tokens: int | None = None,
    input_tokens: int | None = None,
    output_tokens: int = 0,
) -> AcquireResult:
    """Acquire rate limit capacity.

    For combined mode with pre-calculated tokens, use tokens parameter:
        await limiter.acquire(tokens=5000)
        # Burndown rate is NOT applied - value is used directly

    For separate input/output tracking, use input_tokens/output_tokens:
        await limiter.acquire(input_tokens=5000, output_tokens=2048)
        # Burndown rate IS applied: effective = input + (burndown_rate * output)

    With burndown rate (e.g., AWS Bedrock with burndown_rate=5.0):
        await limiter.acquire(input_tokens=3000, output_tokens=1000)
        # TPM consumption: 3000 + (5.0 * 1000) = 8000 tokens

    Blocks until capacity is available (FIFO ordering), then returns.
    On Redis failure (after retries if configured), allows the request
    (graceful degradation).

    Note: The burndown_rate is only applied when using input_tokens/output_tokens.
    When using the tokens= parameter, it is assumed the burndown calculation
    has already been done by the caller. Split input/output TPM limits
    are not affected by burndown_rate.

    Args:
        tokens: Pre-calculated total tokens (burndown already applied if needed).
        input_tokens: Number of input tokens.
        output_tokens: Number of output tokens (default 0).

    Returns:
        AcquireResult with slot time, wait time, queue position, and record ID.
    """
    # Resolve input tokens and determine if burndown rate should be applied
    if tokens is not None:
        if input_tokens is not None:
            raise ValueError("Cannot specify both tokens and input_tokens")
        # When tokens= is used, assume burndown is already applied
        # Use the value directly as effective_combined_tokens
        input_tokens = tokens
        effective_combined_tokens = float(tokens)
    else:
        if input_tokens is None:
            raise ValueError("Must specify either tokens or input_tokens")
        # When input_tokens/output_tokens are used, apply burndown rate
        effective_combined_tokens = input_tokens + (self._burndown_rate * output_tokens)

    return await self._execute_acquire(
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        effective_combined_tokens=effective_combined_tokens,
    )
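
The validation at the top of acquire() rejects ambiguous or missing token arguments. A sketch of both failure modes:

    try:
        await limiter.acquire(tokens=5000, input_tokens=3000)
    except ValueError:
        ...  # "Cannot specify both tokens and input_tokens"

    try:
        await limiter.acquire(output_tokens=500)
    except ValueError:
        ...  # "Must specify either tokens or input_tokens"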

adjust async

adjust(record_id: str, actual_output: int) -> None

Adjust the output tokens for a consumption record.

Use this when the actual output tokens differ from the estimate. This frees up capacity if actual < estimated, or uses more if actual > estimated.

Parameters:

record_id (str, required)
    The record ID from the acquire() result.

actual_output (int, required)
    The actual number of output tokens.
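
A typical reserve-then-reconcile flow. The my_llm_call() helper and its output_tokens field are hypothetical placeholders for your provider call:

    result = await limiter.acquire(input_tokens=5000, output_tokens=2048)  # estimate
    response = await my_llm_call()  # hypothetical provider call
    await limiter.adjust(result.record_id, actual_output=response.output_tokens)
    # If the actual count is below the 2048 estimate, the difference is freed
    # for other callers; if above, the extra consumption is recorded.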
Source code in src/llmratelimiter/limiter.py
async def adjust(self, record_id: str, actual_output: int) -> None:
    """Adjust the output tokens for a consumption record.

    Use this when the actual output tokens differ from the estimate.
    This frees up capacity if actual < estimated, or uses more if actual > estimated.

    Args:
        record_id: The record ID from the acquire() result.
        actual_output: The actual number of output tokens.
    """

    async def do_adjust() -> None:
        result = await self.redis.eval(  # type: ignore[misc]
            self._adjust_script,
            1,
            self.consumption_key,
            record_id,
            actual_output,
        )
        if result[0] == 0:
            logger.warning("Record not found for adjustment: %s", record_id)

    try:
        if self._retry_config is not None:
            await retry_with_backoff(do_adjust, self._retry_config, "adjust")
        else:
            await do_adjust()
    except Exception as e:
        # Best-effort: adjustment failures (retryable or not) are logged, never raised.
        logger.warning("Failed to adjust record %s: %s", record_id, e)

get_status async

get_status() -> RateLimitStatus

Get current rate limit status.

Returns:

RateLimitStatus
    RateLimitStatus with current usage and limits.
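
A small monitoring sketch built from the RateLimitStatus fields documented further below:

    status = await limiter.get_status()
    print(
        f"{status.requests_used}/{status.requests_limit} requests in the last "
        f"{status.window_seconds}s, queue depth {status.queue_depth}"
    )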

Source code in src/llmratelimiter/limiter.py
async def get_status(self) -> RateLimitStatus:
    """Get current rate limit status.

    Returns:
        RateLimitStatus with current usage and limits.
    """
    current_time = time.time()

    async def do_get_status() -> tuple[int, int, int, int]:
        result = await self.redis.eval(  # type: ignore[misc]
            self._status_script,
            1,
            self.consumption_key,
            current_time,
            self.window_seconds,
        )
        return (
            int(result[0]),
            int(result[1]),
            int(result[2]),
            int(result[3]),
        )

    try:
        if self._retry_config is not None:
            total_input, total_output, total_requests, queue_depth = await retry_with_backoff(
                do_get_status, self._retry_config, "get_status"
            )
        else:
            total_input, total_output, total_requests, queue_depth = await do_get_status()
    except Exception as e:
        logger.warning("Redis error getting status: %s", e)
        total_input = 0
        total_output = 0
        total_requests = 0
        queue_depth = 0

    return RateLimitStatus(
        model=self.model_name,
        window_seconds=self.window_seconds,
        tokens_used=total_input + total_output,
        tokens_limit=self.tpm_limit,
        input_tokens_used=total_input,
        input_tokens_limit=self.input_tpm_limit,
        output_tokens_used=total_output,
        output_tokens_limit=self.output_tpm_limit,
        requests_used=total_requests,
        requests_limit=self.rpm_limit,
        queue_depth=queue_depth,
    )

Models

Data models for results and status.

Result dataclasses for rate limiter operations.

AcquireResult dataclass

Result from an acquire() call.

Attributes:

slot_time (float)
    The timestamp when the request is scheduled to execute.

wait_time (float)
    Time in seconds the caller waited (or will wait).

queue_position (int)
    Position in the FIFO queue (0 if immediate).

record_id (str)
    Unique ID for this consumption record (for adjust()).

Source code in src/llmratelimiter/models.py
@dataclass
class AcquireResult:
    """Result from an acquire() call.

    Attributes:
        slot_time: The timestamp when the request is scheduled to execute.
        wait_time: Time in seconds the caller waited (or will wait).
        queue_position: Position in the FIFO queue (0 if immediate).
        record_id: Unique ID for this consumption record (for adjust()).
    """

    slot_time: float
    wait_time: float
    queue_position: int
    record_id: str
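
A sketch of inspecting the result after acquire() returns; with the default waiting behavior the sleep has already happened by then, so wait_time is retrospective:

    result = await limiter.acquire(input_tokens=1000, output_tokens=500)
    if result.wait_time > 0:
        print(f"queued at position {result.queue_position}, "
              f"waited {result.wait_time:.2f}s for slot {result.slot_time:.0f}")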

RateLimitStatus dataclass

Current status of a rate limiter.

Unified status for both combined and split mode limiters. Unused fields are set to 0.

Combined mode (tpm > 0):
- tokens_used/tokens_limit contain combined token usage
- input_tokens_used/input_tokens_limit are 0
- output_tokens_used/output_tokens_limit are 0

Split mode (input_tpm/output_tpm > 0):
- tokens_used/tokens_limit are 0
- input_tokens_used/input_tokens_limit contain input token usage
- output_tokens_used/output_tokens_limit contain output token usage
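
Because unused fields are zero, a consumer can branch on which limits are populated. A minimal sketch for a limiter configured in a single mode:

    status = await limiter.get_status()
    if status.tokens_limit > 0:  # combined mode
        remaining = status.tokens_limit - status.tokens_used
    else:  # split mode
        remaining_input = status.input_tokens_limit - status.input_tokens_used
        remaining_output = status.output_tokens_limit - status.output_tokens_used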

Attributes:

model (str)
    The model name this limiter is for.

window_seconds (int)
    The sliding window duration.

tokens_used (int)
    Current combined tokens consumed (combined mode).

tokens_limit (int)
    Maximum combined tokens allowed (combined mode).

input_tokens_used (int)
    Current input tokens consumed (split mode).

input_tokens_limit (int)
    Maximum input tokens allowed (split mode).

output_tokens_used (int)
    Current output tokens consumed (split mode).

output_tokens_limit (int)
    Maximum output tokens allowed (split mode).

requests_used (int)
    Current requests in the window.

requests_limit (int)
    Maximum requests allowed per window.

queue_depth (int)
    Number of pending requests (slot_time > now).

Source code in src/llmratelimiter/models.py
@dataclass
class RateLimitStatus:
    """Current status of a rate limiter.

    Unified status for both combined and split mode limiters.
    Unused fields are set to 0.

    Combined mode (tpm > 0):
        - tokens_used/tokens_limit contain combined token usage
        - input_tokens_used/input_tokens_limit are 0
        - output_tokens_used/output_tokens_limit are 0

    Split mode (input_tpm/output_tpm > 0):
        - tokens_used/tokens_limit are 0
        - input_tokens_used/input_tokens_limit contain input token usage
        - output_tokens_used/output_tokens_limit contain output token usage

    Attributes:
        model: The model name this limiter is for.
        window_seconds: The sliding window duration.
        tokens_used: Current combined tokens consumed (combined mode).
        tokens_limit: Maximum combined tokens allowed (combined mode).
        input_tokens_used: Current input tokens consumed (split mode).
        input_tokens_limit: Maximum input tokens allowed (split mode).
        output_tokens_used: Current output tokens consumed (split mode).
        output_tokens_limit: Maximum output tokens allowed (split mode).
        requests_used: Current requests in the window.
        requests_limit: Maximum requests allowed per window.
        queue_depth: Number of pending requests (slot_time > now).
    """

    model: str
    window_seconds: int
    tokens_used: int = 0
    tokens_limit: int = 0
    input_tokens_used: int = 0
    input_tokens_limit: int = 0
    output_tokens_used: int = 0
    output_tokens_limit: int = 0
    requests_used: int = 0
    requests_limit: int = 0
    queue_depth: int = 0