From 57af6ee9be227dfd321f6de9a72369e2033b3aab Mon Sep 17 00:00:00 2001 From: Leonardo Robol Date: Wed, 28 Oct 2009 17:00:13 +0100 Subject: [PATCH] Implementata meglio la gestione dei thread --- spidy.py | 222 +++++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 168 insertions(+), 54 deletions(-) diff --git a/spidy.py b/spidy.py index 3311c7d..d7c2560 100755 --- a/spidy.py +++ b/spidy.py @@ -2,31 +2,146 @@ # -*- coding: utf-8 -*- # -import random -import urllib2 -import mutex -import threading -import re, time +import random, urllib2, threading, re, time, sys from optparse import OptionParser default_page = "http://poisson.phc.unipi.it" __author__ = "Leonardo Robol " -mtx_url_dict = mutex.mutex() +freepages = threading.Event() size = 1000 -url_dict = {} url_stack = range(size) link_counter = 0 max_steps = 5 -debug = 0 +debug_value = 0 outfile = "connections.txt" ExitRequired = False +def debug(message, level): + """Stampa debug se il livello è abbastanza alto""" + if debug_value >= level: + print " => " + message + +class UrlDict(): + + def __init__(self): + ## Creo il dizionario che mi servirà + ## in seguito + + self.url_dict = {} + self.lock = threading.RLock() + freepages.set() + + def getPage(self, url): + """Ritorna un oggetto Page per URL, oppure + None se non è presente""" + + self.acquire() + if self.url_dict.has_key(url): + ## Effettuiamo una copia locale e + ## poi usciamo + page = self.url_dict[url] + else: + page = None + + self.release() + return page + + def addPage(self, page): + """Aggiunge una pagina al database""" + self.acquire("addPage") + self.url_dict[page.url] = page + if not page.analyzed: + freepages.set() + debug(" => Aggiunta %s" % str(self.url_dict[page.url]), 3) + self.release("addPage") + return None + + def getNewPage(self): + """Ottiene una pagina non analizzata""" + ## Aspetto che ci siano pagine libere + freepages.wait() + debug(" => Abbiamo ottenuto una nuova pagina", 3) + + self.acquire("getNewPage") + resp_page = None + for url in self.url_dict: + page = self.url_dict[url] + if not page.analyzed: + resp_page = page + debug("Restituisco la pagina %s" % str(resp_page), 3) + break + + debug("Page vale %s" % str(resp_page), 4) + last = True + for url in self.url_dict: + page = self.url_dict[url] + if not page.analyzed: + last = False + if last: + freepages.clear() + + self.release("getNewPage") + return resp_page + + def hasKey(self, url): + """Ritorna True se la key url è presente""" + self.acquire("hasKey") + resp = self.url_dict.has_key(url) + self.release("hasKey") + return resp + + def addLink(self, url, ID): + global link_counter + self.acquire("addLink") + self.url_dict[url].links.append(ID) + link_counter += 1 + self.release("addLink") + + def getLinks(self, url): + """Ritorna una lista con i link della pagina + contrassegnata da url""" + self.acquire("getLinks") + links = self.url_dict[url].links + self.release("getLinks") + return links + + def __iter__(self): + """Iteratore sulle pagine nel dizionario""" + # self.acquire("__iter__") + try: + for url in self.url_dict: + try: + yield url + except Exception, e: + print " => Exception: %s" % e + except Exception, e: + print " => Exception caught, %s" % e + # self.release("__iter__") + + def setAnalyzed(self, url): + """Contrassegna un url come analizzato""" + self.acquire("setAnalyzed") + if self.url_dict.has_key(url): + self.url_dict[url].analyzed = True + self.release("setAnalzyed") + return None + + def acquire(self, message=""): + debug(" => Thread %s acquiring lock -- %s " % (str(threading.currentThread()), message), 4) + self.lock.acquire() + + def release(self, message = ""): + debug(" => Thread %s releasing lock -- %s" % (str(threading.currentThread()), message), 4) + self.lock.release() + +url_dict = UrlDict() + def get_links(page): """Restituisce una lista con i link @@ -113,13 +228,10 @@ class Page(): def __init__(self, url="", parent=None): - - if(url != ""): - mtx_url_dict.lock(self.__new_page, (url, parent)) - mtx_url_dict.unlock() - else: - mtx_url_dict.lock(self.__get_page, parent) - mtx_url_dict.unlock() + if(url != ""): + self.__new_page(url, parent) + else: + self.__get_page(parent) def __get_page(self, parent): @@ -127,19 +239,10 @@ class Page(): self.exhausted = True return - page_found = False - - while(not page_found): + page = None + while(page == None): - for url in url_dict: - page = Page(url) - if not page.analyzed: - page_found = True - self.url = url - break - - if not page_found: - time.sleep(1) + page = url_dict.getNewPage() # Questo è un punto dove il Crawler # si potrebbe bloccare e quindi facciamo @@ -147,16 +250,17 @@ class Page(): # Ovviamente poi sarà necessario che anche # il chiamante lo faccia! if ExitRequired: + debug("Ho incotrato exit required!", 2) return - self.ID = page.ID self.analyzed = page.analyzed self.exhausted = False self.step = page.step - url_dict[url].analyzed = True + self.url = page.url + url_dict.setAnalyzed(page.url) - def __new_page(self, (url, parent)): + def __new_page(self, url, parent): # Questo ci serve per tenere il # conto di tutti gli url global url_dict @@ -167,11 +271,12 @@ class Page(): self.url = url - if(url_dict.has_key(url)): + if(url_dict.hasKey(url)): # Preservo i parametri che esistono già! - self.ID = url_dict[url].ID - self.analyzed = url_dict[url].analyzed - self.step = url_dict[url].step + page = url_dict.getPage(url) + self.ID = page.ID + self.analyzed = page.analyzed + self.step = page.step if parent == None: self.step = 0 else: @@ -184,14 +289,14 @@ class Page(): except IndexError: self.exhausted = True - # Conto in quanti passi si raggiunge questa pagina if parent == None: self.step = 0 else: self.step = parent.step + 1 - url_dict[url] = self - url_dict[url].links = [] + self.links = [] + url_dict.addPage(self) + def add_link(self, page): @@ -199,18 +304,12 @@ class Page(): if(page.exhausted): return -1 if debug >= 2: - print " => Adding link to %s" % page.url - mtx_url_dict.lock(self.__add_link, page.ID) - mtx_url_dict.unlock() + debug(" => Adding link to %s" % page.url, 2) + url_dict.addLink(page.url, page.ID) return 0 - def __add_link(self, ID): - global link_counter - url_dict[self.url].links.append(ID) - link_counter += 1 - def links(self): - return url_dict[self.url].links + return url_dict.getLinks(url) @@ -241,7 +340,10 @@ class Crawler(threading.Thread): ## Stiamo attenti a non fare troppi passi ## dalla pagina di partenza page = Page() - + + while page == None: + page = Page() + ## Se ci chiedono di uscire perché abbiamo ## analizzato tutte la pagine ce ne andiamo @@ -255,14 +357,15 @@ class Crawler(threading.Thread): if page.step >= max_steps: - print " => Ohi, troppi step! Riparto" + debug(" => Ohi, troppi step! Riparto",2) page = Page(self.start_page) self.waitingforpage = False if debug >= 1: - print " => Analyzing page %s" % page.url + debug(" => Analyzing page %s" % str(page), 2) + # Come prima cosa devo fare il parsing dei @@ -310,7 +413,7 @@ if __name__ == "__main__": (option, args) = parser.parse_args() concurrency = int(option.concurrency) - debug = bool(option.debug) + debug_value = bool(option.debug) outfile = option.outfile size = int(option.size) url_stack = range(size) @@ -357,7 +460,10 @@ if __name__ == "__main__": ## Se non c'è niente da fare posso ## anche rilassarmi ed aspettare un ## secondo prima di rieseguire il check - time.sleep(1) + try: + time.sleep(1) + except KeyboardInterrupt: + sys.exit() ## Qui non c'è modo umano di terminare il @@ -370,6 +476,7 @@ if __name__ == "__main__": ## la matrice in un formato soddisfacente out = open(outfile, 'w') + leg = open("legenda.txt", 'w') ## Il numero massimo di pagine meno quelle avanzate = ## le pagine effettivamente usate! @@ -380,9 +487,16 @@ if __name__ == "__main__": - for page in url_dict: - for link in url_dict[page].links: - out.write(page + "\t" + str(url_dict[page].ID) + "\t" + str(link) + "\n") + for url in url_dict: + try: + leg.write(str(url_dict.getPage(url).ID) + "\t" + url + "\n" ) + except AttributeError: + print " => No ID on %s" % str(url) + for link in url_dict.getLinks(url): + out.write(str(url_dict.getPage(url).ID) + "\t" + str(link) + "\n") + + out.close() + leg.close() l = time.localtime(time.time()) print " => Work completed at %s:%s:%s " % (l.tm_hour,l.tm_min,l.tm_sec) -- 2.1.4