Мое требование состоит в том, что я хочу загрузить 10 миллионов данных на сервер Ignite из внешней базы данных (SQL). Я использую функцию кэширования Ignite, которая сохраняет эти 10 миллионов записей на моем сервере Ignite. Я использовал пакетную обработку и разбиение на страницы для потоковой передачи данных на сервер с помощью DataStreamer от Ignite. Даже перед отправкой первого пакета данных я получаю сообщение «Возможно, слишком длинная пауза JVM», и даже если все 10 миллионов записей будут обработаны. Загрузка данных на сервер Ignite занимает около 40 минут. Что я делаю не так? Требуется ли какая-либо модификация, кроме увеличения кучи? И есть ли способ передать эти объемные данные из внешней БД в кеш?
Мой JVM_OPTS на стороне клиента и сервера:
-Xms4g
-Xmx4g
-XX:+AlwaysPreTouch
-XX:+UseG1GC
-XX:+ScavengeBeforeFullGC
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=55
Весь приведенный ниже код находится на моем клиенте.
Код на стороне клиента для потоковой передачи больших объемов данных:
@Service
public class Service {
private static final Logger logger = LoggerFactory.getLogger(PosWavierService.class);
private static final int BATCH_SIZE = 100_000; // Adjust batch size as needed
private static final int NUM_THREADS = 6;
private final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
@Autowired
private IgniteCacheService igniteCacheService;
@Autowired
private Ignite ignite;
@Autowired
private Repo productRepo;
public CompletableFuture<Void> processAllRecords(String cacheName) {
long startTime = System.currentTimeMillis();
// List to store CompletableFuture for each thread
List<CompletableFuture<Void>> futures = new ArrayList<>();
// Submit tasks for fetching and streaming data concurrently
AtomicInteger pageNumber = new AtomicInteger(0);
for (int i = 0; i < NUM_THREADS; i++) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
while (true) {
List<ProductLines> records = fetchDataFromRepo(pageNumber.getAndIncrement(), BATCH_SIZE);
if (records.isEmpty()) {
break;
}
igniteCacheService.streamBulkData(cacheName, records);
logger.info("Processed {} records for cache {}", records.size(), cacheName);
}
return null;
}); futures.add(future);
}
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
combinedFuture.thenRun(() -> {
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
logger.info("Total time taken for processing all records: {} milliseconds", totalTime);
});
return combinedFuture;
}}
Репозиторий для получения данных из таблицы:
@Query(value = "SELECT * FROM table WHERE key IS NOT NULL AND key != '' AND key != ' ' ", nativeQuery = true)
Page<Object> findRecordsWithPanNotNull(Pageable pageable);
ФункцияstreamBulkData:
public void streamBulkData(String cacheName, List<Object> records) {
try (IgniteDataStreamer<String, Object> streamer = ignite.dataStreamer(cacheName)){
// FileWriter writer = new FileWriter(KEYS_FILE_PATH, true)) { // Append mode
streamer.allowOverwrite(true);
streamer.perNodeBufferSize(1024);
streamer.perNodeParallelOperations(8);
streamer.skipStore(true);
for (Product record : records) {
String key = record.getPan_no();
if (key != null) {
streamer.addData(key, record);
//writer.write(key);
} else {
System.err.println("Skipping record with null key: " + record);
}
}
streamer.flush();
} catch (CacheException e) {
System.err.println("Error streaming data to cache: " + e.getMessage());
e.printStackTrace();
}
}
Я считаю, что вы создаете экземпляр стримера для каждого вызоваstreamBulkData. Я бы рекомендовал создать экземпляр стримера и передать ссылку на него в методstreamBulkData. При этом он может поддерживать соединения с хостами, на которые он передает данные.
Вы также не предоставили никакой информации о том, на чем стримете! Размер кластера (количество узлов), конфигурация памяти вне кучи, размер памяти кучи, сетевые возможности, задержка между клиентским загрузчиком и сервером, конфигурация персистентности,...
Помните, что несколько хостов Ignite могут принимать данные быстрее, чем один, но я понятия не имею, на что вы транслируете!
Надеюсь, это поможет.
На данный момент у меня нет кластера. Я просто использую свой компьютер как клиент и сервер. Это снимок топологии: [ver=6, locNode=5c7504bf, серверы=1, клиенты=1, состояние=ACTIVE, CPUs=12, offheap=1,4 ГБ, куча=7,0 ГБ]
Что вы подразумеваете под созданием моего экземпляра стримера?? Можете ли вы объяснить это, пожалуйста?
Вы создаете стример для каждой партии. Вместо этого создайте стример один раз и добавьте в него данные (т. е. передайте стример данных, а не имя кэша). Вам также не нужно выполнять пакетирование самостоятельно; пакеты стримеров вставляются для вас.
IgniteDataStreamer<String, Object>streamer = ignite.dataStreamer(cacheName) Это в вашем коде. Эта строка создает экземпляр Data Streamer. Ваша реализация вызывает метод. В этом методе вы создаете новый экземпляр Data Streamer при каждом вызове метода! НЕ следует создавать новый поток данных для каждого вызова. Вам следует создать поток данных за пределами этого блока кода и передать ссылку на поток данных в этот метод. При этом ваш Data Streamer может группировать данные за вас. Экземпляр Streamer должен существовать на протяжении всего пакетного задания!
Я попробовал
CacheDataStreamerExample
, поставляемый с Ignite, он загружает 10 миллионов элементов за 8 секунд. Вы проводили тестирование своего кода, где находится узкое место?