зеркало из
https://github.com/viginum-datalab/twscrape.git
synced 2025-10-30 05:26:20 +02:00
add cli runner
Этот коммит содержится в:
родитель
09bb27485b
Коммит
719c972d96
@ -37,6 +37,9 @@ dev = [
|
||||
[project.urls]
|
||||
repository = "https://github.com/vladkens/twscrape"
|
||||
|
||||
[project.scripts]
|
||||
twscrape = "twscrape.cli:run"
|
||||
|
||||
[tool.setuptools]
|
||||
packages = ['twscrape']
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
# ruff: noqa: E501
|
||||
import asyncio
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from fake_useragent import UserAgent
|
||||
|
||||
@ -161,3 +162,25 @@ class AccountsPool:
|
||||
qs = f"SELECT {','.join([f'({q}) as {k}' for k, q in config])}"
|
||||
rs = await fetchone(self._db_file, qs)
|
||||
return dict(rs) if rs else {}
|
||||
|
||||
async def accounts_info(self):
    """Return a summary dict for every account, ordered for display.

    Each summary holds ``username``, ``logged_in`` (whether the account's
    headers carry a non-empty ``authorization`` value), ``active``,
    ``last_used``, ``total_req`` (sum of the account's ``stats`` values) and
    ``error_msg``.

    Ordering, most significant key first: ``active`` descending, ``total_req``
    descending, ``last_used`` descending (accounts never used sort as the Unix
    epoch), then ``username`` ascending, case-insensitively.
    """
    # Stand-in for accounts whose ``last_used`` is empty so they compare as oldest.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)

    summaries = [
        {
            "username": acc.username,
            "logged_in": (acc.headers or {}).get("authorization", "") != "",
            "active": acc.active,
            "last_used": acc.last_used,
            "total_req": sum(acc.stats.values()),
            "error_msg": acc.error_msg,
        }
        for acc in await self.get_all()
    ]

    # list.sort is stable, so sorting by the least significant key first and
    # the most significant key last yields the combined ordering.
    summaries.sort(key=lambda s: s["username"].lower())
    summaries.sort(key=lambda s: s["last_used"] or epoch, reverse=True)
    summaries.sort(key=lambda s: s["total_req"], reverse=True)
    summaries.sort(key=lambda s: s["active"], reverse=True)
    return summaries
|
||||
|
||||
84
twscrape/cli.py
Обычный файл
84
twscrape/cli.py
Обычный файл
@ -0,0 +1,84 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
|
||||
from .api import API, AccountsPool
|
||||
from .logger import logger, set_log_level
|
||||
from .utils import print_table
|
||||
|
||||
|
||||
def get_fn_arg(args):
    """Return the name and value of the command's positional argument.

    Looks through the known positional argument names in order and returns
    the first one present on ``args``.

    Args:
        args: Parsed ``argparse.Namespace`` of the selected sub-command.

    Returns:
        A ``(name, value)`` tuple for the first known argument found.

    Raises:
        SystemExit: With exit code 1 when none of the known names is present.
    """
    # "username" is the positional of the ``user_by_login`` sub-command;
    # without it that command always fell through to the error branch below.
    names = ["query", "tweet_id", "user_id", "username"]
    for name in names:
        if name in args:
            return name, getattr(args, name)

    logger.error(f"Missing argument: {names}")
    # The bare ``exit`` helper is installed by the ``site`` module for
    # interactive use and may be missing (e.g. ``python -S``); raise directly.
    raise SystemExit(1)
|
||||
|
||||
|
||||
async def main(args):
    """Execute the sub-command described by the parsed CLI arguments.

    ``accounts`` prints the accounts table and ``stats`` prints the pool
    statistics. Any other command is resolved to the ``API`` method of the
    same name (with a ``_raw`` suffix when ``--raw`` was given) and called
    with the command's positional argument. Commands whose parser defines
    ``limit`` are iterated with ``async for``; the rest are awaited once.
    Every resulting document is printed via its ``json()`` method.

    Args:
        args: Parsed ``argparse.Namespace`` produced by ``run``.

    Raises:
        SystemExit: With exit code 1 when ``API`` has no method for the command.
    """
    if args.debug:
        set_log_level("DEBUG")

    pool = AccountsPool(args.db)
    api = API(pool, debug=args.debug)

    if args.command == "accounts":
        print_table(await pool.accounts_info())
        return

    if args.command == "stats":
        print(await pool.stats())
        return

    fn_name = args.command + "_raw" if args.raw else args.command
    fn = getattr(api, fn_name, None)
    if fn is None:
        logger.error(f"Unknown command: {args.command}")
        # The bare ``exit`` helper comes from the ``site`` module and is meant
        # for interactive sessions; it can be absent, so raise SystemExit.
        raise SystemExit(1)

    _, val = get_fn_arg(args)

    # Only parsers that registered ``--limit`` have the attribute; those
    # commands are consumed as async iterators, the others as plain coroutines.
    if "limit" in args:
        async for doc in fn(val, limit=args.limit):
            print(doc.json())
    else:
        doc = await fn(val)
        print(doc.json())
|
||||
|
||||
|
||||
def run():
    """Console-script entry point: parse ``sys.argv`` and run the command.

    Builds the argument parser (global ``--db`` / ``--debug`` options plus one
    sub-parser per command), then hands the parsed namespace to ``main`` on a
    fresh asyncio event loop. When no sub-command is given, the help text is
    printed and the function returns without calling ``main``.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--db", default="accounts.db", help="Accounts database file")
    p.add_argument("--debug", action="store_true", help="Enable debug mode")
    subparsers = p.add_subparsers(dest="command")

    def cone(name: str, msg: str, a_name: str, a_msg: str, a_type: type = str):
        # Sub-command with one positional argument and a ``--raw`` flag.
        sub = subparsers.add_parser(name, help=msg)
        sub.add_argument(a_name, help=a_msg, type=a_type)
        sub.add_argument("--raw", action="store_true", help="Print raw response")
        return sub

    def clim(name: str, msg: str, a_name: str, a_msg: str, a_type: type = str):
        # Same as ``cone`` plus ``--limit``; ``main`` iterates these commands.
        sub = cone(name, msg, a_name, a_msg, a_type)
        sub.add_argument("--limit", type=int, default=20, help="Max tweets to retrieve")
        return sub

    subparsers.add_parser("accounts", help="List all accounts")
    subparsers.add_parser("stats", help="Show scraping statistics")

    clim("search", "Search for tweets", "query", "Search query")
    cone("tweet_details", "Get tweet details", "tweet_id", "Tweet ID", int)
    clim("retweeters", "Get retweeters of a tweet", "tweet_id", "Tweet ID", int)
    clim("favoriters", "Get favoriters of a tweet", "tweet_id", "Tweet ID", int)
    cone("user_by_id", "Get user data by ID", "user_id", "User ID", int)
    # A single-user lookup like ``user_by_id``: registered without ``--limit``
    # so that ``main`` awaits it once instead of iterating it with ``async for``.
    cone("user_by_login", "Get user data by username", "username", "Username")
    clim("followers", "Get user followers", "user_id", "User ID", int)
    clim("following", "Get user following", "user_id", "User ID", int)
    clim("user_tweets", "Get user tweets", "user_id", "User ID", int)
    clim("user_tweets_and_replies", "Get user tweets and replies", "user_id", "User ID", int)

    args = p.parse_args()
    if args.command is None:
        # Without a sub-command the namespace has no ``raw`` attribute and
        # ``main`` would crash; show the usage text instead.
        p.print_help()
        return

    asyncio.run(main(args))
|
||||
@ -142,3 +142,22 @@ def utc_ts() -> int:
|
||||
|
||||
def from_utciso(iso: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware UTC ``datetime``.

    Args:
        iso: Timestamp in a format accepted by ``datetime.fromisoformat``.
            A value without an offset is taken to already be in UTC.

    Returns:
        The parsed moment with ``tzinfo`` set to ``timezone.utc``.

    Raises:
        ValueError: If ``iso`` is not a valid ISO 8601 string.
    """
    parsed = datetime.fromisoformat(iso)
    if parsed.tzinfo is None:
        # Naive input: label it as UTC without shifting the clock time.
        return parsed.replace(tzinfo=timezone.utc)
    # Aware input: convert rather than relabel, otherwise an explicit
    # non-UTC offset would be discarded and the instant would change.
    return parsed.astimezone(timezone.utc)
|
||||
|
||||
|
||||
def print_table(rows: list[dict]):
    """Print ``rows`` to stdout as a left-aligned text table.

    The column set and order come from the keys of the first row; a key
    missing from a later row is shown as an empty cell. Every value is
    rendered with ``str``. A horizontal rule is printed above and below the
    header line. Nothing is printed when ``rows`` is empty.

    Args:
        rows: Dictionaries to display, one per table line.
    """
    if not rows:
        return

    columns = list(rows[0].keys())
    header = {col: col for col in columns}
    body = [{col: str(row.get(col, "")) for col in columns} for row in rows]
    table = [header] + body

    # Each column is as wide as its longest cell (header included) plus one space.
    widths = {col: max(len(entry[col]) for entry in table) + 1 for col in columns}

    rendered = [
        " ".join(entry[col].ljust(widths[col]) for col in columns) for entry in table
    ]
    rule = "─" * max(len(text) for text in rendered)

    # Layout: rule, header, rule, then the data lines.
    print("\n".join([rule, rendered[0], rule, *rendered[1:]]))
|
||||
|
||||
Загрузка…
x
Ссылка в новой задаче
Block a user