  • Spring-Batch 04. Reading a File with FlatFileItemReader and Writing a File with FlatFileItemWriter
    Spring-Batch 2024. 10. 27. 21:35

     

     

    FlatFileItemReader 

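    @Bean
    public FlatFileItemReader<Customer> flatFileItemReader() {
        return new FlatFileItemReaderBuilder<Customer>()
                .name("FlatFileItemReader")
                // Input file, resolved from the classpath
                .resource(new ClassPathResource("./customer.csv"))
                .encoding(ENCODING)
                // Split each line on commas
                .delimited().delimiter(",")
                // Column order; the names must match Customer's properties
                .names("name", "age", "gender")
                // Bind the tokens of each line to a Customer instance
                .targetType(Customer.class)
                .build();
    }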

     

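    FlatFileItemReader is the basic ItemReader in Spring Batch for reading data from text files. It supports fixed-length, delimited, and multi-line text files, is simple to configure and use, and stays efficient when processing large volumes of data. Its behavior can be extended with tokenizers, filters, and similar components. It is not well suited to processing complex data structures.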
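To make the tokenizing and mapping steps concrete, here is a minimal plain-Java sketch of what a delimited reader conceptually does with a single line. It deliberately avoids Spring Batch classes; `CustomerRow`, `LineMappingSketch`, and the sample line `Alice,30,F` are hypothetical names and data used only for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for the post's Customer class.
record CustomerRow(String name, int age, String gender) {}

class LineMappingSketch {

    // Step 1: split the line on the delimiter
    // (roughly what .delimited().delimiter(",") configures).
    static List<String> tokenize(String line, String delimiter) {
        return List.of(line.split(Pattern.quote(delimiter), -1));
    }

    // Step 2: bind the tokens to fields by position
    // (roughly what .names(...) and .targetType(...) configure).
    static CustomerRow map(List<String> tokens) {
        if (tokens.size() != 3) {
            throw new IllegalArgumentException("expected 3 tokens but got " + tokens.size());
        }
        return new CustomerRow(
                tokens.get(0).trim(),
                Integer.parseInt(tokens.get(1).trim()), // fails if the token is not a number
                tokens.get(2).trim());
    }

    public static void main(String[] args) {
        System.out.println(map(tokenize("Alice,30,F", ",")));
    }
}
```

In the real reader these two steps are delegated to a line tokenizer and a field set mapper, so the sketch only approximates the idea.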

     

     

    FlatFileItemWriter

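    @Bean
    public FlatFileItemWriter<Customer> flatFileItemWriter() {
        return new FlatFileItemWriterBuilder<Customer>()
                .name("flatFileItemWriter")
                // Output file on the local file system
                .resource(new FileSystemResource("./output/customer_new.csv"))
                .encoding(ENCODING)
                .delimited().delimiter("\t")
                .names("name", "age", "gender")
                // Do not append to an existing file
                .append(false)
                // Custom line format; an explicit aggregator likely takes
                // precedence over the delimited() settings above
                .lineAggregator(new CustomerLineAggregator())
                // Written once at the top and once at the bottom of the file
                .headerCallback(new CustomerHeader())
                .footerCallback(new CustomerFooter(aggregateInfos))
                .build();
    }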

     

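    FlatFileItemWriter is a Spring Batch class that implements the ItemWriter interface and writes data out to a text file. It is simple to use, and its many settings let you produce the output file in the format you want. It can also write large volumes of data quickly. On the other hand, it supports only text file formats, the configuration can get complicated when the data has a complex structure, and a configuration mistake can corrupt the output file.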
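The general shape of the output (a header, one aggregated line per item, then a footer) can be sketched in plain Java as follows. This is only an illustration of the ordering, not the library's implementation; `OutCustomer`, `FlatFileWriteSketch`, and the sample values are hypothetical, and the exact line-separator handling of the real writer is not modeled here.

```java
import java.util.List;

// Hypothetical stand-in for the post's Customer class.
record OutCustomer(String name, int age, String gender) {}

class FlatFileWriteSketch {

    // Joins one item's fields with the delimiter,
    // similar in spirit to what a line aggregator does.
    static String aggregate(OutCustomer c, String delimiter) {
        return String.join(delimiter, c.name(), String.valueOf(c.age()), c.gender());
    }

    // Builds the file content: header, one line per item, then footer.
    static String render(String header, List<OutCustomer> items, String footer, String delimiter) {
        String nl = System.lineSeparator();
        StringBuilder out = new StringBuilder();
        out.append(header).append(nl);
        for (OutCustomer c : items) {
            out.append(aggregate(c, delimiter)).append(nl);
        }
        out.append(footer);
        return out.toString();
    }

    public static void main(String[] args) {
        List<OutCustomer> items = List.of(
                new OutCustomer("Alice", 30, "F"),
                new OutCustomer("Bob", 41, "M"));
        System.out.println(render("NAME\tAGE\tGENDER", items, "rows: " + items.size(), "\t"));
    }
}
```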

    Hands-on with Spring Batch

    Customer

    
    public class Customer {
    
        private String name;
        private int age;
        private String gender;
    
    
        public void setName(String name){
            this.name = name;
        }
    
        public void setAge(int age){
            this.age = age;
        }
    
        public void setGender(String gender){
            this.gender = gender;
        }
    
        public String getName(){
            return this.name;
        }
    
        public int getAge(){
            return this.age;
        }
    
        public String getGender(){
            return this.gender;
        }
    
    }

     

    FlatFileItemJobConfig

    import lombok.extern.slf4j.Slf4j;
    import org.schooldevops.springbatch.sample.aggregator.CustomerLineAggregator;
    import org.schooldevops.springbatch.sample.domain.Customer;
    import org.schooldevops.springbatch.sample.footer.CustomerFooter;
    import org.schooldevops.springbatch.sample.header.CustomerHeader;
    import org.schooldevops.springbatch.sample.processor.AggregateCustomerProcessor;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.job.builder.JobBuilder;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.builder.StepBuilder;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.FlatFileItemWriter;
    import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
    import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.transaction.PlatformTransactionManager;
    
    import java.util.concurrent.ConcurrentHashMap;
    
    @Slf4j
    @Configuration
    public class FlatFileItemJobConfig {
    
        public static final int CHUNK_SIZE = 100;
        public static final String ENCODING = "UTF-8";
        public static final String FLAT_FILE_WRITER_CHUNK_JOB = "FLAT_FILE_WRITER_CHUNK_JOB";
    
        private ConcurrentHashMap<String, Integer> aggregateInfos = new ConcurrentHashMap<>();
    
        private final ItemProcessor<Customer, Customer> itemProcessor = new AggregateCustomerProcessor(aggregateInfos);
    
        @Bean
        public FlatFileItemReader<Customer> flatFileItemReader() {
            return new FlatFileItemReaderBuilder<Customer>()
                    .name("FlatFileItemReader")
                    .resource(new ClassPathResource("./customer.csv"))
                    .encoding(ENCODING)
                    .delimited().delimiter(",")
                    .names("name", "age", "gender")
                    .targetType(Customer.class)
                    .build();
        }
    
        @Bean
        public FlatFileItemWriter<Customer> flatFileItemWriter() {
            return new FlatFileItemWriterBuilder<Customer>()
                    .name("flatFileItemWriter")
                    .resource(new FileSystemResource("./output/customer_new.csv"))
                    .encoding(ENCODING)
                    .delimited().delimiter("\t")
                    .names("name", "age", "gender")
                    .append(false)
                    .lineAggregator(new CustomerLineAggregator())
                    .headerCallback(new CustomerHeader())
                    .footerCallback(new CustomerFooter(aggregateInfos))
                    .build();
        }
    
        @Bean
        public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
            log.info("------------- Init flatFileStep --------------");
    
            return new StepBuilder("flatFileStep", jobRepository)
                    .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                    .reader(flatFileItemReader())
                    .processor(itemProcessor)
                    .writer(flatFileItemWriter())
                    .build();
        }
    
        @Bean
        public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
            log.info("------------ Init flatFileJob ------------");
    
            return new JobBuilder(FLAT_FILE_WRITER_CHUNK_JOB, jobRepository)
                    .incrementer(new RunIdIncrementer())
                    .start(flatFileStep)
                    .build();
        }
    }

    CustomerHeader

    import org.springframework.batch.item.file.FlatFileHeaderCallback;
    
    import java.io.IOException;
    import java.io.Writer;
    
    public class CustomerHeader implements FlatFileHeaderCallback {
        @Override
        public void writeHeader(Writer writer) throws IOException {
            writer.write("ID,AGE");
        }
    }

     

     

    CustomerFooter

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.item.file.FlatFileFooterCallback;
    
    import java.io.IOException;
    import java.io.Writer;
    import java.util.concurrent.ConcurrentHashMap;
    
    @Slf4j
    public class CustomerFooter implements FlatFileFooterCallback {
    
        ConcurrentHashMap<String, Integer> aggregateCustomers;
    
        public CustomerFooter(ConcurrentHashMap<String, Integer> aggregateCustomers){
            this.aggregateCustomers = aggregateCustomers;
        }
    
        @Override
        public void writeFooter(Writer writer) throws IOException {
            // "총 고객 수" = total number of customers
            writer.write("총 고객 수: " + aggregateCustomers.get("TOTAL_CUSTOMERS"));
            writer.write(System.lineSeparator());
            // "총 나이" = sum of all ages
            writer.write("총 나이: " + aggregateCustomers.get("TOTAL_AGES"));
        }
    }

     

    AggregateCustomerProcessor: implemented via ItemProcessor

    import lombok.extern.slf4j.Slf4j;
    import org.schooldevops.springbatch.sample.domain.Customer;
    import org.springframework.batch.item.ItemProcessor;
    
    import java.util.concurrent.ConcurrentHashMap;
    
    @Slf4j
    public class AggregateCustomerProcessor implements ItemProcessor<Customer, Customer> {
    
        ConcurrentHashMap<String, Integer> aggregateCustomers;
    
        public AggregateCustomerProcessor(ConcurrentHashMap<String, Integer> aggregateCustomers){
            this.aggregateCustomers = aggregateCustomers;
        }
    
        @Override
        public Customer process(Customer item) throws Exception {
            aggregateCustomers.putIfAbsent("TOTAL_CUSTOMERS", 0);
            aggregateCustomers.putIfAbsent("TOTAL_AGES", 0); // initialize to 0 when no value exists yet
    
            aggregateCustomers.put("TOTAL_CUSTOMERS", aggregateCustomers.get("TOTAL_CUSTOMERS") + 1);
            aggregateCustomers.put("TOTAL_AGES", aggregateCustomers.get("TOTAL_AGES") + item.getAge());
            return item;
        }
    }
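The processor above reads a total with `get` and writes it back with `put`. That is fine for a single-threaded step like the one in this post, but the two calls are not atomic as a pair. As a hedged alternative, assuming the step might someday run on several threads, `ConcurrentHashMap.merge` can do the initialization and the update in one call. `AggregateSketch` and `accumulate` are hypothetical names used only for this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;

class AggregateSketch {

    static final String TOTAL_CUSTOMERS = "TOTAL_CUSTOMERS";
    static final String TOTAL_AGES = "TOTAL_AGES";

    // merge() stores the given value when the key is absent; otherwise it
    // combines the old and new values with Integer::sum, atomically per key.
    static void accumulate(ConcurrentHashMap<String, Integer> totals, int age) {
        totals.merge(TOTAL_CUSTOMERS, 1, Integer::sum);
        totals.merge(TOTAL_AGES, age, Integer::sum);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> totals = new ConcurrentHashMap<>();
        for (int age : new int[] {30, 41, 25}) {
            accumulate(totals, age);
        }
        System.out.println(totals.get(TOTAL_CUSTOMERS) + " customers, ages sum to " + totals.get(TOTAL_AGES));
    }
}
```

This also removes the need for the two `putIfAbsent` calls, since `merge` covers the missing-key case.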

     

    Creating the data

    I generated Customer.csv with ChatGPT and used it to run the exercise myself.

     

     

    Execution result

    at org.schooldevops.springbatch.sample.SampleApplication.main(SampleApplication.java:12) ~[main/:na]
    Caused by: org.springframework.validation.BindException: org.springframework.validation.BeanPropertyBindingResult: 1 errors
    Field error in object 'target' on field 'age': rejected value [age]; codes [typeMismatch.target.age,typeMismatch.age,typeMismatch.int,typeMismatch]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [target.age,age]; arguments []; default message [age]]; default message [Failed to convert property value of type 'java.lang.String' to required type 'int' for property 'age'; For input string: "age"]
    	at org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper.mapFieldSet(BeanWrapperFieldSetMapper.java:186) ~[spring-batch-infrastructure-5.1.2.jar:5.1.2]
    	at org.springframework.batch.item.file.mapping.DefaultLineMapper.mapLine(DefaultLineMapper.java:42) ~[spring-batch-infrastructure-5.1.2.jar:5.1.2]
    	at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:194) ~[spring-batch-infrastructure-5.1.2.jar:5.1.2]
    	... 52 common frames omitted
    
    2024-10-27T17:30:55.263+09:00  INFO 58313 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flatFileStep] executed in 28ms
    2024-10-27T17:30:55.273+09:00  INFO 58313 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=FLAT_FILE_WRITER_CHUNK_JOB]] completed with the following parameters: [{'run.id':'{value=10, type=class java.lang.Long, identifying=true}'}] and the following status: [FAILED] in 46ms
    2024-10-27T17:30:55.275+09:00  INFO 58313 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
    2024-10-27T17:30:55.299+09:00  INFO 58313 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
    
    Process finished with exit code 0

     

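    My first reading was that the age value had been treated as a String and that the error came from the code that adds it to TOTAL_AGES in AggregateCustomerProcessor.

    Even though the job failed, the CSV file was still created with the content written by writeHeader and writeFooter.

    However, the data rows were not written properly, and null appeared where values were expected.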
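A likely explanation for the null values, offered here as an assumption rather than something confirmed in the run: the reader failed on the very first line, so the processor probably never ran and the shared map never received its keys. `ConcurrentHashMap.get` returns null for a missing key, and string concatenation renders that as the text "null". The sketch below reproduces this in plain Java; `FooterNullSketch` and its method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

class FooterNullSketch {

    // Mirrors the concatenation style of the footer callback:
    // a missing key yields null, which is printed as "null".
    static String footerLine(ConcurrentHashMap<String, Integer> totals) {
        return "TOTAL_CUSTOMERS: " + totals.get("TOTAL_CUSTOMERS");
    }

    // Possible variant: fall back to 0 when nothing was aggregated.
    static String footerLineWithDefault(ConcurrentHashMap<String, Integer> totals) {
        return "TOTAL_CUSTOMERS: " + totals.getOrDefault("TOTAL_CUSTOMERS", 0);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> empty = new ConcurrentHashMap<>();
        System.out.println(footerLine(empty));
        System.out.println(footerLineWithDefault(empty));
    }
}
```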

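    Looking at the error more closely, it occurred in flatFileStep.

    The exception was thrown while flatFileItemReader was being called.

    While reading through the implementation, it occurred to me that the header row of the data (name, age, gender) might be the problem: the literal text age in that row cannot be converted to an int.

    So I regenerated the data without the header row and ran the job again.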
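Removing the header row from the file is one fix. Another option is to keep the header and skip it while reading; `FlatFileItemReaderBuilder` offers `linesToSkip(int)` for this purpose. The plain-Java sketch below does not use Spring Batch. It only reproduces the failure mode (parsing the header text `age` as a number) and shows the effect of skipping one line. `HeaderSkipSketch`, `readAges`, and the sample rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class HeaderSkipSketch {

    // Parses the second column (age) of every line after the skipped ones.
    static List<Integer> readAges(List<String> lines, int linesToSkip) {
        List<Integer> ages = new ArrayList<>();
        int start = Math.min(linesToSkip, lines.size());
        for (String line : lines.subList(start, lines.size())) {
            String[] tokens = line.split(",", -1);
            // Throws NumberFormatException when the token is the header text "age".
            ages.add(Integer.parseInt(tokens[1].trim()));
        }
        return ages;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("name,age,gender", "Alice,30,F", "Bob,41,M");
        try {
            readAges(lines, 0);
        } catch (NumberFormatException e) {
            System.out.println("Header row was parsed as data: " + e.getMessage());
        }
        System.out.println(readAges(lines, 1));
    }
}
```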

     

     

    Execution result after the fix

    2024-10-27T21:15:53.877+09:00  INFO 68596 --- [           main] o.s.s.s.config.FlatFileItemJobConfig     : ------------- Init flatFileStep --------------
    2024-10-27T21:15:53.900+09:00  INFO 68596 --- [           main] o.s.s.s.config.FlatFileItemJobConfig     : ------------ Init flatFileJob ------------
    2024-10-27T21:15:53.963+09:00  INFO 68596 --- [           main] o.s.s.sample.SampleApplication           : Started SampleApplication in 0.626 seconds (process running for 0.848)
    2024-10-27T21:15:53.964+09:00  INFO 68596 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
    2024-10-27T21:15:54.030+09:00  INFO 68596 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=FLAT_FILE_CHUNK_JOB]] launched with the following parameters: [{'run.id':'{value=4, type=class java.lang.Long, identifying=true}'}]
    2024-10-27T21:15:54.048+09:00  INFO 68596 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flatFileStep]
    2024-10-27T21:15:54.075+09:00  INFO 68596 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flatFileStep] executed in 26ms
    2024-10-27T21:15:54.087+09:00  INFO 68596 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=FLAT_FILE_CHUNK_JOB]] completed with the following parameters: [{'run.id':'{value=4, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 48ms
    2024-10-27T21:15:54.089+09:00  INFO 68596 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
    2024-10-27T21:15:54.099+09:00  INFO 68596 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
    
    Process finished with exit code 0

     

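    The job ran and exited without errors.

    The file was created in output as expected, and the data is visible in it.

    Because the processor only aggregates totals and does not transform the items, the data is written out unchanged.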

     

     

     

    Reference

    [SpringBatch 연재 04] FlatFileItemReader로 단순 파일 읽고, FlatFileItemWriter로 파일에 쓰기 ([SpringBatch Series 04] Reading a simple file with FlatFileItemReader and writing to a file with FlatFileItemWriter)
    https://devocean.sk.com/blog/techBoardDetail.do?ID=166828

    FlatFileParseException Parsing error - Spring Batch
    https://stackoverflow.com/questions/33719747/flatfileparseexception-parsing-error-spring-batch

    FileSystemResource (Spring Framework 6.1.14 API)
    https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/core/io/FileSystemResource.html

     

     
