Installing Apache Ignite and a Usage Example

Installation

Download the current Apache Ignite release from mirror.linux-ia64.org/apache.
Then prepare the server.

# $HOME/ignite is created by the final mv, so bin/ignite.sh ends up directly under it
mkdir -p $HOME/ignite-data $HOME/logs; \
touch $HOME/ignite-config.xml $HOME/run.sh; \
chmod +x $HOME/run.sh; \
wget https://mirror.linux-ia64.org/apache//ignite/2.15.0/apache-ignite-2.15.0-bin.zip; \
unzip apache-ignite-2.15.0-bin.zip; \
mv apache-ignite-2.15.0-bin $HOME/ignite

Example ignite-config.xml configuration (created at $HOME/ignite-config.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:util="http://www.springframework.org/schema/util" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://www.springframework.org/schema/beans         
http://www.springframework.org/schema/beans/spring-beans.xsd        
http://www.springframework.org/schema/util         
http://www.springframework.org/schema/util/spring-util.xsd">
    <bean class="org.apache.ignite.configuration.IgniteConfiguration" id="ignite.cfg">

        <property name="workDirectory" value="/home/www/ignite-data/"/>

        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">

                <property name="walSegmentSize" value="#{1024 * 1024 * 1024}"/>
                <!-- SSD page size  -->
                <property name="pageSize" value="#{4 * 1024}"/>

                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="name" value="default_data_region"/>
                        <property name="initialSize" value="#{1L * 1024 * 1024 * 1024}"/>
                        <property name="maxSize" value="#{100L * 1024 * 1024 * 1024}"/>
                    </bean>
                </property>

                <property name="dataRegionConfigurations">
                    <list>
                        <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            <property name="name" value="persist_data_region"/>
                            <property name="initialSize" value="#{1L * 1024 * 1024 * 1024}"/>
                            <property name="maxSize" value="#{100L * 1024 * 1024 * 1024}"/>
                            <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                            <!-- persist options -->
                            <property name="persistenceEnabled" value="true"/>
                            <property name="warmUpConfiguration">
                                <bean class="org.apache.ignite.configuration.LoadAllWarmUpConfiguration"/>
                            </property>
                        </bean>
                    </list>
                </property>

            </bean>
        </property>
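
        <!-- attached to IgniteConfiguration so that the setting is actually applied -->
        <property name="clientConnectorConfiguration">
            <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
                <property name="maxOpenCursorsPerConnection" value="128"/>
            </bean>
        </property>
    </bean>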

</beans>
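
The size values in the configuration are SpEL expressions given in bytes, which are easy to misread. As a rough aid, here is a small Python sketch that turns such products into byte counts. The helper name spel_bytes is hypothetical and it only understands plain products like the ones used above, not general SpEL.

```python
import re

GIB = 1024 ** 3


def spel_bytes(expr: str) -> int:
    """Evaluate a simple SpEL product such as '#{100L * 1024 * 1024 * 1024}'."""
    inner = re.fullmatch(r"#\{(.+)\}", expr.strip())
    if inner is None:
        raise ValueError(f"not a SpEL expression: {expr!r}")
    total = 1
    for factor in inner.group(1).split("*"):
        # drop the Java long suffix before converting
        total *= int(factor.strip().rstrip("L"))
    return total


# Values copied from ignite-config.xml above
sizes = {
    "walSegmentSize": "#{1024 * 1024 * 1024}",
    "pageSize": "#{4 * 1024}",
    "initialSize": "#{1L * 1024 * 1024 * 1024}",
    "maxSize": "#{100L * 1024 * 1024 * 1024}",
}

for name, expr in sizes.items():
    print(f"{name}: {spel_bytes(expr)} bytes")
```

With these values, each of the two data regions starts at 1 GiB and may grow to 100 GiB, so it is worth checking the numbers against the memory actually available on the host.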

Example run.sh launch script (needed for the systemd configuration).
JVM_OPTS are set on the assumption that Ignite runs as the user www.

#!/bin/bash
IGNITE_HOME=/home/www/ignite

export IGNITE_HOME
# MaxDirectMemorySize = walSegmentSize * 4

export JVM_OPTS="$JVM_OPTS \
        -Duser.timezone=Europe/Moscow \
        -Xms1g \
        -Xmx4g \
        -server \
        -XX:MaxMetaspaceSize=128m \
        -XX:MaxDirectMemorySize=1g \
        -XX:+AlwaysPreTouch \
        -XX:+UseG1GC \
        -XX:+ScavengeBeforeFullGC \
        -XX:+DisableExplicitGC \
        -XX:+HeapDumpOnOutOfMemoryError \
        -XX:HeapDumpPath=/home/www/logs/heapdump.txt \
        -XX:+ExitOnOutOfMemoryError \
        -XX:+PrintGCDetails \
        -XX:+PrintGCTimeStamps \
        -XX:+PrintGCDateStamps \
        -XX:+UseGCLogFileRotation \
        -XX:NumberOfGCLogFiles=10 \
        -XX:GCLogFileSize=100M \
        -Xloggc:/home/www/logs/loggc.txt \
        -XX:+PrintAdaptiveSizePolicy \
        -XX:+UnlockCommercialFeatures \
        -XX:+FlightRecorder \
        -XX:+UnlockDiagnosticVMOptions \
        -XX:+DebugNonSafepoints"

export DEFAULT_CONFIG="/home/www/ignite-config.xml"
$IGNITE_HOME/bin/ignite.sh
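
The comment in run.sh gives a rule of thumb, MaxDirectMemorySize = walSegmentSize * 4. The sketch below applies that rule to the walSegmentSize from ignite-config.xml (1 GiB). The function names are hypothetical, and the result is a sanity check rather than a recommendation: the script above sets 1g, and the article does not say which value was intended.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def recommended_direct_memory(wal_segment_size: int) -> int:
    """Rule of thumb from the run.sh comment: walSegmentSize * 4."""
    return wal_segment_size * 4


def to_jvm_size(n_bytes: int) -> str:
    """Format a byte count as a JVM size literal (g, m, or plain bytes)."""
    if n_bytes % GIB == 0:
        return f"{n_bytes // GIB}g"
    if n_bytes % MIB == 0:
        return f"{n_bytes // MIB}m"
    return str(n_bytes)


wal_segment_size = 1 * GIB  # walSegmentSize from ignite-config.xml
flag = f"-XX:MaxDirectMemorySize={to_jvm_size(recommended_direct_memory(wal_segment_size))}"
print(flag)  # -XX:MaxDirectMemorySize=4g
```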

systemd service configuration

sudo vim /etc/systemd/system/ignite.service

[Unit]
Description=Apache Ignite Service
After=network.target

[Service]
WorkingDirectory=/home/www/ignite
User=www
PrivateDevices=yes
ProtectSystem=full
Type=simple
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGTERM
TimeoutStopSec=10
ExecStart=/home/www/run.sh
SyslogIdentifier=Ignite
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
Alias=ignite.service

Kernel settings recommended in the official documentation.

sudo sysctl -w vm.swappiness=0
sudo sysctl -w vm.extra_free_kbytes=1240000
sudo sysctl -w vm.zone_reclaim_mode=0
sudo sysctl -w vm.dirty_writeback_centisecs=500
sudo sysctl -w vm.dirty_expire_centisecs=500

Starting the service.

sudo systemctl enable ignite.service
sudo systemctl start ignite
sudo systemctl status ignite

# check logs
sudo journalctl -u ignite -f
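
Besides reading the logs, it can help to confirm that the thin client port is accepting connections before moving on to the Python tests. The sketch below is a plain TCP check and says nothing about cluster health. It assumes the thin client port 10800 (the one the pyignite example connects to) and a node on the local machine; port_open is a hypothetical helper.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    # 127.0.0.1 is an assumption; use the server address when checking remotely
    print("thin client port reachable:", port_open("127.0.0.1", 10800))
```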

For testing, we will build an ETL process that copies data from PostgreSQL into Ignite.

Creating the test table in Ignite

CREATE TABLE public.test(
	trans_id varchar(255) not null primary key,
	amount int not null,
	decimal_amount decimal(19, 3) not null,
	client varchar(255) not null,
	operation_type varchar(10) not null,
	ex_text_field varchar not null,
	transaction_ts timestamp not null,
	transaction_dt date not null
);

Creating the test table in PostgreSQL

create table wg.gendata(
	trans_id varchar(255) not null,
	amount int not null,
	decimal_amount decimal(19, 3) not null,
	client varchar(255) not null,
	operation_type varchar(10) not null,
	ex_text_field text not null,
	transaction_ts timestamp not null,
	transaction_dt date not null
);

Generate the test data with a Python script.

def gen_test_data():
    # uuid4, randint, uniform and datetime are imported in the next listing
    _test_data = []
    for _ in range(1_000_000):
        data = {
            "trans_id": f"C{uuid4().hex.upper()}",
            "amount": randint(100, 200),
            "decimal_amount": uniform(0.00001, 25.123),
            "client": f"FFM{randint(115, 720)}",
            "operation_type": "C2C",
            "ex_text_field": "pyignite.exceptions",
            "transaction_ts": datetime.now(),
            "transaction_dt": datetime.now().strftime("%Y-%m-%d"),
        }
        # keep only the values, in the column order used by the inserts
        _test_data.append(list(data.values()))
    return _test_data
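
Before generating a million rows, it may be worth checking a single row against the table definitions. The sketch below is a variant, not the article's code: make_row and check_row are hypothetical helpers, and the row uses Decimal and date objects (instead of a float and a date string) so that the values match decimal(19, 3) and date exactly.

```python
from datetime import date, datetime
from decimal import Decimal
from random import randint, uniform
from uuid import uuid4


def make_row() -> list:
    """Build one row in the column order of the test tables."""
    now = datetime.now()
    return [
        f"C{uuid4().hex.upper()}",                                          # trans_id
        randint(100, 200),                                                  # amount
        Decimal(str(uniform(0.00001, 25.123))).quantize(Decimal("0.001")),  # decimal_amount
        f"FFM{randint(115, 720)}",                                          # client
        "C2C",                                                              # operation_type
        "pyignite.exceptions",                                              # ex_text_field
        now,                                                                # transaction_ts
        now.date(),                                                         # transaction_dt
    ]


def check_row(row: list) -> None:
    """Raise AssertionError if the row does not fit the column definitions."""
    trans_id, amount, decimal_amount, client, operation_type, _text, ts, dt = row
    assert len(trans_id) <= 255 and len(client) <= 255   # varchar(255)
    assert len(operation_type) <= 10                     # varchar(10)
    assert isinstance(amount, int)                       # int
    assert decimal_amount.as_tuple().exponent == -3      # three decimal places
    assert isinstance(ts, datetime) and isinstance(dt, date)


sample = make_row()
check_row(sample)
print(sample)
```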

Next, add helpers for fast inserts and for fetching data.

import psycopg2
import psycopg2.extras

from time import time
from uuid import uuid4
from datetime import datetime
from random import randint, uniform


class PostgresConnect:

    def __init__(self):
        # placeholder connection settings, replace with your own
        self.conn = psycopg2.connect(
            dbname="database_name",
            user="username",
            password="password",
            host="111.222.333.444",
            port=5432
        )
        # autocommit, so every statement is committed immediately
        self.conn.autocommit = True
        self.cursor = self.conn.cursor()

    def fast_insert(self, table_name: str, data):
        start = time()
        columns = ",".join([
            "trans_id", "amount", "decimal_amount", "client", "operation_type", "ex_text_field", "transaction_ts", "transaction_dt"
            ])
        cmd_template = f"insert into {table_name}({columns}) values %s"

        # execute_values expands %s into multi-row VALUES lists, page_size rows per statement
        psycopg2.extras.execute_values(
            self.cursor,
            cmd_template,
            data,
            page_size=1000
        )

        elapsed = time() - start
        print(f"Inserted {len(data)} test rows in {round(elapsed/60, 3)} minutes")

    def fetch(self, limit: int):
        # pass the limit as a query parameter instead of formatting it into the SQL
        self.cursor.execute("select * from wg.gendata limit %s", (limit,))
        return self.cursor.fetchall()

Running the data generation

pg = PostgresConnect()

test_data = gen_test_data()
pg.fast_insert("wg.gendata", test_data)

Running the speed test

import pyignite
from pyignite.exceptions import SQLError
from time import time


session = pyignite.Client(handshake_timeout=20.0)
session.connect("111.222.333.444", 10800)

LIMIT = 1_000_000

def insert_(data: tuple, table_name: str) -> bool:
    """Insert one row; return False if Ignite rejects it."""
    columns = ",".join([
        "trans_id", "amount", "decimal_amount", "client", "operation_type", "ex_text_field", "transaction_ts", "transaction_dt"
        ])
    cmd = f"insert into {table_name}({columns}) values (?, ?, ?, ?, ?, ?, ?, ?)"
    try:
        session.sql(cmd, query_args=data)
        return True
    except SQLError:
        return False

def speedtest(data: list):
    session.sql("delete from public.test;")

    failed = 0
    start = time()
    for row in data:
        if not insert_(row, "public.test"):
            failed += 1

    # divide by the unrounded time so that a short run cannot divide by zero
    elapsed = max(time() - start, 1e-9)
    print(f"Elapsed, s: {round(elapsed, 3)} // Rows: {len(data)} // Failed: {failed} // RPS: {round(len(data) / elapsed)}")

result_set = pg.fetch(LIMIT)
speedtest(result_set)

Conclusion

The speed measurement with pyignite showed a rather modest result: about 90 RPS.
A comparable service written in Java, using JDBC (ignite-core.jar) with SET STREAMING ON, reached 10,000 to 15,000 RPS.
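
The Python test sends one INSERT per row, so every row costs at least one request and response. One thing that could be tried on the Python side is grouping rows into multi-row INSERT statements. The sketch below only builds the statement and the flattened argument list. It assumes that Ignite accepts a multi-row VALUES list and that pyignite binds the flattened query_args positionally; neither was measured in this article, and this is not the streaming mode used by the Java service. The helper names and the batch size of 500 are hypothetical.

```python
from typing import Iterator, List, Sequence, Tuple

COLUMNS = (
    "trans_id", "amount", "decimal_amount", "client",
    "operation_type", "ex_text_field", "transaction_ts", "transaction_dt",
)


def chunked(rows: Sequence, size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def build_batch_insert(table_name: str, columns: Sequence[str],
                       rows: Sequence[Sequence]) -> Tuple[str, List]:
    """Return a multi-row INSERT statement and its flattened arguments."""
    row_placeholder = "(" + ", ".join("?" for _ in columns) + ")"
    placeholders = ", ".join(row_placeholder for _ in rows)
    sql = f"insert into {table_name}({','.join(columns)}) values {placeholders}"
    args = [value for row in rows for value in row]
    return sql, args


# Possible usage with the pyignite session from the speed test (not run here):
# for batch in chunked(result_set, 500):
#     sql, args = build_batch_insert("public.test", COLUMNS, batch)
#     session.sql(sql, query_args=args)
```

Whether this narrows the gap to the Java result would have to be measured with the same speedtest loop.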

Thank you all for reading!
Subscribe to my Telegram channel, artydev & Co.