update queue_client errors handling

Этот коммит содержится в:
Vlad Pronsky 2023-07-07 01:40:53 +03:00
родитель 852675954e
Коммит 364c0ddfd0
3 изменённых файлов: 30 добавлений и 22 удалений

Просмотреть файл

@ -130,8 +130,9 @@ async def test_get_stats(pool_mock: AccountsPool):
assert stats["active"] == 1
# should update queue stats
await pool_mock.get_for_queue(Q)
acc = await pool_mock.get_for_queue(Q)
assert acc is not None
stats = await pool_mock.stats()
assert stats["total"] == 1
assert stats["active"] == 1
assert stats["locked_SearchTimeline"] == 1
assert stats[f"locked_{Q}"] == 1

Просмотреть файл

@ -229,14 +229,14 @@ class AccountsPool:
return account
async def stats(self):
def by_queue(queue: str):
def locks_count(queue: str):
return f"""
SELECT COUNT(*) FROM accounts
WHERE json_extract(locks, '$.{queue}') IS NOT NULL
AND json_extract(locks, '$.{queue}') > datetime('now')
"""
qs = "SELECT DISTINCT(f.key) as k from accounts, json_each(stats) f"
qs = "SELECT DISTINCT(f.key) as k from accounts, json_each(locks) f"
rs = await fetchall(self._db_file, qs)
gql_ops = [x["k"] for x in rs]
@ -244,7 +244,7 @@ class AccountsPool:
("total", "SELECT COUNT(*) FROM accounts"),
("active", "SELECT COUNT(*) FROM accounts WHERE active = true"),
("inactive", "SELECT COUNT(*) FROM accounts WHERE active = false"),
*[(f"locked_{x}", by_queue(x)) for x in gql_ops],
*[(f"locked_{x}", locks_count(x)) for x in gql_ops],
]
qs = f"SELECT {','.join([f'({q}) as {k}' for k, q in config])}"

Просмотреть файл

@ -90,6 +90,7 @@ class QueueClient:
print(f"API dump ({len(self.history)}) dumped to {filename}")
async def req(self, method: str, url: str, params: ReqParams = None):
retry_count = 0
while True:
ctx = await self._get_ctx()
@ -99,35 +100,41 @@ class QueueClient:
self._push_history(rep)
rep.raise_for_status()
ctx.req_count += 1 # count only successful
retry_count = 0
return rep
except httpx.HTTPStatusError as e:
rep = e.response
log_id = f"{req_id(rep)} on queue={self.queue}"
# rate limit
if rep.status_code == 429:
logger.debug(f"Rate limit for {log_id}")
reset_ts = int(rep.headers.get("x-rate-limit-reset", 0))
await self._close_ctx(reset_ts) # get next account on next iteration
continue
reset_ts, known_code = -1, True
# possible account banned
if rep.status_code in (401, 403):
if rep.status_code == 429:
# rate limit
reset_ts = int(rep.headers.get("x-rate-limit-reset", 0))
logger.debug(f"Rate limit for {log_id}")
elif rep.status_code == 400:
# twitter can return different types of cursors that not transfers between accounts
# just take the next account, the current cursor can work in it
logger.debug(f"Cursor not valid for {log_id}")
elif rep.status_code in (401, 403):
# account is locked or banned
reset_ts = utc_ts() + 60 * 60 # + 1 hour
logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h")
await self._close_ctx(reset_ts) # get next account on next iteration
continue
# twitter can return different types of cursors that not transfers between accounts
# just take the next account, the current cursor can work in it
if rep.status_code == 400:
logger.debug(f"Cursor not valid for {log_id}")
continue
else:
known_code = False
logger.debug(f"HTTP Error {rep.status_code} {e.request.url}\n{rep.text}")
logger.error(f"[{rep.status_code}] {e.request.url}\n{rep.text}")
raise e
await self._close_ctx(reset_ts)
if not known_code:
raise e
except Exception as e:
logger.warning(f"Unknown error, retrying. Err ({type(e)}): {str(e)}")
retry_count += 1
if retry_count > 3:
await self._close_ctx(utc_ts() + 60 * 15) # 15 minutes
async def get(self, url: str, params: ReqParams = None):
try: