Анализ отзывов с banki.ru [Часть 3] Аудит загрузок
Данный материал представлен в информационно-ознакомительных целях.
Список частей:
1
часть: Сбор данных ссылка2
часть: Визуализация ссылка3
часть: Мониторинг загрузок ссылка4
часть: Создание сайта и RESTAPI ссылка
Короткое вступление
Третья часть в серии статей о сборе данных с ресурса banki.ru - отзывов клиентов
Прошлую статью мы закончили на визуализации данных в Grafana
, что мы имеем на данный момент?
- Код, который парсит сайт
- Код, который сохраняет данных в бд
- Код, который визуализирует данные из бд
Не хватает блока, в котором будут мониторится процессы загрузки данных (Парсинга)
Решение проблемы
Предлагаю реализовать некий аудит загрузок, используя 2 таблицы:
audit_feed
- основнаяaudit_feed_type
- вспомогательная
create table if not exists home.audit_feed (
id integer primary key,
start_date timestamptz not null,
name varchar(255) not null,
finish_date timestamptz null,
status varchar(50) not null
);
create table if not exists home.audit_feed_type (
id serial primary key,
feed_id integer not null,
start_date timestamptz not null,
url varchar(510) not null,
finish_date timestamptz null,
status varchar(50) not null,
FOREIGN KEY (feed_id) REFERENCES home.audit_feed (id)
);
Также добавим последовальность SEQUENCE
, с помощью которой будет проставляться связь между таблицами по время инсерта новых данных
CREATE SEQUENCE IF NOT exists audit_sequence START 1;
Подготовка функций на python
Таблицы и последовательность есть, теперь необходимо обернуть логику их заполнения в python
код
D = Union[datetime, None]
Получаем id
последовательности
def get_sequence_id():
cursor.execute("SELECT nextval('audit_sequence')")
sequence_id = cursor.fetchone()[0]
return sequence_id
Добавление строки в основную таблицу
def create_flow_audit(
sequence_id: int,
start_date: datetime,
name: str,
finish_date: D,
status: str
):
cursor.execute(f'''
insert into {AUDIT_TABLE} (id, start_date, name, finish_date, status) values (%s, %s, %s, %s, %s)
''', (sequence_id, start_date, name, finish_date, status))
conn.commit()
Обновление строки в основной таблице
def update_flow_audit(
sequence_id: int,
finish_date: datetime,
status: str
):
cursor.execute(f'''
update {AUDIT_TABLE} set finish_date = '{finish_date}', status = '{status}' where id = {sequence_id}
''')
conn.commit()
Добавление строки во вспомогательную таблицу
def create_flow_audit_type(
feed_id: int,
start_date: datetime,
url: str,
finish_date: D,
status: str
):
cursor.execute(f'''
insert into {AUDIT_TYPE_TABLE} (feed_id, start_date, url, finish_date, status) values (%s, %s, %s, %s, %s)
''', (feed_id, start_date, url, finish_date, status))
conn.commit()
Обновление строки во вспомогательной таблице
def update_flow_audit_type(
feed_id: int,
finish_date: D,
status: str
):
cursor.execute(f'''
update {AUDIT_TYPE_TABLE} set finish_date = '{finish_date}', status = '{status}' where feed_id = {feed_id}
and finish_date is null
''')
conn.commit()
Обёртка sql-логики
готова, осталось внести ее в основной код
Выглядеть это будет следующим образом:
def main():
... # come code
sequence_id = get_sequence_id()
create_flow_audit(sequence_id, get_datetime(), FLOW_NAME.format(args.name), None, RUNNING)
status = SUCCESS
for page in range(1, PAGES):
url = URL.format(args.name, page)
try:
create_flow_audit_type(sequence_id, get_datetime(), url, None, RUNNING)
... # come code
update_flow_audit_type(sequence_id, get_datetime(), SUCCESS)
except (... # come code) as e:
update_flow_audit_type(sequence_id, get_datetime(), FAILED)
... # come code
status = FAILED
update_flow_audit(sequence_id, get_datetime(), status)
... # come code
Проверка данных
select * from home.audit_feed order by id desc limit 10;
select * from home.audit_feed_type where feed_id = 846 order by id desc limit 10;
Вариант развернутого просмотра
select
f.id
, f.start_date
, f.name
, f.finish_date
, f.status
, ft.start_date as process_st_date
, ft.finish_date as process_end_date
, ft.status as process_status
, ft.url
from
home.audit_feed f
left join home.audit_feed_type ft
on f.id = ft.feed_id
where f.id = 846
order by f.id desc limit 10;
Визуализация мониторинга процессов
Сформируем sql
отчет по статусу запусков
select
AF.name
, AF.start_date
, AF.finish_date
, count(*) filter(where FT.status = 'RUNNING') as running_cnt
, count(*) filter(where FT.status = 'SUCCESS') as success_cnt
, count(*) filter(where FT.status = 'FAILED') as failed_cnt
, round((extract(epoch from (AF.finish_date - AF.start_date))/60)::numeric, 1) as run_time
, AF.status
from
home.audit_feed AF
inner join home.audit_feed_type FT
on AF.id = FT.feed_id
where
1=1
and AF.id in (
select max(id) from home.audit_feed group by name
)
group by
AF.id
, AF.start_date
, AF.name
, AF.finish_date
, AF.status;
Результат в Grafana
На этом шаге мониторинг загрузок можно считать завершенным (учитывая, что в коде присутствует достаточно логгирования), при желании можно добавить некоторые оповещения (алерты) в самой графане.