From 364c0ddfd0fd9f439a4e07f4945974bb5ac97ebd Mon Sep 17 00:00:00 2001 From: Vlad Pronsky Date: Fri, 7 Jul 2023 01:40:53 +0300 Subject: [PATCH] update queue_client errors handling --- tests/test_pool.py | 5 +++-- twscrape/accounts_pool.py | 6 +++--- twscrape/queue_client.py | 41 +++++++++++++++++++++++---------------- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/tests/test_pool.py b/tests/test_pool.py index 7804871..19e5095 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -130,8 +130,9 @@ async def test_get_stats(pool_mock: AccountsPool): assert stats["active"] == 1 # should update queue stats - await pool_mock.get_for_queue(Q) + acc = await pool_mock.get_for_queue(Q) + assert acc is not None stats = await pool_mock.stats() assert stats["total"] == 1 assert stats["active"] == 1 - assert stats["locked_SearchTimeline"] == 1 + assert stats[f"locked_{Q}"] == 1 diff --git a/twscrape/accounts_pool.py b/twscrape/accounts_pool.py index 06877a1..39ea6e4 100644 --- a/twscrape/accounts_pool.py +++ b/twscrape/accounts_pool.py @@ -229,14 +229,14 @@ class AccountsPool: return account async def stats(self): - def by_queue(queue: str): + def locks_count(queue: str): return f""" SELECT COUNT(*) FROM accounts WHERE json_extract(locks, '$.{queue}') IS NOT NULL AND json_extract(locks, '$.{queue}') > datetime('now') """ - qs = "SELECT DISTINCT(f.key) as k from accounts, json_each(stats) f" + qs = "SELECT DISTINCT(f.key) as k from accounts, json_each(locks) f" rs = await fetchall(self._db_file, qs) gql_ops = [x["k"] for x in rs] @@ -244,7 +244,7 @@ class AccountsPool: ("total", "SELECT COUNT(*) FROM accounts"), ("active", "SELECT COUNT(*) FROM accounts WHERE active = true"), ("inactive", "SELECT COUNT(*) FROM accounts WHERE active = false"), - *[(f"locked_{x}", by_queue(x)) for x in gql_ops], + *[(f"locked_{x}", locks_count(x)) for x in gql_ops], ] qs = f"SELECT {','.join([f'({q}) as {k}' for k, q in config])}" diff --git a/twscrape/queue_client.py b/twscrape/queue_client.py index fbaac24..6aaf07b 100644 --- a/twscrape/queue_client.py +++ b/twscrape/queue_client.py @@ -90,6 +90,7 @@ class QueueClient: print(f"API dump ({len(self.history)}) dumped to {filename}") async def req(self, method: str, url: str, params: ReqParams = None): + retry_count = 0 while True: ctx = await self._get_ctx() @@ -99,35 +100,41 @@ class QueueClient: self._push_history(rep) rep.raise_for_status() ctx.req_count += 1 # count only successful + retry_count = 0 return rep except httpx.HTTPStatusError as e: rep = e.response log_id = f"{req_id(rep)} on queue={self.queue}" - # rate limit - if rep.status_code == 429: - logger.debug(f"Rate limit for {log_id}") - reset_ts = int(rep.headers.get("x-rate-limit-reset", 0)) - await self._close_ctx(reset_ts) # get next account on next iteration - continue + reset_ts, known_code = -1, True - # possible account banned - if rep.status_code in (401, 403): + if rep.status_code == 429: + # rate limit + reset_ts = int(rep.headers.get("x-rate-limit-reset", 0)) + logger.debug(f"Rate limit for {log_id}") + + elif rep.status_code == 400: + # twitter can return different types of cursors that not transfers between accounts + # just take the next account, the current cursor can work in it + logger.debug(f"Cursor not valid for {log_id}") + + elif rep.status_code in (401, 403): + # account is locked or banned reset_ts = utc_ts() + 60 * 60 # + 1 hour logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h") - await self._close_ctx(reset_ts) # get next account on next iteration - continue - # twitter can return different types of cursors that not transfers between accounts - # just take the next account, the current cursor can work in it - if rep.status_code == 400: - logger.debug(f"Cursor not valid for {log_id}") - continue + else: + known_code = False + logger.debug(f"HTTP Error {rep.status_code} {e.request.url}\n{rep.text}") - logger.error(f"[{rep.status_code}] {e.request.url}\n{rep.text}") - raise e + await self._close_ctx(reset_ts) + if not known_code: + raise e except Exception as e: logger.warning(f"Unknown error, retrying. Err ({type(e)}): {str(e)}") + retry_count += 1 + if retry_count > 3: + await self._close_ctx(utc_ts() + 60 * 15) # 15 minutes async def get(self, url: str, params: ReqParams = None): try: