add card parser for summary and poll (#46, #72, #157)

Этот коммит содержится в:
Vlad Pronsky 2024-04-18 03:31:08 +03:00
родитель 84078e72f9
Коммит 09c820cade
6 изменённых файлов: 12386 добавлений и 21 удалений

Просмотреть файл

@ -27,7 +27,7 @@ dependencies = [
[project.optional-dependencies]
dev = [
"pyright>=1.1.350",
"pyright>=1.1.359",
"pytest-asyncio>=0.23.3",
"pytest-cov>=4.1.0",
"pytest-httpx>=0.28.0",

1013
tests/mocked-data/_issue_72.json Обычный файл

Разница между файлами не показана из-за своего большого размера Загрузить разницу

11186
tests/mocked-data/_issue_72_poll.json Обычный файл

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -4,7 +4,7 @@ from typing import Callable
from twscrape import API, gather
from twscrape.logger import set_log_level
from twscrape.models import Tweet, User, UserRef, parse_tweet
from twscrape.models import PollCard, SummaryCard, Tweet, User, UserRef, parse_tweet
BASE_DIR = os.path.dirname(__file__)
DATA_DIR = os.path.join(BASE_DIR, "mocked-data")
@ -398,3 +398,30 @@ async def test_issue_56():
assert doc is not None
assert len(set([x.tcourl for x in doc.links])) == len(doc.links)
assert len(doc.links) == 5
async def test_issue_72():
    """Cards are parsed from tweets (issue #72): SummaryCard and PollCard."""
    # Check SummaryCard
    raw = fake_rep("_issue_72").json()
    doc = parse_tweet(raw, 1696922210588410217)
    assert doc is not None
    assert doc.card is not None
    assert isinstance(doc.card, SummaryCard)
    assert doc.card._type == "summary"
    assert doc.card.title is not None
    assert doc.card.description is not None
    assert doc.card.url is not None

    # Check PollCard
    raw = fake_rep("_issue_72_poll").json()
    doc = parse_tweet(raw, 1780666831310877100)
    assert doc is not None
    assert doc.card is not None
    assert isinstance(doc.card, PollCard)
    assert doc.card._type == "poll"
    assert doc.card.finished is not None
    assert doc.card.options is not None
    assert len(doc.card.options) > 0
    for x in doc.card.options:
        assert x.label is not None
        assert x.votesCount is not None

Просмотреть файл

@ -412,19 +412,3 @@ class API:
async for rep in gen:
for x in parse_tweets(rep.json(), limit):
yield x
# trends
async def trends_raw(self, _=None, limit=-1, kv=None):
    """Yield raw GraphQL responses from the ExplorePage (trends) endpoint.

    NOTE(review): the first positional argument is unused — presumably kept
    for signature parity with the other API methods; confirm with callers.
    """
    params = dict(kv or {})  # shallow copy so the caller's dict is untouched
    features = {"articles_preview_enabled": False}
    items = self._gql_items(OP_ExplorePage, params, limit=limit, ft=features)
    async with aclosing(items) as stream:
        async for item in stream:
            yield item
async def trends(self, _=None, limit=-1, kv=None):
    """Yield parsed tweets from the trends (ExplorePage) timeline."""
    responses = self.trends_raw(_, limit=limit, kv=kv)
    async with aclosing(responses) as stream:
        async for response in stream:
            for tweet in parse_tweets(response.json(), limit):
                yield tweet

Просмотреть файл

@ -186,12 +186,12 @@ class Tweet(JSONTrait):
sourceUrl: str | None = None
sourceLabel: str | None = None
media: Optional["Media"] = None
card: Optional["SummaryCard"] | Optional["PollCard"] = None
_type: str = "snscrape.modules.twitter.Tweet"
# todo:
# renderedContent: str
# card: typing.Optional["Card"] = None
# vibe: typing.Optional["Vibe"] = None
# vibe: Optional["Vibe"] = None
@staticmethod
def parse(obj: dict, res: dict):
@ -212,10 +212,11 @@ class Tweet(JSONTrait):
rt_obj = get_or(res, f"tweets.{_first(obj, rt_id_path)}")
qt_obj = get_or(res, f"tweets.{_first(obj, qt_id_path)}")
url = f'https://twitter.com/{tw_usr.username}/status/{obj["id_str"]}'
doc = Tweet(
id=int(obj["id_str"]),
id_str=obj["id_str"],
url=f'https://twitter.com/{tw_usr.username}/status/{obj["id_str"]}',
url=url,
date=email.utils.parsedate_to_datetime(obj["created_at"]),
user=tw_usr,
lang=obj["lang"],
@ -244,6 +245,7 @@ class Tweet(JSONTrait):
sourceUrl=_get_source_url(obj),
sourceLabel=_get_source_label(obj),
media=Media.parse(obj),
card=_parse_card(obj, url),
)
# issue #42 – restore full rt text
@ -348,6 +350,159 @@ class Media(JSONTrait):
return Media(photos=photos, videos=videos, animated=animated)
@dataclass
class Card(JSONTrait):
    """Marker base class for tweet card variants (SummaryCard, PollCard)."""

    pass
@dataclass
class SummaryCard(Card):
    """Link-preview card ("summary" / "summary_large_image" / unified card)."""

    title: str
    description: str
    vanityUrl: str  # from the "vanity_url" binding value
    url: str  # target link, from the "card_url" binding value
    photo: MediaPhoto | None = None  # largest card image, if any
    video: MediaVideo | None = None  # only produced by the unified-card path
    _type: str = "summary"  # discriminator checked by consumers/tests
@dataclass
class PollOption(JSONTrait):
    """A single poll choice with its vote count."""

    label: str
    votesCount: int
@dataclass
class PollCard(Card):
    """Poll card (parsed from "poll2choice_text_only")."""

    options: list[PollOption]
    finished: bool  # True when counts are final ("counts_are_final" value)
    _type: str = "poll"  # discriminator checked by consumers/tests
def _parse_card_get_bool(values: list[dict], key: str):
for x in values:
if x["key"] == key:
return x["value"]["boolean_value"]
return False
def _parse_card_get_str(values: list[dict], key: str, defaultVal=None):
for x in values:
if x["key"] == key:
return x["value"]["string_value"]
return defaultVal
def _parse_card_extract_str(values: list[dict], key: str):
pretenders = [x["value"]["string_value"] for x in values if x["key"] == key]
new_values = [x for x in values if x["key"] != key]
return pretenders[0] if pretenders else "", new_values
def _parse_card_extract_title(values: list[dict]):
new_values, pretenders = [], []
# title is trimmed to 70 chars, so try to find the longest text in alt_text
for x in values:
k = x["key"]
if k == "title" or k.endswith("_alt_text"):
pretenders.append(x["value"]["string_value"])
else:
new_values.append(x)
pretenders = sorted(pretenders, key=lambda x: len(x), reverse=True)
return pretenders[0] if pretenders else "", new_values
def _parse_card_extract_largest_photo(values: list[dict]):
photos = [x for x in values if x["value"]["type"] == "IMAGE"]
photos = sorted(photos, key=lambda x: x["value"]["image_value"]["height"], reverse=True)
values = [x for x in values if x["value"]["type"] != "IMAGE"]
if photos:
return MediaPhoto(url=photos[0]["value"]["image_value"]["url"]), values
else:
return None, values
def _parse_card_prepare_values(obj: dict):
    """Return the card's binding_values with IMAGE_COLOR entries filtered out."""
    raw = get_or(obj, "card.legacy.binding_values", [])
    return [entry for entry in raw if entry["value"]["type"] != "IMAGE_COLOR"]
def _parse_card(obj: dict, url: str):
    """Parse a tweet's card payload into a SummaryCard or PollCard.

    Supported card names: "summary", "summary_large_image", "unified_card",
    "poll2choice_text_only". Returns None when the tweet has no card or the
    card type is unknown (unknown types are logged).

    obj: raw tweet legacy object; card data is read from card.legacy.
    url: the tweet's URL, used only in the unknown-card log message.
    """
    name = get_or(obj, "card.legacy.name", None)
    if not name:
        return None

    if name in ("summary", "summary_large_image"):
        val = _parse_card_prepare_values(obj)
        title, val = _parse_card_extract_title(val)
        description, val = _parse_card_extract_str(val, "description")
        vanity_url, val = _parse_card_extract_str(val, "vanity_url")
        card_url, val = _parse_card_extract_str(val, "card_url")
        photo, val = _parse_card_extract_largest_photo(val)
        return SummaryCard(
            title=title,
            description=description,
            vanityUrl=vanity_url,
            url=card_url,
            photo=photo,
        )

    if name == "unified_card":
        val = _parse_card_prepare_values(obj)
        val = [x for x in val if x["key"] == "unified_card"][0]["value"]["string_value"]
        val = json.loads(val)

        co = get_or(val, "component_objects", {})
        do = get_or(val, "destination_objects", {})
        me = list(get_or(val, "media_entities", {}).values())
        if len(me) > 1:
            logger.debug(f"[Card] Multiple media entities: {json.dumps(me, indent=2)}")
        me = me[0] if me else {}

        title = get_or(co, "details_1.data.title.content", "")
        description = get_or(co, "details_1.data.subtitle.content", "")
        vanity_url = get_or(do, "browser_with_docked_media_1.data.url_data.vanity", "")
        card_url = get_or(do, "browser_with_docked_media_1.data.url_data.url", "")
        video = MediaVideo.parse(me) if me and me["type"] == "video" else None
        photo = MediaPhoto.parse(me) if me and me["type"] == "photo" else None
        return SummaryCard(
            title=title,
            description=description,
            vanityUrl=vanity_url,
            url=card_url,
            photo=photo,
            video=video,
        )

    if name == "poll2choice_text_only":
        val = _parse_card_prepare_values(obj)

        options = []
        # probe choice1..choice20 and stop at the first missing label/count pair
        for i in range(20):
            label = _parse_card_get_str(val, f"choice{i+1}_label")
            votes = _parse_card_get_str(val, f"choice{i+1}_count")
            if label is None or votes is None:
                break
            options.append(PollOption(label=label, votesCount=int(votes)))

        finished = _parse_card_get_bool(val, "counts_are_final")
        return PollCard(options=options, finished=finished)

    # was a bare print() — use the logger so library users can control output
    logger.warning(f"Unknown card type '{name}' on {url}")
    return None
# internal helpers