Asynchronous Spring Batch Job Processing

Hi folks, after a long break we are back with another tutorial drawn from a real-world use case. It shows how to upload a CSV file with Spring Boot 2, how to use the Spring Batch framework to process a large volume of data, and how to run the job with an asynchronous JobLauncher so that the system stays non-blocking.

Spring Batch is an open source project designed for batch processing, that is, the execution of a series of jobs. It offers classes and APIs to read, process, and write resources, along with transaction management, job processing statistics, job restart, job status tracking, and partitioning techniques for processing large volumes of data.

Requirements

This Spring Batch tutorial uses the following dependencies:

  • Spring Boot 2.1.8.RELEASE
  • Spring Batch
  • Thymeleaf
  • H2 database
  • OkHttp 4.2.0
  • FasterXML/Jackson
  • JDK 1.8

What We Will Build

In this tutorial, we will create a Spring Boot 2 application that accepts an uploaded CSV file containing a list of anonymous IP addresses and produces a new CSV file containing those IP addresses together with their geolocation, using the Spring Batch framework and the ip-api.com API. We will launch the Spring Batch job with an asynchronous job launcher, so the client's request is not blocked while the job runs.

Create a Spring Boot Project

Go to https://start.spring.io/ and create the Spring Boot 2 Project with the following dependencies, or use the STS4 IDE to generate a Spring Boot starter project.

[pom.xml]

Project Directory Structure

This is the project structure you will get.

[Project structure image]

Create the Models

/**
 * @author ninjadevcorner
 * 16 sep. 2019
 */
public class IPApiRequestBody {

 private String query;

 public IPApiRequestBody() {
  super();
 }

 public String getQuery() {
  return query;
 }

 public void setQuery(String query) {
  this.query = query;
 }
}

The IPApiRequestBody model will be used to send a request with the list of IP addresses and the appropriate parameters to the ip-api API.
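
To make the request format concrete, here is a minimal plain-Java sketch of the JSON array that a list of IPApiRequestBody objects serializes to: one object with a single query property per IP address. The class name BatchRequestBodySketch and the hand-rolled serialization are illustrative assumptions; the tutorial itself relies on Jackson's ObjectMapper for this step.

```java
import java.util.Arrays;
import java.util.List;

public class BatchRequestBodySketch {

    /** Builds a JSON array with one {"query": "<ip>"} object per IP address. */
    static String toJson(List<String> ipAddresses) {
        StringBuilder json = new StringBuilder("[");
        for (int i = 0; i < ipAddresses.size(); i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"query\":\"").append(escape(ipAddresses.get(i))).append("\"}");
        }
        return json.append(']').toString();
    }

    /** Escapes backslashes and double quotes so the value stays a valid JSON string. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // Sample addresses taken from the documentation ranges, not real hosts.
        System.out.println(toJson(Arrays.asList("192.0.2.1", "198.51.100.7")));
    }
}
```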

/**
 * @author ninjadevcorner
 * 16 sep. 2019
 */
public class IPApiResponseBody {
 private String status;
 private String isp;
 private String org;
 // query represents the IP address
 private String query;
 private String country;
 private String countryCode;
 private String region;
 private String regionName;
 private String city;
 private String timezone;
 private String zip;
 
 public IPApiResponseBody() {
  super();
 }

// Getters and Setters
}

The IPApiResponseBody model handles the response returned from the ip-api API.

/**
 * @author ninjadevcorner
 * 16 sep. 2019
 */
public class IPAddressDetailsDTO {

 private String receiver;
 private String ipAddress;
 private String city;
 private String regionName;
 private String country;
 private String isp;
 
 // Getters and Setters
}

Finally, IPAddressDetailsDTO is the model that wraps the data from the uploaded CSV file together with the data returned by the server. The ItemWriter of the Spring Batch framework uses it to write the results to the generated CSV file.

Configure the Spring Batch Job

In this step we will configure our job using the builders that Spring Batch provides.

First, let's create the configuration class IPLocatorJobConfig and annotate it with the @EnableBatchProcessing annotation.

/**
 * @author ninjadevcorner
 * 16 sep. 2019
 */
@Configuration
@EnableBatchProcessing
public class IPLocatorJobConfig {

private static final Path CLEAN_DIR = Paths.get("cleaned-dir");
 
 @Autowired
 private JobBuilderFactory jobBuilderFactory;
 
 @Autowired
 private StepBuilderFactory stepBuilderFactory;
 
 // Step-scoped so that the file path is resolved from the job parameters when the step starts.
 @Bean
 @Scope(value="step", proxyMode = ScopedProxyMode.INTERFACES)
 public FlatFileItemReader<IPAddressDetailsDTO> ipLocatorReader(@Value("#{jobParameters[fullPathFileName]}") String pathToFile) {
  FlatFileItemReader<IPAddressDetailsDTO> reader = new FlatFileItemReader<>();
  // Skip the CSV header line.
  reader.setLinesToSkip(1);
  reader.setResource(new FileSystemResource(pathToFile));
  reader.setLineMapper(new DefaultLineMapper<IPAddressDetailsDTO>() {{
   setLineTokenizer(new DelimitedLineTokenizer(",") {{
    setNames(new String[]{"receiver", "ipAddress"});
   }});
   setFieldSetMapper(new BeanWrapperFieldSetMapper<IPAddressDetailsDTO>() {{
    setTargetType(IPAddressDetailsDTO.class);
   }});
  }});
  return reader;
 }
 
 @Bean
 @Scope(value="step", proxyMode = ScopedProxyMode.INTERFACES)
 public ItemStreamWriter<IPAddressDetailsDTO> ipLocatorWriter(@Value("#{jobParameters[fullPathFileName]}") String pathToFile) {

  // Name the output file after the uploaded file and place it in the output directory.
  Resource resource = new FileSystemResource(pathToFile);
  String filename = resource.getFilename();
  File cleanedFile = CLEAN_DIR.resolve("ipLocator_"+filename).toFile();

  IPLocatorItemWriter writer = new IPLocatorItemWriter();
  writer.setResource(new FileSystemResource(cleanedFile));
  DelimitedLineAggregator<IPAddressDetailsDTO> delLineAgg = new DelimitedLineAggregator<>();
  delLineAgg.setDelimiter(",");
  BeanWrapperFieldExtractor<IPAddressDetailsDTO> fieldExtractor = new BeanWrapperFieldExtractor<>();
  fieldExtractor.setNames(new String[] {"receiver", "ipAddress", "country", "regionName", "city", "isp"});
  delLineAgg.setFieldExtractor(fieldExtractor);
  writer.setLineAggregator(delLineAgg);
  return writer;
 }
 
 @Bean
    @Scope(value="job", proxyMode = ScopedProxyMode.INTERFACES)
    public JobExecutionListener ipLocatorJobCompletionListener() {
  
  return new IpLocatorJobCompletionListener();
 }
 
 @Bean
    public Job ipLocatorJob(ItemReader<IPAddressDetailsDTO> ipLocatorReader) {
  
        return jobBuilderFactory.get("ipLocatorJob")
                .incrementer(new RunIdIncrementer())
                .listener(ipLocatorJobCompletionListener())
                .flow(ipLocatorStep(ipLocatorReader))
                .end()
                .build();
    }
 
 // skip exception https://stackoverflow.com/questions/24233821/skippable-exception-classes-for-spring-batch-with-java-based-configuration
 // https://github.com/debop/spring-batch-experiments/blob/master/chapter08/src/test/java/kr/spring/batch/chapter08/test/skip/SkipBehaviorConfiguration.java
    //https://blog.netapsys.fr/spring-batch-part-ii-validation-and-skip-policy-ou-comment-gerer-les-donnees-non-valides-dans-un-batch/comment-page-1/
 @Bean
    public Step ipLocatorStep(ItemReader<IPAddressDetailsDTO> ipLocatorReader) {
        return stepBuilderFactory.get("ipLocatorStep")
                .<IPAddressDetailsDTO, IPAddressDetailsDTO>chunk(80)
                .reader(ipLocatorReader)
                .writer(ipLocatorWriter(null))
                .build();
    }
}

The code in this class is mostly self-explanatory, but let's walk through each method to see what it does.

Some methods are annotated with @Scope(value="step", proxyMode = ScopedProxyMode.INTERFACES). Step scope is required for late binding: the bean cannot be instantiated until the Step starts, because only then are the attributes it depends on available. This is what lets us pass dynamic values at runtime, here the jobParameters.
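
The idea behind late binding can be sketched in plain Java with a JDK dynamic proxy, the same kind of interface-based proxy that ScopedProxyMode.INTERFACES refers to. This is only an analogy under stated assumptions: LateBindingSketch, FileSource and lazyProxy are hypothetical names, and unlike Spring's step scope this simplified proxy caches its target after the first call and is not thread-safe.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LateBindingSketch {

    /** Hypothetical stand-in for a step-scoped bean that depends on a job parameter. */
    public interface FileSource {
        String path();
    }

    /** Returns a JDK proxy that builds the real target on first use, not at creation time. */
    static <T> T lazyProxy(Class<T> type, Supplier<? extends T> targetFactory) {
        Object[] target = new Object[1];
        InvocationHandler handler = (proxy, method, args) -> {
            if (target[0] == null) {
                target[0] = targetFactory.get(); // resolved late, once the parameters exist
            }
            try {
                return method.invoke(target[0], args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return type.cast(Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    public static void main(String[] args) {
        Map<String, String> jobParameters = new HashMap<>();
        Supplier<FileSource> factory = () -> {
            String path = jobParameters.get("fullPathFileName");
            return () -> path;
        };
        // The proxy exists before the parameter is known.
        FileSource source = lazyProxy(FileSource.class, factory);
        jobParameters.put("fullPathFileName", "/tmp/upload/ips.csv");
        // The parameter is read only now, on first use.
        System.out.println(source.path());
    }
}
```

The proxy can be wired into other objects immediately, while the value it depends on is looked up only when a method is first called.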

The first method is ipLocatorReader. It reads the IP address entries from the uploaded CSV file, whose path is passed to the method as a parameter, and maps each line to an IPAddressDetailsDTO object using setFieldSetMapper with a BeanWrapperFieldSetMapper whose target type is set through setTargetType.
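
As a rough illustration of what the reader configuration amounts to, the following plain-Java sketch skips the header line, splits each remaining line on commas, and maps the two columns by position. CsvLineMappingSketch and its Row class are hypothetical stand-ins for FlatFileItemReader and IPAddressDetailsDTO, and rejecting lines with a wrong column count is this sketch's own choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CsvLineMappingSketch {

    /** Minimal stand-in for IPAddressDetailsDTO, holding only the two input columns. */
    static class Row {
        final String receiver;
        final String ipAddress;

        Row(String receiver, String ipAddress) {
            this.receiver = receiver;
            this.ipAddress = ipAddress;
        }
    }

    /** Skips the header line, then maps "receiver,ipAddress" lines to rows. */
    static List<Row> parse(List<String> lines) {
        List<Row> rows = new ArrayList<>();
        for (int i = 1; i < lines.size(); i++) { // starting at 1 mirrors setLinesToSkip(1)
            String[] fields = lines.get(i).split(",", -1);
            if (fields.length != 2) {
                throw new IllegalArgumentException("Expected 2 columns on line " + (i + 1));
            }
            rows.add(new Row(fields[0], fields[1]));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("receiver,ipAddress", "alice@example.com,192.0.2.1");
        Row row = parse(lines).get(0);
        System.out.println(row.receiver + " -> " + row.ipAddress);
    }
}
```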

The second method is ipLocatorWriter. It configures the writer that writes the IP address details to the newly generated CSV file; the output file name is derived from the uploaded file path passed to the method as a parameter. It also shows how the fields are extracted for IPLocatorItemWriter, which extends the FlatFileItemWriter class and writes them to the new CSV file.
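
Conceptually, the field extractor and the line aggregator turn each item into one CSV line by joining its fields in the configured order. A minimal plain-Java sketch of that step follows; LineAggregationSketch is a hypothetical name, the sample values are made up, and writing an empty string for a missing field is this sketch's own choice rather than a statement about DelimitedLineAggregator.

```java
import java.util.StringJoiner;

public class LineAggregationSketch {

    /** Joins the given field values into one delimited line, in the order provided. */
    static String aggregate(String delimiter, String... fields) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (String field : fields) {
            joiner.add(field == null ? "" : field); // missing values become empty columns
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Same column order as in the tutorial: receiver, ipAddress, country, regionName, city, isp.
        System.out.println(aggregate(",", "alice@example.com", "192.0.2.1",
                "Exampleland", "Example Region", "Example City", "Example ISP"));
    }
}
```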

The following is the implementation of the IPLocatorItemWriter class, which communicates with the ip-api API and gets the details of the IP addresses sent in the request body.

/**
 * @author ninjadevcorner
 * 16 sep. 2019
 */
public class IPLocatorItemWriter extends FlatFileItemWriter<IPAddressDetailsDTO> {

 private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
 private static final String IP_API_BATCH_URL = "http://ip-api.com/batch?fields=country,regionName,city,isp,status";
 private static final List<String> USER_AGENTS = new ArrayList<>();
 static {
  USER_AGENTS.add("Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1");
  USER_AGENTS.add("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10; rv:33.0) Gecko/20100101 Firefox/33.0");
  USER_AGENTS.add("Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36");
  USER_AGENTS.add("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36");
  USER_AGENTS.add("Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko");
  USER_AGENTS.add("Opera/9.80 (X11; Linux i686; Ubuntu/14.10) Presto/2.12.388 Version/12.16");
  USER_AGENTS.add("Opera/9.80 (Windows NT 6.0) Presto/2.12.388 Version/12.14");
 }
 
 private OkHttpClient client = new OkHttpClient().newBuilder().build();
 
 // Called once per chunk: enrich the items through the ip-api batch endpoint, then write them.
 @Override
 public void write(List<? extends IPAddressDetailsDTO> ipAddressDetailsDTOs) throws Exception {
  
  System.out.println("IPLocatorItemWriter");
  
  List<IPApiRequestBody> ipApiResquestBody = new ArrayList<IPApiRequestBody>();
  
  ipAddressDetailsDTOs.stream().forEach(ipAddressDetails -> { 
   IPApiRequestBody ipRequest = new IPApiRequestBody();
   
   ipRequest.setQuery(ipAddressDetails.getIpAddress());
   ipApiResquestBody.add(ipRequest);
  });
  
  ObjectMapper mapper = new ObjectMapper();
  String resquestBodyJson = mapper.writeValueAsString(ipApiResquestBody);
  System.out.println("json request : "+resquestBodyJson);
  String responseJson = post(IP_API_BATCH_URL, resquestBodyJson);
  System.out.println("json response : "+responseJson);
  //Define Custom Type reference for List<IPResponseBody> type
     TypeReference<List<IPApiResponseBody>> mapType = new TypeReference<List<IPApiResponseBody>>() {};
     List<IPApiResponseBody> ipApiResponseBody = mapper.readValue(responseJson, mapType);
     System.out.println("response body : "+ ipApiResponseBody.get(0).getCity());
     for(int i=0; i < ipAddressDetailsDTOs.size(); i++){
      
      ipAddressDetailsDTOs.get(i).setCountry(ipApiResponseBody.get(i).getCountry());
      ipAddressDetailsDTOs.get(i).setRegionName(ipApiResponseBody.get(i).getRegionName());
      ipAddressDetailsDTOs.get(i).setCity(ipApiResponseBody.get(i).getCity());
      ipAddressDetailsDTOs.get(i).setIsp(ipApiResponseBody.get(i).getIsp());

     }
     
  super.write(ipAddressDetailsDTOs);
  Thread.sleep(1000);
 }

 private String post(String url, String json) throws IOException {
  
  // Pick a random User-Agent for each request.
  int index = new Random().nextInt(USER_AGENTS.size());
  
  RequestBody body = RequestBody.Companion.create(json, JSON);
  Request request = new Request.Builder()
    .url(url)
    .post(body)
    .header("User-Agent", USER_AGENTS.get(index))
    .build();
  // Close the response once the body has been read to release the connection.
  try (Response response = client.newCall(request).execute()) {
   return response.body().string();
  }
 }
}
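
The write method above pairs each item with the response entry at the same index, which assumes the API returns exactly one result per IP address, in request order. The sketch below makes that assumption explicit with a size check before merging. MergeByIndexSketch and its use of plain strings are illustrative simplifications, not part of the original writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeByIndexSketch {

    /** Pairs each requested IP with the city at the same index; fails fast on a size mismatch. */
    static List<String> merge(List<String> ipAddresses, List<String> cities) {
        if (ipAddresses.size() != cities.size()) {
            throw new IllegalStateException(
                    "Expected " + ipAddresses.size() + " results but got " + cities.size());
        }
        List<String> merged = new ArrayList<>();
        for (int i = 0; i < ipAddresses.size(); i++) {
            merged.add(ipAddresses.get(i) + "," + cities.get(i));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration only.
        System.out.println(merge(Arrays.asList("192.0.2.1", "198.51.100.7"),
                Arrays.asList("Example City", "Sample Town")));
    }
}
```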

The next method is ipLocatorJob, which configures the batch job and its flow of steps. In our case the flow contains a single step, ipLocatorStep.

The last method is ipLocatorStep, which configures the step and how it processes items, for example the number of items processed before the transaction is committed (the chunk size), which is 80 in our case.
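
To see what a chunk size of 80 means in practice, here is a small plain-Java sketch that splits a list of items into consecutive chunks. In the tutorial's setup each chunk corresponds to one call to the writer's write method, and therefore to one batch request. ChunkingSketch is a hypothetical helper; Spring Batch performs this grouping internally.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch {

    /** Splits the items into consecutive chunks of at most chunkSize elements. */
    static <T> List<List<T>> chunks(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            items.add(i);
        }
        // 200 items with a chunk size of 80 give chunks of 80, 80 and 40 items.
        for (List<Integer> chunk : chunks(items, 80)) {
            System.out.println(chunk.size());
        }
    }
}
```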

Configure an Asynchronous JobLauncher

/**
 * @author ninjadevcorner.com
 * 16 sep. 2019
 */
@Configuration
public class BatchConfig {
 
 @Autowired
 private JobRepository jobRepository;
 
 @Bean
 public TaskExecutor threadPoolTaskExecutor(){
  
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(12);
        executor.setCorePoolSize(8);
        executor.setQueueCapacity(15);
  
   return executor;
 }
 
 @Bean
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
        return jobLauncher;
 }
}

The BatchConfig class above is a configuration class annotated with @Configuration. It autowires the JobRepository used by the job launcher and defines two beans: a TaskExecutor bean, which is a ThreadPoolTaskExecutor that reuses a pool of threads, and a JobLauncher bean, which is configured with the JobRepository and the ThreadPoolTaskExecutor bean.
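
The effect of handing the job to a thread pool can be sketched with the JDK's own ThreadPoolExecutor, using the same sizes as above (core 8, max 12, queue capacity 15). With these settings a ThreadPoolExecutor only adds threads beyond the core size once the queue is full. AsyncLaunchSketch, launch and awaitResult are hypothetical names, and the Future here merely stands in for the JobExecution that the job launcher returns.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncLaunchSketch {

    /** Pool with 8 core threads, at most 12 threads, and a queue of 15 pending tasks. */
    static ThreadPoolExecutor newExecutor() {
        return new ThreadPoolExecutor(8, 12, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(15));
    }

    /** Hands the job to the pool and returns at once; the Future completes when the job does. */
    static Future<String> launch(ExecutorService executor, Callable<String> job) {
        return executor.submit(job);
    }

    /** Waits up to five seconds for the job result, wrapping checked exceptions. */
    static String awaitResult(Future<String> execution) {
        try {
            return execution.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = newExecutor();
        CountDownLatch release = new CountDownLatch(1);
        Future<String> execution = launch(executor, () -> {
            release.await(); // simulates a long-running job
            return "COMPLETED";
        });
        // The caller gets control back while the job is still running.
        System.out.println("launched, done = " + execution.isDone());
        release.countDown();
        System.out.println("status = " + awaitResult(execution));
        executor.shutdown();
    }
}
```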

This JobLauncher bean will be autowired as a dependency in our controller to run the desired Spring Batch job.

Spring Batch Controller

/**
 * @author ninjadevcorner
 * 16 sep. 2019
 */
@Controller
public class IPLocatorController {

 private StorageService storageService;
    private JobLauncher jobLauncher;
    private Job ipLocatorJob;
    
    @Autowired
 public IPLocatorController(StorageService storageService,
   @Qualifier("asyncJobLauncher") JobLauncher jobLauncher,
   Job ipLocatorJob) {
  super();
  this.storageService = storageService;
  this.jobLauncher = jobLauncher;
  this.ipLocatorJob = ipLocatorJob;
 }
    
    @GetMapping(path="/upload")
    public String helloWorld() {
     return "upload";
    }
    
 @PostMapping(path="/locator")
 public ResponseEntity<String> receiversIpLocator(@RequestParam("file") MultipartFile file) {
  
  String message = "";
  
  try {
   Path path = storageService.store(file);
   File fileToImport = path.toFile();
   
   message = "You successfully uploaded " + file.getOriginalFilename() + "! you will be notified when IP Location file generated";
   
   System.out.println("Path : ---------------> "+ path);
   System.out.println("File : ---------------> "+ fileToImport.getAbsolutePath());
   
   jobLauncher.run(ipLocatorJob, new JobParametersBuilder()
           .addString("fullPathFileName", fileToImport.getAbsolutePath())
           .addDate("jobDate", new Date())
           .toJobParameters());
   
   return ResponseEntity.status(HttpStatus.OK).body("{\"message\":\""+message+"\"}");
   
  } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
    | JobParametersInvalidException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
   message = "File Uploaded" + file.getOriginalFilename() + ", error during Localisation Job! exception msg : "+e.getMessage();
   return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("{\"message\":\""+message+"\"}");
  }
 }
    
}

The IPLocatorController is where we expose our Spring Batch file processing service. It is annotated with @Controller rather than @RestController because the /upload endpoint returns the name of a view. We use a POST request to upload the file containing the anonymous IP addresses, then pass the path of the uploaded file as a job parameter to the JobLauncher, which runs the job and generates a new CSV file with the IP address details.

The best thing about this controller is that the request is not blocked by the Spring Batch job: the job launcher is asynchronous, so the client does not have to wait while the job is being processed.

Once the Spring Batch job is completed, you will see a message in your console and find the generated CSV file inside the cleaned-dir folder at the root of your project.
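
The location of the generated file follows from the writer configuration: the uploaded file's name is prefixed with ipLocator_ and resolved against cleaned-dir. A minimal sketch of that derivation using java.nio.file is shown below; OutputPathSketch and the sample upload path are assumptions made for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class OutputPathSketch {

    private static final Path CLEAN_DIR = Paths.get("cleaned-dir");

    /** Derives the output path from the uploaded file path, keeping only the file name. */
    static Path outputFor(String uploadedFilePath) {
        Path fileName = Paths.get(uploadedFilePath).getFileName();
        if (fileName == null) {
            throw new IllegalArgumentException("No file name in: " + uploadedFilePath);
        }
        return CLEAN_DIR.resolve("ipLocator_" + fileName);
    }

    public static void main(String[] args) {
        // "upload-dir/ips.csv" is a made-up location for the uploaded file.
        System.out.println(outputFor("upload-dir/ips.csv"));
    }
}
```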

Template

This is the upload.html file, which must be placed in the templates folder. It is a simple HTML template for uploading the CSV file with Spring Boot 2.


<!DOCTYPE html>
<html>
<body>

<h1>Asynchronous Spring Batch Job Processing</h1>

<form method="POST" action="/locator" enctype="multipart/form-data">
    <input type="file" name="file" /><br/><br/>
    <input type="submit" value="Submit" />
</form>

</body>
</html>

Test It!

After launching your Spring Boot app, go to http://localhost:8080/upload and upload your CSV file.

[Upload page image]

This is the uploaded CSV file.
[Uploaded CSV file image]

You will get this message on your console.

[Console output image]

The generated CSV file will look like this.

[Generated CSV file image]

Conclusion

In this article we have learned how to upload a CSV file with Spring Boot 2, how to use the Spring Batch framework for heavy data processing, and how to use an asynchronous JobLauncher to keep client requests from blocking.

I hope you have learned something new today. If so, you can buy me a coffee ☕ 😎