improve queue client logic; update readme

Этот коммит содержится в:
Vlad Pronsky 2023-07-06 13:20:27 +03:00
родитель 7b94895cd5
Коммит 070aa71583
5 изменённых файлов: 75 добавлений и 16 удалений

Просмотреть файл

@ -101,6 +101,19 @@ if __name__ == "__main__":
asyncio.run(main())
```
### Stoping iteration with break
In order to correctly release an account in case of `break` in loop, a special syntax must be used. Otherwise, Python's events loop will release lock on the account sometime in the future. See explanation [here](https://github.com/vladkens/twscrape/issues/27#issuecomment-1623395424).
```python
from contextlib import aclosing
async with aclosing(api.search("elon musk")) as gen:
async for tweet in gen:
if tweet.id < 200:
break
```
## CLI
### Get help on CLI commands
@ -183,6 +196,8 @@ twscrape search "elon mask lang:es" --limit=20 --raw
## Limitations
NOTE: After 1 July 2023 Twitter [introduced limits](https://twitter.com/elonmusk/status/1675187969420828672) on the number of tweets per day per account (and these continue to change), so the values below may not be fully correct.
API rate limits (per account):
- Search API – 250 req / 15 min
- GraphQL API – has individual rate limits per operation (in most cases this is 500 req / 15 min)

Просмотреть файл

@ -1,3 +1,5 @@
from contextlib import aclosing
import httpx
from pytest_httpx import HTTPXMock
@ -92,7 +94,7 @@ async def test_switch_acc_on_http_error(httpx_mock: HTTPXMock, client_fixture: C
assert locked1 == locked3 # failed account locked
async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, client_fixture):
async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, client_fixture: CF):
pool, client = client_fixture
# locked account on enter
@ -113,3 +115,43 @@ async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, clien
# check username added to request obj (for logging)
username = getattr(rep, "__username", None)
assert username is not None
async def test_ctx_closed_on_break(httpx_mock: HTTPXMock, client_fixture: CF):
pool, client = client_fixture
async def get_data_stream():
async with client as c:
counter = 0
while True:
counter += 1
check_retry = counter == 2
before_ctx = c.ctx
if check_retry:
httpx_mock.add_response(url=URL, json={"counter": counter}, status_code=403)
httpx_mock.add_response(url=URL, json={"counter": counter}, status_code=200)
else:
httpx_mock.add_response(url=URL, json={"counter": counter}, status_code=200)
rep = await c.get(URL)
if check_retry:
assert before_ctx != c.ctx
elif before_ctx is not None:
assert before_ctx == c.ctx
assert rep.json() == {"counter": counter}
yield rep.json()["counter"]
if counter == 9:
return
# need to use async with to break to work
async with aclosing(get_data_stream()) as gen:
async for x in gen:
if x == 3:
break
# ctx should be None after break
assert client.ctx is None

Просмотреть файл

@ -133,10 +133,11 @@ class AccountsPool:
qs = "UPDATE accounts SET active = :active WHERE username = :username"
await execute(self._db_file, qs, {"username": username, "active": active})
async def lock_until(self, username: str, queue: str, unlock_at: int):
async def lock_until(self, username: str, queue: str, unlock_at: int, req_count=0):
qs = f"""
UPDATE accounts SET
locks = json_set(locks, '$.{queue}', datetime({unlock_at}, 'unixepoch')),
stats = json_set(stats, '$.{queue}', COALESCE(json_extract(stats, '$.{queue}'), 0) + {req_count}),
last_used = datetime({utc_ts()}, 'unixepoch')
WHERE username = :username
"""

Просмотреть файл

@ -9,8 +9,8 @@ from .utils import encode_params, find_obj, get_by_path, to_old_obj, to_old_rep
class API:
def __init__(self, pool: AccountsPool, debug=False):
self.pool = pool
def __init__(self, pool: AccountsPool | None, debug=False):
self.pool = pool if pool is not None else AccountsPool()
self.debug = debug
# general helpers

Просмотреть файл

@ -39,20 +39,23 @@ class QueueClient:
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._close_ctx()
return self
async def _close_ctx(self):
if self.ctx is not None:
await self.ctx.clt.aclose()
await self.pool.unlock(self.ctx.acc.username, self.queue, self.ctx.req_count)
async def _close_ctx(self, reset_at=-1):
if self.ctx is None:
return
ctx, self.ctx, self.req_count = self.ctx, None, 0
await ctx.clt.aclose()
if reset_at <= 0:
await self.pool.unlock(ctx.acc.username, self.queue, ctx.req_count)
else:
await self.pool.lock_until(ctx.acc.username, self.queue, reset_at, ctx.req_count)
async def _get_ctx(self) -> Ctx:
if self.ctx:
return self.ctx
if self.ctx is not None:
await self._close_ctx()
acc = await self.pool.get_for_queue_or_wait(self.queue)
clt = acc.make_client()
self.ctx = Ctx(acc, clt)
@ -105,16 +108,14 @@ class QueueClient:
if rep.status_code == 429:
logger.debug(f"Rate limit for {log_id}")
reset_ts = int(rep.headers.get("x-rate-limit-reset", 0))
await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts)
self.ctx = None # get next account
await self._close_ctx(reset_ts) # get next account on next iteration
continue
# possible account banned
if rep.status_code in (401, 403):
reset_ts = utc_ts() + 60 * 60 # + 1 hour
logger.warning(f"Code {rep.status_code} for {log_id} – frozen for 1h")
await self.pool.lock_until(ctx.acc.username, self.queue, reset_ts)
self.ctx = None # get next account
await self._close_ctx(reset_ts) # get next account on next iteration
continue
# twitter can return different types of cursors that not transfers between accounts