diff --git a/.gitignore b/.gitignore index 52c8f85..4553f4b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ .DS_Store .ruff_cache/ -sessions/ +accounts/ +results-raw/ +results/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/twapi/client.py b/twapi/client.py index ebe85a2..7582e20 100644 --- a/twapi/client.py +++ b/twapi/client.py @@ -1,223 +1,100 @@ -import email as emaillib -import imaplib import json import os -import time -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone +from enum import Enum from fake_useragent import UserAgent -from httpx import AsyncClient, Client, Response +from httpx import AsyncClient, HTTPStatusError, Response from loguru import logger +from .imap import get_email_code from .utils import raise_for_status TOKEN = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" TASK_URL = "https://api.twitter.com/1.1/onboarding/task.json" -def search_email_with_confirmation_code(imap: imaplib.IMAP4_SSL, msg_count: int) -> str | None: - for i in range(msg_count, 0, -1): - _, rep = imap.fetch(str(i), "(RFC822)") - for x in rep: - if isinstance(x, tuple): - msg = emaillib.message_from_bytes(x[1]) - if "info@twitter.com" in msg.get("From", ""): - # eg. Your Twitter confirmation code is XXX - subject = str(msg.get("Subject", "")) - return subject.split(" ")[-1].strip() - - return None +class Status(str, Enum): + NEW = "new" + ACTIVE = "active" + LOGIN_ERROR = "login_error" -def get_verification_code(email: str, password: str, imap_domain: None | str = None) -> str | None: - imap_domain = f"imap.{email.split('@')[1]}" if imap_domain is None else imap_domain +class Account: + BASE_DIR = "accounts" - with imaplib.IMAP4_SSL(imap_domain) as imap: - imap.login(email, password) + @classmethod + def load(cls, filepath: str): + try: + with open(filepath) as f: + data = json.load(f) + return cls(**data) + except Exception as e: + logger.error(f"Failed to load account {filepath}: {e}") + return None - before_count = 0 - while True: - _, rep = imap.select("INBOX") - msg_count = int(rep[0].decode("utf-8")) if len(rep) > 0 and rep[0] is not None else 0 - if msg_count > before_count: - code = search_email_with_confirmation_code(imap, msg_count) - if code is not None: - return code - - logger.debug(f"Waiting for confirmation email... {msg_count}") - time.sleep(1) - - -def login_get_guest_token(client: Client) -> str: - rep = client.post("https://api.twitter.com/1.1/guest/activate.json") - raise_for_status(rep, "guest_token") - return rep.json()["guest_token"] - - -def login_initiate(client: Client) -> Response: - payload = { - "input_flow_data": { - "flow_context": {"debug_overrides": {}, "start_location": {"location": "unknown"}} - }, - "subtask_versions": {}, - } - - rep = client.post(TASK_URL, params={"flow_name": "login"}, json=payload) - raise_for_status(rep, "login_initiate") - return rep - - -def login_instrumentation(client, flow_token: str) -> Response: - payload = { - "flow_token": flow_token, - "subtask_inputs": [ - { - "subtask_id": "LoginJsInstrumentationSubtask", - "js_instrumentation": {"response": "{}", "link": "next_link"}, - } - ], - } - - rep = client.post(TASK_URL, json=payload) - raise_for_status(rep, "login_instrumentation") - return rep - - -def login_enter_username(client: Client, flow_token: str, username: str) -> Response: - payload = { - "flow_token": flow_token, - "subtask_inputs": [ - { - "subtask_id": "LoginEnterUserIdentifierSSO", - "settings_list": { - "setting_responses": [ - { - "key": "user_identifier", - "response_data": {"text_data": {"result": username}}, - } - ], - "link": "next_link", - }, - } - ], - } - - rep = client.post(TASK_URL, json=payload) - raise_for_status(rep, "login_username") - return rep - - -def login_enter_password(client: Client, flow_token: str, password: str) -> Response: - payload = { - "flow_token": flow_token, - "subtask_inputs": [ - { - "subtask_id": "LoginEnterPassword", - "enter_password": {"password": password, "link": "next_link"}, - } - ], - } - - rep = client.post(TASK_URL, json=payload) - raise_for_status(rep, "login_password") - return rep - - -def login_duplication_check(client: Client, flow_token: str) -> Response: - payload = { - "flow_token": flow_token, - "subtask_inputs": [ - { - "subtask_id": "AccountDuplicationCheck", - "check_logged_in_account": {"link": "AccountDuplicationCheck_false"}, - } - ], - } - - rep = client.post(TASK_URL, json=payload) - raise_for_status(rep, "login_duplication_check") - return rep - - -def get_confirm_email_code(task: dict, email: str, email_password: str) -> str: - logger.debug(f"task: {json.dumps(task)}") - is_code = task["enter_text"]["hint_text"].lower() == "confirmation code" - value = get_verification_code(email, email_password) if is_code else email - assert value is not None, "Could not get verification code" - return value - - -def login_confirm_email(client: Client, flow_token: str, value: str) -> Response: - payload = { - "flow_token": flow_token, - "subtask_inputs": [ - { - "subtask_id": "LoginAcid", - "enter_text": {"text": value, "link": "next_link"}, - } - ], - } - - rep = client.post(TASK_URL, json=payload) - raise_for_status(rep, "login_confirm_email") - return rep - - -def login_success(client: Client, flow_token: str) -> Response: - payload = { - "flow_token": flow_token, - "subtask_inputs": [], - } - - rep = client.post(TASK_URL, json=payload) - raise_for_status(rep, "login_success") - return rep - - -class UserClient: - def __init__(self, username: str, password: str, email: str, email_password: str): + def __init__( + self, + username: str, + password: str, + email: str, + email_password: str, + user_agent: str | None = None, + proxy: str | None = None, + headers={}, + cookies={}, + limits={}, + status=Status.NEW, + ): self.username = username self.password = password self.email = email self.email_password = email_password - self.client = Client() - self.session_path = f"sessions/{self.username}.session.json" + self.user_agent = user_agent or UserAgent().safari + self.proxy = proxy or None - dirname = os.path.dirname(self.session_path) - os.makedirs(dirname, exist_ok=True) + self.client = AsyncClient(proxies=self.proxy) + self.client.cookies.update(cookies) + self.client.headers.update(headers) - self.limits: dict[str, datetime] = {} + self.client.headers["user-agent"] = self.user_agent + self.client.headers["content-type"] = "application/json" + self.client.headers["authorization"] = TOKEN + self.client.headers["x-twitter-active-user"] = "yes" + self.client.headers["x-twitter-client-language"] = "en" + + self.status: Status = status self.locked: dict[str, bool] = {} + self.limits: dict[str, datetime] = { + k: datetime.fromisoformat(v) for k, v in limits.items() + } + + def dump(self): + return { + "username": self.username, + "password": self.password, + "email": self.email, + "email_password": self.email_password, + "user_agent": self.user_agent, + "proxy": self.proxy, + "cookies": {k: v for k, v in self.client.cookies.items()}, + "headers": {k: v for k, v in self.client.headers.items()}, + "limits": {k: v.isoformat() for k, v in self.limits.items()}, + "status": self.status, + } def save(self): - cookies = dict(self.client.cookies) - headers = dict(self.client.headers) - limits = dict(self.limits) + os.makedirs(self.BASE_DIR, exist_ok=True) + data = self.dump() + with open(f"{self.BASE_DIR}/{self.username}.json", "w") as f: + json.dump(data, f, indent=2) - with open(self.session_path, "w") as fp: - json.dump({"cookies": cookies, "headers": headers, "limits": limits}, fp, indent=2) - - def restore(self): - try: - with open(self.session_path) as fp: - data = json.load(fp) - self.client.cookies.update(data.get("cookies", {})) - self.client.headers.update(data.get("headers", {})) - self.limits.update( - {k: datetime.fromisoformat(v) for k, v in data.get("limits", {}).items()} - ) - return True - except (FileNotFoundError, json.JSONDecodeError): - return False - - def make_client(self) -> AsyncClient: - client = AsyncClient() - client.headers.update(self.client.headers) - client.cookies.update(self.client.cookies) - return client + def update_limit(self, queue: str, reset_ts: int): + self.limits[queue] = datetime.fromtimestamp(reset_ts, tz=timezone.utc) + self.save() def can_use(self, queue: str): - if self.locked.get(queue, False): + if self.locked.get(queue, False) or self.status != Status.ACTIVE: return False limit = self.limits.get(queue) @@ -229,73 +106,197 @@ class UserClient: def unlock(self, queue: str): self.locked[queue] = False - def update_limit(self, queue: str, rep: Response): - reset = rep.headers.get("x-rate-limit-reset", 0) - reset = datetime.fromtimestamp(int(reset), tz=timezone.utc) - self.limits[queue] = reset - self.save() - - def print_session(self): - for x in self.client.headers.items(): - print(x) - - print() - for x in self.client.cookies.items(): - print(x) - - def _next_login_task(self, rep: Response): - client = self.client - - ct0 = client.cookies.get("ct0", None) - if ct0: - client.headers["x-csrf-token"] = ct0 - client.headers["x-twitter-auth-type"] = "OAuth2Session" - - data = rep.json() - logger.debug(f"login tasks: {[x['subtask_id'] for x in data['subtasks']]}") - - flow_token = data["flow_token"] - for x in data["subtasks"]: - task_id = x["subtask_id"] - - if task_id == "LoginSuccessSubtask": - return login_success(client, flow_token) - if task_id == "LoginAcid": - value = get_confirm_email_code(x, self.email, self.email_password) - return login_confirm_email(client, flow_token, value) - if task_id == "AccountDuplicationCheck": - return login_duplication_check(client, flow_token) - if task_id == "LoginEnterPassword": - return login_enter_password(client, flow_token, self.password) - if task_id == "LoginEnterUserIdentifierSSO": - return login_enter_username(client, flow_token, self.username) - if task_id == "LoginJsInstrumentationSubtask": - return login_instrumentation(client, flow_token) - - return None - - def login(self): - if self.restore(): - logger.debug(f"session restored for {self.username}") + async def login(self, fresh=False): + log_id = f"{self.username} - {self.email}" + if self.status != "new" and not fresh: + logger.debug(f"logged in already {log_id}") return - self.client.headers["user-agent"] = UserAgent().safari - self.client.headers["content-type"] = "application/json" - self.client.headers["authorization"] = TOKEN - self.client.headers["x-twitter-active-user"] = "yes" - self.client.headers["x-twitter-client-language"] = "en" + logger.debug(f"logging in {log_id}") - guest_token = login_get_guest_token(self.client) + guest_token = await self.get_guest_token() self.client.headers["x-guest-token"] = guest_token - rep = login_initiate(self.client) + rep = await self.login_initiate() while True: - rep = self._next_login_task(rep) - if rep is None: - break + try: + if rep: + rep = await self.next_login_task(rep) + else: + break + except HTTPStatusError as e: + if e.response.status_code == 403: + logger.error(f"403 error {log_id}") + self.status = Status.LOGIN_ERROR + self.save() + return self.client.headers["x-csrf-token"] = self.client.cookies["ct0"] self.client.headers["x-twitter-auth-type"] = "OAuth2Session" + + logger.info(f"logged in success {log_id}") + self.status = Status.ACTIVE self.save() - logger.debug(f"login success for {self.username}") + async def get_guest_token(self): + rep = await self.client.post("https://api.twitter.com/1.1/guest/activate.json") + raise_for_status(rep, "guest_token") + return rep.json()["guest_token"] + + async def login_initiate(self) -> Response: + payload = { + "input_flow_data": { + "flow_context": {"debug_overrides": {}, "start_location": {"location": "unknown"}} + }, + "subtask_versions": {}, + } + + rep = await self.client.post(TASK_URL, params={"flow_name": "login"}, json=payload) + raise_for_status(rep, "login_initiate") + return rep + + async def next_login_task(self, rep: Response): + ct0 = self.client.cookies.get("ct0", None) + if ct0: + self.client.headers["x-csrf-token"] = ct0 + self.client.headers["x-twitter-auth-type"] = "OAuth2Session" + + data = rep.json() + assert "flow_token" in data, f"flow_token not in {rep.text}" + # logger.debug(f"login tasks: {[x['subtask_id'] for x in data['subtasks']]}") + + for x in data["subtasks"]: + task_id = x["subtask_id"] + + try: + if task_id == "LoginSuccessSubtask": + return await self.login_success(data) + if task_id == "LoginAcid": + is_code = x["enter_text"]["hint_text"].lower() == "confirmation code" + # logger.debug(f"is login code: {is_code}") + fn = self.login_confirm_email_code if is_code else self.login_confirm_email + return await fn(data) + if task_id == "AccountDuplicationCheck": + return await self.login_duplication_check(data) + if task_id == "LoginEnterPassword": + return await self.login_enter_password(data) + if task_id == "LoginEnterUserIdentifierSSO": + return await self.login_enter_username(data) + if task_id == "LoginJsInstrumentationSubtask": + return await self.login_instrumentation(data) + except Exception as e: + logger.error(f"Error in {task_id}: {e}") + logger.error(e) + + return None + + async def login_instrumentation(self, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginJsInstrumentationSubtask", + "js_instrumentation": {"response": "{}", "link": "next_link"}, + } + ], + } + + rep = await self.client.post(TASK_URL, json=payload) + raise_for_status(rep, "login_instrumentation") + return rep + + async def login_enter_username(self, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginEnterUserIdentifierSSO", + "settings_list": { + "setting_responses": [ + { + "key": "user_identifier", + "response_data": {"text_data": {"result": self.username}}, + } + ], + "link": "next_link", + }, + } + ], + } + + rep = await self.client.post(TASK_URL, json=payload) + raise_for_status(rep, "login_username") + return rep + + async def login_enter_password(self, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginEnterPassword", + "enter_password": {"password": self.password, "link": "next_link"}, + } + ], + } + + rep = await self.client.post(TASK_URL, json=payload) + raise_for_status(rep, "login_password") + return rep + + async def login_duplication_check(self, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "AccountDuplicationCheck", + "check_logged_in_account": {"link": "AccountDuplicationCheck_false"}, + } + ], + } + + rep = await self.client.post(TASK_URL, json=payload) + raise_for_status(rep, "login_duplication_check") + return rep + + async def login_confirm_email(self, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginAcid", + "enter_text": {"text": self.email, "link": "next_link"}, + } + ], + } + + rep = await self.client.post(TASK_URL, json=payload) + raise_for_status(rep, "login_confirm_email") + return rep + + async def login_confirm_email_code(self, prev: dict): + now_time = datetime.now(timezone.utc) - timedelta(seconds=30) + value = await get_email_code(self.email, self.email_password, now_time) + + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [ + { + "subtask_id": "LoginAcid", + "enter_text": {"text": value, "link": "next_link"}, + } + ], + } + + rep = await self.client.post(TASK_URL, json=payload) + raise_for_status(rep, "login_confirm_email") + return rep + + async def login_success(self, prev: dict) -> Response: + payload = { + "flow_token": prev["flow_token"], + "subtask_inputs": [], + } + + rep = await self.client.post(TASK_URL, json=payload) + raise_for_status(rep, "login_success") + return rep diff --git a/twapi/imap.py b/twapi/imap.py new file mode 100644 index 0000000..39236f4 --- /dev/null +++ b/twapi/imap.py @@ -0,0 +1,50 @@ +import asyncio +import email as emaillib +import imaplib +from datetime import datetime + +from loguru import logger + + +def get_imap_domain(email: str) -> str: + return f"imap.{email.split('@')[1]}" + + +def search_email_code(imap: imaplib.IMAP4_SSL, count: int, min_t: datetime | None) -> str | None: + for i in range(count, 0, -1): + _, rep = imap.fetch(str(i), "(RFC822)") + for x in rep: + if isinstance(x, tuple): + msg = emaillib.message_from_bytes(x[1]) + + msg_time = datetime.strptime(msg.get("Date", ""), "%a, %d %b %Y %H:%M:%S %z") + msg_from = str(msg.get("From", "")).lower() + msg_subj = str(msg.get("Subject", "")).lower() + logger.debug(f"({i} of {count}) {msg_from} - {msg_time} - {msg_subj}") + + if min_t is not None and msg_time < min_t: + return None + + if "info@twitter.com" in msg_from and "confirmation code is" in msg_subj: + # eg. Your Twitter confirmation code is XXX + return msg_subj.split(" ")[-1].strip() + + return None + + +async def get_email_code(email: str, password: str, min_t: datetime | None = None) -> str: + domain = get_imap_domain(email) + with imaplib.IMAP4_SSL(domain) as imap: + imap.login(email, password) + + was_count = 0 + while True: + _, rep = imap.select("INBOX") + now_count = int(rep[0].decode("utf-8")) if len(rep) > 0 and rep[0] is not None else 0 + if now_count > was_count: + code = search_email_code(imap, now_count, min_t) + if code is not None: + return code + + logger.debug(f"Waiting for confirmation code for {email}, msg_count: {now_count}") + await asyncio.sleep(5) diff --git a/twapi/pool.py b/twapi/pool.py index 82f133d..31be1fe 100644 --- a/twapi/pool.py +++ b/twapi/pool.py @@ -2,23 +2,23 @@ import asyncio from loguru import logger -from .client import UserClient +from .client import Account class AccountsPool: def __init__(self): - self.accounts: list[UserClient] = [] + self.accounts: list[Account] = [] - def add_account(self, account: UserClient): + def add_account(self, account: Account): self.accounts.append(account) - def get_account(self, queue: str) -> UserClient | None: + def get_account(self, queue: str) -> Account | None: for x in self.accounts: if x.can_use(queue): return x return None - async def get_account_or_wait(self, queue: str) -> UserClient: + async def get_account_or_wait(self, queue: str) -> Account: while True: account = self.get_account(queue) if account: diff --git a/twapi/search.py b/twapi/search.py index 9f532cd..9aff3dd 100644 --- a/twapi/search.py +++ b/twapi/search.py @@ -1,4 +1,5 @@ import json +from time import time from typing import Awaitable, Callable from httpx import AsyncClient, HTTPStatusError, Response @@ -118,21 +119,27 @@ class Search: async def _inf_req(self, queue: str, cb: Callable[[AsyncClient], Awaitable[Response]]): while True: account = await self.pool.get_account_or_wait(queue) - client = account.make_client() try: while True: - rep = await cb(client) + rep = await cb(account.client) rep.raise_for_status() yield rep except HTTPStatusError as e: if e.response.status_code == 429: logger.debug(f"Rate limit for account={account.username} on queue={queue}") - account.update_limit(queue, e.response) + reset_ts = int(e.response.headers.get("x-rate-limit-reset", 0)) + account.update_limit(queue, reset_ts) continue - else: - logger.error(f"[{e.response.status_code}] {e.request.url}\n{e.response.text}") - raise e + + if e.response.status_code == 403: + logger.debug(f"Account={account.username} is banned on queue={queue}") + reset_ts = int(time.time() + 60 * 60) # 1 hour + account.update_limit(queue, reset_ts) + continue + + logger.error(f"[{e.response.status_code}] {e.request.url}\n{e.response.text}") + raise e finally: account.unlock(queue)