add example of parallel processing #47

Этот коммит содержится в:
Vlad Pronsky 2023-07-30 16:47:45 +03:00
родитель 1230e32186
Коммит bce3bac8be
2 изменённых файлов: 81 добавлений и 0 удалений

36
examples/parallel_search.py Обычный файл
Просмотреть файл

@@ -0,0 +1,36 @@
"""
This example shows how to use twscrape to complete some queries in parallel.
To limit the number of concurrent requests, see examples/parallel_search_with_limit.py
"""
import asyncio
import twscrape
async def worker(api: twscrape.API, q: str):
    """Run one search query and collect its results.

    Best-effort: if the search fails partway through, the error is printed
    and the tweets gathered so far are returned, so one failing query does
    not lose the partial results.

    :param api: shared twscrape API client
    :param q: search query string
    :return: list of tweet documents collected for ``q``
    """
    tweets = []
    try:
        async for doc in api.search(q):
            tweets.append(doc)
    except Exception as e:
        # Example code: just report the error and keep partial results.
        print(e)
    # NOTE: the return must NOT live in a `finally` block — a `return` there
    # would swallow every in-flight exception, including CancelledError,
    # which breaks asyncio task cancellation.
    return tweets
async def main():
    """Fire off all search queries concurrently and print a result count per query."""
    api = twscrape.API()

    # add accounts here or before from cli (see README.md for examples)
    await api.pool.add_account("u1", "p1", "eu1", "ep1")
    await api.pool.login_all()

    queries = ["elon musk", "tesla", "spacex", "neuralink", "boring company"]

    # One worker coroutine per query, all awaited together.
    per_query = await asyncio.gather(*[worker(api, q) for q in queries])

    # Report in the same order the queries were issued.
    for query, tweets in zip(queries, per_query):
        print(query, len(tweets))
# Script entry point: start the asyncio event loop and run the example.
if __name__ == "__main__":
    asyncio.run(main())

45
examples/parallel_search_with_limit.py Обычный файл
Просмотреть файл

@@ -0,0 +1,45 @@
"""
This example shows how to use twscrape in parallel with concurrency limit.
"""
import asyncio
import time
import twscrape
async def worker(queue: asyncio.Queue, api: twscrape.API):
    """Queue consumer: keep pulling search queries and running them until cancelled.

    Each processed item is acknowledged with ``task_done`` whether it
    succeeded or failed, so ``queue.join()`` in the caller can complete.
    """
    while True:
        query = await queue.get()
        try:
            tweets = await twscrape.gather(api.search(query))
        except Exception as e:
            print(f"Error on {query} - {type(e)}")
        else:
            # do something with tweets here, e.g. save them to a file
            print(f"{query} - {len(tweets)} - {int(time.time())}")
        finally:
            queue.task_done()
async def main():
    """Run the example queries with a bounded number of concurrent workers."""
    api = twscrape.API()

    # add accounts here or before from cli (see README.md for examples)
    # await api.pool.add_account("u1", "p1", "eu1", "ep1")
    # await api.pool.login_all()

    queries = ["elon musk", "tesla", "spacex", "neuralink", "boring company"]

    queue = asyncio.Queue()

    workers_count = 2  # limit concurrency here 2 concurrent requests at time
    workers = [asyncio.create_task(worker(queue, api)) for _ in range(workers_count)]

    for q in queries:
        queue.put_nowait(q)

    # Wait until every queued query has been processed (task_done called).
    await queue.join()

    # Stop the now-idle workers, then await them so their cancellation
    # actually completes before the event loop shuts down; otherwise the
    # tasks are left pending. return_exceptions=True absorbs the expected
    # CancelledError from each worker.
    for worker_task in workers:
        worker_task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
# Script entry point: start the asyncio event loop and run the example.
if __name__ == "__main__":
    asyncio.run(main())