在区块链的世界里,常常很多时候用户需要充值,要不拉起钱包,要不支付到某个特定账号,这个时候可以监控合约交易记录实现实时到账,有的时候上某些网站的时候,至于是哪些网站,小编就不太好说了,有见过直接备注信息充值扫码支付到个人二维码,然后立马就会充值成功,那么这个是怎么实现的呢
当然是后台一个守护进程,然后实时监控到账情况,通过MEMO进行订单信息识别入款,springboot里新开一个守护进程很多种方式,但是小编我比较懒,就给大家介绍一种一个地方改代码就能实现的方式:
通过实现ApplicationRunner的多线程方式:
RechargeMonitorRunne.java
代码语言:javascript复制package io.xxxschedule;
import io.xxx.component.RechargeMonitorServer;
import io.xxx.configuration.AsyncConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@Component
public class RechargeMonitorRunner implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(RechargeMonitorRunner.class);
@Resource
private RechargeMonitorServer monitorServer;
@Resource
private AsyncConfiguration threadConfig;
@Override
public void run(ApplicationArguments args) throws Exception {
SimpleDateFormat df = new SimpleDateFormat("HH:mm");//设置日期格式
logger.info("计时器开启。。。。。。");
while (true){
monitoServer.tradeMonitor();
// this.tradeMonitor();
String s=df.format(new Date());
//System.out.println(s);
if ("23:50".equals(s)){//当时间为23:50
logger.info("当前时间为:" df.format(new Date()) "开启数据入库--------");
break;//退出这个while(true)循环
}
}
}
@Async
public Future<String> tradeMonitor() {
System.out.println("nn----------------------------------------------");
System.out.println(Thread.currentThread().getName() "正在处理请求 tradeMonitor");
System.out.println("----------------------------------------------");
String result = "请求失败";
//....你的业务逻辑
// return CompletableFuture.completedFuture(result);
return null;
}
}
监控合约接口,当然也是一个内部api,异步执行,配置异步多线程池:
AsyncConfiguration.java
代码语言:javascript复制package io.xxx.configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync // 启用异步任务
public class AsyncConfiguration {
// 声明一个线程池(并指定线程池的名字)
@Bean("AsyncThread")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数5:线程池创建时候初始化的线程数
executor.setCorePoolSize(5);
//最大线程数5:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(10);
//缓冲队列500:用来缓冲执行任务的队列
executor.setQueueCapacity(500);
//允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
//线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
}
业务实现monitorServer.java,这个异步执行文件需要另外一个类分开才能执行:
代码语言:javascript复制package io.xxx.component;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.servlet.http.HttpSession;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Service
public class RechargeMonitorServer {
@Async("AsyncThread")
public void tradeMonitor() {
System.out.println("nn----------------------------------------------");
System.out.println(Thread.currentThread().getName() "正在处理请求 tradeMonitor");
System.out.println("----------------------------------------------");
String result = "请求失败";
//....你的业务逻辑
// return CompletableFuture.completedFuture(result);
}
}
一run,是可以跑起来,哐当,隔一会儿就会报错,报错信息如下:
代码语言:javascript复制2020-11-26 09:50:58.061 INFO 18692 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'AsyncThread'
RechargeMonitorRunner:
Caused by: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6c4ce583[Running, pool size = 10, active threads = 10, queued tasks = 466, completed tasks = 271803]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$654/1923626523@272778ae
原来while true里不停的加任务,任务满了就把这个守护线程挂了
怎么办,那么换一种写法捕捉队列满异常:
代码语言:javascript复制package io.xxx.schedule;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.xxx.Utils.ErrorAlarm;
import io.xxx.Utils.OkHttpUtils;
import io.xxx.component.RechargeMonitorServer;
import io.xxx.domain.Order;
import io.xxx.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
@Component
public class RechargeMonitorRunner implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(RechargeMonitorRunner.class);
@Resource
private RechargeMonitorServer monitorServer;
@Value("${xxx.rechargeMonitor.url}")
private String rechargeMonitorUrl;
@Value("${xxx.rechargeMonitor.monitorAccount}")
private String eosMonitorAccount;
private ConcurrentHashMap requestMap= new ConcurrentHashMap();
private String requestJson = "";
@Autowired
private OrderService orderService;
@Autowired
private ErrorAlarm errorAlarm;
@Override
public void run(ApplicationArguments args) throws Exception {
SimpleDateFormat df = new SimpleDateFormat("HH:mm");//设置日期格式
ThreadPoolTaskExecutor threadPoolTaskExecutor = initThreadPoolTaskExecutor();
requestMap.put("code",eosMonitorAccount);
requestMap.put("scope",eosMonitorAccount);
requestMap.put("table","comein");
requestMap.put("json",true);
requestJson = JSON.toJSONString(requestMap);
while (true) {
// System.out.println("Run start thread name->" Thread.currentThread().getName());
//List<Future> taskFutureList = new ArrayList<>();
this.tradeMonitor();
/*
Future future = threadPoolTaskExecutor.submit(() -> {
try {
this.tradeMonitor();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
});
taskFutureList.add(future);
for (Future future2 : taskFutureList) {
String result = (String) future2.get();
System.out.println(result);
}*/
System.out.println("任务队列任务数量: " threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size());
//this.tradeMonitor();
// monitorServer.tradeMonitor();
String s = df.format(new Date());
if ("23:50".equals(s)) {//当时间为23:50
logger.info("当前时间为:" df.format(new Date()) "开启数据入库--------");
break;//退出这个while(true)循环
}
// 获取任务队列中的任务数量
}
}
@Async
public Future<String> tradeMonitor() throws ExecutionException, InterruptedException {
// System.out.println("Runner thread name->" Thread.currentThread().getName());
// System.out.println("nn----------------------------------------------");
System.out.println(Thread.currentThread().getName() "正在处理请求 tradeMonitor");
System.out.println("----------------------------------------------");
String result = "";
result = OkHttpUtils.httpPostJson(rechargeMonitorUrl,requestJson);
//result = null ;
if(result==null || result.equals("")){
//报警
ConcurrentHashMap alarmMap = new ConcurrentHashMap();
alarmMap.put("api_url",rechargeMonitorUrl);
alarmMap.put("params",requestJson);
alarmMap.put("method","POST");
alarmMap.put("message","xxx错误");
int alarmFlag = errorAlarm.remoteRequestAlarm(alarmMap);
result = "";
}
JSONObject jo = JSON.parseObject(result);
CopyOnWriteArrayList<Order> orderIdList = new CopyOnWriteArrayList<>();
for(int i = 0;i<jo.getJSONArray("rows").size();i ){
Order order = new Order();
JSONObject oneRecord = (JSONObject)jo.getJSONArray("rows").get(i);
String memo = oneRecord.getString("memo");
String quantity = oneRecord.getString("quantity");
if(memo.indexOf("orderID")>=0){
String[] orderIdInfo = memo.split("\:");
Long orderID = Long.valueOf(orderIdInfo[1]);
String[] amountInfo = quantity.split("\s");
BigDecimal amount = new BigDecimal(amountInfo[0]);
if(amountInfo[1].equals("USDT") || amountInfo[1].equals("USDE")){
order.setId(orderID);
order.setHash(amount);
}
else {
logger.info("RechargeMonitor record orderID:" orderID "is NOTUSDTORUSDE");
}
orderIdList.add(order);
}
}
int affectRows = orderService.confirmOrder(orderIdList);
if(affectRows!=orderIdList.size()){
logger.info("RechargeMonitor orderInfo :" JSON.toJSONString(orderIdList) "is ORDERCONFIRMINCOMPLETE");
}
// Thread.sleep(2000);
// monitorServer.tradeMonitor();
//....你的业务逻辑
return CompletableFuture.completedFuture(result);
// return null;
}
private ThreadPoolTaskExecutor initThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("xxxTread-");
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setQueueCapacity(500);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
System.out.println("丢弃");
logger.info("task full,reject,taskReject log");
}
});
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
一跑,飞快,还能把CPU和内存尽可能的使用,当然如果不想即时监控,使用定时任务每秒跑一次也是可以的