View Javadoc

1   package org.ajmm.obsearch.index;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.net.URI;
6   import java.util.ArrayList;
7   import java.util.Arrays;
8   import java.util.Collections;
9   import java.util.Enumeration;
10  import java.util.HashSet;
11  import java.util.Iterator;
12  import java.util.LinkedList;
13  import java.util.List;
14  import java.util.Random;
15  import java.util.concurrent.ArrayBlockingQueue;
16  import java.util.concurrent.BlockingQueue;
17  import java.util.concurrent.ConcurrentHashMap;
18  import java.util.concurrent.ConcurrentMap;
19  import java.util.concurrent.Semaphore;
20  import java.util.concurrent.atomic.AtomicBoolean;
21  import java.util.concurrent.atomic.AtomicIntegerArray;
22  import java.util.concurrent.atomic.AtomicLong;
23  import java.util.concurrent.atomic.AtomicLongArray;
24  
25  import net.jxta.discovery.DiscoveryEvent;
26  import net.jxta.discovery.DiscoveryListener;
27  import net.jxta.discovery.DiscoveryService;
28  import net.jxta.document.Advertisement;
29  import net.jxta.document.AdvertisementFactory;
30  import net.jxta.document.MimeMediaType;
31  import net.jxta.endpoint.ByteArrayMessageElement;
32  import net.jxta.endpoint.EndpointAddress;
33  import net.jxta.endpoint.Message;
34  import net.jxta.endpoint.Message.ElementIterator;
35  import net.jxta.exception.PeerGroupException;
36  import net.jxta.id.ID;
37  import net.jxta.id.IDFactory;
38  import net.jxta.peergroup.PeerGroup;
39  import net.jxta.pipe.PipeID;
40  import net.jxta.pipe.PipeMsgEvent;
41  import net.jxta.pipe.PipeMsgListener;
42  import net.jxta.pipe.PipeService;
43  import net.jxta.platform.NetworkConfigurator;
44  import net.jxta.platform.NetworkManager;
45  import net.jxta.protocol.DiscoveryResponseMsg;
46  import net.jxta.protocol.PeerAdvertisement;
47  import net.jxta.protocol.PipeAdvertisement;
48  import net.jxta.util.JxtaBiDiPipe;
49  import net.jxta.util.JxtaServerPipe;
50  
51  import org.ajmm.obsearch.AsynchronousIndex;
52  import org.ajmm.obsearch.Index;
53  import org.ajmm.obsearch.OB;
54  import org.ajmm.obsearch.Result;
55  import org.ajmm.obsearch.SynchronizableIndex;
56  import org.ajmm.obsearch.TimeStampResult;
57  import org.ajmm.obsearch.exception.AlreadyFrozenException;
58  import org.ajmm.obsearch.exception.BoxNotAvailableException;
59  import org.ajmm.obsearch.exception.IllegalIdException;
60  import org.ajmm.obsearch.exception.NotFrozenException;
61  import org.ajmm.obsearch.exception.OBException;
62  import org.ajmm.obsearch.exception.OutOfRangeException;
63  import org.ajmm.obsearch.exception.UndefinedPivotsException;
64  import org.ajmm.obsearch.index.AbstractSynchronizableIndex.TimeStampIterator;
65  import org.ajmm.obsearch.index.utils.Directory;
66  import org.apache.log4j.Logger;
67  
68  import com.sleepycat.bind.tuple.TupleInput;
69  import com.sleepycat.bind.tuple.TupleOutput;
70  import com.sleepycat.je.DatabaseException;
71  
72  /*
73   OBSearch: a distributed similarity search engine
74   This project is to similarity search what 'bit-torrent' is to downloads.
75   Copyright (C)  2007 Arnoldo Jose Muller Molina
76  
77   This program is free software: you can redistribute it and/or modify
78   it under the terms of the GNU General Public License as published by
79   the Free Software Foundation, either version 3 of the License, or
80   (at your option) any later version.
81  
82   This program is distributed in the hope that it will be useful,
83   but WITHOUT ANY WARRANTY; without even the implied warranty of
84   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
85   GNU General Public License for more details.
86  
87   You should have received a copy of the GNU General Public License
88   along with this program.  If not, see <http://www.gnu.org/licenses/>.
89   */
90  /**
91   * AbstractP2PIndex holds common functionality of indexes that span several
92   * computers. The current implementation uses the JXTA library.
93   * @param <O>
94   *            The type of object to be stored in the Index.
95   * @author Arnoldo Jose Muller Molina
96   * @since 0.7
97   */
98  
99  public abstract class AbstractP2PIndex < O extends OB > implements Index < O >,
100         DiscoveryListener, AsynchronousIndex {
101 
102     /**
103      * Lists all the message types available in the network.
104      */
105     public static enum MessageType {
106         /** time. */
107         TIME,
108         /** box information (for sync and box selection purposes). */
109         BOX,
110         /** synchronize box (request for synchronization). */
111         SYNCBOX,
112         /** synchronize continue (asks for more data). */
113         SYNCC,
114         /**
115          * synchronize retry (asks for the last packet in the event of a
116          * timeout).
117          */
118         SYNCR,
119         /** synchronize end (ends the synchronization for one box). */
120         SYNCE,
121         /** local index data. */
122         INDEX,
123         /** insert object (after a SYNCBOX). */
124         INSOB,
125         /** data sync query. */
126         DSYNQ,
127         /** data sync reply. */
128         DSYNR,
129         /** search query. */
130         SQ,
131         /** search result. */
132         SR
133 
134     };
135 
136     /**
137      * Lists all the message element types available (these elements are
138      * components found in MessageType packets).
139      */
140     public static enum MessageElementType {
141         /** Search header. */
142         SH,
143         /** Search object. */
144         SO,
145         /** Search sub result. */
146         SSR
147 
148     };
149 
150     /**
151      * Class logger.
152      */
153     private final transient  Logger logger;
154 
155     /**
156      * Messages bigger than this one have problems.
157      */
158     private static final int messageSize = 40 * 1024;
159 
160     /**
161      * The string that holds the original index xml.
162      */
163     protected String indexXml;
164 
165     /**
166      * Min number of pivots to have at any time. For controlling purposes the
167      * necessary minimum number of peers to allow matching might be bigger than
168      * this.
169      */
170     public static final int minNumberOfPeers = 2;
171 
172     // JXTA variables
173     /**
174      * JXTA network manager.
175      */
176     private transient NetworkManager manager;
177 
178     /**
179      * JXTA discovery service.
180      */
181     private transient DiscoveryService discovery;
182 
183     /**
184      * JXTA network client name.
185      */
186     private String clientName;
187 
188     /**
189      * JXTA peer group.
190      */
191     private PeerGroup netPeerGroup;
192 
193     /**
194      * OBSearch's pipe name.
195      */
196     private static final String pipeName = "OBSearchPipe";
197 
198     /**
199      * Maximum number of peers to use.
200      */
201     private static final  int maxNumberOfPeers = 100;
202 
203     /**
204      * Interval for each heartbeat (in miliseconds) heartbeats check for missing
205      * resources and make sure we are all well connected all the time.
206      */
207     private static final  int heartBeatInterval = 10000;
208 
209     /**
210      * General timeout used for most p2p operations.
211      */
212     private static final  int globalTimeout = 30 * 1000;
213 
214     /**
215      * Maximum time difference between the peers. peers that have bigger time
216      * differences will be dropped.
217      */
218     private static final  int maxTimeDifference = 3600000;
219 
220     /**
221      * Maximum number of objects to query at the same time.
222      * (60)
223      */
224     protected static final int maximumItemsToProcess = 15;
225 
226     /**
227      * Maximum time to wait for a query to be answered.
228      */
229     protected static final int queryTimeout = 30000;
230 
231     /**
232      * Internally gives ids that can be used to wait for the
233      * processing of a query.
234      */
235     protected BlockingQueue < Integer > takeATab;
236 
237     /**
238      * Maximum number of objects to match at the same time. This should be close
239      * to the amount of CPUs available. Currently not being used.
240      */
241     protected int maximumServerThreads;
242 
243     /**
244      * Object in charge of accepting new connections.
245      */
246     private JxtaServerPipe serverPipe;
247 
248     /**
249      * Contains a pipe per client that have tried to connect to us or that We
250      * have tried to connect to the key is a peer id and not a pipe!
251      */
252     private ConcurrentMap < String, PipeHandler > clients;
253 
254     /**
255      * Time when the index was created.
256      */
257     protected long indexTime;
258 
259     /**
260      * Location of this DB. Basically a place were JXTA info is stored.
261      */
262     private File dbPath;
263 
264     /**
265      * Object used as a monitor. (hack :))
266      */
267     private final String timer = "time";
268 
269     /**
270      * Pipe advertisement. to be re-published every once in a while
271      */
272     private PipeAdvertisement pipeAdv;
273 
274     /**
275      * Peer advertisement. to be re-published every once in a while
276      */
277     private PeerAdvertisement peerAdv;
278 
279     /**
280      * Lifetime and expiration for advertisements.
281      */
282     long lifetime = 60 * 2 * 1000L;
283 
284     /**
285      * Lifetime and expiration for advertisements.
286      */
287     long expiration = 60 * 2 * 1000L;
288 
289     /**
290      * The boxes that we are currently serving.
291      */
292     private int[] ourBoxes;
293 
294     /**
295      * Total number of boxes that will be supported.
296      */
297     private int boxesCount;
298 
299     /**
300      * Each entry has a List. An entry in the array equals to a box #. Every
301      * List holds handlers that hold the box in which they are indexed each
302      * handler is responsible of registering and unregistering
303      */
304     private List < List < PipeHandler > > handlersPerBox;
305 
306     /**
307      * keeps track of the latest handler that was used for each box. at search
308      * time this values are used to distribute the queries "evenly"
309      */
310     private AtomicIntegerArray handlerSearchIndexes;
311 
312     /**
313      * Set to true when some data is inserted or deleted or when a new peer
314      * comes in set to false when we publish all the peers our information or
315      * when a new peer comes in.
316      */
317     private AtomicBoolean boxesUpdated;
318 
319     /**
320      * If this peer is in client or server mode. If it is in client mode, Peers
321      * actively search for other peers. Otherwise the peer just waits for
322      * connections. The JXTA library also is configured differently for servers
323      * and for clients.
324      */
325     private boolean isClient = true;
326 
327     /**
328      * A unique identifier of requests.
329      */
330     private AtomicLong requestId;
331 
332     /**
333      * Maximum number of threads to be used in the search.
334      */
335     private Semaphore searchThreads;
336 
337     /**
338      * It is set to true when data is inserted or deleted into this index. once
339      * the available boxes are published, this variable returns to false.
340      */
341     private AtomicBoolean modifiedData;
342 
343     /**
344      * Tells if we are syncing or not. Used to sync only box at a time.
345      */
346     private AtomicBoolean syncing;
347 
348     /**
349      * The peer that we are currently syincing.
350      */
351     private PipeHandler syncingPipe;
352 
353     /**
354      * The last request time for a box.
355      */
356     private AtomicLong syncingBoxLastRequestTime;
357 
358     /**
359      * Initialize the abstract class
360      * @param index
361      *            the index that will be distributed
362      * @param dbPath
363      *            the path where we will store information related to this index
364      * @param clientName
365      *            The name of this peer.
366      * @param boxesToServe
367      *            The number of boxes that will be served by this index
368      * @param maximumServerThreads
369      *            Max number of threads to support. (currently it has no effect)
370      * @throws IOException
371      *             If a serialization exception occurs.
372      * @throws PeerGroupException
373      *             If a JXTA exception occurs.
374      * @throws NotFrozenException
375      *             If index is not frozen.
376      * @throws DatabaseException
377      *             If something goes wrong with the DB
378      * @throws OBException
379      *             User generated exception
380      */
381     protected AbstractP2PIndex(final SynchronizableIndex < O > index,
382             final File dbPath, final String clientName, final int boxesToServe,
383             final int maximumServerThreads) throws IOException,
384             PeerGroupException, NotFrozenException, DatabaseException,
385             OBException {
386         if (!index.isFrozen()) {
387             throw new NotFrozenException();
388         }
389         if (!dbPath.exists()) {
390             throw new IOException(dbPath + " does not exit");
391         }
392         this.maximumServerThreads = maximumServerThreads;
393         clients = new ConcurrentHashMap < String, PipeHandler >();
394         boxesCount = index.totalBoxes();
395         this.dbPath = dbPath;
396         this.clientName = clientName;
397         this.boxesUpdated = new AtomicBoolean(false);
398         logger = Logger.getLogger(clientName);
399 
400         handlersPerBox = Collections
401                 .synchronizedList(new ArrayList < List < PipeHandler >>(index
402                         .totalBoxes()));
403         initHandlersPerBox(handlersPerBox);
404 
405         // initialize the boxes this index is supporting if the given index
406         // has the corresponding data.
407         if (index.databaseSize() != 0) { // if the database has some
408             // data, we serve the data
409             // of the db
410             int i = 0;
411             List < Integer > boxes = new ArrayList < Integer >(index
412                     .totalBoxes());
413             while (i < index.totalBoxes()) {
414                 if (index.latestModification(i) != -1) {
415                     boxes.add(i);
416                 }
417                 i++;
418             }
419             this.ourBoxes = new int[boxes.size()];
420             i = 0;
421             while (i < boxes.size()) {
422                 ourBoxes[i] = boxes.get(i);
423                 i++;
424             }
425         } else {
426             decideServicedBoxes(boxesToServe, index);
427         }
428 
429         takeATab = new ArrayBlockingQueue < Integer >(maximumItemsToProcess);
430         int i = 0;
431         while (i < maximumItemsToProcess) {
432             takeATab.add(i);
433             i++;
434         }
435 
436         handlerSearchIndexes = new AtomicIntegerArray(boxesCount);
437 
438         requestId = new AtomicLong(0);
439 
440         searchThreads = new Semaphore(maximumServerThreads, true);
441 
442         this.modifiedData = new AtomicBoolean(false);
443 
444         syncing = new AtomicBoolean(false);
445         syncingPipe = null;
446         syncingBoxLastRequestTime = new AtomicLong(-1);
447     }
448 
449     /**
450      * @return The underlying index
451      */
452     protected abstract SynchronizableIndex < O > getIndex();
453 
454     /**
455      * Makes sure that all the components of the given list of lists are
456      * synchronized.
457      * @param x
458      *            A list of lists of pipe handlers.
459      */
460     private void initHandlersPerBox(final List < List < PipeHandler > > x) {
461         int i = 0;
462         while (i < boxesCount) {
463             x.add(Collections.synchronizedList(new ArrayList < PipeHandler >(
464                     maxNumberOfPeers)));
465             i++;
466         }
467     }
468 
469     /**
470      * Returns true if any box == 0. This means we need sync But only if we are
471      * connected to providers of every box.
472      * @return True if we have to sync.
473      * @throws DatabaseException
474      *             If something goes wrong with the DB
475      * @throws OBException
476      *             User generated exception
477      */
478     private boolean needSync() throws OBException, DatabaseException {
479         if (syncing.get()) {
480             // we won't sync again if we are syncing already
481             // logger.debug("no need to sync because we are syncing");
482             return false;
483         }
484         if (ourBoxes == null) {
485             return false;
486         }
487 
488         int i = 0;
489         while (i < ourBoxes.length) {
490             int box = ourBoxes[i];
491             if (needSyncInBox(box)) {
492                 // logger.debug("box " + box + " should be synced");
493                 return true;
494             }
495             i++;
496         }
497         return false;
498     }
499 
500     /**
501      * @param i
502      *            (box #)
503      * @return true if I have to sync box i or false otherwise
504      * @throws DatabaseException
505      *             If something goes wrong with the DB
506      * @throws OBException
507      *             User generated exception
508      */
509     private boolean needSyncInBox(final int i) throws OBException,
510             DatabaseException {
511         long boxTime = getIndex().latestModification(i);
512 
513         PipeHandler mostRecent = mostRencentPipeHandlerPerBox(i);
514         if (mostRecent == null) {
515             return false; // we should wait until we get peers that serve
516             // this box
517         }
518         assert mostRecent.isServing(i);
519         long mr = mostRecent.lastUpdated(i);
520         // logger.debug(" Most recent box " + mr + " box Time " + boxTime);
521 
522         return mr > boxTime;
523     }
524 
525     /**
526      * Returns the pipe handler whose ith box has the most recent modification
527      * time.
528      * @param i
529      *            Box to search
530      * @return most recent pipe handler
531      */
532     private PipeHandler mostRencentPipeHandlerPerBox(final int i) {
533         List < PipeHandler > boxList = handlersPerBox.get(i);
534         PipeHandler ph = null;
535         synchronized (boxList) {
536             long time = -1;
537 
538             Iterator < PipeHandler > it = boxList.iterator();
539             while (it.hasNext()) {
540                 PipeHandler p = it.next();
541                 assert p.isServing(i);
542                 if (time < p.lastUpdated(i)) {
543                     time = p.lastUpdated(i);
544                     ph = p;
545                 }
546             }
547         }
548         return ph;
549     }
550 
551     /**
552      * Initializes the p2p network.
553      * @param isClient
554      *            If this index is a client peer.
555      * @param c
556      *            Configuration mode
557      * @param clearCache
558      *            If the cache has to be cleared
559      * @param seedURI
560      *            The url of the seed file.
561      * @throws IOException
562      *             If some network error occurs.
563      * @throws PeerGroupException
564      *             If some JXTA error occurs.
565      */
566     private void init(final boolean isClient,
567             final NetworkManager.ConfigMode c, final boolean clearCache,
568             final URI seedURI) throws IOException, PeerGroupException {
569 
570         File cache = new File(new File(dbPath, ".cache"), clientName);
571         if (clearCache) {
572             Directory.deleteDirectory(cache);
573         }
574         manager = new NetworkManager(c, clientName, cache.toURI());
575         NetworkConfigurator configurator = manager.getConfigurator();
576 
577         // clear the seeds
578         configurator.setRendezvousSeedURIs(new LinkedList < String >());
579         configurator.setRelaySeedURIs(new LinkedList < String >());
580         configurator.setRelaySeedingURIs(new HashSet < String >());
581         // end of clear the seeds
582 
583         configurator.addRdvSeedingURI(seedURI);
584         configurator.addRelaySeedingURI(seedURI);
585         configurator.setUseOnlyRelaySeeds(true);
586         configurator.setUseOnlyRendezvousSeeds(true);
587         configurator.setHttpEnabled(false);
588         configurator.setTcpIncoming(true);
589         configurator.setTcpEnabled(true);
590         configurator.setTcpOutgoing(true);
591         configurator.setUseMulticast(false);
592         manager.startNetwork();
593         // Get the NetPeerGroup
594         netPeerGroup = manager.getNetPeerGroup();
595 
596         // get the discovery service
597         discovery = netPeerGroup.getDiscoveryService();
598         discovery.addDiscoveryListener(this);
599         pipeAdv = getPipeAdvertisement();
600         // init the incoming connection listener
601         serverPipe = new JxtaServerPipe(netPeerGroup, pipeAdv);
602         serverPipe.setPipeTimeout(0);
603 
604         peerAdv = netPeerGroup.getPeerAdvertisement();
605 
606         // wait for rendevouz connection
607         if (isClient) {
608             logger.debug("Waiting for rendevouz connection");
609             manager.waitForRendezvousConnection(0);
610             logger.debug("Rendevouz connection found");
611         }
612         if (logger.isDebugEnabled()) {
613             logger.debug("Peer id: "
614                     + netPeerGroup.getPeerAdvertisement().getPeerID().toURI());
615         }
616 
617         // publishAdvertisements();
618 
619         assert netPeerGroup.getPeerAdvertisement().equals(
620                 netPeerGroup.getPeerAdvertisement());
621     }
622 
623     /**
624      * Publish advertisements
625      * @throws IOException
626      *             if JXTA signals an error.
627      */
628     private void publishAdvertisements() throws IOException {
629         discovery.publish(peerAdv, lifetime, expiration);
630         discovery.remotePublish(peerAdv, expiration);
631         discovery.publish(pipeAdv, expiration, expiration);
632         discovery.remotePublish(pipeAdv, expiration);
633     }
634 
635     /**
636      * This class performs a heartbeat. It makes sure that all the resources we
637      * need to properly work.
638      */
639     private class HeartBeat implements Runnable {
640 
641         /**
642          * If we caught an error.
643          */
644         private boolean error = false;
645 
646         /**
647          * This method starts network connections and calls heartbeat
648          * undefinitely until the program stops.
649          */
650         public void run() {
651             long count = 0;
652             while (!error) {
653 
654                 try {
655                     heartBeat1();
656                     heartBeat3(count);
657                     heartBeat6(count);
658                     heartBeat10(count);
659                     heartBeat100(count);
660                     synchronized (timer) {
661                         timer.wait(heartBeatInterval);
662                     }
663                 } catch (InterruptedException i) {
664                     if (logger.isDebugEnabled()) {
665                         logger.debug("HeartBeat interrupted");
666                     }
667                 } catch (IOException e) {
668                     // a pipe gave some error, this is expected
669                 } catch (Exception e) {
670                     error = true;
671                     logger.fatal("Exception in heartBeat", e);
672                     assert false;
673                 }
674                 count++;
675             }
676             if (error) {
677                 logger.fatal("Stopping heartbeat because of error");
678                 assert false;
679             }
680         }
681 
682         /**
683          * Executed once per heart beat.
684          * @throws IOException
685          *             If some network error occurs.
686          * @throws PeerGroupException
687          *             If some JXTA error occurs.
688          * @throws DatabaseException
689          *             If something goes wrong with the DB
690          * @throws OBException
691          *             User generated exception
692          */
693         public void heartBeat1() throws PeerGroupException, IOException,
694                 OBException, DatabaseException {
695             if (needSync()) {
696                 // lower
697                 sync();
698             }
699 
700         }
701 
702         /**
703          * Executed every 10 heart beats.
704          * @param count
705          *            The current heart beat count.
706          * @throws IOException
707          *             If some network error occurs.
708          * @throws PeerGroupException
709          *             If some JXTA error occurs.
710          * @throws DatabaseException
711          *             If something goes wrong with the DB
712          * @throws OBException
713          *             User generated exception
714          */
715         public void heartBeat10(final long count) throws DatabaseException,
716                 IOException, OBException, PeerGroupException {
717             if (count % 10 == 0) {
718                 // find pipes if not enough peers are available
719                 // or if not all the boxes have been covered
720                 if (!minimumNumberOfPeers() || !totalBoxesCovered()) {
721                     // logger.debug("Finding pipes!");
722                     findPipes();
723                 }
724                 // check timeouts
725                 //
726             }
727         }
728 
729         /**
730          * Executed every 6 heart beats.
731          * @param count
732          *            The current heart beat count.
733          * @throws IOException
734          *             If some network error occurs.
735          * @throws PeerGroupException
736          *             If some JXTA error occurs.
737          * @throws DatabaseException
738          *             If something goes wrong with the DB
739          * @throws OBException
740          *             User generated exception
741          */
742         public void heartBeat6(final long count) throws PeerGroupException,
743                 IOException, OBException, DatabaseException {
744             if (count % 6 == 0) {
745                
746             }
747         }
748 
749         /**
750          * Executed every 3 heart beats.
751          * @param count
752          *            The current heart beat count.
753          * @throws IOException
754          *             If some network error occurs.
755          * @throws PeerGroupException
756          *             If some JXTA error occurs.
757          * @throws DatabaseException
758          *             If something goes wrong with the DB
759          * @throws OBException
760          *             User generated exception
761          */
762         public void heartBeat3(final long count) throws PeerGroupException,
763                 IOException, OBException, DatabaseException {
764             if (count % 3 == 0) {
765 
766                 sendBoxInfo();
767                 info();
768 
769                 // check for timeouts in the sync process
770                 syncAlive();
771                 publishAdvertisements();
772                 queryTimeoutCheck();
773             }
774         }
775 
776         /**
777          * Executed every 100 heart beats.
778          * @param count
779          *            The current heart beat count.
780          * @throws IOException
781          *             If some network error occurs.
782          * @throws PeerGroupException
783          *             If some JXTA error occurs.
784          * @throws DatabaseException
785          *             If something goes wrong with the DB
786          * @throws OBException
787          *             User generated exception
788          */
789         public void heartBeat100(final long count) throws PeerGroupException,
790                 IOException, OBException, DatabaseException {
791             if (count % 100 == 0) {
792                 // advertisements should be proactively searched for if we are
793                 // running out
794                 // of connections
795                 // sync();
796                 timeBeat();
797             }
798         }
799     }
800 
801     /**
802      * @return true if the index is still processing query results
803      */
804     public final boolean isProcessingQueries() {
805         return takeATab.size() != maximumItemsToProcess;
806     }
807 
808     /**
809      * Monitors all the queries being executed.
810      */
811     protected final void queryTimeoutCheck() {
812         if (isProcessingQueries()) {
813             long time = System.currentTimeMillis();
814             int i = 0;
815             while (i < maximumItemsToProcess) {
816                 queryTimeoutCheckEntry(i, time);
817                 i++;
818             }
819         }
820     }
821 
822     /**
823      * Process the query entry for tab "tab"
824      * @param tab
825      *            The tab to be processed.
826      * @param time
827      *            The current time.
828      */
829     protected abstract void queryTimeoutCheckEntry(int tab, long time);
830 
831     /**
832      * Prerequisites: Each PipeHandler contains information of the latest
833      * modification of each of its served boxes. The sync method has to
834      * accomplish several things: 1) Compare the latest updates performed by
835      * other pipes and if there is someone with a more recent update, ask for
836      * the data 2) Decide if we shall serve other boxes depending on the desired
837      * amount of boxes to serve and the number of boxes currently served by the
838      * peers that surround us.
839      * @throws IOException
840      *             If some network error occurs.
841      * @throws DatabaseException
842      *             If something goes wrong with the DB
843      * @throws OBException
844      *             User generated exception
845      */
846     private void sync() throws OBException, DatabaseException, IOException {
847         logger.debug("sync");
848         int i = 0;
849         while (i < boxesCount) {
850             int box = ourBoxes[i];
851             if (needSyncInBox(box)) {
852                 // start syncing from this box:
853                 PipeHandler ph = mostRencentPipeHandlerPerBox(i);
854                 // we should send the lastestModification - 1
855                 // millisecond to be safe
856                 ph.sendRequestSyncMessage(i,
857                         getIndex().latestModification(box) - 1);
858                 // do this only for one box.
859                 break;
860             }
861             i++;
862         }
863 
864     }
865 
866     /**
867      * Prints some information of the peer.
868      * @throws DatabaseException
869      *             If something goes wrong with the DB
870      * @throws OBException
871      *             User generated exception
872      */
873     private void info() throws OBException, DatabaseException {
874         int[] servicedBoxes = servicedBoxes();
875         if (servicedBoxes != null) {
876             int[] boxCount = new int[servicedBoxes.length];
877             // long[] times = new long[servicedBoxes.length];
878             int i = 0;
879             while (i < boxCount.length) {
880                 boxCount[i] = getIndex().elementsPerBox(i);
881 
882                 /*
883                  * if(logger.isDebugEnabled()){ times[i] =
884                  * getIndex().latestModification(i); }
885                  */
886 
887                 i++;
888             }
889             logger.info("Heart: Connected Peers: " + clients.size() + " B: "
890                     + Arrays.toString(ourBoxes) + ", boxes: "
891                     + Arrays.toString(boxCount));
892             /*
893              * if(logger.isDebugEnabled()){ logger.debug("Latest modifications:" +
894              * Arrays.toString(times)); }
895              */
896         } else {
897             logger.info("Heart: Connected Peers: " + clients.size()
898                     + " no boxes being served. ");
899         }
900     }
901 
902     /**
903      * Make sure the sync process is alive. If it is not, retry the sync.
904      * @throws IOException
905      *             If some network error occurs.
906      * @throws DatabaseException
907      *             If something goes wrong with the DB
908      * @throws OBException
909      *             User generated exception
910      */
911     private void syncAlive() throws IOException, DatabaseException, OBException {
912         if (syncing.get()
913                 && (System.currentTimeMillis() - this.syncingBoxLastRequestTime
914                         .get()) > globalTimeout) {
915             if (syncingPipe != null) {
916                 logger.debug("Re-syncing after timeout");
917                 this.syncingPipe.sendReSyncMessage();
918             }
919 
920         }
921     }
922 
923     /**
924      * Receives the # of elements per box, and: If we are not serving any box,
925      * it finds this.minBoxesToServe boxes. It will select boxes whose count is
926      * the least. If we are serving boxes, and we are serving a box that is
927      * exceedingly being served, then
928      * @param boxesToServe #
929      *            of boxes that will be served
930      * @param index
931      *            The index used to find out the total amount of boxes available
932      *            Changes the internal ourBoxes array with the boxes that will
933      *            be served by this index
934      */
935     private void decideServicedBoxes(final int boxesToServe,
936             final SynchronizableIndex < O > index) {
937         int[] sb = this.servicedBoxes();
938         Random r = new Random(System.currentTimeMillis());
939         if (sb == null) {
940             sb = new int[boxesToServe];
941             // select random boxes and make sure no
942             // repeated boxes are selected
943             int i = 0;
944             while (i < sb.length) {
945                 int nbox = r.nextInt(index.totalBoxes());
946                 while (inArray(nbox, i, sb)) {
947                     // generate a new random until the value
948                     // is not in the array
949                     nbox = r.nextInt(index.totalBoxes());
950                 }
951                 sb[i] = nbox;
952                 i++;
953             }
954             Arrays.sort(sb);
955         }
956         this.ourBoxes = sb;
957     }
958 
959     /**
960      * Returns true if x is found in arr in the interval [0,i[.
961      * @param x
962      *            the x to search
963      * @param i
964      *            the maximum bound
965      * @param arr
966      *            the array
967      * @return True if x is found in the interval [0,i[
968      */
969     private boolean inArray(final int x, final int i, final int[] arr) {
970         int cx = 0;
971         while (cx < i) {
972             if (arr[cx] == x) {
973                 return true;
974             }
975             cx++;
976         }
977         return false;
978     }
979 
980     /**
981      * For each pipe in pipes, send a time message.
982      * @throws IOException
983      *             If some network error occurs.
984      */
985     private void timeBeat() throws IOException {
986         synchronized (clients) {
987             Iterator < PipeHandler > it = this.clients.values().iterator();
988             while (it.hasNext()) {
989                 PipeHandler u = it.next();
990                 u.sendTimeMessage();
991             }
992         }
993     }
994 
995     /**
996      * Sends box information to all the peers if box information has been
997      * changed.
998      * @throws IOException
999      *             If some network error occurs.
1000      * @throws DatabaseException
1001      *             If something goes wrong with the DB
1002      * @throws OBException
1003      *             User generated exception
1004      */
1005     private void sendBoxInfo() throws IOException, DatabaseException,
1006             OBException {
1007 
1008         synchronized (clients) {
1009             Iterator < PipeHandler > it = clients.values().iterator();
1010             while (it.hasNext()) {
1011                 PipeHandler u = it.next();
1012                 u.sendBoxMessage();
1013             }
1014         }
1015 
1016     }
1017 
1018     /**
1019      * @return true if we are connected to the minimum number of peers.
1020      */
1021     protected final boolean minimumNumberOfPeers() {
1022         return this.clients.size() >= AbstractP2PIndex.minNumberOfPeers;
1023     }
1024 
1025     /**
1026      * @return The number of peers connected to this peer.
1027      */
1028     public final int getNumberOfPeers() {
1029         return this.clients.size();
1030     }
1031 
1032     /**
1033      * Returns true if all the peers have data that is synchronized to the same
1034      * timestamp. This should not be used normally but it is useful for testing
1035      * purposes.
1036      * @return true if all the peers are in sync with me. *
1037      * @throws DatabaseException
1038      *             If something goes wrong with the DB
1039      * @throws OBException
1040      *             User generated exception
1041      */
1042     // TODO: Improve this so that boxes are only sent when we modify one
1043     // of our boxes
1044     public final boolean areAllPeersSynchronizedWithMe() throws OBException,
1045             DatabaseException {
1046         // browse each box of the
1047         int i = 0;
1048         long[] boxes = new long[boxesCount];
1049         while (i < boxesCount) {
1050             long box = getIndex().latestModification(i);
1051             if (box != -1) {
1052                 if (boxes[i] == 0) {
1053                     boxes[i] = box;
1054                 } else if (boxes[i] != box) {
1055                     return false;
1056                 }
1057             }
1058             // get the boxes from
1059             List < PipeHandler > b = handlersPerBox.get(i);
1060             Iterator < PipeHandler > it = b.iterator();
1061             while (it.hasNext()) {
1062                 PipeHandler p = it.next();
1063                 if (p.isServing(i)) {
1064                     box = p.lastUpdated(i);
1065                     if (boxes[i] == 0) {
1066                         boxes[i] = box;
1067                     } else if (boxes[i] != box) {
1068                         return false;
1069                     }
1070                     // }
1071                 }
1072             }
1073             i++;
1074         }
1075         return true;
1076     }
1077 
1078     /**
1079      * Check if we have peers who serve at least one box per box #.
1080      * @return true if the above condition applies
1081      */
1082     public final boolean areAllBoxesAvailable() {
1083 
1084         Iterator < List < PipeHandler >> it = handlersPerBox.iterator();
1085         while (it.hasNext()) {
1086             List < PipeHandler > l = it.next();
1087             if (l.size() == 0) {
1088                 return false;
1089             }
1090         }
1091         return true;
1092     }
1093 
1094     /**
1095      * This method must be called by all users once It starts the network, and
1096      * creates some background threads like the hearbeat and the incoming
1097      * connection handler.
1098      * @param client
1099      *            If true, the index will be created in client mode (from the
1100      *            p2p network perspective) If false, the index will be a
1101      *            "server".
1102      * @param clearPeerCache
1103      *            If we should clear network related cache information
1104      * @param seedFile
1105      *            The seed file to be used for this index. Only the given seeds
1106      *            will be used
1107      * @throws IOException
1108      *             If some network error occurs.
1109      * @throws PeerGroupException
1110      *             If some network error occurs.
1111      */
1112     public final void open(final boolean client, final boolean clearPeerCache,
1113             final File seedFile) throws IOException, PeerGroupException {
1114         this.isClient = client;
1115         NetworkManager.ConfigMode c = null;
1116         if (!seedFile.exists()) {
1117             throw new IOException("File does not exist: " + seedFile);
1118         }
1119         if (client) {
1120             c = NetworkManager.ConfigMode.RELAY;
1121         } else {
1122             c = NetworkManager.ConfigMode.RENDEZVOUS_RELAY;
1123         }
1124         URI seedURI = seedFile.toURI();
1125         // initialize JXTA
1126         init(client, c, clearPeerCache, seedURI);
1127 
1128         logger.debug("Starting heart...");
1129         Thread thread = new Thread(new HeartBeat(), "Heart Beat Thread");
1130         thread.start();
1131         logger.debug("Starting incoming connections server...");
1132         Thread thread2 = new Thread(new IncomingConnectionHandler(),
1133                 "Incoming connection Thread");
1134         thread2.start();
1135     }
1136 
1137     /**
1138      * Generates a pipe advertisement.
1139      * @return A pipe advertisement.
1140      */
1141     private PipeAdvertisement getPipeAdvertisement() {
1142         PipeID pipeID = (PipeID) ID.create(generatePipeID());
1143 
1144         PipeAdvertisement advertisement = (PipeAdvertisement) AdvertisementFactory
1145                 .newAdvertisement(PipeAdvertisement.getAdvertisementType());
1146 
1147         advertisement.setPipeID(pipeID);
1148         advertisement.setType(PipeService.UnicastType);
1149         advertisement.setName(pipeName);
1150         return advertisement;
1151     }
1152 
1153     /**
1154      * Returns the pipe ID in URI form.
1155      * @return an URI representing the pipe id.
1156      */
1157     private URI generatePipeID() {
1158         return IDFactory.newPipeID(netPeerGroup.getPeerGroupID()).toURI();
1159     }
1160 
1161     /**
1162      * Finds the pipe for the given peer. Gets advertisements of pipes for the
1163      * given peer.n
1164      * @param peer
1165      *            The peer we are going to search.
1166      * @throws IOException
1167      *             If some network error occurs.
1168      * @throws PeerGroupException
1169      *             If some network error occurs.
1170      */
1171     protected final void findPipePeer(final URI peer) throws IOException,
1172             PeerGroupException {
1173         if (!minimumNumberOfPeers()) {
1174             discovery.getRemoteAdvertisements(peer.toString(),
1175                     DiscoveryService.ADV, "Name", pipeName, 1, null);
1176 
1177         }
1178     }
1179 
1180     /**
1181      * Query the discovery service for OBSearch pipes.
1182      * @throws IOException
1183      *             If some network error occurs.
1184      * @throws PeerGroupException
1185      *             If some network error occurs.
1186      */
1187     protected final void findPipes() throws IOException, PeerGroupException {
1188         if (isClient) {
1189             discovery.getRemoteAdvertisements(null, DiscoveryService.PEER,
1190                     null, null, minNumberOfPeers, null);
1191         }
1192 
1193     }
1194 
1195     /**
1196      * Check if all the boxes are being supplied.
1197      * @return true if all the boxes are represented by a peer.
1198      */
1199     protected final boolean totalBoxesCovered() {
1200         if (handlersPerBox == null) {
1201             return false;
1202         }
1203         int i = 0;
1204         int max = getIndex().totalBoxes();
1205 
1206         while (i < max) {
1207             if (this.handlersPerBox.get(i).size() == 0) {
1208                 return false;
1209             }
1210             i++;
1211         }
1212         return true;
1213     }
1214 
1215     /**
1216      * Method called by the discovery server.
1217      * @param ev
1218      *            The event that the discovery server found.
1219      */
1220     public final void discoveryEvent(final DiscoveryEvent ev) {
1221 
1222         try {
1223             DiscoveryResponseMsg res = ev.getResponse();
1224 
1225             Advertisement adv;
1226             Enumeration en = res.getAdvertisements();
1227             // logger.debug("Discovery event" + ev);
1228             if (en != null) {
1229                 while (en.hasMoreElements()) {
1230                     adv = (Advertisement) en.nextElement();
1231                     // logger.info("Discovery event: " + adv);
1232                     if (adv instanceof PipeAdvertisement) {
1233 
1234                         synchronized (clients) {
1235                             // add the pipe only if its peer hasn't been
1236                             // added
1237                             // this is a hack in order to prevent additions
1238                             // of
1239                             // pipes whose
1240                             // TODO: if there is some inconsistency with the
1241                             // string generated by EndpointAddress,
1242                             // this hack won't work
1243                             String hack = "urn:"
1244                                     + ((EndpointAddress) ev.getSource())
1245                                             .toURI().toString().replaceAll("/",
1246                                                     "");
1247                             URI u = new URI(hack);
1248                             // avoid connecting to ourselves and connecting
1249                             // to
1250                             // peers we have already
1251                             if (!u.toString().equals(
1252                                     this.netPeerGroup.getPeerAdvertisement()
1253                                             .getPeerID().toURI().toString())
1254                                     && !isConnectedToPeer(u.toString())) {
1255                                 logger.info("Discovered new peer: " + u);
1256                                 PipeAdvertisement p = (PipeAdvertisement) adv;
1257                                 addPipe(u, p);
1258                             }
1259                         }
1260                     } else if (adv instanceof PeerAdvertisement) {
1261                         // if we get a peer advertisement, then if the peer is
1262                         // not yet connected to us
1263                         // we can search for pipes.
1264                         synchronized (clients) {
1265                             PeerAdvertisement p = (PeerAdvertisement) adv;
1266                             if (!isConnectedToPeer(p.getPeerID().toURI()
1267                                     .toString())) {
1268                                 // find pipes
1269                                 findPipePeer(p.getPeerID().toURI());
1270                             }
1271                         }
1272                     }
1273                 }
1274             }
1275         } catch (Exception e) {
1276             logger.fatal("Exception", e);
1277             assert false;
1278         }
1279     }
1280 
1281     /**
1282      * Returns true if the given peer id is connected to us.
1283      * @param id
1284      *            The id of the peer we are looking for
1285      * @return True if the given id is connected to us.
1286      */
1287     private boolean isConnectedToPeer(final String id) {
1288         return clients.containsKey(id);
1