Асинхронное программирование. Почему не парсит все ссылки? Здравствуйте! Задача: Спарсить 78 файлов, в каждом из которых по 50000 ссылок. Проблема: Переменная total выводит 700-1500, от сюда следует, что выполняются запросы не по всем ссылкам , хотя прокси раздается для каждой. И как заморозить программу при ошибке 429 и 503? Использовал time.sleep() и asyncio.sleep(). Но не то, не другое не сработало.fields = { 'B1': 'Полное наименование Заявителя', 'C1': 'Адрес', 'D1': 'ОГРН', 'E1': 'Должность руководителя', 'F1': 'ФИО', 'G1': 'Телефон', 'H1': 'Эл. Почта', 'I1': 'Рег. номер декларации', 'J1': 'Дата начала действия', 'K1': 'Дата окончания', 'L1': 'Схема', 'M1': 'Статус', 'N1': 'Сведения о документах, на основании которых изготовлена продукция', 'O1': 'ТНВЭД' } def get_data(page): pass async def get(url, session, proxy): global total try: async with session.get(url, proxy='http://{}'.format(proxy), timeout=40) as resp: total += 1 if resp.status == 200: return await resp.text() else: write_to_file('bad.txt', url) '''if resp.status == 429: print('[429] waiting for 30 seconds') await asyncio.sleep(time_error429) if resp.status == 503: print('[503] waiting for 2 minutes') await asyncio.sleep(time_error503)''' except Exception as e: #print('bad proxy', e) pass total = 0 async def save_data_from(url, filename, session, proxy): try: async with asyncio.Semaphore(semaphore): try: page = await get(url, session, proxy) data = get_data(page) if data: text = '' for f in sorted(fields.values()): text += f + ':' text += data[f] + ' ' print(text) write_to_excel(data) except Exception as e: #print('save_data_from', e) pass except: pass async def run(): names = get_names_of('pages/') it = iter(names) cur = 'pages/{}'.format(next(it)) tasks = [] urls = open(cur, encoding='utf-8').readlines() async with ClientSession(headers=headers) as session: for i, url in enumerate(urls): if i % 10: proxy = proxy_local.get_proxy() task = asyncio.ensure_future(save_data_from(url, cur, session, proxy)) tasks.append(task) end_of_file = url #remove_if_done(cur, end_of_file) return await asyncio.gather(*tasks) if __name__ == '__main__': proxy_local = Proxy() user_agent = UserAgent() save = Save() semaphore, time_error429, time_error503 = read_conf() loop = asyncio.get_event_loop() loop.run_until_complete(run()) loop.close() print('total',total)
Проблема может быть связана с тем, что у вас используется совместное использование переменной total в нескольких асинхронных функциях, что может привести к непредсказуемым результатам. Чтобы избежать этого, рекомендуется использовать Asyncio Lock для защиты критических областей кода, включая изменение общих переменных.
Пример:
async def get(url, session, proxy): global total try: async with session.get(url, proxy='http://{}'.format(proxy), timeout=40) as resp: async with total_lock: total += 1 if resp.status == 200: return await resp.text() else: write_to_file('bad.txt', url) except Exception as e: pass total = 0 total_lock = asyncio.Lock() async def save_data_from(url, filename, session, proxy): try: async with asyncio.Semaphore(semaphore): try: page = await get(url, session, proxy) data = get_data(page) if data: text = '' for f in sorted(fields.values()): text += f + ':' text += data[f] + ' ' print(text) write_to_excel(data) except Exception as e: pass except: pass async def run(): names = get_names_of('pages/') it = iter(names) cur = 'pages/{}'.format(next(it)) tasks = [] urls = open(cur, encoding='utf-8').readlines() async with ClientSession(headers=headers) as session: for i, url in enumerate(urls): if i % 10: proxy = proxy_local.get_proxy() task = asyncio.ensure_future(save_data_from(url, cur, session, proxy)) tasks.append(task) end_of_file = url return await asyncio.gather(*tasks) if __name__ == '__main__': proxy_local = Proxy() user_agent = UserAgent() save = Save() semaphore, time_error429, time_error503 = read_conf() loop = asyncio.get_event_loop() loop.run_until_complete(run()) loop.close() print('total', total)
Относительно замораживания программы при ошибках 429 и 503, можно попробовать использовать Retry-After заголовок ответа для 429 ошибки и задержку повторной попытки соединения, а для 503 ошибки попробовать использовать повторный запрос с увеличенной задержкой.
if resp.status == 429: if 'Retry-After' in resp.headers: delay = int(resp.headers['Retry-After']) else: delay = time_error429 print('[429] waiting for {} seconds'.format(delay)) await asyncio.sleep(delay)if resp.status == 503: print('[503] waiting for {} seconds'.format(time_error503)) await asyncio.sleep(time_error503)
Эти стратегии позволят вашей программе корректно обрабатывать ошибки 429 и 503 и продолжать парсинг после задержки.
Проблема может быть связана с тем, что у вас используется совместное использование переменной total в нескольких асинхронных функциях, что может привести к непредсказуемым результатам. Чтобы избежать этого, рекомендуется использовать Asyncio Lock для защиты критических областей кода, включая изменение общих переменных.
Пример:
async def get(url, session, proxy):global total
try:
async with session.get(url, proxy='http://{}'.format(proxy), timeout=40) as resp:
async with total_lock:
total += 1
if resp.status == 200:
return await resp.text()
else:
write_to_file('bad.txt', url)
except Exception as e:
pass
total = 0
total_lock = asyncio.Lock()
async def save_data_from(url, filename, session, proxy):
try:
async with asyncio.Semaphore(semaphore):
try:
page = await get(url, session, proxy)
data = get_data(page)
if data:
text = ''
for f in sorted(fields.values()):
text += f + ':'
text += data[f] + ' '
print(text)
write_to_excel(data)
except Exception as e:
pass
except:
pass
async def run():
names = get_names_of('pages/')
it = iter(names)
cur = 'pages/{}'.format(next(it))
tasks = []
urls = open(cur, encoding='utf-8').readlines()
async with ClientSession(headers=headers) as session:
for i, url in enumerate(urls):
if i % 10:
proxy = proxy_local.get_proxy()
task = asyncio.ensure_future(save_data_from(url, cur, session, proxy))
tasks.append(task)
end_of_file = url
return await asyncio.gather(*tasks)
if __name__ == '__main__':
proxy_local = Proxy()
user_agent = UserAgent()
save = Save()
semaphore, time_error429, time_error503 = read_conf()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
print('total', total)
Относительно замораживания программы при ошибках 429 и 503, можно попробовать использовать Retry-After заголовок ответа для 429 ошибки и задержку повторной попытки соединения, а для 503 ошибки попробовать использовать повторный запрос с увеличенной задержкой.
if resp.status == 429:if 'Retry-After' in resp.headers:
delay = int(resp.headers['Retry-After'])
else:
delay = time_error429
print('[429] waiting for {} seconds'.format(delay))
await asyncio.sleep(delay)if resp.status == 503:
print('[503] waiting for {} seconds'.format(time_error503))
await asyncio.sleep(time_error503)
Эти стратегии позволят вашей программе корректно обрабатывать ошибки 429 и 503 и продолжать парсинг после задержки.