
Lucene Concurrent Access

Last updated: 2008-02-27   Keywords: lucene

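In concurrent Lucene access, a conflict between threads can damage the index files, and the application fails with a "Lock obtain timed out" exception (LockObtainFailedException). If an extra write.lock or commit.lock file is left behind in the temporary lock directory, concurrent access has gone wrong, and the index may already be damaged...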
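To see why a leftover lock file produces "Lock obtain timed out", here is a JDK-only sketch of the idea behind a file-based lock: create the lock file atomically, poll while it exists, and give up after a timeout. LockFileSketch and its methods are hypothetical names; this is not Lucene's Lock/LockFactory API, only a model that assumes the lock is a plain marker file.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical marker-file lock for one index directory (not Lucene's Lock API).
class LockFileSketch {
    private final Path lockFile;

    LockFileSketch(Path indexDir) {
        // File name borrowed from Lucene's write.lock purely for illustration.
        this.lockFile = indexDir.resolve("write.lock");
    }

    /** Retries every pollMs until the lock file can be created; gives up after timeoutMs. */
    void obtain(long timeoutMs, long pollMs) throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                Files.createFile(lockFile); // atomic: fails if the file already exists
                return;
            } catch (FileAlreadyExistsException held) {
                if (System.currentTimeMillis() >= deadline) {
                    throw new IOException("Lock obtain timed out: " + lockFile);
                }
                Thread.sleep(pollMs);
            }
        }
    }

    /** Deletes the lock file; a crash before this call leaves a stale lock behind. */
    void release() throws IOException {
        Files.deleteIfExists(lockFile);
    }

    boolean isLocked() {
        return Files.exists(lockFile);
    }
}
```

If a process dies between obtain() and release(), the file stays behind and every later obtain() times out, which is the stale-lock symptom described above.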

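Lucene in Action gives a concurrency matrix for IndexWriter and IndexReader operations, showing which pairs of operations must not run at the same time.

The matrix can be summarized as:

  • While an IndexReader is deleting a document from the index, an IndexWriter cannot add documents to that index.
  • While an IndexWriter is optimizing the index, an IndexReader cannot delete documents from it.
  • While an IndexWriter is merging the index, an IndexReader cannot delete documents from it either.

The three points above are taken from Chapter 2 of Lucene in Action.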
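The three rules can be written down as a small lookup so the conflicting pairs are explicit. This is a minimal sketch with hypothetical IndexOp labels (they are not Lucene types). Pairs the rules do not mention return false, so it says nothing about, for example, opening two IndexWriters on one index, which Lucene's write lock already prevents.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical operation labels; not Lucene types.
enum IndexOp { READER_DELETE, WRITER_ADD, WRITER_OPTIMIZE, WRITER_MERGE }

final class ConcurrencyMatrix {
    // Writer-side operations that, per the three rules, must not overlap a reader-side delete.
    private static final Set<IndexOp> BLOCKED_BY_DELETE =
        EnumSet.of(IndexOp.WRITER_ADD, IndexOp.WRITER_OPTIMIZE, IndexOp.WRITER_MERGE);

    private ConcurrencyMatrix() {}

    /** True if the rules forbid running a and b at the same time (order does not matter). */
    static boolean conflicts(IndexOp a, IndexOp b) {
        return (a == IndexOp.READER_DELETE && BLOCKED_BY_DELETE.contains(b))
            || (b == IndexOp.READER_DELETE && BLOCKED_BY_DELETE.contains(a));
    }
}
```

Written this way, all three rules reduce to one: a delete through an IndexReader must not overlap any IndexWriter activity.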

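In practice, whether the index is currently locked can be checked with IndexReader's static method isLocked(Directory) (there is also an overload that takes the index path as a String). In my tests, when the index is accessed through an IndexWriter, it is locked as soon as the IndexWriter object is created, and the lock is released only when IndexWriter's close() method is called. When it is accessed through an IndexReader, opening it with open() does not lock the index; the lock is taken only when deleteDocument() is executed, and it is released when the IndexReader's close() method is called.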
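The lock lifecycle observed above (writer: locked from construction until close(); reader: unlocked after open(), locked from the first delete until close()) can be modelled in memory. This is a hypothetical sketch: SimulatedIndex, WriterHandle and ReaderHandle only imitate the observed behaviour and are not Lucene classes. Where Lucene would wait and then throw LockObtainFailedException, the model fails immediately.

```java
// Hypothetical in-memory model of the lock lifecycle described above; not Lucene code.
final class SimulatedIndex {
    private Object owner; // handle currently holding the write lock, or null

    synchronized boolean isLocked() {
        return owner != null;
    }

    private synchronized void lock(Object who) {
        if (owner != null && owner != who) {
            // Lucene would wait, then throw LockObtainFailedException
            throw new IllegalStateException("index is write-locked");
        }
        owner = who;
    }

    private synchronized void unlock(Object who) {
        if (owner == who) {
            owner = null;
        }
    }

    /** Like IndexWriter: locks in the constructor, unlocks in close(). */
    final class WriterHandle implements AutoCloseable {
        WriterHandle() {
            lock(this);
        }

        @Override
        public void close() {
            unlock(this);
        }
    }

    /** Like IndexReader: opening does not lock; the first delete does; close() unlocks. */
    final class ReaderHandle implements AutoCloseable {
        void deleteDocument(int docNum) {
            lock(this); // re-entrant for the same reader; docNum is ignored by the model
        }

        @Override
        public void close() {
            unlock(this);
        }
    }
}
```

The model makes the consequence visible: a reader can be opened while a writer is alive, but its first delete is refused until the writer is closed.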

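So when the index is accessed concurrently, besides making concurrent access to IndexWriter and to IndexReader each thread-safe on its own, we must also coordinate concurrent access between IndexWriter and IndexReader...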

Below is part of my implementation:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Paoding Chinese analyzer; package names as in paoding-analysis 2.0.x
import net.paoding.analysis.analyzer.PaodingAnalyzer;
import net.paoding.analysis.knife.Paoding;
import net.paoding.analysis.knife.PaodingMaker;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;

public class IndexManager {
	private static final Analyzer analyzer;

	private static IndexWriter _company_writer = null;
	private static IndexReader _company_reader = null;

	// Threads currently using the shared writer/reader. Each list is also the monitor
	// that guards the matching instance.
	private static final List<Thread> _company_writer_thread = new ArrayList<Thread>();
	private static final List<Thread> _company_reader_thread = new ArrayList<Thread>();

	static {
		Paoding paoding = PaodingMaker.make();
		analyzer = PaodingAnalyzer.writerMode(paoding);
	}

	private IndexManager() {}

	/** Returns the shared IndexWriter, first waiting until nobody holds the index lock. */
	public static IndexWriter getCompanyIndexWriter(String path, boolean isReBuild) throws IOException {
		synchronized (_company_writer_thread) {
			if (_company_writer == null) {
				waitUntilUnlocked(path, 100);
				_company_writer = new IndexWriter(path, analyzer, isReBuild);
			}
			// Register the caller only once the writer really exists.
			if (!_company_writer_thread.contains(Thread.currentThread())) {
				_company_writer_thread.add(Thread.currentThread());
			}
			return _company_writer;
		}
	}

	/** Unregisters the calling thread; the last thread out closes the shared writer. */
	public static void closeCompanyIndexWriter() {
		synchronized (_company_writer_thread) {
			_company_writer_thread.remove(Thread.currentThread());
			if (_company_writer_thread.isEmpty() && _company_writer != null) {
				try {
					_company_writer.close(); // flushes and releases the write lock
				} catch (IOException e) {
					e.printStackTrace();
				} finally {
					_company_writer = null;
				}
			}
		}
	}

	/** Returns the shared IndexReader, first waiting until nobody holds the index lock. */
	public static IndexReader getCompanyIndexReader(String path) throws IOException {
		synchronized (_company_reader_thread) {
			if (_company_reader == null) {
				waitUntilUnlocked(path, 20);
				_company_reader = IndexReader.open(path);
			}
			if (!_company_reader_thread.contains(Thread.currentThread())) {
				_company_reader_thread.add(Thread.currentThread());
			}
			return _company_reader;
		}
	}

	/** Unregisters the calling thread; the last thread out closes the shared reader. */
	public static void closeCompanyIndexReader() {
		synchronized (_company_reader_thread) {
			_company_reader_thread.remove(Thread.currentThread());
			if (_company_reader_thread.isEmpty() && _company_reader != null) {
				try {
					_company_reader.close(); // commits pending deletions, releases the lock
				} catch (IOException e) {
					e.printStackTrace();
				} finally {
					_company_reader = null;
				}
			}
		}
	}

	/**
	 * Polls IndexReader.isLocked(path) until the index is unlocked. This is only a hint:
	 * another thread or process can still take the lock between the check and the open,
	 * in which case Lucene throws LockObtainFailedException (an IOException).
	 */
	private static void waitUntilUnlocked(String path, long pollMillis) throws IOException {
		while (IndexReader.isLocked(path)) {
			try {
				Thread.sleep(pollMillis);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt(); // preserve the interrupt status
				throw new IOException("Interrupted while waiting for the index lock: " + path);
			}
		}
	}
}

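The class hands out IndexWriter and IndexReader objects through static methods. Before creating one, the getter checks whether the index is locked (IndexReader.isLocked); this is meant to prevent conflicts between IndexWriter and IndexReader. Concurrent access to IndexWriter and to IndexReader on their own is handled with a list of the threads currently using each shared instance, so every thread gets the same, properly synchronized object. However, when you are done with an IndexWriter or IndexReader obtained this way, you must release it through the matching IndexManager closeXXX() method (closeCompanyIndexWriter() or closeCompanyIndexReader()) rather than calling its own close(); only then can the thread list correctly track the live IndexWriter and IndexReader instances...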
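A simpler way to get the same "last one out closes it" behaviour is a plain reference count instead of a list of threads. With the thread list, a thread that calls the getter twice is recorded once, so its first close call can close the shared instance while it is still in use; and a close made by a thread that never called the getter is ignored. A counter avoids both and does not care which thread releases. This is a hypothetical generic sketch (RefCounted is not part of the code above), and it assumes every get() is paired with exactly one release().

```java
import java.util.function.Supplier;

// Hypothetical reference-counted holder for one shared, closeable instance.
final class RefCounted<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance;
    private int refCount;

    RefCounted(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the instance on first use; if the factory throws, the count is unchanged. */
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        refCount++;
        return instance;
    }

    /** Drops one reference; the last release closes the instance. */
    synchronized void release() throws Exception {
        if (refCount == 0) {
            throw new IllegalStateException("release() without matching get()");
        }
        if (--refCount == 0) {
            T toClose = instance;
            instance = null; // the next get() creates a fresh instance
            toClose.close();
        }
    }

    synchronized int refCount() {
        return refCount;
    }
}
```

In IndexManager terms, the factory would open the IndexWriter or IndexReader and release() would replace closeCompanyIndexWriter()/closeCompanyIndexReader().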

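That's all I have on concurrent access in Lucene. I've only just started writing posts, so a lot of this is poorly worded or unclear; please bear with me. I believe I'll get better... Thanks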

Last updated: 2008-02-28
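1. Avoid updating the Lucene index frequently in small increments, e.g. adding to the index every time a user posts; that is bad for the index structure and for efficiency. Instead, process index updates in batches, on a timer or once a certain number has accumulated.
2. On top of batching, one way to avoid the conflicts is to operate the Writer and the Reader from the same thread. Older Lucene versions had no modify operation; every modification decomposes into a delete and an add (IndexWriter.updateDocument, added later, still deletes and then adds internally). So the batch indexing task queue really contains only two kinds of operations: delete and add. Just make the processing thread always run the deletes first and then the adds.
3. At any time, only one indexing thread instance is working!

Meeting these 3 points handles the Lucene index update requirements of most situations today quite well.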
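Rules 1 to 3 can be sketched with the JDK alone: callers only enqueue, and a single-threaded scheduler drains the batch, applying all deletes before all adds. The class and method names are hypothetical, and the operations are plain strings standing in for Lucene calls. One addition beyond the rules as stated: a delete also cancels a pending add of the same id, because otherwise "deletes first" would resurrect a document that was added and then deleted within one batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical batching queue: many producers, one consumer thread.
final class BatchIndexQueue {
    private final Set<String> deletes = new LinkedHashSet<>();
    private final Map<String, String> adds = new LinkedHashMap<>(); // doc id -> content

    synchronized void add(String id, String content) {
        adds.put(id, content);
    }

    /** A modification is a delete of the old version followed by an add of the new one. */
    synchronized void update(String id, String content) {
        deletes.add(id);
        adds.put(id, content);
    }

    /** Also drops a not-yet-indexed add of the same id. */
    synchronized void delete(String id) {
        adds.remove(id);
        deletes.add(id);
    }

    /** Rule 2: takes the current batch as all deletes followed by all adds. */
    synchronized List<String> drain() {
        List<String> ops = new ArrayList<>();
        for (String id : deletes) {
            ops.add("DELETE " + id);
        }
        for (Map.Entry<String, String> e : adds.entrySet()) {
            ops.add("ADD " + e.getKey() + ": " + e.getValue());
        }
        deletes.clear();
        adds.clear();
        return ops;
    }

    /** Rules 1 and 3: a timer on a single thread, so only one batch is ever applied at a time. */
    ScheduledExecutorService startFlusher(Consumer<List<String>> applyBatch, long periodSeconds) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        worker.scheduleWithFixedDelay(() -> {
            List<String> batch = drain();
            if (!batch.isEmpty()) {
                applyBatch.accept(batch); // should not throw: an exception cancels later runs
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return worker;
    }
}
```

The executor's single thread is what enforces rule 3. Note that scheduleWithFixedDelay suppresses all later runs once a task throws, so a real applyBatch should catch its own exceptions.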
Last updated: 2008-04-22
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

// PropsUtil, LuceneUtil and IndexWriterData are project classes that are not shown here.

/**
 * 
 * <p>
 * Lucene allows only one IndexWriter to be open at a time. However, multiple threads can use the same IndexWriter.
 * This class manages the global IndexWriter instances.
 * </p>
 * 
 * <p>
 * While the index is write-locked by an IndexWriter or IndexReader, IndexReaders can still be used but cannot delete documents.
 * While the shared IndexWriter is open, concurrent add operations are fine.
 * </p>
 * 
 */
public class IndexWriterFactory {
	private static final Log logger = LogFactory.getLog(IndexWriterFactory.class);
	private static final int LUCENE_MAX_BUFFER_SIZE = PropsUtil.getInt(PropsUtil.LUCENE_MAX_BUFFER_SIZE);
	private static final int OPTIMIZE_INTERVAL = PropsUtil.getInt(PropsUtil.LUCENE_OPTIMIZE_INTERVAL);

	// One single-permit semaphore per module. It is held from the creation of the module's
	// shared IndexWriter until its last user calls write(), or for the whole of deleteDocuments().
	private final Map<String, Semaphore> lockLookup = new HashMap<String, Semaphore>();
	private final Map<String, IndexWriterData> writerLookup = new HashMap<String, IndexWriterData>(); // guarded by this
	// Threads waiting for an exclusive (delete) lock; read without locking, hence volatile.
	private volatile int needExclusiveLock = 0;
	private int optimizeCount = 0; // guarded by this

	public IndexWriterFactory() {
		try {
			List<?> modules = PropsUtil.getList(PropsUtil.LUCENE_MODULE);
			for (Object module : modules) {
				lockLookup.put((String) module, new Semaphore(1));
			}
		} catch (Exception se) {
			logger.error(se);
		}
	}

	/**
	 * Returns the module's shared IndexWriter, creating it on first use.
	 * Every call must be matched by exactly one call to write(...).
	 * @param module the module identifier
	 * @param create whether to re-create the index
	 * @return the shared IndexWriter
	 * @throws IOException if the writer cannot be created
	 */
	public IndexWriter getWriter(String module, boolean create) throws IOException {
		boolean lockHeld = false;
		try {
			if (needExclusiveLock > 0) {
				// A delete is waiting: queue once on the module lock so it can go first.
				acquireLock(module, false);
				releaseLock(module);
			}
			synchronized (this) {
				IndexWriterData writerData = writerLookup.get(module);
				if (writerData == null) {
					acquireLock(module, false);
					lockHeld = true; // from here on, a failure must give the permit back
					IndexWriter writer = new IndexWriter(LuceneUtil.getLuceneDir(module), LuceneUtil.getAnalyzer(), create);
					writer.setMaxBufferedDocs(LUCENE_MAX_BUFFER_SIZE);
					writerData = new IndexWriterData(module, writer, 0);
					writerLookup.put(module, writerData);
					lockHeld = false; // the permit now belongs to the shared writer
				}
				writerData.setCount(writerData.getCount() + 1);
				return writerData.getWriter();
			}
		} catch (Exception e) {
			if (e instanceof InterruptedException) {
				Thread.currentThread().interrupt();
			}
			if (lockHeld) {
				releaseLock(module);
			}
			logger.error("Unable to create a new writer", e);
			IOException ioe = new IOException("Unable to create a new writer");
			ioe.initCause(e);
			throw ioe;
		}
	}

	/**
	 * Takes the module's permit, blocking until it is free.
	 * @param module the module identifier
	 * @param needExclusive true for deletes; makes new getWriter() callers wait
	 * @throws InterruptedException if interrupted while waiting
	 */
	public void acquireLock(String module, boolean needExclusive) throws InterruptedException {
		Semaphore lock = lockLookup.get(module);
		if (lock == null) {
			logger.warn("IndexWriterFactory lock not found for " + module);
			return;
		}
		if (needExclusive) {
			synchronized (lockLookup) {
				needExclusiveLock++;
			}
		}
		try {
			lock.acquire();
		} finally {
			if (needExclusive) {
				synchronized (lockLookup) {
					needExclusiveLock--;
				}
			}
		}
	}

	/**
	 * Returns the module's permit; may be called from a different thread than acquireLock().
	 * @param module the module identifier
	 */
	public void releaseLock(String module) {
		Semaphore lock = lockLookup.get(module);
		if (lock != null) {
			lock.release();
		}
	}

	/** Ends one use of the module's shared IndexWriter. */
	public synchronized void write(String module) throws IOException {
		IndexWriterData writerData = writerLookup.get(module);
		if (writerData != null) {
			decrement(writerData);
		} else {
			logger.warn("IndexWriterData not found for " + module);
		}
	}

	/** Ends one use of a shared writer, or optimizes (periodically) and closes an unmanaged one. */
	public void write(IndexWriter writer) throws IOException {
		synchronized (this) {
			for (IndexWriterData writerData : writerLookup.values()) {
				if (writerData.getWriter() == writer) {
					decrement(writerData); // may remove the entry; we stop iterating right after
					return;
				}
			}
		}
		try {
			if (shouldOptimize()) {
				writer.optimize();
			}
		} finally {
			writer.close();
		}
	}

	/** Counts closed writers; true on every OPTIMIZE_INTERVAL-th close (always when the interval is 0). */
	private synchronized boolean shouldOptimize() {
		optimizeCount++;
		if (OPTIMIZE_INTERVAL == 0 || optimizeCount >= OPTIMIZE_INTERVAL) {
			optimizeCount = 0;
			return true;
		}
		return false;
	}

	/** Drops one use; the last one closes the writer and returns the module's permit. */
	protected synchronized void decrement(IndexWriterData writerData) throws IOException {
		if (writerData.getCount() > 0) {
			writerData.setCount(writerData.getCount() - 1);
			if (writerData.getCount() == 0) {
				writerLookup.remove(writerData.getModule());
				try {
					IndexWriter writer = writerData.getWriter();
					try {
						if (shouldOptimize()) {
							writer.optimize();
						}
					} finally {
						writer.close(); // also releases Lucene's write.lock
					}
				} catch (Exception e) {
					logger.error(e, e);
				} finally {
					releaseLock(writerData.getModule());
				}
			}
		}
	}

	/**
	 * Deletes all documents containing the term, with exclusive access to the module.
	 * @param module the module identifier
	 * @param term the term identifying the documents to delete
	 * @throws InterruptedException if interrupted while waiting for the module lock
	 * @throws IOException if the index cannot be opened or modified
	 */
	public void deleteDocuments(String module, Term term) throws InterruptedException, IOException {
		acquireLock(module, true); // outside try: if interrupted, there is nothing to release
		try {
			IndexReader reader = IndexReader.open(LuceneUtil.getLuceneDir(module));
			try {
				int num = reader.deleteDocuments(term);
				logger.info("Number of deleted documents: " + num);
			} finally {
				reader.close(); // commits the deletions and releases the write lock
			}
		} finally {
			releaseLock(module);
		}
	}
}
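One detail of the factory is easy to miss: a module's permit is taken in getWriter() by whichever thread creates the shared writer, but it is returned in decrement() by whichever thread makes the last write() call, possibly a different one. A java.util.concurrent.Semaphore has no owner, so that works; a ReentrantLock (or a synchronized block) would not. The JDK-only demonstration below shows the difference; OwnershipDemo is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Demonstrates why a Semaphore suits a lock that is released by a different thread.
final class OwnershipDemo {
    private OwnershipDemo() {}

    /** Acquires here, releases in another thread; true if the permit came back. */
    static boolean releaseFromOtherThread(Semaphore permit) throws InterruptedException {
        permit.acquire();
        Thread other = new Thread(permit::release);
        other.start();
        other.join();
        return permit.tryAcquire(); // succeeds only if the other thread returned the permit
    }

    /** Same experiment with a ReentrantLock; true if the non-owner unlock worked. */
    static boolean unlockFromOtherThread(ReentrantLock lock) throws InterruptedException {
        lock.lock();
        final boolean[] failed = {false};
        Thread other = new Thread(() -> {
            try {
                lock.unlock();
            } catch (IllegalMonitorStateException notOwner) {
                failed[0] = true; // only the owning thread may unlock
            }
        });
        other.start();
        other.join(); // join() makes failed[0] visible here
        lock.unlock(); // the owner has to release it itself
        return !failed[0];
    }
}
```

The flip side is that a Semaphore will not catch a stray extra release(), which is why getWriter() and deleteDocuments() take care to release only a permit they actually acquired.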
Last updated: 2008-07-16
Quote:

* Lucene allows only one IndexWriter to be open at a time. However, multiple threads can use the same IndexWriter
* This class manages the global IndexWriter instances



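It's not "at a time". Rather, the directory is treated as a critical resource that only one object can hold, so only one IndexWriter can be open on a given directory.
Multiple threads can use the same IndexWriter, but there's no point: the reason for using multiple threads is to work concurrently, yet IndexWriter also has a lock for committing transactions, so only one transaction can run at a time. That rules out concurrent use of IndexWriter for every operation that needs the transaction lock.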