现在是这样的
1.首先我最外面有个 quartz 定时器,每隔 N 秒执行一次
2.定时器里执行的内容是这样,里面有个线程池,线程池大小是 2 个线程,coresize,maxsize 都是 2
3.线程池里的 2 个线程,分别一个去执行生产者方法,一个去执行消费者方法
4.生产者和消费者中间用消息队列来临时存数据
现在有个问题,就是消费者这边,怎么能保证取到数据,又能正确的退出线程,进行到下一次定时器的执行
在这之前我做的蠢办法是消费者那边加了个 while(true),结果定时器执行了 2 次后,线程池就满了,然后拒绝
所以不知道有啥好办法
代码:
//线程池的
public class TestTask extends QuartzJobBean {
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
2,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(4));
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) {
threadPoolExecutor.execute(() -> {
//生产者代码
//数据放入消息队列
});
threadPoolExecutor.execute(() -> {
while (true) {
//取队列里的数据
//消费者代码
}
});
}
}
1
jinksw 2019-03-12 11:09:43 +08:00
看样子你定时器的意思是 每两秒生产一次 然后消费一次
为啥要 2 个线程 你每次运行一个线程 先生产再消费不行吗 |
2
sarlanori 2019-03-12 11:15:02 +08:00
在 C#里,我一般是用信号量来等待和通知。
|
3
shayang888 OP @jinksw 那样不就是同步了吗?我是想生产和消费分隔开来
|
4
shayang888 OP @jinksw 另外定时器并不是 2 秒执行一次啊 定时器的执行时间随便设置的
|
5
passerbytiny 2019-03-12 12:05:56 +08:00
消费者在线程之上,而不是之内,拿到数据后再从线程池里开线程去执行后续处理。消费者不能用多线程+死循环来取数据,而应该是单线程异步监听+同步取值,再具体我也不知道了,因为基本都是直接调用 API。
|
6
Counter 2019-03-12 12:11:53 +08:00
机制是不是不太对,改成这样如何呢?
生产后的数据加锁,生产者方法和消费者方法排队存取 |
7
shayang888 OP @passerbytiny 拿到数据后再从线程池里开线程去执行后续处理吗 好像有点思路
|
8
shayang888 OP @passerbytiny 可是消费者怎么知道它啥时候能拿到数据
|
9
shayang888 OP @Counter 没有懂你的意思
|
10
limhiaoing 2019-03-12 12:21:34 +08:00 via iPhone
生产者、消费者线程一般用条件变量 Condition Variable 来通信。
|
11
shayang888 OP @passerbytiny 可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
|
12
shayang888 OP @Counter 可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
|
13
passerbytiny 2019-03-12 13:02:02 +08:00 1
@shayang888 #12 消费者也不要放在定时器里,它应该是一个常驻的、独立的单线程。我不知道你的消息队列是什么队列,但一般的消息队列都是提供消费者 API 的,可以直接使用,自己做消费者太难了。
如果用线程去看消费者 /监听,那么是类似 while(true) {if(收到数据) {……} else { Thread.sleep(0.0001);} },这要是用线程去做,要么系统受不了,要么延迟时间受不了。消费者 /监听的轮询,回采用操作系统层次的东西,高级程序员都没必要知道的太深入,自己设计是肯定设计不来的。 你可以参考下 java.net.ServerSocket#accept()。 |
14
autogen 2019-03-12 13:19:51 +08:00
生产者发送的 msg 封装一下,加个 ctrl 字段,消费者接收到 msg.ctrl=exit 就退出
|
15
NieKing 2019-03-12 13:29:38 +08:00
我想起了 Android 里的 RxJava
|
16
linjiayu 2019-03-12 13:33:10 +08:00
实现 callable
|
17
linjiayu 2019-03-12 13:35:12 +08:00 1
public void offer(Event event)
{ synchronized (eventQueue) { while (eventQueue.size() >= max) { try { console(" the queue is full."); eventQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } console(" the new event is submitted"); eventQueue.addLast(event); eventQueue.notifyAll(); } } public Event take() { synchronized (eventQueue) { while (eventQueue.isEmpty()) { try { console(" the queue is empty."); eventQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Event event = eventQueue.removeFirst(); this.eventQueue.notifyAll(); console(" the event " + event + " is handled."); return event; } } |
18
shayang888 OP @autogen 我现在是多个生产者同时生产数据然后往队列里 push,然后只有一个消费者在从队列里消费 加字段的话 我给哪个生产者加这个字段呢
|
19
jingxyy 2019-03-12 13:48:43 +08:00
先不管怎么实现合理的问题
你是不是消费线程消费完了没退出啊?这样每一个 interval 之后你就有一个 while(true)的消费线程在跑,于是第 2 个周期后无法创建新的消费线程 |
20
ratel 2019-03-12 13:50:50 +08:00
使用消息中间件啊,消费者单独订阅消息消费,生产者用定时器就行了。
|
21
micean 2019-03-12 13:53:18 +08:00
quartz 为什么要玩 while(true)
|
22
shayang888 OP @jingxyy 对呀 我就是不知道怎么合适的退出
|
23
shayang888 OP @ratel 我现在就是没有用到中间件 想问问如果自己来弄的话咋做的好
|
24
woscaizi 2019-03-12 14:05:33 +08:00 via iPhone
简单的数据结构入栈出栈吧。
|
25
ratel 2019-03-12 14:09:38 +08:00
@shayang888 不用中间件,那也是用一样的设计模式,消费者和生产者是分开的,只依赖消息。
|
26
shayang888 OP @ratel 我不明白 为什么消费者要独立开来 我的消费只需要在这里进行消费呀 其它的地方都用不到它
|
27
pusidun 2019-03-12 14:21:59 +08:00
可以生产者定时生产消息,放入消息队列;消费者可以用线程池常驻,每个消费者线程轮询消息队列是否空,不为空处理,为空阻塞一段时间,就不用退出了。不过消息队列要保证是线程安全的
|
28
shayang888 OP @pusidun 你的意思和楼上他们说的是一个样吧 就是应该把消费者独立出来是吗?不应该放在定时器里
|
29
codingKingKong 2019-03-12 14:28:47 +08:00
大概是这样么?
```java import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TT { public static void main(String[] args) throws Exception{ threadPoolExecutor.execute(() -> { try { String a = ""; while (a != null){ a = blockingDeque.poll(10, TimeUnit.SECONDS); System.out.println(a); } System.out.println("consumer exit."); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread.sleep(2000); executeInternal(); Thread.sleep(2000); executeInternal(); } private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(4)); private static BlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>(); private static void executeInternal() { threadPoolExecutor.execute(() -> { try { blockingDeque.put("123"); } catch (InterruptedException e) { e.printStackTrace(); } }); } } ``` |
30
Jrue0011 2019-03-12 17:37:59 +08:00
额,想问下定时器里为什么还要再通过线程池调度线程来执行任务...?
|
31
ratel 2019-03-13 09:51:49 +08:00
@shayang888 你去看下生产者消费者设计模式
|