add ability to add accounts & login from cli

Этот коммит содержится в:
Vlad Pronsky 2023-05-29 00:28:57 +03:00
родитель d3d6a002f2
Коммит 2492de1417
6 изменённых файлов: 174 добавлений и 45 удалений

Просмотреть файл

@ -99,7 +99,58 @@ if __name__ == "__main__":
## CLI
You can also use the CLI to make requests (before that you need to log in to some accounts through the programming interface).
### Get help on CLI commands
```sh
# show all commands
twscrape
# help on a specific command
twscrape search --help
```
### Add accounts & login
First add accounts from file:
```sh
# twscrape add_accounts <file_path> <line_format>
# line_format should have "username", "password", "email", "email_password" tokens
# the token delimiter should be the same as the one used in the file
twscrape add_accounts accounts.txt username:password:email:email_password
```
Then call login:
```sh
twscrape login_accounts
```
Accounts and their sessions will be saved, so they can be reused for future requests
### Get list of accounts and their statuses
```sh
twscrape accounts
# Output:
# ───────────────────────────────────────────────────────────────────────────────────
# username logged_in active last_used total_req error_msg
# ───────────────────────────────────────────────────────────────────────────────────
# user1 True True 2023-05-20 03:20:40 100 None
# user2 True True 2023-05-20 03:25:45 120 None
# user3 False False None 120 Login error
```
### Use different accounts file
Useful if using a different set of accounts for different actions
```
twscrape --db test-accounts.db <command>
```
### Search commands
```sh
twscrape search "QUERY" --limit=20
@ -126,38 +177,6 @@ By default, parsed data is returned. The original tweet responses can be retriev
twscrape search "elon mask lang:es" --limit=20 --raw
```
View a list of commands:
```sh
# show all commands
twscrape
# help on a specific command
twscrape search --help
```
## Advanced usage
### Get list of connected accounts and their statuses
```sh
twscrape accounts
# Output:
# ───────────────────────────────────────────────────────────────────────────────────
# username logged_in active last_used total_req error_msg
# ───────────────────────────────────────────────────────────────────────────────────
# user1 True True 2023-05-20 03:20:40 100 None
# user2 True True 2023-05-20 03:25:45 120 None
# user3 False False None 120 Login error
```
Or from code:
```python
pool = AccountsPool()
print(await pool.accounts_info()) # list
```
## Limitations
API rate limits (per account):

Просмотреть файл

@ -11,10 +11,36 @@ from .login import login
from .utils import utc_ts
def guess_delim(line: str):
    """Infer the single-character token delimiter of a line-format string.

    The delimiter is taken to be the character directly adjacent to the
    ``username`` token: the one right before it when ``username`` is not the
    first token, otherwise the one right after it.

    Args:
        line: Format description such as ``"username:password:email"``.

    Returns:
        The one-character delimiter.

    Raises:
        ValueError: If ``username`` is missing, or if it is the only content
            of the format so that no delimiter can be inferred.
    """
    before, token, after = line.partition("username")
    if not token:
        raise ValueError('"username" token not found in line format')

    # Surrounding whitespace is ignored so "username : password" yields ":".
    before, after = before.strip(), after.strip()
    if before:
        return before[-1]
    if after:
        return after[0]

    # Nothing on either side of "username": there is no delimiter to guess.
    raise ValueError("cannot guess delimiter: line format has a single token")
class AccountsPool:
# Store the path of the accounts database file on the instance
# (defaults to "accounts.db"); other methods pass it to the DB helpers.
def __init__(self, db_file="accounts.db"):
self._db_file = db_file
async def load_from_file(self, filepath: str, line_format: str):
    """Add every account listed in a text file to the pool.

    Each non-empty line of the file holds one account whose fields are
    separated by the same delimiter that separates the tokens of
    ``line_format``. Fields are matched to tokens by position and passed to
    ``add_account`` as keyword arguments; surplus fields on a line are
    ignored, and lines with too few fields are skipped with a warning.

    Args:
        filepath: Path of the accounts file.
        line_format: Token layout of a line, e.g.
            ``"username:password:email:email_password"``.

    Raises:
        ValueError: If a required token is missing from ``line_format`` or
            the delimiter cannot be inferred.
    """
    required = ("username", "password", "email", "email_password")

    # guess_delim() anchors on "username", so check it before anything else.
    if "username" not in line_format:
        raise ValueError("username is required")

    line_delim = guess_delim(line_format)
    tokens = [tok.strip() for tok in line_format.split(line_delim)]

    # Compare whole tokens: a substring test would accept a format that has
    # "email_password" but no "email".
    for name in required:
        if name not in tokens:
            raise ValueError(f"{name} is required")

    with open(filepath, "r") as f:
        lines = [row for row in (raw.strip() for raw in f.read().split("\n")) if row]

    for line in lines:
        data = [part.strip() for part in line.split(line_delim)]
        if len(data) < len(tokens):
            logger.warning(f"Invalid line format: {line}")
            continue

        # zip() stops at the shorter sequence, dropping any surplus fields.
        await self.add_account(**dict(zip(tokens, data)))
async def add_account(
self,
username: str,
@ -27,8 +53,11 @@ class AccountsPool:
qs = "SELECT * FROM accounts WHERE username = :username"
rs = await fetchone(self._db_file, qs, {"username": username})
if rs:
logger.debug(f"Account {username} already exists")
return
logger.debug(f"Adding account {username}")
account = Account(
username=username,
password=password,
@ -69,6 +98,7 @@ class AccountsPool:
async def login(self, account: Account):
try:
await login(account)
logger.info(f"Logged in to {account.username} successfully")
except Exception as e:
logger.error(f"Error logging in to {account.username}: {e}")
finally:

Просмотреть файл

@ -2,6 +2,7 @@
import argparse
import asyncio
import io
from .api import API, AccountsPool
from .logger import logger, set_log_level
@ -10,6 +11,11 @@ from .utils import print_table
VER = "0.1.0"
class CustomHelpFormatter(argparse.HelpFormatter):
    """Help formatter with a wider layout than the argparse default."""

    def __init__(self, prog):
        # Help text may start as far right as column 30; lines wrap at 120.
        layout = {"max_help_position": 30, "width": 120}
        super().__init__(prog, **layout)
def get_fn_arg(args):
names = ["query", "tweet_id", "user_id", "username"]
for name in names:
@ -24,13 +30,14 @@ async def main(args):
if args.debug:
set_log_level("DEBUG")
pool = AccountsPool(args.db)
api = API(pool, debug=args.debug)
if args.command == "version":
print(VER)
return
logger.debug(f"Using database: {args.db}")
pool = AccountsPool(args.db)
api = API(pool, debug=args.debug)
if args.command == "accounts":
print_table(await pool.accounts_info())
return
@ -39,6 +46,14 @@ async def main(args):
print(await pool.stats())
return
if args.command == "add_accounts":
await pool.load_from_file(args.file_path, args.line_format)
return
if args.command == "login_accounts":
await pool.login_all()
return
fn = args.command + "_raw" if args.raw else args.command
fn = getattr(api, fn, None)
if fn is None:
@ -55,8 +70,29 @@ async def main(args):
print(doc.json())
def custom_help(p):
    """Print the help of parser ``p`` with sub-commands shown as command lists.

    The stock argparse help is rendered to a buffer and then rewritten:
    the "positional arguments:" heading becomes "commands:", the
    ``{cmd1,cmd2,...}`` choices summary lines are dropped, the first
    remaining line gets a ``<command> [...]`` suffix, and a
    "search commands:" heading is inserted before the first command whose
    name starts with ``search`` (only when such a command exists).

    If the help text has no "positional arguments:" section, it is printed
    unchanged.

    Args:
        p: The ``argparse.ArgumentParser`` to describe.
    """
    marker = "positional arguments:"

    buffer = io.StringIO()
    p.print_help(buffer)
    text = buffer.getvalue()

    # The first line after the heading is the "{a,b,c}" choices summary.
    choices = text.split(marker)[1].strip().split("\n")[0] if marker in text else ""
    if not choices:
        # Nothing to regroup; fall back to the plain argparse output.
        print(text, end="")
        return

    text = text.replace(marker, "commands:")
    lines = [ln for ln in text.split("\n") if choices not in ln and "..." not in ln]
    if lines:
        lines[0] = f"{lines[0]} <command> [...]"

    # Insert the section heading only when a search command is really listed;
    # otherwise the heading would land before an unrelated line.
    search_at = next(
        (i for i, ln in enumerate(lines) if ln.strip().startswith("search")),
        None,
    )
    if search_at is not None:
        lines[search_at:search_at] = ["", "search commands:"]

    print("\n".join(lines))
def run():
p = argparse.ArgumentParser(add_help=False)
p = argparse.ArgumentParser(add_help=False, formatter_class=CustomHelpFormatter)
p.add_argument("--db", default="accounts.db", help="Accounts database file")
p.add_argument("--debug", action="store_true", help="Enable debug mode")
subparsers = p.add_subparsers(dest="command")
@ -74,7 +110,11 @@ def run():
subparsers.add_parser("version", help="Show version")
subparsers.add_parser("accounts", help="List all accounts")
subparsers.add_parser("stats", help="Show scraping statistics")
add_accounts = subparsers.add_parser("add_accounts", help="Add accounts")
add_accounts.add_argument("file_path", help="File with accounts")
add_accounts.add_argument("line_format", help="args of Pool.add_account splited by same delim")
subparsers.add_parser("login_accounts", help="Login accounts")
clim("search", "Search for tweets", "query", "Search query")
cone("tweet_details", "Get tweet details", "tweet_id", "Tweet ID", int)
@ -89,7 +129,6 @@ def run():
args = p.parse_args()
if args.command is None:
p.print_help()
return
return custom_help(p)
asyncio.run(main(args))

Просмотреть файл

@ -9,8 +9,35 @@ from .logger import logger
MAX_WAIT_SEC = 30
class EmailLoginError(Exception):
    """Raised when logging in to the mailbox fails.

    The text is available both as ``str(exc)`` and as ``exc.message``.
    """

    def __init__(self, message="Email login error"):
        super().__init__(message)
        self.message = message
class EmailCodeTimeoutError(Exception):
    """Raised when no confirmation code arrives by email in time.

    The text is available both as ``str(exc)`` and as ``exc.message``.
    """

    def __init__(self, message="Email code timeout"):
        super().__init__(message)
        self.message = message
# Email domain -> IMAP host overrides. get_imap_domain() consults this table
# before falling back to "imap.<domain>"; add_imap_mapping() extends it.
IMAP_MAPPING: dict[str, str] = {
"yahoo.com": "imap.mail.yahoo.com",
"icloud.com": "imap.mail.me.com",
"outlook.com": "imap-mail.outlook.com",
"hotmail.com": "imap-mail.outlook.com",
}
def add_imap_mapping(email_domain: str, imap_domain: str):
    """Register (or overwrite) the IMAP host for addresses at ``email_domain``.

    Args:
        email_domain: The part of the email address after ``@``.
        imap_domain: IMAP server host name to use for that domain.
    """
    IMAP_MAPPING.update({email_domain: imap_domain})
def get_imap_domain(email: str) -> str:
    """Return the IMAP server host name for an email address.

    The domain (text after the first ``@``) is looked up in ``IMAP_MAPPING``;
    domains without an entry fall back to ``imap.<domain>``.

    The previous unconditional ``return f"imap.{...}"`` at the top of the
    function has been removed: it made the mapping lookup unreachable.

    Args:
        email: Address of the form ``user@domain``.

    Returns:
        The IMAP host to connect to.

    Raises:
        IndexError: If ``email`` contains no ``@``.
    """
    email_domain = email.split("@")[1]
    return IMAP_MAPPING.get(email_domain, f"imap.{email_domain}")
def search_email_code(imap: imaplib.IMAP4_SSL, count: int, min_t: datetime | None) -> str | None:
@ -39,7 +66,11 @@ async def get_email_code(email: str, password: str, min_t: datetime | None = Non
domain = get_imap_domain(email)
start_time = time.time()
with imaplib.IMAP4_SSL(domain) as imap:
imap.login(email, password)
try:
imap.login(email, password)
except imaplib.IMAP4.error as e:
logger.error(f"Error logging into {email}: {e}")
raise EmailLoginError() from e
was_count = 0
while True:
@ -52,5 +83,6 @@ async def get_email_code(email: str, password: str, min_t: datetime | None = Non
logger.debug(f"Waiting for confirmation code for {email}, msg_count: {now_count}")
if MAX_WAIT_SEC < time.time() - start_time:
raise Exception(f"Timeout on getting confirmation code for {email}")
logger.error(f"Timeout waiting for confirmation code for {email}")
raise EmailCodeTimeoutError()
await asyncio.sleep(5)

Просмотреть файл

@ -176,7 +176,7 @@ async def next_login_task(client: AsyncClient, acc: Account, rep: Response):
if task_id == "LoginJsInstrumentationSubtask":
return await login_instrumentation(client, acc, prev)
except Exception as e:
acc.error_msg = f"task={task_id} err={e}"
acc.error_msg = f"login_step={task_id} err={e}"
logger.error(f"Error in {task_id}: {e}")
raise e

Просмотреть файл

@ -148,8 +148,17 @@ def print_table(rows: list[dict]):
if not rows:
return
def prt(x):
    """Format one table cell as text.

    Strings are returned unchanged, integers get thousands separators, and
    everything else goes through ``str()``.
    """
    if isinstance(x, str):
        return x

    # bool is a subclass of int: without this guard True/False would be
    # formatted with ":," and shown as "1"/"0" instead of "True"/"False".
    if isinstance(x, int) and not isinstance(x, bool):
        return f"{x:,}"

    return str(x)
keys = list(rows[0].keys())
rows = [{k: k for k in keys}, *[{k: str(x.get(k, "")) for k in keys} for x in rows]]
rows = [{k: k for k in keys}, *[{k: prt(x.get(k, "")) for k in keys} for x in rows]]
colw = [max(len(x[k]) for x in rows) + 1 for k in keys]
lines = []