зеркало из
https://github.com/viginum-datalab/twscrape.git
synced 2025-10-29 21:16:25 +02:00
feat: media parser
Этот коммит содержится в:
родитель
8ccf3b6357
Коммит
f31046d8c4
2117
tests/mocked-data/manual_tweet_with_video_1.json
Обычный файл
2117
tests/mocked-data/manual_tweet_with_video_1.json
Обычный файл
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
2298
tests/mocked-data/manual_tweet_with_video_2.json
Обычный файл
2298
tests/mocked-data/manual_tweet_with_video_2.json
Обычный файл
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
@ -26,8 +26,9 @@ class Files:
|
||||
user_tweets_and_replies_raw = "user_tweets_and_replies_raw.json"
|
||||
|
||||
|
||||
def fake_rep(fn: str):
|
||||
filename = os.path.join(DATA_DIR, getattr(Files, fn))
|
||||
def fake_rep(fn: str, filename: str | None = None):
|
||||
if filename is None:
|
||||
filename = os.path.join(DATA_DIR, getattr(Files, fn))
|
||||
|
||||
with open(filename) as fp:
|
||||
data = fp.read()
|
||||
@ -38,9 +39,9 @@ def fake_rep(fn: str):
|
||||
return rep
|
||||
|
||||
|
||||
def mock_rep(obj, fn: str, filename: str | None = None):
    """Replace attribute ``fn`` on ``obj`` with an async stub.

    The stub accepts and ignores any arguments and returns
    ``fake_rep(fn, filename)``, so the optional ``filename`` is forwarded
    to ``fake_rep`` unchanged on every call.
    """

    async def _canned_reply(*_args, **_kwargs):
        # Arguments of the real call are irrelevant: the reply is canned.
        return fake_rep(fn, filename)

    setattr(obj, fn, _canned_reply)
|
||||
|
||||
@ -79,6 +80,19 @@ def check_tweet(doc: Tweet):
|
||||
assert isinstance(txt, str)
|
||||
assert str(doc.id) in txt
|
||||
|
||||
if doc.media is not None:
|
||||
if len(doc.media.photos) > 0:
|
||||
assert doc.media.photos[0].url is not None
|
||||
|
||||
if len(doc.media.videos) > 0:
|
||||
for x in doc.media.videos:
|
||||
assert x.thumbnailUrl is not None
|
||||
assert x.duration is not None
|
||||
for v in x.variants:
|
||||
assert v.url is not None
|
||||
assert v.bitrate is not None
|
||||
assert v.contentType is not None
|
||||
|
||||
check_user(doc.user)
|
||||
|
||||
|
||||
@ -221,6 +235,20 @@ async def test_user_tweets_and_replies():
|
||||
check_tweet(doc)
|
||||
|
||||
|
||||
async def test_tweet_with_video():
    """Run ``check_tweet`` on tweets parsed from the two manual video fixtures."""
    api = API(AccountsPool())

    # fixture file name (relative to DATA_DIR) -> tweet id requested from the API
    fixtures = {
        "manual_tweet_with_video_1.json": 1671508600538161153,
        "manual_tweet_with_video_2.json": 1671753569412820992,
    }

    for fixture_name, tweet_id in fixtures.items():
        fixture_path = os.path.join(DATA_DIR, fixture_name)
        # Serve the fixture instead of the real "tweet_details_raw" call.
        mock_rep(api, "tweet_details_raw", fixture_path)

        doc = await api.tweet_details(tweet_id)
        check_tweet(doc)
|
||||
|
||||
|
||||
async def main():
|
||||
# prepare mock files from real twitter replies
|
||||
# you need to have some account to perform this
|
||||
|
||||
@ -1,10 +1,11 @@
|
||||
import email.utils
|
||||
import json
|
||||
import re
|
||||
from dataclasses import asdict, dataclass
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from .logger import logger
|
||||
from .utils import find_item, get_or, int_or_none
|
||||
|
||||
|
||||
@ -162,11 +163,11 @@ class Tweet(JSONTrait):
|
||||
source: str | None = None
|
||||
sourceUrl: str | None = None
|
||||
sourceLabel: str | None = None
|
||||
media: Optional["Media"] = None
|
||||
_type: str = "snscrape.modules.twitter.Tweet"
|
||||
|
||||
# todo:
|
||||
# renderedContent: str
|
||||
# media: typing.Optional[typing.List["Medium"]] = None
|
||||
# card: typing.Optional["Card"] = None
|
||||
# vibe: typing.Optional["Vibe"] = None
|
||||
|
||||
@ -203,9 +204,104 @@ class Tweet(JSONTrait):
|
||||
source=obj.get("source", None),
|
||||
sourceUrl=_get_source_url(obj),
|
||||
sourceLabel=_get_source_label(obj),
|
||||
media=Media.parse(obj),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
class MediaPhoto(JSONTrait):
    """A photo attached to a tweet."""

    # Value of the raw media entity's "media_url_https" key.
    url: str

    @staticmethod
    def parse(obj: dict):
        """Build a ``MediaPhoto`` from one raw media entity dict.

        Raises ``KeyError`` if ``obj`` has no "media_url_https" key.
        """
        photo_url = obj["media_url_https"]
        return MediaPhoto(url=photo_url)
|
||||
|
||||
|
||||
@dataclass
class MediaVideo(JSONTrait):
    """A video attached to a tweet."""

    # Value of the raw entity's "media_url_https" key.
    thumbnailUrl: str
    # Parsed entries of "video_info.variants" that carry a "bitrate" key.
    variants: list["MediaVideoVariant"]
    # Value of "video_info.duration_millis".
    duration: int
    # Result of int_or_none(obj, "mediaStats.viewCount").
    views: int | None = None

    @staticmethod
    def parse(obj: dict):
        """Build a ``MediaVideo`` from one raw media entity dict.

        Raises ``KeyError`` if "media_url_https", "video_info",
        "video_info.variants" or "video_info.duration_millis" is missing.
        """
        thumbnail = obj["media_url_https"]
        video_info = obj["video_info"]

        # Variants without a "bitrate" key are dropped, since
        # MediaVideoVariant requires one.
        parsed_variants = []
        for raw_variant in video_info["variants"]:
            if "bitrate" not in raw_variant:
                continue
            parsed_variants.append(MediaVideoVariant.parse(raw_variant))

        length = video_info["duration_millis"]
        view_count = int_or_none(obj, "mediaStats.viewCount")

        return MediaVideo(
            thumbnailUrl=thumbnail,
            variants=parsed_variants,
            duration=length,
            views=view_count,
        )
|
||||
|
||||
|
||||
@dataclass
class MediaAnimated(JSONTrait):
    """An animated GIF attached to a tweet."""

    # Value of the raw entity's "media_url_https" key.
    thumbnailUrl: str
    # URL of the first entry in "video_info.variants".
    videoUrl: str

    @staticmethod
    def parse(obj: dict) -> "MediaAnimated | None":
        """Build a ``MediaAnimated`` from one raw media entity dict.

        Returns ``None`` instead of raising when the entity lacks the
        expected keys or its "variants" list is empty.
        """
        try:
            return MediaAnimated(
                thumbnailUrl=obj["media_url_https"],
                videoUrl=obj["video_info"]["variants"][0]["url"],
            )
        except (KeyError, IndexError):
            # An empty "variants" list fails with IndexError, not KeyError;
            # both mean the entity cannot be parsed.
            return None
|
||||
|
||||
|
||||
@dataclass
class MediaVideoVariant(JSONTrait):
    """One encoded rendition of a tweet video."""

    # Value of the raw variant's "content_type" key.
    contentType: str
    # Value of the raw variant's "bitrate" key.
    bitrate: int
    # Value of the raw variant's "url" key.
    url: str

    @staticmethod
    def parse(obj: dict):
        """Build a ``MediaVideoVariant`` from one raw variant dict.

        Raises ``KeyError`` if "content_type", "bitrate" or "url" is missing.
        """
        content_type = obj["content_type"]
        bitrate = obj["bitrate"]
        url = obj["url"]
        return MediaVideoVariant(contentType=content_type, bitrate=bitrate, url=url)
|
||||
|
||||
|
||||
@dataclass
class Media(JSONTrait):
    """All media attached to a tweet, grouped by kind."""

    photos: list[MediaPhoto] = field(default_factory=list)
    videos: list[MediaVideo] = field(default_factory=list)
    animated: list[MediaAnimated] = field(default_factory=list)

    @staticmethod
    def parse(obj: dict):
        """Collect the entries of ``extended_entities.media`` into a ``Media``.

        Each entry is routed by its "type" ("video", "photo" or
        "animated_gif") to the matching parser. An entry whose parser
        returns a falsy value is skipped. Any other type is logged as a
        warning and ignored. A ``Media`` is always returned; its lists are
        empty when ``obj`` has no media entries.
        """
        found_photos: list[MediaPhoto] = []
        found_videos: list[MediaVideo] = []
        found_animated: list[MediaAnimated] = []

        for x in get_or(obj, "extended_entities.media", []):
            if x["type"] == "video":
                parsed_video = MediaVideo.parse(x)
                if parsed_video:
                    found_videos.append(parsed_video)
            elif x["type"] == "photo":
                parsed_photo = MediaPhoto.parse(x)
                if parsed_photo:
                    found_photos.append(parsed_photo)
            elif x["type"] == "animated_gif":
                parsed_gif = MediaAnimated.parse(x)
                if parsed_gif:
                    found_animated.append(parsed_gif)
            else:
                logger.warning(f"Unknown media type: {x['type']}: {json.dumps(x)}")

        return Media(photos=found_photos, videos=found_videos, animated=found_animated)
|
||||
|
||||
|
||||
def _get_reply_user(tw_obj: dict, res: dict):
|
||||
user_id = tw_obj.get("in_reply_to_user_id_str", None)
|
||||
if user_id is None:
|
||||
|
||||
Загрузка…
x
Ссылка в новой задаче
Block a user