论坛首页 Java版 企业应用

高负载下的Java多线程程序性能问题(关于信号量控制)

浏览 1011 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
最后更新时间:2008-06-18 关键字: 多线程
环境:AIX + IBM JDK5 从消息队列(MQ)获得的消息,写入本地数据库。每天处理数据量大约是一百万笔。
运行介绍:
1.有10个线程访问MQ,获得数据后放入本地阻塞队列(LinkedBlockingQueue,长度为10),程序大约是这个样子:
Executor pool = Executors.newFixedThreadPool(10);
for(int i=0; i<10; i++){
  pool.execute(new Runnable(){
  public void run(){
    while(true){
      Message message = getReader().read();// 接收消息
      try {
        queue.put(message);// 将接收到的数据放入队列
      } catch (Exception e) {
        log.error("线程错误", e);
      }
    }
  }
});


2.有一个线程不停的扫描这个阻塞队列,发现有新的数据就立即启动线程把这条数据写入数据库。使用信号量(semaphore)控制写数据库线程数量。程序大约是这个样子:
final Executor pool = Executors.newFixedThreadPool(10);
pool.execute(new Runnable() {
  public void run() {
    while (true) {
      try {
        final Message message = queue.take();// 如果没取到数据,会阻塞线程
        try {
          semaphore.acquire();//这个地方会很慢???
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
        pool.execute(new Runnable() {
          public void run() {
            try {
                getWriter().write(message);// 写入数据库
            } finally {
              semaphore.release();
            }
          }
        });
      } catch (Exception e1) {
        log.error("线程错误", e1);
      }
    }
  }
});


我分析日志后得出情况:semaphore.acquire();有时会很慢,可能1秒以上才能获得信号量。
而实际情况是写数据库线程非常快,系统应该拥有足够的信号量,不至于等待1秒以上的时间。
必须注意到我当前服务器负载还是比较高的,CPU经常在60%-70%。

问题:为什么这个信号量的获得会如此之慢?有没有其他的解决方法?
   
最后更新时间:2008-06-18
已经用了FixedThreadPool,为什么还要用信号量去控制写线程数量?完全没必要了,如果写的速度这么快,完全可以预先启动一两个线程,这两个线程一直去写,没必要用线程池了:
public void run(){
  while(!Thread.currentThread.isInterrupted()&&(message = queue.take())!=null)
  {
   getWriter().write(message);// 写入数据库 
  }
}
   
0 请登录后投票
最后更新时间:2008-06-18
如果数据对实时性要求不高,现在的数据量实在不必要使用多线程来处理;
1、一线程:读取MQ队列的数据,累计到某队列
2、二线程:读取队列的数据(可以每千条一批次),将逻辑放入内存中处理,将结果放入队列
3、三线程:读取结果队列的数据(可以每千条一批次),更新入数据库
这样处理效率会很惊人的,一小时几十万条很轻松。
   
0 请登录后投票
最后更新时间:2008-06-18
newdev 写道
如果数据对实时性要求不高,现在的数据量实在不必要使用多线程来处理;
1、一线程:读取MQ队列的数据,累计到某队列
2、二线程:读取队列的数据(可以每千条一批次),将逻辑放入内存中处理,将结果放入队列
3、三线程:读取结果队列的数据(可以每千条一批次),更新入数据库
这样处理效率会很惊人的,一小时几十万条很轻松。

数据实时性要求还是比较高的,所以不能批量做,不然就不使用消息机制,而使用文件系统来做了。
   
0 请登录后投票
最后更新时间:2008-06-18
dennis_zane 写道
已经用了FixedThreadPool,为什么还要用信号量去控制写线程数量?完全没必要了,如果写的速度这么快,完全可以预先启动一两个线程,这两个线程一直去写,没必要用线程池了:
public void run(){
  while(!Thread.currentThread.isInterrupted()&&(message = queue.take())!=null)
  {
   getWriter().write(message);// 写入数据库 
  }
}

JDK5的threadpool自带了一个阻塞队列,会把所有的任务放入队列中。当大量数据拥入时,可能会造成这个队列很长,反而影响性能。你的提议我还是要试一下,谢谢。
   
0 请登录后投票
最后更新时间:2008-06-18
beeke 写道
dennis_zane 写道
已经用了FixedThreadPool,为什么还要用信号量去控制写线程数量?完全没必要了,如果写的速度这么快,完全可以预先启动一两个线程,这两个线程一直去写,没必要用线程池了:
public void run(){
  while(!Thread.currentThread.isInterrupted()&&(message = queue.take())!=null)
  {
   getWriter().write(message);// 写入数据库 
  }
}

JDK5的threadpool自带了一个阻塞队列,会把所有的任务放入队列中。当大量数据拥入时,可能会造成这个队列很长,反而影响性能。你的提议我还是要试一下,谢谢。


我觉的吧,你想的太多了,还是要看实际表现。是生产的快,还是消费的快。
   
0 请登录后投票
最后更新时间:2008-08-18
先假设你用Semaphore semaphore=new Semaphore(100);创建信号量。
分析一下你的线程池吧
1个线程用于queue.take();在while(true)情况下,很快就take够100了,blocking。为什么呢?pool.execute方法执行时间几乎是0.
9个线程用于write数据库。这里注意,你的getWriter().write(message)很快哦(根据你所说的)。9个线程不是同时“运行”(应该说计算吧)的(因为你只有一个CPU)
只是在9个线程之中选一个来执行。如果run代码块执行非常快,切换线程的时间消耗可能大于run代码块执行时间。当线程run代码块执行的时间大于线程创建(或者切换时间)那使用线程是一种浪费。

信号量的作用只是掐着那单条的线程,让它不能把超过100个任务加入到线程池的等待队列,线程池不是表示只能加9个,而是表示9个处于running状态,而其余的91个处于线程池的等待队列里。
假设信号量是1000甚至是10000,只是阻止了因为等待队列过多而导致OutOfMemory。消费者根本不是在和生产者竞争,而是和线程池竞争,甚至连线程切换时间都要来分一杯。

所以说,semaphore.acquire()慢是正常,因为它在等待。

顺便说一下吧:newFixedThreadPool创建的是无界队列线程池,也就是说等待队列是无界的.容易照成内存溢出,所以通常都使用spring的一个包装类ThreadPoolExecutor来用

假设你的queue.take()是线程安全的。你还不如做10条这样的线程来做这个事情(说不行5条更快,视乎线程前换时间)。
[code="java"]
public void run(){
while (true) {  
      try {
      if(!queue.isEmpty()){  
        Message message = queue.take();// 如果没取到数据,会阻塞线程  
getWriter().write(message);// 写入数据库  
}
}catch(Exception e){

}
}
[code]
既不需要线程池,也不需要信号量。假如queue不是线程安全的。自己写一个线程安全的WrapperQueue也无妨。
多注意线程和线程池的方法对系统资源的消耗还是有好处的。
   
1 请登录后投票
最后更新时间:2008-07-17
用JDBC的批量插入 cStat.executeBatch(); conn.commit(); 2000条提交一次, 一天10个G的oracle数据, 随便搞搞。 我也是从 MQ里面取的数据。
   
0 请登录后投票
最后更新时间:2008-07-17
你信号量是多大的?怎么用了newFixedThreadPool又用了信号量,如果你信号量是1,那还不如直接用singleThreadpool呢
   
0 请登录后投票
最后更新时间:2008-07-17
不好意思,没仔细看你代码,上面说的不对,我觉得干脆别用信号量了,
  while (true) { 
#       try { 
#         final Message message = queue.take();// 如果没取到数据,会阻塞线程 
#         try { 
#           semaphore.acquire();//这个地方会很慢??? 
#         } catch (InterruptedException e1) { 
#           e1.printStackTrace(); 
#         } 
这里就直接跑一个单独的线程,然后把线程池用于数据库插入那里,线程池是10的话,也就是同时跑10个插入线程,当然你要是写入特别快,池子也省了。
   
0 请登录后投票
论坛首页 Java版 企业应用

跳转论坛:
JavaEye推荐