For endpoints with maximum-concurrency limits (Mave: 10, Focus Groups: 5, Video: 3), use a semaphore so you never have more than that many requests in flight at once.
```python
import asyncio

import httpx

# Mave allows max 10 concurrent requests
MAVE_SEMAPHORE = asyncio.Semaphore(10)


async def mave_chat_with_concurrency_limit(message: str, thread_id: str | None = None):
    async with MAVE_SEMAPHORE:
        async with httpx.AsyncClient(timeout=120.0) as client:
            payload = {"message": message}
            if thread_id:
                payload["thread_id"] = thread_id
            resp = await client.post(
                "https://app.mavera.io/api/v1/mave/chat",
                headers={"Authorization": f"Bearer {API_KEY}"},
                json=payload,
            )
            resp.raise_for_status()
            return resp.json()
```
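For example, you can fan out a whole batch of prompts and let the semaphore cap how many are in flight at any moment (a minimal sketch; `run_batch` and the prompt list are illustrative, not part of the Mave API):

```python
# The semaphore inside mave_chat_with_concurrency_limit keeps at most
# 10 of these coroutines talking to the API at once; the rest wait.
async def run_batch(prompts: list[str]):
    tasks = [mave_chat_with_concurrency_limit(p) for p in prompts]
    return await asyncio.gather(*tasks, return_exceptions=True)

# asyncio.run(run_batch(["hello"] * 50))
```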
Combine a semaphore with a token bucket: the semaphore caps concurrency, the token bucket caps the overall request rate. For example, allow 10 concurrent Mave requests, but only start 4 new Mave requests per minute across all workers.
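Here is a minimal sketch of that combination, assuming a simple in-process token bucket; the `TokenBucket` class, `mave_chat_throttled` wrapper, and the 4-per-minute refill rate are illustrative, not part of the Mave API:

```python
import asyncio
import time


class TokenBucket:
    """Simple async token bucket: refills at rate_per_minute, capped at capacity."""

    def __init__(self, rate_per_minute: float, capacity: int):
        self.rate = rate_per_minute / 60.0  # tokens per second
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        while True:
            async with self._lock:
                now = time.monotonic()
                self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait)


# Only ~4 new Mave requests start per minute, shared across all callers.
MAVE_BUCKET = TokenBucket(rate_per_minute=4, capacity=4)


async def mave_chat_throttled(message: str, thread_id: str | None = None):
    # Token bucket limits how often new requests start; the semaphore inside
    # mave_chat_with_concurrency_limit still caps how many run at once.
    await MAVE_BUCKET.acquire()
    return await mave_chat_with_concurrency_limit(message, thread_id)
```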
When you have a list of items to process (e.g. 500 chat requests), push them into a queue and work through it at a controlled rate, as in the sketch below. This prevents traffic spikes and keeps you within the endpoint's limits.
```python
import queue
import threading
import time


def process_queue_sync(items, process_one, rate_per_minute=60, max_workers=4):
    """
    Process items through a queue with rate limiting.
    process_one(item) -> result for each item.
    rate_per_minute is the combined rate across all workers.
    """
    q = queue.Queue()
    for item in items:
        q.put(item)

    # Each worker paces itself so the combined rate stays near rate_per_minute.
    interval = 60.0 / rate_per_minute * max_workers
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                item = q.get_nowait()
            except queue.Empty:
                break
            start = time.time()
            try:
                out = process_one(item)
                with lock:
                    results.append(out)
            except Exception as e:
                with lock:
                    results.append({"error": str(e)})
            # Sleep off the remainder of this worker's per-item interval.
            elapsed = time.time() - start
            time.sleep(max(0, interval - elapsed))
            q.task_done()

    threads = [threading.Thread(target=worker) for _ in range(max_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```
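Usage might look like this (a sketch; `send_one_chat` is a hypothetical synchronous wrapper around your chat call, and the rate and worker counts are illustrative):

```python
# Hypothetical item processor: call the chat endpoint once per item.
def send_one_chat(message: str):
    # ... call your synchronous Mave client here and return its response ...
    return {"message": message, "status": "sent"}


messages = [f"question {i}" for i in range(500)]
results = process_queue_sync(messages, send_one_chat, rate_per_minute=120, max_workers=4)
print(len(results), "items processed")
```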