Leaf——美团点评分布式ID生成系统 https://github.com/Meituan-Dianping/Leaf
数据库号段发号
线程池设置线程编号
代码语言:javascript复制public static class UpdateThreadFactory implements ThreadFactory {
private static int threadInitNumber = 0;
private static synchronized int nextThreadNum() {
return threadInitNumber ;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Thread-Segment-Update-" nextThreadNum());
}
}
加载号段 DoubleCheck
代码语言:javascript复制 if (!buffer.isInitOk()) {
synchronized (buffer) {
if (!buffer.isInitOk()) {
try {
updateSegmentFromDb(key, buffer.getCurrent());
logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
buffer.setInitOk(true);
} catch (Exception e) {
logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
}
}
}
}
动态调整步长
代码语言:javascript复制 long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
// 15分钟内更新号段,就放大步长
if (duration < SEGMENT_DURATION) {
if (nextStep * 2 > MAX_STEP) {
// do nothing
} else {
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
// do nothing with nextStep
} else {
// 30分钟以上才更新号段,就缩小步长,避免停机造成大量号段浪费
// 最小步长是数据库里配置的
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
预加载号段
代码语言:javascript复制 buffer.rLock().lock();
try {
final Segment segment = buffer.getCurrent();
if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
service.execute(new Runnable() {
- 防止当前号段用尽时阻塞等待加载新号段
- 对共享变量的判断,需要加读写锁,保证线程安全
- 预加载条件
- 下一个号段没有就绪
- 剩余号小于90%
- CAS成功后去异步加载
修改多个共享变量加锁
代码语言:javascript复制buffer.wLock().lock();
buffer.setNextReady(true);
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
获取锁后 DoubleCheck
代码语言:javascript复制 buffer.wLock().lock();
try {
final Segment segment = buffer.getCurrent();
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
优化轮询等待
代码语言:javascript复制 private void waitAndSleep(SegmentBuffer buffer) {
int roll = 0;
while (buffer.getThreadRunning().get()) {
roll = 1;
if (roll > 10000) {
try {
TimeUnit.MILLISECONDS.sleep(10);
break;
} catch (InterruptedException e) {
logger.warn("Thread {} Interrupted", Thread.currentThread().getName());
break;
}
}
}
}
- 先循环1w次,超过1w次时,每次sleep 10 ms
- 循环不耗时,主要看循环内部的代码。例如10亿次循环累加才几百毫秒
- 等待分为正常情况和异常情况
- 正常情况很快,所以轮询请求
- 异常情况很慢,例如数据库异常、网络异常,所以需要sleep,让出cpu
类snowflake算法
位运算
代码语言:javascript复制 private final long workerIdBits = 10L;
private final long maxWorkerId = ~(-1L << workerIdBits);//最大能够分配的workerid =1023
使用位运算提高效率
使用负数 非运算~
代替减 1 操作
- 还可以用与运算(&)判断是否是偶数
- n % 2 == 0 等价于 (n & 1) == 0 ,注意由于==的优先级高于&,所以需要给与运算加上小括号。
public int nextPos() {
return (currentPos 1) % 2;
}
public int nextPos() {
return (currentPos 1) & 1;
}
id生成提供默认实现
代码语言:javascript复制public class ZeroIDGen implements IDGen {
@Override
public Result get(String key) {
return new Result(0, Status.SUCCESS);
}
@Override
public boolean init() {
return true;
}
}
public SnowflakeService() throws InitException {
Properties properties = PropertyFactory.getProperties();
boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SNOWFLAKE_ENABLE, "true"));
if (flag) {
// init
} else {
idGen = new ZeroIDGen();
logger.info("Zero ID Gen Service Init Successfully");
}
}
- 当用户未正确配置Leaf时,提供一个不影响Leaf运行,但不能使用的id生成器
非主线程作为守护线程运行
代码语言:javascript复制 private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "schedule-upload-time");
thread.setDaemon(true);
return thread;
}
}).scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
updateNewData(curator, zk_AddressNode);
}
}, 1L, 3L, TimeUnit.SECONDS);//每3s上报数据
}
- 当所有非守护线程停止运行时,jvm就会退出
- 如果不以守护线程运行,即使main方法退出,非守护线程依然会继续运行,jvm不会退出
缓存workId
代码语言:javascript复制 private static final String PROP_PATH = System.getProperty("java.io.tmpdir") File.separator PropertyFactory.getProperties().getProperty("leaf.name") "/leafconf/{port}/workerID.properties";
public boolean init() {
try {
// 从zk获取workId
} catch (Exception e) {
LOGGER.error("Start node ERROR {}", e);
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port ""))));
workerID = Integer.valueOf(properties.getProperty("workerID"));
LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
LOGGER.error("Read file error ", e1);
return false;
}
}
return true;
}
- 在节点文件系统上缓存一个workid值,zk失效,机器重启时保证能够正常启动
- 容器化时得挂载出来