避开JVM,带你从代码层面优化Java代码

2024-08-24 17:01:49 浏览数 (3)

前言

在Java中,提到“优化”这两个字,很多人首先都会想到JVM优化。的确,JVM提供了很多参数,让优化工作看起来更为直观。例如我们通过Xms、Xmx就可以调整程序的启动内存,通过 -XX: UseG1GC我们就可以使用G1垃圾收集器。

我在大数据开发中,遇到过大数据量的数据转换、接入,为了避免程序的OOM,除了在前期增加处理主机之外,后来更多的是在代码层面进行优化。所以今天就看看我在代码开发时,可以从代码层做哪些优化。

集合

在Java中,list、set、map是我们使用比较多的,就拿list来说,常用的实现类有ArrayList、LinkedList,对于这两种list的选择,我们还是需要根据实际业务来。

当时在大学学习数据结构的时候,书中写到数组是访问快,但是删除和增加困难。链表是访问慢,但是删除和增加快。那时候对这句话真的是一点都不理解,后来在代码开发中就慢慢明白了这个道理。

ArrayList

ArrayList使用数组elementData来存储数据。...

对于Array来说,使用下标index直接访问元素,不需要遍历整个数组,时间复杂度为0(1)。而如果删除或增加一个元素,后面元素的下标都会修改,例如删除index为7的元素,那么删除之后index为8的元素就要前移1,变成index为7的元素,后面是元素整体前移。

所以在ArrayList的remove中,使用arrayCopy直接将elementData后半部分数据,前移一位。

所以说每次删除或者增加数据,都要调用一次arrayCopy数组复制。

LinkedList

对于LinkedList,存储结构是Node,而且会有一个first头结点和一个last尾节点。

对于每个Node对象,都会有prev前置节点指向前一个Node,和next后继节点指向下一个Node。

链表的存储结构大概是这样的:头节点没有prev前置节点,为节点没有next后继节点,除此之外每个Node都有pre和next。

在LinkedList中,在指定的index插入时,如果index与size相等,则表示是在尾部插入,即插入尾结点。如果不是,则调用linkBefore插入节点。

在linkeBefore中,我们可以看到:当在链表中下标index处,增加一个节点时,会将index的Node的prev修改为新节点,index原本的前置节点的next指向新节点。对于新节点,next指向index节点,prev指向index节点原来的前置节点。

流程如图所示:

这样LinkedList在增加或删除元素时,就不需要做数据复制和index位移,只需要修改几个节点的prev和next节点即可。所以在选择List时,如果查询操作多,就选择ArrayList,删除或者增加元素多,就选择LinkedList。

Map

对于Map的时候,主要是注意对Map容量大小的初始化,默认容量为16,且loadFactor为0.75。

我new一个HashMap,然后put插入数据,通过断点可以发现,当 size > threshold时,则调用resize扩容。

那么threshold是如何而来的?通过对map的容量capacity loadFactor*而来。

也就是说,容量为16的map只能存储12个元素,当存入第13个的时候,就会发生扩容,这时候threshold就变成了24,而容量就是24 / 0.75 = 32,直接扩容2倍。

在resize()中,newCap表示新容量,扩容运算规则是在oldCap基础上,<<右移一位,即*2.

而扩容之后,会为map新建一个容量为32的Node数据结构,然后将原来16的Node中的元素,复制到新Node中。

所以,在使用map时,要预估要存放元素的个数,然后指定初始化大小,避免扩容带来的性能问题。

ConcurrentLinkedQueue

遇到过这么一个场景:消费kafka中的数据,然后去做处理。但是kakfa中的一个分区只能被一个thread消费,所以thread的数量最大为分区数。但是这些线程满足不了我的处理性能需求。

所以将Kafka的消费与数据处理逻辑代码解耦,先利用少量线程消费kakfa,将数据放入queue中,然后数据处理模块读取queue消费。为了保证高并发下的线程数据安全,使用了ConcurrentLinkedQueue

ConcurrentLinkedQueue优点就是无锁,在poll()中看不到一个关于锁(synchronized、lock等)。

那么,ConcurrentLinkedQueue是如何保证线程安全的呢?这就要提到上图中的CAS。

CAS

CAS,comprare and swap,第一次接触还是在java的Atomic类中。CAS不是锁,只是CPU提供的一个原子性操作指令,直接使用UnSafe类将原子操作实现在硬件级别实现,解决ABA问题。

poll()中调用了casItem方法,而casItem调用的是UNSAFE.compareAndSwapObject

假如queue中的head节点元素(item)为1,将item设置为null,就表示这个节点被移除了,head就指向了下一个元素。而使用compareAndSwapObject,就是为了将item中的元素替换(swap)为null,但是在swap之前需要compare一下,这个item还是之前的item吗。只要当item还是之前的item,才能被swap。

可以看到在poll()中的最开始部分,有一个for(;;),这就是死循环的一个写法,类似于while true,但是在这里被称作自旋,如果多个线程都在调用poll(),那么每个线程都会陷入自旋,等到有一个线程获取到head节点的数据,并通过CompareAndSwap这个过程,将其修改为null为止。

  1. 获得queue的head头结点
  2. 调用casItem尝试p节点(head节点)的item修改为null
  3. 当item的值为item时(item就像是版本号),所以compareAndSwapObject将item修改为null。
  4. 上面的步骤如果失败了就会一直重复,俗称自旋

所以,ConcurrentLinkedQueue使用CAS代替了锁保证线程安全,但是有一个问题就是如果queue中没有数据,调用poll()返回的是null,所以在数据处理时要增加非空判断。

如果想要进一步提升性能,推荐使用disruptor来替换ConcurrentLinkedQueue。

disruptor

disruptor也是无锁的CAS设计,有着高并发高吞吐的性能,同时也有着类似于kafka中生产者和消费者的定义。

其底层是基于数组实现的缓冲区RingBuffer,生产者,消费者,都有各自独立的Sequence,在RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。

1. Event:生产者和消费者之间传递的对象

如果想要使用disruptor,首先要构建Event载体,也就是数据对象。

代码语言:java

复制

代码语言:javascript复制
public class ByteArrayEvent {
    private byte[] bytes;

    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }
}
2. EventFactory :创建event的工厂类

代码语言:java

复制

代码语言:javascript复制
public class ByteArrayEventFactory implements EventFactory<ByteArrayEvent> {
    @Override
    public ByteArrayEvent newInstance() {
        return new ByteArrayEvent();
    }
}
3. EventHandler:消费者的消费逻辑

代码语言:java

复制

代码语言:javascript复制
public class ByteArrayEventHandler implements EventHandler<ByteArrayEvent> {
    @Override
    public void onEvent(ByteArrayEvent byteArrayEvent, long sequence, boolean endOfBatch) throws Exception {
        // 处理事件的逻辑
    }
}
4. 构建Disruptor

默认情况下,Disruptor会将生产者指定为多线程模式,ProducerType.SINGLE来设置生产者为单线程模式。

代码语言:java

复制

代码语言:javascript复制
// 必须是2的幂
int bufferSize = 1024;
/**
DaemonThreadFactory:线程池,create threads for processors.
ProducerType#SINGLE:一个ringbuffer支持多个publisher; ProducerType#MULTI:支持多个publisher
BlockingWaitStrategy:消费者等待策略。SleepingWaitStrategy:对生产者影响小。BlockingWaitStrategy使用了lock,效率不太行。
YieldingWaitStrategy性能最好无锁策略,使用了 Thread.yield() 多线程交替执行
**/
disruptor = new Disruptor<>(new ByteArrayEventFactory(), bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

// handleEventsWith:消费数据,每一次绑定一个消费者,可以使用then进行链式处理
// 一个handler就是一个消费者
disruptor.handleEventsWith(new ByteArrayEventHandler());
// 启动
disruptor.start();  

volatile

可以参考为了研究Java内存模型(JMM),我又学了一点汇编指中提到的volatile和synchronized部分

clone()

我最近遇到一个需求,将TLV格式的二进制数据解析为二进制明文。TLV是什么意思呢,就是每条数据的每个字段由TLV格式表示的,T代表tag,是一个字段的唯一id,L是length,表示后面V即value的长度。每条数据都是固定的字段个数,但是数据中有的字段为空,则这个字段就不用TLV表示,则直接跳到下一个字段。

这里字段存储使用的数据结构是数组,原因就是按照index查找速度快。这里先创建一个DataObject类。

代码语言:java

复制

代码语言:javascript复制
private Object[] data;
private static final String KAFKA_SEPARATOR = "|";

public DataObject(Object[] data) {
  this.data = data;
}
@Override
public void setData(int index, Object f) {
  data[index] = f;
}

@Override
public void data2String(StringBuilder sb) {
  sb.append(data[0]).append(KAFKA_SEPARATOR);
  sb.append(data[1]).append(KAFKA_SEPARATOR);
  // 省略其他字段...
}

解析每个字段时,都先解析出来tag,然后以tag为index,调用setData将value放到Array中。

代码语言:java

复制

代码语言:javascript复制
int tag = buffer[off] & 0xFF;
int formatAndTagHigh = buffer[off   1] & 0xFF;
int format = ((buffer[off   1] & 0xFF) >> 4) & 0X0F;
off  = 2;
int length = 0;
switch (format) {
    case 1:
        length = 1;
        dataObject.setData(tag, buffer[off] & 0xff);
        break;
    case 4:
        xdr.dataObject(tag, ConvToByte.byteToUnsignedInt(buffer, off));
        length = 4;
        break;

但在java中array的默认值为null,但是我在字段中默认值想要设置为空字符,所以当时思考了几个方案。

方案一就是,在将array中的数据转换成String的时候,使用replace替换字符串。但是这个方案直接被我否决了,对于数据量很大且实时性要求很高的解析程序来说,replace是个很耗费性能的事情。

方案二就是,array作为数据对象的私有属性,在构造函数constructer中初始化array,并循环array将每个值设置为""。但是这样带来的问题也显而易见,就是每创造一个数据对象时,都会循环array,而且每条数据有200个字段,就要循环200次,也会带来性能问题。

所以后来我就采用了方案三,就是在外部构造一个array,循环遍历将每个值设置为"",在每次创造数据对象之前,我调用array.clone()复制一个array,在new数据对象时,作为构造参数赋值给数据对象的私有变量。这样就完美解决了我的需求。

代码语言:java

复制

代码语言:javascript复制
private Object[] data;

public DataDecode() {
    int length = 200;
    data = new Object[length];
    data20 = new Object[length20];
    for (int i = 0; i < length; i  ) {
        data[i] = "";
    }
}

DataObject dataObject = new DataObject(data.clone());
// 解码处理逻辑,省略....

dataObject.data2String(sb);

clone()有哪些优势?

  1. 比new对象快,不需要调用构造方法
  2. 在我的需求场景中,array只需要初始化一次
  3. clone()出来的对象和原对象是各自独立的两个对象

综合以上,在合适的场景选择clone()是一个不错的选择。

0 人点赞