1 package org.ajmm.obsearch.index;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.net.URI;
6 import java.util.ArrayList;
7 import java.util.Arrays;
8 import java.util.Collections;
9 import java.util.Enumeration;
10 import java.util.HashSet;
11 import java.util.Iterator;
12 import java.util.LinkedList;
13 import java.util.List;
14 import java.util.Random;
15 import java.util.concurrent.ArrayBlockingQueue;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.Semaphore;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.concurrent.atomic.AtomicIntegerArray;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.concurrent.atomic.AtomicLongArray;
24
25 import net.jxta.discovery.DiscoveryEvent;
26 import net.jxta.discovery.DiscoveryListener;
27 import net.jxta.discovery.DiscoveryService;
28 import net.jxta.document.Advertisement;
29 import net.jxta.document.AdvertisementFactory;
30 import net.jxta.document.MimeMediaType;
31 import net.jxta.endpoint.ByteArrayMessageElement;
32 import net.jxta.endpoint.EndpointAddress;
33 import net.jxta.endpoint.Message;
34 import net.jxta.endpoint.Message.ElementIterator;
35 import net.jxta.exception.PeerGroupException;
36 import net.jxta.id.ID;
37 import net.jxta.id.IDFactory;
38 import net.jxta.peergroup.PeerGroup;
39 import net.jxta.pipe.PipeID;
40 import net.jxta.pipe.PipeMsgEvent;
41 import net.jxta.pipe.PipeMsgListener;
42 import net.jxta.pipe.PipeService;
43 import net.jxta.platform.NetworkConfigurator;
44 import net.jxta.platform.NetworkManager;
45 import net.jxta.protocol.DiscoveryResponseMsg;
46 import net.jxta.protocol.PeerAdvertisement;
47 import net.jxta.protocol.PipeAdvertisement;
48 import net.jxta.util.JxtaBiDiPipe;
49 import net.jxta.util.JxtaServerPipe;
50
51 import org.ajmm.obsearch.AsynchronousIndex;
52 import org.ajmm.obsearch.Index;
53 import org.ajmm.obsearch.OB;
54 import org.ajmm.obsearch.Result;
55 import org.ajmm.obsearch.SynchronizableIndex;
56 import org.ajmm.obsearch.TimeStampResult;
57 import org.ajmm.obsearch.exception.AlreadyFrozenException;
58 import org.ajmm.obsearch.exception.BoxNotAvailableException;
59 import org.ajmm.obsearch.exception.IllegalIdException;
60 import org.ajmm.obsearch.exception.NotFrozenException;
61 import org.ajmm.obsearch.exception.OBException;
62 import org.ajmm.obsearch.exception.OutOfRangeException;
63 import org.ajmm.obsearch.exception.UndefinedPivotsException;
64 import org.ajmm.obsearch.index.AbstractSynchronizableIndex.TimeStampIterator;
65 import org.ajmm.obsearch.index.utils.Directory;
66 import org.apache.log4j.Logger;
67
68 import com.sleepycat.bind.tuple.TupleInput;
69 import com.sleepycat.bind.tuple.TupleOutput;
70 import com.sleepycat.je.DatabaseException;
71
72 /*
73 OBSearch: a distributed similarity search engine
74 This project is to similarity search what 'bit-torrent' is to downloads.
75 Copyright (C) 2007 Arnoldo Jose Muller Molina
76
77 This program is free software: you can redistribute it and/or modify
78 it under the terms of the GNU General Public License as published by
79 the Free Software Foundation, either version 3 of the License, or
80 (at your option) any later version.
81
82 This program is distributed in the hope that it will be useful,
83 but WITHOUT ANY WARRANTY; without even the implied warranty of
84 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
85 GNU General Public License for more details.
86
87 You should have received a copy of the GNU General Public License
88 along with this program. If not, see <http://www.gnu.org/licenses/>.
89 */
90 /**
91 * AbstractP2PIndex holds common functionality of indexes that span several
92 * computers. The current implementation uses the JXTA library.
93 * @param <O>
94 * The type of object to be stored in the Index.
95 * @author Arnoldo Jose Muller Molina
96 * @since 0.7
97 */
98
99 public abstract class AbstractP2PIndex < O extends OB > implements Index < O >,
100 DiscoveryListener, AsynchronousIndex {
101
/**
 * Lists all the message types that peers exchange over the network.
 * NOTE(review): the declaration order may be significant if ordinals are
 * ever put on the wire -- confirm before reordering constants.
 */
public static enum MessageType {
    /** Time message. */
    TIME,
    /** Box information (for sync and box selection purposes). */
    BOX,
    /** Synchronize box (request for synchronization). */
    SYNCBOX,
    /** Synchronize continue (asks for more data). */
    SYNCC,
    /**
     * Synchronize retry (asks for the last packet again in the event of a
     * timeout).
     */
    SYNCR,
    /** Synchronize end (ends the synchronization for one box). */
    SYNCE,
    /** Local index data. */
    INDEX,
    /** Insert object (sent after a SYNCBOX). */
    INSOB,
    /** Data sync query. */
    DSYNQ,
    /** Data sync reply. */
    DSYNR,
    /** Search query. */
    SQ,
    /** Search result. */
    SR

};
135
/**
 * Lists all the message element types available. Elements are the
 * components found inside {@link MessageType} packets.
 */
public static enum MessageElementType {
    /** Search header. */
    SH,
    /** Search object. */
    SO,
    /** Search sub-result. */
    SSR

};
149
150 /**
151 * Class logger.
152 */
153 private final transient Logger logger;
154
155 /**
156 * Messages bigger than this one have problems.
157 */
158 private static final int messageSize = 40 * 1024;
159
160 /**
161 * The string that holds the original index xml.
162 */
163 protected String indexXml;
164
165 /**
 * Minimum number of peers to be connected to at any time. For control
 * purposes, the minimum number of peers actually needed to allow matching
 * might be bigger than this.
169 */
170 public static final int minNumberOfPeers = 2;
171
172 // JXTA variables
173 /**
174 * JXTA network manager.
175 */
176 private transient NetworkManager manager;
177
178 /**
179 * JXTA discovery service.
180 */
181 private transient DiscoveryService discovery;
182
183 /**
184 * JXTA network client name.
185 */
186 private String clientName;
187
188 /**
189 * JXTA peer group.
190 */
191 private PeerGroup netPeerGroup;
192
193 /**
194 * OBSearch's pipe name.
195 */
196 private static final String pipeName = "OBSearchPipe";
197
198 /**
199 * Maximum number of peers to use.
200 */
201 private static final int maxNumberOfPeers = 100;
202
203 /**
 * Interval between heartbeats (in milliseconds). Heartbeats check for
 * missing resources and make sure we stay well connected all the time.
206 */
207 private static final int heartBeatInterval = 10000;
208
209 /**
210 * General timeout used for most p2p operations.
211 */
212 private static final int globalTimeout = 30 * 1000;
213
214 /**
215 * Maximum time difference between the peers. peers that have bigger time
216 * differences will be dropped.
217 */
218 private static final int maxTimeDifference = 3600000;
219
220 /**
221 * Maximum number of objects to query at the same time.
222 * (60)
223 */
224 protected static final int maximumItemsToProcess = 15;
225
226 /**
227 * Maximum time to wait for a query to be answered.
228 */
229 protected static final int queryTimeout = 30000;
230
231 /**
232 * Internally gives ids that can be used to wait for the
233 * processing of a query.
234 */
235 protected BlockingQueue < Integer > takeATab;
236
237 /**
238 * Maximum number of objects to match at the same time. This should be close
239 * to the amount of CPUs available. Currently not being used.
240 */
241 protected int maximumServerThreads;
242
243 /**
244 * Object in charge of accepting new connections.
245 */
246 private JxtaServerPipe serverPipe;
247
248 /**
 * Contains one pipe handler per client that has tried to connect to us or
 * that we have tried to connect to. The key is a peer id, not a pipe id!
251 */
252 private ConcurrentMap < String, PipeHandler > clients;
253
254 /**
255 * Time when the index was created.
256 */
257 protected long indexTime;
258
259 /**
260 * Location of this DB. Basically a place were JXTA info is stored.
261 */
262 private File dbPath;
263
264 /**
265 * Object used as a monitor. (hack :))
266 */
267 private final String timer = "time";
268
269 /**
270 * Pipe advertisement. to be re-published every once in a while
271 */
272 private PipeAdvertisement pipeAdv;
273
274 /**
275 * Peer advertisement. to be re-published every once in a while
276 */
277 private PeerAdvertisement peerAdv;
278
279 /**
280 * Lifetime and expiration for advertisements.
281 */
282 long lifetime = 60 * 2 * 1000L;
283
284 /**
285 * Lifetime and expiration for advertisements.
286 */
287 long expiration = 60 * 2 * 1000L;
288
289 /**
290 * The boxes that we are currently serving.
291 */
292 private int[] ourBoxes;
293
294 /**
295 * Total number of boxes that will be supported.
296 */
297 private int boxesCount;
298
299 /**
300 * Each entry has a List. An entry in the array equals to a box #. Every
301 * List holds handlers that hold the box in which they are indexed each
302 * handler is responsible of registering and unregistering
303 */
304 private List < List < PipeHandler > > handlersPerBox;
305
306 /**
307 * keeps track of the latest handler that was used for each box. at search
308 * time this values are used to distribute the queries "evenly"
309 */
310 private AtomicIntegerArray handlerSearchIndexes;
311
312 /**
313 * Set to true when some data is inserted or deleted or when a new peer
314 * comes in set to false when we publish all the peers our information or
315 * when a new peer comes in.
316 */
317 private AtomicBoolean boxesUpdated;
318
319 /**
320 * If this peer is in client or server mode. If it is in client mode, Peers
321 * actively search for other peers. Otherwise the peer just waits for
322 * connections. The JXTA library also is configured differently for servers
323 * and for clients.
324 */
325 private boolean isClient = true;
326
327 /**
328 * A unique identifier of requests.
329 */
330 private AtomicLong requestId;
331
332 /**
333 * Maximum number of threads to be used in the search.
334 */
335 private Semaphore searchThreads;
336
337 /**
338 * It is set to true when data is inserted or deleted into this index. once
339 * the available boxes are published, this variable returns to false.
340 */
341 private AtomicBoolean modifiedData;
342
343 /**
 * Tells if we are syncing or not. Used to sync only one box at a time.
345 */
346 private AtomicBoolean syncing;
347
348 /**
 * The peer that we are currently syncing with.
350 */
351 private PipeHandler syncingPipe;
352
353 /**
354 * The last request time for a box.
355 */
356 private AtomicLong syncingBoxLastRequestTime;
357
358 /**
359 * Initialize the abstract class
360 * @param index
361 * the index that will be distributed
362 * @param dbPath
363 * the path where we will store information related to this index
364 * @param clientName
365 * The name of this peer.
366 * @param boxesToServe
367 * The number of boxes that will be served by this index
368 * @param maximumServerThreads
369 * Max number of threads to support. (currently it has no effect)
370 * @throws IOException
371 * If a serialization exception occurs.
372 * @throws PeerGroupException
373 * If a JXTA exception occurs.
374 * @throws NotFrozenException
375 * If index is not frozen.
376 * @throws DatabaseException
377 * If something goes wrong with the DB
378 * @throws OBException
379 * User generated exception
380 */
381 protected AbstractP2PIndex(final SynchronizableIndex < O > index,
382 final File dbPath, final String clientName, final int boxesToServe,
383 final int maximumServerThreads) throws IOException,
384 PeerGroupException, NotFrozenException, DatabaseException,
385 OBException {
386 if (!index.isFrozen()) {
387 throw new NotFrozenException();
388 }
389 if (!dbPath.exists()) {
390 throw new IOException(dbPath + " does not exit");
391 }
392 this.maximumServerThreads = maximumServerThreads;
393 clients = new ConcurrentHashMap < String, PipeHandler >();
394 boxesCount = index.totalBoxes();
395 this.dbPath = dbPath;
396 this.clientName = clientName;
397 this.boxesUpdated = new AtomicBoolean(false);
398 logger = Logger.getLogger(clientName);
399
400 handlersPerBox = Collections
401 .synchronizedList(new ArrayList < List < PipeHandler >>(index
402 .totalBoxes()));
403 initHandlersPerBox(handlersPerBox);
404
405 // initialize the boxes this index is supporting if the given index
406 // has the corresponding data.
407 if (index.databaseSize() != 0) { // if the database has some
408 // data, we serve the data
409 // of the db
410 int i = 0;
411 List < Integer > boxes = new ArrayList < Integer >(index
412 .totalBoxes());
413 while (i < index.totalBoxes()) {
414 if (index.latestModification(i) != -1) {
415 boxes.add(i);
416 }
417 i++;
418 }
419 this.ourBoxes = new int[boxes.size()];
420 i = 0;
421 while (i < boxes.size()) {
422 ourBoxes[i] = boxes.get(i);
423 i++;
424 }
425 } else {
426 decideServicedBoxes(boxesToServe, index);
427 }
428
429 takeATab = new ArrayBlockingQueue < Integer >(maximumItemsToProcess);
430 int i = 0;
431 while (i < maximumItemsToProcess) {
432 takeATab.add(i);
433 i++;
434 }
435
436 handlerSearchIndexes = new AtomicIntegerArray(boxesCount);
437
438 requestId = new AtomicLong(0);
439
440 searchThreads = new Semaphore(maximumServerThreads, true);
441
442 this.modifiedData = new AtomicBoolean(false);
443
444 syncing = new AtomicBoolean(false);
445 syncingPipe = null;
446 syncingBoxLastRequestTime = new AtomicLong(-1);
447 }
448
/**
 * Gives access to the local index that this P2P index distributes. It is
 * consulted, among other things, for per-box modification times and
 * per-box element counts.
 * @return The underlying index
 */
protected abstract SynchronizableIndex < O > getIndex();
453
454 /**
455 * Makes sure that all the components of the given list of lists are
456 * synchronized.
457 * @param x
458 * A list of lists of pipe handlers.
459 */
460 private void initHandlersPerBox(final List < List < PipeHandler > > x) {
461 int i = 0;
462 while (i < boxesCount) {
463 x.add(Collections.synchronizedList(new ArrayList < PipeHandler >(
464 maxNumberOfPeers)));
465 i++;
466 }
467 }
468
469 /**
470 * Returns true if any box == 0. This means we need sync But only if we are
471 * connected to providers of every box.
472 * @return True if we have to sync.
473 * @throws DatabaseException
474 * If something goes wrong with the DB
475 * @throws OBException
476 * User generated exception
477 */
478 private boolean needSync() throws OBException, DatabaseException {
479 if (syncing.get()) {
480 // we won't sync again if we are syncing already
481 // logger.debug("no need to sync because we are syncing");
482 return false;
483 }
484 if (ourBoxes == null) {
485 return false;
486 }
487
488 int i = 0;
489 while (i < ourBoxes.length) {
490 int box = ourBoxes[i];
491 if (needSyncInBox(box)) {
492 // logger.debug("box " + box + " should be synced");
493 return true;
494 }
495 i++;
496 }
497 return false;
498 }
499
500 /**
501 * @param i
502 * (box #)
503 * @return true if I have to sync box i or false otherwise
504 * @throws DatabaseException
505 * If something goes wrong with the DB
506 * @throws OBException
507 * User generated exception
508 */
509 private boolean needSyncInBox(final int i) throws OBException,
510 DatabaseException {
511 long boxTime = getIndex().latestModification(i);
512
513 PipeHandler mostRecent = mostRencentPipeHandlerPerBox(i);
514 if (mostRecent == null) {
515 return false; // we should wait until we get peers that serve
516 // this box
517 }
518 assert mostRecent.isServing(i);
519 long mr = mostRecent.lastUpdated(i);
520 // logger.debug(" Most recent box " + mr + " box Time " + boxTime);
521
522 return mr > boxTime;
523 }
524
525 /**
526 * Returns the pipe handler whose ith box has the most recent modification
527 * time.
528 * @param i
529 * Box to search
530 * @return most recent pipe handler
531 */
532 private PipeHandler mostRencentPipeHandlerPerBox(final int i) {
533 List < PipeHandler > boxList = handlersPerBox.get(i);
534 PipeHandler ph = null;
535 synchronized (boxList) {
536 long time = -1;
537
538 Iterator < PipeHandler > it = boxList.iterator();
539 while (it.hasNext()) {
540 PipeHandler p = it.next();
541 assert p.isServing(i);
542 if (time < p.lastUpdated(i)) {
543 time = p.lastUpdated(i);
544 ph = p;
545 }
546 }
547 }
548 return ph;
549 }
550
551 /**
552 * Initializes the p2p network.
553 * @param isClient
554 * If this index is a client peer.
555 * @param c
556 * Configuration mode
557 * @param clearCache
558 * If the cache has to be cleared
559 * @param seedURI
560 * The url of the seed file.
561 * @throws IOException
562 * If some network error occurs.
563 * @throws PeerGroupException
564 * If some JXTA error occurs.
565 */
566 private void init(final boolean isClient,
567 final NetworkManager.ConfigMode c, final boolean clearCache,
568 final URI seedURI) throws IOException, PeerGroupException {
569
570 File cache = new File(new File(dbPath, ".cache"), clientName);
571 if (clearCache) {
572 Directory.deleteDirectory(cache);
573 }
574 manager = new NetworkManager(c, clientName, cache.toURI());
575 NetworkConfigurator configurator = manager.getConfigurator();
576
577 // clear the seeds
578 configurator.setRendezvousSeedURIs(new LinkedList < String >());
579 configurator.setRelaySeedURIs(new LinkedList < String >());
580 configurator.setRelaySeedingURIs(new HashSet < String >());
581 // end of clear the seeds
582
583 configurator.addRdvSeedingURI(seedURI);
584 configurator.addRelaySeedingURI(seedURI);
585 configurator.setUseOnlyRelaySeeds(true);
586 configurator.setUseOnlyRendezvousSeeds(true);
587 configurator.setHttpEnabled(false);
588 configurator.setTcpIncoming(true);
589 configurator.setTcpEnabled(true);
590 configurator.setTcpOutgoing(true);
591 configurator.setUseMulticast(false);
592 manager.startNetwork();
593 // Get the NetPeerGroup
594 netPeerGroup = manager.getNetPeerGroup();
595
596 // get the discovery service
597 discovery = netPeerGroup.getDiscoveryService();
598 discovery.addDiscoveryListener(this);
599 pipeAdv = getPipeAdvertisement();
600 // init the incoming connection listener
601 serverPipe = new JxtaServerPipe(netPeerGroup, pipeAdv);
602 serverPipe.setPipeTimeout(0);
603
604 peerAdv = netPeerGroup.getPeerAdvertisement();
605
606 // wait for rendevouz connection
607 if (isClient) {
608 logger.debug("Waiting for rendevouz connection");
609 manager.waitForRendezvousConnection(0);
610 logger.debug("Rendevouz connection found");
611 }
612 if (logger.isDebugEnabled()) {
613 logger.debug("Peer id: "
614 + netPeerGroup.getPeerAdvertisement().getPeerID().toURI());
615 }
616
617 // publishAdvertisements();
618
619 assert netPeerGroup.getPeerAdvertisement().equals(
620 netPeerGroup.getPeerAdvertisement());
621 }
622
623 /**
624 * Publish advertisements
625 * @throws IOException
626 * if JXTA signals an error.
627 */
628 private void publishAdvertisements() throws IOException {
629 discovery.publish(peerAdv, lifetime, expiration);
630 discovery.remotePublish(peerAdv, expiration);
631 discovery.publish(pipeAdv, expiration, expiration);
632 discovery.remotePublish(pipeAdv, expiration);
633 }
634
635 /**
636 * This class performs a heartbeat. It makes sure that all the resources we
637 * need to properly work.
638 */
639 private class HeartBeat implements Runnable {
640
641 /**
642 * If we caught an error.
643 */
644 private boolean error = false;
645
646 /**
647 * This method starts network connections and calls heartbeat
648 * undefinitely until the program stops.
649 */
650 public void run() {
651 long count = 0;
652 while (!error) {
653
654 try {
655 heartBeat1();
656 heartBeat3(count);
657 heartBeat6(count);
658 heartBeat10(count);
659 heartBeat100(count);
660 synchronized (timer) {
661 timer.wait(heartBeatInterval);
662 }
663 } catch (InterruptedException i) {
664 if (logger.isDebugEnabled()) {
665 logger.debug("HeartBeat interrupted");
666 }
667 } catch (IOException e) {
668 // a pipe gave some error, this is expected
669 } catch (Exception e) {
670 error = true;
671 logger.fatal("Exception in heartBeat", e);
672 assert false;
673 }
674 count++;
675 }
676 if (error) {
677 logger.fatal("Stopping heartbeat because of error");
678 assert false;
679 }
680 }
681
682 /**
683 * Executed once per heart beat.
684 * @throws IOException
685 * If some network error occurs.
686 * @throws PeerGroupException
687 * If some JXTA error occurs.
688 * @throws DatabaseException
689 * If something goes wrong with the DB
690 * @throws OBException
691 * User generated exception
692 */
693 public void heartBeat1() throws PeerGroupException, IOException,
694 OBException, DatabaseException {
695 if (needSync()) {
696 // lower
697 sync();
698 }
699
700 }
701
702 /**
703 * Executed every 10 heart beats.
704 * @param count
705 * The current heart beat count.
706 * @throws IOException
707 * If some network error occurs.
708 * @throws PeerGroupException
709 * If some JXTA error occurs.
710 * @throws DatabaseException
711 * If something goes wrong with the DB
712 * @throws OBException
713 * User generated exception
714 */
715 public void heartBeat10(final long count) throws DatabaseException,
716 IOException, OBException, PeerGroupException {
717 if (count % 10 == 0) {
718 // find pipes if not enough peers are available
719 // or if not all the boxes have been covered
720 if (!minimumNumberOfPeers() || !totalBoxesCovered()) {
721 // logger.debug("Finding pipes!");
722 findPipes();
723 }
724 // check timeouts
725 //
726 }
727 }
728
729 /**
730 * Executed every 6 heart beats.
731 * @param count
732 * The current heart beat count.
733 * @throws IOException
734 * If some network error occurs.
735 * @throws PeerGroupException
736 * If some JXTA error occurs.
737 * @throws DatabaseException
738 * If something goes wrong with the DB
739 * @throws OBException
740 * User generated exception
741 */
742 public void heartBeat6(final long count) throws PeerGroupException,
743 IOException, OBException, DatabaseException {
744 if (count % 6 == 0) {
745
746 }
747 }
748
749 /**
750 * Executed every 3 heart beats.
751 * @param count
752 * The current heart beat count.
753 * @throws IOException
754 * If some network error occurs.
755 * @throws PeerGroupException
756 * If some JXTA error occurs.
757 * @throws DatabaseException
758 * If something goes wrong with the DB
759 * @throws OBException
760 * User generated exception
761 */
762 public void heartBeat3(final long count) throws PeerGroupException,
763 IOException, OBException, DatabaseException {
764 if (count % 3 == 0) {
765
766 sendBoxInfo();
767 info();
768
769 // check for timeouts in the sync process
770 syncAlive();
771 publishAdvertisements();
772 queryTimeoutCheck();
773 }
774 }
775
776 /**
777 * Executed every 100 heart beats.
778 * @param count
779 * The current heart beat count.
780 * @throws IOException
781 * If some network error occurs.
782 * @throws PeerGroupException
783 * If some JXTA error occurs.
784 * @throws DatabaseException
785 * If something goes wrong with the DB
786 * @throws OBException
787 * User generated exception
788 */
789 public void heartBeat100(final long count) throws PeerGroupException,
790 IOException, OBException, DatabaseException {
791 if (count % 100 == 0) {
792 // advertisements should be proactively searched for if we are
793 // running out
794 // of connections
795 // sync();
796 timeBeat();
797 }
798 }
799 }
800
801 /**
802 * @return true if the index is still processing query results
803 */
804 public final boolean isProcessingQueries() {
805 return takeATab.size() != maximumItemsToProcess;
806 }
807
808 /**
809 * Monitors all the queries being executed.
810 */
811 protected final void queryTimeoutCheck() {
812 if (isProcessingQueries()) {
813 long time = System.currentTimeMillis();
814 int i = 0;
815 while (i < maximumItemsToProcess) {
816 queryTimeoutCheckEntry(i, time);
817 i++;
818 }
819 }
820 }
821
/**
 * Processes the query entry for tab "tab". It is invoked by
 * {@link #queryTimeoutCheck()} once per tab, for every tab in
 * [0, maximumItemsToProcess[, with the same time value for all of them.
 * @param tab
 *            The tab to be processed.
 * @param time
 *            The current time (as given by System.currentTimeMillis()).
 */
protected abstract void queryTimeoutCheckEntry(int tab, long time);
830
831 /**
832 * Prerequisites: Each PipeHandler contains information of the latest
833 * modification of each of its served boxes. The sync method has to
834 * accomplish several things: 1) Compare the latest updates performed by
835 * other pipes and if there is someone with a more recent update, ask for
836 * the data 2) Decide if we shall serve other boxes depending on the desired
837 * amount of boxes to serve and the number of boxes currently served by the
838 * peers that surround us.
839 * @throws IOException
840 * If some network error occurs.
841 * @throws DatabaseException
842 * If something goes wrong with the DB
843 * @throws OBException
844 * User generated exception
845 */
846 private void sync() throws OBException, DatabaseException, IOException {
847 logger.debug("sync");
848 int i = 0;
849 while (i < boxesCount) {
850 int box = ourBoxes[i];
851 if (needSyncInBox(box)) {
852 // start syncing from this box:
853 PipeHandler ph = mostRencentPipeHandlerPerBox(i);
854 // we should send the lastestModification - 1
855 // millisecond to be safe
856 ph.sendRequestSyncMessage(i,
857 getIndex().latestModification(box) - 1);
858 // do this only for one box.
859 break;
860 }
861 i++;
862 }
863
864 }
865
866 /**
867 * Prints some information of the peer.
868 * @throws DatabaseException
869 * If something goes wrong with the DB
870 * @throws OBException
871 * User generated exception
872 */
873 private void info() throws OBException, DatabaseException {
874 int[] servicedBoxes = servicedBoxes();
875 if (servicedBoxes != null) {
876 int[] boxCount = new int[servicedBoxes.length];
877 // long[] times = new long[servicedBoxes.length];
878 int i = 0;
879 while (i < boxCount.length) {
880 boxCount[i] = getIndex().elementsPerBox(i);
881
882 /*
883 * if(logger.isDebugEnabled()){ times[i] =
884 * getIndex().latestModification(i); }
885 */
886
887 i++;
888 }
889 logger.info("Heart: Connected Peers: " + clients.size() + " B: "
890 + Arrays.toString(ourBoxes) + ", boxes: "
891 + Arrays.toString(boxCount));
892 /*
893 * if(logger.isDebugEnabled()){ logger.debug("Latest modifications:" +
894 * Arrays.toString(times)); }
895 */
896 } else {
897 logger.info("Heart: Connected Peers: " + clients.size()
898 + " no boxes being served. ");
899 }
900 }
901
902 /**
903 * Make sure the sync process is alive. If it is not, retry the sync.
904 * @throws IOException
905 * If some network error occurs.
906 * @throws DatabaseException
907 * If something goes wrong with the DB
908 * @throws OBException
909 * User generated exception
910 */
911 private void syncAlive() throws IOException, DatabaseException, OBException {
912 if (syncing.get()
913 && (System.currentTimeMillis() - this.syncingBoxLastRequestTime
914 .get()) > globalTimeout) {
915 if (syncingPipe != null) {
916 logger.debug("Re-syncing after timeout");
917 this.syncingPipe.sendReSyncMessage();
918 }
919
920 }
921 }
922
923 /**
924 * Receives the # of elements per box, and: If we are not serving any box,
925 * it finds this.minBoxesToServe boxes. It will select boxes whose count is
926 * the least. If we are serving boxes, and we are serving a box that is
927 * exceedingly being served, then
928 * @param boxesToServe #
929 * of boxes that will be served
930 * @param index
931 * The index used to find out the total amount of boxes available
932 * Changes the internal ourBoxes array with the boxes that will
933 * be served by this index
934 */
935 private void decideServicedBoxes(final int boxesToServe,
936 final SynchronizableIndex < O > index) {
937 int[] sb = this.servicedBoxes();
938 Random r = new Random(System.currentTimeMillis());
939 if (sb == null) {
940 sb = new int[boxesToServe];
941 // select random boxes and make sure no
942 // repeated boxes are selected
943 int i = 0;
944 while (i < sb.length) {
945 int nbox = r.nextInt(index.totalBoxes());
946 while (inArray(nbox, i, sb)) {
947 // generate a new random until the value
948 // is not in the array
949 nbox = r.nextInt(index.totalBoxes());
950 }
951 sb[i] = nbox;
952 i++;
953 }
954 Arrays.sort(sb);
955 }
956 this.ourBoxes = sb;
957 }
958
959 /**
960 * Returns true if x is found in arr in the interval [0,i[.
961 * @param x
962 * the x to search
963 * @param i
964 * the maximum bound
965 * @param arr
966 * the array
967 * @return True if x is found in the interval [0,i[
968 */
969 private boolean inArray(final int x, final int i, final int[] arr) {
970 int cx = 0;
971 while (cx < i) {
972 if (arr[cx] == x) {
973 return true;
974 }
975 cx++;
976 }
977 return false;
978 }
979
980 /**
981 * For each pipe in pipes, send a time message.
982 * @throws IOException
983 * If some network error occurs.
984 */
985 private void timeBeat() throws IOException {
986 synchronized (clients) {
987 Iterator < PipeHandler > it = this.clients.values().iterator();
988 while (it.hasNext()) {
989 PipeHandler u = it.next();
990 u.sendTimeMessage();
991 }
992 }
993 }
994
995 /**
996 * Sends box information to all the peers if box information has been
997 * changed.
998 * @throws IOException
999 * If some network error occurs.
1000 * @throws DatabaseException
1001 * If something goes wrong with the DB
1002 * @throws OBException
1003 * User generated exception
1004 */
1005 private void sendBoxInfo() throws IOException, DatabaseException,
1006 OBException {
1007
1008 synchronized (clients) {
1009 Iterator < PipeHandler > it = clients.values().iterator();
1010 while (it.hasNext()) {
1011 PipeHandler u = it.next();
1012 u.sendBoxMessage();
1013 }
1014 }
1015
1016 }
1017
1018 /**
1019 * @return true if we are connected to the minimum number of peers.
1020 */
1021 protected final boolean minimumNumberOfPeers() {
1022 return this.clients.size() >= AbstractP2PIndex.minNumberOfPeers;
1023 }
1024
1025 /**
1026 * @return The number of peers connected to this peer.
1027 */
1028 public final int getNumberOfPeers() {
1029 return this.clients.size();
1030 }
1031
1032 /**
1033 * Returns true if all the peers have data that is synchronized to the same
1034 * timestamp. This should not be used normally but it is useful for testing
1035 * purposes.
1036 * @return true if all the peers are in sync with me. *
1037 * @throws DatabaseException
1038 * If something goes wrong with the DB
1039 * @throws OBException
1040 * User generated exception
1041 */
1042 // TODO: Improve this so that boxes are only sent when we modify one
1043 // of our boxes
1044 public final boolean areAllPeersSynchronizedWithMe() throws OBException,
1045 DatabaseException {
1046 // browse each box of the
1047 int i = 0;
1048 long[] boxes = new long[boxesCount];
1049 while (i < boxesCount) {
1050 long box = getIndex().latestModification(i);
1051 if (box != -1) {
1052 if (boxes[i] == 0) {
1053 boxes[i] = box;
1054 } else if (boxes[i] != box) {
1055 return false;
1056 }
1057 }
1058 // get the boxes from
1059 List < PipeHandler > b = handlersPerBox.get(i);
1060 Iterator < PipeHandler > it = b.iterator();
1061 while (it.hasNext()) {
1062 PipeHandler p = it.next();
1063 if (p.isServing(i)) {
1064 box = p.lastUpdated(i);
1065 if (boxes[i] == 0) {
1066 boxes[i] = box;
1067 } else if (boxes[i] != box) {
1068 return false;
1069 }
1070 // }
1071 }
1072 }
1073 i++;
1074 }
1075 return true;
1076 }
1077
1078 /**
1079 * Check if we have peers who serve at least one box per box #.
1080 * @return true if the above condition applies
1081 */
1082 public final boolean areAllBoxesAvailable() {
1083
1084 Iterator < List < PipeHandler >> it = handlersPerBox.iterator();
1085 while (it.hasNext()) {
1086 List < PipeHandler > l = it.next();
1087 if (l.size() == 0) {
1088 return false;
1089 }
1090 }
1091 return true;
1092 }
1093
1094 /**
1095 * This method must be called by all users once It starts the network, and
1096 * creates some background threads like the hearbeat and the incoming
1097 * connection handler.
1098 * @param client
1099 * If true, the index will be created in client mode (from the
1100 * p2p network perspective) If false, the index will be a
1101 * "server".
1102 * @param clearPeerCache
1103 * If we should clear network related cache information
1104 * @param seedFile
1105 * The seed file to be used for this index. Only the given seeds
1106 * will be used
1107 * @throws IOException
1108 * If some network error occurs.
1109 * @throws PeerGroupException
1110 * If some network error occurs.
1111 */
1112 public final void open(final boolean client, final boolean clearPeerCache,
1113 final File seedFile) throws IOException, PeerGroupException {
1114 this.isClient = client;
1115 NetworkManager.ConfigMode c = null;
1116 if (!seedFile.exists()) {
1117 throw new IOException("File does not exist: " + seedFile);
1118 }
1119 if (client) {
1120 c = NetworkManager.ConfigMode.RELAY;
1121 } else {
1122 c = NetworkManager.ConfigMode.RENDEZVOUS_RELAY;
1123 }
1124 URI seedURI = seedFile.toURI();
1125 // initialize JXTA
1126 init(client, c, clearPeerCache, seedURI);
1127
1128 logger.debug("Starting heart...");
1129 Thread thread = new Thread(new HeartBeat(), "Heart Beat Thread");
1130 thread.start();
1131 logger.debug("Starting incoming connections server...");
1132 Thread thread2 = new Thread(new IncomingConnectionHandler(),
1133 "Incoming connection Thread");
1134 thread2.start();
1135 }
1136
1137 /**
1138 * Generates a pipe advertisement.
1139 * @return A pipe advertisement.
1140 */
1141 private PipeAdvertisement getPipeAdvertisement() {
1142 PipeID pipeID = (PipeID) ID.create(generatePipeID());
1143
1144 PipeAdvertisement advertisement = (PipeAdvertisement) AdvertisementFactory
1145 .newAdvertisement(PipeAdvertisement.getAdvertisementType());
1146
1147 advertisement.setPipeID(pipeID);
1148 advertisement.setType(PipeService.UnicastType);
1149 advertisement.setName(pipeName);
1150 return advertisement;
1151 }
1152
1153 /**
1154 * Returns the pipe ID in URI form.
1155 * @return an URI representing the pipe id.
1156 */
1157 private URI generatePipeID() {
1158 return IDFactory.newPipeID(netPeerGroup.getPeerGroupID()).toURI();
1159 }
1160
1161 /**
1162 * Finds the pipe for the given peer. Gets advertisements of pipes for the
1163 * given peer.n
1164 * @param peer
1165 * The peer we are going to search.
1166 * @throws IOException
1167 * If some network error occurs.
1168 * @throws PeerGroupException
1169 * If some network error occurs.
1170 */
1171 protected final void findPipePeer(final URI peer) throws IOException,
1172 PeerGroupException {
1173 if (!minimumNumberOfPeers()) {
1174 discovery.getRemoteAdvertisements(peer.toString(),
1175 DiscoveryService.ADV, "Name", pipeName, 1, null);
1176
1177 }
1178 }
1179
1180 /**
1181 * Query the discovery service for OBSearch pipes.
1182 * @throws IOException
1183 * If some network error occurs.
1184 * @throws PeerGroupException
1185 * If some network error occurs.
1186 */
1187 protected final void findPipes() throws IOException, PeerGroupException {
1188 if (isClient) {
1189 discovery.getRemoteAdvertisements(null, DiscoveryService.PEER,
1190 null, null, minNumberOfPeers, null);
1191 }
1192
1193 }
1194
1195 /**
1196 * Check if all the boxes are being supplied.
1197 * @return true if all the boxes are represented by a peer.
1198 */
1199 protected final boolean totalBoxesCovered() {
1200 if (handlersPerBox == null) {
1201 return false;
1202 }
1203 int i = 0;
1204 int max = getIndex().totalBoxes();
1205
1206 while (i < max) {
1207 if (this.handlersPerBox.get(i).size() == 0) {
1208 return false;
1209 }
1210 i++;
1211 }
1212 return true;
1213 }
1214
1215 /**
1216 * Method called by the discovery server.
1217 * @param ev
1218 * The event that the discovery server found.
1219 */
1220 public final void discoveryEvent(final DiscoveryEvent ev) {
1221
1222 try {
1223 DiscoveryResponseMsg res = ev.getResponse();
1224
1225 Advertisement adv;
1226 Enumeration en = res.getAdvertisements();
1227 // logger.debug("Discovery event" + ev);
1228 if (en != null) {
1229 while (en.hasMoreElements()) {
1230 adv = (Advertisement) en.nextElement();
1231 // logger.info("Discovery event: " + adv);
1232 if (adv instanceof PipeAdvertisement) {
1233
1234 synchronized (clients) {
1235 // add the pipe only if its peer hasn't been
1236 // added
1237 // this is a hack in order to prevent additions
1238 // of
1239 // pipes whose
1240 // TODO: if there is some inconsistency with the
1241 // string generated by EndpointAddress,
1242 // this hack won't work
1243 String hack = "urn:"
1244 + ((EndpointAddress) ev.getSource())
1245 .toURI().toString().replaceAll("/",
1246 "");
1247 URI u = new URI(hack);
1248 // avoid connecting to ourselves and connecting
1249 // to
1250 // peers we have already
1251 if (!u.toString().equals(
1252 this.netPeerGroup.getPeerAdvertisement()
1253 .getPeerID().toURI().toString())
1254 && !isConnectedToPeer(u.toString())) {
1255 logger.info("Discovered new peer: " + u);
1256 PipeAdvertisement p = (PipeAdvertisement) adv;
1257 addPipe(u, p);
1258 }
1259 }
1260 } else if (adv instanceof PeerAdvertisement) {
1261 // if we get a peer advertisement, then if the peer is
1262 // not yet connected to us
1263 // we can search for pipes.
1264 synchronized (clients) {
1265 PeerAdvertisement p = (PeerAdvertisement) adv;
1266 if (!isConnectedToPeer(p.getPeerID().toURI()
1267 .toString())) {
1268 // find pipes
1269 findPipePeer(p.getPeerID().toURI());
1270 }
1271 }
1272 }
1273 }
1274 }
1275 } catch (Exception e) {
1276 logger.fatal("Exception", e);
1277 assert false;
1278 }
1279 }
1280
1281 /**
1282 * Returns true if the given peer id is connected to us.
1283 * @param id
1284 * The id of the peer we are looking for
1285 * @return True if the given id is connected to us.
1286 */
1287 private boolean isConnectedToPeer(final String id) {
1288 return clients.containsKey(id);
1