Advanced Topics

This guide covers production patterns for CryptoBot Python, including retries, local throttling, caching, webhook processing, and persistence.

Environment Baseline

Use environment variables for all tokens:

import os

# Required token: indexing os.environ raises KeyError when the variable is unset.
API_TOKEN = os.environ["CRYPTOBOT_API_TOKEN"]
# Optional token: .get() yields None when the variable is unset.
TESTNET_TOKEN = os.environ.get("CRYPTOBOT_TESTNET_TOKEN")

Built-in Retry and Backoff

CryptoBotClient includes optional retry logic for transient failures. Retries are applied to:

  1. HTTP status codes in retryable_status_codes (default: 429, 500, 502, 503, 504)

  2. httpx transport exceptions such as timeouts and network errors

Retries are disabled by default (max_retries=0).

from cryptobot import CryptoBotClient
from cryptobot.models import Asset


# Retries are opt-in: max_retries defaults to 0, so it must be set explicitly.
client = CryptoBotClient(
    api_token=API_TOKEN,
    max_retries=3,
    retry_backoff=0.5,  # NOTE(review): presumably the base delay in seconds — confirm
)

invoice = client.create_invoice(asset=Asset.USDT, amount=10, description="Retry-safe invoice")
print(invoice.invoice_id)

You can override retryable status codes:

# Retry only on 429 and 503 instead of the default set of status codes.
client = CryptoBotClient(
    api_token=API_TOKEN,
    max_retries=2,
    retry_backoff=1.0,
    retryable_status_codes={429, 503},
)

Equivalent async configuration:

import asyncio

from cryptobot import AsyncCryptoBotClient


async def warmup():
    """Open an async client with retries enabled and print the ``get_me`` name."""
    retry_options = dict(
        max_retries=2,
        retry_backoff=1.0,
        retryable_status_codes={429, 503},
    )
    async with AsyncCryptoBotClient(api_token=API_TOKEN, **retry_options) as client:
        me = await client.get_me()
        print(me.name)


# Run the coroutine to completion on a new event loop.
asyncio.run(warmup())

Tuned HTTPX Client

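If you need custom connection limits or transport configuration, replace the internal httpx.Client after initialization. Note that this relies on the private attributes _http_client and _base_url (leading underscore), which may change between library releases: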

import httpx

from cryptobot import CryptoBotClient


class TunedCryptoBotClient(CryptoBotClient):
    """``CryptoBotClient`` variant whose HTTP client uses custom connection limits."""

    def __init__(self, api_token: str, is_mainnet: bool = True, timeout: float = 5.0, max_connections: int = 100):
        """Build the parent client, then swap in an ``httpx.Client`` with custom limits.

        Args:
            api_token: Token sent in the ``Crypto-Pay-API-Token`` header.
            is_mainnet: Forwarded unchanged to the parent constructor.
            timeout: Timeout used by both the parent and the replacement client.
            max_connections: Used for both the total and the keep-alive limit.
        """
        super().__init__(api_token=api_token, is_mainnet=is_mainnet, timeout=timeout)

        # Dispose of the client created by the parent before replacing it.
        self._http_client.close()

        pool_limits = httpx.Limits(
            max_keepalive_connections=max_connections,
            max_connections=max_connections,
        )
        auth_headers = {"Crypto-Pay-API-Token": self.api_token}
        self._http_client = httpx.Client(
            base_url=self._base_url,
            timeout=timeout,
            headers=auth_headers,
            limits=pool_limits,
        )

    def close(self):
        """Close the replacement HTTP client."""
        self._http_client.close()


client = TunedCryptoBotClient(API_TOKEN, timeout=10.0, max_connections=50)
try:
    print(client.get_me().name)
finally:
    # Always close the underlying HTTP client, even when the call above fails.
    client.close()

Local Rate Limiting

Use a token bucket in-process to smooth bursts:

import threading
import time

from cryptobot import CryptoBotClient
from cryptobot.models import Asset


class TokenBucket:
    """Thread-safe token bucket for throttling calls made inside this process.

    Tokens refill continuously at ``rate_per_second`` up to ``capacity``; every
    :meth:`acquire` call consumes exactly one token.

    Args:
        rate_per_second: Tokens added per second. Must be greater than zero.
        capacity: Maximum number of stored tokens (the burst size). Must be at
            least 1, otherwise a whole token could never accumulate and
            :meth:`acquire` would never return.

    Raises:
        ValueError: If ``rate_per_second`` is not positive or ``capacity`` is
            below 1.
    """

    def __init__(self, rate_per_second: float, capacity: float):
        # Written as "not x > 0" so that NaN is rejected as well.
        if not rate_per_second > 0:
            raise ValueError("rate_per_second must be greater than 0")
        if not capacity >= 1:
            raise ValueError("capacity must be at least 1")

        self.rate = rate_per_second
        self.capacity = capacity
        self.tokens = capacity  # start full so an initial burst is allowed
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        """Block until one token is available, then consume it."""
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.last
                self.last = now
                # Credit the tokens earned since the last refill, capped at capacity.
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)

                if self.tokens >= 1:
                    self.tokens -= 1
                    return

                # Time until the missing fraction of a token has been refilled.
                wait_for = (1 - self.tokens) / self.rate
            # Sleep outside the lock so other threads are not blocked meanwhile.
            time.sleep(wait_for)


class RateLimitedCryptoBotClient:
    """Wrap ``CryptoBotClient`` so invoice creation is throttled by a token bucket.

    Args:
        api_token: Token passed to the wrapped ``CryptoBotClient``.
        requests_per_second: Sustained request rate; must be greater than zero.
            Rates below 1 are supported (e.g. ``0.5`` allows one call every
            two seconds).

    Raises:
        ValueError: If ``requests_per_second`` is not positive.
    """

    def __init__(self, api_token: str, requests_per_second: float = 5):
        if not requests_per_second > 0:
            raise ValueError("requests_per_second must be greater than 0")

        self.client = CryptoBotClient(api_token)
        # The bucket must be able to hold at least one whole token; with a
        # smaller capacity a token could never be acquired for rates below 1.
        self.bucket = TokenBucket(
            rate_per_second=requests_per_second,
            capacity=max(1.0, requests_per_second),
        )

    def create_invoice(self, **kwargs):
        """Wait for a token, then forward ``kwargs`` to the wrapped client."""
        self.bucket.acquire()
        return self.client.create_invoice(**kwargs)


# The bucket starts full (3 tokens), so the first calls pass immediately;
# later calls block only until a token has been refilled.
limited = RateLimitedCryptoBotClient(API_TOKEN, requests_per_second=3)
for i in range(5):
    inv = limited.create_invoice(asset=Asset.USDT, amount=1, description=f"Burst #{i}")
    print(inv.invoice_id)

Exchange-Rate Caching

Cache exchange rates with a TTL to reduce API calls:

import time
from datetime import datetime, timedelta
from threading import Lock
from typing import List, Optional

from cryptobot import CryptoBotClient
from cryptobot.models import ExchangeRate


class ExchangeRateCache:
    """Thread-safe TTL cache in front of ``client.get_exchange_rates()``.

    Freshness is measured with ``time.monotonic()`` so that system clock
    adjustments cannot prematurely expire or indefinitely extend an entry.

    Args:
        client: Client used to fetch rates when the cache is empty or stale.
        ttl_seconds: How long a fetched list is served before refetching.
    """

    def __init__(self, client: CryptoBotClient, ttl_seconds: int = 60):
        self.client = client
        self.ttl = timedelta(seconds=ttl_seconds)
        self._rates: List[ExchangeRate] = []
        # Monotonic timestamp of the last successful fetch; None until then.
        self._fetched_at: Optional[float] = None
        self._lock = Lock()

    def get_rates(self, force_refresh: bool = False) -> List[ExchangeRate]:
        """Return cached rates, refetching when empty, stale, or forced.

        Args:
            force_refresh: When true, bypass the cache and always call the API.

        Returns:
            The list returned by the most recent successful fetch.
        """
        with self._lock:
            if not force_refresh and self._rates and self._fetched_at is not None:
                age = time.monotonic() - self._fetched_at
                if age < self.ttl.total_seconds():
                    return self._rates

            self._rates = self.client.get_exchange_rates()
            # Stamp after the call so time spent on the request does not eat
            # into the TTL; a failed call leaves the previous state untouched.
            self._fetched_at = time.monotonic()
            return self._rates


client = CryptoBotClient(API_TOKEN)
cache = ExchangeRateCache(client, ttl_seconds=120)
# The first call always hits the API; a non-empty result is then reused
# until the 120-second TTL has elapsed.
rates = cache.get_rates()
print(len(rates))

Webhook Queue Pattern

Keep webhook HTTP handlers fast and push business work to a queue worker:

import os
from queue import Queue
from threading import Thread

from cryptobot.webhook import Listener


# Bounded queue: once 1000 updates are pending, put_nowait() raises queue.Full
# instead of letting memory grow without limit.
queue: Queue[dict] = Queue(maxsize=1000)


def process_update(update: dict):
    """Handle one webhook update; only ``invoice_paid`` updates are acted on."""
    if update.get("update_type") != "invoice_paid":
        return
    invoice_id = update.get("payload", {}).get("invoice_id")
    print("Process paid invoice", invoice_id)


def worker():
    """Consume queued webhook updates forever, one at a time.

    A failure while processing one update is logged and the loop continues, so
    a single bad update cannot kill the worker thread and leave the bounded
    queue to fill up.
    """
    import logging  # local import keeps this snippet self-contained

    log = logging.getLogger(__name__)
    while True:
        item = queue.get()
        try:
            process_update(item)
        except Exception:
            # Thread boundary: record the traceback and keep consuming.
            log.exception("Failed to process webhook update")
        finally:
            # Mark the item done even on failure so queue.join() cannot hang.
            queue.task_done()


def on_webhook(headers, data):
    """Listener callback: hand the update to the worker queue without blocking."""
    # The Listener has already validated the signature before calling this.
    queue.put(data, block=False)


# Daemon thread, so the worker does not keep the process alive at exit.
Thread(target=worker, daemon=True).start()

listener = Listener(
    host="0.0.0.0",
    callback=on_webhook,
    api_token=os.environ["CRYPTOBOT_API_TOKEN"],
    port=2203,
    url="/webhook",
    log_level="info",
)
# NOTE(review): listen() presumably blocks while serving requests — confirm.
listener.listen()

Persisting Invoices with SQLAlchemy

Persist created invoices and sync status from the API:

from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from cryptobot import CryptoBotClient
from cryptobot.models import Asset


Base = declarative_base()


class InvoiceRecord(Base):
    """ORM row that mirrors one invoice created through the API."""

    __tablename__ = "invoice_records"

    # Surrogate primary key local to this database.
    id = Column(Integer, primary_key=True)
    # Invoice identifier taken from the API response; unique per row.
    invoice_id = Column(Integer, unique=True, nullable=False)
    # Status and asset are stored as text (InvoiceStore writes the enum member names).
    status = Column(String, nullable=False)
    asset = Column(String, nullable=False)
    # Amount is kept as a string column rather than a numeric one.
    amount = Column(String, nullable=False)
    # Optional payload attached to the invoice.
    payload = Column(String, nullable=True)
    # The callable is passed uncalled, so it is evaluated when a row is inserted.
    updated_at = Column(DateTime, nullable=False, default=datetime.utcnow)


class InvoiceStore:
    """Create invoices through the API and mirror them in a SQL database."""

    def __init__(self, api_token: str, db_url: str):
        """Create the API client and engine, and ensure the tables exist."""
        self.client = CryptoBotClient(api_token)
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def create_invoice(self, asset: Asset, amount: float, payload: str):
        """Create an invoice via the API, persist a record of it, and return it."""
        invoice = self.client.create_invoice(asset=asset, amount=amount, payload=payload)
        # The session is closed when the block exits, whether or not it raised.
        with self.Session() as session:
            record = InvoiceRecord(
                invoice_id=invoice.invoice_id,
                status=invoice.status.name,
                asset=invoice.asset.name,
                amount=invoice.amount,
                payload=invoice.payload,
            )
            session.add(record)
            session.commit()
        return invoice

    def sync_invoice(self, invoice_id: int):
        """Refresh the stored status of ``invoice_id`` from the API.

        Returns the invoice fetched from the API, or ``None`` when the API
        returns no invoice for that id. A missing local row is left missing.
        """
        fetched = self.client.get_invoices(invoice_ids=str(invoice_id))
        if not fetched:
            return None

        latest = fetched[0]
        with self.Session() as session:
            row = session.query(InvoiceRecord).filter_by(invoice_id=invoice_id).first()
            if not row:
                return latest
            row.status = latest.status.name
            row.updated_at = datetime.utcnow()
            session.commit()
            return latest

Structured Logging Wrapper

Log operations with request context and API error fields:

import logging

from cryptobot import CryptoBotClient
from cryptobot.errors import CryptoBotError
from cryptobot.models import Asset


# Configure logging once; the format adds a timestamp and level to every record.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Named logger used by the wrapper class below.
logger = logging.getLogger("cryptobot")


class LoggedCryptoBotClient:
    """Thin wrapper that logs the start, success, and failure of invoice creation."""

    def __init__(self, api_token: str):
        self.client = CryptoBotClient(api_token)

    def create_invoice(self, *, asset: Asset, amount: float, description: str):
        """Create an invoice and log the outcome.

        API errors are logged with their ``code`` and ``name`` and then
        re-raised unchanged.
        """
        logger.info("create_invoice start asset=%s amount=%s", asset.name, amount)
        try:
            created = self.client.create_invoice(
                asset=asset,
                amount=amount,
                description=description,
            )
            logger.info(
                "create_invoice ok invoice_id=%s status=%s",
                created.invoice_id,
                created.status.name,
            )
            return created
        except CryptoBotError as err:
            logger.error("create_invoice failed code=%s name=%s", err.code, err.name)
            raise

Testing with MockTransport

For deterministic unit tests, replace the internal HTTP client transport:

import httpx

from cryptobot import CryptoBotClient
from cryptobot.models import Asset


def handler(request: httpx.Request) -> httpx.Response:
    """Mock transport handler: canned invoice for createInvoice, 404 otherwise."""
    if not request.url.path.endswith("/createInvoice"):
        not_found = {"ok": False, "error": {"code": 404, "name": "NOT_FOUND"}}
        return httpx.Response(404, json=not_found)

    mock_invoice = {
        "invoice_id": 1,
        "status": "active",
        "hash": "mock_hash",
        "amount": "1",
        "asset": "USDT",
        "bot_invoice_url": "https://t.me/CryptoBot?start=mock",
    }
    return httpx.Response(200, json={"ok": True, "result": mock_invoice})


transport = httpx.MockTransport(handler)
client = CryptoBotClient("test_token")
# Close the real HTTP client before replacing it with one backed by the mock transport.
client._http_client.close()
client._http_client = httpx.Client(
    base_url=client._base_url,
    headers={"Crypto-Pay-API-Token": client.api_token},
    transport=transport,
)

# The request is answered by handler() above; no network access is involved.
invoice = client.create_invoice(asset=Asset.USDT, amount=1)
assert invoice.invoice_id == 1

Next Steps