зеркало из
https://github.com/viginum-datalab/twscrape.git
synced 2025-10-29 21:16:25 +02:00
update api errors handling
Этот коммит содержится в:
родитель
8986758666
Коммит
a3bb5d2dc8
26
_get_gql_ops.py
Обычный файл
26
_get_gql_ops.py
Обычный файл
@ -0,0 +1,26 @@
|
|||||||
|
import httpx
|
||||||
|
|
||||||
|
# update this url on next run
|
||||||
|
url = "https://abs.twimg.com/responsive-web/client-web/api.f4ff3bfa.js"
|
||||||
|
script = httpx.get(url).text
|
||||||
|
|
||||||
|
ops = """
|
||||||
|
SearchTimeline
|
||||||
|
UserByRestId
|
||||||
|
UserByScreenName
|
||||||
|
TweetDetail
|
||||||
|
Followers
|
||||||
|
Following
|
||||||
|
Retweeters
|
||||||
|
Favoriters
|
||||||
|
UserTweets
|
||||||
|
UserTweetsAndReplies
|
||||||
|
ListLatestTweetsTimeline
|
||||||
|
"""
|
||||||
|
|
||||||
|
ops = [op.strip() for op in ops.split("\n") if op.strip()]
|
||||||
|
|
||||||
|
for x in ops:
|
||||||
|
idx = script.split(f'operationName:"{x}"')[0].split("queryId:")[-1]
|
||||||
|
idx = idx.strip('",')
|
||||||
|
print(f'OP_{x} = "{idx}/{x}"')
|
||||||
@ -29,7 +29,8 @@ def guess_delim(line: str):
|
|||||||
|
|
||||||
|
|
||||||
class AccountsPool:
|
class AccountsPool:
|
||||||
_order_by: str = "RANDOM()"
|
# _order_by: str = "RANDOM()"
|
||||||
|
_order_by: str = "username"
|
||||||
|
|
||||||
def __init__(self, db_file="accounts.db"):
|
def __init__(self, db_file="accounts.db"):
|
||||||
self._db_file = db_file
|
self._db_file = db_file
|
||||||
@ -104,6 +105,10 @@ class AccountsPool:
|
|||||||
qs = f"""DELETE FROM accounts WHERE username IN ({','.join([f'"{x}"' for x in usernames])})"""
|
qs = f"""DELETE FROM accounts WHERE username IN ({','.join([f'"{x}"' for x in usernames])})"""
|
||||||
await execute(self._db_file, qs)
|
await execute(self._db_file, qs)
|
||||||
|
|
||||||
|
async def delete_inactive(self):
|
||||||
|
qs = "DELETE FROM accounts WHERE active = false"
|
||||||
|
await execute(self._db_file, qs)
|
||||||
|
|
||||||
async def get(self, username: str):
|
async def get(self, username: str):
|
||||||
qs = "SELECT * FROM accounts WHERE username = :username"
|
qs = "SELECT * FROM accounts WHERE username = :username"
|
||||||
rs = await fetchone(self._db_file, qs, {"username": username})
|
rs = await fetchone(self._db_file, qs, {"username": username})
|
||||||
@ -278,6 +283,13 @@ class AccountsPool:
|
|||||||
|
|
||||||
return "none"
|
return "none"
|
||||||
|
|
||||||
|
async def mark_banned(self, username: str, error_msg: str):
|
||||||
|
qs = """
|
||||||
|
UPDATE accounts SET active = false, error_msg = :error_msg
|
||||||
|
WHERE username = :username
|
||||||
|
"""
|
||||||
|
await execute(self._db_file, qs, {"username": username, "error_msg": error_msg})
|
||||||
|
|
||||||
async def stats(self):
|
async def stats(self):
|
||||||
def locks_count(queue: str):
|
def locks_count(queue: str):
|
||||||
return f"""
|
return f"""
|
||||||
@ -312,17 +324,17 @@ class AccountsPool:
|
|||||||
"active": x.active,
|
"active": x.active,
|
||||||
"last_used": x.last_used,
|
"last_used": x.last_used,
|
||||||
"total_req": sum(x.stats.values()),
|
"total_req": sum(x.stats.values()),
|
||||||
"error_msg": x.error_msg,
|
"error_msg": str(x.error_msg)[0:60],
|
||||||
}
|
}
|
||||||
items.append(item)
|
items.append(item)
|
||||||
|
|
||||||
old_time = datetime(1970, 1, 1).replace(tzinfo=timezone.utc)
|
old_time = datetime(1970, 1, 1).replace(tzinfo=timezone.utc)
|
||||||
items = sorted(items, key=lambda x: x["username"].lower())
|
items = sorted(items, key=lambda x: x["username"].lower())
|
||||||
items = sorted(items, key=lambda x: x["active"], reverse=True)
|
|
||||||
items = sorted(
|
items = sorted(
|
||||||
items,
|
items,
|
||||||
key=lambda x: x["last_used"] or old_time if x["total_req"] > 0 else old_time,
|
key=lambda x: x["last_used"] or old_time if x["total_req"] > 0 else old_time,
|
||||||
reverse=True,
|
reverse=True,
|
||||||
)
|
)
|
||||||
|
items = sorted(items, key=lambda x: x["active"], reverse=True)
|
||||||
# items = sorted(items, key=lambda x: x["total_req"], reverse=True)
|
# items = sorted(items, key=lambda x: x["total_req"], reverse=True)
|
||||||
return items
|
return items
|
||||||
|
|||||||
@ -95,6 +95,10 @@ async def main(args):
|
|||||||
await pool.reset_locks()
|
await pool.reset_locks()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if args.command == "delete_inactive":
|
||||||
|
await pool.delete_inactive()
|
||||||
|
return
|
||||||
|
|
||||||
fn = args.command + "_raw" if args.raw else args.command
|
fn = args.command + "_raw" if args.raw else args.command
|
||||||
fn = getattr(api, fn, None)
|
fn = getattr(api, fn, None)
|
||||||
if fn is None:
|
if fn is None:
|
||||||
@ -151,6 +155,7 @@ def run():
|
|||||||
|
|
||||||
subparsers.add_parser("version", help="Show version")
|
subparsers.add_parser("version", help="Show version")
|
||||||
subparsers.add_parser("accounts", help="List all accounts")
|
subparsers.add_parser("accounts", help="List all accounts")
|
||||||
|
subparsers.add_parser("stats", help="Get current usage stats")
|
||||||
|
|
||||||
add_accounts = subparsers.add_parser("add_accounts", help="Add accounts")
|
add_accounts = subparsers.add_parser("add_accounts", help="Add accounts")
|
||||||
add_accounts.add_argument("file_path", help="File with accounts")
|
add_accounts.add_argument("file_path", help="File with accounts")
|
||||||
@ -166,7 +171,7 @@ def run():
|
|||||||
|
|
||||||
subparsers.add_parser("relogin_failed", help="Retry login for failed accounts")
|
subparsers.add_parser("relogin_failed", help="Retry login for failed accounts")
|
||||||
subparsers.add_parser("reset_locks", help="Reset all locks")
|
subparsers.add_parser("reset_locks", help="Reset all locks")
|
||||||
subparsers.add_parser("stats", help="Get current usage stats")
|
subparsers.add_parser("delete_inactive", help="Delete inactive accounts")
|
||||||
|
|
||||||
c_lim("search", "Search for tweets", "query", "Search query")
|
c_lim("search", "Search for tweets", "query", "Search query")
|
||||||
c_one("tweet_details", "Get tweet details", "tweet_id", "Tweet ID", int)
|
c_one("tweet_details", "Get tweet details", "tweet_id", "Tweet ID", int)
|
||||||
|
|||||||
@ -10,6 +10,7 @@ from .logger import logger
|
|||||||
from .utils import utc_ts
|
from .utils import utc_ts
|
||||||
|
|
||||||
ReqParams = dict[str, str | int] | None
|
ReqParams = dict[str, str | int] | None
|
||||||
|
TMP_TS = datetime.utcnow().isoformat().split(".")[0].replace("T", "_").replace(":", "-")[0:16]
|
||||||
|
|
||||||
|
|
||||||
class Ctx:
|
class Ctx:
|
||||||
@ -31,6 +32,14 @@ class ApiError(Exception):
|
|||||||
return f"ApiError on {req_id(self.rep)} {msg}"
|
return f"ApiError on {req_id(self.rep)} {msg}"
|
||||||
|
|
||||||
|
|
||||||
|
class RateLimitError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class BannedError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def req_id(rep: httpx.Response):
|
def req_id(rep: httpx.Response):
|
||||||
lr = str(rep.headers.get("x-rate-limit-remaining", -1))
|
lr = str(rep.headers.get("x-rate-limit-remaining", -1))
|
||||||
ll = str(rep.headers.get("x-rate-limit-limit", -1))
|
ll = str(rep.headers.get("x-rate-limit-limit", -1))
|
||||||
@ -46,9 +55,8 @@ def dump_rep(rep: httpx.Response):
|
|||||||
setattr(dump_rep, "__count", count)
|
setattr(dump_rep, "__count", count)
|
||||||
|
|
||||||
acc = getattr(rep, "__username", "<unknown>")
|
acc = getattr(rep, "__username", "<unknown>")
|
||||||
fts = datetime.utcnow().isoformat().split(".")[0].replace("T", "_").replace(":", "-")[0:16]
|
|
||||||
outfile = f"{count:05d}_{rep.status_code}_{acc}.txt"
|
outfile = f"{count:05d}_{rep.status_code}_{acc}.txt"
|
||||||
outfile = f"/tmp/twscrape-{fts}/{outfile}"
|
outfile = f"/tmp/twscrape-{TMP_TS}/{outfile}"
|
||||||
os.makedirs(os.path.dirname(outfile), exist_ok=True)
|
os.makedirs(os.path.dirname(outfile), exist_ok=True)
|
||||||
|
|
||||||
msg = []
|
msg = []
|
||||||
@ -83,17 +91,23 @@ class QueueClient:
|
|||||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||||
await self._close_ctx()
|
await self._close_ctx()
|
||||||
|
|
||||||
async def _close_ctx(self, reset_at=-1):
|
async def _close_ctx(self, reset_at=-1, banned=False, msg=""):
|
||||||
if self.ctx is None:
|
if self.ctx is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
ctx, self.ctx, self.req_count = self.ctx, None, 0
|
ctx, self.ctx, self.req_count = self.ctx, None, 0
|
||||||
|
username = ctx.acc.username
|
||||||
await ctx.clt.aclose()
|
await ctx.clt.aclose()
|
||||||
|
|
||||||
if reset_at <= 0:
|
if banned:
|
||||||
await self.pool.unlock(ctx.acc.username, self.queue, ctx.req_count)
|
await self.pool.mark_banned(username, msg)
|
||||||
else:
|
return
|
||||||
|
|
||||||
|
if reset_at > 0:
|
||||||
await self.pool.lock_until(ctx.acc.username, self.queue, reset_at, ctx.req_count)
|
await self.pool.lock_until(ctx.acc.username, self.queue, reset_at, ctx.req_count)
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.pool.unlock(ctx.acc.username, self.queue, ctx.req_count)
|
||||||
|
|
||||||
async def _get_ctx(self) -> Ctx:
|
async def _get_ctx(self) -> Ctx:
|
||||||
if self.ctx:
|
if self.ctx:
|
||||||
@ -115,28 +129,45 @@ class QueueClient:
|
|||||||
|
|
||||||
msg = "OK"
|
msg = "OK"
|
||||||
if "errors" in res:
|
if "errors" in res:
|
||||||
msg = "; ".join([f'({x.get("code", -1)}) {x["message"]}' for x in res["errors"]])
|
msg = set([f'({x.get("code", -1)}) {x["message"]}' for x in res["errors"]])
|
||||||
|
msg = "; ".join(list(msg))
|
||||||
|
|
||||||
if self.debug:
|
if self.debug:
|
||||||
fn = logger.debug if rep.status_code == 200 else logger.warning
|
fn = logger.debug if rep.status_code == 200 else logger.warning
|
||||||
fn(f"{rep.status_code:3d} - {req_id(rep)} - {msg}")
|
fn(f"{rep.status_code:3d} - {req_id(rep)} - {msg}")
|
||||||
|
|
||||||
|
# need to add some features in api.py
|
||||||
if msg.startswith("The following features cannot be null"):
|
if msg.startswith("The following features cannot be null"):
|
||||||
logger.error(f"Invalid request: {msg}")
|
logger.error(f"Invalid request: {msg}")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
|
# general api rate limit
|
||||||
|
if int(rep.headers.get("x-rate-limit-remaining", -1)) == 0:
|
||||||
|
await self._close_ctx(int(rep.headers.get("x-rate-limit-reset", -1)))
|
||||||
|
raise RateLimitError(msg)
|
||||||
|
|
||||||
|
# possible new limits for tweets view per account
|
||||||
|
if msg.startswith("(88) Rate limit exceeded") or rep.status_code == 429:
|
||||||
|
await self._close_ctx(utc_ts() + 60 * 60 * 4) # lock for 4 hours
|
||||||
|
raise RateLimitError(msg)
|
||||||
|
|
||||||
|
if msg.startswith("(326) Authorization: Denied by access control"):
|
||||||
|
await self._close_ctx(-1, banned=True, msg=msg)
|
||||||
|
raise BannedError(msg)
|
||||||
|
|
||||||
|
# content not found
|
||||||
if rep.status_code == 200 and "_Missing: No status found with that ID." in msg:
|
if rep.status_code == 200 and "_Missing: No status found with that ID." in msg:
|
||||||
return # ignore this error
|
return # ignore this error
|
||||||
|
|
||||||
|
# todo: (32) Could not authenticate you
|
||||||
|
|
||||||
if msg != "OK":
|
if msg != "OK":
|
||||||
raise ApiError(rep, res)
|
raise ApiError(rep, res)
|
||||||
|
|
||||||
rep.raise_for_status()
|
rep.raise_for_status()
|
||||||
|
|
||||||
ll = int(rep.headers.get("x-rate-limit-remaining", -1))
|
async def get(self, url: str, params: ReqParams = None):
|
||||||
lr = int(rep.headers.get("x-rate-limit-reset", 0))
|
return await self.req("GET", url, params=params)
|
||||||
if ll == 0:
|
|
||||||
await self._close_ctx(lr)
|
|
||||||
|
|
||||||
async def req(self, method: str, url: str, params: ReqParams = None):
|
async def req(self, method: str, url: str, params: ReqParams = None):
|
||||||
retry_count = 0
|
retry_count = 0
|
||||||
@ -151,46 +182,11 @@ class QueueClient:
|
|||||||
ctx.req_count += 1 # count only successful
|
ctx.req_count += 1 # count only successful
|
||||||
retry_count = 0
|
retry_count = 0
|
||||||
return rep
|
return rep
|
||||||
except httpx.HTTPStatusError as e:
|
except (RateLimitError, BannedError):
|
||||||
rep = e.response
|
# already handled
|
||||||
log_id = f"{req_id(rep)} on queue={self.queue}"
|
continue
|
||||||
|
|
||||||
reset_ts, known_code = -1, True
|
|
||||||
|
|
||||||
if rep.status_code == 429:
|
|
||||||
# rate limit
|
|
||||||
reset_ts = int(rep.headers.get("x-rate-limit-reset", 0))
|
|
||||||
logger.debug(f"Rate limit for {log_id}")
|
|
||||||
|
|
||||||
elif rep.status_code == 400:
|
|
||||||
# api can return different types of cursors that not transfers between accounts
|
|
||||||
# just take the next account, the current cursor can work in it
|
|
||||||
logger.debug(f"Cursor not valid for {log_id}")
|
|
||||||
|
|
||||||
elif rep.status_code in (401, 403):
|
|
||||||
# account is locked or banned
|
|
||||||
reset_ts = utc_ts() + 60 * 60 # + 1 hour
|
|
||||||
logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h")
|
|
||||||
|
|
||||||
else:
|
|
||||||
known_code = False
|
|
||||||
logger.warning(f"HTTP Error {rep.status_code} {e.request.url}\n{rep.text}")
|
|
||||||
|
|
||||||
await self._close_ctx(reset_ts)
|
|
||||||
if not known_code:
|
|
||||||
raise e
|
|
||||||
except ApiError as e:
|
|
||||||
# possible account banned
|
|
||||||
reset_ts = utc_ts() + 60 * 60 * 12 # 12 hours
|
|
||||||
await self._close_ctx(reset_ts)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Unknown error, retrying. Err ({type(e)}): {str(e)}")
|
|
||||||
retry_count += 1
|
retry_count += 1
|
||||||
if retry_count > 3:
|
if retry_count >= 3:
|
||||||
|
logger.warning(f"Unknown error {type(e)}: {e}")
|
||||||
await self._close_ctx(utc_ts() + 60 * 15) # 15 minutes
|
await self._close_ctx(utc_ts() + 60 * 15) # 15 minutes
|
||||||
|
|
||||||
async def get(self, url: str, params: ReqParams = None):
|
|
||||||
try:
|
|
||||||
return await self.req("GET", url, params=params)
|
|
||||||
except httpx.HTTPStatusError as e:
|
|
||||||
raise e
|
|
||||||
|
|||||||
Загрузка…
x
Ссылка в новой задаче
Block a user