From 2ed247af0f8fb8b5ce7d7801e34c47f0a094a5bb Mon Sep 17 00:00:00 2001 From: Vlad Pronsky Date: Sun, 23 Apr 2023 17:28:56 +0300 Subject: [PATCH] add accounts pool to perform search queries --- readme.md | 14 ++++++++---- twapi/client.py | 61 +++++++++++++++++++++++++++++-------------------- twapi/pool.py | 55 ++++++++++++++++++++++++++++++++++++++++++++ twapi/search.py | 48 +++++++++++++++++++------------------- 4 files changed, 126 insertions(+), 52 deletions(-) create mode 100644 twapi/pool.py diff --git a/readme.md b/readme.md index aacc8df..eaad5ac 100644 --- a/readme.md +++ b/readme.md @@ -5,14 +5,20 @@ Twitter GraphQL and Search API implementation with [SNScrape](https://github.com ```python import asyncio from twapi.client import UserClient +from twapi.pool import AccountsPool from twapi.search import Search async def main(): - account = UserClient("user", "pass", "user@example.com", "email_pass") - search = Search(account) + acc1 = UserClient("user1", "pass1", "user1@example.com", "email_pass1") + acc2 = UserClient("user2", "pass2", "user2@example.com", "email_pass2") - async for rep, data, _ in search.query("elon musk"): - print(rep.status_code, data) + pool = AccountsPool() + pool.add_account(acc1) + pool.add_account(acc2) + + search = Search(pool) + async for rep in search.query("elon musk"): + print(rep.status_code, rep.json()) if __name__ == "__main__": asyncio.run(main()) diff --git a/twapi/client.py b/twapi/client.py index 04ae230..b42e1e0 100644 --- a/twapi/client.py +++ b/twapi/client.py @@ -3,11 +3,10 @@ import imaplib import json import os import time -from collections import defaultdict from datetime import datetime, timezone from fake_useragent import UserAgent -from httpx import Client, HTTPStatusError, Response +from httpx import AsyncClient, Client, HTTPStatusError, Response from loguru import logger TOKEN = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" @@ -198,42 +197,54 @@ class UserClient: dirname = os.path.dirname(self.session_path) os.makedirs(dirname, exist_ok=True) - self.limits = defaultdict(dict) - self.locked = False + self.limits: dict[str, datetime] = {} + self.locked: dict[str, bool] = {} def save(self): - data = { - "cookies": dict(self.client.cookies), - "headers": dict(self.client.headers), - "limits": dict(self.limits), - } + cookies = dict(self.client.cookies) + headers = dict(self.client.headers) + limits = dict(self.limits) + with open(self.session_path, "w") as fp: - json.dump(data, fp, indent=2) + json.dump({"cookies": cookies, "headers": headers, "limits": limits}, fp, indent=2) def restore(self): try: with open(self.session_path) as fp: data = json.load(fp) - self.client.cookies.update(data["cookies"]) - self.client.headers.update(data["headers"]) - self.limits.update(data.get("limits", {})) + self.client.cookies.update(data.get("cookies", {})) + self.client.headers.update(data.get("headers", {})) + self.limits.update( + {k: datetime.fromisoformat(v) for k, v in data.get("limits", {}).items()} + ) return True except (FileNotFoundError, json.JSONDecodeError): return False - def _update_limits(self, rep: Response): - for k, v in rep.headers.items(): - if k.startswith("x-rate-limit-"): - self.limits[rep.url.path][k] = v - self.save() + def make_client(self) -> AsyncClient: + client = AsyncClient() + client.headers.update(self.client.headers) + client.cookies.update(self.client.cookies) + return client - def check_limits(self, rep: Response, cursor: str | None): - if rep.status_code == 429: - reset = int(rep.headers.get("x-rate-limit-reset")) - reset_time = datetime.fromtimestamp(reset, tz=timezone.utc) - logger.debug(f"Rate limit exceeded for {self.username} - reset at {reset_time}") - self._update_limits(rep) - raise RateLimitExceeded(reset, cursor) + def can_use(self, queue: str): + if self.locked.get(queue, False): + return False + + limit = self.limits.get(queue) + return not limit or limit <= datetime.now(timezone.utc) + + def lock(self, queue: str): + self.locked[queue] = True + + def unlock(self, queue: str): + self.locked[queue] = False + + def update_limit(self, queue: str, rep: Response): + reset = rep.headers.get("x-rate-limit-reset", 0) + reset = datetime.fromtimestamp(int(reset), tz=timezone.utc) + self.limits[queue] = reset + self.save() def print_session(self): for x in self.client.headers.items(): diff --git a/twapi/pool.py b/twapi/pool.py new file mode 100644 index 0000000..eac0631 --- /dev/null +++ b/twapi/pool.py @@ -0,0 +1,55 @@ +import asyncio +from typing import AsyncGenerator, Callable, Tuple + +from httpx import AsyncClient, HTTPStatusError, Response +from loguru import logger + +from .client import UserClient + + +class AccountsPool: + def __init__(self): + self.accounts: list[UserClient] = [] + + def add_account(self, account: UserClient): + self.accounts.append(account) + + def get_account(self, queue: str) -> UserClient | None: + for x in self.accounts: + if x.can_use(queue): + return x + return None + + async def execute( + self, + queue: str, + cb: Callable[ + [AsyncClient, str | None], AsyncGenerator[Tuple[Response, dict, str | None], None] + ], + cursor: str | None = None, + ): + while True: + account = self.get_account(queue) + if not account: + logger.debug(f"No accounts available for queue {queue}, sleeping 5 seconds") + await asyncio.sleep(5) + continue + else: + account.lock(queue) + logger.debug(f"Using account {account.username} for queue {queue}") + + try: + client = account.make_client() + async for x in cb(client, cursor): + rep, data, cursor = x + yield rep, data, cursor + return # exit if no more results + except HTTPStatusError as e: + if e.response.status_code == 429: + account.update_limit(queue, e.response) + logger.debug(f"Account {account.username} is frozen") + continue + else: + raise e + finally: + account.unlock(queue) diff --git a/twapi/search.py b/twapi/search.py index bf33687..50cf7b6 100644 --- a/twapi/search.py +++ b/twapi/search.py @@ -1,7 +1,7 @@ from httpx import AsyncClient, Response from loguru import logger -from .client import UserClient, raise_for_status +from .pool import AccountsPool BASIC_SEARCH_PARAMS = """ include_profile_interstitial_type=1 @@ -45,9 +45,13 @@ SEARCH_URL = "https://api.twitter.com/2/search/adaptive.json" SEARCH_PARAMS = dict(x.split("=") for x in BASIC_SEARCH_PARAMS.splitlines() if x) +def rep_info(rep: Response) -> str: + return f"[{rep.headers['x-rate-limit-remaining']}/{rep.headers['x-rate-limit-limit']}]" + + class Search: - def __init__(self, account: UserClient): - self.account = account + def __init__(self, pool: AccountsPool): + self.pool = pool def get_next_cursor(self, res: dict) -> str | None: try: @@ -63,34 +67,32 @@ class Search: logger.debug(e) return None - def _log(self, rep: Response, msg: str): - limit = int(rep.headers.get("x-rate-limit-limit", -1)) - remaining = int(rep.headers.get("x-rate-limit-remaining", -1)) - logger.debug(f"[{remaining:3,d}/{limit:3,d}] {self.account.username} - {msg}") - - async def query(self, q: str, cursor: str | None = None): - client = AsyncClient() - client.headers.update(self.account.client.headers) - client.cookies.update(self.account.client.cookies) - - total_count = 0 + async def get(self, client: AsyncClient, q: str, cursor: str | None): while True: params = {**SEARCH_PARAMS, "q": q, "count": 20} params["cursor" if cursor else "requestContext"] = cursor if cursor else "launch" rep = await client.get(SEARCH_URL, params=params) - self.account.check_limits(rep, cursor) - raise_for_status(rep, "search") + rep.raise_for_status() data = rep.json() - tweets = data.get("globalObjects", {}).get("tweets", []) cursor = self.get_next_cursor(data) - # logger.debug(rep.text) - - total_count += len(tweets) - self._log(rep, f"{total_count:,d} (+{len(tweets):,d}) tweets - {q}") - + tweets = data.get("globalObjects", {}).get("tweets", []) if not tweets or not cursor: + is_tweets = len(tweets) > 0 + is_cursor = cursor is not None + logger.debug(f"{q} - no more results [res: {is_tweets}, cur: {is_cursor}]") return - yield rep.text + yield rep, data, cursor + + async def query(self, q: str): + total_count = 0 + async for x in self.pool.execute("search", lambda c, cur: self.get(c, q, cur)): + rep, data, cursor = x + + tweets = data.get("globalObjects", {}).get("tweets", []) + total_count += len(tweets) + logger.debug(f"{q} - {total_count:,d} (+{len(tweets):,d}) {rep_info(rep)}") + + yield rep