View Javadoc

1   package net.obsearch.storage.l;
2   
3   import hep.aida.bin.StaticBin1D;
4   
5   import java.io.EOFException;
6   import java.io.File;
7   import java.io.FileNotFoundException;
8   import java.io.IOException;
9   import java.io.RandomAccessFile;
10  import java.nio.ByteBuffer;
11  import java.nio.MappedByteBuffer;
12  import java.nio.channels.FileChannel;
13  import java.nio.channels.FileChannel.MapMode;
14  import java.util.Arrays;
15  import java.util.NoSuchElementException;
16  
17  import org.apache.log4j.Logger;
18  
19  
20  
21  
22  import net.obsearch.OperationStatus;
23  import net.obsearch.Status;
24  import net.obsearch.asserts.OBAsserts;
25  import net.obsearch.cache.OBCacheHandler;
26  import net.obsearch.cache.OBCacheHandlerByteArray;
27  import net.obsearch.cache.OBCacheHandlerLong;
28  import net.obsearch.cache.OBCacheLong;
29  import net.obsearch.constants.ByteConstants;
30  import net.obsearch.constants.OBSearchProperties;
31  import net.obsearch.exception.OBException;
32  import net.obsearch.exception.OBStorageException;
33  import net.obsearch.exception.OutOfRangeException;
34  import net.obsearch.storage.CloseIterator;
35  import net.obsearch.storage.OBStore;
36  import net.obsearch.storage.OBStoreFactory;
37  import net.obsearch.storage.Tuple;
38  import net.obsearch.storage.TupleBytes;
39  import net.obsearch.utils.bytes.ByteConversion;
40  
41  /*
42   OBSearch: a distributed similarity search engine This project is to
43   similarity search what 'bit-torrent' is to downloads. 
44   Copyright (C) 2008 Arnoldo Jose Muller Molina
45  
46   This program is free software: you can redistribute it and/or modify
47   it under the terms of the GNU General Public License as published by
48   the Free Software Foundation, either version 3 of the License, or
49   (at your option) any later version.
50  
51   This program is distributed in the hope that it will be useful,
52   but WITHOUT ANY WARRANTY; without even the implied warranty of
53   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
54   GNU General Public License for more details.
55  
56   You should have received a copy of the GNU General Public License
57   along with this program.  If not, see <http://www.gnu.org/licenses/>.
58   */
59  
60  /**
61   * AbstractOBLStore holds very large buckets. It allows efficient insertion
62   * and deletion. Search is performed sequentially over each record. This is
63   * a meta-storage: it works on top of another storage.
64   * 
65   * Known bug:
66   * Iterating over an empty bucket causes an error.
67   * This happens when the exists operation is used.
68   * 
69   * @author Arnoldo Jose Muller Molina
70   */
71  
72  public abstract class AbstractOBLStore<T extends Tuple> implements OBStore<T> {
73  
74  	private static final transient Logger logger = Logger
75  	.getLogger(AbstractOBLStore.class);
76  	/**
77  	 * Stores the handles of the files.
78  	 */
79  	private OBCacheLong<RAFileHolder> handles;
80  
81  	private OBStore<TupleBytes> storage;
82  
83  	private boolean duplicates;
84  
85  	private String name;
86  
87  	protected OBStoreFactory fact;
88  
89  	private File baseFolder;
90  
91  	/**
92  	 * Used to simplify the logic of the implementation. It has the added
93  	 * benefit that there is no fragmentation in the bucket. The record size has
94  	 * effect only if duplicates == true.
95  	 */
96  	private int recordSize;
97  
98  	protected AbstractOBLStore(String name, OBStore<TupleBytes> storage,
99  			OBStoreFactory fact, boolean duplicates, int recordSize,
100 			File baseFolder) throws OBException {
101 		this.duplicates = duplicates;
102 		this.name = name;
103 		this.fact = fact;
104 		this.storage = storage;
105 		this.recordSize = recordSize;
106 		this.baseFolder = new File(baseFolder, name);
107 		this.handles = new OBCacheLong<RAFileHolder>(new HandlerLoader(),
108 				OBSearchProperties.getLHandlesCacheSize());
109 		logger.debug("Handle cache: "  + OBSearchProperties.getLHandlesCacheSize());
110 	}
111 	
112 	public byte[] prepareBytes(byte[] in){
113 		return storage.prepareBytes(in);
114 	}
115 
116 	@Override
117 	public void close() throws OBStorageException {
118 		try {
119 			handles.clearAll();
120 		} catch (Exception e) {
121 			throw new OBStorageException(e);
122 		}
123 		storage.close();
124 	}
125 
126 	/**
127 	 * Create a new file based on the given id.
128 	 * 
129 	 * @param id
130 	 *            Id of the file
131 	 * @return return the file for the given id.
132 	 */
133 	private File generateBucketFile(int id) {
134 		StringBuilder fileName = new StringBuilder();
135 		for (char c : Integer.toHexString(id).toCharArray()) {
136 			fileName.append(File.separatorChar);
137 			fileName.append(c);
138 
139 		}
140 		fileName.append(".d");
141 		return new File(baseFolder, fileName.toString());
142 	}
143 
144 	/**
145 	 * Deletes the given bucket address.
146 	 * 
147 	 * @param f
148 	 */
149 	private void deleteBucket(File f) throws OBException {
150 		OBAsserts.chkAssert(f.delete(), "Could not delete file: "
151 				+ f.toString());
152 	}
153 
154 	@Override
155 	public OperationStatus delete(byte[] key) throws OBStorageException {
156 		try {
157 			if (duplicates) {
158 				ByteBuffer bucket = storage.getValue(key);
159 				if (bucket != null) {
160 					int id = bucket.getInt();
161 					handles.remove(id);
162 					deleteBucket(generateBucketFile(id));
163 				}
164 			}
165 			return storage.delete(key);
166 
167 		} catch (Exception e) {
168 			throw new OBStorageException(e);
169 		}
170 	}
171 
172 	@Override
173 	public void deleteAll() throws OBStorageException {
174 		try {
175 			CloseIterator<TupleBytes> it = storage.processAll();
176 			while (it.hasNext()) {
177 				TupleBytes b = it.next();
178 				delete(b.getKey());
179 			}
180 		} catch (Exception e) {
181 			throw new OBStorageException(e);
182 		}
183 	}
184 
185 	@Override
186 	public OBStoreFactory getFactory() {
187 		return fact;
188 	}
189 
190 	@Override
191 	public String getName() {
192 		return name;
193 	}
194 
195 	@Override
196 	public StaticBin1D getReadStats() {
197 		// TODO Auto-generated method stub
198 		return null;
199 	}
200 
201 	@Override
202 	public ByteBuffer getValue(byte[] key) throws IllegalArgumentException,
203 			OBStorageException {
204 		if (duplicates) {
205 			throw new IllegalArgumentException();
206 		}
207 		return storage.getValue(key);
208 	}
209 
210 	@Override
211 	public long nextId() throws OBStorageException {
212 		return storage.nextId();
213 	}
214 
215 	@Override
216 	public CloseIterator<TupleBytes> processRange(byte[] low, byte[] high)
217 			throws OBStorageException {
218 		return new ByteArrayIterator(low, high, false, false);
219 	}
220 
221 	@Override
222 	public CloseIterator<TupleBytes> processRangeReverse(byte[] low, byte[] high)
223 			throws OBStorageException {
224 		return new ByteArrayIterator(low, high, false, true);
225 	}
226 	
227 	@Override
228 	public CloseIterator<TupleBytes> processRangeNoDup(byte[] low, byte[] high)
229 			throws OBStorageException {
230 		assert false;
231 		return null;
232 	}
233 
234 	@Override
235 	public CloseIterator<TupleBytes> processRangeReverseNoDup(byte[] low,
236 			byte[] high) throws OBStorageException {
237 		assert false;
238 		return null;
239 	}
240 
241 	@Override
242 	public OperationStatus put(byte[] key, ByteBuffer value)
243 			throws OBStorageException {
244 		if (!duplicates) {
245 			return storage.put(key, value);
246 		}
247 		try {
248 			OBAsserts.chkAssertStorage(value.array().length == recordSize,
249 					"Invalid record size: data: " + value.array().length + " system: " + recordSize);
250 
251 			OperationStatus res = new OperationStatus();
252 			res.setStatus(Status.OK);
253 			// add the value at the end of the file.
254 			int id = -1;
255 			ByteBuffer data = storage.getValue(key);
256 			if (data == null) {
257 				long idl = storage.nextId();
258 				OBAsserts.chkAssert(idl <= Integer.MAX_VALUE,
259 						"Exceeded possible number of buckets, fatal error");
260 				id = (int) idl;
261 				ByteBuffer j = ByteConversion
262 						.createByteBuffer(ByteConstants.Int.getSize());
263 				j.putInt(id);
264 				storage.put(key, j);
265 			} else {
266 				id = data.getInt();
267 			}
268 			RandomAccessFile
269 			 bucket = handles.get(id).getFile();
270 			// record last position
271 			long lastPosition = bucket.length();
272 			// extend the file
273 			bucket.setLength(bucket.length() + recordSize);
274 			// write the data
275 			bucket.seek(lastPosition);
276 			bucket.write(value.array());
277 			return res;
278 		} catch (Exception e) {
279 			throw new OBStorageException(e);
280 		}
281 	}
282 
283 	@Override
284 	public void setReadStats(StaticBin1D stats) {
285 		// TODO Auto-generated method stub
286 
287 	}
288 
289 	@Override
290 	public long size() throws OBStorageException {
291 		// inneficient but simple.
292 		CloseIterator<TupleBytes> it = storage.processAll();
293 		long res = 0;
294 		try {
295 			while (it.hasNext()) {
296 				TupleBytes t = it.next();
297 				int id = t.getValue().getInt();
298 				RandomAccessFile currentBucket = handles.get(id).getFile();
299 				res += currentBucket.length() / recordSize;
300 			}
301 			it.closeCursor();
302 			return res;
303 		} catch (Exception e) {
304 			throw new OBStorageException(e);
305 		}
306 
307 	}
308 
309 	protected abstract class CursorIterator<T> implements CloseIterator<T> {
310 		private CloseIterator<TupleBytes> it;
311 		// TODO: If the cache closes the object, then
312 		// the iterator will become invalid. We have to
313 		// check for such case.
314 		private FileHolder currentBucket;
315 		private byte[] currentData = new byte[recordSize];
316 		private TupleBytes currentTuple;
317 		private long previousIndex = 0;
318 		
319 		//private ByteBuffer currentData = ByteBuffer.allocateDirect(recordSize);
320 		/**
321 		 * If this iterator goes backwards.
322 		 */
323 		//private boolean backwardsMode;
324 
325 		byte[] min;
326 		byte[] max;
327 
328 		protected CursorIterator(byte[] min, byte[] max, boolean full,
329 				boolean backwards) throws OBStorageException {
330 			if (full) {
331 				it = storage.processAll();
332 			} else if (backwards) {
333 				it = storage.processRangeReverse(min, max);		
334 				if(it.hasNext()){
335 					it.next();
336 				}
337 				
338 			} else {
339 				it = storage.processRange(min, max);
340 			}
341 			//this.backwardsMode = backwards;
342 			this.min = min;
343 			this.max = max;
344 		}
345 
346 		private boolean isCurrentFileFinished(){
347 			boolean res;
348 			try {
349 				long p = -1;
350 				long l = -1;
351 				// first time || after the first time
352 				if (currentBucket != null) {
353 					p = currentBucket.getFilePointer();
354 					l = currentBucket.length();
355 					
356 				}
357 				
358 					res = currentBucket == null || p == l;
359 				
360 				
361 				//logger.debug("p: " + p + " l: " + l + " finished: " + res + "it.hasnext " + it.hasNext() + " len: " );
362 			} catch (Exception e) {
363 				throw new NoSuchElementException(e.toString());
364 			}
365 			return res;
366 		}
367 
368 		private void loadNext() throws OBException, InstantiationException,
369 				IllegalAccessException, IOException {
370 			if (isCurrentFileFinished()) {
371 				if (it.hasNext()) {
372 					TupleBytes tuple = it.next();
373 					assert currentTuple == null
374 							|| !Arrays.equals(tuple.getKey(), currentTuple
375 									.getKey());
376 					currentTuple = tuple;
377 					int id = tuple.getValue().getInt();
378 					
379 					this.currentBucket = new FileHolder(handles.get(id), id);
380 					
381 						currentBucket.seek(0); // reset the file pointer.
382 						previousIndex = 0;
383 					
384 					assert !isCurrentFileFinished() : "Empty buckets are wrong";
385 					
386 				} else {
387 					it = null; // we are done.
388 					
389 					
390 				}
391 			}
392 			
393 			
394 			// we now just have to read the bytes
395 			// read the size of the bytes stored.
396 			previousIndex = currentBucket.getFilePointer();
397 			
398 			
399 			// currentData = new byte[recordSize];
400 			assert recordSize == currentData.length;
401 			currentBucket.readFully(currentData);
402 			
403 		}
404 
405 		@Override
406 		public boolean hasNext() {
407 			boolean res = it != null && (it.hasNext() || !isCurrentFileFinished());
408 			//logger.debug("last hasnext: " + res + " it.hasnext" + it.hasNext());
409 			return res;
410 		}
411 
412 		@Override
413 		public T next() {
414 			try {
415 				loadNext();
416 				return createTuple(currentTuple.getKey(), 
417 						ByteConversion.createByteBuffer(currentData));
418 			} catch (Exception e) {
419 				e.printStackTrace();
420 				throw new UnsupportedOperationException(e);
421 			}
422 
423 		}
424 
425 		@Override
426 		public void remove() {
427 			try {
428 
429 				if (!Arrays.equals(min, max)) {
430 					throw new NoSuchElementException(
431 							"Cannot remove if min != max. This is a bucket!");
432 				}
433 
434 				// remove the item we just returned.
435 				currentBucket.seek(previousIndex);
436 
437 				// take the last item of the record and put it
438 				// where we removed the data.
439 				// if the last item is not the record we are going to remove
440 				// now.
441 				long lastItem = currentBucket.length() - recordSize;
442 				if (lastItem > previousIndex) {
443 					byte[] lastItemData = new byte[recordSize];
444 					currentBucket.seek(lastItem);
445 					currentBucket.readFully(lastItemData);
446 					// copy the data to the deleted location
447 					currentBucket.seek(previousIndex);
448 					currentBucket.write(lastItemData);
449 				}
450 				// decrease the size of the file:
451 				//logger.debug("Length before remove: " + currentBucket.length());
452 				currentBucket.setLength(currentBucket.length() - recordSize);
453 				//logger.debug("Length after remove: " + currentBucket.length());
454 				// restore the pointer
455 				currentBucket.seek(previousIndex);
456 				if (currentBucket.length() == 0) {// erase the bucket.
457 					delete(currentTuple.getKey());
458 				}
459 			} catch (Exception e) {
460 				throw new NoSuchElementException(e.toString());
461 			}
462 		}
463 
464 		@Override
465 		public void closeCursor() throws OBException {
466 			it.closeCursor();
467 			
468 		}
469 
470 		/**
471 		 * Creates a tuple from the given key and value.
472 		 * 
473 		 * @param key
474 		 *            raw key.
475 		 * @param value
476 		 *            raw value.
477 		 * @return A new tuple of type T created from the raw data key and
478 		 *         value.
479 		 */
480 		protected abstract T createTuple(byte[] key, ByteBuffer value);
481 
482 	}
483 
484 	/**
485 	 * Iterator used to process range results.
486 	 */
487 
488 	protected final class ByteArrayIterator extends CursorIterator<TupleBytes> {
489 
490 		protected ByteArrayIterator() throws OBStorageException {
491 			super(null, null, true, false);
492 		}
493 
494 		protected ByteArrayIterator(byte[] min, byte[] max)
495 				throws OBStorageException {
496 			super(min, max, false, false);
497 		}
498 
499 		protected ByteArrayIterator(byte[] min, byte[] max, boolean full,
500 				boolean backwardsMode) throws OBStorageException {
501 			super(min, max, full, backwardsMode);
502 		}
503 
504 		@Override
505 		protected TupleBytes createTuple(byte[] key, ByteBuffer value) {
506 			return new TupleBytes(key, value);
507 		}
508 	}
509 
510 	protected final class HandlerLoader implements OBCacheHandlerLong<RAFileHolder> {
511 
512 		@Override
513 		public long getDBSize() throws OBStorageException {
514 			return storage.size();
515 		}
516 
517 		@Override
518 		public RAFileHolder loadObject(long key) throws OutOfRangeException,
519 				OBException, InstantiationException, IllegalAccessException,
520 				OBStorageException {
521 			File f = generateBucketFile((int) key);
522 			try {
523 				if (!f.getParentFile().exists()) {
524 					OBAsserts.chkAssert(f.getParentFile().mkdirs(),
525 							"Could not create all dirs");
526 				}
527 				assert key <= Integer.MAX_VALUE;
528 				//return new RandomAccessFileHolder(new RandomAccessFile(new IOController(1024,new RandomAccessFileContent(f, "rw"))));
529 				return new RAFileHolder (new RandomAccessFile(f, "rw"));
530 			} catch (Exception e) {
531 				throw new OBStorageException(e);
532 			}
533 		}
534 
535 		@Override
536 		public void store(long key, RAFileHolder object) throws OBException {
537 			
538 				object.close();
539 			
540 
541 		}
542 
543 	}
544 	
545 	/**
546 	 * Monitors if the random access file has been closed.
547 	 * @author amuller
548 	 *
549 	 */
550 	protected final class RAFileHolder {
551 		private boolean closed = false;
552 		private RandomAccessFile f;
553 		private FileChannel fc;
554 		private MappedByteBuffer map;
555 		public RAFileHolder(RandomAccessFile f) throws OBStorageException{
556 			this.f = f;
557 			fc = f.getChannel();
558 			try{
559 			reloadMap();
560 			}catch(IOException e){
561 				throw new OBStorageException(e);
562 			}
563 		}
564 		
565 		public void reloadMap() throws IOException{
566 			map = fc.map(MapMode.READ_WRITE, 0, f.length());
567 		}
568 		
569 		public void close() throws OBException{
570 			closed =  true;
571 			try{
572 				f.close();
573 				fc.close();
574 			}catch(IOException e){
575 				throw new OBException(e);
576 			}
577 		}
578 
579 		public boolean isClosed() {
580 			return closed;
581 		}
582 
583 		public RandomAccessFile getFile() {
584 			return f;
585 		}
586 
587 		public RandomAccessFile getF() {
588 			return f;
589 		}
590 
591 		public FileChannel getFc() {
592 			return fc;
593 		}
594 
595 		public MappedByteBuffer getMap() {
596 			return map;
597 		}
598 		
599 		
600 
601 	}
602 
603 	/**
604 	 * This class holds references to RandomAccessFile. It makes sure that we
605 	 * 
606 	 * @author amuller
607 	 * 
608 	 */
609 	protected final class FileHolder {
610 
611 		private RandomAccessFile file;
612 		private MappedByteBuffer map;
613 		private RAFileHolder holder;
614 		private long offset;
615 		private int bucketId;
616 		
617 
618 		public FileHolder(RAFileHolder h, int bucketId) {
619 			this.bucketId = bucketId;
620 			update(h);
621 			offset = 0;
622 		}
623 				
624 		private void update(RAFileHolder h){
625 			this.file = h.getFile();
626 			this.holder = h;	
627 			this.map = h.getMap();
628 		}
629 		
630 		
631 		public ByteBuffer getMap(){
632 			return map;
633 		}
634 		
635 		/**
636 		 * Reload the file if it is gone.
637 		 * @throws IllegalAccessException 
638 		 * @throws InstantiationException 
639 		 * @throws OBException 
640 		 * @throws IOException 
641 		 * @throws OutOfRangeException 
642 		 */
643 		private void verifyFile() throws OBException, IOException{
644 			try{
645 			if(holder.isClosed()){
646 				update(handles.get(bucketId));
647 			}
648 			}catch(Exception e){
649 				throw new OBException(e);
650 			}
651 			assert offset <= holder.getFile().length() : " offset "  + offset + " length " + holder.getFile().length();
652 		}
653 		
654 		/**
655 		 * Restore the offset position in the file.
656 		 * @throws IOException
657 		 */
658 		private void updatePrev() throws IOException{
659 			/**/
660 			if(offset != map.position()){
661 				map.position((int)offset);
662 			}
663 		}
664 		
665 		private void updatePrevWrite() throws IOException{
666 			if(offset != file.getFilePointer()){
667 				file.seek(offset);
668 			}
669 		}
670 		private void updatePosWrite() throws IOException{
671 			offset = file.getFilePointer();
672 			holder.reloadMap();
673 			map = holder.getMap();
674 		}
675 		/**
676 		 * Restore the offset position in the file.
677 		 * @throws IOException
678 		 */
679 		private void updatePos() throws IOException{
680 			//offset = file.getFilePointer();
681 			offset = map.position();
682 		}
683 		public long length() throws IOException, OBException {
684 			verifyFile();
685 			//return file.length();
686 			return map.capacity();
687 		}
688 
689 		
690 
691 		public final void readFully(byte[] b) throws IOException, OBException {
692 			verifyFile();
693 			updatePrev();
694 			//file.readFully(b);
695 			map.get(b);
696 			updatePos();
697 		}
698 
699 		public void seek(long pos) throws IOException, OBException {
700 			verifyFile();						
701 			map.position((int)pos);
702 			updatePos();
703 		}
704 
705 		public void write(byte[] b) throws IOException, OBException {
706 			verifyFile();		
707 			updatePrevWrite();		
708 			holder.getFile().write(b);
709 			updatePosWrite();
710 		}
711 
712 		
713 
714 		public long getFilePointer() throws IOException, OBException {
715 			verifyFile();
716 			return offset;
717 		}
718 
719 		public void setLength(long newLength) throws IOException, OBException {
720 			verifyFile();
721 			holder.getFile().setLength(newLength);			
722 			updatePosWrite();
723 		}
724 		
725 		
726 
727 	}
728 
729 }