diff --git a/twscrape/accounts_pool.py b/twscrape/accounts_pool.py index 0066b8f..dd3cd13 100644 --- a/twscrape/accounts_pool.py +++ b/twscrape/accounts_pool.py @@ -171,6 +171,10 @@ class AccountsPool: rs = await fetchall(self._db_file, qs) await self.relogin([x["username"] for x in rs]) + async def reset_locks(self): + qs = "UPDATE accounts SET locks = json_object()" + await execute(self._db_file, qs) + async def set_active(self, username: str, active: bool): qs = "UPDATE accounts SET active = :active WHERE username = :username" await execute(self._db_file, qs, {"username": username, "active": active}) @@ -233,10 +237,13 @@ class AccountsPool: return Account.from_rs(rs) if rs else None async def get_for_queue_or_wait(self, queue: str) -> Account: + msg_show = False while True: account = await self.get_for_queue(queue) if not account: - logger.debug(f"No accounts available for queue '{queue}' (sleeping for 5 sec)") + if not msg_show: + logger.info(f"No accounts available for queue '{queue}' (sleeping for 5 sec)") + msg_show = True await asyncio.sleep(5) continue diff --git a/twscrape/cli.py b/twscrape/cli.py index a2bdbcd..44ce45c 100644 --- a/twscrape/cli.py +++ b/twscrape/cli.py @@ -92,6 +92,10 @@ async def main(args): await pool.relogin(args.usernames) return + if args.command == "reset_locks": + await pool.reset_locks() + return + fn = args.command + "_raw" if args.raw else args.command fn = getattr(api, fn, None) if fn is None: @@ -162,6 +166,7 @@ def run(): relogin.add_argument("usernames", nargs="+", default=[], help="Usernames to re-login") subparsers.add_parser("relogin_failed", help="Retry login for failed accounts") + subparsers.add_parser("reset_locks", help="Reset all locks") subparsers.add_parser("stats", help="Get current usage stats") c_lim("search", "Search for tweets", "query", "Search query") diff --git a/twscrape/queue_client.py b/twscrape/queue_client.py index fa59767..be7236a 100644 --- a/twscrape/queue_client.py +++ b/twscrape/queue_client.py 
@@ -1,5 +1,5 @@ import json -from datetime import datetime +import os import httpx @@ -35,12 +35,36 @@ class ApiError(Exception): return f"ApiError ({self.rep.status_code}) {' ~ '.join(self.errors)}" +def dump_rep(rep: httpx.Response): + count = getattr(dump_rep, "__count", -1) + 1 # kept on the function so it persists across calls + setattr(dump_rep, "__count", count) + + acc = getattr(rep, "__username", "") + outfile = f"/tmp/twscrape/{count:05d}_{rep.status_code}_{acc}.txt" + os.makedirs(os.path.dirname(outfile), exist_ok=True) + + msg = [] + msg.append(f"{count:,d} - {req_id(rep)}") + msg.append(f"{rep.status_code} {rep.request.method} {rep.request.url}") + msg.append("\n") + msg.append("\n".join([str(x) for x in list(rep.request.headers.items())])) + msg.append("\n") + + try: + msg.append(json.dumps(rep.json(), indent=2)) + except json.JSONDecodeError: + msg.append(rep.text) + + txt = "\n".join(msg) + with open(outfile, "w", encoding="utf-8") as f: + f.write(txt) + + class QueueClient: def __init__(self, pool: AccountsPool, queue: str, debug=False): self.pool = pool self.queue = queue self.debug = debug - self.history: list[httpx.Response] = [] self.ctx: Ctx | None = None async def __aenter__(self): @@ -71,34 +95,6 @@ class QueueClient: self.ctx = Ctx(acc, clt) return self.ctx - def _push_history(self, rep: httpx.Response): - self.history.append(rep) - if len(self.history) > 3: - self.history.pop(0) - - def _dump_history(self, extra: str = ""): - if not self.debug: - return - - ts = str(datetime.now()).replace(":", "-").replace(" ", "_") - filename = f"/tmp/api_dump_{ts}.txt" - with open(filename, "w", encoding="utf-8") as fp: - txt = f"{extra}\n" - for rep in self.history: - res = json.dumps(rep.json(), indent=2) - hdr = "\n".join([str(x) for x in list(rep.request.headers.items())]) - div = "-" * 20 - - msg = f"{div}\n{req_id(rep)}" - msg = f"{msg}\n{rep.request.method} {rep.request.url}" - msg = f"{msg}\n{rep.status_code}\n{div}" - msg = f"{msg}\n{hdr}\n{div}\n{res}\n\n" - txt += msg - - fp.write(txt) - - print(f"API dump 
({len(self.history)}) dumped to {filename}") - async def req(self, method: str, url: str, params: ReqParams = None): retry_count = 0 while True: @@ -107,13 +103,16 @@ class QueueClient: try: rep = await ctx.clt.request(method, url, params=params) setattr(rep, "__username", ctx.acc.username) - self._push_history(rep) + # dump only in debug mode: the dump file includes the request headers + if self.debug: + dump_rep(rep) - rep.raise_for_status() res = rep.json() if "errors" in res: raise ApiError(rep, res) + rep.raise_for_status() + ctx.req_count += 1 # count only successful retry_count = 0 return rep @@ -146,7 +145,8 @@ class QueueClient: if not known_code: raise e except ApiError as e: - reset_ts = utc_ts() + 60 * 60 * 4 # 4 hours + # possible account banned + reset_ts = utc_ts() + 60 * 60 * 12 # 12 hours await self._close_ctx(reset_ts) logger.warning(e) except Exception as e: @@ -159,5 +159,4 @@ class QueueClient: try: return await self.req("GET", url, params=params) except httpx.HTTPStatusError as e: - self._dump_history(f"GET {url} {json.dumps(params)}") raise e diff --git a/twscrape/utils.py b/twscrape/utils.py index 405ca21..b1c4ee1 100644 --- a/twscrape/utils.py +++ b/twscrape/utils.py @@ -1,7 +1,7 @@ import json from collections import defaultdict from datetime import datetime, timezone -from typing import Any, AsyncGenerator, Callable, TypeVar +from typing import Any, AsyncGenerator, Callable, TypedDict, TypeVar from httpx import HTTPStatusError, Response @@ -130,7 +130,7 @@ def to_old_obj(obj: dict): } -def to_old_rep(obj: dict): +def to_old_rep(obj: dict) -> dict[str, dict]: tmp = get_typed_object(obj, defaultdict(list)) tweets = [x for x in tmp.get("Tweet", []) if "legacy" in x]