зеркало из
https://github.com/viginum-datalab/twscrape.git
synced 2025-10-29 13:06:13 +02:00
debugging login flow; add retry on getting verification code from email
Этот коммит содержится в:
родитель
e19daada6a
Коммит
36b83ecee5
@ -2,33 +2,47 @@ import email as emaillib
|
||||
import imaplib
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
from fake_useragent import UserAgent
|
||||
from httpx import Client, HTTPStatusError, Response
|
||||
from loguru import logger
|
||||
|
||||
TOKEN = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
|
||||
TASK_URL = "https://api.twitter.com/1.1/onboarding/task.json"
|
||||
|
||||
|
||||
def search_email_with_confirmation_code(imap: imaplib.IMAP4_SSL, msg_count: int) -> str | None:
|
||||
for i in range(msg_count, 0, -1):
|
||||
_, rep = imap.fetch(str(i), "(RFC822)")
|
||||
for x in rep:
|
||||
if isinstance(x, tuple):
|
||||
msg = emaillib.message_from_bytes(x[1])
|
||||
if "info@twitter.com" in msg.get("From", ""):
|
||||
# eg. Your Twitter confirmation code is XXX
|
||||
subject = str(msg.get("Subject", ""))
|
||||
return subject.split(" ")[-1].strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_verification_code(email: str, password: str, imap_domain: None | str = None) -> str | None:
|
||||
imap_domain = f"imap.{email.split('@')[1]}" if imap_domain is None else imap_domain
|
||||
|
||||
with imaplib.IMAP4_SSL(imap_domain) as imap:
|
||||
imap.login(email, password)
|
||||
_, rep = imap.select("INBOX")
|
||||
|
||||
msg_count = int(rep[0].decode("utf-8")) if len(rep) > 0 and rep[0] is not None else 0
|
||||
for i in range(msg_count, 0, -1):
|
||||
_, rep = imap.fetch(str(i), "(RFC822)")
|
||||
for x in rep:
|
||||
if isinstance(x, tuple):
|
||||
msg = emaillib.message_from_bytes(x[1])
|
||||
if "info@twitter.com" in msg.get("From", ""):
|
||||
# eg. Your Twitter confirmation code is XXX
|
||||
subject = str(msg.get("Subject", ""))
|
||||
return subject.split(" ")[-1].strip()
|
||||
before_count = 0
|
||||
while True:
|
||||
_, rep = imap.select("INBOX")
|
||||
msg_count = int(rep[0].decode("utf-8")) if len(rep) > 0 and rep[0] is not None else 0
|
||||
if msg_count > before_count:
|
||||
code = search_email_with_confirmation_code(imap, msg_count)
|
||||
if code is not None:
|
||||
return code
|
||||
|
||||
return None
|
||||
logger.debug(f"Waiting for confirmation email... {msg_count}")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def raise_for_status(rep: Response, label: str):
|
||||
@ -130,6 +144,7 @@ def login_duplication_check(client: Client, flow_token: str) -> Response:
|
||||
|
||||
|
||||
def get_confirm_email_code(task: dict, email: str, email_password: str) -> str:
|
||||
logger.debug(f"task: {json.dumps(task)}")
|
||||
is_code = task["enter_text"]["hint_text"].lower() == "confirmation code"
|
||||
value = get_verification_code(email, email_password) if is_code else email
|
||||
assert value is not None, "Could not get verification code"
|
||||
@ -191,13 +206,24 @@ class UserClient:
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return False
|
||||
|
||||
def print_session(self):
|
||||
for x in self.client.headers.items():
|
||||
print(x)
|
||||
|
||||
print()
|
||||
for x in self.client.cookies.items():
|
||||
print(x)
|
||||
|
||||
def _next_login_task(self, rep: Response):
|
||||
client = self.client
|
||||
data = rep.json()
|
||||
|
||||
# print("-" * 20)
|
||||
# print([x["subtask_id"] for x in data["subtasks"]])
|
||||
# print(rep.text)
|
||||
ct0 = client.cookies.get("ct0", None)
|
||||
if ct0:
|
||||
client.headers["x-csrf-token"] = ct0
|
||||
client.headers["x-twitter-auth-type"] = "OAuth2Session"
|
||||
|
||||
data = rep.json()
|
||||
logger.debug(f"login tasks: {[x['subtask_id'] for x in data['subtasks']]}")
|
||||
|
||||
flow_token = data["flow_token"]
|
||||
for x in data["subtasks"]:
|
||||
@ -221,19 +247,17 @@ class UserClient:
|
||||
|
||||
def login(self):
|
||||
if self.restore():
|
||||
print(f"session restored for {self.username}")
|
||||
logger.debug(f"session restored for {self.username}")
|
||||
return
|
||||
|
||||
self.client.headers["user-agent"] = UserAgent().safari
|
||||
self.client.headers["content-type"] = "application/json"
|
||||
self.client.headers["authorization"] = TOKEN
|
||||
self.client.headers["x-twitter-active-user"] = "yes"
|
||||
self.client.headers["x-twitter-client-language"] = "en"
|
||||
|
||||
guest_token = login_get_guest_token(self.client)
|
||||
headers = {
|
||||
"authorization": TOKEN,
|
||||
"user-agent": UserAgent().safari,
|
||||
"x-twitter-active-user": "yes",
|
||||
"x-twitter-client-language": "en",
|
||||
"x-guest-token": guest_token,
|
||||
"content-type": "application/json",
|
||||
}
|
||||
self.client.headers.update(headers)
|
||||
self.client.headers["x-guest-token"] = guest_token
|
||||
|
||||
rep = login_initiate(self.client)
|
||||
while True:
|
||||
@ -245,4 +269,4 @@ class UserClient:
|
||||
self.client.headers["x-twitter-auth-type"] = "OAuth2Session"
|
||||
self.save()
|
||||
|
||||
print(f"login success for {self.username}")
|
||||
logger.debug(f"login success for {self.username}")
|
||||
|
||||
Загрузка…
x
Ссылка в новой задаче
Block a user