写在前面
聚类的学习过程总是很快乐的,因为真的太简单了!
1.基本概念
初始设定两个值:minPts以及半径r。
- 核心对象:若一个点的r邻域内点的个数大于等于minPts,我们就称该点为一个核心对象。
- 邻域的距离阈值:r。
- 直接密度可达:若某点p在核心点q的邻域内,则称p-q直接可达。
- 密度可达:若有一个点序列:q0,q1,q2,...,qk,对序列里任意两个相邻的点都是直接可达的,则称从q0到qk密度可达。
2.基本流程
- 任意选择一个未被访问的点p,并将该点标记为已访问。
- 如果p的邻域内点的个数大于mminPts(核心对象),则初始化一个簇C,将p以及p领域内的点加入到C中。
- 遍历C中每个点,如果有未被访问的,将其标记为已访问。如果该点也是核心对象,则同样将该点邻域内的点加入到C中。
- 重复步骤3直到C中不再存在没被访问的核心对象,将簇C加入到一个集合final中。
- 重复步骤1234直到没有核心点未被标记,剩余的点标记为噪声点。
- 输出final与噪声点。
3.代码与实现效果
代码语言:javascript复制import matplotlib.pyplot as plt
minPts = 5 #最小个数
epsilon = 1.0 #半径
color = ['red', 'black', 'blue', 'orange']
visited = []
C = [] #保存最终的聚类结果
noise = [] #噪声点
x = []
y = []
data = open('聚类数据集/dataset.txt')
for line in data.readlines():
x.append(float(line.strip().split('t')[0]))
y.append(float(line.strip().split('t')[1]))
for i in range(len(x)): #初始化标记数组
visited.append(False)
def judge(): #判断是否还存在核心点未被标记
for i in range(len(x)):
if visited[i]:
continue
cnt, lis = countObject(x, y, i)
if cnt >= minPts:
return True
return False
def select(): #选择一个没被标记的点
for i in range(len(visited)):
if not visited[i]:
return i
return -1
def countObject(x, y, p): #计算点p邻域的内点的个数
cnt = 0
lis = []
for i in range(len(x)):
if i == p:
continue
if (x[i] - x[p]) ** 2 (y[i] - y[p]) ** 2 <= epsilon ** 2:
cnt = 1
lis.append(i)
return cnt, lis
def check(c):
for i in c:
if visited[i]:
continue
cnt, lis = countObject(x,y , i)
if cnt >= minPts:
return True
return False
def dbscan():
while judge(): #判断是否还存在核心点未被标记
p = select() #选择一个没被访问的点
visited[p] = True
cnt, lis = countObject(x, y, p)
if cnt >= minPts:
c = []
c.append(p)
for i in lis:
c.append(i)
while(check(c)): #至少有一个点没被访问且该点领域内至少minPts个点
for i in c:
if not visited[i]:
visited[i] = True
cnt1, lis1 = countObject(x, y, i)
if cnt >= minPts:
for j in lis1:
c.append(j)
C.append(c)
for i in range(len(visited)):
if not visited[i]:
noise.append(i)
return C
if __name__ == '__main__':
cluster = dbscan()
X = []
Y = []
for i in noise:
X.append(x[i])
Y.append(y[i])
plt.scatter(X, Y, c='m', marker='D') # 噪声点
plt.legend(['noise'])
for i in range(len(cluster)):
X = []
Y = []
for j in cluster[i]:
X.append(x[j])
Y.append(y[j])
plt.scatter(X, Y, c=color[i], alpha=1, s=50)
plt.title('dbscan')
plt.show()