# 批处理中心
Spring Batch是一个基于Spring的企业级批处理框架。
* 1、读数据
* 2、业务处理
* 3、归档结果数据
## 基础理论


# 设定读取处理写入规则
public Step slaveStep(DeliverPostProcessorItem processorItem,
JdbcCursorItemReader reader) {
CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
List<ItemProcessor> processorList = new ArrayList<>();
return stepBuilderFactory.get("slaveStep")
.<DeliverPost, DeliverPost>chunk(1000)//事务提交批次
## 数据分片
* @create 2019年4月2日
* Content :根据数据ID分片
public class ColumnRangePartitioner implements Partitioner {
private JdbcOperations jdbcTemplate;
ColumnRangePartitioner(DataSource dataSource){
this.jdbcTemplate = new JdbcTemplate(dataSource);
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new LinkedHashMap<String, ExecutionContext>();
int current_thread = 1 ;
int total_thread = gridSize ;
while (current_thread <= total_thread) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + current_thread, value);
value.putInt("current_thread", current_thread);
value.putInt("total_thread", total_thread);
return result;
## 本地基于游标方式读取分片信息
@Bean(destroyMethod = "")
public JdbcCursorItemReader<DeliverPost> JdbcCursorItemReader(
@Value("#{stepExecutionContext['current_thread']}") Long current_thread,
@Value("#{stepExecutionContext['total_thread']}") Long total_thread) {
JdbcCursorItemReader<DeliverPost> reader = new JdbcCursorItemReader<>();
reader.setDataSource(this.dataSource); // 设置数据源
reader.setFetchSize(100); // 设置一次最大读取条数
reader.setRowMapper(new DeliverPostRowMapper()); // 把数据库中的每条数据映射到Person对中
reader.setSql("select order_id , post_id from oc_deliver_post_t where post_id is not null and post_id <> '0' and mod(substring(order_id , -4) ,? )= ( ? -1 )");
reader.setPreparedStatementSetter(new PreparedStatementSetter() {
public void setValues(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setLong(1, total_thread);
preparedStatement.setLong(2, current_thread);
return reader;
### 分片数据处理过程
* @create 2019年4月2日
* Content :数据处理Item
public class DeliverPostProcessorItem implements ItemProcessor<DeliverPost, DeliverPost> {
Logger logger = LoggerFactory.getLogger(DeliverPostProcessorItem.class);
private CommonDao commonDao ;
private ThirdServiceProp thirdServiceProp;
public DeliverPost process(DeliverPost deliverPost) throws Exception {
logger.info("订单号:【{}】经过处理器 ", deliverPost.getOrderId());
// ems是否签收
String resp = this.getEms(deliverPost.getPostId());
try {
Map respMap = JSONObject.parseObject(resp, Map.class);
if ("0000".equals(respMap.get("code"))) {
Map rep = (Map) respMap.get("rep");
Map msg = (Map) rep.get("msg");
List<Map> traces = (List<Map>) msg.get("traces");
for (Iterator<Map> it = traces.iterator(); it.hasNext();) {
Map temp = it.next();
if ("10".equals(temp.get("code"))) {
// 已签收
} catch (Exception e) {
// 中通是否签收
String resp = this.getZT(deliverPost.getPostId());
try {
Map respMap = JSONObject.parseObject(resp, Map.class);
if ("0000".equals(respMap.get("code"))) {
Map rep = (Map) respMap.get("rep");
Map msg = (Map) rep.get("msg");
List<Map> data = (List<Map>) msg.get("data");
for (Iterator<Map> it = data.iterator(); it.hasNext();) {
Map temp = it.next();
List<Map> traces = (List<Map>) temp.get("traces");
for (Iterator<Map> it1 = traces.iterator(); it1.hasNext();) {
Map tempT = it1.next();
if ("收件".equals(tempT.get("scanType"))) {
// 已签收
} catch (Exception e) {
return deliverPost;
public String getEms(String postId) {
String transid= PointUtil.getRandom() ;
// JSONObject resultJosn = JSONObject.fromObject(result);
StringBuffer strbuf = new StringBuffer();
String jsonOut = "";
try {
com.alibaba.fastjson.JSONObject obj = new com.alibaba.fastjson.JSONObject();
obj.put("method", "ems.inland.trace.query");
obj.put("action", "3th_ems");
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
obj.put("prea", sdf.format(date));// 163315236523523
com.alibaba.fastjson.JSONObject req = new com.alibaba.fastjson.JSONObject();
obj.put("req", req);
com.alibaba.fastjson.JSONObject msg = new com.alibaba.fastjson.JSONObject();
req.put("msg", msg);
msg.put("mailNo", postId);
msg.put("authorization", "408a6c32e61d3ad5cb5c4e0cb3d2b089");
msg.put("timestamp", System.currentTimeMillis());
// 请求数据
jsonOut = obj.toString();
logger.info("EMS请求处理开始: transid=【{}】 ,req=【{}】", transid ,jsonOut);
String callurl = commonDao.getHttpUrl("104");
int timeOut = 3000;
URL url = new URL(thirdServiceProp.getUrl() + callurl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
byte[] contentbyte = jsonOut.getBytes("UTF-8");
conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
conn.setRequestProperty("Content-Length", contentbyte.length + "");
conn.setRequestProperty("Accept-Encoding", "");
conn.setRequestProperty("Accept", "application/json");
OutputStream out = conn.getOutputStream();
out.write(contentbyte); // 发送请求报文
InputStream in = conn.getInputStream();
BufferedReader dr = new BufferedReader(new InputStreamReader(in, "utf-8"));
String text_rsp = null;
while ((text_rsp = dr.readLine()) != null) {
logger.info("EMS请求处理结束: transid=【{}】 ,res=【{}】 ",transid, strbuf);
} catch (Exception e) {
logger.error(postId + "EMS转发接口报错!!!");
return strbuf.toString();
public String getZT(String postId) {
String transid= PointUtil.getRandom() ;
// JSONObject resultJosn = JSONObject.fromObject(result);
StringBuffer strbuf = new StringBuffer();
String jsonOut = "";
try {
com.alibaba.fastjson.JSONObject obj = new com.alibaba.fastjson.JSONObject();
obj.put("method", "api.traceInterfaceNewTraces");
obj.put("action", "3th_zto");
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
obj.put("prea", sdf.format(date));// 163315236523523
com.alibaba.fastjson.JSONObject req = new com.alibaba.fastjson.JSONObject();
obj.put("req", req);
com.alibaba.fastjson.JSONObject msg = new com.alibaba.fastjson.JSONObject();
req.put("msg", msg);
msg.put("company_id", "20f74746141c4433a15e7ddd5aade604");
msg.put("data", Arrays.asList(postId));
msg.put("msg_type", "NEW_TRACES");
// 请求数据
jsonOut = obj.toString();
logger.info("中通请求处理开始: transid=【{}】 ,req=【{}】 ",transid , jsonOut);
String callurl = commonDao.getHttpUrl("103");
callurl =callurl.replace("tokenid", "798d3ed2ebaec83ae608c10207f783d6") ;
int timeOut = 3000;
URL url = new URL(thirdServiceProp.getUrl() + callurl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
byte[] contentbyte = jsonOut.getBytes("UTF-8");
conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
conn.setRequestProperty("Content-Length", contentbyte.length + "");
conn.setRequestProperty("Accept-Encoding", "");
conn.setRequestProperty("Accept", "application/json");
OutputStream out = conn.getOutputStream();
out.write(contentbyte); // 发送请求报文
InputStream in = conn.getInputStream();
BufferedReader dr = new BufferedReader(new InputStreamReader(in, "utf-8"));
String text_rsp = null;
while ((text_rsp = dr.readLine()) != null) {
logger.info("中通请求处理结束: transid=【{}】 ,res=【{}】 ",transid, strbuf);
} catch (Exception e) {
logger.error(postId + "中通转发接口报错!!!");
return strbuf.toString();
* @create 2019年4月2日
* Content :数据输出item
public class DBWriterItem<T> implements ItemWriter<T> {
private DeliverPostDao deliverPostDao ;
public void write(List<? extends T> list) throws Exception {
deliverPostDao.batchInsert((List<? extends DeliverPost>) list);
