update tests with fixture magic

Этот коммит содержится в:
Vlad Pronsky 2023-05-28 21:03:02 +03:00
родитель d2cf6783d8
Коммит 9735559c49
4 изменённых файлов: 72 добавлений и 105 удалений

21
tests/conftest.py Обычный файл
Просмотреть файл

@ -0,0 +1,21 @@
import pytest
from twscrape.accounts_pool import AccountsPool
from twscrape.queue_client import QueueClient
@pytest.fixture
def poolm(tmp_path) -> AccountsPool: # type: ignore
db_path = tmp_path / "test.db"
yield AccountsPool(db_path)
@pytest.fixture
async def client_fixture(poolm: AccountsPool):
await poolm.add_account("user1", "pass1", "email1", "email_pass1")
await poolm.add_account("user2", "pass2", "email2", "email_pass2")
await poolm.set_active("user1", True)
await poolm.set_active("user2", True)
client = QueueClient(poolm, "search")
yield poolm, client

Просмотреть файл

@ -1,101 +1,79 @@
import os
from twscrape.accounts_pool import AccountsPool
from twscrape.db import DB
from twscrape.utils import utc_ts
DB_FILE = "/tmp/twscrape_test_pool.db"
def remove_db():
DB._init_once[DB_FILE] = False
if os.path.exists(DB_FILE):
os.remove(DB_FILE)
async def test_add_accounts():
remove_db()
pool = AccountsPool(DB_FILE)
async def test_add_accounts(poolm: AccountsPool):
# should add account
await pool.add_account("user1", "pass1", "email1", "email_pass1")
acc = await pool.get("user1")
await poolm.add_account("user1", "pass1", "email1", "email_pass1")
acc = await poolm.get("user1")
assert acc.username == "user1"
assert acc.password == "pass1"
assert acc.email == "email1"
assert acc.email_password == "email_pass1"
# should not add account with same username
await pool.add_account("user1", "pass2", "email2", "email_pass2")
acc = await pool.get("user1")
await poolm.add_account("user1", "pass2", "email2", "email_pass2")
acc = await poolm.get("user1")
assert acc.username == "user1"
assert acc.password == "pass1"
assert acc.email == "email1"
assert acc.email_password == "email_pass1"
# should not add account with different username case
await pool.add_account("USER1", "pass2", "email2", "email_pass2")
acc = await pool.get("user1")
await poolm.add_account("USER1", "pass2", "email2", "email_pass2")
acc = await poolm.get("user1")
assert acc.username == "user1"
assert acc.password == "pass1"
assert acc.email == "email1"
assert acc.email_password == "email_pass1"
# should add account with different username
await pool.add_account("user2", "pass2", "email2", "email_pass2")
acc = await pool.get("user2")
await poolm.add_account("user2", "pass2", "email2", "email_pass2")
acc = await poolm.get("user2")
assert acc.username == "user2"
assert acc.password == "pass2"
assert acc.email == "email2"
assert acc.email_password == "email_pass2"
async def test_get_all():
remove_db()
pool = AccountsPool(DB_FILE)
async def test_get_all(poolm: AccountsPool):
# should return empty list
accs = await pool.get_all()
accs = await poolm.get_all()
assert len(accs) == 0
# should return all accounts
await pool.add_account("user1", "pass1", "email1", "email_pass1")
await pool.add_account("user2", "pass2", "email2", "email_pass2")
accs = await pool.get_all()
await poolm.add_account("user1", "pass1", "email1", "email_pass1")
await poolm.add_account("user2", "pass2", "email2", "email_pass2")
accs = await poolm.get_all()
assert len(accs) == 2
assert accs[0].username == "user1"
assert accs[1].username == "user2"
async def test_save():
remove_db()
pool = AccountsPool(DB_FILE)
async def test_save(poolm: AccountsPool):
# should save account
await pool.add_account("user1", "pass1", "email1", "email_pass1")
acc = await pool.get("user1")
await poolm.add_account("user1", "pass1", "email1", "email_pass1")
acc = await poolm.get("user1")
acc.password = "pass2"
await pool.save(acc)
acc = await pool.get("user1")
await poolm.save(acc)
acc = await poolm.get("user1")
assert acc.password == "pass2"
# should not save account
acc = await pool.get("user1")
acc = await poolm.get("user1")
acc.username = "user2"
await pool.save(acc)
acc = await pool.get("user1")
await poolm.save(acc)
acc = await poolm.get("user1")
assert acc.username == "user1"
async def test_get_for_queue():
remove_db()
pool = AccountsPool(DB_FILE)
async def test_get_for_queue(poolm: AccountsPool):
Q = "test_queue"
# should return account
await pool.add_account("user1", "pass1", "email1", "email_pass1")
await pool.set_active("user1", True)
acc = await pool.get_for_queue(Q)
await poolm.add_account("user1", "pass1", "email1", "email_pass1")
await poolm.set_active("user1", True)
acc = await poolm.get_for_queue(Q)
assert acc is not None
assert acc.username == "user1"
assert acc.active is True
@ -104,60 +82,56 @@ async def test_get_for_queue():
assert acc.locks[Q] is not None
# should return None
acc = await pool.get_for_queue(Q)
acc = await poolm.get_for_queue(Q)
assert acc is None
async def test_account_unlock():
remove_db()
pool = AccountsPool(DB_FILE)
async def test_account_unlock(poolm: AccountsPool):
Q = "test_queue"
await pool.add_account("user1", "pass1", "email1", "email_pass1")
await pool.set_active("user1", True)
acc = await pool.get_for_queue(Q)
await poolm.add_account("user1", "pass1", "email1", "email_pass1")
await poolm.set_active("user1", True)
acc = await poolm.get_for_queue(Q)
assert acc is not None
assert acc.locks[Q] is not None
# should unlock account and make available for queue
await pool.unlock(acc.username, Q)
acc = await pool.get_for_queue(Q)
await poolm.unlock(acc.username, Q)
acc = await poolm.get_for_queue(Q)
assert acc is not None
assert acc.locks[Q] is not None
# should update lock time
end_time = utc_ts() + 60 # + 1 minute
await pool.lock_until(acc.username, Q, end_time)
await poolm.lock_until(acc.username, Q, end_time)
acc = await pool.get(acc.username)
acc = await poolm.get(acc.username)
assert int(acc.locks[Q].timestamp()) == end_time
async def test_get_stats():
remove_db()
pool = AccountsPool(DB_FILE)
async def test_get_stats(poolm: AccountsPool):
Q = "search"
# should return empty stats
stats = await pool.stats()
stats = await poolm.stats()
for k, v in stats.items():
assert v == 0, f"{k} should be 0"
# should increate total
await pool.add_account("user1", "pass1", "email1", "email_pass1")
stats = await pool.stats()
await poolm.add_account("user1", "pass1", "email1", "email_pass1")
stats = await poolm.stats()
assert stats["total"] == 1
assert stats["active"] == 0
# should increate active
await pool.set_active("user1", True)
stats = await pool.stats()
await poolm.set_active("user1", True)
stats = await poolm.stats()
assert stats["total"] == 1
assert stats["active"] == 1
# should update queue stats
await pool.get_for_queue(Q)
stats = await pool.stats()
await poolm.get_for_queue(Q)
stats = await poolm.stats()
assert stats["total"] == 1
assert stats["active"] == 1
assert stats["locked_search"] == 1

Просмотреть файл

@ -2,9 +2,6 @@ import httpx
from pytest_httpx import HTTPXMock
from twscrape.logger import set_log_level
from twscrape.queue_client import QueueClient
from .utils import get_pool
DB_FILE = "/tmp/twscrape_test_queue_client.db"
URL = "https://example.com/api"
@ -12,19 +9,8 @@ URL = "https://example.com/api"
set_log_level("ERROR")
async def get_client():
pool = get_pool(DB_FILE)
await pool.add_account("user1", "pass1", "email1", "email_pass1")
await pool.add_account("user2", "pass2", "email2", "email_pass2")
await pool.set_active("user1", True)
await pool.set_active("user2", True)
client = QueueClient(pool, "search")
return pool, client
async def test_should_lock_account_on_queue(httpx_mock: HTTPXMock):
pool, client = await get_client()
async def test_lock_account_when_used(httpx_mock: HTTPXMock, client_fixture):
pool, client = client_fixture
assert (await pool.stats())["locked_search"] == 0
await client.__aenter__()
@ -37,9 +23,8 @@ async def test_should_lock_account_on_queue(httpx_mock: HTTPXMock):
assert (await pool.stats())["locked_search"] == 0
async def test_should_not_switch_account_on_200(httpx_mock: HTTPXMock):
pool, client = await get_client()
async def test_do_not_switch_account_on_200(httpx_mock: HTTPXMock, client_fixture):
pool, client = client_fixture
assert (await pool.stats())["locked_search"] == 0
await client.__aenter__()
@ -56,8 +41,8 @@ async def test_should_not_switch_account_on_200(httpx_mock: HTTPXMock):
await client.__aexit__(None, None, None)
async def test_should_switch_account_on_http_error(httpx_mock: HTTPXMock):
pool, client = await get_client()
async def test_switch_acc_on_http_error(httpx_mock: HTTPXMock, client_fixture):
pool, client = client_fixture
assert (await pool.stats())["locked_search"] == 0
await client.__aenter__()
@ -72,8 +57,8 @@ async def test_should_switch_account_on_http_error(httpx_mock: HTTPXMock):
await client.__aexit__(None, None, None)
async def test_should_retry_with_same_account_on_network_error(httpx_mock: HTTPXMock):
pool, client = await get_client()
async def test_retry_with_same_acc_on_network_error(httpx_mock: HTTPXMock, client_fixture):
pool, client = client_fixture
await client.__aenter__()
httpx_mock.add_exception(httpx.ReadTimeout("Unable to read within timeout"))

Просмотреть файл

@ -1,13 +0,0 @@
import os
from twscrape.accounts_pool import AccountsPool
from twscrape.db import DB
def get_pool(db_path: str):
DB._init_once[db_path] = False
if os.path.exists(db_path):
os.remove(db_path)
pool = AccountsPool(db_path)
return pool