1. Zipkin ์ด๋?
Zipkin์ ๋ถ์ฐ ์์คํ ์์์ ์์ฒญ ์ถ์ ์ ์ํ ์คํ์์ค ๋๊ตฌ์ด๋ค. ๋ง์ดํฌ๋ก์๋น์ค ์ํคํ ์ฒ(MSA)์ ๊ฐ์ด ์ฌ๋ฌ ์๋น์ค๊ฐ ์ํธ์์ฉํ๋ ํ๊ฒฝ์์ ์ด๋ค ์์ฒญ์ด ์ด๋์ ์ง์ฐ๋์๋์ง, ๋ฌธ์ ๊ฐ ๋ฐ์ํ๋์ง๋ฅผ ์ถ์ ํ๊ณ ๋ถ์ํ ์ ์๊ฒ ๋์์ค๋ค.
Zipkin์ผ๋ก ์ถ์ ํ ์ ์๋ ๋ถ์ฐ ํธ๋์ญ์ ์ ๋ํ์ ์ผ๋ก HTTP, gRPC๊ฐ ์๋ค. ๋์ค์ ๋ค๋ฃฐ ๋ด์ฉ์ด์ง๋ง, Kafka๋ฅผ ์ฌ์ฉํ ํต์ ์ Zipkin์์๋ ํ๋ฆ์ ์ถ์ ํ ์ ์๋ค. ์ด๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํ ๋ด์ฉ๋ ๋ค๋ฃฐ ์์ ์ด๋ ์ฐธ๊ณ ๋ฐ๋๋ค.
2. Zipkin ๊ด๋ จ ์ปดํฌ๋ํธ ์ฉ์ด ์ ๋ฆฌ
Zipkin Client Library
์๋น์ค์์ ํธ๋์ด์ค ์ ๋ณด๋ฅผ ์์งํ์ฌ Zipkin ์๋ฒ์ Collector๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๋ ์ญํ ์ ํ๋ค.
Collector๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์กํ ๋๋ ์ผ๋ฐ์ ์ผ๋ก HTTP๋ฅผ ์ฌ์ฉํ์ง๋ง, ์์คํ ์ด ํด ๊ฒฝ์ฐ Kafka ํ๋ฅผ ํตํด ์ ์กํ๊ธฐ๋ ํ๋ค. ์๋ฅผ๋ค์ด ์ดํ๋ฆฌ์ผ์ด์ ์ http ์์ฒญ ๋๋ gRPC ์์ฒญ์ด ๋ค์ด์ค๋ฉด Zipkin Client Library์์ ๋ก๊ทธ๋ฅผ ๋ฐ์์ํจ๋ค Zipkin ์๋ฒ์ Colector์๊ฒ ํด๋น ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ธ๋ค.
Collector
Zipkin Client Library ๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ํธ๋ ์ด์ค ์ ๋ณด์ ์ ํจ์ฑ์ ๊ฒ์ฆํ๊ณ ๊ฒ์์ด ๊ฐ๋ฅํ๋๋ก ์ ์ฅ ๋ฐ ์ธ๋ฑ์ฑํ๋ค.
Storage
Zipkin Collector์์ ๋ฐ์ ํธ๋ ์ด์ค ์ ๋ณด๋ฅผ Storage์ ์ ์ฅํ๋ค. ์ฃผ๋ก ElasticSearch๋ MySQL์ ์ฌ์ฉํ์ฌ ์ ์ฅํ๋๋ฐ, ์ธ๋ฉ๋ชจ๋ฆฌ ๋ํ ์ง์ํ๊ธฐ ๋๋ฌธ์ ๋ก์ปฌ์์ ํ ์คํธํ๊ฑฐ๋ ๊ฐ๋ฐํ ๋๋ ์ธ๋ฉ๋ชจ๋ฆฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ์ฌ์ฉํ๋ ๊ฒ๋ ์ข๋ค๊ณ ํ๋ค.
๋ช ์์ ์ผ๋ก ์ง์ ํ์ง ์์ผ๋ฉด In-Memory ์ ์ฅ์์ ์ ์ฅ๋๋ค.
API
์ ์ฅ๋ ๋ก๊ทธ ์ ๋ณด๋ฅผ ๊ฒ์ํ๊ธฐ ์ํ JSON ํ์์ API
3. Zipkin ํ๋ฆ
Trace Id = X ๋ผ๋ Trace Id ๊ฐ service1,2,3,4 ๊น์ง ์ฐ๊ฒฐ๋์ด ํ๋ฅด๋ ๊ฒ์ ๋ณผ ์ ์๊ณ ,
Span Id ๋ ๊ฐ service ๋ด์ ์๋ก ๋ค๋ฅด๊ฒ ์ ์๋์ด ์ ์ธ๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
์ฆ, MSA ์ ์ฒซ entry point service ์์ Trace ๋ฅผ ์ ์ํ๊ณ , ๋ค๋ฅธ service ๊น์ง ์ ์ํ Trace ๋ฅผ ๊ณ์ ์ ํํ๋ฉด์ TraceContext ๋ฅผ ์ ์ง์ํค๋ ๊ฒ์ด๋ค.
์ด๋ ๊ฒ ์ ํ๋ TraceContext ๋ logging ์ ์ฐ์ฌ์ ์์ฒญ์ ํ๋ฆ์ ๋ํ ๋ฐ์์ทจ(slueth)๋ฅผ ๋จ๊ธธ ์ ์๋ค.
Trace๋ง ์ฌ์ฉํด๋ ์ถ์ ์ ํ ์ ์์ง๋ง Span๊น์ง ์ฌ์ฉํ ์ด์ ๋ ์ง์ Service๊ฐ์ ์ฐ๊ฒฐ ๊ด๊ณ๋ฅผ ํ์ ํ๊ธฐ ์ํด์์ด๋ค. Parent Id ๋ ์ด์ Span Id ๋ฅผ ๊ฐ๋ฆฌํด์ผ๋ก์จ ์ฐ๊ฒฐ๊ด๊ณ๋ฅผ ํํํ๋ค.
Span, Trace .. ๊ฐ ๋ฌด์์ด๋?
- Span
- The Basic Unit Of Work. ์ฆ, communication ๋ฐฉ์์ ๋ํ ์ค์ง์ ์ธ ์์ ๋จ์์ด๋ค. ์๋ฅผ ๋ค์ด API ์์ฒญ, gRPC ์์ฒญ์ด ์๋ค.
- Trace
- Tree ๊ตฌ์กฐ๋ก ๊ตฌ์ฑ๋ Span ์ ๋ชจ์
- Tracer
- Span ์ Life Cycle์ ๊ด๋ฆฌํ๋ค. Span์ ์์ํ๊ฑฐ๋ ๋ฉ์ถ๊ฑฐ๋ ์ ํํ๊ฑฐ๋ ๋ฑ
- Trace Context
- ๋ถ์ฐ tracing์ ์ํด์ network ๊ฐ์ ์ ํ๋์ด์ผ ํ๋ context(span id, trace id .. etc)
- Log correlation
- Trace context๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๋ก๊ทธ๋ฅผ ๋ชจ์ ๊ด๋ฆฌํ๋ ์ญํ
4. Tracer Context์ ์ ํ ๋ฐฉ์
Trace Context๋ฅผ ๊ฐ Service๊ฐ์ ์๋ก ๋ค๋ฅธ ๊ท์ฝ์ผ๋ก ์ ์กํ์ง ์๊ณ , ํ๋์ ํ์ค์ ์ ํด ์ ์กํ๊ธฐ ์ํด b3 ๋ฐฉ์๊ณผ w3c ๋ฐฉ์์ ์ฑํํ๊ณ ์๋ค.
B3 propagation์ ๊ฐ๋จํ ๋งํด 'X-B3-'์ผ๋ก ์์ํ๋ X-B3-TraceId์ X-B3-ParentSpanId, X-B3-SpanId, X-B3-Sampled, ์ด 4๊ฐ ๊ฐ์ ์ ๋ฌํ๋ ๊ฒ์ ํตํด์ ํธ๋ ์ด์ค ์ ๋ณด๋ฅผ ๊ด๋ฆฌํ๋ค.
HTTP๋ฅผ ํตํด ๋ค๋ฅธ ์๋ฒ๋ก ์ ๋ฌํ๋ ๊ฒฝ์ฐ์๋ HTTP ํค๋๋ฅผ ํตํด์ ์ ๋ฌํ๊ณ , Kafka ๋ฉ์์ง๋ฅผ ํตํด ์ ๋ฌํ๋ ๊ฒฝ์ฐ์๋ Kafka ํค๋๋ฅผ ํตํด์ ์ ๋ฌํ๋ค.
- A Service๊ฐ request๋ฅผ ๋ฐ์ Trace ID, Span ID๋ฅผ ์์ฑํ๊ณ Parent Span ID๋ null๋ก ์ง์ ๋๋ค.
- B Service๊ฐ A์๋น์ค๋ก๋ถํฐ ์์ฒญ์ ๋ฐ์ผ๋ฉด X-B3 ํค๋๋ฅผ ๊ธฐ๋ฐ์ผ๋ก Trace ID๋ ๊ทธ๋๋ก ์ด์ด๋ฐ๊ณ Span ID๋ ์๋กญ๊ฒ ์์ฑํ๋ฉฐ, Parent Span ID๋ A service์ Span ID๋ก ์ง์ ํ๋ค.
Request์ ์ ์ฒด ํ๋ฆ์ Trace ID๋ฅผ ๊ธฐ์ค์ผ๋ก ํธ๋ํน ํ๋ฉฐ Span ID๋ก๋ ํด๋น Request๊ฐ ์ํ๋ ์๋น์ค๋ฅผ ์๋ณ์ด ๊ฐ๋ฅํ๋ค. Parent ID๋ก๋ ํธ์ถ ๊ฐ์ ์๊ด๊ด๊ณ๋ฅผ ํ์ ํ ์ ์๋ค.
5. TraceId ์ ์ฐํ๋ ๋ฌธ์
Zipkin ๊ด๋ จ ์์กด์ฑ๋ค์ ์ถ๊ฐํ๊ณ ์์ฒญ์ ๋ณด๋์ ๋ ์ฒซ ์์ฒญ์ ๋ฐ๋ Service์์๋ TraceId์ SpanId๊ฐ ์ ์ฐํ์ง๋ง, ๊ทธ ์ดํ์ ํธ์ถ๋๋ Service์์๋ ์ฐํ์ง ์์๋ค.
ํ ์คํธ ํด๋ณด๋ OpenFeign์ ์ฌ์ฉํ HTTP ์์ฒญ์์๋ Trace Context๊ฐ ์ ์ ํ๋์์ง๋ง, Kafka๋ฅผ ์ฌ์ฉํ ํต์ ์์๋ ์ฐํ์ง ์์๋ค.
Kafka Consumer ๋ถ๋ถ์์ ํค๋๋ฅผ ๊น๋ดค์ ๋๋ Trace Context๊ฐ ํฌํจ๋์ง ์์๋ค. ์๋์ผ๋ก Kafka ํค๋๋ฅผ ํตํด Trace Context๊ฐ ์ ๋ฌ๋๋ค๊ณ ๋์ด ์์ง๋ง ์ค์ ๋ก๋ ์ ํ๋์ง ์์๋ค. ๊ทธ๋์ ์๋์ ๊ฐ์ด Trace Context๋ฅผ ์ง์ ์ง์ ํด์ฃผ์๋ค.
Kafka Producer
@Component
@RequiredArgsConstructor
@Slf4j
public class FtpFileProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final Tracer tracer;
private static final String TOPIC_NAME = "ftp-file-topic";
public boolean sendFtpFileList(List<FtpFileRequest> request) {
try {
String traceId = tracer.currentSpan().context().traceIdString();
String spanId = tracer.currentSpan().context().spanIdString();
// Kafka ๋ฉ์์ง์ traceId์ spanId๋ฅผ ํค๋๋ก ์ถ๊ฐ
Message<String> message = MessageBuilder
.withPayload(JsonUtil.toJson(request))
.setHeader(KafkaHeaders.TOPIC, TOPIC_NAME)
.setHeader("X-B3-TraceId", traceId)
.setHeader("X-B3-SpanId", spanId)
.build();
// Kafka ๋ฉ์์ง ์ ์ก
kafkaTemplate.send(message);
log.info("Kafka ๋ฉ์์ง ๋ฐ์ก ์ฑ๊ณต - ํ์ผ ๊ฐ์: {}", request.size());
return true;
} catch (Exception e) {
log.error("Kafka ๋ฉ์์ง ๋ฐ์ก ์ค ์ค๋ฅ ๋ฐ์ - ํ์ผ ๊ฐ์: {}, ์ค๋ฅ: {}", request.size(), e.getMessage(), e);
return false;
}
}
}
Kafka Consumer
@Component
@RequiredArgsConstructor
@Slf4j
public class FtpFileSubscriber {
private final ParsingCsvUseCase parsingCsvUseCase;
private final Tracer tracer;
private static final String TOPIC_NAME = "ftp-file-topic";
/**
* zipkin ์ ์ํ traceId, spanId ์ค์ ๋ก์ง
*/
@KafkaListener(topics = TOPIC_NAME, groupId = "ftp-file-group")
public void consume(String message,
@Header(name = "X-B3-TraceId", required = false) String traceId) {
Span span = null;
try {
if (traceId != null) {
TraceContext traceContext = TraceContext.newBuilder()
.traceIdHigh(parseHigh(traceId))
.traceId(parseLow(traceId))
.spanId(tracer.nextSpan().context().spanId()) // ์ spanId ์์ฑ
.build();
// ๊ธฐ์กด TraceContext๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์ Span ์์ฑ
span = tracer.toSpan(traceContext).start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
processMessage(message);
}
} else {
log.warn("TraceId๊ฐ ์์ด ์ Trace๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.");
processMessage(message);
}
} catch (Exception e) {
log.error("TraceId ์ค์ ์ค๋ฅ - traceId: {}, ์ค๋ฅ: {}", traceId, e.getMessage(), e);
processMessage(message);
} finally {
if (span != null) {
span.finish();
}
}
}
private long parseHigh(String traceId) {
// traceId์ ์๋ถ๋ถ 64๋นํธ๋ฅผ ์ฒ๋ฆฌ
return traceId.length() > 16 ? Long.parseUnsignedLong(traceId.substring(0, traceId.length() - 16), 16) : 0L;
}
private long parseLow(String traceId) {
// traceId์ ๋ท๋ถ๋ถ 64๋นํธ๋ฅผ ์ฒ๋ฆฌ
return Long.parseUnsignedLong(traceId.substring(Math.max(traceId.length() - 16, 0)), 16);
}
}
์ด๋ ๊ฒ ์ค์ ํ๋ Producer๋ Producer์ง๋ง Consumer์ ์ฝ๋๊ฐ ๋๋ฌด ๋๋ฌ์์ก๋ค.
KafkaUtilํด๋์ค ๋ฅผ ์ด์ฉํ์ฌ Zipkin์ ์ํ ๋ก์ง์ ์ต๋ํ ๋ฐ๋ก ๋นผ์ฃผ์๊ณ , ์ต์ข ์ฝ๋๋ ์๋์ ๊ฐ๋ค.
Producer & Consumer & Producer KafkaUtil
@Component
@RequiredArgsConstructor
@Slf4j
public class FtpFileProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final Tracer tracer;
private static final String TOPIC_NAME = "ftp-file-topic";
public boolean sendFtpFileList(List<FtpFileRequest> request) {
try {
// Kafka ๋ฉ์์ง ์์ฑ (Util ํด๋์ค ์ฌ์ฉ)
Message<String> message = KafkaUtil.createKafkaMessage(tracer, TOPIC_NAME, request);
// Kafka ๋ฉ์์ง ์ ์ก
kafkaTemplate.send(message);
log.info("Kafka ๋ฉ์์ง ๋ฐ์ก ์ฑ๊ณต - ํ์ผ ๊ฐ์: {}", request.size());
return true;
} catch (Exception e) {
log.error("Kafka ๋ฉ์์ง ๋ฐ์ก ์ค ์ค๋ฅ ๋ฐ์ - ํ์ผ ๊ฐ์: {}, ์ค๋ฅ: {}", request.size(), e.getMessage(), e);
return false;
}
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class FtpFileSubscriber {
private final ParsingCsvUseCase parsingCsvUseCase;
private final Tracer tracer;
private static final String TOPIC_NAME = "ftp-file-topic";
/**
* zipkin ์ ์ํ traceId, spanId ์ค์ ๋ก์ง
*/
@KafkaListener(topics = TOPIC_NAME, groupId = "ftp-file-group")
public void consume(String message, @Header(name = "X-B3-TraceId", required = false) String traceId) {
Span span = KafkaUtil.startTraceIfExists(tracer, traceId);
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
List<FtpFileRequest> fileRequests = KafkaUtil.toClassList(message, FtpFileRequest.class);
parsingCsvUseCase.parsingCsv(fileRequests);
log.info("Kafka ๋ฉ์์ง ์ฒ๋ฆฌ ์ฑ๊ณต - ์ฒ๋ฆฌ๋ ํ์ผ ๊ฐ์: {}", fileRequests.size());
} catch (Exception e) {
log.error("Kafka ๋ฉ์์ง ์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์ - traceId: {}, ์ค๋ฅ: {}", traceId, e.getMessage(), e);
} finally {
if (span != null) {
span.finish(); // Span ์ข
๋ฃ
}
}
}
}
@Slf4j
public class KafkaUtil {
private static final ObjectMapper objectMapper = new ObjectMapper();
public static <T> T toClass(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, clazz);
} catch (Exception e) {
throw new IllegalArgumentException("JSON ๋ณํ ์ค ์ค๋ฅ ๋ฐ์: " + e.getMessage(), e);
}
}
public static <T> List<T> toClassList(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, objectMapper.getTypeFactory().constructCollectionType(List.class, clazz));
} catch (Exception e) {
throw new IllegalArgumentException("JSON ๋ฆฌ์คํธ ๋ณํ ์ค ์ค๋ฅ ๋ฐ์: " + e.getMessage(), e);
}
}
public static String toJson(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (Exception e) {
log.error("๊ฐ์ฒด๋ฅผ JSON์ผ๋ก ๋ณํ ์ค ์ค๋ฅ ๋ฐ์: {}", e.getMessage(), e);
throw new IllegalArgumentException("JSON ๋ณํ ์ค๋ฅ", e);
}
}
public static <T> Message<String> createKafkaMessage(Tracer tracer, String topicName, T payload) {
// traceId์ spanId ๊ฐ์ ธ์ค๊ธฐ
String traceId = tracer.currentSpan().context().traceIdString();
String spanId = tracer.currentSpan().context().spanIdString();
// Kafka ๋ฉ์์ง ์์ฑ
return MessageBuilder
.withPayload(toJson(payload))
.setHeader(KafkaHeaders.TOPIC, topicName)
.setHeader("X-B3-TraceId", traceId)
.setHeader("X-B3-SpanId", spanId)
.build();
}
public static Span startTraceIfExists(Tracer tracer, String traceId) {
if (traceId != null) {
TraceContext traceContext = TraceContext.newBuilder()
.traceIdHigh(parseHigh(traceId))
.traceId(parseLow(traceId))
.spanId(tracer.nextSpan().context().spanId()) // ์ spanId ์์ฑ
.build();
return tracer.toSpan(traceContext).start();
}
log.warn("TraceId๊ฐ ์์ด ์ Trace๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.");
return tracer.nextSpan().start();
}
private static long parseHigh(String traceId) {
return traceId.length() > 16 ? Long.parseUnsignedLong(traceId.substring(0, traceId.length() - 16), 16) : 0L;
}
private static long parseLow(String traceId) {
return Long.parseUnsignedLong(traceId.substring(Math.max(traceId.length() - 16, 0)), 16);
}
}
์ด๋ ๊ฒ ์ง์ TraceId๋ฅผ ์์ฑํ๋๊ฒ ๊ทผ๋ณธ์ ์ธ ํด๊ฒฐ์ฑ ์ ์๋ ์ ์์ง๋ง, ์ฐพ์๋ดค์ ๋ ๋ง๋ ํ ๋ฐฉ๋ฒ์ด ์์ด ๋ณด์๊ณ , ์ง์ Kafka ํค๋๋ฅผ ํตํด TraceId๋ฅผ ์ ๋ฌํ๋ ๋ฐฉ์์ผ๋ก ํด๊ฒฐํ์๋ค.