双向交叉场景的交通流仿真
公路匝道入口场景的交通流仿真
1. 交通流模型
分析和优化交通系统,首先要对交通系统进行数学建模,根据路网几何、每分钟车辆、车速等参数对交通流量进行模拟。
交通流系统模型通常分为三类:
- 微观模型(Microscopic Models):关注每辆车的行为,并尝试复现驾驶员行为。
微观模型
- 宏观模型(Macroscopic models):从交通密度(每公里车辆)和交通流量(车辆每分钟)的角度描述车辆的整体移动。
宏观模型
- 中观模型(Mesoscopic models):是结合微观和宏观模型特点的混合模型。
本文主要研究微观模型(Microscopic Models)。
2. 微观交通流模型
在微观模型中,使用驾驶员模型描述单个驾驶员/车辆的行为,因此,它必须是一个multi-agent系统。
3. 智能驾驶员模型(IDM)
2000年,Treiber, Hennecke 和 Helbing开发了智能驾驶员(Intelligent Driver Model,IDM)模型,它将第i辆车的加速度描述为如下自身变量和前面车辆变量的函数:
4. 智能驾驶员模型的工作原理
为了更好的理解这个等式,我们可以将其分为两个部分:自由道路加速度和交互加速度。
4.1 自由道路加速度
5. 交通道路网络模型
我们使用有向图G=(V, E)来表达道路网络。每辆车沿着一条由多个路段(Edge)组成的路径网络行驶。
6. 随机车辆发生器
为了将车辆添加模拟器中,我们有两种方法:
- 手动创建Vehicle实例,并添加到车辆列表中;
- 根据预先定义的概率,随机添加车辆;
对于第二种方法,我们必须定义一个随机车辆发生器(Vehicle Generator)。
号灯
红绿灯包含两个区域:
减速区(Slow Down Zone): 车辆使用减速系数减速的区域。
停车区(Stop Zone): 车辆停车的区域, 通过下面的运动学方程实现:
8. Python实现
8.1. Road实现
Road由一系列(start, end)组成的直线段连接而成。
代码语言:javascript复制from scipy.spatial import distance
class Road:
def __init__(self, start, end):
self.start = start
self.end = end
self.init_porperties()
def init_properties(self):
self.length = distance.euclidean(self.start, self.end)
self.angle_sin = (self.end[1]-self.start[1]) / self.length
self.angle_cos = (self.end[0]-self.start[0]) / self.length
Road元素添加到Simulator。
代码语言:javascript复制from .road import Road
class Simulation:
def __init__(self, config={}):
# Set default configuration
self.set_default_config()
# Update configuration
for attr, val in config.items():
setattr(self, attr, val)
def set_default_config(self):
self.t = 0.0 # Time keeping
self.frame_count = 0 # Frame count keeping
self.dt = 1/60 # Simulation time step
self.roads = [] # Array to store roads
def create_road(self, start, end):
road = Road(start, end)
self.roads.append(road)
return road
def create_roads(self, road_list):
for road in road_list:
self.create_road(*road)
在实际应用中,创建仿真环境中的道路场景。
代码语言:javascript复制from trafficSimulator import *
# Create simulation
sim = Simulation()
# Add one road
sim.create_road((300, 98), (0, 98))
# Add multiple roads
sim.create_roads([
((300, 98), (0, 98)),
((0, 102), (300, 102)),
((180, 60), (0, 60)),
((220, 55), (180, 60)),
((300, 30), (220, 55)),
((180, 60), (160, 98)),
((158, 130), (300, 130)),
((0, 178), (300, 178)),
((300, 182), (0, 182)),
((160, 102), (155, 180))
])
# Start simulation
win = Window(sim)
win.loop()
效果如下:
8.2 Vehicle实现
基于位置的泰勒展开:
基于速度的泰勒展开:
在每个迭代(或帧)中,使用IDM公式计算加速度后,我们将使用以下两个方程更新位置和速度:
代码如下:
代码语言:javascript复制self.a = ... # IDM formula
self.v = self.a * dt
self.x = self.v * dt self.a * dt * dt / 2
由于这只是一个近似值,速度有时可能会变为负值(但模型不允许这样),当速度为负时,会产生不稳定性,位置和速度会分化为负无穷大。
为了解决这个问题,当预测负速度时,我们就会将其设定为零。
代码如下:
代码语言:javascript复制if self.v self.a * dt < 0:
self.x -= 1/2 * self.v * self.v / self.a
self.v = 0
else:
self.v = self.a * dt
self.x = self.v * dt self.a * dt * dt / 2
要计算 IDM 加速度,我们将引导车辆表示为lead,并当lead不是None时,计算一个交互项。
代码语言:javascript复制alpha = 0
if lead:
delta_x = lead.x - self.x - lead.l
delta_v = self.v - lead.v
alpha = (self.s0 max(0, self.T * self.v delta_v * self.v / self.sqrt_ab)) / delta_x
self.a = self.a_max * (1 - (self.v / self.v_max)**4 - alpha**2)
如果车辆停止(例如在红绿灯处),我们将使用阻尼方程:
代码语言:javascript复制if self.stopped:
self.a = -self.b_max * self.v / self.v_max
然后,我们将所有内容整合在Vehicle类的update方法中:
代码语言:javascript复制import numpy as np
class Vehicle:
def __init__(self, config={}):
# Set default configuration
self.set_default_config()
for attr, val in config.items():
setattr(self, attr, val)
# Calculate properties
self.init_properties()
def set_default_config(self):
self.l = 4
self.s0 = 4
self.T = 1
self.v_max = 16.6
self.a_max = 1.44
self.b_max = 4.61
self.path = []
self.current_road_index = 0
self.x = 0
self.v = self.v_max
self.a = 0
self.stopped = False
def init_properties(self):
self.sqrt_ab = 2*np.sqrt(self.a_max*self.b_max)
self._v_max = self.v_max
def update(self, lead, dt):
if self.v self.a*dt < 0:
self.x -= 1/2*self.v*self.v/self.a
self.v = 0
else:
self.v = self.a*dt
self.x = self.v*dt self.a*dt*dt/2
# Update acceleration
alpha = 0
if lead:
delta_x = lead.x - self.x - lead.l
delta_v = self.v - lead.v
alpha = (self.s0 max(0, self.T*self.v delta_v*self.v/self.sqrt_ab)) / delta_x
self.a = self.a_max * (1-(self.v/self.v_max)**4 - alpha**2)
if self.stopped:
self.a = -self.b_max*self.v/self.v_max
def stop(self):
self.stopped = True
def unstop(self):
self.stopped = False
def slow(self, v):
self.v_max = v
def unslow(self):
self.v_max = self._v_max
在Road类中,我们将添加一个deque(双端队列)来跟踪车辆。队列是存储车辆的较好的数据结构,因为队列中的第一辆车是路上最远的车辆,它是第一个可以从队列中删除的车辆。
要从deque中删除第一个车辆,可以使用self.vehicles.popleft() 。
在Road类中添加一个update方法:
代码语言:javascript复制def update(self, dt):
n = len(self.vehicles)
if n > 0:
# Update first vehicle
self.vehicles[0].update(None, dt)
for i in range(1, n):
lead = self.vehicles[i-1]
self.vehicles[i].update(lead, dt)
在Simulation类中添加一个update方法:
代码语言:javascript复制def update(self):
for road in self.roads:
road.update(self.dt)
for gen in self.generators:
gen.update()
for signal in self.traffic_signals:
signal.update(self)
for road in self.roads:
if len(road.vehicles) == 0:
continue
vehicle = road.vehicles[0]
if vehicle.x >= road.length:
if vehicle.current_road_index 1 < len(vehicle.path):
vehicle.current_road_index = 1
new_vehicle = deepcopy(vehicle)
new_vehicle.x = 0
next_road_index = vehicle.path[vehicle.current_road_index]
self.roads[next_road_index].vehicles.append(new_vehicle)
road.vehicles.popleft()
self.t = self.dt
self.frame_count = 1
回到Window类,添加了一个run方法来实时更新仿真:
代码语言:javascript复制def run(self, steps_per_update=1):
def loop(sim):
sim.run(steps_per_update)
self.loop(loop)
现在我们将手动添加车辆:
代码语言:javascript复制sim.roads[4].vehicles.append(
Vehicle({
"path": [4, 3, 2]
})
)
sim.roads[0].vehicles.append(Vehicle())
sim.roads[1].vehicles.append(Vehicle())
sim.roads[6].vehicles.append(Vehicle())
sim.roads[7].vehicles.append(Vehicle())
8.3 车辆生成器代码
代码语言:javascript复制from .vehicle import Vehicle
from numpy.random import randint
class VehicleGenerator:
def __init__(self, sim, config={}):
...
def set_default_config(self):
self.vehicle_rate = 20
self.vehicles = [
(1, {})
]
def init_properties(self):
self.upcoming_vehicle = self.generate_vehicle()
def generate_vehicle(self):
"""Returns a random vehicle from self.vehicles with random proportions"""
...
def update(self):
"""Add vehicles"""
...
VehicleGenerator包含(odds, vehicle)元组的列表,元组第一个元素是在同一元组中生成车辆的权重(不是概率)。
例如,如果我们有3辆车权重分别为1、3、2,这相当于生成概率分别为1/6、3/6、2/6 。
为了实现这一点,我们使用以下算法
- 生成1到权重和之间的数字r。
- 当r是非负数,循环所有可能的车辆,并在每次迭代时减去其权重,返回最后使用的车辆。
def generate_vehicle(self):
total = sum(pair[0] for pair in self.vehicles)
r = randint(1, total 1)
for (weight, config) in self.vehicles:
r -= weight
if r <= 0:
return Vehicle(config)
每次生成器添加车辆时,last_added_time属性都会更新到当前时间。当当前时间和last_added_time之差值大于车辆生成周期时,添加车辆。
此外,还必须检查道路是否还有空间来添加即将行驶的车辆。我们通过检查道路上最后一辆车之间的距离和即将行驶的车辆的长度和安全距离的总和来做到这一点。
代码语言:javascript复制def update(self):
if self.sim.t - self.last_added_time >= 60 / self.vehicle_rate:
# If time elasped after last added vehicle is
# greater than vehicle_period; generate a vehicle
road = self.sim.roads[self.upcoming_vehicle.path[0]]
if len(road.vehicles) == 0
or road.vehicles[-1].x > self.upcoming_vehicle.s0 self.upcoming_vehicle.l:
# If there is space for the generated vehicle; add it
self.upcoming_vehicle.time_added = self.sim.t
road.vehicles.append(self.upcoming_vehicle)
# Reset last_added_time and upcoming_vehicle
self.last_added_time = self.sim.t
self.upcoming_vehicle = self.generate_vehicle()
最后,我们应该通过调用Simulation 的update方法来更新车辆生成。
代码语言:javascript复制sim.create_gen({
'vehicle_rate': 60,
'vehicles': [
[1, {"path": [4, 3, 2]}],
[1, {"path": [0]}],
[1, {"path": [1]}],
[1, {"path": [6]}],
[1, {"path": [7]}]
]
})
8.4 红绿灯实现
交通信号灯的默认属性是:
代码语言:javascript复制
class TrafficSignal:
def __init__(self, roads, config={}):
# Initialize roads
self.roads = roads
self.set_default_config()
for attr, val in config.items():
setattr(self, attr, val)
# Calculate properties
self.init_properties()
def set_default_config(self):
self.cycle = [(False, True), (True, False)]
self.slow_distance = 40
self.slow_factor = 10
self.stop_distance = 15
self.current_cycle_index = 0
self.last_t = 0
self.cycle是self.roads 的一个数组,每个元组包含道路的状态(True绿色,False红色)。
在默认配置中,(False, True)表示第一组道路为红色,第二组道路为绿色。 (True, False) 则恰恰相反。
所以使用此方法,是因为它易于扩展,可以用于创建各种红绿灯,包括超过2条道路、带有左右转弯单独信号的红绿灯,甚至用于多个交叉路口的同步交通信号灯。
交通信号灯的update函数是可定制的。其默认行为是对称的固定时间循环。
代码语言:javascript复制def init_properties(self):
for i in range(len(self.roads)):
for road in self.roads[i]:
road.set_traffic_signal(self, i)
def current_cycle(self):
return self.cycle[self.current_cycle_index]
def update(self, sim):
cycle_length = 30
k = (sim.t // cycle_length) % 2
self.current_cycle_index = int(k)
在Road类添加以下方法:
代码语言:javascript复制def set_traffic_signal(self, signal, group):
self.traffic_signal = signal
self.traffic_signal_group = group
self.has_traffic_signal = True
代码语言:javascript复制def traffic_signal_state(self):
if self.has_traffic_signal:
i = self.traffic_signal_group
return self.traffic_signal.current_cycle[i]
return True
更新在Road的update方法。
代码语言:javascript复制if self.traffic_signal_state:
# If traffic signal is green or doesn't exist
# Then let vehicles pass
self.vehicles[0].unstop()
for vehicle in self.vehicles:
vehicle.unslow()
else:
if self.vehicles[0].x >= self.length - self.traffic_signal.slow_distance:
# Slow vehicles in slowing zone
self.vehicles[0].slow(self.traffic_signal.slow_speed)
if self.vehicles[0].x >= self.length - self.traffic_signal.stop_distance and
self.vehicles[0].x <= self.length - self.traffic_signal.stop_distance / 2:
# Stop vehicles in the stop zone
self.vehicles[0].stop()
在Simulation 的update 方法中检查交通灯状态:
代码语言:javascript复制for signal in self.traffic_signals:
signal.update(self)
8.5 曲线实现
在现实世界中,道路有曲线。虽然从技术上讲,我们可以通过手写很多道路的坐标来接近曲线来创建此模拟中的曲线,但我们可以在程序上实现同样的事情。
我们将使用贝赛尔曲线来达到这个效果。
代码语言:javascript复制def curve_points(start, end, control, resolution=5):
if (start[0] - end[0])*(start[1] - end[1]) == 0:
return [start, end]
# If not return a curve
path = []
for i in range(resolution 1):
t = i/resolution
x = (1-t)**2 * start[0] 2*(1-t)*t * control[0] t**2 *end[0]
y = (1-t)**2 * start[1] 2*(1-t)*t * control[1] t**2 *end[1]
path.append((x, y))
return path
def curve_road(start, end, turn_direction, resolution=15):
points = curve_points(start, end, turn_direction, resolution)
return [(points[i-1], points[i]) for i in range(1, len(points))]
测试:
代码语言:javascript复制from trafficSimulator import *
# Create simulation
sim = Simulation()
# Add multiple roads
sim.create_roads([
((0, 100), (140, 100)),
((150, 110), (150, 200)),
*curve_road((140, 100), (150, 110), (150, 100))
])
sim.create_gen({
'vehicle_rate': 20,
'vehicles': [
[1, {"path": [0, *range(2, 17), 1]}]
]
})
# Start simulation
win = Window(sim)
win.run(steps_per_update=5)
9. 其它示例场景
环形交叉路口场景
钻石型分流交叉路口
本文完整代码地址:
https://github.com/BilHim/trafficSimulator