зеркало из
				https://github.com/viginum-datalab/twscrape.git
				synced 2025-10-30 21:46:13 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			342 строки
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			342 строки
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # ruff: noqa: E501
 | |
| import asyncio
 | |
| import sqlite3
 | |
| import uuid
 | |
| from datetime import datetime, timezone
 | |
| from typing import TypedDict
 | |
| 
 | |
| from fake_useragent import UserAgent
 | |
| 
 | |
| from .account import Account
 | |
| from .db import execute, fetchall, fetchone
 | |
| from .logger import logger
 | |
| from .login import login
 | |
| from .utils import parse_cookies, utc_ts
 | |
| 
 | |
| 
 | |
| class AccountInfo(TypedDict):
 | |
|     username: str
 | |
|     logged_in: bool
 | |
|     active: bool
 | |
|     last_used: datetime | None
 | |
|     total_req: int
 | |
|     error_msg: str | None
 | |
| 
 | |
| 
 | |
| def guess_delim(line: str):
 | |
|     lp, rp = tuple([x.strip() for x in line.split("username")])
 | |
|     return rp[0] if not lp else lp[-1]
 | |
| 
 | |
| 
 | |
| class AccountsPool:
 | |
|     # _order_by: str = "RANDOM()"
 | |
|     _order_by: str = "username"
 | |
| 
 | |
|     def __init__(self, db_file="accounts.db"):
 | |
|         self._db_file = db_file
 | |
| 
 | |
|     async def load_from_file(self, filepath: str, line_format: str):
 | |
|         line_delim = guess_delim(line_format)
 | |
|         tokens = line_format.split(line_delim)
 | |
| 
 | |
|         required = set(["username", "password", "email", "email_password"])
 | |
|         if not required.issubset(tokens):
 | |
|             raise ValueError(f"Invalid line format: {line_format}")
 | |
| 
 | |
|         accounts = []
 | |
|         with open(filepath, "r") as f:
 | |
|             lines = f.read().split("\n")
 | |
|             lines = [x.strip() for x in lines if x.strip()]
 | |
| 
 | |
|             for line in lines:
 | |
|                 data = [x.strip() for x in line.split(line_delim)]
 | |
|                 if len(data) < len(tokens):
 | |
|                     raise ValueError(f"Invalid line: {line}")
 | |
| 
 | |
|                 data = data[: len(tokens)]
 | |
|                 vals = {k: v for k, v in zip(tokens, data) if k != "_"}
 | |
|                 accounts.append(vals)
 | |
| 
 | |
|         for x in accounts:
 | |
|             await self.add_account(**x)
 | |
| 
 | |
|     async def add_account(
 | |
|         self,
 | |
|         username: str,
 | |
|         password: str,
 | |
|         email: str,
 | |
|         email_password: str,
 | |
|         user_agent: str | None = None,
 | |
|         proxy: str | None = None,
 | |
|         cookies: str | None = None,
 | |
|     ):
 | |
|         qs = "SELECT * FROM accounts WHERE username = :username"
 | |
|         rs = await fetchone(self._db_file, qs, {"username": username})
 | |
|         if rs:
 | |
|             logger.warning(f"Account {username} already exists")
 | |
|             return
 | |
| 
 | |
|         account = Account(
 | |
|             username=username,
 | |
|             password=password,
 | |
|             email=email,
 | |
|             email_password=email_password,
 | |
|             user_agent=user_agent or UserAgent().safari,
 | |
|             active=False,
 | |
|             locks={},
 | |
|             stats={},
 | |
|             headers={},
 | |
|             cookies=parse_cookies(cookies) if cookies else {},
 | |
|             proxy=proxy,
 | |
|         )
 | |
| 
 | |
|         if "ct0" in account.cookies:
 | |
|             account.active = True
 | |
| 
 | |
|         await self.save(account)
 | |
|         logger.info(f"Account {username} added successfully (active={account.active})")
 | |
| 
 | |
|     async def delete_accounts(self, usernames: str | list[str]):
 | |
|         usernames = usernames if isinstance(usernames, list) else [usernames]
 | |
|         usernames = list(set(usernames))
 | |
|         if not usernames:
 | |
|             logger.warning("No usernames provided")
 | |
|             return
 | |
| 
 | |
|         qs = f"""DELETE FROM accounts WHERE username IN ({','.join([f'"{x}"' for x in usernames])})"""
 | |
|         await execute(self._db_file, qs)
 | |
| 
 | |
|     async def delete_inactive(self):
 | |
|         qs = "DELETE FROM accounts WHERE active = false"
 | |
|         await execute(self._db_file, qs)
 | |
| 
 | |
|     async def get(self, username: str):
 | |
|         qs = "SELECT * FROM accounts WHERE username = :username"
 | |
|         rs = await fetchone(self._db_file, qs, {"username": username})
 | |
|         if not rs:
 | |
|             raise ValueError(f"Account {username} not found")
 | |
|         return Account.from_rs(rs)
 | |
| 
 | |
|     async def get_all(self):
 | |
|         qs = "SELECT * FROM accounts"
 | |
|         rs = await fetchall(self._db_file, qs)
 | |
|         return [Account.from_rs(x) for x in rs]
 | |
| 
 | |
|     async def save(self, account: Account):
 | |
|         data = account.to_rs()
 | |
|         cols = list(data.keys())
 | |
| 
 | |
|         qs = f"""
 | |
|         INSERT INTO accounts ({",".join(cols)}) VALUES ({",".join([f":{x}" for x in cols])})
 | |
|         ON CONFLICT(username) DO UPDATE SET {",".join([f"{x}=excluded.{x}" for x in cols])}
 | |
|         """
 | |
|         await execute(self._db_file, qs, data)
 | |
| 
 | |
|     async def login(self, account: Account, email_first: bool = False):
 | |
|         try:
 | |
|             await login(account, email_first=email_first)
 | |
|             logger.info(f"Logged in to {account.username} successfully")
 | |
|             return True
 | |
|         except Exception as e:
 | |
|             logger.error(f"Error logging in to {account.username}: {e}")
 | |
|             return False
 | |
|         finally:
 | |
|             await self.save(account)
 | |
| 
 | |
|     async def login_all(self, email_first=False):
 | |
|         qs = "SELECT * FROM accounts WHERE active = false AND error_msg IS NULL"
 | |
|         rs = await fetchall(self._db_file, qs)
 | |
| 
 | |
|         accounts = [Account.from_rs(rs) for rs in rs]
 | |
|         # await asyncio.gather(*[login(x) for x in self.accounts])
 | |
| 
 | |
|         counter = {"total": len(accounts), "success": 0, "failed": 0}
 | |
|         for i, x in enumerate(accounts, start=1):
 | |
|             logger.info(f"[{i}/{len(accounts)}] Logging in {x.username} - {x.email}")
 | |
|             status = await self.login(x, email_first=email_first)
 | |
|             counter["success" if status else "failed"] += 1
 | |
|         return counter
 | |
| 
 | |
|     async def relogin(self, usernames: str | list[str], email_first=False):
 | |
|         usernames = usernames if isinstance(usernames, list) else [usernames]
 | |
|         usernames = list(set(usernames))
 | |
|         if not usernames:
 | |
|             logger.warning("No usernames provided")
 | |
|             return
 | |
| 
 | |
|         qs = f"""
 | |
|         UPDATE accounts SET
 | |
|             active = false,
 | |
|             locks = json_object(),
 | |
|             last_used = NULL,
 | |
|             error_msg = NULL,
 | |
|             headers = json_object(),
 | |
|             cookies = json_object(),
 | |
|             user_agent = "{UserAgent().safari}"
 | |
|         WHERE username IN ({','.join([f'"{x}"' for x in usernames])})
 | |
|         """
 | |
| 
 | |
|         await execute(self._db_file, qs)
 | |
|         await self.login_all(email_first=email_first)
 | |
| 
 | |
|     async def relogin_failed(self, email_first=False):
 | |
|         qs = "SELECT username FROM accounts WHERE active = false AND error_msg IS NOT NULL"
 | |
|         rs = await fetchall(self._db_file, qs)
 | |
|         await self.relogin([x["username"] for x in rs], email_first=email_first)
 | |
| 
 | |
|     async def reset_locks(self):
 | |
|         qs = "UPDATE accounts SET locks = json_object()"
 | |
|         await execute(self._db_file, qs)
 | |
| 
 | |
|     async def set_active(self, username: str, active: bool):
 | |
|         qs = "UPDATE accounts SET active = :active WHERE username = :username"
 | |
|         await execute(self._db_file, qs, {"username": username, "active": active})
 | |
| 
 | |
|     async def lock_until(self, username: str, queue: str, unlock_at: int, req_count=0):
 | |
|         qs = f"""
 | |
|         UPDATE accounts SET
 | |
|             locks = json_set(locks, '$.{queue}', datetime({unlock_at}, 'unixepoch')),
 | |
|             stats = json_set(stats, '$.{queue}', COALESCE(json_extract(stats, '$.{queue}'), 0) + {req_count}),
 | |
|             last_used = datetime({utc_ts()}, 'unixepoch')
 | |
|         WHERE username = :username
 | |
|         """
 | |
|         await execute(self._db_file, qs, {"username": username})
 | |
| 
 | |
|     async def unlock(self, username: str, queue: str, req_count=0):
 | |
|         qs = f"""
 | |
|         UPDATE accounts SET
 | |
|             locks = json_remove(locks, '$.{queue}'),
 | |
|             stats = json_set(stats, '$.{queue}', COALESCE(json_extract(stats, '$.{queue}'), 0) + {req_count}),
 | |
|             last_used = datetime({utc_ts()}, 'unixepoch')
 | |
|         WHERE username = :username
 | |
|         """
 | |
|         await execute(self._db_file, qs, {"username": username})
 | |
| 
 | |
|     async def get_for_queue(self, queue: str):
 | |
|         q1 = f"""
 | |
|         SELECT username FROM accounts
 | |
|         WHERE active = true AND (
 | |
|             locks IS NULL
 | |
|             OR json_extract(locks, '$.{queue}') IS NULL
 | |
|             OR json_extract(locks, '$.{queue}') < datetime('now')
 | |
|         )
 | |
|         ORDER BY {self._order_by}
 | |
|         LIMIT 1
 | |
|         """
 | |
| 
 | |
|         if int(sqlite3.sqlite_version_info[1]) >= 35:
 | |
|             qs = f"""
 | |
|             UPDATE accounts SET
 | |
|                 locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')),
 | |
|                 last_used = datetime({utc_ts()}, 'unixepoch')
 | |
|             WHERE username = ({q1})
 | |
|             RETURNING *
 | |
|             """
 | |
|             rs = await fetchone(self._db_file, qs)
 | |
|         else:
 | |
|             tx = uuid.uuid4().hex
 | |
|             qs = f"""
 | |
|             UPDATE accounts SET
 | |
|                 locks = json_set(locks, '$.{queue}', datetime('now', '+15 minutes')),
 | |
|                 last_used = datetime({utc_ts()}, 'unixepoch'),
 | |
|                 _tx = '{tx}'
 | |
|             WHERE username = ({q1})
 | |
|             """
 | |
|             await execute(self._db_file, qs)
 | |
| 
 | |
|             qs = f"SELECT * FROM accounts WHERE _tx = '{tx}'"
 | |
|             rs = await fetchone(self._db_file, qs)
 | |
| 
 | |
|         return Account.from_rs(rs) if rs else None
 | |
| 
 | |
|     async def get_for_queue_or_wait(self, queue: str) -> Account:
 | |
|         msg_show = False
 | |
|         while True:
 | |
|             account = await self.get_for_queue(queue)
 | |
|             if not account:
 | |
|                 if not msg_show:
 | |
|                     nat = await self.next_available_at(queue)
 | |
|                     msg = f'No account available for queue "{queue}". Next available at {nat}'
 | |
|                     logger.info(msg)
 | |
|                     msg_show = True
 | |
|                 await asyncio.sleep(5)
 | |
|                 continue
 | |
| 
 | |
|             return account
 | |
| 
 | |
|     async def next_available_at(self, queue: str):
 | |
|         qs = f"""
 | |
|         SELECT json_extract(locks, '$."{queue}"') as lock_until
 | |
|         FROM accounts
 | |
|         WHERE active = true AND json_extract(locks, '$."{queue}"') IS NOT NULL
 | |
|         ORDER BY lock_until ASC
 | |
|         LIMIT 1
 | |
|         """
 | |
|         rs = await fetchone(self._db_file, qs)
 | |
|         if rs:
 | |
|             now = datetime.utcnow().replace(tzinfo=timezone.utc)
 | |
|             trg = datetime.fromisoformat(rs[0]).replace(tzinfo=timezone.utc)
 | |
|             if trg < now:
 | |
|                 return "now"
 | |
| 
 | |
|             at_local = datetime.now() + (trg - now)
 | |
|             return at_local.strftime("%H:%M:%S")
 | |
| 
 | |
|         return "none"
 | |
| 
 | |
|     async def mark_banned(self, username: str, error_msg: str):
 | |
|         qs = """
 | |
|         UPDATE accounts SET active = false, error_msg = :error_msg
 | |
|         WHERE username = :username
 | |
|         """
 | |
|         await execute(self._db_file, qs, {"username": username, "error_msg": error_msg})
 | |
| 
 | |
|     async def stats(self):
 | |
|         def locks_count(queue: str):
 | |
|             return f"""
 | |
|             SELECT COUNT(*) FROM accounts
 | |
|             WHERE json_extract(locks, '$.{queue}') IS NOT NULL
 | |
|                 AND json_extract(locks, '$.{queue}') > datetime('now')
 | |
|             """
 | |
| 
 | |
|         qs = "SELECT DISTINCT(f.key) as k from accounts, json_each(locks) f"
 | |
|         rs = await fetchall(self._db_file, qs)
 | |
|         gql_ops = [x["k"] for x in rs]
 | |
| 
 | |
|         config = [
 | |
|             ("total", "SELECT COUNT(*) FROM accounts"),
 | |
|             ("active", "SELECT COUNT(*) FROM accounts WHERE active = true"),
 | |
|             ("inactive", "SELECT COUNT(*) FROM accounts WHERE active = false"),
 | |
|             *[(f"locked_{x}", locks_count(x)) for x in gql_ops],
 | |
|         ]
 | |
| 
 | |
|         qs = f"SELECT {','.join([f'({q}) as {k}' for k, q in config])}"
 | |
|         rs = await fetchone(self._db_file, qs)
 | |
|         return dict(rs) if rs else {}
 | |
| 
 | |
|     async def accounts_info(self):
 | |
|         accounts = await self.get_all()
 | |
| 
 | |
|         items: list[AccountInfo] = []
 | |
|         for x in accounts:
 | |
|             item: AccountInfo = {
 | |
|                 "username": x.username,
 | |
|                 "logged_in": (x.headers or {}).get("authorization", "") != "",
 | |
|                 "active": x.active,
 | |
|                 "last_used": x.last_used,
 | |
|                 "total_req": sum(x.stats.values()),
 | |
|                 "error_msg": str(x.error_msg)[0:60],
 | |
|             }
 | |
|             items.append(item)
 | |
| 
 | |
|         old_time = datetime(1970, 1, 1).replace(tzinfo=timezone.utc)
 | |
|         items = sorted(items, key=lambda x: x["username"].lower())
 | |
|         items = sorted(
 | |
|             items,
 | |
|             key=lambda x: x["last_used"] or old_time if x["total_req"] > 0 else old_time,
 | |
|             reverse=True,
 | |
|         )
 | |
|         items = sorted(items, key=lambda x: x["active"], reverse=True)
 | |
|         # items = sorted(items, key=lambda x: x["total_req"], reverse=True)
 | |
|         return items
 | 
