Flink自定义OSS的Sink

2021-06-16 10:04:37 浏览数 (2)

1.背景

基于上篇说明的OSS异常内容和功能弱的缘故,考虑自定义Sink处理的方式。主要关注点是文件命名的动态化和高效批写入。

2.代码内容

代码语言:java复制
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSS;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClientBuilder;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.AppendObjectRequest;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.AppendObjectResult;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.ObjectMetadata;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class OssSink extends RichSinkFunction<String> {

    private static final  String OSS_ENDPOINT = "https://oss-xxxx.aliyuncs.com";
    private static final  String OSS_ACCESS_KEYID = "xxxxx";
    private static final  String OSS_ACCESSKEYSECRET = "xxxx";
    private static final  String OSS_BUCKETNAME = "xxxxx";

    private List<String> result;
    private Map<String,String> cap;
    private Long startTs;


    private String path;
    private DateFormat format;
    private ObjectMetadata meta;
    private OSS ossClient;
    private AppendObjectRequest appendObjectRequest;
    private AppendObjectResult appendObjectResult;

    @Override
    public void open(Configuration parameters) throws Exception {
        result = new ArrayList<>();
        cap = new HashMap<>();
        startTs = System.currentTimeMillis();

        this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
        this.meta = new ObjectMetadata();
        // 指定上传的内容类型。
        meta.setContentType("text/plain");
        format=new SimpleDateFormat("yyyyMMdd");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        result.add(value   "n");
        //TODO: 3是Map存储还是其他的进行批量写入
        if (100 <= result.size()) {
            batchWrite();
            result.clear();
        }
    }

    @Override
    public void close() throws Exception {
        this.ossClient.shutdown();
        super.close();
    }


    private void batchWrite(){
        StringBuffer sb = new StringBuffer();
        for (String str: result) {
            sb.append(str);
        }
        Date date=new Date();
        String day = format.format(date);
        path = "user_event/day="   day   "/id=${id}/sdk=sa_sdk/user_event.json";
        appendObjectRequest = new AppendObjectRequest(OSS_BUCKETNAME, path, new ByteArrayInputStream(sb.toString().getBytes()), meta);
        if (null == appendObjectResult) {
            appendObjectRequest.setPosition(0l);
        } else {
            Long position = appendObjectResult.getNextPosition();
            appendObjectRequest.setPosition(position);
        }
        appendObjectResult = ossClient.appendObject(appendObjectRequest);
    }
}

3. 进阶代码版本

代码语言:java复制
@Override
    public void invoke(Tuple2<String,String> value, Context context) throws Exception {
        // Buffer the record, then flush either when the batch is full or when
        // the flush interval (60 s) has elapsed since the last write.
        result.add(value);
        final long now = System.currentTimeMillis();
        final boolean batchFull = result.size() >= 100;
        final boolean intervalElapsed = now - startTs >= 60000;
        if (batchFull || intervalElapsed) {
            startTs = now;
            batchWrite();
            result.clear();
        }
    }

    @Override
    public void close() throws Exception {
        if (this.ossClient != null) {
            this.ossClient.shutdown();
        }
        super.close();
    }

    private void batchWrite(){
        if (this.ossClient != null) {
            this.ossClient.shutdown();
        }

        this.ossClient =  new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
        LOG.info( "{} oss batch start ms {}",Thread.currentThread().getName(), startTs);

        Map<String, StringBuffer> batchMap = new HashMap<>();
        for (Tuple2<String,String> value: result) {
            String key = value.f0;
            String message = value.f1;

            StringBuffer sb = batchMap.get(key);
            if (null == sb) {
                sb = new StringBuffer();
            }
            sb.append(message).append("n");
            batchMap.put(key, sb);
        }

        final Set<String> keySet = batchMap.keySet();
        for (String key : keySet) {
            final String[] split = key.split("\^");
            String day = split[0];
            String tid = split[1];
            path = "user_event/day="   day   "/tid="   tid   "/sdk=sa_sdk/user_event_" startTs ".json";

            final String messages = batchMap.get(key).toString();
            appendObjectRequest = new AppendObjectRequest(OSS_BUCKETNAME, path, new ByteArrayInputStream(messages.getBytes()), meta);
            final boolean exist = this.ossClient.doesObjectExist(OSS_BUCKETNAME, path);
            if (exist) {
                final ObjectMetadata metadata = this.ossClient.getObjectMetadata(OSS_BUCKETNAME, path);
                Long position =  metadata.getContentLength();
                appendObjectRequest.setPosition(position);
            } else {
                appendObjectRequest.setPosition(0L);
            }

            final AppendObjectResult appendObjectResult = this.ossClient.appendObject(appendObjectRequest);
            LOG.info( "oss position {}", appendObjectResult.getNextPosition());
        }
    }

这种写法,并发运行一段时间,会报

代码语言:text复制
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.ClientException: Connection reset by peer: socket write error
[ErrorCode]: SocketException
[RequestId]: Unknown
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.utils.ExceptionFactory.createNetworkException(ExceptionFactory.java:71)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.DefaultServiceClient.sendRequestCore(DefaultServiceClient.java:127)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequestImpl(ServiceClient.java:133)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequest(ServiceClient.java:70)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.send(OSSOperation.java:83)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:145)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:102)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.writeObjectInternal(OSSObjectOperation.java:897)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.appendObject(OSSObjectOperation.java:185)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.appendObject(OSSClient.java:595)
	at com.am.oss.OssSink.batchWrite(OssSink.java:101)

所以更改方式

代码语言:java复制
@Override
    public void open(Configuration parameters) throws Exception {
        // Set up the record buffer, the flush timer, and one long-lived OSS
        // client per sink subtask (reused across batches).
        this.result = new ArrayList<>();
        this.startTs = System.currentTimeMillis();
        this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
    }

    @Override
    public void invoke(Tuple2<String,String>  value, Context context) throws Exception {
        result.add(value);
        final long currentTimeMillis = System.currentTimeMillis();
        if (1000 <= result.size() || (currentTimeMillis - startTs) >= 60000) {
            startTs = currentTimeMillis;
            batchWrite();
            result.clear();
        }
    }

    @Override
    public void close() throws Exception {
        if (this.ossClient != null) {
            this.ossClient.shutdown();
        }
        super.close();
    }

    private void batchWrite(){
        if (null == this.ossClient) {
            this.ossClient =  new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
        }
        LOG.info( "{} oss batch start ms {}",Thread.currentThread().getName(), startTs);

        Map<String, StringBuffer> batchMap = new HashMap<>();
        for (Tuple2<String,String> value: result) {
            String key = value.f0;
            String message = value.f1;

            StringBuffer sb = batchMap.get(key);
            if (null == sb) {
                sb = new StringBuffer();
            }
            sb.append(message).append("n");
            batchMap.put(key, sb);
        }

        final Set<String> keySet = batchMap.keySet();
        for (String key : keySet) {
            final String[] split = key.split("\^");
            String day = split[0];
            String tid = split[1];
            //OSS写入文件有5G限制,所以增加时间戳,putObject的方式
            path = "user_event/day="   day   "/tid="   tid   "/sdk=sa_sdk/user_event_" startTs ".json";
            final String messages = batchMap.get(key).toString();
            final PutObjectResult putObjectResult = this.ossClient.putObject(OSS_BUCKETNAME, path, new ByteArrayInputStream(messages.getBytes()));
            final String eTag = putObjectResult.getETag();
            LOG.info( "oss put {} end, tag {}", path, eTag);
        }
    }

对应代码完整调用:

代码语言:java复制
final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                TOPIC_NAME,
                new SimpleStringSchema(),
                props);
        // Consume from the latest offset; use setStartFromTimestamp(...) instead to replay history.
        kafkaConsumer.setStartFromLatest();

        final KeyedStream<Tuple2<String, String>, String> streamSource = env.addSource(kafkaConsumer)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String jsonData) throws Exception {
                        // Build the partition key "day^tid" from the event's recv time and tid.
                        String tid = null;
                        if (isJson(jsonData)) {
                            JSONObject json = JSONObject.parseObject(jsonData);
                            final JSONObject jsonObject = json.getJSONObject(PROPERTIES);
                            if (jsonObject.containsKey(TID)) {
                                tid = jsonObject.getString(TID);
                            }

                            final Long time = json.getLong(RECV_TIME);
                            Date date = new Date(time);
                            String day = format.format(date);

                            // Fix: the published listing lost the '+' concatenation operators.
                            String key = day + FLAG + tid;
                            return new Tuple2<>(key, jsonData);
                        } else {
                            // NOTE(review): emitting null here sends a null record downstream and
                            // will NPE in the key selector — prefer a flatMap that drops bad input.
                            return null;
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> tuple2) throws Exception {
                        return tuple2.f0;
                    }
                });

        System.out.println("start ----- >");
        LOG.info("start ----- >");

        streamSource.addSink( new OssSink());

0 人点赞