Repeat
스프링 배치는 특정 조건이 충족될 때까지 Job 또는 Step을 반복하도록 배치 애플리케이션을 구성할 수 있다.
스프링 배치에선 Step과 Chunk의 반복을 RepeateOperation을 사용해 처리하고 있다.
기본 구현체론 RepeatTemplate을 제공한다.
Step은 RepeatTemplate을 사용해 Tasklet을 반복적으로 실행한다. ChunkOrientedTasklet은 내부적으로 ChunkProvider를 통해 ItemReader로 데이터를 읽어올 것을 지시한다. ChunkProvider는 내부적으로 RepeatTemplate을 갖고 있고 이를 이용해 반복적으로 ItemReader에게 반복적으로 데이터를 읽어오도록 처리한다.
FaultTolerant
스프링 배치는 Job 실행 중에 오류가 발생할 경우 장애를 처리하기 위한 기능을 제공한다. 오류가 발생해도 Step이 즉시 종료되지 않고 Retry 혹은 Skip 기능을 활성화 함으로 내결함성 서비스가 가능하다.
- Skip
- ItemReader, ItemProcessor, ItemWriter에 적용 가능
- Retry
- ItemProcessor, ItemWriter에 적용 가능
아래와 같은 형식으로 실패에 대한 핸들링을 설정할 수 있다.
Skip
- Skip은 데이터를 처리하는 동안 설정된 Exception이 발생했을 경우, 해당 데이터 처리를 건너뛰는 기능이다.
- ItemReader, ItemProcessor, ItemWriter에 적용 가능하다.
- 데이터의 사소한 오류에 대해 Step의 실패처리 대신 Skip 함으로써, 배치수행의 빈번한 실패를 줄일 수 있다.
동작 방식
- itemReader
- item을 한건씩 읽다가 예외가 발생하게 되면 해당 item을 skip하고 다음 item을 읽는다.
- itemProcessor
- itemProcessor는 item을 처리하다가 예외가 발생하면 해당 Chunk의 첫 단계로 돌아가서 itemReader로부터 다시 데이터를 받습니다.
- 이때 캐시에 저장한 아이템을 다시 사용해서 itemProcessor로 다시 보내준다.
- itemProcessor는 다시 아이템들을 받아서 실행하게 되는데 이전 실행에서 예외가 발생했던 아이템은 처리하지 않고 넘어간다. 이게 가능한 이유는 캐시를 사용해서 정보가 내부적으로 남아있기 때문이다.
- 결론적으로 skip하는 건 맞는데 itemReader와 동작 방식이 다름
- itemWriter
- 위 그림에서 Writer에서 item4번에서 예외가 발생했다면 다시 Chunk 단위로 ItemReader로 돌아간다.
- 캐싱된 데이터로 itemReader는 itemProcessor로 넘긴다.
- itemProcessor는 이전처럼 청크 단위만큼 item을 처리하고 한 번에 writer로 넘기는 게 아니라 단건 처리 후 writer로 단건을 넘긴다.
위 내용들을 간단한 코드로 확인해볼 수 있다.
먼저 itemReader 에서 예외가 발생하면 item을 skip하는지 테스트하기 위한 코드이다.
@Configuration
@RequiredArgsConstructor
@Slf4j
public class JobConfig {
private int chunkSize = 5;
@Bean
public Job testJob(JobRepository jobRepository, Step testStep) {
return new JobBuilder("testJob", jobRepository)
.start(testStep)
.build();
}
@Bean Step testStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("testStep", jobRepository)
.<String, String>chunk(chunkSize, transactionManager)
.reader(customItemReader())
.writer(items -> System.out.println("items=" + items))
.faultTolerant()
.skip(CustomException.class)
.skipLimit(3)
.build();
}
@Bean
public ItemReader<String> customItemReader() {
return new ItemReader<String>() {
int i = 0;
@Override
public String read() throws SkipException {
i++;
if (i == 3) {
throw new CustomException(HttpStatus.INTERNAL_SERVER_ERROR, "error");
}
System.out.println("itemReader : " + i);
return i > 10 ? null : String.valueOf(i);
}
};
}
}
i가 3인 경우 reader에서 예외를 던진다. 결과는 writer에서 3이 skip됐기 때문에 1 2 4 5 6이 출력되야한다.
아래 코드는 itemProcess 에서 예외가 발생하면 Chunk의 첫 단계로 돌아가는지 테스트하기 위한 코드이다.
@Configuration
@RequiredArgsConstructor
@Slf4j
public class JobConfig {
private int chunkSize = 5;
@Bean
public Job testJob(JobRepository jobRepository, Step testStep) {
return new JobBuilder("testJob", jobRepository)
.start(testStep)
.build();
}
@Bean Step testStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("testStep", jobRepository)
.<String, String>chunk(chunkSize, transactionManager)
.reader(customItemReader())
.processor(customItemProcessor())
.writer(items -> System.out.println("items=" + items))
.faultTolerant()
.skip(CustomException.class)
.skipLimit(3)
.build();
}
@Bean
public ItemReader<String> customItemReader() {
return new ItemReader<String>() {
int i = 0;
@Override
public String read() throws SkipException {
i++;
// if (i == 3) {
// throw new CustomException(HttpStatus.INTERNAL_SERVER_ERROR, "error");
// }
System.out.println("itemReader : " + i);
return i > 12 ? null : String.valueOf(i);
}
};
}
@Bean
public ItemProcessor<? super String, String> customItemProcessor() {
return item -> {
System.out.println("itemProcessor " + item);
if (item.equals("3")) {
throw new CustomException(HttpStatus.INTERNAL_SERVER_ERROR, "error");
}
return item;
};
}
}
Processor에서 1 2 3 을 처리하다 예외가 터졌으므로 1 2 4 5만 writer로 넘어가야한다.
로그 찍힌걸 확인해보면 아래와같다.
Reader에서 5개를 읽고, Processor에서 1,2,3 을 처리하던 중 예외가 터졌고 Chunk의 첫 단계로 돌아가 1 2 4 5 가 찍히는걸 확인할 수 있다.
Reader에선 Writer로 넘어간 item은 1,2,4,5,6 으로 총 5개였지만, Processor에서 예외가 터지면 하나 더 불러오는 것이 아닌 1,2,4,5만 Writer로 넘어간다.
아래 코드는 Writer 에서 예외가 발생하면 itemProcessor에서 청크 단위만큼 item을 처리하고 한 번에 writer로 넘기는 게 아니라 단건 처리 후 writer로 단건을 넘기는지 테스트하기 위한 코드이다.
@Configuration
@RequiredArgsConstructor
@Slf4j
public class JobConfig {
private int chunkSize = 5;
@Bean
public Job testJob(JobRepository jobRepository, Step testStep) {
return new JobBuilder("testJob", jobRepository)
.start(testStep)
.build();
}
@Bean Step testStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("testStep", jobRepository)
.<String, String>chunk(chunkSize, transactionManager)
.reader(customItemReader())
.processor(customItemProcessor())
.writer(customItemWriter())
.faultTolerant()
.skip(CustomException.class)
.skipLimit(3)
.build();
}
@Bean
public ItemReader<String> customItemReader() {
return new ItemReader<String>() {
int i = 0;
@Override
public String read() throws SkipException {
i++;
// if (i == 3) {
// throw new CustomException(HttpStatus.INTERNAL_SERVER_ERROR, "error");
// }
System.out.println("itemReader : " + i);
return i > 12 ? null : String.valueOf(i);
}
};
}
@Bean
public ItemProcessor<? super String, String> customItemProcessor() {
return item -> {
System.out.println("itemProcessor " + item);
return item;
};
}
@Bean
public ItemWriter<? super String> customItemWriter() {
return items -> {
for (String item : items) {
if (item.equals("3")){
throw new CustomException(HttpStatus.INTERNAL_SERVER_ERROR, "error");
}
}
System.out.println("items = " + items);
};
}
}
itemProcessor 1 이 찍히고 Writer에서 작성한 로그가 바로 찍히는 걸 확인할 수 있다.
Retry
- ItemProcessor, ItemWriter에서 설정된 Exception이 발생했을 때, 지정한 정책에 따라 데이터 처리를 재시도하는 기능이다.
- ItemReader에서는 지원하지 않는다.
- 예외 발생 시 재시도 설정에 의해서 해당 Chunk의 처음부터 다시 시작한다.
- Retry Count는 Item마다 각각 가지고 있다.
- RetryLimit 횟수 이후에도 재시도가 실패한다면 recover 에서 후속작업을 처리할 수 있다.
동작 방식
기존에는 itemProcessor와 itemWriter는 ChunkProcessor에서 실행이 되었지만, Retry 기능이 활성화되면 RetryTemplate 안에서 ItemProcessor와 itemWriter가 실행되고, 예외가 발생하면 RetryTemplate 안에서 처리가 진행된다.
itemProcessor에서 예외가 발생하면 다시 Chunk 단계의 처음부터 시작한다. skip에서와 마찬가지로 itemReader는 캐시에 저장된 값을 itemProcessor에 전달한다.
itemWriter에서 예외가 발생하면 skip의 경우엔 단건 처리로 변경되었지만, retry의 경우 원래대로 다건 처리 형태가 유지된다.
아래는 itemWriter에서 예외가 발생하면 2번 재시도 하는지 테스트하기 위한 코드이다.
@Configuration
@RequiredArgsConstructor
@Slf4j
public class JobConfig {
private int chunkSize = 5;
@Bean
public Job testJob(JobRepository jobRepository, Step testStep) {
return new JobBuilder("testJob", jobRepository)
.start(testStep)
.incrementer(new RunIdIncrementer())
.build();
}
@Bean Step testStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("testStep", jobRepository)
.<String, String>chunk(chunkSize, transactionManager)
.reader(customItemReader())
.processor(customItemProcessor())
.writer(customItemWriter())
.faultTolerant()
// .skip(CustomException.class)
// .skipLimit(3)
.retry(CustomException.class)
.retryLimit(2)
.build();
}
@Bean
public ItemReader<String> customItemReader() {
return new ItemReader<String>() {
int i = 0;
@Override
public String read() throws SkipException {
i++;
System.out.println("itemReader : " + i);
return i > 5 ? null : String.valueOf(i);
}
};
}
@Bean
public ItemProcessor<? super String, String> customItemProcessor() {
return item -> {
System.out.println("itemProcessor : " + item);
return item;
};
}
@Bean
public ItemWriter<? super String> customItemWriter() {
return items -> {
for (String item : items) {
if (item.equals("3")){
throw new CustomException(HttpStatus.INTERNAL_SERVER_ERROR, "error");
}
}
System.out.println("items = " + items);
};
}
}
캐시를 사용하므로 itemReader의 로그는 한번만 찍히고, 2번의 1~5가 찍힌 후 예외가 터지는걸 확인할 수 있다.
아래는 itemProcessor에서 예외가 발생하면 2번 재시도 하는지 테스트하기 위한 코드이다.
@Configuration
@RequiredArgsConstructor
@Slf4j
public class JobConfig {
private int chunkSize = 5;
@Bean
public Job testJob(JobRepository jobRepository, Step testStep) {
return new JobBuilder("testJob", jobRepository)
.start(testStep)
.incrementer(new RunIdIncrementer())
.build();
}
@Bean Step testStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("testStep", jobRepository)
.<String, String>chunk(chunkSize, transactionManager)
.reader(customItemReader())
.processor(customItemProcessor())
.writer(customItemWriter())
.faultTolerant()
// .skip(CustomException.class)
// .skipLimit(3)
.retry(CustomException.class)
.retryLimit(2)
.build();
}
@Bean
public ItemReader<String> customItemReader() {
return new ItemReader<String>() {
int i = 0;
@Override
public String read() throws SkipException {
i++;
System.out.println("itemReader : " + i);
return i > 5 ? null : String.valueOf(i);
}
};
}
@Bean
public ItemProcessor<? super String, String> customItemProcessor() {
return item -> {
if (item.equals("3")){
throw new CustomException(HttpStatus.INTERNAL_SERVER_ERROR, "error");
}
System.out.println("itemProcessor : " + item);
return item;
};
}
@Bean
public ItemWriter<? super String> customItemWriter() {
return items -> {
System.out.println("items : " + items);
};
}
}
itemProcessor에서 2번의 재시도를 하고 3번째 재시도 하던 중 예외가 터지는 모습을 확인할 수 있다.
프로젝트에 적용시켜 배치 작업 실패에 대처
위에서 학습한 내용들을 현재 진행 중인 프로젝트에 적용했다.
아래는 출역일이 지난 지원(Apply) 내역을 자동으로 취소 시키는 배치 작업이다.
조금더 상세한 Retry와 Skip의 설정을 위해 RetryTemplate, SimpleRetryPolicy 빈을 등록시켜주었고, CustomSkippolicy 클래스를 만들어 설정해주었다.
- JobConfig
@Configuration
@RequiredArgsConstructor
@Slf4j
public class ApplyJobConfig {
// 생략
private static final int CHUNK_SIZE = 3;
@Bean
public Job applyProcessJob(JobRepository jobRepository, Step applyProcessStep) {
return new JobBuilder("applyProcessJob", jobRepository)
.start(applyProcessStep)
.build();
}
@Bean
public Step applyProcessStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("applyProcessStep", jobRepository)
.<Apply, Apply>chunk(CHUNK_SIZE, transactionManager)
.reader(applyReader())
.processor(applyProcessor())
.writer(applyWriter())
.faultTolerant()
.retryPolicy(retryPolicy())
.skipPolicy(new CustomSkipPolicy()) // Custom Skip 정책 설정
.build();
}
@Bean
public ItemReader<Apply> applyReader() {
// 생략
}
@Bean
public ItemProcessor<Apply, Apply> applyProcessor() {
return apply -> {
RetryTemplate retryTemplate = retryTemplate();
return retryTemplate.execute(context -> {
if (context.getRetryCount() > 0) {
log.warn("배치 작업 재시도. apply id: {}, 시도 횟수: {}", apply.getId(), context.getRetryCount());
}
// 생략
return apply;
}, context -> {
// 재시도 실패 후 처리
log.error("모든 재시도 실패. 실패한 Apply ID: {}", apply.getId());
return null;
});
};
}
@Bean
public ItemWriter<Apply> applyWriter() {
return new JpaItemWriterBuilder<Apply>()
.entityManagerFactory(entityManagerFactory)
.build();
}
// RetryTemplate 빈 생성 (재시도 정책 설정)
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(5000); // 재시도 간 5초 대기
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3); // 최대 재시도 횟수 3회
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
// SimpleRetryPolicy 빈 생성 (최대 재시도 횟수 3회)
@Bean
public SimpleRetryPolicy retryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
return retryPolicy;
}
}
- CustomSkipPolicy
@Component
@Slf4j
public class CustomSkipPolicy implements SkipPolicy {
private int skipCount = 0;
@Override
public boolean shouldSkip(Throwable t, long skipCount) {
if (skipCount > 5) {
log.error("skip한 item 5개 초과");
return false; // 최대 Skip 횟수 초과 시 Skip 중지
}
log.warn("item 처리 중 예외 발생: {}", t.getMessage());
this.skipCount++;
return true;
}
}
ID값이 13일 때 에러가 나는 상황을 만들어 테스트 해본 결과 아래와 같이 로그가 잘 찍히는걸 볼 수 있다.
시도를 총 3번 하고, 모두 실패했을 경우 Skip Policy에서 걸러지는 걸 확인할 수 있다.
나는 ID가 13일 때 예외를 터트렸기 때문에 13번을 제외한 나머지 Item들은 Cancel로 잘 업데이트 되었다.
'Spring Boot, JAVA 🌱' 카테고리의 다른 글
AWS Lambda를 이용한 Image Resize 도입 (썸네일 이미지 성능 개선) (1) | 2024.05.07 |
---|---|
지도의 경계 정보를 가져와 근처 일자리 공고 조회 기능 개발 (0) | 2024.05.06 |
동시성 문제에 대한 고민과 해결 과정 (0) | 2024.04.23 |
Spring Boot 의 다중 유저 요청 처리 방법 (Tomcat) (3) | 2024.04.19 |
멀티 쓰레드 사용 시 꼭 알아야 할 내용 - Thread-Safe (0) | 2024.04.18 |