Java实现简易的消息队列

最近有个需求需要实现消息发送的频率限制,并且延后发送。使用了一个自定义的简易消息队列

实现

定义一个Map用来存储不同消息类型的队列,队列使用的阻塞队列ArrayBlockingQueue

1
public static ConcurrentHashMap<String, ArrayBlockingQueue<DingTalkRobotSendVO>> robotSendQueue = new ConcurrentHashMap<>();

一些队列及ScheduledThreadPoolExecutor线程池参数配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 阻塞队列大小
*/
public static int ROBOT_SEND_QUEUE_SIZE = 50;

/**
* 队列插入移出等待时间
*/
public static Long ROBOT_SEND_QUEUE_WAIT_TIME = 3L;
/**
* 定时任务线程池
*/
public static ScheduledExecutorService threadPool;

/**
* 一个周期内最多发送次数
*/
public static int SEND_MAX_COUNT = 20;

/**
* 初始化线程池
*/
static {
ThreadFactory threadFactory = new NameTreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
threadPool = new ScheduledThreadPoolExecutor(5, threadFactory, handler);
}

线程池线程命名规则

1
2
3
4
5
6
7
8
9
10
static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "DingTalkRobotSend-Thread-" + mThreadNum.getAndIncrement());
LOG.info(t.getName() + " has been created");
return t;
}
}

线程池拒绝策略

1
2
3
4
5
6
7
8
9
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}

private void doLog(Runnable r, ThreadPoolExecutor e) {
LOG.info("线程池请求拒绝completedTaskCount: " + e.getCompletedTaskCount());
}
}

核心队列消费线程定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 为每一个队列发布定时任务任务
* @param queue
*/
public static void addTask(final ArrayBlockingQueue<DingTalkRobotSendVO> queue) {
//等待上一个任务结束时才开始计时
threadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
int count = 0;
LOG.info("Start Queue Size:" + queue.size());
while(queue.size()>0 && count < SEND_MAX_COUNT) {
count ++;
DingTalkRobotSendVO robotSendVO = null;
try {
//取出数据若无则等待加入队列元素一定时间
robotSendVO = queue.poll(ROBOT_SEND_QUEUE_WAIT_TIME, TimeUnit.SECONDS);
Thread.sleep(500);
LOG.info(robotSendVO.toString());
} catch (InterruptedException e) {
LOG.error("队列取出阻塞异常{}", e);
}
}
LOG.info("End Queue Size:" + queue.size());
}
}, 1, 60, TimeUnit.SECONDS);
}

添加任务到消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ArrayBlockingQueue<DingTalkRobotSendVO> queue;
//ConcurrentHashMap写是线程安全读是不安全的,此处加锁为了避免读取不到robotSendQueue数据而重复添加覆盖队列
synchronized (QueueManager.robotSendQueue) {
queue = QueueManager.robotSendQueue.get(sendType);
if(queue == null) {
queue = new ArrayBlockingQueue<DingTalkRobotSendVO>(QueueManager.ROBOT_SEND_QUEUE_SIZE, true);
QueueManager.robotSendQueue.put(dingTalkRobotSendVO.getRobotUrl(), queue);
QueueManager.addTask(queue);
LOG.info("加入任务组" + dingTalkRobotSendVO.getRobotUrl());
}
}
try {
//添加任务到消息队列,若消息队列已满等待一定时间
boolean result = queue.offer(dingTalkRobotSendVO, QueueManager.ROBOT_SEND_QUEUE_WAIT_TIME, TimeUnit.SECONDS);
if(result) {
LOG.info("加入队列成功");
} else {
LOG.info("加入队列失败");
}
} catch (InterruptedException e) {
LOG.error("加入队列阻塞异常{}", e);
}

附上完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* 队列管理工具
* @author XX
*/
public class QueueManager {

private static Logger LOG = LoggerFactory.getLogger(QueueManager.class);

/**
* 所有队列存储
*/
public static ConcurrentHashMap<String, ArrayBlockingQueue<DingTalkRobotSendVO>> robotSendQueue = new ConcurrentHashMap<>();

/**
* 阻塞队列大小
*/
public static int ROBOT_SEND_QUEUE_SIZE = 50;

/**
* 队列插入移出等待时间
*/
public static Long ROBOT_SEND_QUEUE_WAIT_TIME = 3L;
/**
* 定时任务线程池
*/
public static ScheduledExecutorService threadPool;

/**
* 一周期内最多发送次数
*/
public static int SEND_MAX_COUNT = 20;

private static IDingTalkService dingTalkService = (IDingTalkService) ServerContext
.getApplicationContext().getBean("dingTalkService");

/**
* 初始化线程池
*/
static {
ThreadFactory threadFactory = new NameTreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
threadPool = new ScheduledThreadPoolExecutor(5, threadFactory, handler);
}

/**
* 为每一个队列发布定时任务任务
* @param queue
*/
public static void addTask(final ArrayBlockingQueue<DingTalkRobotSendVO> queue) {
//等待上一个任务结束时才开始计时
threadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
int count = 0;
LOG.info("Start Queue Size:" + queue.size());
while(queue.size()>0 && count < SEND_MAX_COUNT) {
count ++;
DingTalkRobotSendVO robotSendVO = null;
try {
robotSendVO = queue.poll(ROBOT_SEND_QUEUE_WAIT_TIME, TimeUnit.SECONDS);
Thread.sleep(500);
LOG.info(robotSendVO.toString());
} catch (InterruptedException e) {
LOG.error("队列取出阻塞异常{}", e);
}
}
LOG.info("End Queue Size:" + queue.size());
}
}, 1, 60, TimeUnit.SECONDS);
}

/**
* 线程命名规则
* @author XX
*
*/
static class NameTreadFactory implements ThreadFactory {

private final AtomicInteger mThreadNum = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "DingTalkRobotSend-Thread-" + mThreadNum.getAndIncrement());
LOG.info(t.getName() + " has been created");
return t;
}
}

/**
* 拒绝策略
* @author XX
*/
public static class MyIgnorePolicy implements RejectedExecutionHandler {

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}

private void doLog(Runnable r, ThreadPoolExecutor e) {
LOG.info("线程池请求拒绝completedTaskCount: " + e.getCompletedTaskCount());
}
}
}