Потоковая передача больших данных из внешней базы данных на сервер Ignite занимает слишком много времени

Мое требование состоит в том, что я хочу загрузить 10 миллионов данных на сервер Ignite из внешней базы данных (SQL). Я использую функцию кэширования Ignite, которая сохраняет эти 10 миллионов записей на моем сервере Ignite. Я использовал пакетную обработку и разбиение на страницы для потоковой передачи данных на сервер с помощью DataStreamer от Ignite. Даже перед отправкой первого пакета данных я получаю сообщение «Возможно, слишком длинная пауза JVM», и даже если все 10 миллионов записей будут обработаны. Загрузка данных на сервер Ignite занимает около 40 минут. Что я делаю не так? Требуется ли какая-либо модификация, кроме увеличения кучи? И есть ли способ передать эти объемные данные из внешней БД в кеш?

Мой JVM_OPTS на стороне клиента и сервера:

-Xms4g
-Xmx4g
-XX:+AlwaysPreTouch
-XX:+UseG1GC
-XX:+ScavengeBeforeFullGC
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=55

Весь приведенный ниже код находится на моем клиенте.

Код на стороне клиента для потоковой передачи больших объемов данных:

     @Service
        public class Service {
        private static final Logger logger = LoggerFactory.getLogger(PosWavierService.class);
        private static final int BATCH_SIZE = 100_000; // Adjust batch size as needed
        private static final int NUM_THREADS = 6;
    
        private final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
    
        @Autowired
        private IgniteCacheService igniteCacheService;
    
        @Autowired
        private Ignite ignite;
    
        @Autowired
        private Repo productRepo;
    
    
        public CompletableFuture<Void> processAllRecords(String cacheName) {
            long startTime = System.currentTimeMillis();
    
            // List to store CompletableFuture for each thread
            List<CompletableFuture<Void>> futures = new ArrayList<>();
    
            // Submit tasks for fetching and streaming data concurrently
            AtomicInteger pageNumber = new AtomicInteger(0);
            for (int i = 0; i < NUM_THREADS; i++) {
                CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                    while (true) {
                        List<ProductLines> records = fetchDataFromRepo(pageNumber.getAndIncrement(), BATCH_SIZE);
                        if (records.isEmpty()) {
                            break;
                        }
                        igniteCacheService.streamBulkData(cacheName, records);
                        logger.info("Processed {} records for cache {}", records.size(), cacheName);
                    }
                    return null;
                });             futures.add(future);
            }
    
            CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0]));
    
            combinedFuture.thenRun(() -> {
                long endTime = System.currentTimeMillis();
                long totalTime = endTime - startTime;
                logger.info("Total time taken for processing all records: {} milliseconds", totalTime);
            });
    
            return combinedFuture;
        }}

Репозиторий для получения данных из таблицы:

    @Query(value = "SELECT * FROM table WHERE key IS NOT NULL AND key != '' AND key != ' ' ", nativeQuery = true)
    Page<Object> findRecordsWithPanNotNull(Pageable pageable);

ФункцияstreamBulkData:

     public void streamBulkData(String cacheName, List<Object> records) {
        try (IgniteDataStreamer<String, Object> streamer = ignite.dataStreamer(cacheName)){
           //  FileWriter writer = new FileWriter(KEYS_FILE_PATH, true)) { // Append mode
    
            streamer.allowOverwrite(true);
            streamer.perNodeBufferSize(1024);
            streamer.perNodeParallelOperations(8);
            streamer.skipStore(true);
    
            for (Product record : records) {
                String key = record.getPan_no();
                if (key != null) {
                    streamer.addData(key, record);
                    //writer.write(key);
                } else {
                    System.err.println("Skipping record with null key: " + record);
                }
            }
            streamer.flush();
    
        } catch (CacheException e) {
            System.err.println("Error streaming data to cache: " + e.getMessage());
            e.printStackTrace();
        }
    }

Я попробовал CacheDataStreamerExample, поставляемый с Ignite, он загружает 10 миллионов элементов за 8 секунд. Вы проводили тестирование своего кода, где находится узкое место?

Pavel Tupitsyn 18.04.2024 13:16
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
1
61
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я считаю, что вы создаете экземпляр стримера для каждого вызоваstreamBulkData. Я бы рекомендовал создать экземпляр стримера и передать ссылку на него в методstreamBulkData. При этом он может поддерживать соединения с хостами, на которые он передает данные.

Вы также не предоставили никакой информации о том, на чем стримете! Размер кластера (количество узлов), конфигурация памяти вне кучи, размер памяти кучи, сетевые возможности, задержка между клиентским загрузчиком и сервером, конфигурация персистентности,...

Помните, что несколько хостов Ignite могут принимать данные быстрее, чем один, но я понятия не имею, на что вы транслируете!

Надеюсь, это поможет.

На данный момент у меня нет кластера. Я просто использую свой компьютер как клиент и сервер. Это снимок топологии: [ver=6, locNode=5c7504bf, серверы=1, клиенты=1, состояние=ACTIVE, CPUs=12, offheap=1,4 ГБ, куча=7,0 ГБ]

Dude Ramasamy 23.04.2024 08:52

Что вы подразумеваете под созданием моего экземпляра стримера?? Можете ли вы объяснить это, пожалуйста?

Dude Ramasamy 24.04.2024 10:57

Вы создаете стример для каждой партии. Вместо этого создайте стример один раз и добавьте в него данные (т. е. передайте стример данных, а не имя кэша). Вам также не нужно выполнять пакетирование самостоятельно; пакеты стримеров вставляются для вас.

Stephen Darlington 24.04.2024 11:28

IgniteDataStreamer<String, Object>streamer = ignite.dataStreamer(cacheName) Это в вашем коде. Эта строка создает экземпляр Data Streamer. Ваша реализация вызывает метод. В этом методе вы создаете новый экземпляр Data Streamer при каждом вызове метода! НЕ следует создавать новый поток данных для каждого вызова. Вам следует создать поток данных за пределами этого блока кода и передать ссылку на поток данных в этот метод. При этом ваш Data Streamer может группировать данные за вас. Экземпляр Streamer должен существовать на протяжении всего пакетного задания!

user21160483 24.04.2024 15:39

Другие вопросы по теме