Implementata meglio la gestione dei thread

Leonardo Robol [2009-10-28 16:00]
Implementata meglio la gestione dei thread
Filename
spidy.py
diff --git a/spidy.py b/spidy.py
index 3311c7d..d7c2560 100755
--- a/spidy.py
+++ b/spidy.py
@@ -2,31 +2,146 @@
 # -*- coding: utf-8 -*-
 #

-import random
-import urllib2
-import mutex
-import threading
-import re, time
+import random, urllib2, threading, re, time, sys
 from optparse import OptionParser

 default_page = "http://poisson.phc.unipi.it"

 __author__ = "Leonardo Robol <leo@robol.it>"

-mtx_url_dict = mutex.mutex()
+freepages = threading.Event()

 size = 1000
-url_dict = {}
 url_stack = range(size)
 link_counter = 0

 max_steps = 5

-debug = 0
+debug_value = 0
 outfile = "connections.txt"

 ExitRequired = False

+def debug(message, level):
+    """Stampa debug se il livello è abbastanza alto"""
+    if debug_value >= level:
+        print " => " + message
+
+class UrlDict():
+
+    def __init__(self):
+        ## Creo il dizionario che mi servirà
+        ## in seguito
+
+        self.url_dict = {}
+        self.lock = threading.RLock()
+        freepages.set()
+
+    def getPage(self, url):
+        """Ritorna un oggetto Page per URL, oppure
+        None se non è presente"""
+
+        self.acquire()
+        if self.url_dict.has_key(url):
+            ## Effettuiamo una copia locale e
+            ## poi usciamo
+            page = self.url_dict[url]
+        else:
+            page = None
+
+        self.release()
+        return page
+
+    def addPage(self, page):
+        """Aggiunge una pagina al database"""
+        self.acquire("addPage")
+        self.url_dict[page.url] = page
+        if not page.analyzed:
+            freepages.set()
+            debug(" => Aggiunta %s" % str(self.url_dict[page.url]), 3)
+        self.release("addPage")
+        return None
+
+    def getNewPage(self):
+        """Ottiene una pagina non analizzata"""
+        ## Aspetto che ci siano pagine libere
+        freepages.wait()
+        debug(" => Abbiamo ottenuto una nuova pagina", 3)
+
+        self.acquire("getNewPage")
+        resp_page = None
+        for url in self.url_dict:
+            page = self.url_dict[url]
+            if not page.analyzed:
+                resp_page = page
+                debug("Restituisco la pagina %s" % str(resp_page), 3)
+                break
+
+        debug("Page vale %s" % str(resp_page), 4)
+        last = True
+        for url in self.url_dict:
+            page = self.url_dict[url]
+            if not page.analyzed:
+                last = False
+        if last:
+            freepages.clear()
+
+        self.release("getNewPage")
+        return resp_page
+
+    def hasKey(self, url):
+        """Ritorna True se la key url è presente"""
+        self.acquire("hasKey")
+        resp = self.url_dict.has_key(url)
+        self.release("hasKey")
+        return resp
+
+    def addLink(self, url, ID):
+        global link_counter
+        self.acquire("addLink")
+        self.url_dict[url].links.append(ID)
+        link_counter += 1
+        self.release("addLink")
+
+    def getLinks(self, url):
+        """Ritorna una lista con i link della pagina
+        contrassegnata da url"""
+        self.acquire("getLinks")
+        links = self.url_dict[url].links
+        self.release("getLinks")
+        return links
+
+    def __iter__(self):
+        """Iteratore sulle pagine nel dizionario"""
+        # self.acquire("__iter__")
+        try:
+            for url in self.url_dict:
+                try:
+                    yield url
+                except Exception, e:
+                    print " => Exception: %s" % e
+        except Exception, e:
+            print " => Exception caught, %s" % e
+            # self.release("__iter__")
+
+    def setAnalyzed(self, url):
+        """Contrassegna un url come analizzato"""
+        self.acquire("setAnalyzed")
+        if self.url_dict.has_key(url):
+            self.url_dict[url].analyzed = True
+        self.release("setAnalzyed")
+        return None
+
+    def acquire(self, message=""):
+        debug(" => Thread %s acquiring lock -- %s " % (str(threading.currentThread()), message), 4)
+        self.lock.acquire()
+
+    def release(self, message = ""):
+        debug(" => Thread %s releasing lock  -- %s" % (str(threading.currentThread()), message), 4)
+        self.lock.release()
+
+url_dict = UrlDict()
+

 def get_links(page):
     """Restituisce una lista con i link
@@ -113,13 +228,10 @@ class Page():

     def __init__(self, url="", parent=None):

-
-        if(url != ""):
-            mtx_url_dict.lock(self.__new_page, (url, parent))
-            mtx_url_dict.unlock()
-        else:
-            mtx_url_dict.lock(self.__get_page, parent)
-            mtx_url_dict.unlock()
+         if(url != ""):
+             self.__new_page(url, parent)
+         else:
+             self.__get_page(parent)

     def __get_page(self, parent):

@@ -127,19 +239,10 @@ class Page():
             self.exhausted = True
             return

-        page_found = False
-
-        while(not page_found):
+        page = None
+        while(page == None):

-            for url in url_dict:
-                page = Page(url)
-                if not page.analyzed:
-                    page_found = True
-                    self.url = url
-                    break
-
-            if not page_found:
-                time.sleep(1)
+            page = url_dict.getNewPage()

             # Questo è un punto dove il Crawler
             # si potrebbe bloccare e quindi facciamo
@@ -147,16 +250,17 @@ class Page():
             # Ovviamente poi sarà necessario che anche
             # il chiamante lo faccia!
             if ExitRequired:
+                debug("Ho incotrato exit required!", 2)
                 return

-
         self.ID = page.ID
         self.analyzed = page.analyzed
         self.exhausted = False
         self.step = page.step
-        url_dict[url].analyzed = True
+        self.url = page.url
+        url_dict.setAnalyzed(page.url)

-    def __new_page(self, (url, parent)):
+    def __new_page(self, url, parent):
         # Questo ci serve per tenere il
         # conto di tutti gli url
         global url_dict
@@ -167,11 +271,12 @@ class Page():
         self.url = url


-        if(url_dict.has_key(url)):
+        if(url_dict.hasKey(url)):
             # Preservo i parametri che esistono già!
-            self.ID = url_dict[url].ID
-            self.analyzed = url_dict[url].analyzed
-            self.step = url_dict[url].step
+            page = url_dict.getPage(url)
+            self.ID = page.ID
+            self.analyzed = page.analyzed
+            self.step = page.step
             if parent == None:
                 self.step = 0
             else:
@@ -184,14 +289,14 @@ class Page():
             except IndexError:
                 self.exhausted = True

-
             # Conto in quanti passi si raggiunge questa pagina
             if parent == None:
                 self.step = 0
             else:
                 self.step = parent.step + 1
-            url_dict[url] = self
-            url_dict[url].links = []
+            self.links = []
+            url_dict.addPage(self)
+


     def add_link(self, page):
@@ -199,18 +304,12 @@ class Page():
         if(page.exhausted):
             return -1
         if debug >= 2:
-            print " => Adding link to %s" % page.url
-        mtx_url_dict.lock(self.__add_link, page.ID)
-        mtx_url_dict.unlock()
+            debug(" => Adding link to %s" % page.url, 2)
+        url_dict.addLink(page.url, page.ID)
         return 0

-    def __add_link(self, ID):
-        global link_counter
-        url_dict[self.url].links.append(ID)
-        link_counter += 1
-
     def links(self):
-        return url_dict[self.url].links
+        return url_dict.getLinks(url)



@@ -241,7 +340,10 @@ class Crawler(threading.Thread):
             ## Stiamo attenti a non fare troppi passi
             ## dalla pagina di partenza
             page = Page()
-
+
+            while page == None:
+                page = Page()
+

             ## Se ci chiedono di uscire perché abbiamo
             ## analizzato tutte la pagine ce ne andiamo
@@ -255,14 +357,15 @@ class Crawler(threading.Thread):


             if page.step >= max_steps:
-                print " => Ohi, troppi step! Riparto"
+                debug(" => Ohi, troppi step! Riparto",2)
                 page = Page(self.start_page)

             self.waitingforpage = False


             if debug >= 1:
-                print " => Analyzing page %s" % page.url
+                debug(" => Analyzing page %s" % str(page), 2)
+


             # Come prima cosa devo fare il parsing dei
@@ -310,7 +413,7 @@ if __name__ == "__main__":
     (option, args) = parser.parse_args()

     concurrency = int(option.concurrency)
-    debug = bool(option.debug)
+    debug_value = bool(option.debug)
     outfile = option.outfile
     size = int(option.size)
     url_stack = range(size)
@@ -357,7 +460,10 @@ if __name__ == "__main__":
         ## Se non c'è niente da fare posso
         ## anche rilassarmi ed aspettare un
         ## secondo prima di rieseguire il check
-        time.sleep(1)
+        try:
+            time.sleep(1)
+        except KeyboardInterrupt:
+            sys.exit()


     ## Qui non c'è modo umano di terminare il
@@ -370,6 +476,7 @@ if __name__ == "__main__":
     ## la matrice in un formato soddisfacente

     out = open(outfile, 'w')
+    leg = open("legenda.txt", 'w')

     ## Il numero massimo di pagine meno quelle avanzate =
     ## le pagine effettivamente usate!
@@ -380,9 +487,16 @@ if __name__ == "__main__":



-    for page in url_dict:
-        for link in url_dict[page].links:
-            out.write(page + "\t" + str(url_dict[page].ID) + "\t" + str(link) + "\n")
+    for url in url_dict:
+        try:
+            leg.write(str(url_dict.getPage(url).ID) + "\t" + url + "\n" )
+        except AttributeError:
+            print " => No ID on %s" % str(url)
+        for link in url_dict.getLinks(url):
+            out.write(str(url_dict.getPage(url).ID) + "\t" + str(link) + "\n")
+
+    out.close()
+    leg.close()

     l = time.localtime(time.time())
     print " => Work completed at %s:%s:%s " % (l.tm_hour,l.tm_min,l.tm_sec)
ViewGit