代码语言:javascript复制
import numpy as np
from sklearn.datasets import load_digits
from math import sqrt
from numpy import (array,unravel_index,nditer,linalg,
random,subtract,power,exp,pi,zeros,arange,outer,meshgrid,dot)
from collections import defaultdict
from warnings import warn
from sklearn.datasets import load_digits
from pylab import plot, axis, show, pcolor, colorbar, bone
def fast_norm(x):
return sqrt(dot(x,x.T))
class Som:
def __init__(self,x,y,input_len,sigma=1.0,learning_rate=0.5,random_seed=None):
if sigma>=x/2.0 or sigma>=y/2.0:
warn('Warning: sigma is too high for the dimension of the map.')
if random_seed:
self.random_generator=random.RandomState(random_seed)
else:
self.random_generator=random.RandomState(random_seed)
self.learning_rate=learning_rate
self.sigma=sigma
self.weights=self.random_generator.rand(x,y,input_len)*2-1
self.weights=array([v/linalg.norm(v) for v in self.weights])
self.activation_map=zeros((x,y))
self.neigx=arange(x)
self.neigy=arange(y)
self.neighborhood=self.gaussian
def _activate(self,x):
s=subtract(x,self.weights)
it=nditer(self.activation_map,flags=['multi_index'])
while not it.finished:
self.activation_map[it.multi_index]=fast_norm(s[it.multi_index])
it.iternext()
def activate(self,x):
self._activate(x)
return self.activation_map
def gaussian(self,c,sigma):
d=2*pi*sigma*sigma
ax=exp(-power(self.neigx-c[0],2)/d)
ay=exp(-power(self.neigy-c[1],2)/d)
return outer(ax,ay)
def diff_gaussian(self,c,sigma):
xx,yy=meshgrid(self.neigx,self.neigy)
p=power(xx-c[0],2) power(yy-c[1],2)
d=2*pi*sigma*sigma
return exp(-(p)/d)*(1-2/d*p)
def winner(self,x):
self._activate(x)
return unravel_index(self.activation_map.argmin(),self.activation_map.shape)
def update(self,x,win,t):
eta=self.learning_rate/(1 t/self.T)
sig=self.sigma/(1 t/self.T)
g=self.neighborhood(win,sig)*eta
it=nditer(g, flags=['multi_index'])
while not it.finished:
self.weights[it.multi_index] =g[it.multi_index]*(x-self.weights[it.multi_index])
self.weights[it.multi_index]=self.weights[it.multi_index]/fast_norm(self.weights[it.multi_index])
it.iternext()
def quantization(self,data):
q=zeros(data.shape)
for i,x in enumerate(data):
q[i]=self.weights[self.winner(x)]
return q
def random_weights_init(self,data):
it=nditer(self.activation_map, flags=['multi_index'])
while not it.finished:
self.weights[it.multi_index]=data[int(self.random_generator.rand()*len(data)-1)]
self.weights[it.multi_index]=self.weights[it.multi_index]/fast_norm(self.weights[it.multi_index])
it.iternext()
def train_random(self,data,num_iteration):
self._init_T(num_iteration)
for iteration in range(num_iteration):
rand_i=int(round(self.random_generator.rand()*len(data)-1))
self.update(data[rand_i],self.winner(data[rand_i]),iteration)
def train_batch(self,data,num_iteration):
self._init_T(len(data)*num_iteration)
iteration=0
while iteration<num_iteration:
idx=iteration%(len(data)-1)
self.update(data[idx],self.winner(data[idx]),iteration)
iteration =1
def _init_T(self,num_iteration):
self.T=num_iteration/2
def distance_map(self):
um=zeros((self.weights.shape[0],self.weights.shape[1]))
it=nditer(um, flags=['multi_index'])
while not it.finished:
for ii in range(it.multi_index[0]-1,it.multi_index[0] 2):
for jj in range(it.multi_index[1]-1,it.multi_index[1] 2):
if ii>=0 and ii<self.weights.shape[0] and jj>=0 and jj<self.weights.shape[1]:
um[it.multi_index] =fast_norm(self.weights[ii,jj,:]-self.weights[it.multi_index])
it.iternext()
um=um/um.max()
return um
def activation_response(self,data):
a=zeros((self.weights.shape[0],self.weights.shape[1]))
for x in data:
a[self.winner(x)] =1
return a
def quantization_error(self,data):
error=0
for x in data:
error =fast_norm(x-self.weights[self.winner(x)])
return error/len(data)
def win_map(self,data):
winmap=defaultdict(list)
for x in data:
winmap[self.winner(x)].append(x)
return winmap
class TestSom:
def setup_method(self,method):
self.som=Som(5,5,1)
for w in self.som.weights:
assert_almost_equal(1.0,linalg.norm(w))
self.som.weights = zeros((5,5))
self.som.weights[2,3]=5.0
self.som.weights[1,1]=2.0
def test_fast_norm(self):
assert fast_norm(array([1,3]))==sqrt(1 9)
def test_gaussian(self):
bell=self.som.gaussian((2,2),1)
assert bell.max()==1.0
assert bell.argmax()==12
def test_win_map(self):
winners=self.som.win_map([5.0,2.0])
assert winners[(2,3)][0]==5.0
assert winners[(1,1)][0]==2.0
def test_activation_reponse(self):
response=self.som.activation_response([5.0,2.0])
assert response[2,3]==1
assert response[1,1]==1
def test_activate(self):
assert self.som.activate(5.0).argmin()==13.0
def test_quantization_error(self):
self.som.quantization_error([5,2])==0.0
self.som.quantization_error([4,1])==0.5
def test_quantization(self):
q=self.som.quantization(array([4,2]))
assert q[0]==5.0
assert q[1]==2.0
def test_random_seed(self):
som1=Som(5,5,2,sigma=1.0,learning_rate=0.5,random_seed=1)
som2=Som(5,5,2,sigma=1.0,learning_rate=0.5,random_seed=1)
assert_array_almost_equal(som1.weights,som2.weights)
data=random.rand(100,2)
som1=Som(5,5,2,sigma=1.0,learning_rate=0.5,random_seed=1)
som1.train_random(data,10)
som2=Som(5,5,2,sigma=1.0,learning_rate=0.5,random_seed=1)
som2.train_random(data,10)
assert_array_almost_equal(som1.weights,som2.weights)
def test_train_batch(self):
som=Som(5,5,2,sigma=1.0,learning_rate=0.5,random_seed=1)
data=array([[4,2],[3,1]])
q1=som.quantization_error(data)
som.train_batch(data,10)
assert q1>som.quantization_error(data)
def test_train_random(self):
som=Som(5,5,2,sigma=1.0,learning_rate=0.5,random_seed=1)
data=array([[4,2],[3,1]])
q1=som.quantization_error(data)
som.train_random(data,10)
assert q1>som.quantization_error(data)
def test_random_weights_init(self):
som=Som(2,2,2,random_seed=1)
som.random_weights_init(array([[1.0,.0]]))
for w in som.weights:
assert_array_equal(w[0],array([1.0,.0]))
digits=load_digits()
data=digits.data
n_samples,n_features=data.shape
n_digits=len(np.unique(digits.target))
labels=digits.target
som=Som(16,16,64,sigma=1.3,learning_rate=0.3)
som.random_weights_init(data)
som.train_random(data,10000)
bone()
pcolor(som.distance_map().T)
colorbar()
labels[labels=='0']=0
labels[labels=='1']=1
labels[labels=='2']=2
labels[labels=='3']=3
labels[labels=='4']=4
labels[labels=='5']=5
labels[labels=='6']=6
labels[labels=='7']=7
labels[labels=='8']=8
labels[labels=='9']=9
markers=['o','v','1','3','8','s','p','x','D','*']
colors=["r","g","b","y","c",(0,0.1,0.8),(1,0.5,0),(1,1,0.3),"m",(0.4,0.6,0)]
for cnt,xx in enumerate(data):
w=som.winner(xx)
plot(w[0] .5,w[1] .5,markers[labels[cnt]],markerfacecolor='None',markeredgecolor=colors[labels[cnt]],markersize=12,markeredgewidth=2)
axis([0,som.weights.shape[0],0,som.weights.shape[1]])
show()
Initialising SOM. SOM Processing Complete.
算法:自组织映射是基于拓扑表示法的数据降维技术。
链接:https://github.com/alexarnimueller/som