Асинхронное программирование. Почему не парсит все ссылки? Здравствуйте!
Задача: Спарсить 78 файлов, в каждом из которых по 50000 ссылок.
Проблема: Переменная total выводит 700-1500, от сюда следует, что выполняются запросы не по всем ссылкам , хотя прокси раздается для каждой. И как заморозить программу при ошибке 429 и 503? Использовал time.sleep() и asyncio.sleep(). Но не то, не другое не сработало.fields = {
'B1': 'Полное наименование Заявителя',
'C1': 'Адрес',
'D1': 'ОГРН',
'E1': 'Должность руководителя',
'F1': 'ФИО',
'G1': 'Телефон',
'H1': 'Эл. Почта',
'I1': 'Рег. номер декларации',
'J1': 'Дата начала действия',
'K1': 'Дата окончания',
'L1': 'Схема',
'M1': 'Статус',
'N1': 'Сведения о документах, на основании которых изготовлена продукция',
'O1': 'ТНВЭД'
}
def get_data(page):
pass
async def get(url, session, proxy):
global total
try:
async with session.get(url, proxy='http://{}'.format(proxy), timeout=40) as resp:
total += 1
if resp.status == 200:
return await resp.text()
else:
write_to_file('bad.txt', url)
'''if resp.status == 429:
print('[429] waiting for 30 seconds')
await asyncio.sleep(time_error429)
if resp.status == 503:
print('[503] waiting for 2 minutes')
await asyncio.sleep(time_error503)'''
except Exception as e:
#print('bad proxy', e)
pass
total = 0
async def save_data_from(url, filename, session, proxy):
try:
async with asyncio.Semaphore(semaphore):
try:
page = await get(url, session, proxy)
data = get_data(page)
if data:
text = ''
for f in sorted(fields.values()):
text += f + ':'
text += data[f] + ' '
print(text)
write_to_excel(data)
except Exception as e:
#print('save_data_from', e)
pass
except:
pass
async def run():
names = get_names_of('pages/')
it = iter(names)
cur = 'pages/{}'.format(next(it))
tasks = []
urls = open(cur, encoding='utf-8').readlines()
async with ClientSession(headers=headers) as session:
for i, url in enumerate(urls):
if i % 10:
proxy = proxy_local.get_proxy()
task = asyncio.ensure_future(save_data_from(url, cur, session, proxy))
tasks.append(task)
end_of_file = url
#remove_if_done(cur, end_of_file)
return await asyncio.gather(*tasks)
if __name__ == '__main__':
proxy_local = Proxy()
user_agent = UserAgent()
save = Save()
semaphore, time_error429, time_error503 = read_conf()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
print('total',total)

21 Авг 2019 в 06:23
258 +1
0
Ответы
1

Проблема может быть связана с тем, что у вас используется совместное использование переменной total в нескольких асинхронных функциях, что может привести к непредсказуемым результатам. Чтобы избежать этого, рекомендуется использовать Asyncio Lock для защиты критических областей кода, включая изменение общих переменных.

Пример:

async def get(url, session, proxy):
global total
try:
async with session.get(url, proxy='http://{}'.format(proxy), timeout=40) as resp:
async with total_lock:
total += 1
if resp.status == 200:
return await resp.text()
else:
write_to_file('bad.txt', url)
except Exception as e:
pass
total = 0
total_lock = asyncio.Lock()
async def save_data_from(url, filename, session, proxy):
try:
async with asyncio.Semaphore(semaphore):
try:
page = await get(url, session, proxy)
data = get_data(page)
if data:
text = ''
for f in sorted(fields.values()):
text += f + ':'
text += data[f] + ' '
print(text)
write_to_excel(data)
except Exception as e:
pass
except:
pass
async def run():
names = get_names_of('pages/')
it = iter(names)
cur = 'pages/{}'.format(next(it))
tasks = []
urls = open(cur, encoding='utf-8').readlines()
async with ClientSession(headers=headers) as session:
for i, url in enumerate(urls):
if i % 10:
proxy = proxy_local.get_proxy()
task = asyncio.ensure_future(save_data_from(url, cur, session, proxy))
tasks.append(task)
end_of_file = url
return await asyncio.gather(*tasks)
if __name__ == '__main__':
proxy_local = Proxy()
user_agent = UserAgent()
save = Save()
semaphore, time_error429, time_error503 = read_conf()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
print('total', total)

Относительно замораживания программы при ошибках 429 и 503, можно попробовать использовать Retry-After заголовок ответа для 429 ошибки и задержку повторной попытки соединения, а для 503 ошибки попробовать использовать повторный запрос с увеличенной задержкой.

if resp.status == 429:
if 'Retry-After' in resp.headers:
delay = int(resp.headers['Retry-After'])
else:
delay = time_error429
print('[429] waiting for {} seconds'.format(delay))
await asyncio.sleep(delay)if resp.status == 503:
print('[503] waiting for {} seconds'.format(time_error503))
await asyncio.sleep(time_error503)

Эти стратегии позволят вашей программе корректно обрабатывать ошибки 429 и 503 и продолжать парсинг после задержки.

20 Апр в 13:19
Не можешь разобраться в этой теме?
Обратись за помощью к экспертам
Название заказа не должно быть пустым
Введите email
Бесплатные доработки
Гарантированные бесплатные доработки
Быстрое выполнение
Быстрое выполнение от 2 часов
Проверка работы
Проверка работы на плагиат
Интересные статьи из справочника
Поможем написать учебную работу
Название заказа не должно быть пустым
Введите email
Доверьте свою работу экспертам
Разместите заказ
Наша система отправит ваш заказ на оценку 95 172 авторам
Первые отклики появятся уже в течение 10 минут
Прямой эфир