Анализ отзывов с banki.ru [Часть 3] Аудит загрузок

3 minute read

img

Данный материал представлен в информационно-ознакомительных целях.

Список частей:

Короткое вступление

Третья часть в серии статей о сборе данных с ресурса banki.ru - отзывов клиентов

Прошлую статью мы закончили на визуализации данных в Grafana, что мы имеем на данный момент?

  1. Код, который парсит сайт
  2. Код, который сохраняет данных в бд
  3. Код, который визуализирует данные из бд

Не хватает блока, в котором будут мониторится процессы загрузки данных (Парсинга)

Решение проблемы

Предлагаю реализовать некий аудит загрузок, используя 2 таблицы:

  • audit_feed - основная
  • audit_feed_type - вспомогательная

img

create table if not exists home.audit_feed (
	id integer primary key,
	start_date timestamptz not null,
	name varchar(255) not null,
	finish_date timestamptz null,
	status varchar(50) not null
);
create table if not exists home.audit_feed_type (
	id serial primary key,
	feed_id integer not null,
	start_date timestamptz not null,
	url varchar(510) not null,
	finish_date timestamptz null,
	status varchar(50) not null,
	FOREIGN KEY (feed_id) REFERENCES home.audit_feed (id)
);

Также добавим последовальность SEQUENCE, с помощью которой будет проставляться связь между таблицами по время инсерта новых данных

CREATE SEQUENCE IF NOT exists audit_sequence START 1;

Подготовка функций на python

Таблицы и последовательность есть, теперь необходимо обернуть логику их заполнения в python код

D = Union[datetime, None]

Получаем id последовательности

def get_sequence_id():
    cursor.execute("SELECT nextval('audit_sequence')")
    sequence_id = cursor.fetchone()[0]
    return sequence_id

Добавление строки в основную таблицу

def create_flow_audit(
        sequence_id: int,
        start_date: datetime,
        name: str,
        finish_date: D,
        status: str
):
    cursor.execute(f'''
    insert into {AUDIT_TABLE} (id, start_date, name, finish_date, status) values (%s, %s, %s, %s, %s)
    ''', (sequence_id, start_date, name, finish_date, status))
    conn.commit()

Обновление строки в основной таблице

def update_flow_audit(
        sequence_id: int,
        finish_date: datetime,
        status: str
):
    cursor.execute(f'''
    update {AUDIT_TABLE} set finish_date = '{finish_date}', status = '{status}' where id = {sequence_id}
    ''')
    conn.commit()

Добавление строки во вспомогательную таблицу

def create_flow_audit_type(
        feed_id: int,
        start_date: datetime,
        url: str,
        finish_date: D,
        status: str
):
    cursor.execute(f'''
    insert into {AUDIT_TYPE_TABLE} (feed_id, start_date, url, finish_date, status) values (%s, %s, %s, %s, %s)
    ''', (feed_id, start_date, url, finish_date, status))
    conn.commit()

Обновление строки во вспомогательной таблице

def update_flow_audit_type(
        feed_id: int,
        finish_date: D,
        status: str
):
    cursor.execute(f'''
    update {AUDIT_TYPE_TABLE} set finish_date = '{finish_date}', status = '{status}' where feed_id = {feed_id}
    and finish_date is null
    ''')
    conn.commit()

Обёртка sql-логики готова, осталось внести ее в основной код

Выглядеть это будет следующим образом:

def main():
    ... # come code
    sequence_id = get_sequence_id()
    create_flow_audit(sequence_id, get_datetime(), FLOW_NAME.format(args.name), None, RUNNING)
    status = SUCCESS
    for page in range(1, PAGES):
        url = URL.format(args.name, page)
        try:
            create_flow_audit_type(sequence_id, get_datetime(), url, None, RUNNING)
            ... # come code
            update_flow_audit_type(sequence_id, get_datetime(), SUCCESS)
        except (... # come code) as e:
            update_flow_audit_type(sequence_id, get_datetime(), FAILED)
            ... # come code
            status = FAILED
    update_flow_audit(sequence_id, get_datetime(), status)
    ... # come code

Проверка данных

select * from home.audit_feed order by id desc limit 10;

img

select * from home.audit_feed_type where feed_id = 846 order by id desc limit 10;

img

Вариант развернутого просмотра

select 
	f.id
	, f.start_date 
	, f.name 
	, f.finish_date 
	, f.status 
	, ft.start_date as process_st_date
	, ft.finish_date as process_end_date
	, ft.status as process_status
	, ft.url
from 
	home.audit_feed f
	left join home.audit_feed_type ft
		on f.id = ft.feed_id 
where f.id = 846
order by f.id desc limit 10;

img

Визуализация мониторинга процессов

Сформируем sql отчет по статусу запусков

select 
	AF.name
	, AF.start_date
	, AF.finish_date
	, count(*) filter(where FT.status = 'RUNNING') as running_cnt
	, count(*) filter(where FT.status = 'SUCCESS') as success_cnt
	, count(*) filter(where FT.status = 'FAILED') as failed_cnt
	, round((extract(epoch from (AF.finish_date - AF.start_date))/60)::numeric, 1) as run_time
	, AF.status
from 
	home.audit_feed AF
	inner join home.audit_feed_type FT
		on AF.id = FT.feed_id 
where 
	1=1
	and AF.id in (
		select max(id) from home.audit_feed group by name
	)
group by 
	AF.id
	, AF.start_date
	, AF.name
	, AF.finish_date
	, AF.status;

img

Результат в Grafana

img

На этом шаге мониторинг загрузок можно считать завершенным (учитывая, что в коде присутствует достаточно логгирования), при желании можно добавить некоторые оповещения (алерты) в самой графане.