1 package net.obsearch.storage.l;
2
3 import hep.aida.bin.StaticBin1D;
4
5 import java.io.EOFException;
6 import java.io.File;
7 import java.io.FileNotFoundException;
8 import java.io.IOException;
9 import java.io.RandomAccessFile;
10 import java.nio.ByteBuffer;
11 import java.nio.MappedByteBuffer;
12 import java.nio.channels.FileChannel;
13 import java.nio.channels.FileChannel.MapMode;
14 import java.util.Arrays;
15 import java.util.NoSuchElementException;
16
17 import org.apache.log4j.Logger;
18
19
20
21
22 import net.obsearch.OperationStatus;
23 import net.obsearch.Status;
24 import net.obsearch.asserts.OBAsserts;
25 import net.obsearch.cache.OBCacheHandler;
26 import net.obsearch.cache.OBCacheHandlerByteArray;
27 import net.obsearch.cache.OBCacheHandlerLong;
28 import net.obsearch.cache.OBCacheLong;
29 import net.obsearch.constants.ByteConstants;
30 import net.obsearch.constants.OBSearchProperties;
31 import net.obsearch.exception.OBException;
32 import net.obsearch.exception.OBStorageException;
33 import net.obsearch.exception.OutOfRangeException;
34 import net.obsearch.storage.CloseIterator;
35 import net.obsearch.storage.OBStore;
36 import net.obsearch.storage.OBStoreFactory;
37 import net.obsearch.storage.Tuple;
38 import net.obsearch.storage.TupleBytes;
39 import net.obsearch.utils.bytes.ByteConversion;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public abstract class AbstractOBLStore<T extends Tuple> implements OBStore<T> {
73
74 private static final transient Logger logger = Logger
75 .getLogger(AbstractOBLStore.class);
76
77
78
79 private OBCacheLong<RAFileHolder> handles;
80
81 private OBStore<TupleBytes> storage;
82
83 private boolean duplicates;
84
85 private String name;
86
87 protected OBStoreFactory fact;
88
89 private File baseFolder;
90
91
92
93
94
95
96 private int recordSize;
97
98 protected AbstractOBLStore(String name, OBStore<TupleBytes> storage,
99 OBStoreFactory fact, boolean duplicates, int recordSize,
100 File baseFolder) throws OBException {
101 this.duplicates = duplicates;
102 this.name = name;
103 this.fact = fact;
104 this.storage = storage;
105 this.recordSize = recordSize;
106 this.baseFolder = new File(baseFolder, name);
107 this.handles = new OBCacheLong<RAFileHolder>(new HandlerLoader(),
108 OBSearchProperties.getLHandlesCacheSize());
109 logger.debug("Handle cache: " + OBSearchProperties.getLHandlesCacheSize());
110 }
111
112 public byte[] prepareBytes(byte[] in){
113 return storage.prepareBytes(in);
114 }
115
116 @Override
117 public void close() throws OBStorageException {
118 try {
119 handles.clearAll();
120 } catch (Exception e) {
121 throw new OBStorageException(e);
122 }
123 storage.close();
124 }
125
126
127
128
129
130
131
132
133 private File generateBucketFile(int id) {
134 StringBuilder fileName = new StringBuilder();
135 for (char c : Integer.toHexString(id).toCharArray()) {
136 fileName.append(File.separatorChar);
137 fileName.append(c);
138
139 }
140 fileName.append(".d");
141 return new File(baseFolder, fileName.toString());
142 }
143
144
145
146
147
148
149 private void deleteBucket(File f) throws OBException {
150 OBAsserts.chkAssert(f.delete(), "Could not delete file: "
151 + f.toString());
152 }
153
154 @Override
155 public OperationStatus delete(byte[] key) throws OBStorageException {
156 try {
157 if (duplicates) {
158 ByteBuffer bucket = storage.getValue(key);
159 if (bucket != null) {
160 int id = bucket.getInt();
161 handles.remove(id);
162 deleteBucket(generateBucketFile(id));
163 }
164 }
165 return storage.delete(key);
166
167 } catch (Exception e) {
168 throw new OBStorageException(e);
169 }
170 }
171
172 @Override
173 public void deleteAll() throws OBStorageException {
174 try {
175 CloseIterator<TupleBytes> it = storage.processAll();
176 while (it.hasNext()) {
177 TupleBytes b = it.next();
178 delete(b.getKey());
179 }
180 } catch (Exception e) {
181 throw new OBStorageException(e);
182 }
183 }
184
185 @Override
186 public OBStoreFactory getFactory() {
187 return fact;
188 }
189
190 @Override
191 public String getName() {
192 return name;
193 }
194
195 @Override
196 public StaticBin1D getReadStats() {
197
198 return null;
199 }
200
201 @Override
202 public ByteBuffer getValue(byte[] key) throws IllegalArgumentException,
203 OBStorageException {
204 if (duplicates) {
205 throw new IllegalArgumentException();
206 }
207 return storage.getValue(key);
208 }
209
210 @Override
211 public long nextId() throws OBStorageException {
212 return storage.nextId();
213 }
214
215 @Override
216 public CloseIterator<TupleBytes> processRange(byte[] low, byte[] high)
217 throws OBStorageException {
218 return new ByteArrayIterator(low, high, false, false);
219 }
220
221 @Override
222 public CloseIterator<TupleBytes> processRangeReverse(byte[] low, byte[] high)
223 throws OBStorageException {
224 return new ByteArrayIterator(low, high, false, true);
225 }
226
227 @Override
228 public CloseIterator<TupleBytes> processRangeNoDup(byte[] low, byte[] high)
229 throws OBStorageException {
230 assert false;
231 return null;
232 }
233
234 @Override
235 public CloseIterator<TupleBytes> processRangeReverseNoDup(byte[] low,
236 byte[] high) throws OBStorageException {
237 assert false;
238 return null;
239 }
240
241 @Override
242 public OperationStatus put(byte[] key, ByteBuffer value)
243 throws OBStorageException {
244 if (!duplicates) {
245 return storage.put(key, value);
246 }
247 try {
248 OBAsserts.chkAssertStorage(value.array().length == recordSize,
249 "Invalid record size: data: " + value.array().length + " system: " + recordSize);
250
251 OperationStatus res = new OperationStatus();
252 res.setStatus(Status.OK);
253
254 int id = -1;
255 ByteBuffer data = storage.getValue(key);
256 if (data == null) {
257 long idl = storage.nextId();
258 OBAsserts.chkAssert(idl <= Integer.MAX_VALUE,
259 "Exceeded possible number of buckets, fatal error");
260 id = (int) idl;
261 ByteBuffer j = ByteConversion
262 .createByteBuffer(ByteConstants.Int.getSize());
263 j.putInt(id);
264 storage.put(key, j);
265 } else {
266 id = data.getInt();
267 }
268 RandomAccessFile
269 bucket = handles.get(id).getFile();
270
271 long lastPosition = bucket.length();
272
273 bucket.setLength(bucket.length() + recordSize);
274
275 bucket.seek(lastPosition);
276 bucket.write(value.array());
277 return res;
278 } catch (Exception e) {
279 throw new OBStorageException(e);
280 }
281 }
282
283 @Override
284 public void setReadStats(StaticBin1D stats) {
285
286
287 }
288
289 @Override
290 public long size() throws OBStorageException {
291
292 CloseIterator<TupleBytes> it = storage.processAll();
293 long res = 0;
294 try {
295 while (it.hasNext()) {
296 TupleBytes t = it.next();
297 int id = t.getValue().getInt();
298 RandomAccessFile currentBucket = handles.get(id).getFile();
299 res += currentBucket.length() / recordSize;
300 }
301 it.closeCursor();
302 return res;
303 } catch (Exception e) {
304 throw new OBStorageException(e);
305 }
306
307 }
308
309 protected abstract class CursorIterator<T> implements CloseIterator<T> {
310 private CloseIterator<TupleBytes> it;
311
312
313
314 private FileHolder currentBucket;
315 private byte[] currentData = new byte[recordSize];
316 private TupleBytes currentTuple;
317 private long previousIndex = 0;
318
319
320
321
322
323
324
325 byte[] min;
326 byte[] max;
327
328 protected CursorIterator(byte[] min, byte[] max, boolean full,
329 boolean backwards) throws OBStorageException {
330 if (full) {
331 it = storage.processAll();
332 } else if (backwards) {
333 it = storage.processRangeReverse(min, max);
334 if(it.hasNext()){
335 it.next();
336 }
337
338 } else {
339 it = storage.processRange(min, max);
340 }
341
342 this.min = min;
343 this.max = max;
344 }
345
346 private boolean isCurrentFileFinished(){
347 boolean res;
348 try {
349 long p = -1;
350 long l = -1;
351
352 if (currentBucket != null) {
353 p = currentBucket.getFilePointer();
354 l = currentBucket.length();
355
356 }
357
358 res = currentBucket == null || p == l;
359
360
361
362 } catch (Exception e) {
363 throw new NoSuchElementException(e.toString());
364 }
365 return res;
366 }
367
368 private void loadNext() throws OBException, InstantiationException,
369 IllegalAccessException, IOException {
370 if (isCurrentFileFinished()) {
371 if (it.hasNext()) {
372 TupleBytes tuple = it.next();
373 assert currentTuple == null
374 || !Arrays.equals(tuple.getKey(), currentTuple
375 .getKey());
376 currentTuple = tuple;
377 int id = tuple.getValue().getInt();
378
379 this.currentBucket = new FileHolder(handles.get(id), id);
380
381 currentBucket.seek(0);
382 previousIndex = 0;
383
384 assert !isCurrentFileFinished() : "Empty buckets are wrong";
385
386 } else {
387 it = null;
388
389
390 }
391 }
392
393
394
395
396 previousIndex = currentBucket.getFilePointer();
397
398
399
400 assert recordSize == currentData.length;
401 currentBucket.readFully(currentData);
402
403 }
404
405 @Override
406 public boolean hasNext() {
407 boolean res = it != null && (it.hasNext() || !isCurrentFileFinished());
408
409 return res;
410 }
411
412 @Override
413 public T next() {
414 try {
415 loadNext();
416 return createTuple(currentTuple.getKey(),
417 ByteConversion.createByteBuffer(currentData));
418 } catch (Exception e) {
419 e.printStackTrace();
420 throw new UnsupportedOperationException(e);
421 }
422
423 }
424
425 @Override
426 public void remove() {
427 try {
428
429 if (!Arrays.equals(min, max)) {
430 throw new NoSuchElementException(
431 "Cannot remove if min != max. This is a bucket!");
432 }
433
434
435 currentBucket.seek(previousIndex);
436
437
438
439
440
441 long lastItem = currentBucket.length() - recordSize;
442 if (lastItem > previousIndex) {
443 byte[] lastItemData = new byte[recordSize];
444 currentBucket.seek(lastItem);
445 currentBucket.readFully(lastItemData);
446
447 currentBucket.seek(previousIndex);
448 currentBucket.write(lastItemData);
449 }
450
451
452 currentBucket.setLength(currentBucket.length() - recordSize);
453
454
455 currentBucket.seek(previousIndex);
456 if (currentBucket.length() == 0) {
457 delete(currentTuple.getKey());
458 }
459 } catch (Exception e) {
460 throw new NoSuchElementException(e.toString());
461 }
462 }
463
464 @Override
465 public void closeCursor() throws OBException {
466 it.closeCursor();
467
468 }
469
470
471
472
473
474
475
476
477
478
479
480 protected abstract T createTuple(byte[] key, ByteBuffer value);
481
482 }
483
484
485
486
487
488 protected final class ByteArrayIterator extends CursorIterator<TupleBytes> {
489
490 protected ByteArrayIterator() throws OBStorageException {
491 super(null, null, true, false);
492 }
493
494 protected ByteArrayIterator(byte[] min, byte[] max)
495 throws OBStorageException {
496 super(min, max, false, false);
497 }
498
499 protected ByteArrayIterator(byte[] min, byte[] max, boolean full,
500 boolean backwardsMode) throws OBStorageException {
501 super(min, max, full, backwardsMode);
502 }
503
504 @Override
505 protected TupleBytes createTuple(byte[] key, ByteBuffer value) {
506 return new TupleBytes(key, value);
507 }
508 }
509
510 protected final class HandlerLoader implements OBCacheHandlerLong<RAFileHolder> {
511
512 @Override
513 public long getDBSize() throws OBStorageException {
514 return storage.size();
515 }
516
517 @Override
518 public RAFileHolder loadObject(long key) throws OutOfRangeException,
519 OBException, InstantiationException, IllegalAccessException,
520 OBStorageException {
521 File f = generateBucketFile((int) key);
522 try {
523 if (!f.getParentFile().exists()) {
524 OBAsserts.chkAssert(f.getParentFile().mkdirs(),
525 "Could not create all dirs");
526 }
527 assert key <= Integer.MAX_VALUE;
528
529 return new RAFileHolder (new RandomAccessFile(f, "rw"));
530 } catch (Exception e) {
531 throw new OBStorageException(e);
532 }
533 }
534
535 @Override
536 public void store(long key, RAFileHolder object) throws OBException {
537
538 object.close();
539
540
541 }
542
543 }
544
545
546
547
548
549
550 protected final class RAFileHolder {
551 private boolean closed = false;
552 private RandomAccessFile f;
553 private FileChannel fc;
554 private MappedByteBuffer map;
555 public RAFileHolder(RandomAccessFile f) throws OBStorageException{
556 this.f = f;
557 fc = f.getChannel();
558 try{
559 reloadMap();
560 }catch(IOException e){
561 throw new OBStorageException(e);
562 }
563 }
564
565 public void reloadMap() throws IOException{
566 map = fc.map(MapMode.READ_WRITE, 0, f.length());
567 }
568
569 public void close() throws OBException{
570 closed = true;
571 try{
572 f.close();
573 fc.close();
574 }catch(IOException e){
575 throw new OBException(e);
576 }
577 }
578
579 public boolean isClosed() {
580 return closed;
581 }
582
583 public RandomAccessFile getFile() {
584 return f;
585 }
586
587 public RandomAccessFile getF() {
588 return f;
589 }
590
591 public FileChannel getFc() {
592 return fc;
593 }
594
595 public MappedByteBuffer getMap() {
596 return map;
597 }
598
599
600
601 }
602
603
604
605
606
607
608
609 protected final class FileHolder {
610
611 private RandomAccessFile file;
612 private MappedByteBuffer map;
613 private RAFileHolder holder;
614 private long offset;
615 private int bucketId;
616
617
618 public FileHolder(RAFileHolder h, int bucketId) {
619 this.bucketId = bucketId;
620 update(h);
621 offset = 0;
622 }
623
624 private void update(RAFileHolder h){
625 this.file = h.getFile();
626 this.holder = h;
627 this.map = h.getMap();
628 }
629
630
631 public ByteBuffer getMap(){
632 return map;
633 }
634
635
636
637
638
639
640
641
642
643 private void verifyFile() throws OBException, IOException{
644 try{
645 if(holder.isClosed()){
646 update(handles.get(bucketId));
647 }
648 }catch(Exception e){
649 throw new OBException(e);
650 }
651 assert offset <= holder.getFile().length() : " offset " + offset + " length " + holder.getFile().length();
652 }
653
654
655
656
657
658 private void updatePrev() throws IOException{
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679 private void updatePos() throws IOException{
680
681 offset = map.position();
682 }
683 public long length() throws IOException, OBException {
684 verifyFile();
685
686 return map.capacity();
687 }
688
689
690
691 public final void readFully(byte[] b) throws IOException, OBException {
692 verifyFile();
693 updatePrev();
694
695 map.get(b);
696 updatePos();
697 }
698
699 public void seek(long pos) throws IOException, OBException {
700 verifyFile();
701 map.position((int)pos);
702 updatePos();
703 }
704
705 public void write(byte[] b) throws IOException, OBException {
706 verifyFile();
707 updatePrevWrite();
708 holder.getFile().write(b);
709 updatePosWrite();
710 }
711
712
713
714 public long getFilePointer() throws IOException, OBException {
715 verifyFile();
716 return offset;
717 }
718
719 public void setLength(long newLength) throws IOException, OBException {
720 verifyFile();
721 holder.getFile().setLength(newLength);
722 updatePosWrite();
723 }
724
725
726
727 }
728
729 }