fix: account was NOT locked after failed request

Этот коммит содержится в:
Vlad Pronsky 2023-07-06 00:39:29 +03:00
родитель 96cacbac2b
Коммит 86d10b19dc
4 изменённых файлов: 106 добавлений и 51 удалений

Просмотреть файл

@ -6,17 +6,18 @@ from twscrape.queue_client import QueueClient
@pytest.fixture @pytest.fixture
def pool_mock(tmp_path) -> AccountsPool: # type: ignore def pool_mock(tmp_path) -> AccountsPool:
db_path = tmp_path / "test.db" db_path = tmp_path / "test.db"
yield AccountsPool(db_path) yield AccountsPool(db_path) # type: ignore
@pytest.fixture @pytest.fixture
async def client_fixture(pool_mock: AccountsPool): async def client_fixture(pool_mock: AccountsPool):
await pool_mock.add_account("user1", "pass1", "email1", "email_pass1") pool_mock._order_by = "username"
await pool_mock.add_account("user2", "pass2", "email2", "email_pass2")
await pool_mock.set_active("user1", True) for x in range(1, 3):
await pool_mock.set_active("user2", True) await pool_mock.add_account(f"user{x}", f"pass{x}", f"email{x}", f"email_pass{x}")
await pool_mock.set_active(f"user{x}", True)
client = QueueClient(pool_mock, "SearchTimeline") client = QueueClient(pool_mock, "SearchTimeline")
yield pool_mock, client yield pool_mock, client

Просмотреть файл

@ -1,79 +1,115 @@
import httpx import httpx
from pytest_httpx import HTTPXMock from pytest_httpx import HTTPXMock
from twscrape.accounts_pool import AccountsPool
from twscrape.logger import set_log_level from twscrape.logger import set_log_level
from twscrape.queue_client import QueueClient
set_log_level("ERROR")
DB_FILE = "/tmp/twscrape_test_queue_client.db" DB_FILE = "/tmp/twscrape_test_queue_client.db"
URL = "https://example.com/api" URL = "https://example.com/api"
CF = tuple[AccountsPool, QueueClient]
set_log_level("ERROR")
async def get_locked(pool: AccountsPool) -> set[str]:
rep = await pool.get_all()
return set([x.username for x in rep if x.locks.get("SearchTimeline", None) is not None])
async def test_lock_account_when_used(httpx_mock: HTTPXMock, client_fixture): async def test_lock_account_when_used(httpx_mock: HTTPXMock, client_fixture):
pool, client = client_fixture pool, client = client_fixture
assert (await pool.stats())["locked_SearchTimeline"] == 0
locked = await get_locked(pool)
assert len(locked) == 0
# should lock account on getting it
await client.__aenter__() await client.__aenter__()
assert (await pool.stats())["locked_SearchTimeline"] == 1 locked = await get_locked(pool)
assert len(locked) == 1
assert "user1" in locked
# keep locked on request
httpx_mock.add_response(url=URL, json={"foo": "bar"}, status_code=200) httpx_mock.add_response(url=URL, json={"foo": "bar"}, status_code=200)
assert (await client.get(URL)).json() == {"foo": "bar"} assert (await client.get(URL)).json() == {"foo": "bar"}
locked = await get_locked(pool)
assert len(locked) == 1
assert "user1" in locked
# unlock on exit
await client.__aexit__(None, None, None) await client.__aexit__(None, None, None)
assert (await pool.stats())["locked_SearchTimeline"] == 0 locked = await get_locked(pool)
assert len(locked) == 0
async def test_do_not_switch_account_on_200(httpx_mock: HTTPXMock, client_fixture): async def test_do_not_switch_account_on_200(httpx_mock: HTTPXMock, client_fixture: CF):
pool, client = client_fixture
assert (await pool.stats())["locked_SearchTimeline"] == 0
await client.__aenter__()
httpx_mock.add_response(url=URL, json={"foo": "1"}, status_code=200)
httpx_mock.add_response(url=URL, json={"foo": "2"}, status_code=200)
rep = await client.get(URL)
assert rep.json() == {"foo": "1"}
rep = await client.get(URL)
assert rep.json() == {"foo": "2"}
assert (await pool.stats())["locked_SearchTimeline"] == 1
await client.__aexit__(None, None, None)
async def test_switch_acc_on_http_error(httpx_mock: HTTPXMock, client_fixture):
pool, client = client_fixture pool, client = client_fixture
assert (await pool.stats())["locked_SearchTimeline"] == 0 # get account and lock it
await client.__aenter__() await client.__aenter__()
locked1 = await get_locked(pool)
assert len(locked1) == 1
# make several requests with status=200
for x in range(1):
httpx_mock.add_response(url=URL, json={"foo": x}, status_code=200)
rep = await client.get(URL)
assert rep.json() == {"foo": x}
# account should not be switched
locked2 = await get_locked(pool)
assert locked1 == locked2
# unlock on exit
await client.__aexit__(None, None, None)
locked3 = await get_locked(pool)
assert len(locked3) == 0
async def test_switch_acc_on_http_error(httpx_mock: HTTPXMock, client_fixture: CF):
pool, client = client_fixture
# locked account on enter
await client.__aenter__()
locked1 = await get_locked(pool)
assert len(locked1) == 1
# fail one request, account should be switched
httpx_mock.add_response(url=URL, json={"foo": "1"}, status_code=403) httpx_mock.add_response(url=URL, json={"foo": "1"}, status_code=403)
httpx_mock.add_response(url=URL, json={"foo": "2"}, status_code=200) httpx_mock.add_response(url=URL, json={"foo": "2"}, status_code=200)
rep = await client.get(URL) rep = await client.get(URL)
assert rep.json() == {"foo": "2"} assert rep.json() == {"foo": "2"}
assert (await pool.stats())["locked_SearchTimeline"] == 1 # user1 unlocked, user2 locked locked2 = await get_locked(pool)
assert len(locked2) == 2
# unlock on exit (failed account still should locked)
await client.__aexit__(None, None, None) await client.__aexit__(None, None, None)
locked3 = await get_locked(pool)
assert len(locked3) == 1
assert locked1 == locked3 # failed account locked
async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, client_fixture): async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, client_fixture):
pool, client = client_fixture pool, client = client_fixture
await client.__aenter__()
# locked account on enter
await client.__aenter__()
locked1 = await get_locked(pool)
assert len(locked1) == 1
# timeout on first request, account should not be switched
httpx_mock.add_exception(httpx.ReadTimeout("Unable to read within timeout")) httpx_mock.add_exception(httpx.ReadTimeout("Unable to read within timeout"))
httpx_mock.add_response(url=URL, json={"foo": "2"}, status_code=200) httpx_mock.add_response(url=URL, json={"foo": "2"}, status_code=200)
rep = await client.get(URL) rep = await client.get(URL)
assert rep.json() == {"foo": "2"} assert rep.json() == {"foo": "2"}
assert (await pool.stats())["locked_SearchTimeline"] == 1 locked2 = await get_locked(pool)
assert locked2 == locked1
# check username added to request obj (for logging)
username = getattr(rep, "__username", None) username = getattr(rep, "__username", None)
assert username is not None assert username is not None
acc1 = await pool.get(username)
assert len(acc1.locks) > 0
acc2 = await pool.get("user2" if username == "user1" else "user1")
assert len(acc2.locks) == 0

Просмотреть файл

@ -3,6 +3,7 @@ import asyncio
import sqlite3 import sqlite3
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import TypedDict
from fake_useragent import UserAgent from fake_useragent import UserAgent
@ -13,12 +14,23 @@ from .login import login
from .utils import utc_ts from .utils import utc_ts
class AccountInfo(TypedDict):
username: str
logged_in: bool
active: bool
last_used: datetime | None
total_req: int
error_msg: str | None
def guess_delim(line: str): def guess_delim(line: str):
lp, rp = tuple([x.strip() for x in line.split("username")]) lp, rp = tuple([x.strip() for x in line.split("username")])
return rp[0] if not lp else lp[-1] return rp[0] if not lp else lp[-1]
class AccountsPool: class AccountsPool:
_order_by: str = "RANDOM()"
def __init__(self, db_file="accounts.db"): def __init__(self, db_file="accounts.db"):
self._db_file = db_file self._db_file = db_file
@ -123,7 +135,9 @@ class AccountsPool:
async def lock_until(self, username: str, queue: str, unlock_at: int): async def lock_until(self, username: str, queue: str, unlock_at: int):
qs = f""" qs = f"""
UPDATE accounts SET locks = json_set(locks, '$.{queue}', datetime({unlock_at}, 'unixepoch')) UPDATE accounts SET
locks = json_set(locks, '$.{queue}', datetime({unlock_at}, 'unixepoch')),
last_used = datetime({utc_ts()}, 'unixepoch')
WHERE username = :username WHERE username = :username
""" """
await execute(self._db_file, qs, {"username": username}) await execute(self._db_file, qs, {"username": username})
@ -146,13 +160,15 @@ class AccountsPool:
OR json_extract(locks, '$.{queue}') IS NULL OR json_extract(locks, '$.{queue}') IS NULL
OR json_extract(locks, '$.{queue}') < datetime('now') OR json_extract(locks, '$.{queue}') < datetime('now')
) )
ORDER BY RANDOM() ORDER BY {self._order_by}
LIMIT 1 LIMIT 1
""" """
if int(sqlite3.sqlite_version_info[1]) >= 35: if int(sqlite3.sqlite_version_info[1]) >= 35:
qs = f""" qs = f"""
UPDATE accounts SET locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')) UPDATE accounts SET
locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')),
last_used = datetime({utc_ts()}, 'unixepoch')
WHERE username = ({q1}) WHERE username = ({q1})
RETURNING * RETURNING *
""" """
@ -160,8 +176,10 @@ class AccountsPool:
else: else:
tx = uuid.uuid4().hex tx = uuid.uuid4().hex
qs = f""" qs = f"""
UPDATE accounts UPDATE accounts SET
SET locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')), _tx = '{tx}' locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')),
last_used = datetime({utc_ts()}, 'unixepoch'),
_tx = '{tx}'
WHERE username = ({q1}) WHERE username = ({q1})
""" """
await execute(self._db_file, qs) await execute(self._db_file, qs)
@ -210,9 +228,9 @@ class AccountsPool:
async def accounts_info(self): async def accounts_info(self):
accounts = await self.get_all() accounts = await self.get_all()
items = [] items: list[AccountInfo] = []
for x in accounts: for x in accounts:
item = { item: AccountInfo = {
"username": x.username, "username": x.username,
"logged_in": (x.headers or {}).get("authorization", "") != "", "logged_in": (x.headers or {}).get("authorization", "") != "",
"active": x.active, "active": x.active,

Просмотреть файл

@ -46,8 +46,8 @@ class QueueClient:
await self.ctx.clt.aclose() await self.ctx.clt.aclose()
await self.pool.unlock(self.ctx.acc.username, self.queue, self.ctx.req_count) await self.pool.unlock(self.ctx.acc.username, self.queue, self.ctx.req_count)
async def _get_ctx(self, fresh=False) -> Ctx: async def _get_ctx(self) -> Ctx:
if self.ctx and not fresh: if self.ctx:
return self.ctx return self.ctx
if self.ctx is not None: if self.ctx is not None:
@ -87,10 +87,8 @@ class QueueClient:
print(f"API dump ({len(self.history)}) dumped to {filename}") print(f"API dump ({len(self.history)}) dumped to {filename}")
async def req(self, method: str, url: str, params: ReqParams = None): async def req(self, method: str, url: str, params: ReqParams = None):
fresh = False # do not get new account on first try
while True: while True:
ctx = await self._get_ctx(fresh=fresh) ctx = await self._get_ctx()
fresh = True
try: try:
rep = await ctx.clt.request(method, url, params=params) rep = await ctx.clt.request(method, url, params=params)
@ -108,6 +106,7 @@ class QueueClient:
logger.debug(f"Rate limit for {log_id}") logger.debug(f"Rate limit for {log_id}")
reset_ts = int(rep.headers.get("x-rate-limit-reset", 0)) reset_ts = int(rep.headers.get("x-rate-limit-reset", 0))
await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts) await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts)
self.ctx = None # get next account
continue continue
# possible account banned # possible account banned
@ -115,6 +114,7 @@ class QueueClient:
reset_ts = utc_ts() + 60 * 60 # + 1 hour reset_ts = utc_ts() + 60 * 60 # + 1 hour
logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h") logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h")
await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts) await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts)
self.ctx = None # get next account
continue continue
# twitter can return different types of cursors that not transfers between accounts # twitter can return different types of cursors that not transfers between accounts