diff --git a/twapi/client.py b/twapi/client.py index 0f8b5b8..17f3663 100644 --- a/twapi/client.py +++ b/twapi/client.py @@ -2,33 +2,47 @@ import email as emaillib import imaplib import json import os +import time from fake_useragent import UserAgent from httpx import Client, HTTPStatusError, Response +from loguru import logger TOKEN = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" TASK_URL = "https://api.twitter.com/1.1/onboarding/task.json" +def search_email_with_confirmation_code(imap: imaplib.IMAP4_SSL, msg_count: int) -> str | None: + for i in range(msg_count, 0, -1): + _, rep = imap.fetch(str(i), "(RFC822)") + for x in rep: + if isinstance(x, tuple): + msg = emaillib.message_from_bytes(x[1]) + if "info@twitter.com" in msg.get("From", ""): + # eg. Your Twitter confirmation code is XXX + subject = str(msg.get("Subject", "")) + return subject.split(" ")[-1].strip() + + return None + + def get_verification_code(email: str, password: str, imap_domain: None | str = None) -> str | None: imap_domain = f"imap.{email.split('@')[1]}" if imap_domain is None else imap_domain with imaplib.IMAP4_SSL(imap_domain) as imap: imap.login(email, password) - _, rep = imap.select("INBOX") - msg_count = int(rep[0].decode("utf-8")) if len(rep) > 0 and rep[0] is not None else 0 - for i in range(msg_count, 0, -1): - _, rep = imap.fetch(str(i), "(RFC822)") - for x in rep: - if isinstance(x, tuple): - msg = emaillib.message_from_bytes(x[1]) - if "info@twitter.com" in msg.get("From", ""): - # eg. Your Twitter confirmation code is XXX - subject = str(msg.get("Subject", "")) - return subject.split(" ")[-1].strip() + before_count = 0 + while True: + _, rep = imap.select("INBOX") + msg_count = int(rep[0].decode("utf-8")) if len(rep) > 0 and rep[0] is not None else 0 + if msg_count > before_count: + code = search_email_with_confirmation_code(imap, msg_count) + if code is not None: + return code - return None + logger.debug(f"Waiting for confirmation email... {msg_count}") + time.sleep(1) def raise_for_status(rep: Response, label: str): @@ -130,6 +144,7 @@ def login_duplication_check(client: Client, flow_token: str) -> Response: def get_confirm_email_code(task: dict, email: str, email_password: str) -> str: + logger.debug(f"task: {json.dumps(task)}") is_code = task["enter_text"]["hint_text"].lower() == "confirmation code" value = get_verification_code(email, email_password) if is_code else email assert value is not None, "Could not get verification code" @@ -191,13 +206,24 @@ class UserClient: except (FileNotFoundError, json.JSONDecodeError): return False + def print_session(self): + for x in self.client.headers.items(): + print(x) + + print() + for x in self.client.cookies.items(): + print(x) + def _next_login_task(self, rep: Response): client = self.client - data = rep.json() - # print("-" * 20) - # print([x["subtask_id"] for x in data["subtasks"]]) - # print(rep.text) + ct0 = client.cookies.get("ct0", None) + if ct0: + client.headers["x-csrf-token"] = ct0 + client.headers["x-twitter-auth-type"] = "OAuth2Session" + + data = rep.json() + logger.debug(f"login tasks: {[x['subtask_id'] for x in data['subtasks']]}") flow_token = data["flow_token"] for x in data["subtasks"]: @@ -221,19 +247,17 @@ class UserClient: def login(self): if self.restore(): - print(f"session restored for {self.username}") + logger.debug(f"session restored for {self.username}") return + self.client.headers["user-agent"] = UserAgent().safari + self.client.headers["content-type"] = "application/json" + self.client.headers["authorization"] = TOKEN + self.client.headers["x-twitter-active-user"] = "yes" + self.client.headers["x-twitter-client-language"] = "en" + guest_token = login_get_guest_token(self.client) - headers = { - "authorization": TOKEN, - "user-agent": UserAgent().safari, - "x-twitter-active-user": "yes", - "x-twitter-client-language": "en", - "x-guest-token": guest_token, - "content-type": "application/json", - } - self.client.headers.update(headers) + self.client.headers["x-guest-token"] = guest_token rep = login_initiate(self.client) while True: @@ -245,4 +269,4 @@ class UserClient: self.client.headers["x-twitter-auth-type"] = "OAuth2Session" self.save() - print(f"login success for {self.username}") + logger.debug(f"login success for {self.username}")