From 070aa71583428b501e8617bb5e71dfcb359b5610 Mon Sep 17 00:00:00 2001 From: Vlad Pronsky Date: Thu, 6 Jul 2023 13:20:27 +0300 Subject: [PATCH] improve queue client logic; update readme --- readme.md | 15 +++++++++++++ tests/test_queue_client.py | 44 +++++++++++++++++++++++++++++++++++++- twscrape/accounts_pool.py | 3 ++- twscrape/api.py | 4 ++-- twscrape/queue_client.py | 25 +++++++++++----------- 5 files changed, 75 insertions(+), 16 deletions(-) diff --git a/readme.md b/readme.md index 6b1ed97..4f70124 100644 --- a/readme.md +++ b/readme.md @@ -101,6 +101,19 @@ if __name__ == "__main__": asyncio.run(main()) ``` +### Stopping iteration with break + +In order to correctly release an account in case of a `break` in the loop, a special syntax must be used. Otherwise, Python's event loop will release the lock on the account sometime in the future. See explanation [here](https://github.com/vladkens/twscrape/issues/27#issuecomment-1623395424). + +```python +from contextlib import aclosing + +async with aclosing(api.search("elon musk")) as gen: + async for tweet in gen: + if tweet.id < 200: + break +``` + ## CLI ### Get help on CLI commands @@ -183,6 +196,8 @@ twscrape search "elon mask lang:es" --limit=20 --raw ## Limitations +NOTE: After 1 July 2023 Twitter [introduced limits](https://twitter.com/elonmusk/status/1675187969420828672) on the number of tweets per day per account (and these continue to change), so the values below may not be fully correct. 
+ API rate limits (per account): - Search API – 250 req / 15 min - GraphQL API – has individual rate limits per operation (in most cases this is 500 req / 15 min) diff --git a/tests/test_queue_client.py b/tests/test_queue_client.py index f919ef9..3e8385a 100644 --- a/tests/test_queue_client.py +++ b/tests/test_queue_client.py @@ -1,3 +1,5 @@ +from contextlib import aclosing + import httpx from pytest_httpx import HTTPXMock @@ -92,7 +94,7 @@ async def test_switch_acc_on_http_error(httpx_mock: HTTPXMock, client_fixture: C assert locked1 == locked3 # failed account locked -async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, client_fixture): +async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, client_fixture: CF): pool, client = client_fixture # locked account on enter @@ -113,3 +115,43 @@ async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, clien # check username added to request obj (for logging) username = getattr(rep, "__username", None) assert username is not None + + +async def test_ctx_closed_on_break(httpx_mock: HTTPXMock, client_fixture: CF): + pool, client = client_fixture + + async def get_data_stream(): + async with client as c: + counter = 0 + while True: + counter += 1 + check_retry = counter == 2 + before_ctx = c.ctx + + if check_retry: + httpx_mock.add_response(url=URL, json={"counter": counter}, status_code=403) + httpx_mock.add_response(url=URL, json={"counter": counter}, status_code=200) + else: + httpx_mock.add_response(url=URL, json={"counter": counter}, status_code=200) + + rep = await c.get(URL) + + if check_retry: + assert before_ctx != c.ctx + elif before_ctx is not None: + assert before_ctx == c.ctx + + assert rep.json() == {"counter": counter} + yield rep.json()["counter"] + + if counter == 9: + return + + # need to use async with to break to work + async with aclosing(get_data_stream()) as gen: + async for x in gen: + if x == 3: + break + + # ctx should be None after break + 
assert client.ctx is None diff --git a/twscrape/accounts_pool.py b/twscrape/accounts_pool.py index 43a1efd..e5dfc9e 100644 --- a/twscrape/accounts_pool.py +++ b/twscrape/accounts_pool.py @@ -133,10 +133,11 @@ class AccountsPool: qs = "UPDATE accounts SET active = :active WHERE username = :username" await execute(self._db_file, qs, {"username": username, "active": active}) - async def lock_until(self, username: str, queue: str, unlock_at: int): + async def lock_until(self, username: str, queue: str, unlock_at: int, req_count=0): qs = f""" UPDATE accounts SET locks = json_set(locks, '$.{queue}', datetime({unlock_at}, 'unixepoch')), + stats = json_set(stats, '$.{queue}', COALESCE(json_extract(stats, '$.{queue}'), 0) + {req_count}), last_used = datetime({utc_ts()}, 'unixepoch') WHERE username = :username """ diff --git a/twscrape/api.py b/twscrape/api.py index e5eefe7..ee4d577 100644 --- a/twscrape/api.py +++ b/twscrape/api.py @@ -9,8 +9,8 @@ from .utils import encode_params, find_obj, get_by_path, to_old_obj, to_old_rep class API: - def __init__(self, pool: AccountsPool, debug=False): - self.pool = pool + def __init__(self, pool: AccountsPool | None, debug=False): + self.pool = pool if pool is not None else AccountsPool() self.debug = debug # general helpers diff --git a/twscrape/queue_client.py b/twscrape/queue_client.py index 3173730..f39d411 100644 --- a/twscrape/queue_client.py +++ b/twscrape/queue_client.py @@ -39,20 +39,23 @@ class QueueClient: async def __aexit__(self, exc_type, exc_val, exc_tb): await self._close_ctx() - return self - async def _close_ctx(self): - if self.ctx is not None: - await self.ctx.clt.aclose() - await self.pool.unlock(self.ctx.acc.username, self.queue, self.ctx.req_count) + async def _close_ctx(self, reset_at=-1): + if self.ctx is None: + return + + ctx, self.ctx, self.req_count = self.ctx, None, 0 + await ctx.clt.aclose() + + if reset_at <= 0: + await self.pool.unlock(ctx.acc.username, self.queue, ctx.req_count) + else: + await 
self.pool.lock_until(ctx.acc.username, self.queue, reset_at, ctx.req_count) async def _get_ctx(self) -> Ctx: if self.ctx: return self.ctx - if self.ctx is not None: - await self._close_ctx() - acc = await self.pool.get_for_queue_or_wait(self.queue) clt = acc.make_client() self.ctx = Ctx(acc, clt) @@ -105,16 +108,14 @@ class QueueClient: if rep.status_code == 429: logger.debug(f"Rate limit for {log_id}") reset_ts = int(rep.headers.get("x-rate-limit-reset", 0)) - await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts) - self.ctx = None # get next account + await self._close_ctx(reset_ts) # get next account on next iteration continue # possible account banned if rep.status_code in (401, 403): reset_ts = utc_ts() + 60 * 60 # + 1 hour logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h") - await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts) - self.ctx = None # get next account + await self._close_ctx(reset_ts) # get next account on next iteration continue # twitter can return different types of cursors that not transfers between accounts