最近有个需求需要实现消息发送的频率限制,并且延后发送。使用了一个自定义的简易消息队列
实现
定义一个Map用来存储不同消息类型的队列,队列使用的阻塞队列ArrayBlockingQueue
1
| public static ConcurrentHashMap<String, ArrayBlockingQueue<DingTalkRobotSendVO>> robotSendQueue = new ConcurrentHashMap<>();
|
一些队列及ScheduledThreadPoolExecutor线程池参数配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public static int ROBOT_SEND_QUEUE_SIZE = 50;
public static Long ROBOT_SEND_QUEUE_WAIT_TIME = 3L;
public static ScheduledExecutorService threadPool;
public static int SEND_MAX_COUNT = 20;
static { ThreadFactory threadFactory = new NameTreadFactory(); RejectedExecutionHandler handler = new MyIgnorePolicy(); threadPool = new ScheduledThreadPoolExecutor(5, threadFactory, handler); }
|
线程池线程命名规则
1 2 3 4 5 6 7 8 9 10
| static class NameTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "DingTalkRobotSend-Thread-" + mThreadNum.getAndIncrement()); LOG.info(t.getName() + " has been created"); return t; } }
|
线程池拒绝策略
1 2 3 4 5 6 7 8 9
| public static class MyIgnorePolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { doLog(r, e); }
private void doLog(Runnable r, ThreadPoolExecutor e) { LOG.info("线程池请求拒绝completedTaskCount: " + e.getCompletedTaskCount()); } }
|
核心队列消费线程定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public static void addTask(final ArrayBlockingQueue<DingTalkRobotSendVO> queue) { threadPool.scheduleWithFixedDelay(new Runnable() { @Override public void run() { int count = 0; LOG.info("Start Queue Size:" + queue.size()); while(queue.size()>0 && count < SEND_MAX_COUNT) { count ++; DingTalkRobotSendVO robotSendVO = null; try { robotSendVO = queue.poll(ROBOT_SEND_QUEUE_WAIT_TIME, TimeUnit.SECONDS); Thread.sleep(500); LOG.info(robotSendVO.toString()); } catch (InterruptedException e) { LOG.error("队列取出阻塞异常{}", e); } } LOG.info("End Queue Size:" + queue.size()); } }, 1, 60, TimeUnit.SECONDS); }
|
添加任务到消息队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| ArrayBlockingQueue<DingTalkRobotSendVO> queue;
synchronized (QueueManager.robotSendQueue) { queue = QueueManager.robotSendQueue.get(sendType); if(queue == null) { queue = new ArrayBlockingQueue<DingTalkRobotSendVO>(QueueManager.ROBOT_SEND_QUEUE_SIZE, true); QueueManager.robotSendQueue.put(dingTalkRobotSendVO.getRobotUrl(), queue); QueueManager.addTask(queue); LOG.info("加入任务组" + dingTalkRobotSendVO.getRobotUrl()); } } try { boolean result = queue.offer(dingTalkRobotSendVO, QueueManager.ROBOT_SEND_QUEUE_WAIT_TIME, TimeUnit.SECONDS); if(result) { LOG.info("加入队列成功"); } else { LOG.info("加入队列失败"); } } catch (InterruptedException e) { LOG.error("加入队列阻塞异常{}", e); }
|
附上完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
public class QueueManager { private static Logger LOG = LoggerFactory.getLogger(QueueManager.class);
public static ConcurrentHashMap<String, ArrayBlockingQueue<DingTalkRobotSendVO>> robotSendQueue = new ConcurrentHashMap<>();
public static int ROBOT_SEND_QUEUE_SIZE = 50;
public static Long ROBOT_SEND_QUEUE_WAIT_TIME = 3L;
public static ScheduledExecutorService threadPool;
public static int SEND_MAX_COUNT = 20; private static IDingTalkService dingTalkService = (IDingTalkService) ServerContext .getApplicationContext().getBean("dingTalkService");
static { ThreadFactory threadFactory = new NameTreadFactory(); RejectedExecutionHandler handler = new MyIgnorePolicy(); threadPool = new ScheduledThreadPoolExecutor(5, threadFactory, handler); }
public static void addTask(final ArrayBlockingQueue<DingTalkRobotSendVO> queue) { threadPool.scheduleWithFixedDelay(new Runnable() { @Override public void run() { int count = 0; LOG.info("Start Queue Size:" + queue.size()); while(queue.size()>0 && count < SEND_MAX_COUNT) { count ++; DingTalkRobotSendVO robotSendVO = null; try { robotSendVO = queue.poll(ROBOT_SEND_QUEUE_WAIT_TIME, TimeUnit.SECONDS); Thread.sleep(500); LOG.info(robotSendVO.toString()); } catch (InterruptedException e) { LOG.error("队列取出阻塞异常{}", e); } } LOG.info("End Queue Size:" + queue.size()); } }, 1, 60, TimeUnit.SECONDS); }
static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "DingTalkRobotSend-Thread-" + mThreadNum.getAndIncrement()); LOG.info(t.getName() + " has been created"); return t; } }
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { doLog(r, e); }
private void doLog(Runnable r, ThreadPoolExecutor e) { LOG.info("线程池请求拒绝completedTaskCount: " + e.getCompletedTaskCount()); } } }
|