1.背景
基于上篇说明的OSS异常内容和功能弱的缘故,考虑自定义Sink处理的方式。主要关注点是文件命名的动态化和高效批写入。
2.代码内容
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSS;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClientBuilder;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.AppendObjectRequest;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.AppendObjectResult;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.ObjectMetadata;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.io.ByteArrayInputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
public class OssSink extends RichSinkFunction<String> {
private static final String OSS_ENDPOINT = "https://oss-xxxx.aliyuncs.com";
private static final String OSS_ACCESS_KEYID = "xxxxx";
private static final String OSS_ACCESSKEYSECRET = "xxxx";
private static final String OSS_BUCKETNAME = "xxxxx";
private List<String> result;
private Map<String,String> cap;
private Long startTs;
private String path;
private DateFormat format;
private ObjectMetadata meta;
private OSS ossClient;
private AppendObjectRequest appendObjectRequest;
private AppendObjectResult appendObjectResult;
@Override
public void open(Configuration parameters) throws Exception {
result = new ArrayList<>();
cap = new HashMap<>();
startTs = System.currentTimeMillis();
this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
this.meta = new ObjectMetadata();
// 指定上传的内容类型。
meta.setContentType("text/plain");
format=new SimpleDateFormat("yyyyMMdd");
}
@Override
public void invoke(String value, Context context) throws Exception {
result.add(value "n");
//TODO: 3是Map存储还是其他的进行批量写入
if (100 <= result.size()) {
batchWrite();
result.clear();
}
}
@Override
public void close() throws Exception {
this.ossClient.shutdown();
super.close();
}
private void batchWrite(){
StringBuffer sb = new StringBuffer();
for (String str: result) {
sb.append(str);
}
Date date=new Date();
String day = format.format(date);
path = "user_event/day=" day "/id=${id}/sdk=sa_sdk/user_event.json";
appendObjectRequest = new AppendObjectRequest(OSS_BUCKETNAME, path, new ByteArrayInputStream(sb.toString().getBytes()), meta);
if (null == appendObjectResult) {
appendObjectRequest.setPosition(0l);
} else {
Long position = appendObjectResult.getNextPosition();
appendObjectRequest.setPosition(position);
}
appendObjectResult = ossClient.appendObject(appendObjectRequest);
}
}
3. 进阶代码版本
代码语言:javascript复制@Override
public void invoke(Tuple2<String,String> value, Context context) throws Exception {
result.add(value);
final long currentTimeMillis = System.currentTimeMillis();
if (100 <= result.size() || (currentTimeMillis - startTs) >= 60000) {
startTs = currentTimeMillis;
batchWrite();
result.clear();
}
}
@Override
public void close() throws Exception {
    // Release the OSS client if open() ever created one.
    final OSS client = this.ossClient;
    if (client != null) {
        client.shutdown();
    }
    super.close();
}
private void batchWrite(){
if (this.ossClient != null) {
this.ossClient.shutdown();
}
this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
LOG.info( "{} oss batch start ms {}",Thread.currentThread().getName(), startTs);
Map<String, StringBuffer> batchMap = new HashMap<>();
for (Tuple2<String,String> value: result) {
String key = value.f0;
String message = value.f1;
StringBuffer sb = batchMap.get(key);
if (null == sb) {
sb = new StringBuffer();
}
sb.append(message).append("n");
batchMap.put(key, sb);
}
final Set<String> keySet = batchMap.keySet();
for (String key : keySet) {
final String[] split = key.split("\^");
String day = split[0];
String tid = split[1];
path = "user_event/day=" day "/tid=" tid "/sdk=sa_sdk/user_event_" startTs ".json";
final String messages = batchMap.get(key).toString();
appendObjectRequest = new AppendObjectRequest(OSS_BUCKETNAME, path, new ByteArrayInputStream(messages.getBytes()), meta);
final boolean exist = this.ossClient.doesObjectExist(OSS_BUCKETNAME, path);
if (exist) {
final ObjectMetadata metadata = this.ossClient.getObjectMetadata(OSS_BUCKETNAME, path);
Long position = metadata.getContentLength();
appendObjectRequest.setPosition(position);
} else {
appendObjectRequest.setPosition(0L);
}
final AppendObjectResult appendObjectResult = this.ossClient.appendObject(appendObjectRequest);
LOG.info( "oss position {}", appendObjectResult.getNextPosition());
}
}
这种写法,并发运行一段时间,会报
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.ClientException: Connection reset by peer: socket write error
[ErrorCode]: SocketException
[RequestId]: Unknown
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.utils.ExceptionFactory.createNetworkException(ExceptionFactory.java:71)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.DefaultServiceClient.sendRequestCore(DefaultServiceClient.java:127)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequestImpl(ServiceClient.java:133)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequest(ServiceClient.java:70)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.send(OSSOperation.java:83)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:145)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:102)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.writeObjectInternal(OSSObjectOperation.java:897)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.appendObject(OSSObjectOperation.java:185)
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.appendObject(OSSClient.java:595)
at com.am.oss.OssSink.batchWrite(OssSink.java:101)
所以更改方式
代码语言:javascript复制@Override
public void open(Configuration parameters) throws Exception {
result = new ArrayList<>();
startTs = System.currentTimeMillis();
this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
}
@Override
public void invoke(Tuple2<String, String> value, Context context) throws Exception {
    result.add(value);
    final long now = System.currentTimeMillis();
    // Guard clause: keep buffering until 1000 records are queued or 60s have
    // elapsed since the last flush.
    if (result.size() < 1000 && now - startTs < 60000) {
        return;
    }
    startTs = now;
    batchWrite();
    result.clear();
}
@Override
public void close() throws Exception {
    // Shut the shared client down before delegating to the superclass.
    final OSS client = ossClient;
    if (client != null) {
        client.shutdown();
    }
    super.close();
}
private void batchWrite(){
if (null == this.ossClient) {
this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
}
LOG.info( "{} oss batch start ms {}",Thread.currentThread().getName(), startTs);
Map<String, StringBuffer> batchMap = new HashMap<>();
for (Tuple2<String,String> value: result) {
String key = value.f0;
String message = value.f1;
StringBuffer sb = batchMap.get(key);
if (null == sb) {
sb = new StringBuffer();
}
sb.append(message).append("n");
batchMap.put(key, sb);
}
final Set<String> keySet = batchMap.keySet();
for (String key : keySet) {
final String[] split = key.split("\^");
String day = split[0];
String tid = split[1];
//OSS写入文件有5G限制,所以增加时间戳,putObject的方式
path = "user_event/day=" day "/tid=" tid "/sdk=sa_sdk/user_event_" startTs ".json";
final String messages = batchMap.get(key).toString();
final PutObjectResult putObjectResult = this.ossClient.putObject(OSS_BUCKETNAME, path, new ByteArrayInputStream(messages.getBytes()));
final String eTag = putObjectResult.getETag();
LOG.info( "oss put {} end, tag {}", path, eTag);
}
}
对应代码完整调用:
// Wire the Kafka source -> key extraction -> keyBy -> OSS sink pipeline.
final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
        TOPIC_NAME,
        new SimpleStringSchema(),
        props);
// kafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() - 2 * 24 * 60 * 60 * 1000);
kafkaConsumer.setStartFromLatest();
final KeyedStream<Tuple2<String, String>, String> streamSource = env.addSource(kafkaConsumer)
        .map(new MapFunction<String, Tuple2<String, String>>() {
            /**
             * Parses a raw JSON event and builds a (day^tid, rawJson) pair.
             * Non-JSON input yields null — NOTE(review): a null element
             * reaching keyBy will fail at runtime; prefer flatMap that simply
             * drops unparseable records.
             */
            @Override
            public Tuple2<String, String> map(String jsonData) throws Exception {
                String tid = null;
                if (!isJson(jsonData)) {
                    return null;
                }
                JSONObject json = JSONObject.parseObject(jsonData);
                // NOTE(review): getJSONObject may return null if PROPERTIES is
                // absent — confirm upstream guarantees the field exists.
                final JSONObject jsonObject = json.getJSONObject(PROPERTIES);
                if (jsonObject.containsKey(TID)) {
                    tid = jsonObject.getString(TID);
                }
                final Long time = json.getLong(RECV_TIME);
                String day = format.format(new Date(time));
                // FIX: restored the '+' concatenation lost in extraction
                // (the published line read `day FLAG tid`).
                String key = day + FLAG + tid;
                return new Tuple2<>(key, jsonData);
            }
        })
        .keyBy(new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(Tuple2<String, String> tuple2) throws Exception {
                return tuple2.f0;
            }
        });
System.out.println("start ----- >");
LOG.info("start ----- >");
streamSource.addSink(new OssSink());