viewgit/index.php:465 Only variables should be passed by reference [2048]
viewgit/index.php:466 Non-static method GeSHi::get_language_name_from_extension() should not be called statically [2048]
# -*- coding: utf-8 -*-
"""FIR filters and two-channel filter banks for multi-level signal splitting.

The module provides:

* ``FIR``, a finite impulse response filter applied by truncated convolution;
* ``WaveletStack``, the container holding the tracks produced by a split;
* ``FilterBank``, which splits a signal into downsampled high/low tracks and
  rebuilds it again;
* three ready-made banks: ``DaubechiesFilterBank``, ``HaarFilterBank`` and
  ``StrangFilterBank``.
"""

from numpy import array, zeros, convolve, dot, roll, sqrt, concatenate
from numpy.linalg import norm

try:
    from fast_filters import filter_and_downsample, upsample_and_filter
except ImportError:
    # Pure-numpy stand-ins used when the fast_filters module is unavailable.
    # They reproduce the expressions this module used before it switched to
    # fast_filters: DownSample(fir(samples)) and fir(UpSample(samples)).
    # NOTE(review): assumed to be equivalent to the fast_filters routines;
    # confirm against that module.

    def filter_and_downsample(samples, factor, coefficients):
        """Filter samples with coefficients, then keep one sample in factor."""
        filtered = convolve(coefficients, samples)[:len(samples)]
        return array(filtered[::factor])

    def upsample_and_filter(samples, factor, coefficients):
        """Insert factor - 1 zeros after each sample, then filter."""
        expanded = zeros(factor * len(samples))
        expanded[::factor] = samples
        return convolve(coefficients, expanded)[:len(expanded)]


class AbstractFilter(object):
    """Base class for filters: a callable that transforms samples."""

    def __init__(self):
        pass

    def __call__(self, samples):
        """Apply the filter to samples.

        This method has to be overridden by the specific filter class;
        the base implementation does nothing and returns None.
        """
        pass


class FIR(AbstractFilter):
    """Finite impulse response filter defined by its coefficient vector."""

    def __init__(self, coefficients):
        """Store a numpy copy of the coefficients."""
        self.coefficients = array(coefficients)

    def __call__(self, samples):
        """Apply the FIR to samples and return len(samples) values."""
        # For now the convolution is brutally truncated, even though this
        # means losing some final samples; on the other hand it seems to be
        # the only way to predict the delays.
        size = len(samples)
        return convolve(self.coefficients, samples)[0:size]

    def GetResponse(self):
        """Return the coefficient vector (the stored array, not a copy)."""
        return self.coefficients

    def __mul__(self, other):
        """Return the FIR equivalent to applying self and other in cascade."""
        return FIR(convolve(self.coefficients, other.GetResponse()))


def DownSample(samples):
    """Return a new array holding every second sample, starting at index 0."""
    return array(samples[::2])


def UpSample(samples):
    """Return an array of twice the length with a zero after each sample."""
    expanded = zeros(2 * len(samples))
    expanded[::2] = samples
    return expanded


def _AlternateSigns(coefficients):
    """Return a copy of coefficients with every odd-indexed entry negated."""
    modulated = array(coefficients)  # array() copies: the input is untouched
    modulated[1::2] = (-1) * modulated[1::2]
    return modulated


def PRCheck(f):
    """Check the perfect reconstruction (PR) condition of a filter bank.

    f is an object exposing lowPassFilter, highPassFilter,
    lowPassInverseFilter and highPassInverseFilter (FIR instances).

    Return the delay, in samples, that the filter bank will cause.
    Raise RuntimeError if the PR condition or the alias cancellation
    condition is not satisfied (tolerance 1e-15 on the norm).

    The filters of f are left unmodified.
    """
    a = 0.5 * (f.lowPassFilter * f.lowPassInverseFilter).GetResponse()
    b = 0.5 * (f.highPassFilter * f.highPassInverseFilter).GetResponse()
    total = a + b

    # GetResponse() hands back the filter's own array, so the sign flip
    # must be done on copies or the inverse filters would be corrupted.
    f0 = _AlternateSigns(f.lowPassInverseFilter.GetResponse())
    f1 = _AlternateSigns(f.highPassInverseFilter.GetResponse())
    c = (f.lowPassFilter * FIR(f0)).GetResponse()
    d = (f.highPassFilter * FIR(f1)).GetResponse()

    # Work out the delay: the first position whose value exceeds 0.5.
    # If there is none the last index is kept and the check below fails.
    delay = 0
    for delay, value in enumerate(total):
        if value > 0.5:
            break

    # With perfect reconstruction the sum is a pure unit impulse.
    impulse = zeros(len(total))
    impulse[delay] = 1
    if norm(total - impulse) > 1e-15:
        raise RuntimeError('PR condition is not satisfied')
    if norm(c + d) > 1e-15:
        raise RuntimeError('PR condition on aliasing is not satisfied')

    return delay


class WaveletStack(object):
    """Container for the sample tracks produced by FilterBank.Split()."""

    def __init__(self):
        """Create an empty stack to store the samples from the filter bank."""
        # Length of the most recently pushed high track (None until then).
        self.samples_number = None
        self.low_samples = []
        self.high_samples = []
        self.end = []
        self.end_effect = []

    def PushEndSamples(self, samples):
        """Store the end samples, i.e. those exceeding the ideal length
        (a multiple of pow(2, m), where m is the depth of the filter bank).
        """
        self.end = samples

    def PopEndSamples(self):
        """Return the stored end samples (they are not removed)."""
        return self.end

    def PushEndEffectSamples(self, samples):
        """Store some final samples to protect them from end effects."""
        self.end_effect = samples

    def PopEndEffectSamples(self):
        """Return the stored end effect samples (they are not removed)."""
        return self.end_effect

    def PushHighSamples(self, samples):
        """Add a new track of high frequency samples.

        The first track may have any length; each following track must be
        half as long as the previous one (integer division), otherwise
        IndexError is raised.
        """
        # Floor division keeps the comparison integral on Python 2 and 3.
        if (self.samples_number is not None
                and len(samples) != self.samples_number // 2):
            raise IndexError(
                "High frequency samples do not have the expected length")
        self.high_samples.append(samples)
        self.samples_number = len(samples)

    def PushLowSamples(self, samples):
        """Add a track of low frequency samples.

        It must be as long as the last high track pushed, otherwise
        IndexError is raised.
        """
        if self.samples_number != len(samples):
            raise IndexError(
                "Low frequency samples do not have the expected length")
        self.low_samples.append(samples)

    def PopLowSamples(self):
        """Remove and return the last low frequency track.

        Raise IndexError if there is none.
        """
        if not self.low_samples:
            raise IndexError("No low samples in the WaveletStack")
        return self.low_samples.pop()

    def GetLowSamplesNumber(self):
        """Return how many low frequency tracks are stored."""
        return len(self.low_samples)

    def GetHighSamplesNumber(self):
        """Return how many high frequency tracks are stored."""
        return len(self.high_samples)

    def GetAllSamplesNumber(self):
        """Return the number of samples over all tracks, including the
        end and end effect samples."""
        return (sum(map(len, self.low_samples))
                + sum(map(len, self.high_samples))
                + len(self.end_effect)
                + len(self.end))

    def GetNumSamples(self):
        """Return the number of tracks (low plus high) in the stack."""
        return len(self.low_samples) + len(self.high_samples)

    def GetSamplesMaxValue(self):
        """Return the maximum absolute value over the low and high tracks
        (0 when there is nothing stored)."""
        peaks = [0]
        for track in self.low_samples + self.high_samples:
            if len(track):  # max() of an empty track would raise
                peaks.append(max(abs(track)))
        return max(peaks)

    def PopHighSamples(self):
        """Remove and return the last high frequency track.

        Raise IndexError if there is none.
        """
        if not self.high_samples:
            raise IndexError("No more high samples in the stack")
        return self.high_samples.pop()


class FilterBank(object):
    """Two-channel filter bank with recursive splitting of the low band."""

    def __init__(self):
        """Create a new empty filter bank with depth 1."""
        self.lowPassFilter = None
        self.highPassFilter = None
        self.highPassInverseFilter = None
        self.lowPassInverseFilter = None
        self.depth = 1

    def SetFilterMatrix(self, filter_matrix):
        """Set the four filters of the bank from their coefficients.

        filter_matrix must have this shape:

            [h0, h1, f0, f1]

        where h0 is the low pass coefficient vector, h1 the high pass one,
        f0 the inverse low pass and f1 the inverse high pass.

        This method can only build FIR filters; for more freedom use
        SetLowPassFilter() and similar.
        """
        self.lowPassFilter = FIR(filter_matrix[0])
        self.highPassFilter = FIR(filter_matrix[1])
        self.lowPassInverseFilter = FIR(filter_matrix[2])
        self.highPassInverseFilter = FIR(filter_matrix[3])

    def _Truncate(self, samples):
        """Split samples so that recursive halving is possible.

        Return (kept, tail) where len(kept) % 2**depth == 0 and tail holds
        the trailing samples that had to be cut off ([] if none).
        """
        to_remove = len(samples) % pow(2, self.depth)
        if to_remove == 0:
            return samples, []
        return samples[:-to_remove], samples[-to_remove:]

    def SetDepth(self, depth):
        """Set the depth of the bank, i.e. how many recursions are done
        when filtering and downsampling."""
        self.depth = depth

    def SetLowPassFilter(self, lowpass):
        """Set the low pass filter.

        It has to be a callable that transforms the samples; it will
        usually be a FIR.
        """
        self.lowPassFilter = lowpass

    def SetHighPassFilter(self, highpass):
        """Set the high pass filter.

        It is usually a FIR but can be a generic callable that returns
        the filtered samples.
        """
        self.highPassFilter = highpass

    def SetLowPassInverseFilter(self, lowpass):
        """Set the low pass inverse filter used in reconstruction."""
        self.lowPassInverseFilter = lowpass

    def SetHighPassInverseFilter(self, highpass):
        """Set the high pass inverse filter used in reconstruction."""
        self.highPassInverseFilter = highpass

    def SetLength(self, length=None):
        """Set the length of the bank, i.e. the delay to undo when
        rebuilding.

        When length is None it is computed with PRCheck(), which raises
        RuntimeError if the filters do not give perfect reconstruction.
        """
        if length is None:
            self.length = PRCheck(self)
        else:
            self.length = length

    def EndEffectLength(self):
        """Estimate how much of the track the boundary effects could spoil,
        so that the final information can be preserved.

        The value is negative so it can be used directly as a slice start
        counted from the end.
        """
        taps = len(self.lowPassFilter.GetResponse())
        return (-2) * pow(2, self.depth) * taps

    def Split(self, samples):
        """Split the samples using the filters of the bank.

        Return a WaveletStack that contains the details (high tracks) and
        the last average (low track). The original signal can be rebuilt
        from it with the Rebuild() method.
        """
        samplesStack = WaveletStack()

        low, end = self._Truncate(samples)
        samplesStack.PushEndSamples(end)

        # Not entirely sure this is the best way to do it.
        samplesStack.PushEndEffectSamples(low[self.EndEffectLength():])

        # Do the real filtering and downsampling, storing the downsampled
        # details at every level.
        high_response = self.highPassFilter.GetResponse()
        low_response = self.lowPassFilter.GetResponse()
        for _ in range(self.depth):
            samplesStack.PushHighSamples(
                filter_and_downsample(low, 2, high_response))
            low = filter_and_downsample(low, 2, low_response)

        # In the end append the low downsampled data.
        samplesStack.PushLowSamples(low)
        return samplesStack

    def Rebuild(self, samplesStack):
        """Rebuild the array of samples from a stack obtained by Split().

        The low and high tracks are popped from samplesStack while
        rebuilding. SetLength() must have been called beforehand.
        """
        low = samplesStack.PopLowSamples()
        end = samplesStack.PopEndSamples()

        low_response = self.lowPassInverseFilter.GetResponse()
        high_response = self.highPassInverseFilter.GetResponse()
        while samplesStack.GetHighSamplesNumber() > 0:
            # Take the high samples from the stack and filter them
            # together with the low samples.
            high = samplesStack.PopHighSamples()
            low = upsample_and_filter(low, 2, low_response)
            low += upsample_and_filter(high, 2, high_response)

            # Shift the array so that the delay of the rebuilt samples does
            # not disturb the reconstruction of the next level.
            # Unfortunately this ruins the whole last part of the signal,
            # but for now there is no convenient solution.
            low = roll(low, -1 * self.length)

        # Repair any end effect with the samples saved by Split().
        low[self.EndEffectLength():] = samplesStack.PopEndEffectSamples()

        # Re-attach the tail removed by _Truncate().
        return concatenate((low, end))


_SQRT3 = sqrt(3)

DaubechiesFilterBank = FilterBank()
DaubechiesFilterBank.SetFilterMatrix([
    0.125 * array([1 + _SQRT3, 3 + _SQRT3, 3 - _SQRT3, 1 - _SQRT3]),
    0.125 * array([1 - _SQRT3, -3 + _SQRT3, 3 + _SQRT3, -1 - _SQRT3]),
    0.25 * array([1 - _SQRT3, 3 - _SQRT3, 3 + _SQRT3, 1 + _SQRT3]),
    0.25 * array([-1 - _SQRT3, 3 + _SQRT3, -3 + _SQRT3, 1 - _SQRT3]),
])
DaubechiesFilterBank.SetLength(3)

HaarFilterBank = FilterBank()
HaarFilterBank.SetFilterMatrix([
    [0.5, 0.5],
    [0.5, -0.5],
    [1, 1],
    [-1, 1],
])
HaarFilterBank.SetLength(1)

StrangFilterBank = FilterBank()
StrangFilterBank.SetFilterMatrix([
    0.125 * array([-1, 2, 6, 2, -1]),
    0.25 * array([1, -2, 1]),
    0.5 * array([1, 2, 1]),
    0.25 * array([1, 2, -6, 2, 1]),
])
StrangFilterBank.SetLength(3)

h0 = StrangFilterBank.lowPassFilter
h1 = StrangFilterBank.highPassFilter
f0 = StrangFilterBank.lowPassInverseFilter
f1 = StrangFilterBank.highPassInverseFilter