From 86d10b19dcd7ad75cdc50094c707fe7d79c6ab4a Mon Sep 17 00:00:00 2001 From: Vlad Pronsky Date: Thu, 6 Jul 2023 00:39:29 +0300 Subject: [PATCH] fix: account was NOT locked after failed request --- tests/conftest.py | 13 ++--- tests/test_queue_client.py | 102 +++++++++++++++++++++++++------------ twscrape/accounts_pool.py | 32 +++++++++--- twscrape/queue_client.py | 10 ++-- 4 files changed, 106 insertions(+), 51 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 28007b1..33bfac9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,17 +6,18 @@ from twscrape.queue_client import QueueClient @pytest.fixture -def pool_mock(tmp_path) -> AccountsPool: # type: ignore +def pool_mock(tmp_path) -> AccountsPool: db_path = tmp_path / "test.db" - yield AccountsPool(db_path) + yield AccountsPool(db_path) # type: ignore @pytest.fixture async def client_fixture(pool_mock: AccountsPool): - await pool_mock.add_account("user1", "pass1", "email1", "email_pass1") - await pool_mock.add_account("user2", "pass2", "email2", "email_pass2") - await pool_mock.set_active("user1", True) - await pool_mock.set_active("user2", True) + pool_mock._order_by = "username" + + for x in range(1, 3): + await pool_mock.add_account(f"user{x}", f"pass{x}", f"email{x}", f"email_pass{x}") + await pool_mock.set_active(f"user{x}", True) client = QueueClient(pool_mock, "SearchTimeline") yield pool_mock, client diff --git a/tests/test_queue_client.py b/tests/test_queue_client.py index 4c8d1f9..f919ef9 100644 --- a/tests/test_queue_client.py +++ b/tests/test_queue_client.py @@ -1,79 +1,115 @@ import httpx from pytest_httpx import HTTPXMock +from twscrape.accounts_pool import AccountsPool from twscrape.logger import set_log_level +from twscrape.queue_client import QueueClient + +set_log_level("ERROR") DB_FILE = "/tmp/twscrape_test_queue_client.db" URL = "https://example.com/api" +CF = tuple[AccountsPool, QueueClient] -set_log_level("ERROR") + +async def get_locked(pool: AccountsPool) -> set[str]: + rep = await pool.get_all() + return set([x.username for x in rep if x.locks.get("SearchTimeline", None) is not None]) async def test_lock_account_when_used(httpx_mock: HTTPXMock, client_fixture): pool, client = client_fixture - assert (await pool.stats())["locked_SearchTimeline"] == 0 + locked = await get_locked(pool) + assert len(locked) == 0 + + # should lock account on getting it await client.__aenter__() - assert (await pool.stats())["locked_SearchTimeline"] == 1 + locked = await get_locked(pool) + assert len(locked) == 1 + assert "user1" in locked + # keep locked on request httpx_mock.add_response(url=URL, json={"foo": "bar"}, status_code=200) assert (await client.get(URL)).json() == {"foo": "bar"} + locked = await get_locked(pool) + assert len(locked) == 1 + assert "user1" in locked + + # unlock on exit await client.__aexit__(None, None, None) - assert (await pool.stats())["locked_SearchTimeline"] == 0 + locked = await get_locked(pool) + assert len(locked) == 0 -async def test_do_not_switch_account_on_200(httpx_mock: HTTPXMock, client_fixture): - pool, client = client_fixture - assert (await pool.stats())["locked_SearchTimeline"] == 0 - await client.__aenter__() - - httpx_mock.add_response(url=URL, json={"foo": "1"}, status_code=200) - httpx_mock.add_response(url=URL, json={"foo": "2"}, status_code=200) - - rep = await client.get(URL) - assert rep.json() == {"foo": "1"} - - rep = await client.get(URL) - assert rep.json() == {"foo": "2"} - - assert (await pool.stats())["locked_SearchTimeline"] == 1 - await client.__aexit__(None, None, None) - - -async def test_switch_acc_on_http_error(httpx_mock: HTTPXMock, client_fixture): +async def test_do_not_switch_account_on_200(httpx_mock: HTTPXMock, client_fixture: CF): pool, client = client_fixture - assert (await pool.stats())["locked_SearchTimeline"] == 0 + # get account and lock it await client.__aenter__() + locked1 = await get_locked(pool) + assert len(locked1) == 1 + # make several requests with status=200 + for x in range(1): + httpx_mock.add_response(url=URL, json={"foo": x}, status_code=200) + rep = await client.get(URL) + assert rep.json() == {"foo": x} + + # account should not be switched + locked2 = await get_locked(pool) + assert locked1 == locked2 + + # unlock on exit + await client.__aexit__(None, None, None) + locked3 = await get_locked(pool) + assert len(locked3) == 0 + + +async def test_switch_acc_on_http_error(httpx_mock: HTTPXMock, client_fixture: CF): + pool, client = client_fixture + + # locked account on enter + await client.__aenter__() + locked1 = await get_locked(pool) + assert len(locked1) == 1 + + # fail one request, account should be switched httpx_mock.add_response(url=URL, json={"foo": "1"}, status_code=403) httpx_mock.add_response(url=URL, json={"foo": "2"}, status_code=200) rep = await client.get(URL) assert rep.json() == {"foo": "2"} - assert (await pool.stats())["locked_SearchTimeline"] == 1 # user1 unlocked, user2 locked + locked2 = await get_locked(pool) + assert len(locked2) == 2 + + # unlock on exit (failed account still should locked) await client.__aexit__(None, None, None) + locked3 = await get_locked(pool) + assert len(locked3) == 1 + assert locked1 == locked3 # failed account locked async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, client_fixture): pool, client = client_fixture - await client.__aenter__() + # locked account on enter + await client.__aenter__() + locked1 = await get_locked(pool) + assert len(locked1) == 1 + + # timeout on first request, account should not be switched httpx_mock.add_exception(httpx.ReadTimeout("Unable to read within timeout")) httpx_mock.add_response(url=URL, json={"foo": "2"}, status_code=200) rep = await client.get(URL) assert rep.json() == {"foo": "2"} - assert (await pool.stats())["locked_SearchTimeline"] == 1 + locked2 = await get_locked(pool) + assert locked2 == locked1 + # check username added to request obj (for logging) username = getattr(rep, "__username", None) assert username is not None - - acc1 = await pool.get(username) - assert len(acc1.locks) > 0 - - acc2 = await pool.get("user2" if username == "user1" else "user1") - assert len(acc2.locks) == 0 diff --git a/twscrape/accounts_pool.py b/twscrape/accounts_pool.py index 7906d90..1b4b42a 100644 --- a/twscrape/accounts_pool.py +++ b/twscrape/accounts_pool.py @@ -3,6 +3,7 @@ import asyncio import sqlite3 import uuid from datetime import datetime, timezone +from typing import TypedDict from fake_useragent import UserAgent @@ -13,12 +14,23 @@ from .login import login from .utils import utc_ts +class AccountInfo(TypedDict): + username: str + logged_in: bool + active: bool + last_used: datetime | None + total_req: int + error_msg: str | None + + def guess_delim(line: str): lp, rp = tuple([x.strip() for x in line.split("username")]) return rp[0] if not lp else lp[-1] class AccountsPool: + _order_by: str = "RANDOM()" + def __init__(self, db_file="accounts.db"): self._db_file = db_file @@ -123,7 +135,9 @@ class AccountsPool: async def lock_until(self, username: str, queue: str, unlock_at: int): qs = f""" - UPDATE accounts SET locks = json_set(locks, '$.{queue}', datetime({unlock_at}, 'unixepoch')) + UPDATE accounts SET + locks = json_set(locks, '$.{queue}', datetime({unlock_at}, 'unixepoch')), + last_used = datetime({utc_ts()}, 'unixepoch') WHERE username = :username """ await execute(self._db_file, qs, {"username": username}) @@ -146,13 +160,15 @@ class AccountsPool: OR json_extract(locks, '$.{queue}') IS NULL OR json_extract(locks, '$.{queue}') < datetime('now') ) - ORDER BY RANDOM() + ORDER BY {self._order_by} LIMIT 1 """ if int(sqlite3.sqlite_version_info[1]) >= 35: qs = f""" - UPDATE accounts SET locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')) + UPDATE accounts SET + locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')), + last_used = datetime({utc_ts()}, 'unixepoch') WHERE username = ({q1}) RETURNING * """ @@ -160,8 +176,10 @@ class AccountsPool: else: tx = uuid.uuid4().hex qs = f""" - UPDATE accounts - SET locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')), _tx = '{tx}' + UPDATE accounts SET + locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')), + last_used = datetime({utc_ts()}, 'unixepoch'), + _tx = '{tx}' WHERE username = ({q1}) """ await execute(self._db_file, qs) @@ -210,9 +228,9 @@ class AccountsPool: async def accounts_info(self): accounts = await self.get_all() - items = [] + items: list[AccountInfo] = [] for x in accounts: - item = { + item: AccountInfo = { "username": x.username, "logged_in": (x.headers or {}).get("authorization", "") != "", "active": x.active, diff --git a/twscrape/queue_client.py b/twscrape/queue_client.py index 08c5c9a..3173730 100644 --- a/twscrape/queue_client.py +++ b/twscrape/queue_client.py @@ -46,8 +46,8 @@ class QueueClient: await self.ctx.clt.aclose() await self.pool.unlock(self.ctx.acc.username, self.queue, self.ctx.req_count) - async def _get_ctx(self, fresh=False) -> Ctx: - if self.ctx and not fresh: + async def _get_ctx(self) -> Ctx: + if self.ctx: return self.ctx if self.ctx is not None: @@ -87,10 +87,8 @@ class QueueClient: print(f"API dump ({len(self.history)}) dumped to {filename}") async def req(self, method: str, url: str, params: ReqParams = None): - fresh = False # do not get new account on first try while True: - ctx = await self._get_ctx(fresh=fresh) - fresh = True + ctx = await self._get_ctx() try: rep = await ctx.clt.request(method, url, params=params) @@ -108,6 +106,7 @@ class QueueClient: logger.debug(f"Rate limit for {log_id}") reset_ts = int(rep.headers.get("x-rate-limit-reset", 0)) await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts) + self.ctx = None # get next account continue # possible account banned @@ -115,6 +114,7 @@ class QueueClient: reset_ts = utc_ts() + 60 * 60 # + 1 hour logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h") await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts) + self.ctx = None # get next account continue # twitter can return different types of cursors that not transfers between accounts