у меня есть конвейер ETL с 5 шагами. Каждый шаг может выполняться в другом потоке и другом приложении.
Это очень затрудняет передачу контекста трассировки отеля через все, потому что внутренние компоненты мне недоступны, поэтому все, к чему у меня есть доступ, — это этапы обработки.
Я ищу способ построить контекст трассировки только на основе одного идентификатора, который однозначно идентифицирует полное прохождение всех 5 шагов одной записи данных.
Пример:
Я ищу код, который работает примерно так:
public class Step1 {
public void execute(DataPackage obj){
var otelContext = SpanContext.create(
TraceId.fromBytes(obj.getUniqueId().getBytes()),
SpanId.fromBytes(processorName.getBytes()),
TraceFlags.getDefault(),
TraceState.getDefault()
);
var wrap = Span.wrap(otelContext);
var with = Context.root().with(wrap);
var span = tracer.spanBuilder("Step1").setParent(with).startSpan();
CompletableFuture.runAsync(() -> { /* the code is here*/ }).whenComplete((c1, exception) -> {
if (exception != null) {
span.recordException(exception);
} else {
span.end();
}
});
}
}
Происходит следующее: начало и конец диапазона являются отдельными и не находятся в одном и том же контексте трассировки. Значит, здесь что-то пошло не так
Сейчас я пытаюсь создать контекст вручную:
var paddedArray = new byte[16];
var originalArray = context.getId().getBytes();
System.arraycopy(originalArray, 0, paddedArray, 16 - originalArray.length, originalArray.length);
var wrap = Span.wrap(SpanContext.createFromRemoteParent(
TraceId.fromBytes(paddedArray),
SpanId.fromBytes(paddedArray),
TraceFlags.getDefault(),
TraceState.getDefault())
);
var otelContext = Context.root().with(wrap);
var startSpan = tracer.spanBuilder(context.getId())
.setParent(otelContext)
.startSpan();
Проблема, с которой я столкнулся ранее, заключалась в том, что массив входных байтов имел неправильную длину. Теперь я это исправил, но следующая проблема в том, что в jaeger это вообще не отображается.
Я подозреваю, что это потому, что контекст не был создан в jaeger, потому что здесь всегда предполагается, что контекст существовал ранее. Есть ли способ всегда «обновлять» контекст?
У меня действительно нет возможности узнать, когда следует создавать контекст или нет, поскольку конвейер etl 1 может работать до конвейера etl 2 или наоборот.




Я думаю, что доступные API должны достаточно хорошо справиться с вашей проблемой.
Можете ли вы попробовать следующий код и сообщить мне, работает ли он у вас?
Также обратите внимание на следующее: https://opentelemetry.io/docs/languages/java/instrumentation/#get-the-current-span
import io.opentelemetry.api.trace.*;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.Context;
import java.util.concurrent.CompletableFuture;
public class EtlStep {
private final Tracer tracer;
public EtlStep(Tracer tracer) {
this.tracer = tracer;
}
public void execute(DataPackage obj, int step) {
// Retrieve the current span context if available
SpanContext parentContext = Span.fromContext(Context.current()).getSpanContext();
Span span = tracer.spanBuilder("Step" + step)
.setParent(Context.current().with(Span.wrap(parentContext)))
.startSpan();
try (Scope scope = span.makeCurrent()) {
CompletableFuture.runAsync(() -> {
// Your code here
}).whenComplete((unused, exception) -> {
if (exception != null) {
span.recordException(exception);
}
span.end();
});
}
}
}
class DataPackage {
private final String uniqueId;
public DataPackage(String uniqueId) {
this.uniqueId = uniqueId;
}
public String getUniqueId() {
return uniqueId;
}
}
А вот инициализация теста (просто игрушечный пример):
// Create EtlStep instances
EtlStep step1 = new EtlStep(tracer);
EtlStep step2 = new EtlStep(tracer);
EtlStep step3 = new EtlStep(tracer);
EtlStep step4 = new EtlStep(tracer);
EtlStep step5 = new EtlStep(tracer);
// Create data packages
DataPackage dataPackage1 = new DataPackage("id_1234");
DataPackage dataPackage2 = new DataPackage("id_5555");
// Execute ETL steps for each data package
step1.execute(dataPackage1, 1);
step2.execute(dataPackage1, 2);
step3.execute(dataPackage1, 3);
step4.execute(dataPackage1, 4);
step5.execute(dataPackage1, 5);
большое спасибо за ваш ответ и извините за поздний ответ: сейчас у меня это работает, но теперь я столкнулся с другой проблемой: представьте, что у меня есть 2 конвейера etl, все используют один и тот же идентификатор пакета данных (поэтому один пакет данных попадает в два конвейера) . Я бы хотел, чтобы все они были в одном контексте. Как бы я сделал что-то подобное? Опять же, у меня есть только уникальный идентификатор пакета данных для создания контекста. Я отредактирую свой вопрос
я обновил свой вопрос текущей проблемой :)
В строке «TraceId.fromBytes(obj.getUniqueId().getBytes())» часть «obj.getUniqueId().getBytes()» должна возвращать одно и то же значение на всех ваших шагах, верно? Если это не так, то у вас должна быть проблема.