作者:M. Fauzan Alfariz
编译:东岸因为@一点人工一点智能
FastSLAM是用于机器人技术的一种主流的实时执行同步定位和建图算法,包括自动驾驶汽车、自动化机器人和无人机 (UAV)。FastSLAM 将粒子过滤器与基于特征的建图相结合,以实现高效准确的定位和建图功能。
以下是这些应用程序的 FastSLAM 算法的高级介绍:
01 FastSLAM组件
FastSLAM算法由两个主要组成部分组成:粒子滤波器和建图算法。
粒子滤波器是FastSLAM算法的核心。它维护一组粒子,每个粒子代表对环境的假设姿态和地图。当机器人移动并观察周围环境时,粒子被更新。
02 粒子滤波器
步骤1:初始化:
· 创建一组粒子,每个粒子代表车辆或机器人的潜在状态。
· 为每个粒子初始化一个随机姿态,并分配初始权重。
步骤2:预测步骤:
· 根据运动模型和控制输入预测每个粒子的新姿势。
· 从运动模型中采样,更新粒子的姿态。
步骤3:更新步骤:
· 接收传感器测量数据,如激光雷达或相机数据。
· 对于每个粒子:
1)使用最近邻或数据关联算法将观测到的特征与地图中的地标进行关联。
2)根据预测的地图和传感器模型计算观测到的特征的概率。
3)根据概率更新粒子的权重。
步骤4:重采样:
· 根据粒子的权重对粒子进行重采样,以确保具有多样性的粒子集合准确地代表后验分布。
· 使用系统重采样或随机通用采样等技术,根据粒子的权重选择粒子。
步骤5:地图更新:
· 使用粒子的姿态和相应的测量值的加权平均值,更新地图。
· 对现有地标和新观测之间进行数据关联。
步骤6:循环:
为了保持实时定位和建图,重复步骤2-5来处理后续时间步骤。
03 建图算法
占位栅格地图绘制(Occupancy Grid Mapping)
· 占位栅格地图将环境表示为一个网格,其中每个单元格被分类为占位、未占位或未知。
· 诸如贝叶斯滤波器、FastSLAM和基于栅格的FastSLAM等算法可以根据传感器测量和运动模型来更新占位概率。
路网地图绘制算法(Roadmap Mapping Algorithm):
1)构建地图:地图绘制算法获取由粒子滤波器生成的一组粒子,并使用它们来构建环境地图。
2)地图合并:如果多个粒子具有相似的地图,地图绘制算法将它们合并以提高整体地图质量。
3)环路闭合:地图绘制算法通过检测机器人何时重访先前观察过的位置来检测和纠正地图中的错误。
使用传感器生成的地图和估计的机器人路径,其中(a)是大感知领域,(b)是小感知领域。
代码语言:javascript复制import json
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from Utils.OccupancyGrid import OccupancyGrid
from Utils.ScanMatcher_OGBased import ScanMatcher
import math
import copy
class ParticleFilter:
def __init__(self, numParticles, ogParameters, smParameters):
'''
__init__ : method initializes the particle filter with the
number of particles (numParticles), original (OG) parameters,
and sensor model (SM) parameters.
'''
self.numParticles = numParticles
self.particles = []
''' Call the initParticles method to initialize particles '''
self.initParticles(ogParameters, smParameters)
self.step = 0
''' Variable to store the previous matched reading '''
self.prevMatchedReading = None
''' Variable to store the previous raw reading '''
self.prevRawReading = None
self.particlesTrajectory = []
def initParticles(self, ogParameters, smParameters):
''' Iterate over the range of the number of particles '''
for i in range(self.numParticles):
''' Create a new particle with the given OG and SM parameters '''
p = Particle(ogParameters, smParameters)
self.particles.append(p)
def updateParticles(self, reading, count):
''' Iterate over the range of the number of particles '''
for i in range(self.numParticles):
''' For each iteration, it calls the update method of the particle
at index i in the list of particles (self.particles[i]),
passing the reading and count as arguments.
'''
self.particles[i].update(reading, count)
def weightUnbalanced(self):
self.normalizeWeights()
variance = 0
for i in range(self.numParticles):
''' Calculate the quared difference between the particle's weight and the expected weight '''
diff = self.particles[i]/weight - 1 / self.numParticles
squared_dif = diff * 2
''' Accumulate the squared difference to calculate the variance'''
variance = squared_diff
if variance > ((self.numParticles - 1) / self.numParticles)**2 (self.numParticles - 1.000000000000001) * (1 / self.numParticles)**2:
return True
else:
return False
''' Calculate the sum of all particle weights '''
def normalizeWeights(self):
weightSum = 0
''' iterate over the rane of particle indices'''
for i in range(self.numParticles):
''' Add the weight of the particles at index 'i' to the weightSum'''
weightSum = self.particles[i].weight
''' Divide each particle's weight by the total weight sum to normalize them '''
for i in range(self.numParticles):
self.particles[i].weight = self.particles[i].weight / weightSum
def resample(self):
# for particle in self.particles:
# particle.plotParticle()
# print(particle.weight)
weights = np.zeros(self.numParticles)
tempParticles = []
for i in range(self.numParticles):
weights[i] = self.particles[i].weight
tempParticles.append(copy.deepcopy(self.particles[i]))
resampledParticlesIdx = np.random.choice(np.arange(self.numParticles), self.numParticles, p=weights)
for i in range(self.numParticles):
self.particles[i] = copy.deepcopy(tempParticles[resampledParticlesIdx[i]])
self.particles[i].weight = 1 / self.numParticles
class Particle:
def __init__(self, ogParameters, smParameters):
initMapXLength, initMapYLength, initXY, unitGridSize, lidarFOV, lidarMaxRange, numSamplesPerRev, wallThickness = ogParameters
scanMatchSearchRadius, scanMatchSearchHalfRad, scanSigmaInNumGrid, moveRSigma, maxMoveDeviation,
turnSigma, missMatchProbAtCoarse, coarseFactor = smParameters
og = OccupancyGrid(initMapXLength, initMapYLength, initXY, unitGridSize, lidarFOV, numSamplesPerRev, lidarMaxRange, wallThickness)
sm = ScanMatcher(og, scanMatchSearchRadius, scanMatchSearchHalfRad, scanSigmaInNumGrid, moveRSigma, maxMoveDeviation, turnSigma, missMatchProbAtCoarse, coarseFactor)
self.og = og
self.sm = sm
self.xTrajectory = []
self.yTrajectory = []
self.weight = 1
def updateEstimatedPose(self, currentRawReading):
estimatedTheta = self.prevMatchedReading['theta'] currentRawReading['theta'] - self.prevRawReading['theta']
estimatedReading = {'x': self.prevMatchedReading['x'], 'y': self.prevMatchedReading['y'], 'theta': estimatedTheta,
'range': currentRawReading['range']}
dx, dy = currentRawReading['x'] - self.prevRawReading['x'], currentRawReading['y'] - self.prevRawReading['y']
estMovingDist = math.sqrt(dx ** 2 dy ** 2)
rawX, rawY, prevRawX, prevRawY = currentRawReading['x'], currentRawReading['y'], self.prevRawReading['x'],
self.prevRawReading['y']
rawXMove, rawYMove = rawX - prevRawX, rawY - prevRawY
rawMove = math.sqrt((rawX - prevRawX) ** 2 (rawY - prevRawY) ** 2)
if rawMove > 0.3:
if self.prevRawMovingTheta != None:
if rawYMove > 0:
rawMovingTheta = math.acos(rawXMove / rawMove) # between -pi and pi
else:
rawMovingTheta = -math.acos(rawXMove / rawMove)
rawTurnTheta = rawMovingTheta - self.prevRawMovingTheta
estMovingTheta = self.prevMatchedMovingTheta rawTurnTheta
else:
if rawYMove > 0:
rawMovingTheta = math.acos(rawXMove / rawMove) # between -pi and pi
else:
rawMovingTheta = -math.acos(rawXMove / rawMove)
estMovingTheta = None
else:
rawMovingTheta = None
estMovingTheta = None
return estimatedReading, estMovingDist, estMovingTheta, rawMovingTheta
def getMovingTheta(self, matchedReading):
x, y, theta, range = matchedReading['x'], matchedReading['y'], matchedReading['theta'], matchedReading['range']
prevX, prevY = self.xTrajectory[-1], self.yTrajectory[-1]
xMove, yMove = x - prevX, y - prevY
move = math.sqrt(xMove ** 2 yMove ** 2)
if move != 0:
if yMove > 0:
movingTheta = math.acos(xMove / move)
else:
movingTheta = -math.acos(xMove / move)
else:
movingTheta = None
return movingTheta
def update(self, reading, count):
if count == 1:
self.prevRawMovingTheta, self.prevMatchedMovingTheta = None, None
matchedReading, confidence = reading, 1
else:
currentRawReading = reading
estimatedReading, estMovingDist, estMovingTheta, rawMovingTheta = self.updateEstimatedPose(currentRawReading)
matchedReading, confidence = self.sm.matchScan(estimatedReading, estMovingDist, estMovingTheta, count, matchMax=False)
self.prevRawMovingTheta = rawMovingTheta
self.prevMatchedMovingTheta = self.getMovingTheta(matchedReading)
self.updateTrajectory(matchedReading)
self.og.updateOccupancyGrid(matchedReading)
self.prevMatchedReading, self.prevRawReading = matchedReading, reading
self.weight *= confidence
def updateTrajectory(self, matchedReading):
x, y = matchedReading['x'], matchedReading['y']
self.xTrajectory.append(x)
self.yTrajectory.append(y)
def plotParticle(self):
plt.figure(figsize=(19.20, 19.20))
plt.scatter(self.xTrajectory[0], self.yTrajectory[0], color='r', s=500)
colors = iter(cm.rainbow(np.linspace(1, 0, len(self.xTrajectory) 1)))
for i in range(len(self.xTrajectory)):
plt.scatter(self.xTrajectory[i], self.yTrajectory[i], color=next(colors), s=35)
plt.scatter(self.xTrajectory[-1], self.yTrajectory[-1], color=next(colors), s=500)
plt.plot(self.xTrajectory, self.yTrajectory)
self.og.plotOccupancyGrid([-13, 20], [-25, 7], plotThreshold=False)
def processSensorData(pf, sensorData, plotTrajectory = True):
# gtData = readJson("../DataSet/PreprocessedData/intel_corrected_log") ######### For Debug Only #############
count = 0
plt.figure(figsize=(19.20, 19.20))
for key in sorted(sensorData.keys()):
count = 1
print(count)
pf.updateParticles(sensorData[key], count)
if pf.weightUnbalanced():
pf.resample()
print("resample")
plt.figure(figsize=(19.20, 19.20))
maxWeight = -1
for particle in pf.particles:
if maxWeight < particle.weight:
maxWeight = particle.weight
bestParticle = particle
plt.plot(particle.xTrajectory, particle.yTrajectory)
xRange, yRange = [-13, 20], [-25, 7]
ogMap = bestParticle.og.occupancyGridVisited / bestParticle.og.occupancyGridTotal
xIdx, yIdx = bestParticle.og.convertRealXYToMapIdx(xRange, yRange)
ogMap = ogMap[yIdx[0]: yIdx[1], xIdx[0]: xIdx[1]]
ogMap = np.flipud(1 - ogMap)
plt.imshow(ogMap, cmap='gray', extent=[xRange[0], xRange[1], yRange[0], yRange[1]])
plt.savefig('../Output/' str(count).zfill(3) '.png')
plt.close()
#if count == 100:
# break
maxWeight = 0
for particle in pf.particles:
particle.plotParticle()
if maxWeight < particle.weight:
maxWeight = particle.weight
bestParticle = particle
bestParticle.plotParticle()
def readJson(jsonFile):
with open(jsonFile, 'r') as f:
input = json.load(f)
return input['map']
def main():
initMapXLength, initMapYLength, unitGridSize, lidarFOV, lidarMaxRange = 50, 50, 0.02, np.pi, 10 # in Meters
scanMatchSearchRadius, scanMatchSearchHalfRad, scanSigmaInNumGrid, wallThickness, moveRSigma, maxMoveDeviation, turnSigma,
missMatchProbAtCoarse, coarseFactor = 1.4, 0.25, 2, 5 * unitGridSize, 0.1, 0.25, 0.3, 0.15, 5
sensorData = readJson("../DataSet/PreprocessedData/intel_gfs")
numSamplesPerRev = len(sensorData[list(sensorData)[0]]['range']) # Get how many points per revolution
initXY = sensorData[sorted(sensorData.keys())[0]]
numParticles = 10
ogParameters = [initMapXLength, initMapYLength, initXY, unitGridSize, lidarFOV, lidarMaxRange, numSamplesPerRev, wallThickness]
smParameters = [scanMatchSearchRadius, scanMatchSearchHalfRad, scanSigmaInNumGrid, moveRSigma, maxMoveDeviation, turnSigma,
missMatchProbAtCoarse, coarseFactor]
pf = ParticleFilter(numParticles, ogParameters, smParameters)
processSensorData(pf, sensorData, plotTrajectory=True)
if __name__ == '__main__':
main()