The job queries data to create user notifications, and each step runs the query for one type of data being processed.
Please also read my blog on Spring Batch Decision for running two jobs at different times here.
Environment:
Spring Boot / Spring Boot Starter Batch Version 1.5.6.RELEASE
Oracle Java 8
mysql Ver 14.14 Distrib 5.7.10, for Win64 (x86_64)
I am using MySQL, so I had to set up the necessary tables in my schema to support Spring Batch - https://docs.spring.io/spring-batch/trunk/reference/html/metaDataSchema.html.
- The schema scripts are located in the spring-batch-core jar file under the package org.springframework.batch.core, with versions for many different databases. I used schema-drop-mysql.sql and schema-mysql.sql since I was using MySQL.
- To find them, navigate to the spring-batch-core-*.jar. I am using Gradle, so I found it here - C:\Users\jhsu\.gradle\caches\modules-2\files-2.1\org.springframework.batch\spring-batch-core\3.0.8.RELEASE\5116a8aec6959f869cd78e779a153e2d43084097\spring-batch-core-3.0.8.RELEASE.jar\org\springframework\batch\core\
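If browsing the jar by hand is inconvenient, the scripts can also be listed programmatically. The following is only a sketch using the JDK's JarFile; the class name SchemaScriptFinder and its methods are hypothetical, and the jar path is passed in as an argument.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the project.
class SchemaScriptFinder {
    private static final String PACKAGE_PATH = "org/springframework/batch/core/";

    // True for entries such as org/springframework/batch/core/schema-mysql.sql
    // and org/springframework/batch/core/schema-drop-mysql.sql.
    static boolean isMysqlSchemaScript(String entryName) {
        if (!entryName.startsWith(PACKAGE_PATH)) {
            return false;
        }
        String fileName = entryName.substring(PACKAGE_PATH.length());
        return !fileName.contains("/") && fileName.startsWith("schema-") && fileName.endsWith("mysql.sql");
    }

    // Opens the jar and returns the names of the MySQL schema scripts in it.
    static List<String> findMysqlScripts(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.stream().map(JarEntry::getName).filter(SchemaScriptFinder::isMysqlSchemaScript).sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path to spring-batch-core-*.jar on your machine.
        findMysqlScripts(args[0]).forEach(System.out::println);
    }
}
```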
README.md
/src/main/java/com/cherryshoe/batch/BatchProcessorApp.java
/src/main/java/com/cherryshoe/batch/job/config/TypeOneJobConfig.java
/src/main/java/com/cherryshoe/batch/job/config/TypeTwoJobConfig.java
/src/main/java/com/cherryshoe/batch/job/config/UserNotificationJobConfig.java
/src/main/java/com/cherryshoe/batch/job/JobNotificationListener.java
/src/main/java/com/cherryshoe/batch/job/UserNotificationJobLauncher.java
/src/main/java/com/cherryshoe/batch/model/CsAuditBatchProcess.java
/src/main/java/com/cherryshoe/batch/model/CsUserNotification.java
/src/main/java/com/cherryshoe/batch/model/dto/CsAndUsersDTO.java
/src/main/java/com/cherryshoe/batch/model/dto/UserInfoDTO.java
/src/main/java/com/cherryshoe/batch/model/dto/UserNotificationDTO.java
/src/main/java/com/cherryshoe/batch/processor/TypeOneProcessor.java
/src/main/java/com/cherryshoe/batch/processor/TypeTwoProcessor.java
/src/main/java/com/cherryshoe/batch/writer/CustomUpdateNotificationWriter.java
/src/main/resources/application.properties
/src/main/resources/logback-spring.xml
Below is a short description of each file followed by the file contents:
/src/main/java/com/cherryshoe/batch/BatchProcessorApp.java
Main entry point of the app.
package com.cherryshoe.batch;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
// TODO: Schedule via application.properties, or schedule via OS cron job,
// calling script to invoke jar @EnableScheduling
public class BatchProcessorApp {
public static void main(String[] args) {
SpringApplication.run(BatchProcessorApp.class, args);
}
}
/src/main/java/com/cherryshoe/batch/job/config/TypeOneJobConfig.java
Step / Type One Notification Job Configuration where the Reader, Processor, Writer, and Step are defined.
package com.cherryshoe.batch.job.config;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import com.cherryshoe.batch.model.dto.CsAndUsersDTO;
import com.cherryshoe.batch.model.dto.UserNotificationDTO;
import com.cherryshoe.batch.processor.TypeOneProcessor;
import com.cherryshoe.batch.writer.CustomUpdateNotificationWriter;
/**
* This is the Type One Notification Job Configuration.
*/
@Configuration
public class TypeOneJobConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(TypeOneJobConfig.class);
// You need DISTINCT GROUP_CONCAT on user_id since a user can have multiple
// roles on a review, and you only need one of them per user.
// In addition, the user list should have only roles 2 and 3 at
// the system level.
private static final String TYPE_ONE_SELECT_QUERY = "SELECT review.id, review.cs_title, review.created_dt, review.last_updated_dt, "
+ "GROUP_CONCAT(DISTINCT rur.user_id) as user_list FROM cs_review review "
+ "LEFT JOIN rel_review_users rru ON review.id = rru.review_id "
+ "LEFT JOIN rel_user_role rur ON rur.user_id = rru.user_id AND rur.role_id IN (2,3) "
+ "WHERE review.created_dt <= DATE_ADD(CURDATE(), INTERVAL 30 DAY) AND review.active = 1 "
+ "GROUP BY review.id";
/**
* Type One Reader. Did not write a custom Reader class, did it right inside
* the Job Config here.
*
* @param dataSource
* @return
*/
@Bean
ItemReader<CsAndUsersDTO> typeOneReader(DataSource dataSource) {
LOGGER.info("Type One Reader Query - " + TYPE_ONE_SELECT_QUERY);
// TODO: Paging if performance is bad
JdbcCursorItemReader<CsAndUsersDTO> databaseReader = new JdbcCursorItemReader<>();
databaseReader.setDataSource(dataSource);
databaseReader.setSql(TYPE_ONE_SELECT_QUERY);
databaseReader.setRowMapper(new BeanPropertyRowMapper<>(CsAndUsersDTO.class));
return databaseReader;
}
/**
* Type One Processor. Creates the UserNotificationDTO, populating the
* appropriate values and creating the details
*
* @return
*/
@Bean
ItemProcessor<CsAndUsersDTO, UserNotificationDTO> typeOneProcessor() {
return new TypeOneProcessor();
}
/**
* Type One Writer. Uses the UserNotificationDTO information to write to the
* cs_user_notifications table.
*
* @return
*/
@Bean
ItemWriter<UserNotificationDTO> typeOneWriter() {
return new CustomUpdateNotificationWriter(UserNotificationJobConfig.NOTIF_TYPE_ONE);
}
/**
* Step typeOneStep
*
* @param typeOneReader
* @param typeOneProcessor
* @param typeOneWriter
* @param stepBuilderFactory
* @return
*/
@Bean
Step typeOneStep(ItemReader<CsAndUsersDTO> typeOneReader,
ItemProcessor<CsAndUsersDTO, UserNotificationDTO> typeOneProcessor,
ItemWriter<UserNotificationDTO> typeOneWriter, StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("typeOneStep").<CsAndUsersDTO, UserNotificationDTO>chunk(1).reader(typeOneReader)
.processor(typeOneProcessor).writer(typeOneWriter).build();
}
}
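The query returns user_list as one comma-separated string (GROUP_CONCAT), while CsAndUsersDTO exposes it as a List<Long>. The configuration above appears to leave that conversion to the row mapper. If it ever has to be done by hand, a minimal sketch could look like this; UserListParser is a hypothetical helper, not part of the project. Note that GROUP_CONCAT returns NULL when a review has no matching users, so the null case is handled explicitly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the project.
class UserListParser {
    // Turns a GROUP_CONCAT result such as "12,15,20" into a list of user ids.
    // A null or blank value (no matching users) yields an empty list.
    static List<Long> parseUserList(String userListColumn) {
        if (userListColumn == null || userListColumn.trim().isEmpty()) {
            return Collections.emptyList();
        }
        List<Long> userIds = new ArrayList<>();
        for (String token : userListColumn.split(",")) {
            userIds.add(Long.valueOf(token.trim()));
        }
        return userIds;
    }
}
```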
/src/main/java/com/cherryshoe/batch/job/config/TypeTwoJobConfig.java
Step / Type Two Notification Job Configuration where the Reader, Processor, Writer, and Step are defined.
package com.cherryshoe.batch.job.config;
import javax.sql.DataSource;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import com.cherryshoe.batch.model.dto.CsAndUsersDTO;
import com.cherryshoe.batch.model.dto.UserNotificationDTO;
import com.cherryshoe.batch.processor.TypeTwoProcessor;
import com.cherryshoe.batch.writer.CustomUpdateNotificationWriter;
/**
* This is the Type Two Notification Job Configuration.
*/
@Configuration
public class TypeTwoJobConfig {
// You need DISTINCT GROUP_CONCAT on user_id since a user can have multiple
// roles on a review, and you only need one of them per user.
// In addition, the user list should have only roles 2 and 3 at
// the system level.
private static final String TYPE_TWO_SELECT_QUERY = "SELECT review.id, review.cs_title, review.last_updated_dt, "
+ "GROUP_CONCAT(DISTINCT rur.user_id) as user_list FROM cs_review review "
+ "LEFT JOIN rel_review_users rru ON review.id = rru.review_id "
+ "LEFT JOIN rel_user_role rur ON rur.user_id = rru.user_id AND rur.role_id IN (2,3) "
+ "WHERE review.last_updated_dt <= DATE_ADD(CURDATE(), INTERVAL 15 DAY) AND review.active = 1 "
+ "GROUP BY review.id";
/**
* Type Two Reader. Did not write a custom Reader class, did it right inside
* the Job Config here.
*
* @param dataSource
* @return
*/
@Bean
ItemReader<CsAndUsersDTO> typeTwoReader(DataSource dataSource) {
// TODO: Paging if performance is bad
JdbcCursorItemReader<CsAndUsersDTO> databaseReader = new JdbcCursorItemReader<>();
databaseReader.setDataSource(dataSource);
databaseReader.setSql(TYPE_TWO_SELECT_QUERY);
databaseReader.setRowMapper(new BeanPropertyRowMapper<>(CsAndUsersDTO.class));
return databaseReader;
}
/**
* Type Two Processor. Creates the UserNotificationDTO, populating the
* appropriate values and creating the details
*
* @return
*/
@Bean
ItemProcessor<CsAndUsersDTO, UserNotificationDTO> typeTwoProcessor() {
return new TypeTwoProcessor();
}
/**
* Type Two Writer. Uses the UserNotificationDTO information to write to the
* cs_user_notifications table.
*
* @return
*/
@Bean
ItemWriter<UserNotificationDTO> typeTwoWriter() {
return new CustomUpdateNotificationWriter(UserNotificationJobConfig.NOTIF_TYPE_TWO);
}
/**
* Step typeTwoStep
*
* @param typeTwoReader
* @param typeTwoProcessor
* @param typeTwoWriter
* @param stepBuilderFactory
* @return
*/
@Bean
Step typeTwoStep(ItemReader<CsAndUsersDTO> typeTwoReader,
ItemProcessor<CsAndUsersDTO, UserNotificationDTO> typeTwoProcessor,
ItemWriter<UserNotificationDTO> typeTwoWriter, StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("typeTwoStep").<CsAndUsersDTO, UserNotificationDTO>chunk(1).reader(typeTwoReader)
.processor(typeTwoProcessor).writer(typeTwoWriter).build();
}
}
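Both steps are built with chunk(1), which sets the commit interval to a single item: every item is read, processed, and handed to the writer on its own. The plain-Java sketch below only illustrates the idea of grouping items into chunks before writing; it is a simplification, not Spring Batch's implementation, and ChunkLoopSketch and its run method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified illustration of chunk-oriented processing (hypothetical class).
class ChunkLoopSketch {
    // Reads items one at a time, processes each, and hands them to the writer
    // in groups of at most chunkSize. Returns the number of writer calls.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor, Consumer<List<O>> writer, int chunkSize) {
        int writes = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk));
                writes++;
                chunk.clear();
            }
        }
        // Write whatever is left when the reader runs out of items.
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk));
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3);
        int writes = ChunkLoopSketch.<Integer, String>run(input.iterator(), i -> "item-" + i,
                chunk -> System.out.println(chunk), 1);
        System.out.println("writer calls: " + writes);
    }
}
```

With a chunk size of 1 the writer is called once per item. A larger chunk size means fewer writer calls, at the cost of redoing more work if a chunk fails.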
/src/main/java/com/cherryshoe/batch/job/config/UserNotificationJobConfig.java
This is the "main" Notification Job Configuration which holds the User Notification overall configuration. Each notification type has a specific Job Configuration class where the Reader, Processor, Writer, and Step should be defined.
package com.cherryshoe.batch.job.config;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.cherryshoe.batch.job.JobNotificationListener;
/**
* This is the "main" Notification Job Configuration which holds the User
* Notification overall configuration. Each notification type has a specific Job
* Configuration class where the Reader, Processor, Writer, and Step should be
* defined.
*/
@Configuration
public class UserNotificationJobConfig {
/**
* Common constants used across each notification type Job Config
*/
public static final ThreadLocal<DateFormat> THREAD_LOCAL_NOTIFICATIONS_DF = new ThreadLocal<DateFormat>() {
@Override
protected DateFormat initialValue() {
return new SimpleDateFormat("MM/dd/yyyy");
}
};
public static final Long SYSTEM_USER = 100000L;
public static final Boolean ACTIVE = true;
public static final Long NOTIF_TYPE_ONE = 1L;
public static final Long NOTIF_TYPE_TWO = 2L;
/**
* Create a "Step" for each of the notification types for the
* userNotificationJob in its respective Config class.
*/
/**
* Notification Job Configuration. Each notification step's Reader queries
* for the items that are still "valid" for that step. Items that are not
* valid are not touched (their last_updated_dt is not modified). Items that
* are not touched during a batch run are "cleaned up" in the Writer part of
* the step, which cleans up records that have not been touched on this
* day's batch job run.
*
* @param jobBuilderFactory
* @param jobListener
* @param typeOneStep
* @param typeTwoStep
* @return
*/
@Bean
Job userNotificationJob(JobBuilderFactory jobBuilderFactory, JobNotificationListener jobListener,
@Qualifier("typeOneStep") Step typeOneStep, @Qualifier("typeTwoStep") Step typeTwoStep) {
return jobBuilderFactory.get("userNotificationJob").incrementer(new RunIdIncrementer()).listener(jobListener)
.flow(typeOneStep).next(typeTwoStep).end().build();
}
}
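The "touch, then clean up" idea from the Javadoc above can be shown with a small in-memory stand-in. This is only a sketch: it assumes that a record is marked with the id of the batch run that last touched it, whereas the real writer works against the database and is not reproduced here. TouchAndCleanupSketch and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration only (hypothetical class, not the project's writer).
class TouchAndCleanupSketch {
    // csId -> id of the batch run that last touched the notification.
    private final Map<Long, Long> lastTouchedByBatch = new HashMap<>();

    // Called for every item the reader still considers valid.
    void touch(long csId, long batchId) {
        lastTouchedByBatch.put(csId, batchId);
    }

    // Removes every notification that the current run did not touch and
    // returns how many were removed.
    int cleanUp(long currentBatchId) {
        int before = lastTouchedByBatch.size();
        lastTouchedByBatch.values().removeIf(batchId -> batchId != currentBatchId);
        return before - lastTouchedByBatch.size();
    }

    boolean contains(long csId) {
        return lastTouchedByBatch.containsKey(csId);
    }
}
```

For example, if run 10 touches reviews 1 and 2 and run 11 touches only review 1, then cleanUp(11) removes the notification for review 2 and keeps the one for review 1.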
/src/main/java/com/cherryshoe/batch/job/JobNotificationListener.java
Defines the beforeJob and afterJob methods that perform cs_audit_batch_process database record writing and updating for auditing purposes.
package com.cherryshoe.batch.job;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.stereotype.Component;
import com.cherryshoe.batch.job.config.UserNotificationJobConfig;
import com.cherryshoe.batch.model.CsAuditBatchProcess;
@Component
public class JobNotificationListener extends JobExecutionListenerSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(JobNotificationListener.class);
public static final String QUERY_INSERT_AUDIT_BATCH_PROCESS = "INSERT "
+ "INTO cs_audit_batch_process(batch_type, start_time, end_time, created_by, last_updated_by, created_dt, last_updated_dt) "
+ "VALUES (:batchType, :startTime, :endTime, :createdBy, :lastUpdatedBy, :createdDt, :lastUpdatedDt)";
public static final String QUERY_UPDATE_AUDIT_BATCH_PROCESS = "UPDATE cs_audit_batch_process SET end_time = :endTime, "
+ "last_updated_by = :lastUpdatedBy, last_updated_dt = :lastUpdatedDt WHERE id = :id";
@Autowired
private NamedParameterJdbcTemplate jdbcTemplate;
private Long batchId;
@Override
public void beforeJob(JobExecution jobExecution) {
// Insert new batch id into audit table.
Date now = new Date();
CsAuditBatchProcess auditBatchProcess = new CsAuditBatchProcess();
auditBatchProcess.setBatchType("Notifications");
auditBatchProcess.setStartTime(now);
auditBatchProcess.setEndTime(null);
auditBatchProcess.setCreatedBy(UserNotificationJobConfig.SYSTEM_USER);
auditBatchProcess.setCreatedDt(now);
auditBatchProcess.setLastUpdatedBy(null);
auditBatchProcess.setLastUpdatedDt(null);
SqlParameterSource parameterSource = new BeanPropertySqlParameterSource(auditBatchProcess);
KeyHolder holder = new GeneratedKeyHolder();
jdbcTemplate.update(QUERY_INSERT_AUDIT_BATCH_PROCESS, parameterSource, holder, new String[] { "id" });
// Retrieve the new batch id we added into audit table.
batchId = holder.getKey().longValue();
// Set batch id for the job.
jobExecution.getExecutionContext().put("batch_id", batchId);
}
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
Date now = new Date();
CsAuditBatchProcess auditBatchProcess = new CsAuditBatchProcess();
auditBatchProcess.setId(batchId);
auditBatchProcess.setEndTime(now);
auditBatchProcess.setLastUpdatedBy(UserNotificationJobConfig.SYSTEM_USER);
auditBatchProcess.setLastUpdatedDt(now);
SqlParameterSource parameterSource = new BeanPropertySqlParameterSource(auditBatchProcess);
jdbcTemplate.update(QUERY_UPDATE_AUDIT_BATCH_PROCESS, parameterSource);
LOGGER.info("JOB FINISHED - BATCH ID " + batchId);
}
}
}
/src/main/java/com/cherryshoe/batch/job/UserNotificationJobLauncher.java
Job Launcher code.
package com.cherryshoe.batch.job;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import com.cherryshoe.batch.job.config.UserNotificationJobConfig;
@Component
public class UserNotificationJobLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(UserNotificationJobLauncher.class);
private final Job job;
private final JobLauncher jobLauncher;
@Autowired
UserNotificationJobLauncher(@Qualifier("userNotificationJob") Job job, JobLauncher jobLauncher) {
this.job = job;
this.jobLauncher = jobLauncher;
}
// TODO: Schedule via application.properties, or schedule via OS cron job,
// calling script to invoke jar
// @Scheduled(cron = "${cs.usernotification.job.cron}")
void launchUserNotificationJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException {
LOGGER.info("Starting userNotificationJob job");
jobLauncher.run(job, newExecution());
LOGGER.info("Stopping userNotificationJob job");
}
private JobParameters newExecution() {
Map<String, JobParameter> parameters = new HashMap<>();
Date currentTime = new Date();
// TODO This is not being called when the scheduler is not in place, so
// how to inject Job Parameters?
LOGGER.info("****************************************************Job Parameter currentTime["
+ UserNotificationJobConfig.THREAD_LOCAL_NOTIFICATIONS_DF.get().format(currentTime) + "]");
JobParameter parameter = new JobParameter(currentTime);
parameters.put("currentTime", parameter);
return new JobParameters(parameters);
}
}
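The currentTime parameter matters because Spring Batch identifies a job instance by the job name plus its identifying parameters, and relaunching an instance that already completed fails with JobInstanceAlreadyCompleteException (one of the exceptions declared above). The sketch below mimics that rule with a plain Set; it is an illustration under that assumption, not the framework's code, and JobInstanceSketch is a hypothetical name.

```java
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

// Illustration of job instance identity (hypothetical class).
class JobInstanceSketch {
    private final Set<String> completedInstances = new HashSet<>();

    // Returns true if the launch is accepted, false if an instance with the
    // same job name and parameter value has already completed.
    boolean launch(String jobName, Date currentTime) {
        String instanceKey = jobName + "|currentTime=" + currentTime.getTime();
        return completedInstances.add(instanceKey);
    }
}
```

Launching twice with the same Date is rejected the second time, while a new Date for every launch always yields a new instance.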
/src/main/java/com/cherryshoe/batch/model/CsAuditBatchProcess.java
POJO that represents the cs_audit_batch_process table.
package com.cherryshoe.batch.model;
import java.util.Date;
public class CsAuditBatchProcess {
// needed for retrieval
private Long id;
private String batchType;
private Date startTime;
private Date endTime;
private Long createdBy;
private Long lastUpdatedBy;
private Date createdDt;
private Date lastUpdatedDt;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getBatchType() {
return batchType;
}
public void setBatchType(String batchType) {
this.batchType = batchType;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public Long getCreatedBy() {
return createdBy;
}
public void setCreatedBy(Long createdBy) {
this.createdBy = createdBy;
}
public Long getLastUpdatedBy() {
return lastUpdatedBy;
}
public void setLastUpdatedBy(Long lastUpdatedBy) {
this.lastUpdatedBy = lastUpdatedBy;
}
public Date getCreatedDt() {
return createdDt;
}
public void setCreatedDt(Date createdDt) {
this.createdDt = createdDt;
}
public Date getLastUpdatedDt() {
return lastUpdatedDt;
}
public void setLastUpdatedDt(Date lastUpdatedDt) {
this.lastUpdatedDt = lastUpdatedDt;
}
}
/src/main/java/com/cherryshoe/batch/model/CsUserNotification.java
POJO that represents the cs_user_notification table.
package com.cherryshoe.batch.model;
import java.util.Date;
/**
* Represents the cs_user_notification table. Only has the fields necessary
* for use as a BeanPropertySqlParameterSource, so we can use the
* NamedParameterJdbcTemplate.update(String sql, SqlParameterSource
* paramSource, KeyHolder generatedKeyHolder) method, in order to get the id of
* the created object.
*/
public class CsUserNotification {
// needed for retrieval
private Long id;
// needed for insert and update
private Long userNotificationTypeId;
private Long csId;
private String csTitle;
private String details;
private Long batchId;
private Boolean active;
private Long createdBy;
private Long lastUpdatedBy;
private Date createdDt;
private Date lastUpdatedDt;
public CsUserNotification() {
super();
}
public CsUserNotification(Long userNotificationTypeId, Long csId, String csTitle, String details, Long batchId,
Boolean active, Long createdBy, Long lastUpdatedBy, Date createdDt, Date lastUpdatedDt) {
super();
this.userNotificationTypeId = userNotificationTypeId;
this.csId = csId;
this.csTitle = csTitle;
this.details = details;
this.batchId = batchId;
this.active = active;
this.createdBy = createdBy;
this.lastUpdatedBy = lastUpdatedBy;
this.createdDt = createdDt;
this.lastUpdatedDt = lastUpdatedDt;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Long getUserNotificationTypeId() {
return userNotificationTypeId;
}
public void setUserNotificationTypeId(Long userNotificationTypeId) {
this.userNotificationTypeId = userNotificationTypeId;
}
public Long getCsId() {
return csId;
}
public void setCsId(Long csId) {
this.csId = csId;
}
public String getCsTitle() {
return csTitle;
}
public void setCsTitle(String csTitle) {
this.csTitle = csTitle;
}
public String getDetails() {
return details;
}
public void setDetails(String details) {
this.details = details;
}
public Long getBatchId() {
return batchId;
}
public void setBatchId(Long batchId) {
this.batchId = batchId;
}
public Boolean getActive() {
return active;
}
public void setActive(Boolean active) {
this.active = active;
}
public Long getCreatedBy() {
return createdBy;
}
public void setCreatedBy(Long createdBy) {
this.createdBy = createdBy;
}
public Long getLastUpdatedBy() {
return lastUpdatedBy;
}
public void setLastUpdatedBy(Long lastUpdatedBy) {
this.lastUpdatedBy = lastUpdatedBy;
}
public Date getLastUpdatedDt() {
return lastUpdatedDt;
}
public void setLastUpdatedDt(Date lastUpdatedDt) {
this.lastUpdatedDt = lastUpdatedDt;
}
public Date getCreatedDt() {
return createdDt;
}
public void setCreatedDt(Date createdDt) {
this.createdDt = createdDt;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((active == null) ? 0 : active.hashCode());
result = prime * result + ((batchId == null) ? 0 : batchId.hashCode());
result = prime * result + ((createdBy == null) ? 0 : createdBy.hashCode());
result = prime * result + ((createdDt == null) ? 0 : createdDt.hashCode());
result = prime * result + ((details == null) ? 0 : details.hashCode());
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((lastUpdatedBy == null) ? 0 : lastUpdatedBy.hashCode());
result = prime * result + ((lastUpdatedDt == null) ? 0 : lastUpdatedDt.hashCode());
result = prime * result + ((csId == null) ? 0 : csId.hashCode());
result = prime * result + ((csTitle == null) ? 0 : csTitle.hashCode());
result = prime * result + ((userNotificationTypeId == null) ? 0 : userNotificationTypeId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CsUserNotification other = (CsUserNotification) obj;
if (active == null) {
if (other.active != null)
return false;
} else if (!active.equals(other.active))
return false;
if (batchId == null) {
if (other.batchId != null)
return false;
} else if (!batchId.equals(other.batchId))
return false;
if (createdBy == null) {
if (other.createdBy != null)
return false;
} else if (!createdBy.equals(other.createdBy))
return false;
if (createdDt == null) {
if (other.createdDt != null)
return false;
} else if (!createdDt.equals(other.createdDt))
return false;
if (details == null) {
if (other.details != null)
return false;
} else if (!details.equals(other.details))
return false;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (lastUpdatedBy == null) {
if (other.lastUpdatedBy != null)
return false;
} else if (!lastUpdatedBy.equals(other.lastUpdatedBy))
return false;
if (lastUpdatedDt == null) {
if (other.lastUpdatedDt != null)
return false;
} else if (!lastUpdatedDt.equals(other.lastUpdatedDt))
return false;
if (csId == null) {
if (other.csId != null)
return false;
} else if (!csId.equals(other.csId))
return false;
if (csTitle == null) {
if (other.csTitle != null)
return false;
} else if (!csTitle.equals(other.csTitle))
return false;
if (userNotificationTypeId == null) {
if (other.userNotificationTypeId != null)
return false;
} else if (!userNotificationTypeId.equals(other.userNotificationTypeId))
return false;
return true;
}
@Override
public String toString() {
return "CsUserNotification [id=" + id + ", userNotificationTypeId=" + userNotificationTypeId + ", csId=" + csId
+ ", csTitle=" + csTitle + ", details=" + details + ", batchId=" + batchId + ", active=" + active
+ ", createdBy=" + createdBy + ", lastUpdatedBy=" + lastUpdatedBy + ", createdDt=" + createdDt
+ ", lastUpdatedDt=" + lastUpdatedDt + "]";
}
}
/src/main/java/com/cherryshoe/batch/model/dto/CsAndUsersDTO.java
Data Transfer Object to transfer data from the Reader to the Processor.
package com.cherryshoe.batch.model.dto;
import java.util.Date;
/**
* Cs and users associated to the review Data Transfer Object populated by the
* Reader. Used for a Cs-based Notification.
*/
public class CsAndUsersDTO extends UserInfoDTO {
private Long id;
private String csTitle;
private Date createdDt;
private Date lastUpdatedDt;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getCsTitle() {
return csTitle;
}
public void setCsTitle(String csTitle) {
this.csTitle = csTitle;
}
public Date getCreatedDt() {
return createdDt;
}
public void setCreatedDt(Date createdDt) {
this.createdDt = createdDt;
}
public Date getLastUpdatedDt() {
return lastUpdatedDt;
}
public void setLastUpdatedDt(Date lastUpdatedDt) {
this.lastUpdatedDt = lastUpdatedDt;
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((createdDt == null) ? 0 : createdDt.hashCode());
result = prime * result + ((lastUpdatedDt == null) ? 0 : lastUpdatedDt.hashCode());
result = prime * result + ((csTitle == null) ? 0 : csTitle.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
CsAndUsersDTO other = (CsAndUsersDTO) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (createdDt == null) {
if (other.createdDt != null)
return false;
} else if (!createdDt.equals(other.createdDt))
return false;
if (lastUpdatedDt == null) {
if (other.lastUpdatedDt != null)
return false;
} else if (!lastUpdatedDt.equals(other.lastUpdatedDt))
return false;
if (csTitle == null) {
if (other.csTitle != null)
return false;
} else if (!csTitle.equals(other.csTitle))
return false;
return true;
}
@Override
public String toString() {
return "CsAndUsersDTO [id=" + id + ", csTitle=" + csTitle + ", createdDt=" + createdDt + ", lastUpdatedDt="
+ lastUpdatedDt + ", userList=" + getUserList() + "]";
}
}
/src/main/java/com/cherryshoe/batch/model/dto/UserInfoDTO.java
Base Data Transfer Object.
package com.cherryshoe.batch.model.dto;
import java.util.List;
public class UserInfoDTO {
private List<Long> userList;
public UserInfoDTO() {
super();
}
public UserInfoDTO(List<Long> userList) {
super();
this.userList = userList;
}
public List<Long> getUserList() {
return userList;
}
public void setUserList(List<Long> userList) {
this.userList = userList;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((userList == null) ? 0 : userList.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
UserInfoDTO other = (UserInfoDTO) obj;
if (userList == null) {
if (other.userList != null)
return false;
} else if (!userList.equals(other.userList))
return false;
return true;
}
@Override
public String toString() {
return "UserInfo [userList=" + userList + "]";
}
}
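The generated hashCode and equals methods in these classes are long but mechanical. On Java 8 the same null-safe logic can be written with java.util.Objects. The sketch below mirrors UserInfoDTO under a hypothetical name (UserInfoSketch) so the two can be compared; Objects.hash uses the same 31-based accumulation as the generated code.

```java
import java.util.List;
import java.util.Objects;

// Compact equivalent of the generated methods (hypothetical class name).
class UserInfoSketch {
    private final List<Long> userList;

    UserInfoSketch(List<Long> userList) {
        this.userList = userList;
    }

    @Override
    public int hashCode() {
        // Same result as: 31 * 1 + (userList == null ? 0 : userList.hashCode())
        return Objects.hash(userList);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        // Objects.equals handles the case where either list is null.
        return Objects.equals(userList, ((UserInfoSketch) obj).userList);
    }
}
```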
/src/main/java/com/cherryshoe/batch/model/dto/UserNotificationDTO.java
Data Transfer Object to transfer data between the Processor and the Writer.
package com.cherryshoe.batch.model.dto;
import java.util.List;
/**
* Cs User Notification Data Transfer Object populated by the Processors. Used
* for all types of Notifications.
*/
public class UserNotificationDTO extends UserInfoDTO {
private Long id;
private Long userNotificationTypeId;
private Long csId;
private String csTitle;
private String details;
private Boolean active;
private Long createdBy;
public UserNotificationDTO() {
super();
}
/**
* Do not populate id manually, use DB auto-increment.
*
* @param userNotificationTypeId
* @param csId
* @param csTitle
* @param details
* @param active
* @param createdBy
* @param userList
*/
public UserNotificationDTO(Long userNotificationTypeId, Long csId, String csTitle, String details, Boolean active,
Long createdBy, List<Long> userList) {
super(userList);
this.userNotificationTypeId = userNotificationTypeId;
this.csId = csId;
this.csTitle = csTitle;
this.details = details;
this.active = active;
this.createdBy = createdBy;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Long getUserNotificationTypeId() {
return userNotificationTypeId;
}
public void setUserNotificationTypeId(Long userNotificationTypeId) {
this.userNotificationTypeId = userNotificationTypeId;
}
public Long getCsId() {
return csId;
}
public void setCsId(Long csId) {
this.csId = csId;
}
public String getCsTitle() {
return csTitle;
}
public void setCsTitle(String csTitle) {
this.csTitle = csTitle;
}
public String getDetails() {
return details;
}
public void setDetails(String details) {
this.details = details;
}
public Boolean getActive() {
return active;
}
public void setActive(Boolean active) {
this.active = active;
}
public Long getCreatedBy() {
return createdBy;
}
public void setCreatedBy(Long createdBy) {
this.createdBy = createdBy;
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((active == null) ? 0 : active.hashCode());
result = prime * result + ((createdBy == null) ? 0 : createdBy.hashCode());
result = prime * result + ((details == null) ? 0 : details.hashCode());
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((csId == null) ? 0 : csId.hashCode());
result = prime * result + ((csTitle == null) ? 0 : csTitle.hashCode());
result = prime * result + ((userNotificationTypeId == null) ? 0 : userNotificationTypeId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
UserNotificationDTO other = (UserNotificationDTO) obj;
if (active == null) {
if (other.active != null)
return false;
} else if (!active.equals(other.active))
return false;
if (createdBy == null) {
if (other.createdBy != null)
return false;
} else if (!createdBy.equals(other.createdBy))
return false;
if (details == null) {
if (other.details != null)
return false;
} else if (!details.equals(other.details))
return false;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (csId == null) {
if (other.csId != null)
return false;
} else if (!csId.equals(other.csId))
return false;
if (csTitle == null) {
if (other.csTitle != null)
return false;
} else if (!csTitle.equals(other.csTitle))
return false;
if (userNotificationTypeId == null) {
if (other.userNotificationTypeId != null)
return false;
} else if (!userNotificationTypeId.equals(other.userNotificationTypeId))
return false;
return true;
}
@Override
public String toString() {
return "UserNotificationDTO [id=" + id + ", userNotificationTypeId=" + userNotificationTypeId + ", csId=" + csId
+ ", csTitle=" + csTitle + ", details=" + details + ", active=" + active + ", createdBy=" + createdBy
+ ", userList=" + getUserList() + "]";
}
}
/src/main/java/com/cherryshoe/batch/processor/TypeOneProcessor.java
Processes the data from the Type One Reader and transforms it to the DTO that the custom Writer is expecting.
package com.cherryshoe.batch.processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import com.cherryshoe.batch.job.config.UserNotificationJobConfig;
import com.cherryshoe.batch.model.dto.CsAndUsersDTO;
import com.cherryshoe.batch.model.dto.UserNotificationDTO;
public class TypeOneProcessor implements ItemProcessor<CsAndUsersDTO, UserNotificationDTO> {
private static final Logger LOGGER = LoggerFactory.getLogger(TypeOneProcessor.class);
@Override
public UserNotificationDTO process(CsAndUsersDTO item) throws Exception {
LOGGER.info("Processing review and user information: {}", item);
String details = "Projected Start Date: "
+ UserNotificationJobConfig.THREAD_LOCAL_NOTIFICATIONS_DF.get().format(item.getCreatedDt());
UserNotificationDTO userNotifDTO = new UserNotificationDTO(UserNotificationJobConfig.NOTIF_TYPE_ONE,
item.getId(), item.getCsTitle(), details, UserNotificationJobConfig.ACTIVE,
UserNotificationJobConfig.SYSTEM_USER, item.getUserList());
return userNotifDTO;
}
}
/src/main/java/com/cherryshoe/batch/processor/TypeTwoProcessor.java
Processes the data from the Type Two Reader and transforms it to the DTO that the custom Writer is expecting.
package com.cherryshoe.batch.processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import com.cherryshoe.batch.job.config.UserNotificationJobConfig;
import com.cherryshoe.batch.model.dto.CsAndUsersDTO;
import com.cherryshoe.batch.model.dto.UserNotificationDTO;
public class TypeTwoProcessor implements ItemProcessor<CsAndUsersDTO, UserNotificationDTO> {
private static final Logger LOGGER = LoggerFactory.getLogger(TypeTwoProcessor.class);
@Override
public UserNotificationDTO process(CsAndUsersDTO item) throws Exception {
LOGGER.info("Processing review and user information: {}", item);
String details = "Date: "
+ UserNotificationJobConfig.THREAD_LOCAL_NOTIFICATIONS_DF.get().format(item.getLastUpdatedDt());
UserNotificationDTO userNotifDTO = new UserNotificationDTO(UserNotificationJobConfig.NOTIF_TYPE_TWO,
item.getId(), item.getCsTitle(), details, UserNotificationJobConfig.ACTIVE,
UserNotificationJobConfig.SYSTEM_USER, item.getUserList());
return userNotifDTO;
}
}
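Both processors build the details string through a thread-local date format, which matters because SimpleDateFormat is not thread-safe. Here is a minimal sketch of that idea. The class name, the "MM/dd/yyyy" pattern, and the fixed UTC time zone are assumptions for illustration only; the real THREAD_LOCAL_NOTIFICATIONS_DF is defined in UserNotificationJobConfig and may differ.

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class NotificationDateFormatSketch {

    // Assumed pattern for illustration; the real one lives in UserNotificationJobConfig.
    static final String ASSUMED_PATTERN = "MM/dd/yyyy";

    // One formatter per thread, because SimpleDateFormat must not be shared across threads.
    static final ThreadLocal<DateFormat> NOTIFICATIONS_DF = ThreadLocal.withInitial(() -> {
        SimpleDateFormat df = new SimpleDateFormat(ASSUMED_PATTERN, Locale.ROOT);
        df.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone keeps the output deterministic
        return df;
    });

    // Mirrors the details string built in TypeOneProcessor.
    static String typeOneDetails(Date createdDt) {
        return "Projected Start Date: " + NOTIFICATIONS_DF.get().format(createdDt);
    }

    // Mirrors the details string built in TypeTwoProcessor.
    static String typeTwoDetails(Date lastUpdatedDt) {
        return "Date: " + NOTIFICATIONS_DF.get().format(lastUpdatedDt);
    }

    public static void main(String[] args) {
        Date epoch = new Date(0L);
        System.out.println(typeOneDetails(epoch));
        System.out.println(typeTwoDetails(epoch));
    }
}
```

Each thread that calls get() receives its own formatter instance, so the processors can run in a multi-threaded step without corrupting each other's output.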
/src/main/java/com/cherryshoe/batch/writer/CustomUpdateNotificationWriter.java
Custom Writer that takes DTO data from each type's Processor. This inserts/updates data in the cs_user_notification table.
package com.cherryshoe.batch.writer;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import com.cherryshoe.batch.job.config.UserNotificationJobConfig;
import com.cherryshoe.batch.model.CsUserNotification;
import com.cherryshoe.batch.model.dto.UserNotificationDTO;
/**
* This is the custom writer that all notifications use to write to the
* cs_user_notification table.
*/
public class CustomUpdateNotificationWriter implements ItemWriter<UserNotificationDTO> {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomUpdateNotificationWriter.class);
/**
* Insert a new notification row if it does not exist for the (notification
* type id, review id) pair.
*
* Update an existing notification row if it exists for the (notification
* type id, review id) pair. Make sure to set the active flag for the
* existing row, because we found a valid notification. Update the details
* as well, because the old details may be out of date.
*/
public static final String QUERY_UPDATE_USER_NOTIFICATION = "INSERT "
+ "INTO cs_user_notification(user_notification_type_id, cs_id, cs_title, details, batch_id, active, created_by, created_dt) "
+ "VALUES (:userNotificationTypeId, :csId, :csTitle, :details, :batchId, :active, :createdBy, :createdDt) "
+ "ON DUPLICATE KEY UPDATE details = :details, batch_id = :batchId, active = 1, last_updated_dt = :lastUpdatedDt, last_updated_by = :lastUpdatedBy";
/**
* update query to make cs_user_notification record inactive
*
* If batch_id is not equal to the current batch_id, then it is no longer a
* valid user notification.
*/
public static final String QUERY_MAKE_INACTIVE_USER_NOTIFICATION = "UPDATE cs_user_notification "
+ "SET active = 0, last_updated_dt = :lastUpdatedDt, last_updated_by = :lastUpdatedBy WHERE batch_id != :batchId "
+ "AND user_notification_type_id = :userNotificationTypeId AND active = 1";
@Autowired
private NamedParameterJdbcTemplate jdbcTemplate;
private Long userNotificationTypeId;
private Long batchId;
public CustomUpdateNotificationWriter(Long userNotificationTypeId) {
this.userNotificationTypeId = userNotificationTypeId;
}
@BeforeStep
public void retrieveBatchId(StepExecution stepExecution) {
batchId = stepExecution.getJobExecution().getExecutionContext().getLong("batch_id");
LOGGER.info("Batch ID: " + ((batchId != null) ? batchId : "null"));
}
/**
* This takes a List of items, where the size is determined by the step's
* chunk size
*/
@Override
public void write(List<? extends UserNotificationDTO> items) throws Exception {
LOGGER.info("Received the information of {} user notifications", items.size());
Date timeStamp = new Date();
KeyHolder keyHolder;
for (UserNotificationDTO item : items) {
LOGGER.debug("**********Received the information of a user notification: {}", item);
keyHolder = processUserNotification(item, timeStamp);
}
processDeactivateUserNotification(timeStamp);
}
/**
* Processes insert / update of cs_user_notification records
*
* @param item
* @param timeStamp
* @return
* @throws Exception
*/
protected KeyHolder processUserNotification(UserNotificationDTO item, Date timeStamp) throws Exception {
// map UserNotificationDTO to UserNotification
CsUserNotification userNotif = new CsUserNotification(item.getUserNotificationTypeId(), item.getCsId(),
item.getCsTitle(), item.getDetails(), batchId, item.getActive(), item.getCreatedBy(),
UserNotificationJobConfig.SYSTEM_USER, timeStamp, timeStamp);
// insert or update cs_user_notification using SqlParameterSource
// since all the fields needed are available there
SqlParameterSource parameterSource = new BeanPropertySqlParameterSource(userNotif);
KeyHolder keyHolder = new GeneratedKeyHolder();
int notifCount = jdbcTemplate.update(QUERY_UPDATE_USER_NOTIFICATION, parameterSource, keyHolder,
new String[] { "id" });
if (LOGGER.isDebugEnabled()) {
List<Map<String, Object>> keyList = keyHolder.getKeyList();
String key;
Object value;
for (Map<String, Object> map : keyList) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
key = entry.getKey();
value = entry.getValue();
LOGGER.debug("key[" + key + "]value[" + value + "]");
}
}
}
return keyHolder;
}
protected void processDeactivateUserNotification(Date timeStamp) throws Exception {
Map<String, Object> relNamedParams = new HashMap<String, Object>();
relNamedParams.put("batchId", batchId);
relNamedParams.put("userNotificationTypeId", userNotificationTypeId);
relNamedParams.put("lastUpdatedBy", UserNotificationJobConfig.SYSTEM_USER);
relNamedParams.put("lastUpdatedDt", timeStamp);
jdbcTemplate.update(QUERY_MAKE_INACTIVE_USER_NOTIFICATION, relNamedParams);
}
}
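To make the writer's two queries easier to follow, here is a small in-memory sketch of the same logic: an upsert keyed on the (notification type id, cs id) pair, followed by deactivating rows of that type that were not touched by the current batch. This is not the writer itself. The class, the Row type, and the HashMap standing in for the table are all hypothetical, and no database is involved.

```java
import java.util.HashMap;
import java.util.Map;

final class NotificationUpsertSketch {

    /** Hypothetical, minimal stand-in for a cs_user_notification row. */
    static final class Row {
        String details;
        long batchId;
        boolean active;

        Row(String details, long batchId, boolean active) {
            this.details = details;
            this.batchId = batchId;
            this.active = active;
        }
    }

    // Keyed by "typeId:csId", mimicking the unique key on (user_notification_type_id, cs_id).
    private final Map<String, Row> table = new HashMap<>();

    // Same effect as INSERT ... ON DUPLICATE KEY UPDATE in QUERY_UPDATE_USER_NOTIFICATION.
    void upsert(long typeId, long csId, String details, long batchId) {
        String key = typeId + ":" + csId;
        Row existing = table.get(key);
        if (existing == null) {
            table.put(key, new Row(details, batchId, true));
        } else {
            existing.details = details;
            existing.batchId = batchId;
            existing.active = true;
        }
    }

    // Same effect as QUERY_MAKE_INACTIVE_USER_NOTIFICATION; returns the number of rows changed.
    int deactivateStale(long typeId, long currentBatchId) {
        int changed = 0;
        for (Map.Entry<String, Row> entry : table.entrySet()) {
            Row row = entry.getValue();
            boolean sameType = entry.getKey().startsWith(typeId + ":");
            if (sameType && row.active && row.batchId != currentBatchId) {
                row.active = false;
                changed++;
            }
        }
        return changed;
    }

    Row find(long typeId, long csId) {
        return table.get(typeId + ":" + csId);
    }

    public static void main(String[] args) {
        NotificationUpsertSketch store = new NotificationUpsertSketch();
        store.upsert(1L, 100L, "old details", 1L); // batch 1 finds cs 100
        store.upsert(1L, 200L, "old details", 1L); // batch 1 finds cs 200
        store.upsert(1L, 100L, "new details", 2L); // batch 2 finds only cs 100
        System.out.println("deactivated: " + store.deactivateStale(1L, 2L));
        System.out.println("cs 100 active: " + store.find(1L, 100L).active);
        System.out.println("cs 200 active: " + store.find(1L, 200L).active);
    }
}
```

The row for cs 200 keeps the older batch id, so it is the one that gets switched to inactive, while the row for cs 100 is refreshed with the new details and stays active.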
/src/main/resources/application.properties
Configuration for the application.
#Database Configuration
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cherryshoe?useSSL=false
spring.datasource.username=cherryshoe
spring.datasource.password=cherryshoe
spring.datasource.test-on-borrow=true
spring.datasource.remove-abandoned=true
spring.datasource.validation-query=SELECT 1;
#Batch Configuration
# Switch off initialization of the Spring Batch metadata tables; we create them with our own DB scripts
spring.batch.initializer.enabled=false
# If you are not launching the job from a scheduler, set this to true to run it once on startup
spring.batch.job.enabled=true
#Scheduling Configuration (Spring cron expression, six fields)
#* * * * * *
#| | | | | |
#| | | | | +-- Day of the Week (range: 0-7, 0 or 7 standing for Sunday)
#| | | | +---- Month of the Year (range: 1-12)
#| | | +------ Day of the Month (range: 1-31)
#| | +-------- Hour (range: 0-23)
#| +---------- Minute (range: 0-59)
#+------------ Second (range: 0-59)
# TODO: Schedule via application.properties, or schedule via OS cron job, calling script to invoke jar
# cs.usernotification.job.cron=1 0 * * * *
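If the commented-out cron property ends up being fed to Spring's scheduling support, keep in mind that Spring cron expressions have six fields and start with seconds. The sketch below only labels the fields of an expression in that order. It is a hypothetical helper for reading the value, it does not use any Spring class, and it assumes the property is meant for Spring rather than an OS cron job.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CronFieldsSketch {

    // Field order used by Spring cron expressions.
    private static final String[] SPRING_FIELD_NAMES = {
        "second", "minute", "hour", "dayOfMonth", "month", "dayOfWeek"
    };

    // Splits an expression on whitespace and pairs each part with its field name.
    static Map<String, String> label(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length != SPRING_FIELD_NAMES.length) {
            throw new IllegalArgumentException(
                "Expected " + SPRING_FIELD_NAMES.length + " fields but found " + parts.length);
        }
        Map<String, String> labeled = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            labeled.put(SPRING_FIELD_NAMES[i], parts[i]);
        }
        return labeled;
    }

    public static void main(String[] args) {
        // The value from the commented-out cs.usernotification.job.cron property.
        System.out.println(label("1 0 * * * *"));
    }
}
```

Read this way, the value 1 0 * * * * fires at second 1 of minute 0 of every hour, that is, once an hour.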
/src/main/resources/logback-spring.xml
Configuration for logging for the application.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/base.xml" />
<!-- Override default FILE appender -->
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>cs-batch-processor.log</file>
<append>false</append>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Also log to spring provided CONSOLE appender -->
<logger name="com.cherryshoe.batch" level="DEBUG"
additivity="false">
<appender-ref ref="FILE" />
<appender-ref ref="CONSOLE" />
</logger>
<root level="INFO">
<appender-ref ref="FILE" />
<appender-ref ref="CONSOLE" />
</root>
</configuration>
Below is sample SQL to create the cs_audit_batch_process and cs_user_notification tables:
CREATE TABLE `cs_audit_batch_process` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`batch_type` varchar(50) NOT NULL,
`start_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'Date batch process was started.',
`end_time` datetime DEFAULT NULL COMMENT 'Date batch process ended. Null if still running or crashed.',
`created_dt` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'Date record was created.',
`last_updated_dt` datetime DEFAULT NULL COMMENT 'Date record was last updated.',
`created_by` int(11) DEFAULT NULL,
`last_updated_by` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=134 DEFAULT CHARSET=utf8;
CREATE TABLE `cs_user_notification` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_notification_type_id` int(11) NOT NULL,
`cs_id` int(11) NOT NULL,
`cs_title` varchar(100) NOT NULL,
`details` varchar(200) NOT NULL,
`batch_id` int(11) NOT NULL,
`active` tinyint(1) NOT NULL DEFAULT '1',
`created_dt` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'Date record was created.',
`last_updated_dt` datetime DEFAULT NULL COMMENT 'Date record was last updated.',
`created_by` int(11) DEFAULT NULL,
`last_updated_by` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uq_type_id_review_id` (`user_notification_type_id`,`cs_id`)
) ENGINE=InnoDB AUTO_INCREMENT=43167 DEFAULT CHARSET=utf8;
The following were helpful:
- https://www.petrikainulainen.net/programming/spring-framework/spring-batch-tutorial-introduction/
- https://github.com/pkainulainen/spring-batch-examples/tree/master/spring-boot
- https://stackoverflow.com/questions/18999724/spring-batch-one-reader-multiple-processors-and-writers
- http://javasampleapproach.com/spring-framework/spring-batch/spring-batch-programmatic-flow-decision#4_Create_Flow_Decision
- https://stackoverflow.com/questions/21782008/how-to-terminate-step-within-a-spring-batch-split-flow-with-a-decider
- https://narmo7.wordpress.com/2014/05/14/spring-batch-how-to-setup-a-flow-job-with-java-based-configuration/