[Zipkin] ๋ถ„์‚ฐ ์ถ”์ ์„ ์œ„ํ•œ Zipkin, Kafka ํ†ต์‹  ์‹œ TraceId ์•ˆ ์ฐํžˆ๋Š” ๋ฌธ์ œ ํ•ด๊ฒฐ

 

1. Zipkin ์ด๋ž€?

Zipkin์€ ๋ถ„์‚ฐ ์‹œ์Šคํ…œ์—์„œ์˜ ์š”์ฒญ ์ถ”์ ์„ ์œ„ํ•œ ์˜คํ”ˆ์†Œ์Šค ๋„๊ตฌ์ด๋‹ค. ๋งˆ์ดํฌ๋กœ์„œ๋น„์Šค ์•„ํ‚คํ…์ฒ˜(MSA)์™€ ๊ฐ™์ด ์—ฌ๋Ÿฌ ์„œ๋น„์Šค๊ฐ€ ์ƒํ˜ธ์ž‘์šฉํ•˜๋Š” ํ™˜๊ฒฝ์—์„œ ์–ด๋–ค ์š”์ฒญ์ด ์–ด๋””์„œ ์ง€์—ฐ๋˜์—ˆ๋Š”์ง€, ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ–ˆ๋Š”์ง€๋ฅผ ์ถ”์ ํ•˜๊ณ  ๋ถ„์„ํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋„์™€์ค€๋‹ค.

 

Zipkin์œผ๋กœ ์ถ”์ ํ•  ์ˆ˜ ์žˆ๋Š” ๋ถ„์‚ฐ ํŠธ๋žœ์žญ์…˜์€ ๋Œ€ํ‘œ์ ์œผ๋กœ HTTP, gRPC๊ฐ€ ์žˆ๋‹ค. ๋‚˜์ค‘์— ๋‹ค๋ฃฐ ๋‚ด์šฉ์ด์ง€๋งŒ, Kafka๋ฅผ ์‚ฌ์šฉํ•œ ํ†ต์‹  ์‹œ Zipkin์—์„œ๋Š” ํ๋ฆ„์„ ์ถ”์ ํ•  ์ˆ˜ ์—†๋‹ค. ์ด๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•œ ๋‚ด์šฉ๋„ ๋‹ค๋ฃฐ ์˜ˆ์ •์ด๋‹ˆ ์ฐธ๊ณ ๋ฐ”๋ž€๋‹ค. 

 

 

 

 

 

 

 

 

 

 

 

 

2. Zipkin ๊ด€๋ จ ์ปดํฌ๋„ŒํŠธ ์šฉ์–ด ์ •๋ฆฌ

Zipkin Client Library

์„œ๋น„์Šค์—์„œ ํŠธ๋ž˜์ด์Šค ์ •๋ณด๋ฅผ ์ˆ˜์ง‘ํ•˜์—ฌ Zipkin ์„œ๋ฒ„์˜ Collector๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•˜๋Š” ์—ญํ• ์„ ํ•œ๋‹ค.

 

Collector๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•  ๋•Œ๋Š” ์ผ๋ฐ˜์ ์œผ๋กœ HTTP๋ฅผ ์‚ฌ์šฉํ•˜์ง€๋งŒ, ์‹œ์Šคํ…œ์ด ํด ๊ฒฝ์šฐ Kafka ํ๋ฅผ ํ†ตํ•ด ์ „์†กํ•˜๊ธฐ๋„ ํ•œ๋‹ค. ์˜ˆ๋ฅผ๋“ค์–ด ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์— http ์š”์ฒญ ๋˜๋Š” gRPC ์š”์ฒญ์ด ๋“ค์–ด์˜ค๋ฉด Zipkin Client Library์—์„œ ๋กœ๊ทธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚จ๋’ค Zipkin ์„œ๋ฒ„์˜ Colector์—๊ฒŒ ํ•ด๋‹น ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋‚ธ๋‹ค.

 

 

 

Collector

Zipkin Client Library ๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„ ํŠธ๋ ˆ์ด์Šค ์ •๋ณด์™€ ์œ ํšจ์„ฑ์„ ๊ฒ€์ฆํ•˜๊ณ  ๊ฒ€์ƒ‰์ด ๊ฐ€๋Šฅํ•˜๋„๋ก ์ €์žฅ ๋ฐ ์ธ๋ฑ์‹ฑํ•œ๋‹ค. 

 

 

 

Storage

Zipkin Collector์—์„œ ๋ฐ›์€ ํŠธ๋ ˆ์ด์Šค ์ •๋ณด๋ฅผ Storage์— ์ €์žฅํ•œ๋‹ค. ์ฃผ๋กœ ElasticSearch๋‚˜ MySQL์„ ์‚ฌ์šฉํ•˜์—ฌ ์ €์žฅํ•˜๋Š”๋ฐ, ์ธ๋ฉ”๋ชจ๋ฆฌ ๋˜ํ•œ ์ง€์›ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋กœ์ปฌ์—์„œ ํ…Œ์ŠคํŠธํ•˜๊ฑฐ๋‚˜ ๊ฐœ๋ฐœํ•  ๋•Œ๋Š” ์ธ๋ฉ”๋ชจ๋ฆฌ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ๋„ ์ข‹๋‹ค๊ณ  ํ•œ๋‹ค.

 

๋ช…์‹œ์ ์œผ๋กœ ์ง€์ •ํ•˜์ง€ ์•Š์œผ๋ฉด In-Memory ์ €์žฅ์†Œ์— ์ €์žฅ๋œ๋‹ค. 

 

 

 

API

์ €์žฅ๋œ ๋กœ๊ทธ ์ •๋ณด๋ฅผ ๊ฒ€์ƒ‰ํ•˜๊ธฐ ์œ„ํ•œ JSON ํ˜•์‹์˜ API

 

 

 

 

 

 

 

 

 

 

 

3. Zipkin ํ๋ฆ„

 

Trace Id = X ๋ผ๋Š” Trace Id ๊ฐ€ service1,2,3,4 ๊นŒ์ง€ ์—ฐ๊ฒฐ๋˜์–ด ํ๋ฅด๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๊ณ ,

 

Span Id ๋Š” ๊ฐ service ๋‚ด์— ์„œ๋กœ ๋‹ค๋ฅด๊ฒŒ ์ •์˜๋˜์–ด ์„ ์–ธ๋œ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. 

 

์ฆ‰, MSA ์— ์ฒซ entry point service ์—์„œ Trace ๋ฅผ ์ •์˜ํ•˜๊ณ , ๋‹ค๋ฅธ service ๊นŒ์ง€ ์ •์˜ํ•œ Trace ๋ฅผ ๊ณ„์† ์ „ํŒŒํ•˜๋ฉด์„œ TraceContext ๋ฅผ ์œ ์ง€์‹œํ‚ค๋Š” ๊ฒƒ์ด๋‹ค. 

 

์ด๋ ‡๊ฒŒ ์ „ํŒŒ๋œ TraceContext ๋Š” logging ์— ์“ฐ์—ฌ์„œ ์š”์ฒญ์— ํ๋ฆ„์— ๋Œ€ํ•œ ๋ฐœ์ž์ทจ(slueth)๋ฅผ ๋‚จ๊ธธ ์ˆ˜ ์žˆ๋‹ค. 

 

 

 

 

Trace๋งŒ ์‚ฌ์šฉํ•ด๋„ ์ถ”์ ์„ ํ•  ์ˆ˜ ์žˆ์ง€๋งŒ Span๊นŒ์ง€ ์‚ฌ์šฉํ•œ ์ด์œ ๋Š” ์ง์ „ Service๊ฐ„์˜ ์—ฐ๊ฒฐ ๊ด€๊ณ„๋ฅผ ํŒŒ์•…ํ•˜๊ธฐ ์œ„ํ•ด์„œ์ด๋‹ค. Parent Id ๋Š” ์ด์ „ Span Id ๋ฅผ ๊ฐ€๋ฆฌํ‚ด์œผ๋กœ์จ ์—ฐ๊ฒฐ๊ด€๊ณ„๋ฅผ ํ‘œํ˜„ํ•œ๋‹ค. 

 

 

 

 

Span, Trace .. ๊ฐ€ ๋ฌด์—‡์ด๋ƒ?

 

  • Span
    • The Basic Unit Of Work. ์ฆ‰, communication ๋ฐฉ์‹์— ๋Œ€ํ•œ ์‹ค์งˆ์ ์ธ ์ž‘์—… ๋‹จ์œ„์ด๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด API ์š”์ฒญ, gRPC ์š”์ฒญ์ด ์žˆ๋‹ค.
  • Trace
    • Tree ๊ตฌ์กฐ๋กœ ๊ตฌ์„ฑ๋œ Span ์˜ ๋ชจ์Œ
  • Tracer
    • Span ์˜ Life Cycle์„ ๊ด€๋ฆฌํ•œ๋‹ค. Span์„ ์‹œ์ž‘ํ•˜๊ฑฐ๋‚˜ ๋ฉˆ์ถ”๊ฑฐ๋‚˜ ์ „ํŒŒํ•˜๊ฑฐ๋‚˜ ๋“ฑ
  • Trace Context
    • ๋ถ„์‚ฐ tracing์„ ์œ„ํ•ด์„œ network ๊ฐ„์˜ ์ „ํŒŒ๋˜์–ด์•ผ ํ•˜๋Š” context(span id, trace id .. etc)
  • Log correlation
    • Trace context๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๋กœ๊ทธ๋ฅผ ๋ชจ์•„ ๊ด€๋ฆฌํ•˜๋Š” ์—ญํ• 

 

 

 

 

 

 

 

 

 

4. Tracer Context์˜ ์ „ํŒŒ ๋ฐฉ์‹

Trace Context๋ฅผ ๊ฐ Service๊ฐ„์— ์„œ๋กœ ๋‹ค๋ฅธ ๊ทœ์•ฝ์œผ๋กœ ์ „์†กํ•˜์ง€ ์•Š๊ณ , ํ•˜๋‚˜์˜ ํ‘œ์ค€์„ ์ •ํ•ด ์ „์†กํ•˜๊ธฐ ์œ„ํ•ด b3 ๋ฐฉ์‹๊ณผ w3c ๋ฐฉ์‹์„ ์ฑ„ํƒํ•˜๊ณ  ์žˆ๋‹ค. 

 

B3 propagation์€ ๊ฐ„๋‹จํžˆ ๋งํ•ด 'X-B3-'์œผ๋กœ ์‹œ์ž‘ํ•˜๋Š” X-B3-TraceId์™€ X-B3-ParentSpanId, X-B3-SpanId, X-B3-Sampled, ์ด 4๊ฐœ ๊ฐ’์„ ์ „๋‹ฌํ•˜๋Š” ๊ฒƒ์„ ํ†ตํ•ด์„œ ํŠธ๋ ˆ์ด์Šค ์ •๋ณด๋ฅผ ๊ด€๋ฆฌํ•œ๋‹ค.

 

HTTP๋ฅผ ํ†ตํ•ด ๋‹ค๋ฅธ ์„œ๋ฒ„๋กœ ์ „๋‹ฌํ•˜๋Š” ๊ฒฝ์šฐ์—๋Š” HTTP ํ—ค๋”๋ฅผ ํ†ตํ•ด์„œ ์ „๋‹ฌํ•˜๊ณ , Kafka ๋ฉ”์‹œ์ง€๋ฅผ ํ†ตํ•ด ์ „๋‹ฌํ•˜๋Š” ๊ฒฝ์šฐ์—๋Š” Kafka ํ—ค๋”๋ฅผ ํ†ตํ•ด์„œ ์ „๋‹ฌํ•œ๋‹ค.

 

 

 

  1. A Service๊ฐ€ request๋ฅผ ๋ฐ›์•„ Trace ID, Span ID๋ฅผ ์ƒ์„ฑํ•˜๊ณ  Parent Span ID๋Š” null๋กœ ์ง€์ •๋œ๋‹ค.
  2. B Service๊ฐ€ A์„œ๋น„์Šค๋กœ๋ถ€ํ„ฐ ์š”์ฒญ์„ ๋ฐ›์œผ๋ฉด X-B3 ํ—ค๋”๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ Trace ID๋Š” ๊ทธ๋Œ€๋กœ ์ด์–ด๋ฐ›๊ณ  Span ID๋Š” ์ƒˆ๋กญ๊ฒŒ ์ƒ์„ฑํ•˜๋ฉฐ, Parent Span ID๋Š” A service์˜ Span ID๋กœ ์ง€์ •ํ•œ๋‹ค.

 

Request์˜ ์ „์ฒด ํ๋ฆ„์€ Trace ID๋ฅผ ๊ธฐ์ค€์œผ๋กœ ํŠธ๋ž˜ํ‚น ํ•˜๋ฉฐ Span ID๋กœ๋Š” ํ•ด๋‹น Request๊ฐ€ ์†ํ–ˆ๋˜ ์„œ๋น„์Šค๋ฅผ ์‹๋ณ„์ด ๊ฐ€๋Šฅํ•˜๋‹ค. Parent ID๋กœ๋Š” ํ˜ธ์ถœ ๊ฐ„์˜ ์ƒ๊ด€๊ด€๊ณ„๋ฅผ ํŒŒ์•…ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

 

 

 

 

 

 

 

 

 

5. TraceId ์•ˆ ์ฐํžˆ๋Š” ๋ฌธ์ œ

Zipkin ๊ด€๋ จ ์˜์กด์„ฑ๋“ค์„ ์ถ”๊ฐ€ํ•˜๊ณ  ์š”์ฒญ์„ ๋ณด๋ƒˆ์„ ๋•Œ ์ฒซ ์š”์ฒญ์„ ๋ฐ›๋Š” Service์—์„œ๋Š” TraceId์™€ SpanId๊ฐ€ ์ž˜ ์ฐํ˜”์ง€๋งŒ, ๊ทธ ์ดํ›„์— ํ˜ธ์ถœ๋˜๋Š” Service์—์„œ๋Š” ์ฐํžˆ์ง€ ์•Š์•˜๋‹ค.

 

ํ…Œ์ŠคํŠธ ํ•ด๋ณด๋‹ˆ OpenFeign์„ ์‚ฌ์šฉํ•œ HTTP ์š”์ฒญ์—์„œ๋Š” Trace Context๊ฐ€ ์ž˜ ์ „ํŒŒ๋˜์—ˆ์ง€๋งŒ, Kafka๋ฅผ ์‚ฌ์šฉํ•œ ํ†ต์‹ ์—์„œ๋Š” ์ฐํžˆ์ง€ ์•Š์•˜๋‹ค.

 

Kafka Consumer ๋ถ€๋ถ„์—์„œ ํ—ค๋”๋ฅผ ๊นŒ๋ดค์„ ๋•Œ๋„ Trace Context๊ฐ€ ํฌํ•จ๋˜์ง€ ์•Š์•˜๋‹ค. ์ž๋™์œผ๋กœ Kafka ํ—ค๋”๋ฅผ ํ†ตํ•ด Trace Context๊ฐ€ ์ „๋‹ฌ๋œ๋‹ค๊ณ  ๋˜์–ด ์žˆ์ง€๋งŒ ์‹ค์ œ๋กœ๋Š” ์ „ํŒŒ๋˜์ง€ ์•Š์•˜๋‹ค. ๊ทธ๋ž˜์„œ ์•„๋ž˜์™€ ๊ฐ™์ด Trace Context๋ฅผ ์ง์ ‘ ์ง€์ •ํ•ด์ฃผ์—ˆ๋‹ค.

 

 

Kafka Producer

@Component
@RequiredArgsConstructor
@Slf4j
public class FtpFileProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Tracer tracer;

    private static final String TOPIC_NAME = "ftp-file-topic";

    public boolean sendFtpFileList(List<FtpFileRequest> request) {
        try {
            String traceId = tracer.currentSpan().context().traceIdString();
            String spanId = tracer.currentSpan().context().spanIdString();

            // Kafka ๋ฉ”์‹œ์ง€์— traceId์™€ spanId๋ฅผ ํ—ค๋”๋กœ ์ถ”๊ฐ€
            Message<String> message = MessageBuilder
                .withPayload(JsonUtil.toJson(request))
                .setHeader(KafkaHeaders.TOPIC, TOPIC_NAME)
                .setHeader("X-B3-TraceId", traceId)
                .setHeader("X-B3-SpanId", spanId)
                .build();
            // Kafka ๋ฉ”์‹œ์ง€ ์ „์†ก
            kafkaTemplate.send(message);

            log.info("Kafka ๋ฉ”์‹œ์ง€ ๋ฐœ์†ก ์„ฑ๊ณต - ํŒŒ์ผ ๊ฐœ์ˆ˜: {}", request.size());
            return true;
        } catch (Exception e) {
            log.error("Kafka ๋ฉ”์‹œ์ง€ ๋ฐœ์†ก ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ - ํŒŒ์ผ ๊ฐœ์ˆ˜: {}, ์˜ค๋ฅ˜: {}", request.size(), e.getMessage(), e);
            return false;
        }
    }
}

 

 

Kafka Consumer

@Component
@RequiredArgsConstructor
@Slf4j
public class FtpFileSubscriber {

    private final ParsingCsvUseCase parsingCsvUseCase;
    private final Tracer tracer;

    private static final String TOPIC_NAME = "ftp-file-topic";

    /**
     * zipkin ์„ ์œ„ํ•œ traceId, spanId ์„ค์ • ๋กœ์ง
     */
    @KafkaListener(topics = TOPIC_NAME, groupId = "ftp-file-group")
    public void consume(String message,
        @Header(name = "X-B3-TraceId", required = false) String traceId) { 
        Span span = null;
        try {
            if (traceId != null) {
                TraceContext traceContext = TraceContext.newBuilder()
                    .traceIdHigh(parseHigh(traceId))
                    .traceId(parseLow(traceId))
                    .spanId(tracer.nextSpan().context().spanId()) // ์ƒˆ spanId ์ƒ์„ฑ
                    .build();

                // ๊ธฐ์กด TraceContext๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ์ƒˆ Span ์ƒ์„ฑ
                span = tracer.toSpan(traceContext).start();

                try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
                    processMessage(message);
                }
            } else {
                log.warn("TraceId๊ฐ€ ์—†์–ด ์ƒˆ Trace๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.");
                processMessage(message);
            }
        } catch (Exception e) {
            log.error("TraceId ์„ค์ • ์˜ค๋ฅ˜ - traceId: {}, ์˜ค๋ฅ˜: {}", traceId, e.getMessage(), e);
            processMessage(message);
        } finally {
            if (span != null) {
                span.finish();
            }
        }
    }

    private long parseHigh(String traceId) {
        // traceId์˜ ์•ž๋ถ€๋ถ„ 64๋น„ํŠธ๋ฅผ ์ฒ˜๋ฆฌ
        return traceId.length() > 16 ? Long.parseUnsignedLong(traceId.substring(0, traceId.length() - 16), 16) : 0L;
    }

    private long parseLow(String traceId) {
        // traceId์˜ ๋’ท๋ถ€๋ถ„ 64๋น„ํŠธ๋ฅผ ์ฒ˜๋ฆฌ
        return Long.parseUnsignedLong(traceId.substring(Math.max(traceId.length() - 16, 0)), 16);
    }
}

 

 

 

์ด๋ ‡๊ฒŒ ์„ค์ •ํ•˜๋‹ˆ Producer๋„ Producer์ง€๋งŒ Consumer์˜ ์ฝ”๋“œ๊ฐ€ ๋„ˆ๋ฌด ๋”๋Ÿฌ์›Œ์กŒ๋‹ค. 

 

 

KafkaUtilํด๋ž˜์Šค ๋ฅผ ์ด์šฉํ•˜์—ฌ Zipkin์„ ์œ„ํ•œ ๋กœ์ง์€ ์ตœ๋Œ€ํ•œ ๋”ฐ๋กœ ๋นผ์ฃผ์—ˆ๊ณ , ์ตœ์ข… ์ฝ”๋“œ๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค. 

 

 

 

Producer & Consumer & Producer KafkaUtil

@Component
@RequiredArgsConstructor
@Slf4j
public class FtpFileProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Tracer tracer;

    private static final String TOPIC_NAME = "ftp-file-topic";

    public boolean sendFtpFileList(List<FtpFileRequest> request) {
        try {
            // Kafka ๋ฉ”์‹œ์ง€ ์ƒ์„ฑ (Util ํด๋ž˜์Šค ์‚ฌ์šฉ)
            Message<String> message = KafkaUtil.createKafkaMessage(tracer, TOPIC_NAME, request);

            // Kafka ๋ฉ”์‹œ์ง€ ์ „์†ก
            kafkaTemplate.send(message);

            log.info("Kafka ๋ฉ”์‹œ์ง€ ๋ฐœ์†ก ์„ฑ๊ณต - ํŒŒ์ผ ๊ฐœ์ˆ˜: {}", request.size());
            return true;
        } catch (Exception e) {
            log.error("Kafka ๋ฉ”์‹œ์ง€ ๋ฐœ์†ก ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ - ํŒŒ์ผ ๊ฐœ์ˆ˜: {}, ์˜ค๋ฅ˜: {}", request.size(), e.getMessage(), e);
            return false;
        }
    }
}

 

@Component
@RequiredArgsConstructor
@Slf4j
public class FtpFileSubscriber {

    private final ParsingCsvUseCase parsingCsvUseCase;
    private final Tracer tracer;

    private static final String TOPIC_NAME = "ftp-file-topic";

    /**
     * zipkin ์„ ์œ„ํ•œ traceId, spanId ์„ค์ • ๋กœ์ง
     */
    @KafkaListener(topics = TOPIC_NAME, groupId = "ftp-file-group")
    public void consume(String message, @Header(name = "X-B3-TraceId", required = false) String traceId) {
        Span span = KafkaUtil.startTraceIfExists(tracer, traceId);
        try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
            List<FtpFileRequest> fileRequests = KafkaUtil.toClassList(message, FtpFileRequest.class);
            parsingCsvUseCase.parsingCsv(fileRequests);
            log.info("Kafka ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์„ฑ๊ณต - ์ฒ˜๋ฆฌ๋œ ํŒŒ์ผ ๊ฐœ์ˆ˜: {}", fileRequests.size());
        } catch (Exception e) {
            log.error("Kafka ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ - traceId: {}, ์˜ค๋ฅ˜: {}", traceId, e.getMessage(), e);
        } finally {
            if (span != null) {
                span.finish(); // Span ์ข…๋ฃŒ
            }
        }
    }
}

 

@Slf4j
public class KafkaUtil {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static <T> T toClass(String json, Class<T> clazz) {
        try {
            return objectMapper.readValue(json, clazz);
        } catch (Exception e) {
            throw new IllegalArgumentException("JSON ๋ณ€ํ™˜ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: " + e.getMessage(), e);
        }
    }

    public static <T> List<T> toClassList(String json, Class<T> clazz) {
        try {
            return objectMapper.readValue(json, objectMapper.getTypeFactory().constructCollectionType(List.class, clazz));
        } catch (Exception e) {
            throw new IllegalArgumentException("JSON ๋ฆฌ์ŠคํŠธ ๋ณ€ํ™˜ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: " + e.getMessage(), e);
        }
    }

    public static String toJson(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (Exception e) {
            log.error("๊ฐ์ฒด๋ฅผ JSON์œผ๋กœ ๋ณ€ํ™˜ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {}", e.getMessage(), e);
            throw new IllegalArgumentException("JSON ๋ณ€ํ™˜ ์˜ค๋ฅ˜", e);
        }
    }
    
	public static <T> Message<String> createKafkaMessage(Tracer tracer, String topicName, T payload) {
        // traceId์™€ spanId ๊ฐ€์ ธ์˜ค๊ธฐ
        String traceId = tracer.currentSpan().context().traceIdString();
        String spanId = tracer.currentSpan().context().spanIdString();

        // Kafka ๋ฉ”์‹œ์ง€ ์ƒ์„ฑ
        return MessageBuilder
            .withPayload(toJson(payload))
            .setHeader(KafkaHeaders.TOPIC, topicName)
            .setHeader("X-B3-TraceId", traceId)
            .setHeader("X-B3-SpanId", spanId)
            .build();
    }

    public static Span startTraceIfExists(Tracer tracer, String traceId) {
        if (traceId != null) {
            TraceContext traceContext = TraceContext.newBuilder()
                .traceIdHigh(parseHigh(traceId))
                .traceId(parseLow(traceId))
                .spanId(tracer.nextSpan().context().spanId()) // ์ƒˆ spanId ์ƒ์„ฑ
                .build();
            return tracer.toSpan(traceContext).start();
        }
        log.warn("TraceId๊ฐ€ ์—†์–ด ์ƒˆ Trace๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.");
        return tracer.nextSpan().start();
    }

    private static long parseHigh(String traceId) {
        return traceId.length() > 16 ? Long.parseUnsignedLong(traceId.substring(0, traceId.length() - 16), 16) : 0L;
    }

    private static long parseLow(String traceId) {
        return Long.parseUnsignedLong(traceId.substring(Math.max(traceId.length() - 16, 0)), 16);
    }
}

 

 

 

 

์ด๋ ‡๊ฒŒ ์ง์ ‘ TraceId๋ฅผ ์ž‘์„ฑํ•˜๋Š”๊ฒŒ ๊ทผ๋ณธ์ ์ธ ํ•ด๊ฒฐ์ฑ…์€ ์•„๋‹ ์ˆ˜ ์žˆ์ง€๋งŒ, ์ฐพ์•„๋ดค์„ ๋• ๋งˆ๋•…ํžˆ ๋ฐฉ๋ฒ•์ด ์—†์–ด ๋ณด์˜€๊ณ , ์ง์ ‘ Kafka ํ—ค๋”๋ฅผ ํ†ตํ•ด TraceId๋ฅผ ์ „๋‹ฌํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ํ•ด๊ฒฐํ•˜์˜€๋‹ค.