import asyncio
from collections.abc import AsyncIterator, Awaitable
from contextlib import suppress
from types import TracebackType
from typing import Any, Callable, List, Optional, Set, Union
import httpx
from ._utils import parse_json
from .errors import CryptoBotError
from .models import (
App,
AppStats,
Asset,
Balance,
ButtonName,
Check,
CheckStatus,
Currency,
ExchangeRate,
Invoice,
Status,
Transfer,
)
[docs]
class AsyncCryptoBotClient:
"""Async Crypto Bot API client for modern service workloads."""
@staticmethod
def _short_text(response: httpx.Response, limit: int = 100) -> str:
return str(getattr(response, "text", ""))[:limit]
[docs]
def __init__(
self,
api_token: str,
is_mainnet: bool = True,
timeout: float = 5.0,
max_retries: int = 0,
retry_backoff: float = 0.5,
retryable_status_codes: Optional[Set[int]] = None,
):
self.api_token = api_token
self.timeout = timeout
self.max_retries = max(0, max_retries)
self.retry_backoff = max(0.0, retry_backoff)
self.retryable_status_codes = (
set(retryable_status_codes) if retryable_status_codes is not None else {429, 500, 502, 503, 504}
)
self._base_url = "https://pay.crypt.bot/api" if is_mainnet else "https://testnet-pay.crypt.bot/api"
self._http_client = httpx.AsyncClient(
base_url=self._base_url,
timeout=self.timeout,
headers={"Crypto-Pay-API-Token": self.api_token},
)
def _retry_delay(self, attempt: int, response: Optional[httpx.Response] = None) -> float:
delay: float = self.retry_backoff * (2**attempt)
if response is not None:
retry_after = response.headers.get("Retry-After")
if retry_after:
with suppress(ValueError):
delay = max(delay, float(retry_after))
return delay if delay > 0.0 else 0.0
async def _execute_with_retry(
self, request_fn: Callable[..., Awaitable[httpx.Response]], *args: Any, **kwargs: Any
) -> httpx.Response:
retryable_exceptions = (
httpx.TimeoutException,
httpx.NetworkError,
httpx.RemoteProtocolError,
)
for attempt in range(self.max_retries + 1):
try:
response = await request_fn(*args, **kwargs)
except retryable_exceptions:
if attempt >= self.max_retries:
raise
delay = self._retry_delay(attempt)
if delay > 0:
await asyncio.sleep(delay)
continue
if response.status_code in self.retryable_status_codes and attempt < self.max_retries:
delay = self._retry_delay(attempt, response)
if delay > 0:
await asyncio.sleep(delay)
continue
return response
raise RuntimeError("Unexpected retry flow state") # pragma: no cover
def _handle_response(self, response: httpx.Response) -> Any:
try:
payload = response.json()
except ValueError as exc:
raise CryptoBotError(
code=response.status_code,
name=f"Invalid JSON response: {self._short_text(response)}",
) from exc
if response.status_code == 200:
if isinstance(payload, dict) and "result" in payload:
return payload["result"]
raise CryptoBotError(
code=200,
name="Malformed success response: missing 'result'",
)
if isinstance(payload, dict) and isinstance(payload.get("error"), dict):
raise CryptoBotError.from_json(payload["error"])
raise CryptoBotError(
code=response.status_code,
name=f"HTTPError: {self._short_text(response)}",
)
[docs]
async def close(self) -> None:
await self._http_client.aclose()
async def __aenter__(self) -> "AsyncCryptoBotClient":
return self
async def __aexit__(
self,
exc_type: Optional[type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
await self.close()
@staticmethod
def _validate_count(count: int) -> None:
if count < 1 or count > 1000:
raise ValueError("count must be between 1 and 1000")
@staticmethod
def _validate_expires_in(expires_in: int) -> None:
if expires_in < 1 or expires_in > 2678400:
raise ValueError("expires_in must be between 1 and 2678400 seconds")
@staticmethod
def _normalize_ids(ids: Optional[Union[str, List[int]]], name: str = "ids") -> Optional[str]:
if ids is None:
return None
if isinstance(ids, str):
normalized = ",".join(part.strip() for part in ids.split(",") if part.strip())
if not normalized:
raise ValueError(f"{name} string cannot be empty")
for part in normalized.split(","):
if not part.isdigit() or int(part) <= 0:
raise ValueError(f"{name} string must contain positive integer IDs")
return normalized
if isinstance(ids, list):
if not ids:
raise ValueError(f"{name} list cannot be empty")
normalized_parts = []
for id_val in ids:
if isinstance(id_val, bool) or not isinstance(id_val, int) or id_val <= 0:
raise ValueError(f"{name} list must contain positive integers")
normalized_parts.append(str(id_val))
return ",".join(normalized_parts)
raise TypeError(f"{name} must be a comma-separated string or list of integers")
[docs]
async def get_me(self) -> App:
response = await self._execute_with_retry(self._http_client.get, "/getMe")
info = self._handle_response(response)
return parse_json(App, **info)
async def _create_invoice(self, **kwargs: Any) -> Invoice:
response = await self._execute_with_retry(self._http_client.post, "/createInvoice", json=kwargs)
info = self._handle_response(response)
return parse_json(Invoice, **info)
[docs]
async def create_invoice(
self,
amount: float,
asset: Optional[Asset] = None,
currency_type: Optional[str] = None,
fiat: Optional[str] = None,
accepted_assets: Optional[str] = None,
description: Optional[str] = None,
hidden_message: Optional[str] = None,
paid_btn_name: Optional[ButtonName] = None,
paid_btn_url: Optional[str] = None,
payload: Optional[str] = None,
allow_comments: Optional[bool] = None,
allow_anonymous: Optional[bool] = None,
expires_in: Optional[int] = None,
swap_to: Optional[str] = None,
) -> Invoice:
if amount <= 0:
raise ValueError("Amount must be greater than 0")
if expires_in is not None:
self._validate_expires_in(expires_in)
data: dict[str, Any] = {
"amount": str(amount),
"currency_type": currency_type,
"fiat": fiat,
"accepted_assets": accepted_assets,
"description": description,
"hidden_message": hidden_message,
"paid_btn_url": paid_btn_url,
"payload": payload,
"allow_comments": allow_comments,
"allow_anonymous": allow_anonymous,
"expires_in": expires_in,
"swap_to": swap_to,
}
if asset is not None:
data["asset"] = asset.name
for key, value in dict(data).items():
if value is None:
del data[key]
if paid_btn_name:
data["paid_btn_name"] = paid_btn_name.name
return await self._create_invoice(**data)
[docs]
async def transfer(
self,
user_id: int,
asset: Asset,
amount: float,
spend_id: str,
comment: Optional[str] = None,
disable_send_notification: bool = False,
) -> Transfer:
if amount <= 0:
raise ValueError("Amount must be greater than 0")
data = {
"user_id": user_id,
"asset": asset.name,
"amount": str(amount),
"spend_id": spend_id,
"comment": comment,
"disable_send_notification": disable_send_notification,
}
response = await self._execute_with_retry(self._http_client.post, "/transfer", json=data)
info = self._handle_response(response)
return parse_json(Transfer, **info)
[docs]
async def get_invoices(
self,
asset: Optional[Asset] = None,
fiat: Optional[str] = None,
invoice_ids: Optional[Union[str, List[int]]] = None,
status: Optional[Status] = None,
offset: int = 0,
count: int = 100,
) -> List[Invoice]:
if offset < 0:
raise ValueError("offset must be greater than or equal to 0")
self._validate_count(count)
normalized_invoice_ids = self._normalize_ids(invoice_ids, "invoice_ids")
data: dict[str, Any] = {}
if asset:
data["asset"] = asset.name
if fiat:
data["fiat"] = fiat
if normalized_invoice_ids is not None:
data["invoice_ids"] = normalized_invoice_ids
if status:
data["status"] = status.name
if offset:
data["offset"] = offset
data["count"] = count
response = await self._execute_with_retry(self._http_client.get, "/getInvoices", params=data)
info = self._handle_response(response)
return [parse_json(Invoice, **i) for i in info["items"]]
[docs]
async def iter_invoice_pages(
self,
asset: Optional[Asset] = None,
invoice_ids: Optional[Union[str, List[int]]] = None,
status: Optional[Status] = None,
page_size: int = 100,
start_offset: int = 0,
) -> AsyncIterator[List[Invoice]]:
if start_offset < 0:
raise ValueError("start_offset must be greater than or equal to 0")
self._validate_count(page_size)
offset = start_offset
while True:
page = await self.get_invoices(
asset=asset,
invoice_ids=invoice_ids,
status=status,
offset=offset,
count=page_size,
)
if not page:
break
yield page
if len(page) < page_size:
break
offset += page_size
[docs]
async def iter_invoices(
self,
asset: Optional[Asset] = None,
invoice_ids: Optional[Union[str, List[int]]] = None,
status: Optional[Status] = None,
page_size: int = 100,
start_offset: int = 0,
) -> AsyncIterator[Invoice]:
async for page in self.iter_invoice_pages(
asset=asset,
invoice_ids=invoice_ids,
status=status,
page_size=page_size,
start_offset=start_offset,
):
for invoice in page:
yield invoice
[docs]
async def get_balances(self) -> List[Balance]:
response = await self._execute_with_retry(self._http_client.get, "/getBalance")
info = self._handle_response(response)
return [parse_json(Balance, **i) for i in info]
[docs]
async def get_exchange_rates(self) -> List[ExchangeRate]:
response = await self._execute_with_retry(self._http_client.get, "/getExchangeRates")
info = self._handle_response(response)
return [parse_json(ExchangeRate, **i) for i in info]
[docs]
async def get_currencies(self) -> List[Currency]:
response = await self._execute_with_retry(self._http_client.get, "/getCurrencies")
info = self._handle_response(response)
return [parse_json(Currency, **i) for i in info]
[docs]
async def delete_invoice(self, invoice_id: int) -> bool:
"""Delete an invoice by ID."""
response = await self._execute_with_retry(self._http_client.post, "/deleteInvoice", json={"invoice_id": invoice_id})
result: bool = self._handle_response(response)
return result
[docs]
async def create_check(
self,
asset: Asset,
amount: float,
pin_to_user_id: Optional[int] = None,
pin_to_username: Optional[str] = None,
) -> Check:
"""Create a new crypto check."""
if amount <= 0:
raise ValueError("Amount must be greater than 0")
data: dict[str, Any] = {
"asset": asset.name,
"amount": str(amount),
}
if pin_to_user_id is not None:
data["pin_to_user_id"] = pin_to_user_id
if pin_to_username is not None:
data["pin_to_username"] = pin_to_username
response = await self._execute_with_retry(self._http_client.post, "/createCheck", json=data)
info = self._handle_response(response)
return parse_json(Check, **info)
[docs]
async def delete_check(self, check_id: int) -> bool:
"""Delete a check by ID."""
response = await self._execute_with_retry(self._http_client.post, "/deleteCheck", json={"check_id": check_id})
result: bool = self._handle_response(response)
return result
[docs]
async def get_transfers(
self,
asset: Optional[Asset] = None,
transfer_ids: Optional[Union[str, List[int]]] = None,
spend_id: Optional[str] = None,
offset: int = 0,
count: int = 100,
) -> List[Transfer]:
"""Get a list of transfers."""
if offset < 0:
raise ValueError("offset must be greater than or equal to 0")
self._validate_count(count)
normalized_ids = self._normalize_ids(transfer_ids, "transfer_ids")
data: dict[str, Any] = {}
if asset:
data["asset"] = asset.name
if normalized_ids is not None:
data["transfer_ids"] = normalized_ids
if spend_id is not None:
data["spend_id"] = spend_id
if offset:
data["offset"] = offset
data["count"] = count
response = await self._execute_with_retry(self._http_client.get, "/getTransfers", params=data)
info = self._handle_response(response)
return [parse_json(Transfer, **i) for i in info["items"]]
[docs]
async def iter_transfer_pages(
self,
asset: Optional[Asset] = None,
transfer_ids: Optional[Union[str, List[int]]] = None,
spend_id: Optional[str] = None,
page_size: int = 100,
start_offset: int = 0,
) -> AsyncIterator[List[Transfer]]:
"""Iterate over transfer result pages."""
if start_offset < 0:
raise ValueError("start_offset must be greater than or equal to 0")
self._validate_count(page_size)
offset = start_offset
while True:
page = await self.get_transfers(
asset=asset,
transfer_ids=transfer_ids,
spend_id=spend_id,
offset=offset,
count=page_size,
)
if not page:
break
yield page
if len(page) < page_size:
break
offset += page_size
[docs]
async def iter_transfers(
self,
asset: Optional[Asset] = None,
transfer_ids: Optional[Union[str, List[int]]] = None,
spend_id: Optional[str] = None,
page_size: int = 100,
start_offset: int = 0,
) -> AsyncIterator[Transfer]:
"""Iterate transfers item-by-item across paginated results."""
async for page in self.iter_transfer_pages(
asset=asset,
transfer_ids=transfer_ids,
spend_id=spend_id,
page_size=page_size,
start_offset=start_offset,
):
for transfer in page:
yield transfer
[docs]
async def get_checks(
self,
asset: Optional[Asset] = None,
check_ids: Optional[Union[str, List[int]]] = None,
status: Optional[CheckStatus] = None,
offset: int = 0,
count: int = 100,
) -> List[Check]:
"""Get a list of checks."""
if offset < 0:
raise ValueError("offset must be greater than or equal to 0")
self._validate_count(count)
normalized_ids = self._normalize_ids(check_ids, "check_ids")
data: dict[str, Any] = {}
if asset:
data["asset"] = asset.name
if normalized_ids is not None:
data["check_ids"] = normalized_ids
if status:
data["status"] = status.name
if offset:
data["offset"] = offset
data["count"] = count
response = await self._execute_with_retry(self._http_client.get, "/getChecks", params=data)
info = self._handle_response(response)
return [parse_json(Check, **i) for i in info["items"]]
[docs]
async def iter_check_pages(
self,
asset: Optional[Asset] = None,
check_ids: Optional[Union[str, List[int]]] = None,
status: Optional[CheckStatus] = None,
page_size: int = 100,
start_offset: int = 0,
) -> AsyncIterator[List[Check]]:
"""Iterate over check result pages."""
if start_offset < 0:
raise ValueError("start_offset must be greater than or equal to 0")
self._validate_count(page_size)
offset = start_offset
while True:
page = await self.get_checks(
asset=asset,
check_ids=check_ids,
status=status,
offset=offset,
count=page_size,
)
if not page:
break
yield page
if len(page) < page_size:
break
offset += page_size
[docs]
async def iter_checks(
self,
asset: Optional[Asset] = None,
check_ids: Optional[Union[str, List[int]]] = None,
status: Optional[CheckStatus] = None,
page_size: int = 100,
start_offset: int = 0,
) -> AsyncIterator[Check]:
"""Iterate checks item-by-item across paginated results."""
async for page in self.iter_check_pages(
asset=asset,
check_ids=check_ids,
status=status,
page_size=page_size,
start_offset=start_offset,
):
for check in page:
yield check
[docs]
async def get_stats(
self,
start_at: Optional[str] = None,
end_at: Optional[str] = None,
) -> AppStats:
"""Get app statistics."""
data: dict[str, Any] = {}
if start_at is not None:
data["start_at"] = start_at
if end_at is not None:
data["end_at"] = end_at
response = await self._execute_with_retry(self._http_client.get, "/getStats", params=data)
info = self._handle_response(response)
return parse_json(AppStats, **info)