У меня есть архив Aeron, и я хочу расширить существующую запись, т. е. чтобы она продолжала добавлять сообщения после перезапуска службы.
Я не смог найти реального примера, как это сделать, поэтому я пришел со своим собственным кодом, основанным на Aeron javadoc/Поваренная книга Аэрона.
Что у меня есть до сих пор
Сначала я пытаюсь получить lastRecordingId, позицию и т. д. из самого архива.
private void findLatestRecording() {
final RecordingDescriptorConsumer consumer =
(controlSessionId, correlationId, recordingId,
startTimestamp, stopTimestamp, startPosition,
stopPosition, initialTermId, segmentFileLength,
termBufferLength, mtuLength, sessionId,
streamId, strippedChannel, originalChannel,
sourceIdentity) -> {
AeronArchiveJournal.this.lastRecordingId = recordingId;
AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
AeronArchiveJournal.this.initialTermId = initialTermId;
AeronArchiveJournal.this.termBufferLength = termBufferLength;
};
final long fromRecordingId = 0L;
final int recordCount = 100;
final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);
if (foundCount == 0) {
LOG.info("No previous recording found, will start a new one");
}
}
затем я пытаюсь продлить запись
private void extendExistingRecording() {
publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);
String channelUri = new ChannelUriStringBuilder()
.media(CommonContext.IPC_MEDIA)
.initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
.build();
LOG.info("Extending existing recording");
LOG.info("Recording id: {}", lastRecordingId);
LOG.info("Channel URI: {}", channelUri);
archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);
LOG.info("Waiting for recording to start for session {}", publication.sessionId());
final CountersReader counters = aeron.countersReader();
int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
while (CountersReader.NULL_COUNTER_ID == counterId) {
journalIdleStrategy.idle();
counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
}
lastRecordingId = RecordingPos.getRecordingId(counters, counterId);
}
Однако цикл CountersReader никогда не завершается, и аэрон выводит следующее предупреждение:
io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312
Явно что-то упускаю, но пока не могу понять что.




Чтобы расширить существующую запись, необходимо установить начальную позицию публикации, которую вы будете использовать для расширения существующей записи. Поток должен быть инициализирован, поэтому он является подлинным расширением. Этот тест показывает, как это делается.