ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring-Batch 06. JpaPagingItemReader로 테이블 읽기, JpaItemWriter로 테이블에 데이터 쓰기
    Spring-Batch 2024. 11. 10. 13:46

     

    JPA 설정

     

    종속성 설정

    // Build dependencies for the Spring Batch + JPA sample.
    dependencies {
        // Spring Boot starters for Spring Data JPA and Spring Batch.
        implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
        implementation 'org.springframework.boot:spring-boot-starter-batch'
        // Test-only dependencies: Spring Boot test starter and Spring Batch test support.
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
        testImplementation 'org.springframework.batch:spring-batch-test'
        // Lombok is on the compile classpath only and also registered as an annotation processor.
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        // MySQL JDBC driver, pinned to an explicit version (8.3.0).
        implementation 'com.mysql:mysql-connector-j:8.3.0'
    }

    application.properties

    # Datasource: MySQL JDBC driver, database "testdb" on localhost:3306, user root with an empty password.
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://localhost:3306/testdb?allowPublicKeyRetrieval=true&useSSL=false&useUnicode=true&serverTimezone=Asia/Seoul
    spring.datasource.username= root
    spring.datasource.password=
    
    # JPA/Hibernate: schema handling mode "update"; print the generated SQL, formatted.
    spring.jpa.hibernate.ddl-auto=update
    spring.jpa.show-sql=true
    #spring.jpa.defer-datasource-initialization=true
    spring.jpa.properties.hibernate.format_sql=true
    
    # Spring Batch: always initialize the batch metadata schema.
    spring.batch.jdbc.initialize-schema=always
    # Name of the job to run; exactly one of the two job names below is active at a time.
    spring.batch.job.name=JPA_PAGING_CHUNK_JOB
    #spring.batch.job.name=JPA_ITEM_WRITER_JOB

     

    Entity

    package org.schooldevops.springbatch.sample.domain;
    
    
    import jakarta.persistence.*;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * JPA entity mapped to the {@code customer} table.
     *
     * <p>Lombok supplies the no-argument constructor, an all-argument constructor whose
     * parameters follow the field declaration order, and the {@code @Data} members
     * (accessors, {@code equals}/{@code hashCode}, {@code toString}).
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Entity
    @Table(name = "customer")
    public class Customer {

        /** Primary key; generated with the {@code AUTO} strategy. */
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        private int id;

        /** Customer name. */
        private String name;

        /** Customer age. */
        private Integer age;

        /** Customer gender. */
        private String gender;
    }

     

    customer.csv

    KIDO9,41,M
    KIDO8,33,F
    KIDO7,25,M
    KIDO4,40,M
    KIDO3,30,M

     

    CustomerItemProcessor

    package org.schooldevops.springbatch.sample.jobs;
    
    import lombok.extern.slf4j.Slf4j;
    import org.schooldevops.springbatch.sample.domain.Customer;
    import org.springframework.batch.item.ItemProcessor;
    
    /**
     * Pass-through {@link ItemProcessor} that logs every {@link Customer} it receives
     * and hands the same instance on unchanged.
     */
    @Slf4j
    public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {

        /**
         * Logs the incoming item and returns it as-is.
         *
         * @param item the customer handed over by the reader
         * @return the very same {@code item} instance
         * @throws Exception declared by the {@link ItemProcessor} contract; never thrown here
         */
        @Override
        public Customer process(Customer item) throws Exception {
            final Customer passedThrough = item;
            log.info("Item Processor --------- {}", passedThrough);
            return passedThrough;
        }
    }

     

    JpaPagingItemReader를 통해 읽기 연산 수행하기

    package org.schooldevops.springbatch.sample.config;
    
    import jakarta.persistence.EntityManagerFactory;
    import lombok.extern.slf4j.Slf4j;
    import org.schooldevops.springbatch.sample.domain.Customer;
    import org.schooldevops.springbatch.sample.jobs.CustomerItemProcessor;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.job.builder.JobBuilder;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.builder.StepBuilder;
    import org.springframework.batch.item.database.JpaPagingItemReader;
    import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
    import org.springframework.batch.item.file.FlatFileItemWriter;
    import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.transaction.PlatformTransactionManager;
    
    import javax.sql.DataSource;
    import java.util.Collections;
    
    /**
     * Batch configuration for the {@code JPA_PAGING_CHUNK_JOB} job.
     *
     * <p>The job has a single chunk-oriented step that reads {@link Customer} rows with a
     * {@link JpaPagingItemReader}, passes them through {@link CustomerItemProcessor} and
     * writes them to a tab-delimited file with a {@link FlatFileItemWriter}.
     */
    @Slf4j
    @Configuration
    public class JpaPagingReaderJobConfig {

        /** Number of items per chunk; also used as the reader's page size. */
        public static final int CHUNK_SIZE = 2;
        /** Character encoding of the output file. */
        public static final String ENCODING = "UTF-8";
        /** Name under which the job is registered. */
        public static final String JPA_PAGING_CHUNK_JOB = "JPA_PAGING_CHUNK_JOB";

        /** Injected data source; not referenced by any bean definition in this class. */
        @Autowired
        DataSource dataSource;

        /** JPA entity manager factory handed to the paging reader. */
        @Autowired
        EntityManagerFactory entityManagerFactory;

        /**
         * Builds the paging reader that selects customers whose age is greater than 20,
         * ordered by id descending, {@link #CHUNK_SIZE} rows per page.
         *
         * @return the configured reader
         * @throws Exception kept in the signature for compatibility with existing callers
         */
        @Bean
        public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
            return new JpaPagingItemReaderBuilder<Customer>()
                    .name("customerJpaPagingItemReader")
                    .queryString("SELECT c FROM Customer c WHERE c.age > :age order by id desc")
                    .pageSize(CHUNK_SIZE)
                    .entityManagerFactory(entityManagerFactory)
                    // Binds the :age named parameter of the query above.
                    .parameterValues(Collections.singletonMap("age", 20))
                    .build();
        }

        /**
         * Builds the writer that emits the selected customer fields, tab-separated, to
         * {@code ./output/customer_new_v2.csv}.
         *
         * @return the configured flat-file writer
         */
        @Bean
        public FlatFileItemWriter<Customer> customerJpaFlatFileItemWriter() {
            return new FlatFileItemWriterBuilder<Customer>()
                    .name("customerJpaFlatFileItemWriter")
                    .resource(new FileSystemResource("./output/customer_new_v2.csv"))
                    .encoding(ENCODING)
                    .delimited().delimiter("\t")
                    // Bean properties to extract from each Customer, in output column order.
                    .names("Name", "Age", "Gender")
                    .build();
        }

        /**
         * Defines the chunk-oriented step: reader, pass-through processor, file writer.
         *
         * @param jobRepository repository used by the step builder
         * @param transactionManager transaction manager used for each chunk
         * @return the configured step
         * @throws Exception propagated from {@link #customerJpaPagingItemReader()}
         */
        @Bean
        public Step customerJpaPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
            log.info("--------------- Init customerJpaPagingStep -------------");

            return new StepBuilder("customerJpaPagingStep", jobRepository)
                    .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                    .reader(customerJpaPagingItemReader())
                    .processor(new CustomerItemProcessor())
                    .writer(customerJpaFlatFileItemWriter())
                    .build();
        }

        /**
         * Defines the job, which starts with {@link #customerJpaPagingStep}.
         *
         * <p>The parameter is deliberately named after the step bean. If the application
         * context holds more than one {@link Step} bean, by-type injection is ambiguous and
         * Spring falls back to matching the parameter name against bean names; a name that
         * matches no bean (previously {@code customerJdbcPagingStep}) then fails at startup.
         *
         * @param customerJpaPagingStep the step bean defined in this class
         * @param jobRepository repository used by the job builder
         * @return the configured job
         */
        @Bean
        public Job customerJpaPagingJob(Step customerJpaPagingStep, JobRepository jobRepository) {
            log.info("-------------- Init customerJpaPagingJob --------------");

            return new JobBuilder(JPA_PAGING_CHUNK_JOB, jobRepository)
                    // Adds an incrementing run.id so each launch gets new identifying parameters.
                    .incrementer(new RunIdIncrementer())
                    .start(customerJpaPagingStep)
                    .build();
        }
    }

     

    실행결과

    2024-11-10T13:39:25.360+09:00  INFO 2571 --- [           main] o.s.s.s.config.JpaPagingReaderJobConfig  : --------------- Init customerJpaPagingStep -------------
    2024-11-10T13:39:25.378+09:00  INFO 2571 --- [           main] o.s.s.s.config.JpaPagingReaderJobConfig  : -------------- Init customerJpaPagingJob --------------
    2024-11-10T13:39:25.459+09:00  INFO 2571 --- [           main] o.s.s.sample.SampleApplication           : Started SampleApplication in 1.325 seconds (process running for 1.541)
    2024-11-10T13:39:25.460+09:00  INFO 2571 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
    2024-11-10T13:39:25.513+09:00  INFO 2571 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=JPA_PAGING_CHUNK_JOB]] launched with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}]
    2024-11-10T13:39:25.531+09:00  INFO 2571 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [customerJpaPagingStep]
    Hibernate: 
        select
            c1_0.id,
            c1_0.age,
            c1_0.gender,
            c1_0.name 
        from
            customer c1_0 
        where
            c1_0.age>? 
        order by
            c1_0.id desc 
        limit
            ?, ?
    2024-11-10T13:39:25.694+09:00  INFO 2571 --- [           main] o.s.s.sample.jobs.CustomerItemProcessor  : Item Processor --------- Customer(id=5, name=KIDO3, age=30, gender=M)
    2024-11-10T13:39:25.695+09:00  INFO 2571 --- [           main] o.s.s.sample.jobs.CustomerItemProcessor  : Item Processor --------- Customer(id=4, name=KIDO4, age=40, gender=M)
    Hibernate: 
        select
            c1_0.id,
            c1_0.age,
            c1_0.gender,
            c1_0.name 
        from
            customer c1_0 
        where
            c1_0.age>? 
        order by
            c1_0.id desc 
        limit
            ?, ?
    2024-11-10T13:39:25.701+09:00  INFO 2571 --- [           main] o.s.s.sample.jobs.CustomerItemProcessor  : Item Processor --------- Customer(id=3, name=KIDO7, age=25, gender=M)
    2024-11-10T13:39:25.701+09:00  INFO 2571 --- [           main] o.s.s.sample.jobs.CustomerItemProcessor  : Item Processor --------- Customer(id=2, name=KIDO8, age=33, gender=F)
    Hibernate: 
        select
            c1_0.id,
            c1_0.age,
            c1_0.gender,
            c1_0.name 
        from
            customer c1_0 
        where
            c1_0.age>? 
        order by
            c1_0.id desc 
        limit
            ?, ?
    2024-11-10T13:39:25.704+09:00  INFO 2571 --- [           main] o.s.s.sample.jobs.CustomerItemProcessor  : Item Processor --------- Customer(id=1, name=KIDO9, age=41, gender=M)
    2024-11-10T13:39:25.708+09:00  INFO 2571 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [customerJpaPagingStep] executed in 176ms
    2024-11-10T13:39:25.721+09:00  INFO 2571 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=JPA_PAGING_CHUNK_JOB]] completed with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 198ms
    2024-11-10T13:39:25.724+09:00  INFO 2571 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
    2024-11-10T13:39:25.726+09:00  INFO 2571 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
    2024-11-10T13:39:25.729+09:00  INFO 2571 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
    
    Process finished with exit code 0

     

    읽기 연산이 정상적으로 잘 수행되는 것을 볼 수 있습니다.

     

     

     

    JobConfig를 2개 작성하면 오류가 발생하므로, 기존의 JpaPagingReaderJobConfig는 주석 처리하였습니다.

    JpaItemWriter를 통해 쓰기 연산 수행하기

     

    package org.schooldevops.springbatch.sample.config;
    
    import jakarta.persistence.EntityManagerFactory;
    import lombok.extern.slf4j.Slf4j;
    import org.schooldevops.springbatch.sample.domain.Customer;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.job.builder.JobBuilder;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.builder.StepBuilder;
    import org.springframework.batch.item.database.JpaItemWriter;
    import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.transaction.PlatformTransactionManager;
    
    /**
     * Batch configuration for the {@code JPA_ITEM_WRITER_JOB} job.
     *
     * <p>The job has a single chunk-oriented step that reads {@link Customer} records from
     * a comma-delimited classpath file and writes them through a {@link JpaItemWriter}.
     */
    @Slf4j
    @Configuration
    public class JpaItemJobConfig {

        /** Number of items per chunk. */
        public static final int CHUNK_SIZE = 100;

        /** Character encoding used to read the input file. */
        public static final String ENCODING = "UTF-8";

        /** Name under which the job is registered. */
        public static final String JPA_ITEM_WRITER_JOB = "JPA_ITEM_WRITER_JOB";

        /** JPA entity manager factory handed to the writer. */
        @Autowired
        EntityManagerFactory entityManagerFactory;

        /**
         * Builds the reader for {@code customer.csv}: comma-delimited, columns mapped to the
         * {@code name}, {@code age} and {@code gender} properties of {@link Customer}.
         *
         * @return the configured flat-file reader
         */
        @Bean
        public FlatFileItemReader<Customer> flatFileItemReader() {
            ClassPathResource csvFile = new ClassPathResource("./customer.csv");

            FlatFileItemReader<Customer> csvReader = new FlatFileItemReaderBuilder<Customer>()
                    .name("FlatFileItemReader")
                    .resource(csvFile)
                    .encoding(ENCODING)
                    .delimited()
                    .delimiter(",")
                    .names("name", "age", "gender")
                    .targetType(Customer.class)
                    .build();
            return csvReader;
        }

        /**
         * Builds the JPA writer, configured with {@code usePersist(true)}.
         *
         * @return the configured JPA writer
         */
        @Bean
        public JpaItemWriter<Customer> jpaItemWriter() {
            JpaItemWriter<Customer> customerWriter = new JpaItemWriterBuilder<Customer>()
                    .entityManagerFactory(entityManagerFactory)
                    .usePersist(true)
                    .build();
            return customerWriter;
        }

        /**
         * Defines the chunk-oriented step that connects the file reader to the JPA writer.
         *
         * @param jobRepository repository used by the step builder
         * @param transactionManager transaction manager used for each chunk
         * @return the configured step
         */
        @Bean
        public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
            log.info("-------------- Init flatFileStep ---------");

            StepBuilder stepBuilder = new StepBuilder("flatFileStep", jobRepository);
            return stepBuilder
                    .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                    .reader(flatFileItemReader())
                    .writer(jpaItemWriter())
                    .build();
        }

        /**
         * Defines the job, which starts with the {@code flatFileStep} bean.
         *
         * @param flatFileStep the step to run first
         * @param jobRepository repository used by the job builder
         * @return the configured job
         */
        @Bean
        public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
            log.info("----------- Init flatFileJob -----------");

            JobBuilder jobBuilder = new JobBuilder(JPA_ITEM_WRITER_JOB, jobRepository);
            return jobBuilder
                    // Adds an incrementing run.id so each launch gets new identifying parameters.
                    .incrementer(new RunIdIncrementer())
                    .start(flatFileStep)
                    .build();
        }
    }

     

     

     

    application.properties에서 실행할 Job 이름(spring.batch.job.name)을 JPA_ITEM_WRITER_JOB으로 변경해 줍니다.

    # Datasource: MySQL JDBC driver, database "testdb" on localhost:3306, user root with an empty password.
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://localhost:3306/testdb?allowPublicKeyRetrieval=true&useSSL=false&useUnicode=true&serverTimezone=Asia/Seoul
    spring.datasource.username= root
    spring.datasource.password=
    
    # JPA/Hibernate: schema handling mode "update"; print the generated SQL, formatted.
    spring.jpa.hibernate.ddl-auto=update
    spring.jpa.show-sql=true
    #spring.jpa.defer-datasource-initialization=true
    spring.jpa.properties.hibernate.format_sql=true
    
    # Spring Batch: always initialize the batch metadata schema.
    spring.batch.jdbc.initialize-schema=always
    # Name of the job to run; the writer job is now active and the paging job is commented out.
    #spring.batch.job.name=JPA_PAGING_CHUNK_JOB
    spring.batch.job.name=JPA_ITEM_WRITER_JOB

     

    실행결과

    2024-11-10T13:44:05.040+09:00  INFO 2673 --- [           main] o.s.s.sample.config.JpaItemJobConfig     : -------------- Init flatFileStep ---------
    2024-11-10T13:44:05.057+09:00  INFO 2673 --- [           main] o.s.s.sample.config.JpaItemJobConfig     : ----------- Init flatFileJob -----------
    2024-11-10T13:44:05.140+09:00  INFO 2673 --- [           main] o.s.s.sample.SampleApplication           : Started SampleApplication in 1.28 seconds (process running for 1.481)
    2024-11-10T13:44:05.141+09:00  INFO 2673 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
    2024-11-10T13:44:05.205+09:00  INFO 2673 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=JPA_ITEM_WRITER_JOB]] launched with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}]
    2024-11-10T13:44:05.228+09:00  INFO 2673 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flatFileStep]
    Hibernate: 
        select
            next_val as id_val 
        from
            customer_seq for update
    Hibernate: 
        update
            customer_seq 
        set
            next_val= ? 
        where
            next_val=?
    Hibernate: 
        insert 
        into
            customer
            (age, gender, name, id) 
        values
            (?, ?, ?, ?)
    Hibernate: 
        insert 
        into
            customer
            (age, gender, name, id) 
        values
            (?, ?, ?, ?)
    Hibernate: 
        insert 
        into
            customer
            (age, gender, name, id) 
        values
            (?, ?, ?, ?)
    Hibernate: 
        insert 
        into
            customer
            (age, gender, name, id) 
        values
            (?, ?, ?, ?)
    Hibernate: 
        insert 
        into
            customer
            (age, gender, name, id) 
        values
            (?, ?, ?, ?)
    2024-11-10T13:44:05.275+09:00  INFO 2673 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flatFileStep] executed in 45ms
    2024-11-10T13:44:05.283+09:00  INFO 2673 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=JPA_ITEM_WRITER_JOB]] completed with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 69ms
    2024-11-10T13:44:05.286+09:00  INFO 2673 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
    2024-11-10T13:44:05.287+09:00  INFO 2673 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
    2024-11-10T13:44:05.291+09:00  INFO 2673 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
    
    Process finished with exit code 0

     

    테이블에 정상적으로 데이터가 입력되는 것을 볼 수 있습니다.

     

     

     

    Reference

    https://devocean.sk.com/blog/techBoardDetail.do?ID=166902

     

    [SpringBatch 연재 06] JpaPagingItemReader로 DB내용을 읽고, JpaItemWriter로 DB에 쓰기

     

    devocean.sk.com

     

     

Designed by Tistory.