浏览 1011 次
|
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
|---|---|
| 作者 | 正文 |
|
最后更新时间:2008-06-18 关键字: 多线程
环境:AIX + IBM JDK5 从消息队列(MQ)获得的消息,写入本地数据库。每天处理数据量大约是一百万笔。
运行介绍: 1.有10个线程访问MQ,获得数据后放入本地阻塞队列(LinkedBlockingQueue,长度为10),程序大约是这个样子:
Executor pool = Executors.newFixedThreadPool(10);
for(int i=0; i<10; i++){
pool.execute(new Runnable(){
public void run(){
while(true){
Message message = getReader().read();// 接收消息
try {
queue.put(message);// 将接收到的数据放入队列
} catch (Exception e) {
log.error("线程错误", e);
}
}
}
});
2.有一个线程不停的扫描这个阻塞队列,发现有新的数据就立即启动线程把这条数据写入数据库。使用信号量(semaphore)控制写数据库线程数量。程序大约是这个样子:
final Executor pool = Executors.newFixedThreadPool(10);
pool.execute(new Runnable() {
public void run() {
while (true) {
try {
final Message message = queue.take();// 如果没取到数据,会阻塞线程
try {
semaphore.acquire();//这个地方会很慢???
} catch (InterruptedException e1) {
e1.printStackTrace();
}
pool.execute(new Runnable() {
public void run() {
try {
getWriter().write(message);// 写入数据库
} finally {
semaphore.release();
}
}
});
} catch (Exception e1) {
log.error("线程错误", e1);
}
}
}
});
我分析日志后得出情况:semaphore.acquire();有时会很慢,可能1秒以上才能获得信号量。 而实际情况是写数据库线程非常快,系统应该拥有足够的信号量,不至于等待1秒以上的时间。 必须注意到我当前服务器负载还是比较高的,CPU经常在60%-70%。 问题:为什么这个信号量的获得会如此之慢?有没有其他的解决方法? 声明:JavaEye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
|
|
| 返回顶楼 | |
|
最后更新时间:2008-06-18
已经用了FixedThreadPool,为什么还要用信号量去控制写线程数量?完全没必要了,如果写的速度这么快,完全可以预先启动一两个线程,这两个线程一直去写,没必要用线程池了:
public void run(){ while(!Thread.currentThread.isInterrupted()&&(message = queue.take())!=null) { getWriter().write(message);// 写入数据库 } } |
|
| 返回顶楼 | |
|
最后更新时间:2008-06-18
如果数据对实时性要求不高,现在的数据量实在不必要使用多线程来处理;
1、一线程:读取MQ队列的数据,累计到某队列 2、二线程:读取队列的数据(可以每千条一批次),将逻辑放入内存中处理,将结果放入队列 3、三线程:读取结果队列的数据(可以每千条一批次),更新入数据库 这样处理效率会很惊人的,一小时几十万条很轻松。 |
|
| 返回顶楼 | |
|
最后更新时间:2008-06-18
newdev 写道 如果数据对实时性要求不高,现在的数据量实在不必要使用多线程来处理;
1、一线程:读取MQ队列的数据,累计到某队列 2、二线程:读取队列的数据(可以每千条一批次),将逻辑放入内存中处理,将结果放入队列 3、三线程:读取结果队列的数据(可以每千条一批次),更新入数据库 这样处理效率会很惊人的,一小时几十万条很轻松。 数据实时性要求还是比较高的,所以不能批量做,不然就不使用消息机制,而使用文件系统来做了。 |
|
| 返回顶楼 | |
|
最后更新时间:2008-06-18
dennis_zane 写道 已经用了FixedThreadPool,为什么还要用信号量去控制写线程数量?完全没必要了,如果写的速度这么快,完全可以预先启动一两个线程,这两个线程一直去写,没必要用线程池了:
public void run(){ while(!Thread.currentThread.isInterrupted()&&(message = queue.take())!=null) { getWriter().write(message);// 写入数据库 } } JDK5的threadpool自带了一个阻塞队列,会把所有的任务放入队列中。当大量数据拥入时,可能会造成这个队列很长,反而影响性能。你的提议我还是要试一下,谢谢。 |
|
| 返回顶楼 | |
|
最后更新时间:2008-06-18
beeke 写道 dennis_zane 写道 已经用了FixedThreadPool,为什么还要用信号量去控制写线程数量?完全没必要了,如果写的速度这么快,完全可以预先启动一两个线程,这两个线程一直去写,没必要用线程池了:
public void run(){ while(!Thread.currentThread.isInterrupted()&&(message = queue.take())!=null) { getWriter().write(message);// 写入数据库 } } JDK5的threadpool自带了一个阻塞队列,会把所有的任务放入队列中。当大量数据拥入时,可能会造成这个队列很长,反而影响性能。你的提议我还是要试一下,谢谢。 我觉的吧,你想的太多了,还是要看实际表现。是生产的快,还是消费的快。 |
|
| 返回顶楼 | |
|
最后更新时间:2008-08-18
先假设你用Semaphore semaphore=new Semaphore(100);创建信号量。
分析一下你的线程池吧 1个线程用于queue.take();在while(true)情况下,很快就take够100了,blocking。为什么呢?pool.execute方法执行时间几乎是0. 9个线程用于write数据库。这里注意,你的getWriter().write(message)很快哦(根据你所说的)。9个线程不是同时“运行”(应该说计算吧)的(因为你只有一个CPU) 只是在9个线程之中选一个来执行。如果run代码块执行非常快,切换线程的时间消耗可能大于run代码块执行时间。当线程run代码块执行的时间大于线程创建(或者切换时间)那使用线程是一种浪费。 信号量的作用只是掐着那单条的线程,让它不能把超过100个任务加入到线程池的等待队列,线程池不是表示只能加9个,而是表示9个处于running状态,而其余的91个处于线程池的等待队列里。 假设信号量是1000甚至是10000,只是阻止了因为等待队列过多而导致OutOfMemory。消费者根本不是在和生产者竞争,而是和线程池竞争,甚至连线程切换时间都要来分一杯。 所以说,semaphore.acquire()慢是正常,因为它在等待。 顺便说一下吧:newFixedThreadPool创建的是无界队列线程池,也就是说等待队列是无界的.容易照成内存溢出,所以通常都使用spring的一个包装类ThreadPoolExecutor来用 假设你的queue.take()是线程安全的。你还不如做10条这样的线程来做这个事情(说不行5条更快,视乎线程前换时间)。 [code="java"] public void run(){ while (true) { try { if(!queue.isEmpty()){ Message message = queue.take();// 如果没取到数据,会阻塞线程 getWriter().write(message);// 写入数据库 } }catch(Exception e){ } } [code] 既不需要线程池,也不需要信号量。假如queue不是线程安全的。自己写一个线程安全的WrapperQueue也无妨。 多注意线程和线程池的方法对系统资源的消耗还是有好处的。 |
|
| 返回顶楼 | |
|
最后更新时间:2008-07-17
用JDBC的批量插入
cStat.executeBatch();
conn.commit();
2000条提交一次,
一天10个G的oracle数据,
随便搞搞。
我也是从 MQ里面取的数据。
|
|
| 返回顶楼 | |
|
最后更新时间:2008-07-17
你信号量是多大的?怎么用了newFixedThreadPool又用了信号量,如果你信号量是1,那还不如直接用singleThreadpool呢
|
|
| 返回顶楼 | |
|
最后更新时间:2008-07-17
不好意思,没仔细看你代码,上面说的不对,我觉得干脆别用信号量了,
while (true) { # try { # final Message message = queue.take();// 如果没取到数据,会阻塞线程 # try { # semaphore.acquire();//这个地方会很慢??? # } catch (InterruptedException e1) { # e1.printStackTrace(); # } 这里就直接跑一个单独的线程,然后把线程池用于数据库插入那里,线程池是10的话,也就是同时跑10个插入线程,当然你要是写入特别快,池子也省了。 |
|
| 返回顶楼 | |






