diff --git a/twscrape/accounts_pool.py b/twscrape/accounts_pool.py index 24d46b1..86a7089 100644 --- a/twscrape/accounts_pool.py +++ b/twscrape/accounts_pool.py @@ -9,7 +9,7 @@ from fake_useragent import UserAgent from .account import Account from .db import execute, fetchall, fetchone from .logger import logger -from .login import login +from .login import LoginConfig, login from .utils import parse_cookies, utc @@ -31,8 +31,9 @@ class AccountsPool: # _order_by: str = "RANDOM()" _order_by: str = "username" - def __init__(self, db_file="accounts.db"): + def __init__(self, db_file="accounts.db", login_config: LoginConfig | None = None): self._db_file = db_file + self._login_config = login_config or LoginConfig() async def load_from_file(self, filepath: str, line_format: str): line_delim = guess_delim(line_format) @@ -138,9 +139,9 @@ class AccountsPool: """ await execute(self._db_file, qs, data) - async def login(self, account: Account, email_first: bool = False): + async def login(self, account: Account): try: - await login(account, email_first=email_first) + await login(account, cfg=self._login_config) logger.info(f"Logged in to {account.username} successfully") return True except Exception as e: @@ -149,7 +150,7 @@ class AccountsPool: finally: await self.save(account) - async def login_all(self, email_first=False, usernames: list[str] | None = None): + async def login_all(self, usernames: list[str] | None = None): if usernames is None: qs = "SELECT * FROM accounts WHERE active = false AND error_msg IS NULL" else: @@ -163,11 +164,11 @@ class AccountsPool: counter = {"total": len(accounts), "success": 0, "failed": 0} for i, x in enumerate(accounts, start=1): logger.info(f"[{i}/{len(accounts)}] Logging in {x.username} - {x.email}") - status = await self.login(x, email_first=email_first) + status = await self.login(x) counter["success" if status else "failed"] += 1 return counter - async def relogin(self, usernames: str | list[str], email_first=False): + async def relogin(self, usernames: str | list[str]): usernames = usernames if isinstance(usernames, list) else [usernames] usernames = list(set(usernames)) if not usernames: @@ -187,12 +188,12 @@ class AccountsPool: """ await execute(self._db_file, qs) - await self.login_all(email_first=email_first, usernames=usernames) + await self.login_all(usernames) - async def relogin_failed(self, email_first=False): + async def relogin_failed(self): qs = "SELECT username FROM accounts WHERE active = false AND error_msg IS NOT NULL" rs = await fetchall(self._db_file, qs) - await self.relogin([x["username"] for x in rs], email_first=email_first) + await self.relogin([x["username"] for x in rs]) async def reset_locks(self): qs = "UPDATE accounts SET locks = json_object()" @@ -277,7 +278,7 @@ class AccountsPool: continue else: if msg_shown: - logger.info(f"Account available for queue {queue}") + logger.info(f"Continuing with account {account.username} on queue {queue}") return account diff --git a/twscrape/cli.py b/twscrape/cli.py index 6870cca..083a8ef 100644 --- a/twscrape/cli.py +++ b/twscrape/cli.py @@ -12,6 +12,7 @@ import httpx from .api import API, AccountsPool from .db import get_sqlite_version from .logger import logger, set_log_level +from .login import LoginConfig from .models import Tweet, User from .utils import print_table @@ -49,7 +50,8 @@ async def main(args): print(f"SQLite runtime: {sqlite3.sqlite_version} ({await get_sqlite_version()})") return - pool = AccountsPool(args.db) + login_config = LoginConfig(getattr(args, "email_first", False), getattr(args, "manual", False)) + pool = AccountsPool(args.db, login_config=login_config) api = API(pool, debug=args.debug) if args.command == "accounts": @@ -81,16 +83,16 @@ async def main(args): return if args.command == "login_accounts": - stats = await pool.login_all(email_first=args.email_first) + stats = await pool.login_all() print(stats) return if args.command == "relogin_failed": - await pool.relogin_failed(email_first=args.email_first) + await pool.relogin_failed() return if args.command == "relogin": - await pool.relogin(args.usernames, email_first=args.email_first) + await pool.relogin(args.usernames) return if args.command == "reset_locks": @@ -171,9 +173,10 @@ def run(): relogin.add_argument("usernames", nargs="+", default=[], help="Usernames to re-login") re_failed = subparsers.add_parser("relogin_failed", help="Retry login for failed accounts") - check_email = [login_cmd, relogin, re_failed] - for cmd in check_email: + login_commands = [login_cmd, relogin, re_failed] + for cmd in login_commands: cmd.add_argument("--email-first", action="store_true", help="Check email first") + cmd.add_argument("--manual", action="store_true", help="Enter email code manually") subparsers.add_parser("reset_locks", help="Reset all locks") subparsers.add_parser("delete_inactive", help="Delete inactive accounts") diff --git a/twscrape/login.py b/twscrape/login.py index 679feb5..4f18838 100644 --- a/twscrape/login.py +++ b/twscrape/login.py @@ -1,4 +1,7 @@ +import imaplib +from dataclasses import dataclass from datetime import timedelta +from typing import Any from httpx import AsyncClient, HTTPStatusError, Response @@ -10,6 +13,21 @@ from .utils import raise_for_status, utc LOGIN_URL = "https://api.twitter.com/1.1/onboarding/task.json" +@dataclass +class LoginConfig: + email_first: bool = False + manual: bool = False + + +@dataclass +class TaskCtx: + client: AsyncClient + acc: Account + cfg: LoginConfig + prev: Any + imap: None | imaplib.IMAP4_SSL + + async def get_guest_token(client: AsyncClient): rep = await client.post("https://api.twitter.com/1.1/guest/activate.json") raise_for_status(rep, "guest_token (most likely ip ban)") @@ -29,9 +47,9 @@ async def login_initiate(client: AsyncClient) -> Response: return rep -async def login_instrumentation(client: AsyncClient, acc: Account, prev: dict) -> Response: +async def login_instrumentation(ctx: TaskCtx) -> Response: payload = { - "flow_token": prev["flow_token"], + "flow_token": ctx.prev["flow_token"], "subtask_inputs": [ { "subtask_id": "LoginJsInstrumentationSubtask", @@ -40,14 +58,14 @@ async def login_instrumentation(client: AsyncClient, acc: Account, prev: dict) - ], } - rep = await client.post(LOGIN_URL, json=payload) + rep = await ctx.client.post(LOGIN_URL, json=payload) raise_for_status(rep, "login_instrumentation") return rep -async def login_enter_username(client: AsyncClient, acc: Account, prev: dict) -> Response: +async def login_enter_username(ctx: TaskCtx) -> Response: payload = { - "flow_token": prev["flow_token"], + "flow_token": ctx.prev["flow_token"], "subtask_inputs": [ { "subtask_id": "LoginEnterUserIdentifierSSO", @@ -55,7 +73,7 @@ async def login_enter_username(client: AsyncClient, acc: Account, prev: dict) -> "setting_responses": [ { "key": "user_identifier", - "response_data": {"text_data": {"result": acc.username}}, + "response_data": {"text_data": {"result": ctx.acc.username}}, } ], "link": "next_link", @@ -64,30 +82,30 @@ async def login_enter_username(client: AsyncClient, acc: Account, prev: dict) -> ], } - rep = await client.post(LOGIN_URL, json=payload) + rep = await ctx.client.post(LOGIN_URL, json=payload) raise_for_status(rep, "login_username") return rep -async def login_enter_password(client: AsyncClient, acc: Account, prev: dict) -> Response: +async def login_enter_password(ctx: TaskCtx) -> Response: payload = { - "flow_token": prev["flow_token"], + "flow_token": ctx.prev["flow_token"], "subtask_inputs": [ { "subtask_id": "LoginEnterPassword", - "enter_password": {"password": acc.password, "link": "next_link"}, + "enter_password": {"password": ctx.acc.password, "link": "next_link"}, } ], } - rep = await client.post(LOGIN_URL, json=payload) + rep = await ctx.client.post(LOGIN_URL, json=payload) raise_for_status(rep, "login_password") return rep -async def login_duplication_check(client: AsyncClient, acc: Account, prev: dict) -> Response: +async def login_duplication_check(ctx: TaskCtx) -> Response: payload = { - "flow_token": prev["flow_token"], + "flow_token": ctx.prev["flow_token"], "subtask_inputs": [ { "subtask_id": "AccountDuplicationCheck", @@ -96,36 +114,41 @@ async def login_duplication_check(client: AsyncClient, acc: Account, prev: dict) ], } - rep = await client.post(LOGIN_URL, json=payload) + rep = await ctx.client.post(LOGIN_URL, json=payload) raise_for_status(rep, "login_duplication_check") return rep -async def login_confirm_email(client: AsyncClient, acc: Account, prev: dict, imap) -> Response: +async def login_confirm_email(ctx: TaskCtx) -> Response: payload = { - "flow_token": prev["flow_token"], + "flow_token": ctx.prev["flow_token"], "subtask_inputs": [ { "subtask_id": "LoginAcid", - "enter_text": {"text": acc.email, "link": "next_link"}, + "enter_text": {"text": ctx.acc.email, "link": "next_link"}, } ], } - rep = await client.post(LOGIN_URL, json=payload) + rep = await ctx.client.post(LOGIN_URL, json=payload) raise_for_status(rep, "login_confirm_email") return rep -async def login_confirm_email_code(client: AsyncClient, acc: Account, prev: dict, imap): - if not imap: - imap = await imap_login(acc.email, acc.email_password) +async def login_confirm_email_code(ctx: TaskCtx): + if ctx.cfg.manual: + print(f"Enter email code for {ctx.acc.username} / {ctx.acc.email}") + value = input("Code: ") + value = value.strip() + else: + if not ctx.imap: + ctx.imap = await imap_login(ctx.acc.email, ctx.acc.email_password) - now_time = utc.now() - timedelta(seconds=30) - value = await imap_get_email_code(imap, acc.email, now_time) + now_time = utc.now() - timedelta(seconds=30) + value = await imap_get_email_code(ctx.imap, ctx.acc.email, now_time) payload = { - "flow_token": prev["flow_token"], + "flow_token": ctx.prev["flow_token"], "subtask_inputs": [ { "subtask_id": "LoginAcid", @@ -134,64 +157,64 @@ async def login_confirm_email_code(client: AsyncClient, acc: Account, prev: dict ], } - rep = await client.post(LOGIN_URL, json=payload) + rep = await ctx.client.post(LOGIN_URL, json=payload) raise_for_status(rep, "login_confirm_email") return rep -async def login_success(client: AsyncClient, acc: Account, prev: dict) -> Response: +async def login_success(ctx: TaskCtx) -> Response: payload = { - "flow_token": prev["flow_token"], + "flow_token": ctx.prev["flow_token"], "subtask_inputs": [], } - rep = await client.post(LOGIN_URL, json=payload) + rep = await ctx.client.post(LOGIN_URL, json=payload) raise_for_status(rep, "login_success") return rep -async def next_login_task(client: AsyncClient, acc: Account, rep: Response, imap): - ct0 = client.cookies.get("ct0", None) +async def next_login_task(ctx: TaskCtx, rep: Response): + ct0 = ctx.client.cookies.get("ct0", None) if ct0: - client.headers["x-csrf-token"] = ct0 - client.headers["x-twitter-auth-type"] = "OAuth2Session" + ctx.client.headers["x-csrf-token"] = ct0 + ctx.client.headers["x-twitter-auth-type"] = "OAuth2Session" - prev = rep.json() - assert "flow_token" in prev, f"flow_token not in {rep.text}" + ctx.prev = rep.json() + assert "flow_token" in ctx.prev, f"flow_token not in {rep.text}" - for x in prev["subtasks"]: + for x in ctx.prev["subtasks"]: task_id = x["subtask_id"] try: if task_id == "LoginSuccessSubtask": - return await login_success(client, acc, prev) + return await login_success(ctx) if task_id == "LoginAcid": is_code = x["enter_text"]["hint_text"].lower() == "confirmation code" fn = login_confirm_email_code if is_code else login_confirm_email - return await fn(client, acc, prev, imap) + return await fn(ctx) if task_id == "AccountDuplicationCheck": - return await login_duplication_check(client, acc, prev) + return await login_duplication_check(ctx) if task_id == "LoginEnterPassword": - return await login_enter_password(client, acc, prev) + return await login_enter_password(ctx) if task_id == "LoginEnterUserIdentifierSSO": - return await login_enter_username(client, acc, prev) + return await login_enter_username(ctx) if task_id == "LoginJsInstrumentationSubtask": - return await login_instrumentation(client, acc, prev) + return await login_instrumentation(ctx) except Exception as e: - acc.error_msg = f"login_step={task_id} err={e}" + ctx.acc.error_msg = f"login_step={task_id} err={e}" raise e return None -async def login(acc: Account, email_first=False) -> Account: +async def login(acc: Account, cfg: LoginConfig | None = None) -> Account: log_id = f"{acc.username} - {acc.email}" if acc.active: logger.info(f"account already active {log_id}") return acc - imap = None - if email_first: + cfg, imap = cfg or LoginConfig(), None + if cfg.email_first and not cfg.manual: imap = await imap_login(acc.email, acc.email_password) client = acc.make_client() @@ -199,12 +222,13 @@ async def login(acc: Account, email_first=False) -> Account: client.headers["x-guest-token"] = guest_token rep = await login_initiate(client) + ctx = TaskCtx(client, acc, cfg, None, imap) while True: if not rep: break try: - rep = await next_login_task(client, acc, rep, imap) + rep = await next_login_task(ctx, rep) except HTTPStatusError as e: if e.response.status_code == 403: logger.error(f"403 error {log_id}")