diff --git a/.gitignore b/.gitignore index a51ccf7..9b8d241 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ accounts/ results-raw/ results-parsed/ +accounts.db* # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/.vscode/settings.json b/.vscode/settings.json index 7f3f159..1d23fc8 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,10 @@ { "files.exclude": { ".ruff_cache": true, - ".pytest_cache": true + ".pytest_cache": true, + "*.egg-info": true, + "build": true, + ".coverage": true, }, "[python]": { "editor.formatOnSave": true, diff --git a/Makefile b/Makefile index 941f392..3164b18 100644 --- a/Makefile +++ b/Makefile @@ -2,10 +2,10 @@ all: @echo "hi" lint: - ruff check . + ruff check twapi lint-fix: - ruff check --fix . + ruff check --fix twapi pylint: pylint --errors-only twapi diff --git a/pyproject.toml b/pyproject.toml index 059ab26..181a463 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ classifiers = [ 'Programming Language :: Python :: 3.11', ] dependencies = [ + "aiosqlite==0.17.0", "fake-useragent==1.1.3", "httpx==0.24.0", "loguru==0.7.0", diff --git a/readme.md b/readme.md index 467749a..14840a7 100644 --- a/readme.md +++ b/readme.md @@ -22,13 +22,12 @@ from twapi import AccountsPool, API, gather from twapi.logger import set_log_level async def main(): - pool = AccountsPool() - pool.add_account("user1", "pass1", "user1@example.com", "email_pass1") - pool.add_account("user2", "pass2", "user2@example.com", "email_pass2") + pool = AccountsPool() # or pool = AccountsPool("path-to.db") - default is `accounts.db` + await pool.add_account("user1", "pass1", "user1@example.com", "email_pass1") + await pool.add_account("user2", "pass2", "user2@example.com", "email_pass2") - # login all accounts if required (not account file found) - # session file will be saved to `accounts/{username}.json` - await pool.login() + # log in to all fresh accounts + await pool.login_all() api = API(pool) @@ -67,7 +66,8 @@ async def main(): doc.dict() # -> python dict doc.json() # -> json string - if __name__ == "__main__": asyncio.run(main()) ``` + +You can use `login_all` once in your program to complete the login flow and save the accounts to the database. Re-runs will reuse the previously activated accounts.
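Since the pool is now backed by SQLite rather than per-account JSON files, the snippet above can be re-run as-is: `add_account` skips usernames that already exist, and `login_all` only processes accounts that are not yet active and have no recorded `error_msg`. A minimal end-to-end sketch under those assumptions (credentials and the db path are placeholders):

```python
import asyncio

from twapi import API, AccountsPool


async def main():
    pool = AccountsPool("accounts.db")  # placeholder path; accounts.db is also the default

    # safe to repeat on every run: existing usernames are skipped
    await pool.add_account("user1", "pass1", "user1@example.com", "email_pass1")

    # only fresh accounts (active = false, no error_msg) go through the login flow
    await pool.login_all()
    print(await pool.stats())  # counts of total / active / inactive / locked accounts

    api = API(pool)
    async for tweet in api.search("python", limit=20):
        print(tweet.json())  # parsed Tweet model, with the same .dict() / .json() helpers as above


if __name__ == "__main__":
    asyncio.run(main())
```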
diff --git a/tests/test_pool.py b/tests/test_pool.py new file mode 100644 index 0000000..1af0c1c --- /dev/null +++ b/tests/test_pool.py @@ -0,0 +1,107 @@ +import os + +from twapi.accounts_pool import AccountsPool +from twapi.db import DB + +DB_FILE = "/tmp/twapi_test.db" + + +def remove_db(): + DB._init_once[DB_FILE] = False + if os.path.exists(DB_FILE): + os.remove(DB_FILE) + + +async def test_add_accounts(): + remove_db() + pool = AccountsPool(DB_FILE) + + # should add account + await pool.add_account("user1", "pass1", "email1", "email_pass1") + acc = await pool.get("user1") + assert acc.username == "user1" + assert acc.password == "pass1" + assert acc.email == "email1" + assert acc.email_password == "email_pass1" + + # should not add account with same username + await pool.add_account("user1", "pass2", "email2", "email_pass2") + acc = await pool.get("user1") + assert acc.username == "user1" + assert acc.password == "pass1" + assert acc.email == "email1" + assert acc.email_password == "email_pass1" + + # should not add account with different username case + await pool.add_account("USER1", "pass2", "email2", "email_pass2") + acc = await pool.get("user1") + assert acc.username == "user1" + assert acc.password == "pass1" + assert acc.email == "email1" + assert acc.email_password == "email_pass1" + + # should add account with different username + await pool.add_account("user2", "pass2", "email2", "email_pass2") + acc = await pool.get("user2") + assert acc.username == "user2" + assert acc.password == "pass2" + assert acc.email == "email2" + assert acc.email_password == "email_pass2" + + +async def test_get_all(): + remove_db() + pool = AccountsPool(DB_FILE) + + # should return empty list + accs = await pool.get_all() + assert len(accs) == 0 + + # should return all accounts + await pool.add_account("user1", "pass1", "email1", "email_pass1") + await pool.add_account("user2", "pass2", "email2", "email_pass2") + accs = await pool.get_all() + assert len(accs) == 2 + assert accs[0].username == "user1" + assert accs[1].username == "user2" + + +async def test_save(): + remove_db() + pool = AccountsPool(DB_FILE) + + # should save account + await pool.add_account("user1", "pass1", "email1", "email_pass1") + acc = await pool.get("user1") + acc.password = "pass2" + await pool.save(acc) + acc = await pool.get("user1") + assert acc.password == "pass2" + + # should not save account + acc = await pool.get("user1") + acc.username = "user2" + await pool.save(acc) + acc = await pool.get("user1") + assert acc.username == "user1" + + +async def test_get_for_queue(): + remove_db() + pool = AccountsPool(DB_FILE) + Q = "test_queue" + + # should return account + await pool.add_account("user1", "pass1", "email1", "email_pass1") + await pool.set_active("user1", True) + acc = await pool.get_for_queue(Q) + assert acc is not None + assert acc.username == "user1" + assert acc.active is True + assert acc.locks is not None + assert Q in acc.locks + assert acc.locks[Q] is not None + + # should return None + acc = await pool.get_for_queue(Q) + assert acc is None diff --git a/twapi/account.py b/twapi/account.py index 16e1149..4a450b1 100644 --- a/twapi/account.py +++ b/twapi/account.py @@ -1,288 +1,73 @@ import json -from datetime import datetime, timedelta, timezone -from enum import Enum +import sqlite3 +from dataclasses import asdict, dataclass +from datetime import datetime -from fake_useragent import UserAgent -from httpx import AsyncClient, HTTPStatusError, Response +from httpx import AsyncClient -from .constants import 
LOGIN_URL, TOKEN -from .imap import get_email_code -from .logger import logger -from .utils import raise_for_status - - -class Status(str, Enum): - NEW = "new" - ACTIVE = "active" - LOGIN_ERROR = "login_error" +from .constants import TOKEN +@dataclass class Account: - @classmethod - def load_from_file(cls, filepath: str): - try: - with open(filepath) as f: - data = json.load(f) - return cls(**data) - except (FileNotFoundError, json.JSONDecodeError) as e: - logger.error(f"Failed to load account {filepath}: {e}") - return None + username: str + password: str + email: str + email_password: str + user_agent: str + active: bool + locks: dict[str, datetime] + headers: dict[str, str] | None = None + cookies: dict[str, str] | None = None + proxy: str | None = None + error_msg: str | None = None - def __init__( - self, - username: str, - password: str, - email: str, - email_password: str, - user_agent: str | None = None, - proxy: str | None = None, - headers={}, - cookies={}, - limits={}, - status=Status.NEW, - ): - self.username = username - self.password = password - self.email = email - self.email_password = email_password - self.user_agent = user_agent or UserAgent().safari - self.proxy = proxy or None + @staticmethod + def create_sql(): + return """ + CREATE TABLE IF NOT EXISTS accounts ( + username TEXT PRIMARY KEY NOT NULL COLLATE NOCASE, + password TEXT NOT NULL, + email TEXT NOT NULL COLLATE NOCASE, + email_password TEXT NOT NULL, + user_agent TEXT NOT NULL, + active BOOLEAN DEFAULT FALSE NOT NULL, + locks TEXT DEFAULT '{}' NOT NULL, + headers TEXT DEFAULT '{}' NOT NULL, + cookies TEXT DEFAULT '{}' NOT NULL, + proxy TEXT DEFAULT NULL, + error_msg TEXT DEFAULT NULL + ); + """ - self.client = AsyncClient(proxies=self.proxy) - self.client.cookies.update(cookies) - self.client.headers.update(headers) + @staticmethod + def from_rs(rs: sqlite3.Row): + doc = dict(rs) + doc["locks"] = {k: datetime.fromisoformat(v) for k, v in json.loads(doc["locks"]).items()} + doc["headers"] = json.loads(doc["headers"]) + doc["cookies"] = json.loads(doc["cookies"]) + doc["active"] = bool(doc["active"]) + return Account(**doc) - self.client.headers["user-agent"] = self.user_agent - self.client.headers["content-type"] = "application/json" - self.client.headers["authorization"] = TOKEN - self.client.headers["x-twitter-active-user"] = "yes" - self.client.headers["x-twitter-client-language"] = "en" + def to_rs(self): + rs = asdict(self) + rs["locks"] = json.dumps(rs["locks"], default=lambda x: x.isoformat()) + rs["headers"] = json.dumps(rs["headers"]) + rs["cookies"] = json.dumps(rs["cookies"]) + return rs - self.status: Status = status - self.locked: dict[str, bool] = {} - self.limits: dict[str, datetime] = { - k: datetime.fromisoformat(v) for k, v in limits.items() - } + def make_client(self) -> AsyncClient: + client = AsyncClient(proxies=self.proxy) - def dump(self): - return { - "username": self.username, - "password": self.password, - "email": self.email, - "email_password": self.email_password, - "user_agent": self.user_agent, - "proxy": self.proxy, - "cookies": {k: v for k, v in self.client.cookies.items()}, - "headers": {k: v for k, v in self.client.headers.items()}, - "limits": {k: v.isoformat() for k, v in self.limits.items()}, - "status": self.status, - } + # saved from previous usage + client.cookies.update(self.cookies) + client.headers.update(self.headers) - def update_limit(self, queue: str, reset_ts: int): - self.limits[queue] = datetime.fromtimestamp(reset_ts, tz=timezone.utc) + # default settings + 
client.headers["user-agent"] = self.user_agent + client.headers["content-type"] = "application/json" + client.headers["authorization"] = TOKEN + client.headers["x-twitter-active-user"] = "yes" + client.headers["x-twitter-client-language"] = "en" - def can_use(self, queue: str): - if self.locked.get(queue, False) or self.status != Status.ACTIVE: - return False - - limit = self.limits.get(queue) - return not limit or limit <= datetime.now(timezone.utc) - - def lock(self, queue: str): - self.locked[queue] = True - - def unlock(self, queue: str): - self.locked[queue] = False - - async def login(self, fresh=False): - log_id = f"{self.username} - {self.email}" - if self.status != "new" and not fresh: - logger.debug(f"logged in already {log_id}") - return - - logger.debug(f"logging in {log_id}") - - guest_token = await self.get_guest_token() - self.client.headers["x-guest-token"] = guest_token - - rep = await self.login_initiate() - while True: - try: - if rep: - rep = await self.next_login_task(rep) - else: - break - except HTTPStatusError as e: - if e.response.status_code == 403: - logger.error(f"403 error {log_id}") - self.status = Status.LOGIN_ERROR - return - - self.client.headers["x-csrf-token"] = self.client.cookies["ct0"] - self.client.headers["x-twitter-auth-type"] = "OAuth2Session" - - logger.info(f"logged in success {log_id}") - self.status = Status.ACTIVE - - async def get_guest_token(self): - rep = await self.client.post("https://api.twitter.com/1.1/guest/activate.json") - raise_for_status(rep, "guest_token") - return rep.json()["guest_token"] - - async def login_initiate(self) -> Response: - payload = { - "input_flow_data": { - "flow_context": {"debug_overrides": {}, "start_location": {"location": "unknown"}} - }, - "subtask_versions": {}, - } - - rep = await self.client.post(LOGIN_URL, params={"flow_name": "login"}, json=payload) - raise_for_status(rep, "login_initiate") - return rep - - async def next_login_task(self, rep: Response): - ct0 = self.client.cookies.get("ct0", None) - if ct0: - self.client.headers["x-csrf-token"] = ct0 - self.client.headers["x-twitter-auth-type"] = "OAuth2Session" - - data = rep.json() - assert "flow_token" in data, f"flow_token not in {rep.text}" - # logger.debug(f"login tasks: {[x['subtask_id'] for x in data['subtasks']]}") - - for x in data["subtasks"]: - task_id = x["subtask_id"] - - try: - if task_id == "LoginSuccessSubtask": - return await self.login_success(data) - if task_id == "LoginAcid": - is_code = x["enter_text"]["hint_text"].lower() == "confirmation code" - # logger.debug(f"is login code: {is_code}") - fn = self.login_confirm_email_code if is_code else self.login_confirm_email - return await fn(data) - if task_id == "AccountDuplicationCheck": - return await self.login_duplication_check(data) - if task_id == "LoginEnterPassword": - return await self.login_enter_password(data) - if task_id == "LoginEnterUserIdentifierSSO": - return await self.login_enter_username(data) - if task_id == "LoginJsInstrumentationSubtask": - return await self.login_instrumentation(data) - except Exception as e: - logger.error(f"Error in {task_id}: {e}") - logger.error(e) - - return None - - async def login_instrumentation(self, prev: dict) -> Response: - payload = { - "flow_token": prev["flow_token"], - "subtask_inputs": [ - { - "subtask_id": "LoginJsInstrumentationSubtask", - "js_instrumentation": {"response": "{}", "link": "next_link"}, - } - ], - } - - rep = await self.client.post(LOGIN_URL, json=payload) - raise_for_status(rep, "login_instrumentation") - 
return rep - - async def login_enter_username(self, prev: dict) -> Response: - payload = { - "flow_token": prev["flow_token"], - "subtask_inputs": [ - { - "subtask_id": "LoginEnterUserIdentifierSSO", - "settings_list": { - "setting_responses": [ - { - "key": "user_identifier", - "response_data": {"text_data": {"result": self.username}}, - } - ], - "link": "next_link", - }, - } - ], - } - - rep = await self.client.post(LOGIN_URL, json=payload) - raise_for_status(rep, "login_username") - return rep - - async def login_enter_password(self, prev: dict) -> Response: - payload = { - "flow_token": prev["flow_token"], - "subtask_inputs": [ - { - "subtask_id": "LoginEnterPassword", - "enter_password": {"password": self.password, "link": "next_link"}, - } - ], - } - - rep = await self.client.post(LOGIN_URL, json=payload) - raise_for_status(rep, "login_password") - return rep - - async def login_duplication_check(self, prev: dict) -> Response: - payload = { - "flow_token": prev["flow_token"], - "subtask_inputs": [ - { - "subtask_id": "AccountDuplicationCheck", - "check_logged_in_account": {"link": "AccountDuplicationCheck_false"}, - } - ], - } - - rep = await self.client.post(LOGIN_URL, json=payload) - raise_for_status(rep, "login_duplication_check") - return rep - - async def login_confirm_email(self, prev: dict) -> Response: - payload = { - "flow_token": prev["flow_token"], - "subtask_inputs": [ - { - "subtask_id": "LoginAcid", - "enter_text": {"text": self.email, "link": "next_link"}, - } - ], - } - - rep = await self.client.post(LOGIN_URL, json=payload) - raise_for_status(rep, "login_confirm_email") - return rep - - async def login_confirm_email_code(self, prev: dict): - now_time = datetime.now(timezone.utc) - timedelta(seconds=30) - value = await get_email_code(self.email, self.email_password, now_time) - - payload = { - "flow_token": prev["flow_token"], - "subtask_inputs": [ - { - "subtask_id": "LoginAcid", - "enter_text": {"text": value, "link": "next_link"}, - } - ], - } - - rep = await self.client.post(LOGIN_URL, json=payload) - raise_for_status(rep, "login_confirm_email") - return rep - - async def login_success(self, prev: dict) -> Response: - payload = { - "flow_token": prev["flow_token"], - "subtask_inputs": [], - } - - rep = await self.client.post(LOGIN_URL, json=payload) - raise_for_status(rep, "login_success") - return rep + return client diff --git a/twapi/accounts_pool.py b/twapi/accounts_pool.py index af12c78..816c002 100644 --- a/twapi/accounts_pool.py +++ b/twapi/accounts_pool.py @@ -1,101 +1,153 @@ +# ruff: noqa: E501 import asyncio -import json -import os -from .account import Account, Status +from fake_useragent import UserAgent + +from .account import Account +from .db import add_init_query, execute, fetchall, fetchone from .logger import logger -from .utils import shuffle +from .login import login class AccountsPool: - def __init__(self, base_dir: str | None = None): - self.accounts: list[Account] = [] - self.base_dir = base_dir or "accounts" + def __init__(self, db_file="accounts.db"): + self._db_file = db_file + add_init_query(db_file, Account.create_sql()) - def restore(self): - files = [os.path.join(self.base_dir, x) for x in os.listdir(self.base_dir)] - files = [x for x in files if x.endswith(".json")] - for file in files: - self._load_account_from_file(file) - - def _load_account_from_file(self, filepath: str): - account = Account.load_from_file(filepath) - if account: - username = set(x.username for x in self.accounts) - if account.username in username: - raise 
ValueError(f"Duplicate username {account.username}") - self.accounts.append(account) - return account - - def _get_filename(self, username: str): - return f"{self.base_dir}/{username}.json" - - def add_account( + async def add_account( self, - login: str, + username: str, password: str, email: str, email_password: str, - proxy: str | None = None, user_agent: str | None = None, + proxy: str | None = None, ): - account = self._load_account_from_file(self._get_filename(login)) - if account: + qs = "SELECT * FROM accounts WHERE username = :username" + rs = await fetchone(self._db_file, qs, {"username": username}) + if rs: return account = Account( - login, - password, - email, - email_password, + username=username, + password=password, + email=email, + email_password=email_password, + user_agent=user_agent or UserAgent().safari, + active=False, + locks={}, + headers={}, + cookies={}, proxy=proxy, - user_agent=user_agent, ) - self.save_account(account) - self._load_account_from_file(self._get_filename(login)) + await self.save(account) - async def login(self): - for x in self.accounts: - try: - if x.status == Status.NEW: - await x.login() - except Exception as e: - logger.error(f"Error logging in to {x.username}: {e}") - finally: - self.save_account(x) + async def get(self, username: str): + qs = "SELECT * FROM accounts WHERE username = :username" + rs = await fetchone(self._db_file, qs, {"username": username}) + if not rs: + raise ValueError(f"Account {username} not found") + return Account.from_rs(rs) - def get_username_by_token(self, auth_token: str) -> str: - for x in self.accounts: - if x.client.cookies.get("auth_token") == auth_token: - return x.username - return "UNKNOWN" + async def get_all(self): + qs = "SELECT * FROM accounts" + rs = await fetchall(self._db_file, qs) + return [Account.from_rs(x) for x in rs] - def get_account(self, queue: str) -> Account | None: - accounts = shuffle(self.accounts) # make random order each time - for x in accounts: - if x.can_use(queue): - return x - return None + async def save(self, account: Account): + data = account.to_rs() + cols = list(data.keys()) - async def get_account_or_wait(self, queue: str) -> Account: + qs = f""" + INSERT INTO accounts ({",".join(cols)}) VALUES ({",".join([f":{x}" for x in cols])}) + ON CONFLICT DO UPDATE SET {",".join([f"{x}=excluded.{x}" for x in cols])} + """ + await execute(self._db_file, qs, data) + + async def login(self, account: Account): + try: + await login(account) + except Exception as e: + logger.error(f"Error logging in to {account.username}: {e}") + finally: + await self.save(account) + + async def login_all(self): + qs = "SELECT * FROM accounts WHERE active = false AND error_msg IS NULL" + rs = await fetchall(self._db_file, qs) + + accounts = [Account.from_rs(rs) for rs in rs] + # await asyncio.gather(*[login(x) for x in self.accounts]) + + for i, x in enumerate(accounts, start=1): + logger.info(f"[{i}/{len(accounts)}] Logging in {x.username} - {x.email}") + await self.login(x) + + async def set_active(self, username: str, active: bool): + qs = "UPDATE accounts SET active = :active WHERE username = :username" + await execute(self._db_file, qs, {"username": username, "active": active}) + + async def lock_until(self, username: str, queue: str, unlock_at: int): + # unlock_at is unix timestamp + qs = f""" + UPDATE accounts SET locks = json_set(locks, '$.{queue}', datetime({unlock_at}, 'unixepoch')) + WHERE username = :username + """ + await execute(self._db_file, qs, {"username": username}) + + async def 
unlock(self, username: str, queue: str): + qs = f""" + UPDATE accounts SET locks = json_remove(locks, '$.{queue}') + WHERE username = :username + """ + await execute(self._db_file, qs, {"username": username}) + + async def get_for_queue(self, queue: str): + q1 = f""" + SELECT username FROM accounts + WHERE active = true AND ( + locks IS NULL + OR json_extract(locks, '$.{queue}') IS NULL + OR json_extract(locks, '$.{queue}') < datetime('now') + ) + ORDER BY RANDOM() + LIMIT 1 + """ + + q2 = f""" + UPDATE accounts SET locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')) + WHERE username = ({q1}) + RETURNING * + """ + + rs = await fetchone(self._db_file, q2) + return Account.from_rs(rs) if rs else None + + async def get_for_queue_or_wait(self, queue: str) -> Account: while True: - account = self.get_account(queue) - if account: - logger.debug(f"Using account {account.username} for queue '{queue}'") - account.lock(queue) - return account - else: + account = await self.get_for_queue(queue) + if not account: logger.debug(f"No accounts available for queue '{queue}' (sleeping for 5 sec)") await asyncio.sleep(5) + continue - def save_account(self, account: Account): - filename = self._get_filename(account.username) - data = account.dump() + logger.debug(f"Using account {account.username} for queue '{queue}'") + return account - os.makedirs(os.path.dirname(filename), exist_ok=True) - with open(filename, "w") as f: - json.dump(data, f, indent=2) + async def stats(self): + def by_queue(queue: str): + return f""" + SELECT COUNT(*) FROM accounts + WHERE json_extract(locks, '$.{queue}') IS NOT NULL AND json_extract(locks, '$.{queue}') > datetime('now') + """ - def update_limit(self, account: Account, queue: str, reset_ts: int): - account.update_limit(queue, reset_ts) - self.save_account(account) + config = [ + ("total", "SELECT COUNT(*) FROM accounts"), + ("active", "SELECT COUNT(*) FROM accounts WHERE active = true"), + ("inactive", "SELECT COUNT(*) FROM accounts WHERE active = false"), + ("locked_search", by_queue("search")), + ] + + qs = f"SELECT {','.join([f'({q}) as {k}' for k, q in config])}" + rs = await fetchone(self._db_file, qs) + return dict(rs) if rs else {} diff --git a/twapi/api.py b/twapi/api.py index 89b84bc..921c876 100644 --- a/twapi/api.py +++ b/twapi/api.py @@ -24,9 +24,7 @@ class API: lr = rep.headers.get("x-rate-limit-remaining", -1) ll = rep.headers.get("x-rate-limit-limit", -1) - auth_token = rep.request.headers["cookie"].split("auth_token=")[1].split(";")[0] - username = self.pool.get_username_by_token(auth_token) - + username = getattr(rep, "__username", "") return f"{username} {lr}/{ll}" def _is_end(self, rep: Response, q: str, res: list, cur: str | None, cnt: int, lim: int): @@ -73,31 +71,33 @@ class API: async def _inf_req(self, queue: str, cb: Callable[[AsyncClient], Awaitable[Response]]): while True: - account = await self.pool.get_account_or_wait(queue) + acc = await self.pool.get_for_queue_or_wait(queue) + client = acc.make_client() try: while True: - rep = await cb(account.client) - rep.raise_for_status() + rep = await cb(client) + setattr(rep, "__username", acc.username) self._push_history(rep) + rep.raise_for_status() + yield rep except HTTPStatusError as e: rep = e.response - self._push_history(rep) log_id = f"{self._limit_msg(rep)} on queue={queue}" # rate limit if rep.status_code == 429: logger.debug(f"Rate limit for {log_id}") reset_ts = int(rep.headers.get("x-rate-limit-reset", 0)) - self.pool.update_limit(account, queue, reset_ts) + await 
self.pool.lock_until(acc.username, queue, reset_ts) continue # possible account banned if rep.status_code == 403: logger.debug(f"Ban for {log_id}") reset_ts = int(time.time() + 60 * 60) # 1 hour - self.pool.update_limit(account, queue, reset_ts) + await self.pool.lock_until(acc.username, queue, reset_ts) continue # twitter can return different types of cursors that not transfers between accounts @@ -109,7 +109,8 @@ class API: logger.error(f"[{rep.status_code}] {e.request.url}\n{rep.text}") raise e finally: - account.unlock(queue) + await self.pool.unlock(acc.username, queue) + await client.aclose() def _get_cursor(self, obj: dict): if cur := find_obj(obj, lambda x: x.get("cursorType") == "Bottom"): @@ -153,7 +154,6 @@ class API: queue = op.split("/")[-1] async for rep in self._inf_req(queue, _get): - logger.debug(f"{queue} {self._limit_msg(rep)}") return rep raise Exception("No response") # todo @@ -166,7 +166,12 @@ class API: async def _get(client: AsyncClient): params = {**SEARCH_PARAMS, "q": q, "count": 20} params["cursor" if cursor else "requestContext"] = cursor if cursor else "launch" - return await client.get(SEARCH_URL, params=params) + try: + return await client.get(SEARCH_URL, params=params) + except Exception as e: + logger.error(f"Error requesting {q}: {e}") + logger.error(f"Request: {SEARCH_URL}, {params}") + raise e try: async for rep in self._inf_req(queue, _get): @@ -190,11 +195,14 @@ class API: raise e async def search(self, q: str, limit=-1): + twids = set() async for rep in self.search_raw(q, limit=limit): res = rep.json() obj = res.get("globalObjects", {}) for x in list(obj.get("tweets", {}).values()): - yield Tweet.parse(x, obj) + if x["id_str"] not in twids: + twids.add(x["id_str"]) + yield Tweet.parse(x, obj) # user_by_id diff --git a/twapi/db.py b/twapi/db.py new file mode 100644 index 0000000..066b864 --- /dev/null +++ b/twapi/db.py @@ -0,0 +1,83 @@ +import asyncio +import sqlite3 +from collections import defaultdict + +import aiosqlite + + +def lock_retry(max_retries=5, delay=1): + def decorator(func): + async def wrapper(*args, **kwargs): + for i in range(max_retries): + try: + return await func(*args, **kwargs) + except sqlite3.OperationalError as e: + if i == max_retries - 1 or "database is locked" not in str(e): + raise e + + # print(f"Retrying in {delay} second(s) ({i+1}/{max_retries})") + await asyncio.sleep(delay) + + return wrapper + + return decorator + + +class DB: + _init_queries: defaultdict[str, list[str]] = defaultdict(list) + _init_once: defaultdict[str, bool] = defaultdict(bool) + + def __init__(self, db_path): + self.db_path = db_path + self.conn = None + + async def __aenter__(self): + db = await aiosqlite.connect(self.db_path) + db.row_factory = aiosqlite.Row + + if not self._init_once[self.db_path]: + for qs in self._init_queries[self.db_path]: + await db.execute(qs) + + await db.commit() # required here because of _init_once + self._init_once[self.db_path] = True + + self.conn = db + return db + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.conn: + await self.conn.commit() + await self.conn.close() + + +def add_init_query(db_path: str, qs: str): + DB._init_queries[db_path].append(qs) + + +@lock_retry() +async def execute(db_path: str, qs: str, params: dict = {}): + async with DB(db_path) as db: + await db.execute(qs, params) + + +@lock_retry() +async def fetchone(db_path: str, qs: str, params: dict = {}): + async with DB(db_path) as db: + async with db.execute(qs, params) as cur: + row = await cur.fetchone() + return row + 
+ +@lock_retry() +async def fetchall(db_path: str, qs: str, params: dict = {}): + async with DB(db_path) as db: + async with db.execute(qs, params) as cur: + rows = await cur.fetchall() + return rows + + +@lock_retry() +async def executemany(db_path: str, qs: str, params: list[dict]): + async with DB(db_path) as db: + await db.executemany(qs, params) diff --git a/twapi/imap.py b/twapi/imap.py index b73399c..ee734b2 100644 --- a/twapi/imap.py +++ b/twapi/imap.py @@ -1,10 +1,13 @@ import asyncio import email as emaillib import imaplib +import time from datetime import datetime from .logger import logger +MAX_WAIT_SEC = 30 + def get_imap_domain(email: str) -> str: return f"imap.{email.split('@')[1]}" @@ -34,6 +37,7 @@ def search_email_code(imap: imaplib.IMAP4_SSL, count: int, min_t: datetime | Non async def get_email_code(email: str, password: str, min_t: datetime | None = None) -> str: domain = get_imap_domain(email) + start_time = time.time() with imaplib.IMAP4_SSL(domain) as imap: imap.login(email, password) @@ -47,4 +51,6 @@ async def get_email_code(email: str, password: str, min_t: datetime | None = Non return code logger.debug(f"Waiting for confirmation code for {email}, msg_count: {now_count}") + if MAX_WAIT_SEC < time.time() - start_time: + raise Exception(f"Timeout on getting confirmation code for {email}") await asyncio.sleep(5) diff --git a/twapi/login.py b/twapi/login.py new file mode 100644 index 0000000..c9e7212 --- /dev/null +++ b/twapi/login.py @@ -0,0 +1,215 @@ +from datetime import datetime, timedelta, timezone + +from httpx import AsyncClient, HTTPStatusError, Response + +from .account import Account +from .constants import LOGIN_URL +from .imap import get_email_code +from .logger import logger +from .utils import raise_for_status + + +async def get_guest_token(client: AsyncClient): + rep = await client.post("https://api.twitter.com/1.1/guest/activate.json") + raise_for_status(rep, "guest_token") + return rep.json()["guest_token"] + + +async def login_initiate(client: AsyncClient) -> Response: + payload = { + "input_flow_data": { + "flow_context": {"debug_overrides": {}, "start_location": {"location": "unknown"}} + }, + "subtask_versions": {}, + } + + rep = await client.post(LOGIN_URL, params={"flow_name": "login"}, json=payload) + raise_for_status(rep, "login_initiate") + return rep + + +async def login_instrumentation(client: AsyncClient, acc: Account, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginJsInstrumentationSubtask", + "js_instrumentation": {"response": "{}", "link": "next_link"}, + } + ], + } + + rep = await client.post(LOGIN_URL, json=payload) + raise_for_status(rep, "login_instrumentation") + return rep + + +async def login_enter_username(client: AsyncClient, acc: Account, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginEnterUserIdentifierSSO", + "settings_list": { + "setting_responses": [ + { + "key": "user_identifier", + "response_data": {"text_data": {"result": acc.username}}, + } + ], + "link": "next_link", + }, + } + ], + } + + rep = await client.post(LOGIN_URL, json=payload) + raise_for_status(rep, "login_username") + return rep + + +async def login_enter_password(client: AsyncClient, acc: Account, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginEnterPassword", + "enter_password": {"password": acc.password, "link": "next_link"}, + } 
+ ], + } + + rep = await client.post(LOGIN_URL, json=payload) + raise_for_status(rep, "login_password") + return rep + + +async def login_duplication_check(client: AsyncClient, acc: Account, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "AccountDuplicationCheck", + "check_logged_in_account": {"link": "AccountDuplicationCheck_false"}, + } + ], + } + + rep = await client.post(LOGIN_URL, json=payload) + raise_for_status(rep, "login_duplication_check") + return rep + + +async def login_confirm_email(client: AsyncClient, acc: Account, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginAcid", + "enter_text": {"text": acc.email, "link": "next_link"}, + } + ], + } + + rep = await client.post(LOGIN_URL, json=payload) + raise_for_status(rep, "login_confirm_email") + return rep + + +async def login_confirm_email_code(client: AsyncClient, acc: Account, prev: dict): + now_time = datetime.now(timezone.utc) - timedelta(seconds=30) + value = await get_email_code(acc.email, acc.email_password, now_time) + + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginAcid", + "enter_text": {"text": value, "link": "next_link"}, + } + ], + } + + rep = await client.post(LOGIN_URL, json=payload) + raise_for_status(rep, "login_confirm_email") + return rep + + +async def login_success(client: AsyncClient, acc: Account, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [], + } + + rep = await client.post(LOGIN_URL, json=payload) + raise_for_status(rep, "login_success") + return rep + + +async def next_login_task(client: AsyncClient, acc: Account, rep: Response): + ct0 = client.cookies.get("ct0", None) + if ct0: + client.headers["x-csrf-token"] = ct0 + client.headers["x-twitter-auth-type"] = "OAuth2Session" + + prev = rep.json() + assert "flow_token" in prev, f"flow_token not in {rep.text}" + # logger.debug(f"login tasks: {[x['subtask_id'] for x in data['subtasks']]}") + + for x in prev["subtasks"]: + task_id = x["subtask_id"] + + try: + if task_id == "LoginSuccessSubtask": + return await login_success(client, acc, prev) + if task_id == "LoginAcid": + is_code = x["enter_text"]["hint_text"].lower() == "confirmation code" + # logger.debug(f"is login code: {is_code}") + fn = login_confirm_email_code if is_code else login_confirm_email + return await fn(client, acc, prev) + if task_id == "AccountDuplicationCheck": + return await login_duplication_check(client, acc, prev) + if task_id == "LoginEnterPassword": + return await login_enter_password(client, acc, prev) + if task_id == "LoginEnterUserIdentifierSSO": + return await login_enter_username(client, acc, prev) + if task_id == "LoginJsInstrumentationSubtask": + return await login_instrumentation(client, acc, prev) + except Exception as e: + acc.error_msg = f"task={task_id} err={e}" + logger.error(f"Error in {task_id}: {e}") + raise e + + return None + + +async def login(acc: Account, fresh=False) -> Account: + log_id = f"{acc.username} - {acc.email}" + if acc.active and not fresh: + logger.debug(f"account already active {log_id}") + return acc + + logger.debug(f"logging in {log_id}") + client = acc.make_client() + guest_token = await get_guest_token(client) + client.headers["x-guest-token"] = guest_token + + rep = await login_initiate(client) + while True: + if not rep: + break + + try: + rep = await next_login_task(client, acc, rep) + except 
HTTPStatusError as e: + if e.response.status_code == 403: + logger.error(f"403 error {log_id}") + return acc + + client.headers["x-csrf-token"] = client.cookies["ct0"] + client.headers["x-twitter-auth-type"] = "OAuth2Session" + + acc.active = True + acc.headers = {k: v for k, v in client.headers.items()} + acc.cookies = {k: v for k, v in client.cookies.items()} + return acc diff --git a/twapi/utils.py b/twapi/utils.py index 06be3ac..5444763 100644 --- a/twapi/utils.py +++ b/twapi/utils.py @@ -1,5 +1,4 @@ import json -import random from collections import defaultdict from typing import Any, AsyncGenerator, Callable, TypeVar @@ -134,9 +133,3 @@ def to_search_like(obj: dict): users = {str(x["rest_id"]): to_old_obj(x) for x in users} return {"tweets": tweets, "users": users} - - -def shuffle(lst: list): - lst = lst.copy() - random.shuffle(lst) - return lst
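A note on the account-scheduling change at the end of the diff: the in-memory `shuffle`/`lock`/`unlock` helpers are gone because that state now lives in SQLite. `get_for_queue` picks a random active account whose `locks.$.<queue>` entry is absent or expired (`ORDER BY RANDOM() LIMIT 1`) and stamps it 15 minutes into the future in the same statement; `lock_until` parks an account until a rate-limit reset, and `unlock` clears the stamp. A rough sketch of that lifecycle as `API._inf_req` drives it (the queue name here is illustrative):

```python
import asyncio

from twapi.accounts_pool import AccountsPool


async def main():
    pool = AccountsPool("accounts.db")  # placeholder path
    queue = "search"  # illustrative queue name

    acc = await pool.get_for_queue_or_wait(queue)  # waits until an active, unlocked account is free
    client = acc.make_client()  # httpx client rebuilt from the saved headers/cookies
    try:
        ...  # issue requests with `client`
        # on HTTP 429, the account is parked until the reported reset:
        # await pool.lock_until(acc.username, queue, reset_ts)  # reset_ts is a unix timestamp
    finally:
        await pool.unlock(acc.username, queue)  # release the account for other workers
        await client.aclose()


asyncio.run(main())
```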
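For completeness, the storage layer in the new `twapi/db.py` stays deliberately small: `add_init_query` registers the `CREATE TABLE` statement to run on the first connection to a given file, and `execute`/`fetchone`/`fetchall` each open a short-lived `aiosqlite` connection with a retry loop around "database is locked" errors. A small usage sketch outside of `AccountsPool` (the table and values are made up for illustration):

```python
import asyncio

from twapi.db import add_init_query, execute, fetchone

DB_FILE = "/tmp/example.db"  # placeholder path

# registered queries run once per db file, on the first connection
add_init_query(DB_FILE, "CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT)")


async def main():
    # each helper opens its own connection and commits on exit
    await execute(DB_FILE, "INSERT OR REPLACE INTO kv (k, v) VALUES (:k, :v)", {"k": "a", "v": "1"})

    row = await fetchone(DB_FILE, "SELECT v FROM kv WHERE k = :k", {"k": "a"})
    print(row["v"] if row else None)  # rows use aiosqlite.Row, so columns are accessible by name


asyncio.run(main())
```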