diff --git a/twapi/client.py b/twapi/client.py index f3baf65..0f8b5b8 100644 --- a/twapi/client.py +++ b/twapi/client.py @@ -1,3 +1,5 @@ +import email as emaillib +import imaplib import json import os @@ -8,6 +10,27 @@ TOKEN = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Z TASK_URL = "https://api.twitter.com/1.1/onboarding/task.json" +def get_verification_code(email: str, password: str, imap_domain: None | str = None) -> str | None: + imap_domain = f"imap.{email.split('@')[1]}" if imap_domain is None else imap_domain + + with imaplib.IMAP4_SSL(imap_domain) as imap: + imap.login(email, password) + _, rep = imap.select("INBOX") + + msg_count = int(rep[0].decode("utf-8")) if len(rep) > 0 and rep[0] is not None else 0 + for i in range(msg_count, 0, -1): + _, rep = imap.fetch(str(i), "(RFC822)") + for x in rep: + if isinstance(x, tuple): + msg = emaillib.message_from_bytes(x[1]) + if "info@twitter.com" in msg.get("From", ""): + # eg. Your Twitter confirmation code is XXX + subject = str(msg.get("Subject", "")) + return subject.split(" ")[-1].strip() + + return None + + def raise_for_status(rep: Response, label: str): try: rep.raise_for_status() @@ -106,13 +129,20 @@ def login_duplication_check(client: Client, flow_token: str) -> Response: return rep -def login_confirm_email(client: Client, flow_token: str, email: str) -> Response: +def get_confirm_email_code(task: dict, email: str, email_password: str) -> str: + is_code = task["enter_text"]["hint_text"].lower() == "confirmation code" + value = get_verification_code(email, email_password) if is_code else email + assert value is not None, "Could not get verification code" + return value + + +def login_confirm_email(client: Client, flow_token: str, value: str) -> Response: payload = { "flow_token": flow_token, "subtask_inputs": [ { "subtask_id": "LoginAcid", - "enter_text": {"text": email, "link": "next_link"}, + "enter_text": {"text": value, "link": "next_link"}, } ], } @@ -134,10 +164,11 @@ def login_success(client: Client, flow_token: str) -> Response: class UserClient: - def __init__(self, username: str, password: str, email: str): + def __init__(self, username: str, password: str, email: str, email_password: str): self.username = username self.password = password self.email = email + self.email_password = email_password self.client = Client() self.session_path = f"sessions/{self.username}.session.json" @@ -175,7 +206,8 @@ class UserClient: if task_id == "LoginSuccessSubtask": return login_success(client, flow_token) if task_id == "LoginAcid": - return login_confirm_email(client, flow_token, self.email) + value = get_confirm_email_code(x, self.email, self.email_password) + return login_confirm_email(client, flow_token, value) if task_id == "AccountDuplicationCheck": return login_duplication_check(client, flow_token) if task_id == "LoginEnterPassword":