Mirror of https://github.com/viginum-datalab/twscrape.git (synced 2025-10-30 21:46:13 +02:00).
Commit 09bb27485b — "add account last_used & stats fields" (parent commit: 3333692a32).
@ -1,60 +1,49 @@
|
||||
import json
|
||||
import sqlite3
|
||||
from dataclasses import asdict, dataclass
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from datetime import datetime
|
||||
|
||||
from httpx import AsyncClient, AsyncHTTPTransport
|
||||
|
||||
from .constants import TOKEN
|
||||
from .models import JSONTrait
|
||||
from .utils import from_utciso
|
||||
|
||||
|
||||
@dataclass
|
||||
class Account:
|
||||
class Account(JSONTrait):
|
||||
username: str
|
||||
password: str
|
||||
email: str
|
||||
email_password: str
|
||||
user_agent: str
|
||||
active: bool
|
||||
locks: dict[str, datetime]
|
||||
headers: dict[str, str] | None = None
|
||||
cookies: dict[str, str] | None = None
|
||||
locks: dict[str, datetime] = field(default_factory=dict) # queue: datetime
|
||||
stats: dict[str, int] = field(default_factory=dict) # queue: requests
|
||||
headers: dict[str, str] = field(default_factory=dict)
|
||||
cookies: dict[str, str] = field(default_factory=dict)
|
||||
proxy: str | None = None
|
||||
error_msg: str | None = None
|
||||
|
||||
@staticmethod
|
||||
def create_sql():
|
||||
return """
|
||||
CREATE TABLE IF NOT EXISTS accounts (
|
||||
username TEXT PRIMARY KEY NOT NULL COLLATE NOCASE,
|
||||
password TEXT NOT NULL,
|
||||
email TEXT NOT NULL COLLATE NOCASE,
|
||||
email_password TEXT NOT NULL,
|
||||
user_agent TEXT NOT NULL,
|
||||
active BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
locks TEXT DEFAULT '{}' NOT NULL,
|
||||
headers TEXT DEFAULT '{}' NOT NULL,
|
||||
cookies TEXT DEFAULT '{}' NOT NULL,
|
||||
proxy TEXT DEFAULT NULL,
|
||||
error_msg TEXT DEFAULT NULL
|
||||
);
|
||||
"""
|
||||
last_used: datetime | None = None
|
||||
|
||||
@staticmethod
def from_rs(rs: sqlite3.Row):
    """Deserialize a row of the `accounts` table into an Account.

    - JSON-encoded columns (locks, stats, headers, cookies) are decoded.
    - Lock timestamps are parsed back into datetimes via from_utciso.
    - Non-integer stats values are discarded defensively.
    - SQLite's integer `active` flag is normalized to a bool.
    - `last_used` is parsed from its ISO string, or left as None when NULL.
    """
    doc = dict(rs)
    doc["locks"] = {k: from_utciso(v) for k, v in json.loads(doc["locks"]).items()}
    doc["stats"] = {k: v for k, v in json.loads(doc["stats"]).items() if isinstance(v, int)}
    doc["headers"] = json.loads(doc["headers"])
    doc["cookies"] = json.loads(doc["cookies"])
    doc["active"] = bool(doc["active"])
    doc["last_used"] = from_utciso(doc["last_used"]) if doc["last_used"] else None
    return Account(**doc)
|
||||
|
||||
def to_rs(self):
    """Serialize this Account into a dict of SQLite-storable values.

    Dict-valued fields become JSON strings; lock datetimes are encoded
    as ISO-8601 strings; `last_used` becomes an ISO string or None.
    """
    rs = asdict(self)
    # locks values are datetimes, so a default encoder is needed
    rs["locks"] = json.dumps(rs["locks"], default=lambda x: x.isoformat())
    rs["stats"] = json.dumps(rs["stats"])
    rs["headers"] = json.dumps(rs["headers"])
    rs["cookies"] = json.dumps(rs["cookies"])
    rs["last_used"] = rs["last_used"].isoformat() if rs["last_used"] else None
    return rs
|
||||
|
||||
def make_client(self) -> AsyncClient:
|
||||
|
||||
@ -4,15 +4,15 @@ import asyncio
|
||||
from fake_useragent import UserAgent
|
||||
|
||||
from .account import Account
|
||||
from .db import add_init_query, execute, fetchall, fetchone
|
||||
from .db import execute, fetchall, fetchone
|
||||
from .logger import logger
|
||||
from .login import login
|
||||
from .utils import utc_ts
|
||||
|
||||
|
||||
class AccountsPool:
|
||||
def __init__(self, db_file="accounts.db"):
|
||||
self._db_file = db_file
|
||||
add_init_query(db_file, Account.create_sql())
|
||||
|
||||
async def add_account(
|
||||
self,
|
||||
@ -36,6 +36,7 @@ class AccountsPool:
|
||||
user_agent=user_agent or UserAgent().safari,
|
||||
active=False,
|
||||
locks={},
|
||||
stats={},
|
||||
headers={},
|
||||
cookies={},
|
||||
proxy=proxy,
|
||||
@ -94,9 +95,12 @@ class AccountsPool:
|
||||
"""
|
||||
await execute(self._db_file, qs, {"username": username})
|
||||
|
||||
async def unlock(self, username: str, queue: str, req_count=0):
    """Release `username`'s lock on `queue`.

    Also folds `req_count` into the per-queue request stats (creating the
    counter at 0 if absent) and stamps `last_used` with the current UTC time.

    NOTE(review): `queue` and `req_count` are interpolated directly into the
    SQL text — assumed to come only from trusted internal callers; confirm
    no user-controlled values reach this method.
    """
    qs = f"""
        UPDATE accounts SET
            locks = json_remove(locks, '$.{queue}'),
            stats = json_set(stats, '$.{queue}', COALESCE(json_extract(stats, '$.{queue}'), 0) + {req_count}),
            last_used = datetime({utc_ts()}, 'unixepoch')
        WHERE username = :username
    """
    await execute(self._db_file, qs, {"username": username})
|
||||
|
||||
@ -4,6 +4,8 @@ from collections import defaultdict
|
||||
|
||||
import aiosqlite
|
||||
|
||||
from .logger import logger
|
||||
|
||||
MIN_SQLITE_VERSION = "3.34"
|
||||
|
||||
|
||||
@ -36,6 +38,51 @@ async def check_version(db: aiosqlite.Connection):
|
||||
raise SystemError(msg)
|
||||
|
||||
|
||||
async def migrate(db: aiosqlite.Connection):
    """Bring the accounts database schema up to the latest version.

    Reads `PRAGMA user_version`, runs every pending migration in order,
    and bumps `user_version` (with a commit) after each step.
    """
    async with db.execute("PRAGMA user_version") as cur:
        rs = await cur.fetchone()
        uv = rs[0] if rs else 0

    async def v1():
        # Initial schema.
        qs = """
        CREATE TABLE IF NOT EXISTS accounts (
            username TEXT PRIMARY KEY NOT NULL COLLATE NOCASE,
            password TEXT NOT NULL,
            email TEXT NOT NULL COLLATE NOCASE,
            email_password TEXT NOT NULL,
            user_agent TEXT NOT NULL,
            active BOOLEAN DEFAULT FALSE NOT NULL,
            locks TEXT DEFAULT '{}' NOT NULL,
            headers TEXT DEFAULT '{}' NOT NULL,
            cookies TEXT DEFAULT '{}' NOT NULL,
            proxy TEXT DEFAULT NULL,
            error_msg TEXT DEFAULT NULL
        );"""
        await db.execute(qs)

    async def v2():
        # Adds per-queue request stats and a last-used timestamp.
        await db.execute("ALTER TABLE accounts ADD COLUMN stats TEXT DEFAULT '{}' NOT NULL")
        await db.execute("ALTER TABLE accounts ADD COLUMN last_used TEXT DEFAULT NULL")

    migrations = {
        1: v1,
        2: v2,
    }

    logger.debug(f"Current migration v{uv} (latest v{len(migrations)})")

    for i in range(uv + 1, len(migrations) + 1):
        logger.debug(f"Running migration to v{i}")
        try:
            await migrations[i]()
        except sqlite3.OperationalError as e:
            # Tolerate re-running an ALTER on an already-migrated table;
            # anything else is a real failure.
            if "duplicate column name" not in str(e):
                raise  # bare raise preserves the original traceback

        await db.execute(f"PRAGMA user_version = {i}")
        await db.commit()
|
||||
|
||||
|
||||
class DB:
|
||||
_init_queries: defaultdict[str, list[str]] = defaultdict(list)
|
||||
_init_once: defaultdict[str, bool] = defaultdict(bool)
|
||||
@ -50,10 +97,7 @@ class DB:
|
||||
await check_version(db)
|
||||
|
||||
if not self._init_once[self.db_path]:
|
||||
for qs in self._init_queries[self.db_path]:
|
||||
await db.execute(qs)
|
||||
|
||||
await db.commit() # required here because of _init_once
|
||||
await migrate(db)
|
||||
self._init_once[self.db_path] = True
|
||||
|
||||
self.conn = db
|
||||
@ -65,10 +109,6 @@ class DB:
|
||||
await self.conn.close()
|
||||
|
||||
|
||||
def add_init_query(db_path: str, qs: str):
    """Register a SQL statement to run when the DB at `db_path` is first
    initialized (queued on DB._init_queries, executed once per path)."""
    DB._init_queries[db_path].append(qs)
|
||||
|
||||
|
||||
@lock_retry()
|
||||
async def execute(db_path: str, qs: str, params: dict | None = None):
|
||||
async with DB(db_path) as db:
|
||||
|
||||
@ -22,6 +22,7 @@ class Ctx:
|
||||
def __init__(self, acc: Account, clt: httpx.AsyncClient):
    """Bind an account to the HTTP client created for it.

    `req_count` counts only successful requests made through this client;
    it is reported back to the pool when the context is closed.
    """
    self.acc = acc
    self.clt = clt
    self.req_count = 0
|
||||
|
||||
|
||||
class QueueClient:
|
||||
@ -43,7 +44,7 @@ class QueueClient:
|
||||
async def _close_ctx(self):
    """Close the active context: shut down its HTTP client and release the
    account's queue lock, passing the successful-request count so the pool
    can update the account's stats and last_used timestamp.

    NOTE(review): self.ctx is not reset to None here — confirm callers
    (e.g. _get_ctx with fresh=True) expect the stale reference to remain.
    """
    if self.ctx is not None:
        await self.ctx.clt.aclose()
        await self.pool.unlock(self.ctx.acc.username, self.queue, self.ctx.req_count)
|
||||
|
||||
async def _get_ctx(self, fresh=False) -> Ctx:
|
||||
if self.ctx and not fresh:
|
||||
@ -96,6 +97,7 @@ class QueueClient:
|
||||
setattr(rep, "__username", ctx.acc.username)
|
||||
self._push_history(rep)
|
||||
rep.raise_for_status()
|
||||
ctx.req_count += 1 # count only successful
|
||||
return rep
|
||||
except httpx.HTTPStatusError as e:
|
||||
rep = e.response
|
||||
|
||||
Loading…
Open link in new issue
Block a user