move constants to single file; improve logging

Этот коммит содержится в:
Vlad Pronsky 2023-04-29 22:05:13 +03:00
родитель eadf053f12
Коммит 410a1fb9a4
4 изменённых файлов: 96 добавлений и 105 удалений

Просмотреть файл

@ -7,12 +7,10 @@ from fake_useragent import UserAgent
from httpx import AsyncClient, HTTPStatusError, Response from httpx import AsyncClient, HTTPStatusError, Response
from loguru import logger from loguru import logger
from .constants import LOGIN_URL, TOKEN
from .imap import get_email_code from .imap import get_email_code
from .utils import raise_for_status from .utils import raise_for_status
TOKEN = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
TASK_URL = "https://api.twitter.com/1.1/onboarding/task.json"
class Status(str, Enum): class Status(str, Enum):
NEW = "new" NEW = "new"
@ -151,7 +149,7 @@ class Account:
"subtask_versions": {}, "subtask_versions": {},
} }
rep = await self.client.post(TASK_URL, params={"flow_name": "login"}, json=payload) rep = await self.client.post(LOGIN_URL, params={"flow_name": "login"}, json=payload)
raise_for_status(rep, "login_initiate") raise_for_status(rep, "login_initiate")
return rep return rep
@ -201,7 +199,7 @@ class Account:
], ],
} }
rep = await self.client.post(TASK_URL, json=payload) rep = await self.client.post(LOGIN_URL, json=payload)
raise_for_status(rep, "login_instrumentation") raise_for_status(rep, "login_instrumentation")
return rep return rep
@ -224,7 +222,7 @@ class Account:
], ],
} }
rep = await self.client.post(TASK_URL, json=payload) rep = await self.client.post(LOGIN_URL, json=payload)
raise_for_status(rep, "login_username") raise_for_status(rep, "login_username")
return rep return rep
@ -239,7 +237,7 @@ class Account:
], ],
} }
rep = await self.client.post(TASK_URL, json=payload) rep = await self.client.post(LOGIN_URL, json=payload)
raise_for_status(rep, "login_password") raise_for_status(rep, "login_password")
return rep return rep
@ -254,7 +252,7 @@ class Account:
], ],
} }
rep = await self.client.post(TASK_URL, json=payload) rep = await self.client.post(LOGIN_URL, json=payload)
raise_for_status(rep, "login_duplication_check") raise_for_status(rep, "login_duplication_check")
return rep return rep
@ -269,7 +267,7 @@ class Account:
], ],
} }
rep = await self.client.post(TASK_URL, json=payload) rep = await self.client.post(LOGIN_URL, json=payload)
raise_for_status(rep, "login_confirm_email") raise_for_status(rep, "login_confirm_email")
return rep return rep
@ -287,7 +285,7 @@ class Account:
], ],
} }
rep = await self.client.post(TASK_URL, json=payload) rep = await self.client.post(LOGIN_URL, json=payload)
raise_for_status(rep, "login_confirm_email") raise_for_status(rep, "login_confirm_email")
return rep return rep
@ -297,6 +295,6 @@ class Account:
"subtask_inputs": [], "subtask_inputs": [],
} }
rep = await self.client.post(TASK_URL, json=payload) rep = await self.client.post(LOGIN_URL, json=payload)
raise_for_status(rep, "login_success") raise_for_status(rep, "login_success")
return rep return rep

Просмотреть файл

@ -2,7 +2,7 @@ import asyncio
from loguru import logger from loguru import logger
from .client import Account from .account import Account
class AccountsPool: class AccountsPool:
@ -12,6 +12,12 @@ class AccountsPool:
def add_account(self, account: Account): def add_account(self, account: Account):
self.accounts.append(account) self.accounts.append(account)
def get_login_by_token(self, auth_token: str) -> str:
for x in self.accounts:
if x.client.cookies.get("auth_token") == auth_token:
return x.username
return "UNKNOWN"
def get_account(self, queue: str) -> Account | None: def get_account(self, queue: str) -> Account | None:
for x in self.accounts: for x in self.accounts:
if x.can_use(queue): if x.can_use(queue):

66
twapi/constants.py Обычный файл
Просмотреть файл

@ -0,0 +1,66 @@
TOKEN = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" # noqa: E501
GQL_URL = "https://twitter.com/i/api/graphql"
LOGIN_URL = "https://api.twitter.com/1.1/onboarding/task.json"
SEARCH_URL = "https://api.twitter.com/2/search/adaptive.json"
GQL_FEATURES = {
"blue_business_profile_image_shape_enabled": True,
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"responsive_web_graphql_timeline_navigation_enabled": True,
"tweetypie_unmention_optimization_enabled": True,
"vibe_api_enabled": True,
"responsive_web_edit_tweet_api_enabled": True,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
"view_counts_everywhere_api_enabled": True,
"longform_notetweets_consumption_enabled": True,
"tweet_awards_web_tipping_enabled": False,
"freedom_of_speech_not_reach_fetch_enabled": True,
"standardized_nudges_misinfo": True,
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": False,
"interactive_text_enabled": True,
"responsive_web_text_conversations_enabled": False,
"longform_notetweets_rich_text_read_enabled": True,
"responsive_web_enhance_cards_enabled": False,
}
SEARCH_PARAMS = {
"include_profile_interstitial_type": "1",
"include_blocking": "1",
"include_blocked_by": "1",
"include_followed_by": "1",
"include_want_retweets": "1",
"include_mute_edge": "1",
"include_can_dm": "1",
"include_can_media_tag": "1",
"include_ext_has_nft_avatar": "1",
"include_ext_is_blue_verified": "1",
"include_ext_verified_type": "1",
"include_ext_profile_image_shape": "1",
"skip_status": "1",
"cards_platform": "Web-12",
"include_cards": "1",
"include_ext_alt_text": "true",
"include_ext_limited_action_results": "false",
"include_quote_count": "true",
"include_reply_count": "1",
"tweet_mode": "extended",
"include_ext_views": "true",
"include_entities": "true",
"include_user_entities": "true",
"include_ext_media_color": "true",
"include_ext_media_availability": "true",
"include_ext_sensitive_media_warning": "true",
"include_ext_trusted_friends_metadata": "true",
"send_error_codes": "true",
"simple_quoted_tweet": "true",
"tweet_search_mode": "live",
"query_source": "typed_query",
"count": "20",
"pc": "1",
"spelling_corrections": "1",
"include_ext_edit_control": "true",
"ext": "mediaStats,highlightedLabel,hasNftAvatar,voiceInfo,birdwatchPivot,enrichments,superFollowMetadata,unmentionInfo,editControl,vibe", # noqa: E501
}

Просмотреть файл

@ -1,95 +1,14 @@
import json import time
from time import time
from typing import Awaitable, Callable from typing import Awaitable, Callable
from httpx import AsyncClient, HTTPStatusError, Response from httpx import AsyncClient, HTTPStatusError, Response
from loguru import logger from loguru import logger
from .accounts_pool import AccountsPool
from .constants import GQL_FEATURES, GQL_URL, SEARCH_PARAMS, SEARCH_URL
from .models import Tweet, User from .models import Tweet, User
from .pool import AccountsPool
from .utils import encode_params, find_item, to_old_obj, to_search_like from .utils import encode_params, find_item, to_old_obj, to_search_like
BASIC_SEARCH_PARAMS = """
include_profile_interstitial_type=1
include_blocking=1
include_blocked_by=1
include_followed_by=1
include_want_retweets=1
include_mute_edge=1
include_can_dm=1
include_can_media_tag=1
include_ext_has_nft_avatar=1
include_ext_is_blue_verified=1
include_ext_verified_type=1
include_ext_profile_image_shape=1
skip_status=1
cards_platform=Web-12
include_cards=1
include_ext_alt_text=true
include_ext_limited_action_results=false
include_quote_count=true
include_reply_count=1
tweet_mode=extended
include_ext_views=true
include_entities=true
include_user_entities=true
include_ext_media_color=true
include_ext_media_availability=true
include_ext_sensitive_media_warning=true
include_ext_trusted_friends_metadata=true
send_error_codes=true
simple_quoted_tweet=true
tweet_search_mode=live
query_source=recent_search_click
pc=1
spelling_corrections=1
include_ext_edit_control=true
ext=mediaStats%2ChighlightedLabel%2ChasNftAvatar%2CvoiceInfo%2CbirdwatchPivot%2Cenrichments%2CsuperFollowMetadata%2CunmentionInfo%2CeditControl%2Cvibe
"""
BASE_FEATURES = {
"blue_business_profile_image_shape_enabled": True,
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"responsive_web_graphql_timeline_navigation_enabled": True,
#
"tweetypie_unmention_optimization_enabled": True,
"vibe_api_enabled": True,
"responsive_web_edit_tweet_api_enabled": True,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
"view_counts_everywhere_api_enabled": True,
"longform_notetweets_consumption_enabled": True,
"tweet_awards_web_tipping_enabled": False,
"freedom_of_speech_not_reach_fetch_enabled": True,
"standardized_nudges_misinfo": True,
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": False,
"interactive_text_enabled": True,
"responsive_web_text_conversations_enabled": False,
"longform_notetweets_rich_text_read_enabled": True,
"responsive_web_enhance_cards_enabled": False,
}
SEARCH_URL = "https://api.twitter.com/2/search/adaptive.json"
SEARCH_PARAMS = dict(x.split("=") for x in BASIC_SEARCH_PARAMS.splitlines() if x)
GRAPHQL_URL = "https://twitter.com/i/api/graphql/"
def filter_null(obj: dict):
try:
return {k: v for k, v in obj.items() if v is not None}
except AttributeError:
return obj
def json_params(obj: dict):
return {k: json.dumps(filter_null(v), separators=(",", ":")) for k, v in obj.items()}
def get_ql_entries(obj: dict) -> list[dict]:
entries = find_item(obj, "entries")
return entries or []
class Search: class Search:
def __init__(self, pool: AccountsPool): def __init__(self, pool: AccountsPool):
@ -100,7 +19,11 @@ class Search:
def _limit_msg(self, rep: Response): def _limit_msg(self, rep: Response):
lr = rep.headers.get("x-rate-limit-remaining", -1) lr = rep.headers.get("x-rate-limit-remaining", -1)
ll = rep.headers.get("x-rate-limit-limit", -1) ll = rep.headers.get("x-rate-limit-limit", -1)
return f"{lr}/{ll}"
auth_token = rep.request.headers["cookie"].split("auth_token=")[1].split(";")[0]
username = self.pool.get_login_by_token(auth_token)
return f"{username} {lr}/{ll}"
def _is_end(self, rep: Response, q: str, res: list, cur: str | None, cnt: int, lim: int): def _is_end(self, rep: Response, q: str, res: list, cur: str | None, cnt: int, lim: int):
new_count = len(res) new_count = len(res)
@ -170,12 +93,12 @@ class Search:
except Exception: except Exception:
return None return None
async def _ql_items(self, op: str, kv: dict, ft: dict = {}, limit=-1): async def _ql_items(self, op: str, kv: dict, limit=-1):
queue, cursor, count = op.split("/")[-1], None, 0 queue, cursor, count = op.split("/")[-1], None, 0
async def _get(client: AsyncClient): async def _get(client: AsyncClient):
params = {"variables": {**kv, "cursor": cursor}, "features": BASE_FEATURES} params = {"variables": {**kv, "cursor": cursor}, "features": GQL_FEATURES}
return await client.get(f"{GRAPHQL_URL}/{op}", params=encode_params(params)) return await client.get(f"{GQL_URL}/{op}", params=encode_params(params))
async for rep in self._inf_req(queue, _get): async for rep in self._inf_req(queue, _get):
obj = rep.json() obj = rep.json()
@ -197,11 +120,9 @@ class Search:
return return
async def _ql_item(self, op: str, kv: dict, ft: dict = {}): async def _ql_item(self, op: str, kv: dict, ft: dict = {}):
variables, features = {**kv}, {**BASE_FEATURES, **ft}
params = {"variables": variables, "features": features}
async def _get(client: AsyncClient): async def _get(client: AsyncClient):
return await client.get(f"{GRAPHQL_URL}/{op}", params=encode_params(params)) params = {"variables": {**kv}, "features": {**GQL_FEATURES, **ft}}
return await client.get(f"{GQL_URL}/{op}", params=encode_params(params))
queue = op.split("/")[-1] queue = op.split("/")[-1]
async for rep in self._inf_req(queue, _get): async for rep in self._inf_req(queue, _get):