def _dump_history(self, extra: str = ""):
    """Write the responses kept in ``self._history`` to a timestamped text file.

    Debugging aid: does nothing unless ``self.debug`` is truthy. For each
    stored response the dump contains the rate-limit summary produced by
    ``self._limit_msg``, the request method and URL, the status code, the
    request headers and the response body. The path of the written file is
    printed to stdout.

    Args:
        extra: Free-form context (query, cursor, counters, ...) written as
            the first line(s) of the dump.
    """
    if not self.debug:
        return

    # Imported locally so this helper brings its own dependency with it.
    import tempfile

    # ":" and " " are replaced so the timestamp is a valid file name everywhere.
    ts = str(datetime.now()).replace(":", "-").replace(" ", "_")
    # Use the platform temp dir instead of a hard-coded "/tmp", which does not
    # exist on every OS; the final path is printed below.
    filename = f"{tempfile.gettempdir()}/api_dump_{ts}.txt"

    div = "-" * 20
    parts = [f"{extra}\n"]
    for rep in self._history:
        try:
            res = json.dumps(rep.json(), indent=2)
        except ValueError:
            # Error responses are not guaranteed to be JSON. This method is
            # called while another exception is being handled, so failing
            # here would hide the original error: dump the raw body instead.
            res = rep.text

        hdr = "\n".join(str(x) for x in rep.request.headers.items())
        parts.append(
            f"{div}\n{self._limit_msg(rep)}"
            f"\n{rep.request.method} {rep.request.url}"
            f"\n{rep.status_code}\n{div}"
            f"\n{hdr}\n{div}\n{res}\n\n"
        )

    # Explicit encoding: queries, URLs and raw bodies may contain non-ASCII text.
    with open(filename, "w", encoding="utf-8") as fp:
        fp.write("".join(parts))

    print(f"API dump ({len(self._history)}) dumped to {filename}")
def _get_cursor(self, obj: dict):
    """Return the pagination cursor for the next page, if the payload has one.

    Uses ``find_obj`` to locate the first nested object whose ``cursorType``
    equals ``"Bottom"`` and returns that object's ``value`` entry. Returns
    ``None`` when no such object is found.
    """

    def is_bottom_cursor(node):
        return node.get("cursorType") == "Bottom"

    match = find_obj(obj, is_bottom_cursor)
    return match.get("value") if match else None
{}).get("tweets", []) + cursor = self._get_cursor(data) - cursor = self._get_search_cursor(data) + check = self._is_end(rep, q, tweets, cursor, count, limit) + count, end_before, end_after = check - check = self._is_end(rep, q, tweets, cursor, count, limit) - count, end_before, end_after = check + if end_before: + return - if end_before: - return + yield rep - yield rep - - if end_after: - return + if end_after: + return + except HTTPStatusError as e: + self._dump_history(f"q={q}\ncount={count}\nwas_cur={cursor}\nnew_cur=None") + raise e async def search(self, q: str, limit=-1): async for rep in self.search_raw(q, limit=limit): diff --git a/twapi/utils.py b/twapi/utils.py index 98627fd..06be3ac 100644 --- a/twapi/utils.py +++ b/twapi/utils.py @@ -78,6 +78,32 @@ def find_item(lst: list[T], fn: Callable[[T], bool]) -> T | None: return None +def find_or_fail(lst: list[T], fn: Callable[[T], bool]) -> T: + item = find_item(lst, fn) + if item is None: + raise ValueError() + return item + + +def find_obj(obj: dict, fn: Callable[[dict], bool]) -> Any | None: + if not isinstance(obj, dict): + return None + + if fn(obj): + return obj + + for _, v in obj.items(): + if isinstance(v, dict): + if res := find_obj(v, fn): + return res + elif isinstance(v, list): + for x in v: + if res := find_obj(x, fn): + return res + + return None + + def get_typed_object(obj: dict, res: defaultdict[str, list]): obj_type = obj.get("__typename", None) if obj_type is not None: