improve search api stability; add debug mode

Этот коммит содержится в:
Vlad Pronsky 2023-05-01 22:47:07 +03:00
родитель ab3ffda420
Коммит eefcf88d95
3 изменённых файлов: 112 добавлений и 50 удалений

Просмотреть файл

@ -20,7 +20,7 @@ classifiers = [
dependencies = [ dependencies = [
"fake-useragent==1.1.3", "fake-useragent==1.1.3",
"httpx==0.24.0", "httpx==0.24.0",
"loguru==0.7.0" "loguru==0.7.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]
@ -38,6 +38,18 @@ repository = "https://github.com/vladkens/tw-api"
[tool.setuptools] [tool.setuptools]
packages = ['twapi'] packages = ['twapi']
[tool.pylint]
max-line-length = 99
disable = [
"C0103", # invalid-name
"C0114", # missing-module-docstring
"C0115", # missing-class-docstring
"C0116", # missing-function-docstring
"R0903", # too-few-public-methods
"R0913", # too-many-arguments
"W0105", # pointless-string-statement
]
[tool.pytest.ini_options] [tool.pytest.ini_options]
pythonpath = ["."] pythonpath = ["."]
asyncio_mode = "auto" asyncio_mode = "auto"

Просмотреть файл

@ -1,4 +1,6 @@
import json
import time import time
from datetime import datetime
from typing import Awaitable, Callable from typing import Awaitable, Callable
from httpx import AsyncClient, HTTPStatusError, Response from httpx import AsyncClient, HTTPStatusError, Response
@ -7,12 +9,14 @@ from .accounts_pool import AccountsPool
from .constants import GQL_FEATURES, GQL_URL, SEARCH_PARAMS, SEARCH_URL from .constants import GQL_FEATURES, GQL_URL, SEARCH_PARAMS, SEARCH_URL
from .logger import logger from .logger import logger
from .models import Tweet, User from .models import Tweet, User
from .utils import encode_params, get_by_path, to_old_obj, to_search_like from .utils import encode_params, find_obj, get_by_path, to_old_obj, to_search_like
class API: class API:
def __init__(self, pool: AccountsPool): def __init__(self, pool: AccountsPool, debug=False):
self.pool = pool self.pool = pool
self.debug = debug
self._history: list[Response] = []
# http helpers # http helpers
@ -39,6 +43,34 @@ class API:
return new_total, not is_res, not is_cur or is_lim return new_total, not is_res, not is_cur or is_lim
def _push_history(self, rep: Response):
self._history.append(rep)
if len(self._history) > 3:
self._history.pop(0)
def _dump_history(self, extra: str = ""):
if not self.debug:
return
ts = str(datetime.now()).replace(":", "-").replace(" ", "_")
filename = f"/tmp/api_dump_{ts}.txt"
with open(filename, "w") as fp:
txt = f"{extra}\n"
for rep in self._history:
res = json.dumps(rep.json(), indent=2)
hdr = "\n".join([str(x) for x in list(rep.request.headers.items())])
div = "-" * 20
msg = f"{div}\n{self._limit_msg(rep)}"
msg = f"{msg}\n{rep.request.method} {rep.request.url}"
msg = f"{msg}\n{rep.status_code}\n{div}"
msg = f"{msg}\n{hdr}\n{div}\n{res}\n\n"
txt += msg
fp.write(txt)
print(f"API dump ({len(self._history)}) dumped to {filename}")
async def _inf_req(self, queue: str, cb: Callable[[AsyncClient], Awaitable[Response]]): async def _inf_req(self, queue: str, cb: Callable[[AsyncClient], Awaitable[Response]]):
while True: while True:
account = await self.pool.get_account_or_wait(queue) account = await self.pool.get_account_or_wait(queue)
@ -47,52 +79,47 @@ class API:
while True: while True:
rep = await cb(account.client) rep = await cb(account.client)
rep.raise_for_status() rep.raise_for_status()
self._push_history(rep)
yield rep yield rep
except HTTPStatusError as e: except HTTPStatusError as e:
if e.response.status_code == 429: rep = e.response
logger.debug(f"Rate limit for account={account.username} on queue={queue}") self._push_history(rep)
reset_ts = int(e.response.headers.get("x-rate-limit-reset", 0)) log_id = f"{self._limit_msg(rep)} on queue={queue}"
# rate limit
if rep.status_code == 429:
logger.debug(f"Rate limit for {log_id}")
reset_ts = int(rep.headers.get("x-rate-limit-reset", 0))
self.pool.update_limit(account, queue, reset_ts) self.pool.update_limit(account, queue, reset_ts)
continue continue
if e.response.status_code == 403: # possible account banned
logger.debug(f"Account={account.username} is banned on queue={queue}") if rep.status_code == 403:
logger.debug(f"Ban for {log_id}")
reset_ts = int(time.time() + 60 * 60) # 1 hour reset_ts = int(time.time() + 60 * 60) # 1 hour
self.pool.update_limit(account, queue, reset_ts) self.pool.update_limit(account, queue, reset_ts)
continue continue
logger.error(f"[{e.response.status_code}] {e.request.url}\n{e.response.text}") # twitter can return different types of cursors that not transfers between accounts
# just take the next account, the current cursor can work in it
if rep.status_code == 400:
logger.debug(f"Cursor not valid for {log_id}")
continue
logger.error(f"[{rep.status_code}] {e.request.url}\n{rep.text}")
raise e raise e
finally: finally:
account.unlock(queue) account.unlock(queue)
def _get_search_cursor(self, res: dict) -> str | None: def _get_cursor(self, obj: dict):
try: if cur := find_obj(obj, lambda x: x.get("cursorType") == "Bottom"):
for x in res["timeline"]["instructions"]: return cur.get("value")
entry = x.get("replaceEntry", None) return None
if entry is not None and entry["entryIdToReplace"] == "sq-cursor-bottom":
return entry["entry"]["content"]["operation"]["cursor"]["value"]
for entry in x.get("addEntries", {}).get("entries", []):
if entry["entryId"] == "sq-cursor-bottom":
return entry["content"]["operation"]["cursor"]["value"]
except Exception as e:
logger.debug(e)
return None
def _get_ql_entries(self, obj: dict) -> list[dict]: def _get_ql_entries(self, obj: dict) -> list[dict]:
entries = get_by_path(obj, "entries") entries = get_by_path(obj, "entries")
return entries or [] return entries or []
def _get_ql_cursor(self, obj: dict) -> str | None:
try:
for entry in self._get_ql_entries(obj):
if entry["entryId"].startswith("cursor-bottom-"):
return entry["content"]["value"]
return None
except Exception:
return None
async def _ql_items(self, op: str, kv: dict, limit=-1): async def _ql_items(self, op: str, kv: dict, limit=-1):
queue, cursor, count = op.split("/")[-1], None, 0 queue, cursor, count = op.split("/")[-1], None, 0
@ -106,7 +133,7 @@ class API:
# cursor-top / cursor-bottom always present # cursor-top / cursor-bottom always present
entries = self._get_ql_entries(obj) entries = self._get_ql_entries(obj)
entries = [x for x in entries if not x["entryId"].startswith("cursor-")] entries = [x for x in entries if not x["entryId"].startswith("cursor-")]
cursor = self._get_ql_cursor(obj) cursor = self._get_cursor(obj)
check = self._is_end(rep, queue, entries, cursor, count, limit) check = self._is_end(rep, queue, entries, cursor, count, limit)
count, end_before, end_after = check count, end_before, end_after = check
@ -141,29 +168,26 @@ class API:
params["cursor" if cursor else "requestContext"] = cursor if cursor else "launch" params["cursor" if cursor else "requestContext"] = cursor if cursor else "launch"
return await client.get(SEARCH_URL, params=params) return await client.get(SEARCH_URL, params=params)
retries = 0 try:
async for rep in self._inf_req(queue, _get): async for rep in self._inf_req(queue, _get):
data = rep.json() data = rep.json()
tweets = data.get("globalObjects", {}).get("tweets", []) tweets = data.get("globalObjects", {}).get("tweets", [])
if not tweets and retries < 3: cursor = self._get_cursor(data)
retries += 1
continue
else:
retries = 0
cursor = self._get_search_cursor(data) check = self._is_end(rep, q, tweets, cursor, count, limit)
count, end_before, end_after = check
check = self._is_end(rep, q, tweets, cursor, count, limit) if end_before:
count, end_before, end_after = check return
if end_before: yield rep
return
yield rep if end_after:
return
if end_after: except HTTPStatusError as e:
return self._dump_history(f"q={q}\ncount={count}\nwas_cur={cursor}\nnew_cur=None")
raise e
async def search(self, q: str, limit=-1): async def search(self, q: str, limit=-1):
async for rep in self.search_raw(q, limit=limit): async for rep in self.search_raw(q, limit=limit):

Просмотреть файл

@ -78,6 +78,32 @@ def find_item(lst: list[T], fn: Callable[[T], bool]) -> T | None:
return None return None
def find_or_fail(lst: list[T], fn: Callable[[T], bool]) -> T:
item = find_item(lst, fn)
if item is None:
raise ValueError()
return item
def find_obj(obj: dict, fn: Callable[[dict], bool]) -> Any | None:
if not isinstance(obj, dict):
return None
if fn(obj):
return obj
for _, v in obj.items():
if isinstance(v, dict):
if res := find_obj(v, fn):
return res
elif isinstance(v, list):
for x in v:
if res := find_obj(x, fn):
return res
return None
def get_typed_object(obj: dict, res: defaultdict[str, list]): def get_typed_object(obj: dict, res: defaultdict[str, list]):
obj_type = obj.get("__typename", None) obj_type = obj.get("__typename", None)
if obj_type is not None: if obj_type is not None: