From a3bb5d2dc84b0634622c81fde40fc73f259bc17e Mon Sep 17 00:00:00 2001
From: Vlad Pronsky
Date: Sat, 15 Jul 2023 02:25:49 +0300
Subject: [PATCH] update api errors handling

---
Review notes (text between the "---" marker and the diffstat is not part of
the commit message and is ignored by `git am`):

* twscrape/queue_client.py, _check_rep: the error strings were de-duplicated
  through a set() and then joined. Iteration order of a set of strings is not
  stable between interpreter runs, while the code right below dispatches on
  msg.startswith("(88) ...") / msg.startswith("(326) ..."). With two or more
  distinct errors in one response the selected branch was therefore
  nondeterministic. The two added lines now use dict.fromkeys(), which drops
  duplicates and keeps the order in which the API returned the errors. The
  number of added lines is unchanged, so hunk headers and the diffstat still
  match; only the post-image blob id on the "index" line of that file is no
  longer exact.
* Not changed, worth a follow-up: _check_rep calls exit(1) from library code;
  TMP_TS uses datetime.utcnow(), deprecated since Python 3.12;
  _get_gql_ops.py prints a garbage id when an operation name is not found in
  the script (str.split returns the whole text) and does not check the HTTP
  status; stats() now renders a missing error_msg as the string "None" if
  that field can be None (confirm against the Account model).

 _get_gql_ops.py           | 26 +++++++++++
 twscrape/accounts_pool.py | 18 +++++--
 twscrape/cli.py           |  7 ++-
 twscrape/queue_client.py  | 98 +++++++++++++++++++--------------------
 4 files changed, 94 insertions(+), 55 deletions(-)
 create mode 100644 _get_gql_ops.py

diff --git a/_get_gql_ops.py b/_get_gql_ops.py
new file mode 100644
index 0000000..a00e934
--- /dev/null
+++ b/_get_gql_ops.py
@@ -0,0 +1,26 @@
+import httpx
+
+# update this url on next run
+url = "https://abs.twimg.com/responsive-web/client-web/api.f4ff3bfa.js"
+script = httpx.get(url).text
+
+ops = """
+SearchTimeline
+UserByRestId
+UserByScreenName
+TweetDetail
+Followers
+Following
+Retweeters
+Favoriters
+UserTweets
+UserTweetsAndReplies
+ListLatestTweetsTimeline
+"""
+
+ops = [op.strip() for op in ops.split("\n") if op.strip()]
+
+for x in ops:
+    idx = script.split(f'operationName:"{x}"')[0].split("queryId:")[-1]
+    idx = idx.strip('",')
+    print(f'OP_{x} = "{idx}/{x}"')
diff --git a/twscrape/accounts_pool.py b/twscrape/accounts_pool.py
index 2f59a45..9f483f9 100644
--- a/twscrape/accounts_pool.py
+++ b/twscrape/accounts_pool.py
@@ -29,7 +29,8 @@ def guess_delim(line: str):
 
 
 class AccountsPool:
-    _order_by: str = "RANDOM()"
+    # _order_by: str = "RANDOM()"
+    _order_by: str = "username"
 
     def __init__(self, db_file="accounts.db"):
         self._db_file = db_file
@@ -104,6 +105,10 @@ class AccountsPool:
         qs = f"""DELETE FROM accounts WHERE username IN ({','.join([f'"{x}"' for x in usernames])})"""
         await execute(self._db_file, qs)
 
+    async def delete_inactive(self):
+        qs = "DELETE FROM accounts WHERE active = false"
+        await execute(self._db_file, qs)
+
     async def get(self, username: str):
         qs = "SELECT * FROM accounts WHERE username = :username"
         rs = await fetchone(self._db_file, qs, {"username": username})
@@ -278,6 +283,13 @@ class AccountsPool:
 
         return "none"
 
+    async def mark_banned(self, username: str, error_msg: str):
+        qs = """
+        UPDATE accounts SET active = false, error_msg = :error_msg
+        WHERE username = :username
+        """
+        await execute(self._db_file, qs, {"username": username, "error_msg": error_msg})
+
     async def stats(self):
         def locks_count(queue: str):
             return f"""
@@ -312,17 +324,17 @@ class AccountsPool:
                 "active": x.active,
                 "last_used": x.last_used,
                 "total_req": sum(x.stats.values()),
-                "error_msg": x.error_msg,
+                "error_msg": str(x.error_msg)[0:60],
             }
             items.append(item)
 
         old_time = datetime(1970, 1, 1).replace(tzinfo=timezone.utc)
         items = sorted(items, key=lambda x: x["username"].lower())
-        items = sorted(items, key=lambda x: x["active"], reverse=True)
         items = sorted(
             items,
             key=lambda x: x["last_used"] or old_time if x["total_req"] > 0 else old_time,
             reverse=True,
         )
+        items = sorted(items, key=lambda x: x["active"], reverse=True)
         # items = sorted(items, key=lambda x: x["total_req"], reverse=True)
         return items
diff --git a/twscrape/cli.py b/twscrape/cli.py
index fd27e5c..6c09ad7 100644
--- a/twscrape/cli.py
+++ b/twscrape/cli.py
@@ -95,6 +95,10 @@ async def main(args):
         await pool.reset_locks()
         return
 
+    if args.command == "delete_inactive":
+        await pool.delete_inactive()
+        return
+
     fn = args.command + "_raw" if args.raw else args.command
     fn = getattr(api, fn, None)
     if fn is None:
@@ -151,6 +155,7 @@ def run():
 
     subparsers.add_parser("version", help="Show version")
     subparsers.add_parser("accounts", help="List all accounts")
+    subparsers.add_parser("stats", help="Get current usage stats")
 
     add_accounts = subparsers.add_parser("add_accounts", help="Add accounts")
     add_accounts.add_argument("file_path", help="File with accounts")
@@ -166,7 +171,7 @@ def run():
 
     subparsers.add_parser("relogin_failed", help="Retry login for failed accounts")
     subparsers.add_parser("reset_locks", help="Reset all locks")
-    subparsers.add_parser("stats", help="Get current usage stats")
+    subparsers.add_parser("delete_inactive", help="Delete inactive accounts")
 
     c_lim("search", "Search for tweets", "query", "Search query")
     c_one("tweet_details", "Get tweet details", "tweet_id", "Tweet ID", int)
diff --git a/twscrape/queue_client.py b/twscrape/queue_client.py
index 5035aac..222c1bf 100644
--- a/twscrape/queue_client.py
+++ b/twscrape/queue_client.py
@@ -10,6 +10,7 @@ from .logger import logger
 from .utils import utc_ts
 
 ReqParams = dict[str, str | int] | None
+TMP_TS = datetime.utcnow().isoformat().split(".")[0].replace("T", "_").replace(":", "-")[0:16]
 
 
 class Ctx:
@@ -31,6 +32,14 @@ class ApiError(Exception):
         return f"ApiError on {req_id(self.rep)} {msg}"
 
 
+class RateLimitError(Exception):
+    pass
+
+
+class BannedError(Exception):
+    pass
+
+
 def req_id(rep: httpx.Response):
     lr = str(rep.headers.get("x-rate-limit-remaining", -1))
     ll = str(rep.headers.get("x-rate-limit-limit", -1))
@@ -46,9 +55,8 @@ def dump_rep(rep: httpx.Response):
     setattr(dump_rep, "__count", count)
 
     acc = getattr(rep, "__username", "")
-    fts = datetime.utcnow().isoformat().split(".")[0].replace("T", "_").replace(":", "-")[0:16]
     outfile = f"{count:05d}_{rep.status_code}_{acc}.txt"
-    outfile = f"/tmp/twscrape-{fts}/{outfile}"
+    outfile = f"/tmp/twscrape-{TMP_TS}/{outfile}"
     os.makedirs(os.path.dirname(outfile), exist_ok=True)
 
     msg = []
@@ -83,17 +91,23 @@ class QueueClient:
     async def __aexit__(self, exc_type, exc_val, exc_tb):
         await self._close_ctx()
 
-    async def _close_ctx(self, reset_at=-1):
+    async def _close_ctx(self, reset_at=-1, banned=False, msg=""):
         if self.ctx is None:
             return
 
         ctx, self.ctx, self.req_count = self.ctx, None, 0
+        username = ctx.acc.username
         await ctx.clt.aclose()
 
-        if reset_at <= 0:
-            await self.pool.unlock(ctx.acc.username, self.queue, ctx.req_count)
-        else:
+        if banned:
+            await self.pool.mark_banned(username, msg)
+            return
+
+        if reset_at > 0:
             await self.pool.lock_until(ctx.acc.username, self.queue, reset_at, ctx.req_count)
+            return
+
+        await self.pool.unlock(ctx.acc.username, self.queue, ctx.req_count)
 
     async def _get_ctx(self) -> Ctx:
         if self.ctx:
@@ -115,28 +129,45 @@ class QueueClient:
 
         msg = "OK"
         if "errors" in res:
-            msg = "; ".join([f'({x.get("code", -1)}) {x["message"]}' for x in res["errors"]])
+            msg = dict.fromkeys(f'({x.get("code", -1)}) {x["message"]}' for x in res["errors"])
+            msg = "; ".join(msg)
 
         if self.debug:
             fn = logger.debug if rep.status_code == 200 else logger.warning
             fn(f"{rep.status_code:3d} - {req_id(rep)} - {msg}")
 
+        # need to add some features in api.py
         if msg.startswith("The following features cannot be null"):
             logger.error(f"Invalid request: {msg}")
             exit(1)
 
+        # general api rate limit
+        if int(rep.headers.get("x-rate-limit-remaining", -1)) == 0:
+            await self._close_ctx(int(rep.headers.get("x-rate-limit-reset", -1)))
+            raise RateLimitError(msg)
+
+        # possible new limits for tweets view per account
+        if msg.startswith("(88) Rate limit exceeded") or rep.status_code == 429:
+            await self._close_ctx(utc_ts() + 60 * 60 * 4)  # lock for 4 hours
+            raise RateLimitError(msg)
+
+        if msg.startswith("(326) Authorization: Denied by access control"):
+            await self._close_ctx(-1, banned=True, msg=msg)
+            raise BannedError(msg)
+
+        # content not found
         if rep.status_code == 200 and "_Missing: No status found with that ID." in msg:
             return  # ignore this error
 
+        # todo: (32) Could not authenticate you
+
         if msg != "OK":
             raise ApiError(rep, res)
 
         rep.raise_for_status()
 
-        ll = int(rep.headers.get("x-rate-limit-remaining", -1))
-        lr = int(rep.headers.get("x-rate-limit-reset", 0))
-        if ll == 0:
-            await self._close_ctx(lr)
+    async def get(self, url: str, params: ReqParams = None):
+        return await self.req("GET", url, params=params)
 
     async def req(self, method: str, url: str, params: ReqParams = None):
         retry_count = 0
@@ -151,46 +182,11 @@ class QueueClient:
                 ctx.req_count += 1  # count only successful
                 retry_count = 0
                 return rep
-            except httpx.HTTPStatusError as e:
-                rep = e.response
-                log_id = f"{req_id(rep)} on queue={self.queue}"
-
-                reset_ts, known_code = -1, True
-
-                if rep.status_code == 429:
-                    # rate limit
-                    reset_ts = int(rep.headers.get("x-rate-limit-reset", 0))
-                    logger.debug(f"Rate limit for {log_id}")
-
-                elif rep.status_code == 400:
-                    # api can return different types of cursors that not transfers between accounts
-                    # just take the next account, the current cursor can work in it
-                    logger.debug(f"Cursor not valid for {log_id}")
-
-                elif rep.status_code in (401, 403):
-                    # account is locked or banned
-                    reset_ts = utc_ts() + 60 * 60  # + 1 hour
-                    logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h")
-
-                else:
-                    known_code = False
-                    logger.warning(f"HTTP Error {rep.status_code} {e.request.url}\n{rep.text}")
-
-                await self._close_ctx(reset_ts)
-                if not known_code:
-                    raise e
-            except ApiError as e:
-                # possible account banned
-                reset_ts = utc_ts() + 60 * 60 * 12  # 12 hours
-                await self._close_ctx(reset_ts)
+            except (RateLimitError, BannedError):
+                # already handled
+                continue
             except Exception as e:
-                logger.warning(f"Unknown error, retrying. Err ({type(e)}): {str(e)}")
                 retry_count += 1
-                if retry_count > 3:
+                if retry_count >= 3:
+                    logger.warning(f"Unknown error {type(e)}: {e}")
                     await self._close_ctx(utc_ts() + 60 * 15)  # 15 minutes
-
-    async def get(self, url: str, params: ReqParams = None):
-        try:
-            return await self.req("GET", url, params=params)
-        except httpx.HTTPStatusError as e:
-            raise e