오늘 하루에 집중하자
  • [Spring Batch] 4. Step 관련 도메인(Step, StepExecution, StepContribution)
    2024년 06월 15일 12시 17분 00초에 업로드 된 글입니다.
    작성자: nickhealthy

    Step


    기본 개념

    • Job을 구성하는 독립적인 하나의 단계로서 실제 배치 처리를 정의하고 컨트롤하는 데 필요한 모든 정보를 가지고 있는 도메인 객체
    • 단순한 단일 태스크 뿐 아니라 입력과 처리 그리고 출력과 관련된 복잡한 비즈니스 로직을 포함하는 모든 설정을 할 수 있다.
    • 배치작업을 어떻게 구성하고 실행할 것인지 Job의 세부 작업을 Task 기반으로 설정하고 명세해 놓은 객체
    • 모든 Job은 하나 이상의 Step으로 구성됨

     

    기본 구현체

    Step은 인터페이스이며, 이를 구현한 기본 구현체들은 아래와 같다.

    • `TaskletStep`: 가장 기본이 되는 클래스로서 Tasklet 타입의 구현체들을 제어한다.
    • `PartitionStep`: 멀티스레드 방식으로 Step을 여러 개로 분리해서 실행한다.
    • `JobStep`: Step 내에서 Job을 실행하도록 한다.
    • `FlowStep`: Step 내에서 Flow를 실행하도록 한다.

     

    STEP의 클래스 구조

    • Step은 `execute()`를 통해 실행되며, 실행 결과는 `StepExecution` 객체에 저장된다.
    • `AbstractStep` 추상 클래스로 구현되어 있으며, 이를 구현한 하위 클래스들은 각각 기능이 조금씩 다르다.
      • Step 인터페이스의 `execute()` 메서드를 구현하고 있고 이 메서드 안에서 `doExecute()` 라는 추상 메서드를 호출하게 된다. `doExecute()`  메서드는 하위 클래스들이(JobStep, TaskletStep, FlowStep, PartitionStep) 각각 목적에 맞게 구현되어 있다.

     

     

    StepExecution


    기본 개념

    • Step 실행 중에 발생한 정보들을 저장하고 있는 객체
      • 시작시간, 종료시간, 상태(시작됨, 완료, 실패), commit, count, rollback, count 등의 속성을 가짐
    • Step이 매번 시도될 때마다 생성되며, 각 Step 별로 생성된다.(JobExecution 이랑 같음)
    • Job이 재시작 하더라도 이미 성공적으로 완료된 Step은 재실행되지 않고 실패한 Step만 실행된다.
      • 옵션을 통해 이미 성공적으로 완료된 Step도 재실행 시킬 수 있다.
    • 이전 단계 Step이 실패하게 되면 현재 Step은 StepExecution을 생성하지 않는다. Step이 실제로 시작되었을 때만 StepExecution 을 생성한다.
    • JobExecution과의 관계
      • Step의 StepExecution이 모두 정상적으로 완료되어야 JobExecution이 정상적으로 완료된다.
      • Step의 StepExecution 중 하나라도 실패하면 JobExecution은 실패한다.

     

    BATCH_STEP_EXECUTION 테이블과 매핑

    • JobExecution와 StepExecution은 1:M의 관계를 가진다.
    • 하나의 Job에 여러 개의 Step으로 구성했을 경우 각 StepExecution은 하나의 JobExecution을 부모로 가진다.

     

    흐름도 및 메타 데이터 저장 방식

    우선 첫 번째 경우를 보면 '일별정산'라는 Job이 실행되고 그에 따라 JobExecution이 하나가 생성되었다.

    Job에는 Step이 2개로 구성되어 있어 StepExecution은 두 개가 생성되었고, 모두 성공적으로 수행되었다.

    첫 번째 Job 결과

     

    이번에는 똑같은 Job이지만 JobParameter를 다르게 하여 다시 돌려보자(Job을 똑같은 파라미터로 여러 번 수행하는 것은 불가능함)

    위와 똑같이 JobExecution, StepExecution이 생성되었다. 하지만 하나의 Step은 성공하고 하나는 실패했다.

    두 번째 Job 결과

     

    자 그럼 메타 데이터가 어떤 식으로 들어가 있는지 확인해보자

    먼저 StepExecution부터 보면 아래와 같이 각각의 Step 별로 StepExecution이 생성되었으며, JobExecution과 1:M의 관계이므로 하나의 JobExecution에 여러 개의 StepExecution이 존재하게 된다. 그리고 개별적으로 성공/실패 여부를 나타내게 된다.

     

    이번에는 JobExection을 보자

    위에서 설명했듯이 JobExecution은 하나의 Step이라도 실패하면 FAILED로 처리하게 된다.

    따라서 두 번째 JobExecution은 실패로 처리되었다.

     

    예제

    이제 위의 예제를 실제 코드를 돌려보며 테스트 해보자

    아래 그림은 위에서 설명한 예제의 전체적인 그림이며, 예제에서도 똑같은 시나리오로 진행될 것이다.

     

    CASE 1

    우선 스프링 배치를 돌려보자

    스프링 배치의 기본 코드이며, 성공적으로 수행된다.

    해당 코드를 돌리면 메타 데이터 스키마가 자동으로 생성된다.

    • `application.yml`: `spring.batch.jdbc.initialize-schema: always` 설정
    package io.spring.batch.helloworld.ch3.stepExecution;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @RequiredArgsConstructor
    public class StepExecutionConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
    
        @Bean
        public Job batchJob() {
            return jobBuilderFactory.get("StepExecutionConfiguration")
                    .start(step1())
                    .next(step2())
                    .build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .tasklet(new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                            System.out.println(">> step1 has executed!!");
                            return RepeatStatus.FINISHED;
                        }
                    }).build();
        }
    
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .tasklet(new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                            System.out.println(">> step2 has executed!!");
                            return RepeatStatus.FINISHED;
                        }
                    }).build();
        }
    
    }

     

    이제 StepExecution 메타 데이터를 확인해보자

    Step이 총 2개이므로 StepExecution도 2개가 생성되었다.

    또한 배치가 정상적으로 수행되었으므로 `STATUS`, `EXIT_CODE` 모두 COMPLETED로 정상 수행되었다.

     

    다음은 부모격인 JobExecution의 메타 데이터를 확인해본다.

    StepExecution과 1:M의 관계이므로 하나의 Job 배치 작업에서 하나의 JobExecution이 생긴 모습이다.

    또한 StepExecution이 모두 정상 수행되었기에 JobExecution도 COMPLETED로 정상 수행되었다.

     

    CASE 2

    이번에는 의도적으로 Step에 오류를 내어 메타 데이터가 어떻게 저장되는지 확인해보자

    테스트를 위해 메타 데이터 테이블은 모두 지우고 다시 실행한다.

    Step2 메서드 안에 `RuntimeException` 추가

     

    StepExecution의 메타 데이터를 확인해보자

    Step2에서 에러가 발생하였지만 실행은 되었기에 StepExecution이 생성되었다.

    만약 Step3가 다음 Step으로 등록되었다면 Step3는 StepExecution이 생성되지 않았을 것이다.

     

    JobExecution도 확인해보자

    StepExecution을 모두 통과하지 못했기 때문에 JobExecution도 FAILED로 실패로 끝나게 된다.

     

    CASE3

    이번에는 `RuntimeExecption`를 제거해서 의도적으로 오류를 발생시키지 않고, 실패한 상태에서 그대로 수행해본다.

    메타 데이터 테이블을 지우지 않고 그대로 수행한다.

     

    결과에서 보이듯이 Step1은 실행되지 않았고, Step2만 실행되었다.

    실패한 Job을 재실행하게 되었을 때 성공한 Step은 넘어가게 되고, 실패한 Step에 대해서만 다시 시도하기 때문이다. 물론 옵션으로 설정하면 성공한 Step도 Job을 다시 돌렸을 때 재실행하게 할 수 있다.

     

    그럼 StepExecution 메타 데이터를 확인해보자

    `STEP_EXECUTION_ID` 1, 2는 처음 의도적으로 에러를 발생시켜 실행했던 STEP들이고, STEP2는 방금 실행시킨 STEP이다. 보다시피 실패한 STEP만 실행된 것을 확인할 수 있다.

     

    마지막으로 JobExecution을 확인해보자

    처음 실행했던 JobExecution은 FAILED로 실패한 상태이고, 방금 실행했던 JobExecution은 COMPLETED 된 모습이다.

    그 전에 실패했던 Step2가 이번에는 성공적으로 수행했기 때문에 정상적으로 처리된 것이다.

    또한 JobExecution은 Job이 실행될 때마다 생기기 때문에 2개가 생성된 것이다.

     

    StepContribution


    기본 개념

    StepExecution과 관련이 높은 도메인 객체이다.

    StepExecution에 속하는 여러 속성 값들을 업데이트 해주는 도메인 객체이다.

    • 청크 프로세스의 변경사항을 버퍼링 한 후 StepExecution 상태를 업데이트 하는 도메인 객체
    • 청크 커밋 직전에 StepContribution이 가지고 있는 속성 값들을 StepExecution의 `apply` 메서드를 호출하여 상태를 업데이트 함
    • ExitStatus의 기본 종료코드 외 사용자 정의 종료코드를 생성해서 적용할 수 있음

     

    StepContribution의 클래스 구조

    StepContribution 도메인의 구조

     

    위의 그림을 순서대로 적은 것인데, StepExecution의 내용과 거의 동일하다.

    StepExecution의 상태 값을 업데이트 하는 역할을 하는 것을 보면 속성 값이 비슷해야 하는 것도 당연하다.

    • 성공적으로 read한 아이템 수
    • 성공적으로 write한 아이템 수
    • ItemProcessor 에 의해 필터링된 아이템 수
    • 부모 클래스인 StepExecution 의 총 skip 횟수
    • read에 실패해서 스킵된 횟수
    • write에 실패해서 스킵된 횟수
    • process에 실패해서 스킵된 횟수
    • 실행결과를 나타내는 클래스로서 종료코드를 포함(UNKNOWN, EXECUTING, COMPLETED, NOOP, FAILED, STOPPED)
    • StepExecution 객체 저장

     

    흐름도 및 메타 데이터 저장 방식

    StepContribution의 흐름도

     

    1. TaskletStep이 시작된다. TaskletStep이 StepExecution 객체를 생성한다.
    2. StepExecution이 내부적으로 StepContribution 객체를 생성한다.
    3. TaskletStep이 ChunkOrientedTasklet 청크 기반을 처리할 수 있는 전용 Tasklet 구현체를 호출하게 된다.
    4. ChunkOrientedTasklet 내부적으로 ItemReader, ItemWriter, ItemProcesser 등과 같은 청크 기반 프로세스를 수행하게 된다. 각각의 청크 기반 프로세스를 수행할 때마다 결과를 StepContribution 도메인 객체에 저장 및 업데이트하게 된다.
    5. Step이 종료되는 마지막 시점에 StepContribution에 저장된 값들을 StepExecution의 `apply` 메서드를 호출해 StepExecution에 최종적으로 업데이트하게 된다.

     

    예제

    디버깅을 통해 StepContribution의 프로세스를 알아보자

    우선 아래는 배치를 수행하기 위한 기본적인 코드이다.

    package io.spring.batch.helloworld.ch3.stepContribution;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @RequiredArgsConstructor
    public class StepContributionConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
    
        @Bean
        public Job stepContributionJob() {
            return jobBuilderFactory.get("stepContributionJob")
                    .start(step1())
                    .next(step2())
                    .build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .tasklet(new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println(">> STEP1 EXECUTIONED");
                            return RepeatStatus.FINISHED;
                        }
                    }).build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .tasklet(new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println(">> STEP2 EXECUTIONED");
                            return RepeatStatus.FINISHED;
                        }
                    }).build();
        }
    }

     

     

    StepContribution 프로세스를 알아보기 위해 디버깅으로  TaskletStep부터 시작해보았다.

    Step이 실행되고 TaskletStep이 실행되면 내부적으로 StepExecution이 StepContribution을 생성하게 된다.

    또한 `doInTransaction` 메서드 안에 정의되어 있는 것을 확인할 수 있는데 StepExecution은 데이터베이스에서 메타 데이터로 사용되기 때문에 StepExecution 도메인 객체를 생성해야만 한다.

    따라서 데이터베이스 COMMIT 직전에 StepExecution 도메인 객체를 생성한다.

    TaskletStep

     

     

    위의 사진에서 내부로 들어온 사진인데, StepExecution이 내부적으로 StepContribution을 생성하고 있는 것을 확인할 수 있다.

    StepExecution

     

     

    그리고 아래 `tasklet.execute` 메서드를 통해 실제로 Tasklet를 실행하게 된다.

    그 다음 사진을 보자

    TaskletStep

     

    `tasklet.execute` 메서드를 통해서 step1의 Tasklet으로 들어오게 되었다.

    step 안의 Tasklet

     

    디버깅을 통해 stepContribution 파라미터를 보면 나머지 부분은 기본 값으로 0이고, `exitStatus`는 EXECUTING 이라는 실행중 표시가 나타나게 된다. 그리고 stepContribution이 stepExecution를 함께 참조하고 있는 모습이다.

    stepExecution은 step 실행 상태에 대한 정보다.

    step의 파라미터 stepContribution

     

    이후 step1 안의 Tasklet이 모두 수행하게 되면 TaskletStep으로 넘어와서 `stepExecution.apply(contribution);` 메서드를 수행하고 있는 것을 볼 수 있다. 현재 contribution이 가지고 있는 값들을 stepExecution에 적용하는(업데이트) 것을 볼 수 있다.

    즉, 하나의 step 안에 있는 Tasklet이 끝날 때마다 contribution은 stepExecution에 값을 업데이트 한다.

    TaskletStep - apply()

     

    한 단계 더 타고 `apply()`에 들어가면 아래와 같은 코드를 볼 수 있다.

    StepExecution 각각의 속성들을 업데이트 하고 있다.

     

    이런식으로 step들이 실행될 때마다 반복되어 StepExecution에 step 실행 상태에 대한 정보가 업데이트 되는 것을 반복하게 된다.

     

    이번엔 StepExecution의 메타 데이터를 확인해보자

    나머지 속성들은(READ_COUNT, FILTER_COUNT..) 모두 값이 0이지만 COMMIT_COUNT는 1로 되어 있는 모습을 확인할 수 있다.

    청크 기반의 원리 자체가 Tasklet이 수행이 되면 COMMIT을 하게 되어있기 때문에 COMMIT 값은 1로 변경된다. 이건 프레임워크에서 만들어진 규칙이라고 보면 될 것 같다. 참고로 COMMIT의 개념은 데이터베이스의 COMMIT 개념이 아니라 위에서 언급한대로 Tasklet이 수행되면 COMMIT을 해서 값이 증가하는 형태이다.

     

    마지막으로 contribution에서 참조하고 있는 StepExecution과 마찬가지로 다른 도메인에서도 각자 상호작용하는 도메인들과 서로 참조하고 있는 덕분에 여러가지 다양한 도메인들을 아래와 같이 다양한 값들을 확인할 수 있고, 활용 가능하다.

     

    댓글