自组织映射

2022-05-29 10:20:08 浏览数 (1)

代码语言:python复制
from collections import defaultdict
from math import sqrt
from warnings import warn

import numpy as np
from numpy import (array, unravel_index, nditer, linalg,
                   random, subtract, power, exp, pi, zeros, arange, outer, meshgrid, dot)
from numpy.testing import assert_almost_equal, assert_array_almost_equal, assert_array_equal
from pylab import plot, axis, show, pcolor, colorbar, bone
from sklearn.datasets import load_digits
def fast_norm(x):
    """Return the Euclidean norm of a 1-D vector via a single dot product."""
    squared = dot(x, x.T)
    return sqrt(squared)
class Som:
    """Minimal self-organizing map (SOM) on a rectangular x-by-y grid.

    Weights are stored in a (x, y, input_len) array.  Training moves the
    winning node (and its Gaussian neighborhood) toward each sample while the
    learning rate and neighborhood radius decay with the iteration count.
    """
    def __init__(self, x, y, input_len, sigma=1.0, learning_rate=0.5, random_seed=None):
        """Build an x-by-y map for inputs of length input_len.

        sigma: initial neighborhood radius (should be well below min(x, y)/2).
        learning_rate: initial update step size.
        random_seed: seed for reproducible initialization and training order.
        """
        if sigma >= x/2.0 or sigma >= y/2.0:
            warn('Warning: sigma is too high for the dimension of the map.')
        # RandomState(None) seeds from the OS, so one call covers both the
        # seeded and unseeded cases (the original if/else branches were identical).
        self.random_generator = random.RandomState(random_seed)
        self.learning_rate = learning_rate
        self.sigma = sigma
        # Random weights in [-1, 1), then each (y, input_len) slice scaled to unit norm.
        self.weights = self.random_generator.rand(x, y, input_len)*2 - 1
        self.weights = array([v/linalg.norm(v) for v in self.weights])
        self.activation_map = zeros((x, y))
        self.neigx = arange(x)
        self.neigy = arange(y)
        self.neighborhood = self.gaussian
    def _activate(self, x):
        """Fill activation_map with the distance from x to every node's weights."""
        s = subtract(x, self.weights)
        it = nditer(self.activation_map, flags=['multi_index'])
        while not it.finished:
            self.activation_map[it.multi_index] = linalg.norm(s[it.multi_index])
            it.iternext()
    def activate(self, x):
        """Return the activation map for sample x (smaller value = closer node)."""
        self._activate(x)
        return self.activation_map
    def gaussian(self, c, sigma):
        """Gaussian neighborhood centered on node c with spread sigma."""
        d = 2*pi*sigma*sigma
        ax = exp(-power(self.neigx - c[0], 2)/d)
        ay = exp(-power(self.neigy - c[1], 2)/d)
        return outer(ax, ay)  # separable product over the two grid axes
    def diff_gaussian(self, c, sigma):
        """Mexican-hat (difference-of-Gaussians) neighborhood centered on c."""
        xx, yy = meshgrid(self.neigx, self.neigy)
        # FIX: the '+' between the two power() terms was lost in transcription.
        p = power(xx - c[0], 2) + power(yy - c[1], 2)
        d = 2*pi*sigma*sigma
        return exp(-p/d)*(1 - 2/d*p)
    def winner(self, x):
        """Return the grid coordinates of the best matching unit for sample x."""
        self._activate(x)
        return unravel_index(self.activation_map.argmin(), self.activation_map.shape)
    def update(self, x, win, t):
        """Pull weights toward sample x around winner win at iteration t."""
        # FIX: decays were '(1 t/self.T)'; restore the lost '+'.
        eta = self.learning_rate/(1 + t/self.T)
        sig = self.sigma/(1 + t/self.T)
        g = self.neighborhood(win, sig)*eta
        it = nditer(g, flags=['multi_index'])
        while not it.finished:
            # FIX: restore '+=' (was ' =').
            self.weights[it.multi_index] += g[it.multi_index]*(x - self.weights[it.multi_index])
            # Keep each unit's weight vector at unit length.
            self.weights[it.multi_index] = self.weights[it.multi_index]/linalg.norm(self.weights[it.multi_index])
            it.iternext()
    def quantization(self, data):
        """Map each sample to the weight vector of its best matching unit."""
        q = zeros(data.shape)
        for i, x in enumerate(data):
            q[i] = self.weights[self.winner(x)]
        return q
    def random_weights_init(self, data):
        """Initialize every unit with a normalized random sample from data."""
        it = nditer(self.activation_map, flags=['multi_index'])
        while not it.finished:
            # FIX: int(rand()*len(data)-1) could yield -1 and biased the draw;
            # randint gives a uniform index in [0, len(data)).
            self.weights[it.multi_index] = data[self.random_generator.randint(len(data))]
            self.weights[it.multi_index] = self.weights[it.multi_index]/linalg.norm(self.weights[it.multi_index])
            it.iternext()
    def train_random(self, data, num_iteration):
        """Train for num_iteration steps, each on a random sample from data."""
        self._init_T(num_iteration)
        for iteration in range(num_iteration):
            # FIX: int(round(rand()*len(data)-1)) could yield -1; use a uniform draw.
            rand_i = self.random_generator.randint(len(data))
            self.update(data[rand_i], self.winner(data[rand_i]), iteration)
    def train_batch(self, data, num_iteration):
        """Train for num_iteration steps, cycling through data in order."""
        self._init_T(len(data)*num_iteration)
        for iteration in range(num_iteration):
            # FIX: was 'iteration % (len(data)-1)', which never visited the last
            # sample and crashed for len(data) == 1; also restore the lost '+=' loop.
            idx = iteration % len(data)
            self.update(data[idx], self.winner(data[idx]), iteration)
    def _init_T(self, num_iteration):
        # Decay constant: learning rate and sigma are roughly halved mid-run.
        self.T = num_iteration/2
    def distance_map(self):
        """Return the U-matrix: per-node summed distance to its grid neighbors, scaled to [0, 1]."""
        um = zeros((self.weights.shape[0], self.weights.shape[1]))
        it = nditer(um, flags=['multi_index'])
        while not it.finished:
            # FIX: restore the lost '+' in the '+2' range bounds and '+=' accumulation.
            for ii in range(it.multi_index[0]-1, it.multi_index[0]+2):
                for jj in range(it.multi_index[1]-1, it.multi_index[1]+2):
                    if 0 <= ii < self.weights.shape[0] and 0 <= jj < self.weights.shape[1]:
                        um[it.multi_index] += linalg.norm(self.weights[ii, jj, :]-self.weights[it.multi_index])
            it.iternext()
        um = um/um.max()
        return um
    def activation_response(self, data):
        """Count, per node, how many samples in data it wins."""
        a = zeros((self.weights.shape[0], self.weights.shape[1]))
        for x in data:
            a[self.winner(x)] += 1  # FIX: restore '+='
        return a
    def quantization_error(self, data):
        """Return the mean distance between samples and their winning weights."""
        error = 0
        for x in data:
            error += linalg.norm(x - self.weights[self.winner(x)])  # FIX: restore '+='
        return error/len(data)
    def win_map(self, data):
        """Return a dict mapping winner coordinates to the list of samples won."""
        winmap = defaultdict(list)
        for x in data:
            winmap[self.winner(x)].append(x)
        return winmap
class TestSom:
    """Unit tests for Som and fast_norm (run with pytest)."""
    def setup_method(self, method):
        # 5x5 map with 1-D inputs; replace the weights with a known scalar
        # grid so winners are fully predictable.
        self.som = Som(5, 5, 1)
        for w in self.som.weights:
            # __init__ normalizes each (y, input_len) slice to unit norm.
            assert_almost_equal(1.0, linalg.norm(w))
        self.som.weights = zeros((5, 5))
        self.som.weights[2, 3] = 5.0
        self.som.weights[1, 1] = 2.0
    def test_fast_norm(self):
        # FIX: restore the lost '+' (was 'sqrt(1 9)').
        assert fast_norm(array([1, 3])) == sqrt(1 + 9)
    def test_gaussian(self):
        bell = self.som.gaussian((2, 2), 1)
        assert bell.max() == 1.0
        assert bell.argmax() == 12  # peak at node (2, 2) on the 5x5 grid
    def test_win_map(self):
        winners = self.som.win_map([5.0, 2.0])
        assert winners[(2, 3)][0] == 5.0
        assert winners[(1, 1)][0] == 2.0
    def test_activation_reponse(self):
        response = self.som.activation_response([5.0, 2.0])
        assert response[2, 3] == 1
        assert response[1, 1] == 1
    def test_activate(self):
        # Node (2, 3) flattens to index 13 in a 5x5 map.
        assert self.som.activate(5.0).argmin() == 13.0
    def test_quantization_error(self):
        # FIX: the original comparisons lacked 'assert' and so tested nothing.
        assert self.som.quantization_error([5, 2]) == 0.0
        # 4 is 1 away from its BMU weight (5); 1 is 1 away from its BMU
        # weight (0 at node (0, 0), the first argmin tie) -> mean error 1.0.
        assert self.som.quantization_error([4, 1]) == 1.0
    def test_quantization(self):
        q = self.som.quantization(array([4, 2]))
        assert q[0] == 5.0
        assert q[1] == 2.0
    def test_random_seed(self):
        # Same seed -> identical initial weights.
        som1 = Som(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        som2 = Som(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        assert_array_almost_equal(som1.weights, som2.weights)
        # Same seed -> identical weights after identical training.
        data = random.rand(100, 2)
        som1 = Som(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        som1.train_random(data, 10)
        som2 = Som(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        som2.train_random(data, 10)
        assert_array_almost_equal(som1.weights, som2.weights)
    def test_train_batch(self):
        som = Som(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        data = array([[4, 2], [3, 1]])
        q1 = som.quantization_error(data)
        som.train_batch(data, 10)
        assert q1 > som.quantization_error(data)  # training reduces the error
    def test_train_random(self):
        som = Som(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        data = array([[4, 2], [3, 1]])
        q1 = som.quantization_error(data)
        som.train_random(data, 10)
        assert q1 > som.quantization_error(data)
    def test_random_weights_init(self):
        som = Som(2, 2, 2, random_seed=1)
        som.random_weights_init(array([[1.0, .0]]))
        # With a single (already unit-norm) sample, every unit must equal it.
        for w in som.weights:
            assert_array_equal(w[0], array([1.0, .0]))
# --- Demo: train a 16x16 SOM on the 8x8 sklearn digits and visualize the map ---
digits = load_digits()
data = digits.data
n_samples, n_features = data.shape
n_digits = len(np.unique(digits.target))
# digits.target is already an integer array of labels 0-9.  The original
# 'labels[labels=="0"]=0' lines compared ints with strings and were no-ops,
# so the string-to-int remapping block has been removed.
labels = digits.target
som = Som(16, 16, 64, sigma=1.3, learning_rate=0.3)
print('Initialising SOM.')
som.random_weights_init(data)
som.train_random(data, 10000)
print('SOM Processing Complete.')
# U-matrix (inter-node distances) as a grayscale background.
bone()
pcolor(som.distance_map().T)
colorbar()
# One marker and color per digit class.
markers = ['o', 'v', '1', '3', '8', 's', 'p', 'x', 'D', '*']
colors = ["r", "g", "b", "y", "c", (0, 0.1, 0.8), (1, 0.5, 0), (1, 1, 0.3), "m", (0.4, 0.6, 0)]
for cnt, xx in enumerate(data):
    w = som.winner(xx)  # winning node for this sample
    # FIX: restore the lost '+' — offset by .5 to plot in the cell center.
    plot(w[0] + .5, w[1] + .5, markers[labels[cnt]], markerfacecolor='None',
         markeredgecolor=colors[labels[cnt]], markersize=12, markeredgewidth=2)
axis([0, som.weights.shape[0], 0, som.weights.shape[1]])
show()

Initialising SOM. SOM Processing Complete.

算法:自组织映射是基于拓扑表示法的数据降维技术。

链接:https://github.com/alexarnimueller/som

0 人点赞