ability to reset locks for accounts; update replies dumps

Этот коммит содержится в:
Vlad Pronsky 2023-07-12 14:34:05 +03:00
родитель 7866e44ade
Коммит 87ef50878d
4 изменённых файлов: 46 добавлений и 37 удалений

Просмотреть файл

@ -171,6 +171,10 @@ class AccountsPool:
rs = await fetchall(self._db_file, qs)
await self.relogin([x["username"] for x in rs])
async def reset_locks(self):
qs = "UPDATE accounts SET locks = json_object()"
await execute(self._db_file, qs)
async def set_active(self, username: str, active: bool):
qs = "UPDATE accounts SET active = :active WHERE username = :username"
await execute(self._db_file, qs, {"username": username, "active": active})
@ -233,10 +237,13 @@ class AccountsPool:
return Account.from_rs(rs) if rs else None
async def get_for_queue_or_wait(self, queue: str) -> Account:
msg_show = False
while True:
account = await self.get_for_queue(queue)
if not account:
logger.debug(f"No accounts available for queue '{queue}' (sleeping for 5 sec)")
if not msg_show:
logger.info(f"No accounts available for queue '{queue}' (sleeping for 5 sec)")
msg_show = True
await asyncio.sleep(5)
continue

Просмотреть файл

@ -92,6 +92,10 @@ async def main(args):
await pool.relogin(args.usernames)
return
if args.command == "reset_locks":
await pool.reset_locks()
return
fn = args.command + "_raw" if args.raw else args.command
fn = getattr(api, fn, None)
if fn is None:
@ -162,6 +166,7 @@ def run():
relogin.add_argument("usernames", nargs="+", default=[], help="Usernames to re-login")
subparsers.add_parser("relogin_failed", help="Retry login for failed accounts")
subparsers.add_parser("reset_locks", help="Reset all locks")
subparsers.add_parser("stats", help="Get current usage stats")
c_lim("search", "Search for tweets", "query", "Search query")

Просмотреть файл

@ -1,5 +1,5 @@
import json
from datetime import datetime
import os
import httpx
@ -35,12 +35,36 @@ class ApiError(Exception):
return f"ApiError ({self.rep.status_code}) {' ~ '.join(self.errors)}"
def dump_rep(rep: httpx.Response):
count = getattr(rep, "__count", -1) + 1
setattr(rep, "__count", count)
acc = getattr(rep, "__username", "<unknown>")
outfile = f"/tmp/twscrape/{count:05d}_{rep.status_code}_{acc}.txt"
os.makedirs(os.path.dirname(outfile), exist_ok=True)
msg = []
msg.append(f"{count:,d} - {req_id(rep)}")
msg.append(f"{rep.status_code} {rep.request.method} {rep.request.url}")
msg.append("\n")
msg.append("\n".join([str(x) for x in list(rep.request.headers.items())]))
msg.append("\n")
try:
msg.append(json.dumps(rep.json(), indent=2))
except json.JSONDecodeError:
msg.append(rep.text)
txt = "\n".join(msg)
with open(outfile, "w") as f:
f.write(txt)
class QueueClient:
def __init__(self, pool: AccountsPool, queue: str, debug=False):
self.pool = pool
self.queue = queue
self.debug = debug
self.history: list[httpx.Response] = []
self.ctx: Ctx | None = None
async def __aenter__(self):
@ -71,34 +95,6 @@ class QueueClient:
self.ctx = Ctx(acc, clt)
return self.ctx
def _push_history(self, rep: httpx.Response):
self.history.append(rep)
if len(self.history) > 3:
self.history.pop(0)
def _dump_history(self, extra: str = ""):
if not self.debug:
return
ts = str(datetime.now()).replace(":", "-").replace(" ", "_")
filename = f"/tmp/api_dump_{ts}.txt"
with open(filename, "w", encoding="utf-8") as fp:
txt = f"{extra}\n"
for rep in self.history:
res = json.dumps(rep.json(), indent=2)
hdr = "\n".join([str(x) for x in list(rep.request.headers.items())])
div = "-" * 20
msg = f"{div}\n{req_id(rep)}"
msg = f"{msg}\n{rep.request.method} {rep.request.url}"
msg = f"{msg}\n{rep.status_code}\n{div}"
msg = f"{msg}\n{hdr}\n{div}\n{res}\n\n"
txt += msg
fp.write(txt)
print(f"API dump ({len(self.history)}) dumped to {filename}")
async def req(self, method: str, url: str, params: ReqParams = None):
retry_count = 0
while True:
@ -107,13 +103,14 @@ class QueueClient:
try:
rep = await ctx.clt.request(method, url, params=params)
setattr(rep, "__username", ctx.acc.username)
self._push_history(rep)
dump_rep(rep)
rep.raise_for_status()
res = rep.json()
if "errors" in res:
raise ApiError(rep, res)
rep.raise_for_status()
ctx.req_count += 1 # count only successful
retry_count = 0
return rep
@ -146,7 +143,8 @@ class QueueClient:
if not known_code:
raise e
except ApiError as e:
reset_ts = utc_ts() + 60 * 60 * 4 # 4 hours
# possible account banned
reset_ts = utc_ts() + 60 * 60 * 12 # 12 hours
await self._close_ctx(reset_ts)
logger.warning(e)
except Exception as e:
@ -159,5 +157,4 @@ class QueueClient:
try:
return await self.req("GET", url, params=params)
except httpx.HTTPStatusError as e:
self._dump_history(f"GET {url} {json.dumps(params)}")
raise e

Просмотреть файл

@ -1,7 +1,7 @@
import json
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, AsyncGenerator, Callable, TypeVar
from typing import Any, AsyncGenerator, Callable, TypedDict, TypeVar
from httpx import HTTPStatusError, Response
@ -130,7 +130,7 @@ def to_old_obj(obj: dict):
}
def to_old_rep(obj: dict):
def to_old_rep(obj: dict) -> dict[str, dict]:
tmp = get_typed_object(obj, defaultdict(list))
tweets = [x for x in tmp.get("Tweet", []) if "legacy" in x]