Spring-Batch 06. Reading a table with JpaPagingItemReader and writing data to a table with JpaItemWriter
Spring-Batch | 2024. 11. 10. 13:46
JPA Setup
Dependencies
```groovy
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation 'com.mysql:mysql-connector-j:8.3.0'
}
```
application.properties
```properties
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/testdb?allowPublicKeyRetrieval=true&useSSL=false&useUnicode=true&serverTimezone=Asia/Seoul
spring.datasource.username=root
spring.datasource.password=
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
#spring.jpa.defer-datasource-initialization=true
spring.jpa.properties.hibernate.format_sql=true
spring.batch.jdbc.initialize-schema=always
spring.batch.job.name=JPA_PAGING_CHUNK_JOB
#spring.batch.job.name=JPA_ITEM_WRITER_JOB
```
Entity
```java
package org.schooldevops.springbatch.sample.domain;

import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Entity
@Table(name = "customer")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO) // id is generated by Hibernate (AUTO strategy)
    private int id;

    private String name;
    private Integer age;
    private String gender;
}
```
customer.csv
```csv
KIDO9,41,M
KIDO8,33,F
KIDO7,25,M
KIDO4,40,M
KIDO3,30,M
```
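The writer job later reads this file with a FlatFileItemReader configured with a comma delimiter and the column names name, age, gender. As a rough plain-Java picture of what that mapping does to each line, here is a sketch; CsvMappingSketch and its Customer record are hypothetical stand-ins (no Spring, no JPA), and the real reader also deals with quoting, line numbers in error messages, and conversion details that this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

public class CsvMappingSketch {

    // Hypothetical plain stand-in for the Customer entity (no JPA annotations)
    public record Customer(String name, Integer age, String gender) {}

    // Column names in the same order as .names("name", "age", "gender") in the reader config
    static final String[] NAMES = {"name", "age", "gender"};

    // Split one line on the comma and bind each token to the field at the same position
    public static Customer mapLine(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty tokens
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.length + " tokens but got " + tokens.length + ": " + line);
        }
        // Trim surrounding whitespace and convert the age column to an Integer
        return new Customer(tokens[0].trim(), Integer.valueOf(tokens[1].trim()), tokens[2].trim());
    }

    // Map every line of the file, in order
    public static List<Customer> mapAll(List<String> lines) {
        List<Customer> result = new ArrayList<>();
        for (String line : lines) {
            result.add(mapLine(line));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> csv = List.of("KIDO9,41,M", "KIDO8,33,F", "KIDO7,25,M", "KIDO4,40,M", "KIDO3,30,M");
        mapAll(csv).forEach(System.out::println);
    }
}
```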
CustomerItemProcessor
```java
package org.schooldevops.springbatch.sample.jobs;

import lombok.extern.slf4j.Slf4j;
import org.schooldevops.springbatch.sample.domain.Customer;
import org.springframework.batch.item.ItemProcessor;

@Slf4j
public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {

    // Logs each item and passes it through unchanged
    @Override
    public Customer process(Customer item) throws Exception {
        log.info("Item Processor --------- {}", item);
        return item;
    }
}
```
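CustomerItemProcessor only logs and returns the item as-is. In Spring Batch, a processor that returns null filters that item out, so it never reaches the writer. The sketch below imitates that contract in plain Java so it runs without Spring; SimpleProcessor is a hypothetical stand-in for ItemProcessor, and the MALE_ONLY rule is an invented example, not something this job does.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessorFilterSketch {

    public record Customer(String name, Integer age, String gender) {}

    // Hypothetical stand-in for org.springframework.batch.item.ItemProcessor<I, O>
    @FunctionalInterface
    public interface SimpleProcessor<I, O> {
        O process(I item);
    }

    // Apply the processor to every item; a null result means "filter this item out"
    public static <I, O> List<O> applyProcessor(List<I> items, SimpleProcessor<I, O> processor) {
        List<O> out = new ArrayList<>();
        for (I item : items) {
            O result = processor.process(item);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    // Hypothetical example rule: keep only customers whose gender is "M"
    public static final SimpleProcessor<Customer, Customer> MALE_ONLY =
            item -> "M".equals(item.gender()) ? item : null;

    public static void main(String[] args) {
        List<Customer> input = List.of(
                new Customer("KIDO9", 41, "M"),
                new Customer("KIDO8", 33, "F"),
                new Customer("KIDO7", 25, "M"));
        applyProcessor(input, MALE_ONLY).forEach(System.out::println);
    }
}
```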
Reading with JpaPagingItemReader
```java
package org.schooldevops.springbatch.sample.config;

import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.schooldevops.springbatch.sample.domain.Customer;
import org.schooldevops.springbatch.sample.jobs.CustomerItemProcessor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Collections;

@Slf4j
@Configuration
public class JpaPagingReaderJobConfig {

    public static final int CHUNK_SIZE = 2;                                   // number of items per chunk
    public static final String ENCODING = "UTF-8";                            // output file encoding
    public static final String JPA_PAGING_CHUNK_JOB = "JPA_PAGING_CHUNK_JOB"; // job name

    @Autowired
    EntityManagerFactory entityManagerFactory; // JPA entity manager factory

    // JpaPagingItemReader bean: reads customers older than :age, one page at a time
    @Bean
    public JpaPagingItemReader<Customer> customerJpaPagingItemReader() {
        return new JpaPagingItemReaderBuilder<Customer>()
                .name("customerJpaPagingItemReader")                                           // reader name
                .queryString("SELECT c FROM Customer c WHERE c.age > :age ORDER BY c.id DESC") // JPQL query
                .pageSize(CHUNK_SIZE)                                                          // rows fetched per query
                .entityManagerFactory(entityManagerFactory)                                    // entity manager factory
                .parameterValues(Collections.singletonMap("age", 20))                          // query parameter
                .build();
    }

    // FlatFileItemWriter bean: writes each customer as one tab-delimited line
    @Bean
    public FlatFileItemWriter<Customer> customerJpaFlatFileItemWriter() {
        return new FlatFileItemWriterBuilder<Customer>()
                .name("customerJpaFlatFileItemWriter")                            // writer name
                .resource(new FileSystemResource("./output/customer_new_v2.csv")) // output file location
                .encoding(ENCODING)                                               // file encoding
                .delimited().delimiter("\t")                                      // tab delimiter
                .names("name", "age", "gender")                                   // Customer properties to write, in order
                .build();
    }

    // Step bean
    @Bean
    public Step customerJpaPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        log.info("--------------- Init customerJpaPagingStep -------------");
        return new StepBuilder("customerJpaPagingStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager) // chunk-oriented processing
                .reader(customerJpaPagingItemReader())                     // reader
                .processor(new CustomerItemProcessor())                    // processor
                .writer(customerJpaFlatFileItemWriter())                   // writer
                .build();
    }

    // Job bean (parameter name matches the step bean it should receive)
    @Bean
    public Job customerJpaPagingJob(Step customerJpaPagingStep, JobRepository jobRepository) {
        log.info("-------------- Init customerJpaPagingJob --------------");
        return new JobBuilder(JPA_PAGING_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer()) // new run.id on every launch
                .start(customerJpaPagingStep)        // first (and only) step
                .build();
    }
}
```
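The writer above turns each Customer into one line of customer_new_v2.csv by extracting the three configured properties in order and joining them with a tab. A plain-Java approximation of that extract-and-join step looks like this; TabLineSketch and its Customer record are hypothetical stand-ins, and the real FlatFileItemWriter additionally manages the file resource, encoding, and restart state.

```java
import java.util.List;
import java.util.StringJoiner;

public class TabLineSketch {

    public record Customer(int id, String name, Integer age, String gender) {}

    // Extract the configured properties in order; id is not among them, so it is not written
    static Object[] extract(Customer c) {
        return new Object[] {c.name(), c.age(), c.gender()};
    }

    // Join the extracted values with the delimiter to build one output line
    public static String toLine(Customer c, String delimiter) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object value : extract(c)) {
            joiner.add(String.valueOf(value));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<Customer> page = List.of(
                new Customer(5, "KIDO3", 30, "M"),
                new Customer(4, "KIDO4", 40, "M"));
        for (Customer c : page) {
            System.out.println(toLine(c, "\t"));
        }
    }
}
```

For the customers with ids 5 and 4, main prints "KIDO3\t30\tM" and "KIDO4\t40\tM", i.e. tab-separated name, age, and gender.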
Execution result
```
2024-11-10T13:39:25.360+09:00 INFO 2571 --- [ main] o.s.s.s.config.JpaPagingReaderJobConfig : --------------- Init customerJpaPagingStep -------------
2024-11-10T13:39:25.378+09:00 INFO 2571 --- [ main] o.s.s.s.config.JpaPagingReaderJobConfig : -------------- Init customerJpaPagingJob --------------
2024-11-10T13:39:25.459+09:00 INFO 2571 --- [ main] o.s.s.sample.SampleApplication : Started SampleApplication in 1.325 seconds (process running for 1.541)
2024-11-10T13:39:25.460+09:00 INFO 2571 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2024-11-10T13:39:25.513+09:00 INFO 2571 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=JPA_PAGING_CHUNK_JOB]] launched with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}]
2024-11-10T13:39:25.531+09:00 INFO 2571 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [customerJpaPagingStep]
Hibernate: select c1_0.id, c1_0.age, c1_0.gender, c1_0.name from customer c1_0 where c1_0.age>? order by c1_0.id desc limit ?, ?
2024-11-10T13:39:25.694+09:00 INFO 2571 --- [ main] o.s.s.sample.jobs.CustomerItemProcessor : Item Processor --------- Customer(id=5, name=KIDO3, age=30, gender=M)
2024-11-10T13:39:25.695+09:00 INFO 2571 --- [ main] o.s.s.sample.jobs.CustomerItemProcessor : Item Processor --------- Customer(id=4, name=KIDO4, age=40, gender=M)
Hibernate: select c1_0.id, c1_0.age, c1_0.gender, c1_0.name from customer c1_0 where c1_0.age>? order by c1_0.id desc limit ?, ?
2024-11-10T13:39:25.701+09:00 INFO 2571 --- [ main] o.s.s.sample.jobs.CustomerItemProcessor : Item Processor --------- Customer(id=3, name=KIDO7, age=25, gender=M)
2024-11-10T13:39:25.701+09:00 INFO 2571 --- [ main] o.s.s.sample.jobs.CustomerItemProcessor : Item Processor --------- Customer(id=2, name=KIDO8, age=33, gender=F)
Hibernate: select c1_0.id, c1_0.age, c1_0.gender, c1_0.name from customer c1_0 where c1_0.age>? order by c1_0.id desc limit ?, ?
2024-11-10T13:39:25.704+09:00 INFO 2571 --- [ main] o.s.s.sample.jobs.CustomerItemProcessor : Item Processor --------- Customer(id=1, name=KIDO9, age=41, gender=M)
2024-11-10T13:39:25.708+09:00 INFO 2571 --- [ main] o.s.batch.core.step.AbstractStep : Step: [customerJpaPagingStep] executed in 176ms
2024-11-10T13:39:25.721+09:00 INFO 2571 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=JPA_PAGING_CHUNK_JOB]] completed with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 198ms
2024-11-10T13:39:25.724+09:00 INFO 2571 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2024-11-10T13:39:25.726+09:00 INFO 2571 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2024-11-10T13:39:25.729+09:00 INFO 2571 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.

Process finished with exit code 0
```
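The read works as expected: with a page size of 2, the reader issued three SELECT queries and returned the five customers older than 20 in descending id order (ids 5 and 4, then 3 and 2, then 1).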
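The three SELECT statements follow from the page size. As I understand JpaPagingItemReader, it requests page n with offset n * pageSize and limit pageSize, and stops once a page comes back with fewer rows than the page size. The sketch below replays that loop over an in-memory copy of the five customers; PagingSketch, fetchPage, and readAllPages are hypothetical names, and the stream filtering only imitates the JPQL query rather than running it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PagingSketch {

    public record Customer(int id, String name, int age, String gender) {}

    // Imitates "WHERE age > :age ORDER BY id DESC LIMIT offset, pageSize" against an in-memory list
    static List<Customer> fetchPage(List<Customer> table, int minAgeExclusive, int offset, int pageSize) {
        return table.stream()
                .filter(c -> c.age() > minAgeExclusive)
                .sorted(Comparator.comparingInt(Customer::id).reversed())
                .skip(offset)
                .limit(pageSize)
                .toList();
    }

    // Reads page after page (offset = page * pageSize) until a page comes back short
    public static List<List<Customer>> readAllPages(List<Customer> table, int minAgeExclusive, int pageSize) {
        List<List<Customer>> pages = new ArrayList<>();
        int page = 0;
        while (true) {
            List<Customer> rows = fetchPage(table, minAgeExclusive, page * pageSize, pageSize);
            pages.add(rows);             // one entry per query issued
            if (rows.size() < pageSize) {
                break;                   // a short (or empty) page means nothing is left
            }
            page++;
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Customer> table = List.of(
                new Customer(1, "KIDO9", 41, "M"),
                new Customer(2, "KIDO8", 33, "F"),
                new Customer(3, "KIDO7", 25, "M"),
                new Customer(4, "KIDO4", 40, "M"),
                new Customer(5, "KIDO3", 30, "M"));
        List<List<Customer>> pages = readAllPages(table, 20, 2);
        for (int i = 0; i < pages.size(); i++) {
            System.out.println("query " + (i + 1) + " -> ids " + pages.get(i).stream().map(Customer::id).toList());
        }
    }
}
```

Running main prints ids [5, 4], then [3, 2], then [1]: three queries, the same grouping the processor logged.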
Having two JobConfig classes defined at the same time caused an error, so I commented out the earlier ReaderJobConfig.
Writing with JpaItemWriter
```java
package org.schooldevops.springbatch.sample.config;

import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.schooldevops.springbatch.sample.domain.Customer;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

@Slf4j
@Configuration
public class JpaItemJobConfig {

    public static final int CHUNK_SIZE = 100;                               // number of items per chunk
    public static final String ENCODING = "UTF-8";                          // input file encoding
    public static final String JPA_ITEM_WRITER_JOB = "JPA_ITEM_WRITER_JOB"; // job name

    @Autowired
    EntityManagerFactory entityManagerFactory; // JPA entity manager factory

    // FlatFileItemReader bean: reads customer.csv from the classpath and maps each line to a Customer
    @Bean
    public FlatFileItemReader<Customer> flatFileItemReader() {
        return new FlatFileItemReaderBuilder<Customer>()
                .name("FlatFileItemReader")                      // reader name
                .resource(new ClassPathResource("customer.csv")) // file on the classpath (e.g. src/main/resources)
                .encoding(ENCODING)                              // file encoding
                .delimited().delimiter(",")                      // comma delimiter
                .names("name", "age", "gender")                  // column-to-property mapping, in order
                .targetType(Customer.class)                      // class each line is mapped into
                .build();
    }

    // JpaItemWriter bean: writes each chunk to the database through JPA
    @Bean
    public JpaItemWriter<Customer> jpaItemWriter() {
        return new JpaItemWriterBuilder<Customer>()
                .entityManagerFactory(entityManagerFactory) // entity manager factory
                .usePersist(true)                           // call persist() instead of the default merge()
                .build();
    }

    // Step bean: the unit of execution inside the job
    @Bean
    public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        log.info("-------------- Init flatFileStep ---------");
        return new StepBuilder("flatFileStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager) // chunk-oriented processing
                .reader(flatFileItemReader())                              // reader
                .writer(jpaItemWriter())                                   // writer
                .build();
    }

    // Job bean: a job made of one or more steps
    @Bean
    public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
        log.info("----------- Init flatFileJob -----------");
        return new JobBuilder(JPA_ITEM_WRITER_JOB, jobRepository)
                .incrementer(new RunIdIncrementer()) // new run.id on every launch
                .start(flatFileStep)                 // first (and only) step
                .build();
    }
}
```
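With CHUNK_SIZE = 100 and only five lines in customer.csv, the whole file fits into a single chunk, so the JpaItemWriter receives all five customers in one write call within one chunk transaction. The loop below is a simplified, Spring-free sketch of that chunk-oriented read/write cycle; ChunkLoopSketch and runChunks are hypothetical names, and a real step also handles processors, skips, retries, and transaction boundaries that are left out here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class ChunkLoopSketch {

    // Reads up to chunkSize items, hands the whole chunk to the writer, and repeats until the reader is exhausted.
    // In Spring Batch each chunk is committed in its own transaction; here the writer is only a callback.
    public static <T> int runChunks(Iterator<T> reader, int chunkSize, Consumer<List<T>> writer) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            List<T> chunk = new ArrayList<>(chunkSize);
            while (chunk.size() < chunkSize && reader.hasNext()) {
                chunk.add(reader.next());
            }
            writer.accept(chunk); // one write call per chunk
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("KIDO9,41,M", "KIDO8,33,F", "KIDO7,25,M", "KIDO4,40,M", "KIDO3,30,M");
        List<Integer> writeSizes = new ArrayList<>();
        int chunks = runChunks(lines.iterator(), 100, chunk -> writeSizes.add(chunk.size()));
        System.out.println(chunks + " chunk(s), sizes " + writeSizes);
    }
}
```

With a chunk size of 100, main prints "1 chunk(s), sizes [5]"; passing 2 instead (the reader job's CHUNK_SIZE) would give chunks of 2, 2, and 1.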
Next, change spring.batch.job.name in application.properties so that JPA_ITEM_WRITER_JOB runs instead.
```properties
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/testdb?allowPublicKeyRetrieval=true&useSSL=false&useUnicode=true&serverTimezone=Asia/Seoul
spring.datasource.username=root
spring.datasource.password=
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
#spring.jpa.defer-datasource-initialization=true
spring.jpa.properties.hibernate.format_sql=true
spring.batch.jdbc.initialize-schema=always
#spring.batch.job.name=JPA_PAGING_CHUNK_JOB
spring.batch.job.name=JPA_ITEM_WRITER_JOB
```
Execution result
```
2024-11-10T13:44:05.040+09:00 INFO 2673 --- [ main] o.s.s.sample.config.JpaItemJobConfig : -------------- Init flatFileStep ---------
2024-11-10T13:44:05.057+09:00 INFO 2673 --- [ main] o.s.s.sample.config.JpaItemJobConfig : ----------- Init flatFileJob -----------
2024-11-10T13:44:05.140+09:00 INFO 2673 --- [ main] o.s.s.sample.SampleApplication : Started SampleApplication in 1.28 seconds (process running for 1.481)
2024-11-10T13:44:05.141+09:00 INFO 2673 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2024-11-10T13:44:05.205+09:00 INFO 2673 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=JPA_ITEM_WRITER_JOB]] launched with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}]
2024-11-10T13:44:05.228+09:00 INFO 2673 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [flatFileStep]
Hibernate: select next_val as id_val from customer_seq for update
Hibernate: update customer_seq set next_val= ? where next_val=?
Hibernate: insert into customer (age, gender, name, id) values (?, ?, ?, ?)
Hibernate: insert into customer (age, gender, name, id) values (?, ?, ?, ?)
Hibernate: insert into customer (age, gender, name, id) values (?, ?, ?, ?)
Hibernate: insert into customer (age, gender, name, id) values (?, ?, ?, ?)
Hibernate: insert into customer (age, gender, name, id) values (?, ?, ?, ?)
2024-11-10T13:44:05.275+09:00 INFO 2673 --- [ main] o.s.batch.core.step.AbstractStep : Step: [flatFileStep] executed in 45ms
2024-11-10T13:44:05.283+09:00 INFO 2673 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=JPA_ITEM_WRITER_JOB]] completed with the following parameters: [{'run.id':'{value=3, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 69ms
2024-11-10T13:44:05.286+09:00 INFO 2673 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2024-11-10T13:44:05.287+09:00 INFO 2673 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2024-11-10T13:44:05.291+09:00 INFO 2673 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.

Process finished with exit code 0
```
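The five rows from customer.csv are inserted into the customer table without errors: Hibernate fetches an id block from the customer_seq table once, then issues five INSERT statements.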
Reference
[SpringBatch Series 06] Reading DB contents with JpaPagingItemReader and writing to the DB with JpaItemWriter, devocean.sk.com: https://devocean.sk.com/blog/techBoardDetail.do?ID=166902