风骚的操作:区块链监控个人账户即时在线充值

2020-12-02 10:26:27 浏览数 (1)

在区块链的世界里,常常很多时候用户需要充值,要不拉起钱包,要不支付到某个特定账号,这个时候可以监控合约交易记录实现实时到账,有的时候上某些网站的时候,至于是哪些网站,小编就不太好说了,有见过直接备注信息充值扫码支付到个人二维码,然后立马就会充值成功,那么这个是怎么实现的呢

当然是后台一个守护进程,然后实时监控到账情况,通过MEMO进行订单信息识别入款,springboot里新开一个守护进程很多种方式,但是小编我比较懒,就给大家介绍一种一个地方改代码就能实现的方式:

通过实现ApplicationRunner的多线程方式:

RechargeMonitorRunne.java

代码语言:javascript复制
package io.xxxschedule;

import io.xxx.component.RechargeMonitorServer;
import io.xxx.configuration.AsyncConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

@Component
public class RechargeMonitorRunner implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(RechargeMonitorRunner.class);
    @Resource
    private RechargeMonitorServer monitorServer;
    @Resource
    private AsyncConfiguration threadConfig;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        SimpleDateFormat df = new SimpleDateFormat("HH:mm");//设置日期格式
        logger.info("计时器开启。。。。。。");
        while (true){
            monitoServer.tradeMonitor();
        // this.tradeMonitor();
            String s=df.format(new Date());
            //System.out.println(s);
            if ("23:50".equals(s)){//当时间为23:50
                logger.info("当前时间为:" df.format(new Date()) "开启数据入库--------");
                break;//退出这个while(true)循环
            }
        }
    }
    @Async
    public Future<String> tradeMonitor() {
        System.out.println("nn----------------------------------------------");
        System.out.println(Thread.currentThread().getName()   "正在处理请求 tradeMonitor");
        System.out.println("----------------------------------------------");
        String result = "请求失败";

        //....你的业务逻辑
        //  return CompletableFuture.completedFuture(result);
        return null;
    }


}

监控合约接口,当然也是一个内部api,异步执行,配置异步多线程池:

AsyncConfiguration.java

代码语言:javascript复制
package io.xxx.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync  // 启用异步任务
public class AsyncConfiguration {

    // 声明一个线程池(并指定线程池的名字)
    @Bean("AsyncThread")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程数5:线程池创建时候初始化的线程数
        executor.setCorePoolSize(5);
        //最大线程数5:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(10);
        //缓冲队列500:用来缓冲执行任务的队列
        executor.setQueueCapacity(500);
        //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(60);
        //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }
}

业务实现monitorServer.java,这个异步执行文件需要另外一个类分开才能执行:

代码语言:javascript复制
package io.xxx.component;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import javax.servlet.http.HttpSession;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Service
public class RechargeMonitorServer {
    @Async("AsyncThread")
    public void tradeMonitor() {
        System.out.println("nn----------------------------------------------");
        System.out.println(Thread.currentThread().getName()   "正在处理请求 tradeMonitor");
        System.out.println("----------------------------------------------");
        String result = "请求失败";
        //....你的业务逻辑
      //  return CompletableFuture.completedFuture(result);
    }
}

一run,是可以跑起来,哐当,隔一会儿就会报错,报错信息如下:

代码语言:javascript复制
2020-11-26 09:50:58.061  INFO 18692 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'AsyncThread'
RechargeMonitorRunner:

Caused by: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6c4ce583[Running, pool size = 10, active threads = 10, queued tasks = 466, completed tasks = 271803]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$654/1923626523@272778ae

原来while true里不停的加任务,任务满了就把这个守护线程挂了

怎么办,那么换一种写法捕捉队列满异常:

代码语言:javascript复制
package io.xxx.schedule;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.xxx.Utils.ErrorAlarm;
import io.xxx.Utils.OkHttpUtils;
import io.xxx.component.RechargeMonitorServer;
import io.xxx.domain.Order;
import io.xxx.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

@Component
public class RechargeMonitorRunner implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(RechargeMonitorRunner.class);
    @Resource
    private RechargeMonitorServer monitorServer;
    @Value("${xxx.rechargeMonitor.url}")
    private  String rechargeMonitorUrl;
    @Value("${xxx.rechargeMonitor.monitorAccount}")
    private  String eosMonitorAccount;
    private ConcurrentHashMap requestMap= new ConcurrentHashMap();
    private String requestJson = "";
    @Autowired
    private OrderService orderService;
    @Autowired
    private ErrorAlarm errorAlarm;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        SimpleDateFormat df = new SimpleDateFormat("HH:mm");//设置日期格式
        ThreadPoolTaskExecutor threadPoolTaskExecutor = initThreadPoolTaskExecutor();
        requestMap.put("code",eosMonitorAccount);
        requestMap.put("scope",eosMonitorAccount);
        requestMap.put("table","comein");
        requestMap.put("json",true);
        requestJson = JSON.toJSONString(requestMap);
        while (true) {
         //   System.out.println("Run start thread name->"   Thread.currentThread().getName());
          //List<Future> taskFutureList = new ArrayList<>();
           this.tradeMonitor();
            /*
          Future future = threadPoolTaskExecutor.submit(() -> {
                try {
                    this.tradeMonitor();
                } catch (ExecutionException | InterruptedException e) {
                    e.printStackTrace();
                }
            });

            taskFutureList.add(future);
            for (Future future2 : taskFutureList) {
               String result = (String) future2.get();
               System.out.println(result);
            }*/
            System.out.println("任务队列任务数量: "   threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size());

            //this.tradeMonitor();
            // monitorServer.tradeMonitor();
            String s = df.format(new Date());
            if ("23:50".equals(s)) {//当时间为23:50
                logger.info("当前时间为:"   df.format(new Date())   "开启数据入库--------");
                break;//退出这个while(true)循环
            }
            // 获取任务队列中的任务数量

        }
    }

    @Async
    public Future<String> tradeMonitor() throws ExecutionException, InterruptedException {
       // System.out.println("Runner thread name->"   Thread.currentThread().getName());
       // System.out.println("nn----------------------------------------------");
        System.out.println(Thread.currentThread().getName()   "正在处理请求 tradeMonitor");
        System.out.println("----------------------------------------------");
        String result = "";
        result = OkHttpUtils.httpPostJson(rechargeMonitorUrl,requestJson);
        //result = null ;
        if(result==null  || result.equals("")){
            //报警
            ConcurrentHashMap alarmMap = new ConcurrentHashMap();
            alarmMap.put("api_url",rechargeMonitorUrl);
            alarmMap.put("params",requestJson);
            alarmMap.put("method","POST");
            alarmMap.put("message","xxx错误");
            int alarmFlag = errorAlarm.remoteRequestAlarm(alarmMap);
            result = "";
        }
        JSONObject jo = JSON.parseObject(result);
        CopyOnWriteArrayList<Order> orderIdList =  new CopyOnWriteArrayList<>();
        for(int i = 0;i<jo.getJSONArray("rows").size();i  ){
            Order order = new Order();
            JSONObject oneRecord = (JSONObject)jo.getJSONArray("rows").get(i);
            String memo = oneRecord.getString("memo");
            String quantity = oneRecord.getString("quantity");
            if(memo.indexOf("orderID")>=0){
                String[] orderIdInfo = memo.split("\:");
                Long orderID = Long.valueOf(orderIdInfo[1]);
                String[] amountInfo = quantity.split("\s");
                BigDecimal amount = new BigDecimal(amountInfo[0]);
                if(amountInfo[1].equals("USDT") || amountInfo[1].equals("USDE")){
                    order.setId(orderID);
                    order.setHash(amount);
                }
                else {
                    logger.info("RechargeMonitor record orderID:" orderID "is  NOTUSDTORUSDE");
                }
                orderIdList.add(order);

            }
        }
        int affectRows = orderService.confirmOrder(orderIdList);
        if(affectRows!=orderIdList.size()){
            logger.info("RechargeMonitor orderInfo :" JSON.toJSONString(orderIdList) "is  ORDERCONFIRMINCOMPLETE");
        }
        //  Thread.sleep(2000);
       // monitorServer.tradeMonitor();
        //....你的业务逻辑
         return CompletableFuture.completedFuture(result);
       // return null;
    }

    private ThreadPoolTaskExecutor initThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("xxxTread-");
        threadPoolTaskExecutor.setCorePoolSize(5);
        threadPoolTaskExecutor.setMaxPoolSize(10);
        threadPoolTaskExecutor.setQueueCapacity(500);
        threadPoolTaskExecutor.setKeepAliveSeconds(60);
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                System.out.println("丢弃");
                logger.info("task full,reject,taskReject log");
            }
        });
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

}

一跑,飞快,还能把CPU和内存尽可能的使用,当然如果不想即时监控,使用定时任务每秒跑一次也是可以的

0 人点赞