1实体对象
代码语言:javascript复制package com.shi.mysqlEsTest;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* 电站信息ES对象
*
* @author shiye
* @create 2023-02-21 11:26
*/
public class PowerStationInfo implements Serializable {
/**
* 电站id
*/
private Long ps_id;
/**
* 电站名称
*/
private String ps_name;
/**
* 电站图片链接
*/
private String images;
/**
* 电站类型:
*/
private Integer ps_type;
/**
* 电站在线离线状态:
*/
private Integer ps_status;
/**
* 装机功率
*/
private Double design_capacity;
/**
* 实时功率
*/
private Double current_power;
/**
* 累计发电量
*/
private Double today_energy;
/**
* 等效小时
*/
private Double equivalent_hour;
/**
* 电站地址
*/
private String ps_location;
/**
* 电站PR
*/
private String pr_scale;
/**
* 瞬时辐照
*/
private Double radiation;
/**
* 日辐射量
*/
private Double daily_irradiation;
/**
* 电站有效标识
*/
private Integer valid_flag;
/**
* 离线时间
*/
private Date offline_time;
/**
* 行政区划code (最底级区划一般为区级区划)
*/
private Integer division_code;
/**
* 经度-WGS84格式
* 117.18748542291920
* 纬度-WGS84格式
* 31.81492717972597
*/
private String[] lon_lat;
/**
* 当前电站的 业主信息
*/
private List<OrgUserInfo> orgUserInfo_array;
/**
* 设备sn集合
*/
private String[] sn_array;
/**
* 关注用户集合
*/
private long[] userid_array;
static class OrgUserInfo{
/**
* 电站的分享类型:
*/
private Integer share_type;
/**
* 组织id
*/
private Long org_id;
/**
* 用户id
*/
private Long user_id;
/**
* 业主联系方式 - 手机号码
*/
private String moble_tel;
/**
* 业主联系方式 - 邮箱
*/
private String email;
public Integer getShare_type() {
return share_type;
}
public void setShare_type(Integer share_type) {
this.share_type = share_type;
}
public Long getOrg_id() {
return org_id;
}
public void setOrg_id(Long org_id) {
this.org_id = org_id;
}
public Long getUser_id() {
return user_id;
}
public void setUser_id(Long user_id) {
this.user_id = user_id;
}
public String getMoble_tel() {
return moble_tel;
}
public void setMoble_tel(String moble_tel) {
this.moble_tel = moble_tel;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
@Override
public String toString() {
return "OrgUserInfo{"
"share_type=" share_type
", org_id=" org_id
", user_id=" user_id
", moble_tel='" moble_tel '''
", email='" email '''
'}';
}
}
public List<OrgUserInfo> getOrgUserInfo_array() {
return orgUserInfo_array;
}
public void setOrgUserInfo_array(List<OrgUserInfo> orgUserInfo_array) {
this.orgUserInfo_array = orgUserInfo_array;
}
public Long getPs_id() {
return ps_id;
}
public void setPs_id(Long ps_id) {
this.ps_id = ps_id;
}
public String getPs_name() {
return ps_name;
}
public void setPs_name(String ps_name) {
this.ps_name = ps_name;
}
public String getImages() {
return images;
}
public void setImages(String images) {
this.images = images;
}
public Integer getPs_type() {
return ps_type;
}
public void setPs_type(Integer ps_type) {
this.ps_type = ps_type;
}
public Integer getPs_status() {
return ps_status;
}
public void setPs_status(Integer ps_status) {
this.ps_status = ps_status;
}
public Double getDesign_capacity() {
return design_capacity;
}
public void setDesign_capacity(Double design_capacity) {
this.design_capacity = design_capacity;
}
public Double getCurrent_power() {
return current_power;
}
public void setCurrent_power(Double current_power) {
this.current_power = current_power;
}
public Double getToday_energy() {
return today_energy;
}
public void setToday_energy(Double today_energy) {
this.today_energy = today_energy;
}
public Double getEquivalent_hour() {
return equivalent_hour;
}
public void setEquivalent_hour(Double equivalent_hour) {
this.equivalent_hour = equivalent_hour;
}
public String getPs_location() {
return ps_location;
}
public void setPs_location(String ps_location) {
this.ps_location = ps_location;
}
public String getPr_scale() {
return pr_scale;
}
public void setPr_scale(String pr_scale) {
this.pr_scale = pr_scale;
}
public Double getRadiation() {
return radiation;
}
public void setRadiation(Double radiation) {
this.radiation = radiation;
}
public Double getDaily_irradiation() {
return daily_irradiation;
}
public void setDaily_irradiation(Double daily_irradiation) {
this.daily_irradiation = daily_irradiation;
}
public Integer getValid_flag() {
return valid_flag;
}
public void setValid_flag(Integer valid_flag) {
this.valid_flag = valid_flag;
}
public Date getOffline_time() {
return offline_time;
}
public void setOffline_time(Date offline_time) {
this.offline_time = offline_time;
}
public Integer getDivision_code() {
return division_code;
}
public void setDivision_code(Integer division_code) {
this.division_code = division_code;
}
public String[] getLon_lat() {
return lon_lat;
}
public void setLon_lat(String[] lon_lat) {
this.lon_lat = lon_lat;
}
public String[] getSn_array() {
return sn_array;
}
public void setSn_array(String[] sn_array) {
this.sn_array = sn_array;
}
public long[] getUserid_array() {
return userid_array;
}
public void setUserid_array(long[] userid_array) {
this.userid_array = userid_array;
}
@Override
public String toString() {
return "PowerStationInfo{"
"ps_id=" ps_id
", ps_name='" ps_name '''
", images='" images '''
", ps_type=" ps_type
", ps_status=" ps_status
", design_capacity=" design_capacity
", current_power=" current_power
", today_energy=" today_energy
", equivalent_hour=" equivalent_hour
", ps_location='" ps_location '''
", pr_scale='" pr_scale '''
", radiation=" radiation
", daily_irradiation=" daily_irradiation
", valid_flag=" valid_flag
", offline_time=" offline_time
", division_code=" division_code
", lon_lat=" Arrays.toString(lon_lat)
", orgUserInfo_array=" orgUserInfo_array
", sn_array=" Arrays.toString(sn_array)
", userid_array=" Arrays.toString(userid_array)
'}';
}
}
2.源获取数据
代码语言:javascript复制package com.shi.mysqlEsTest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
* @author shiye
* @create 2023-02-21 11:54
*/
public class PowerStationSourceFromMysql extends RichSourceFunction<Long> {
private PreparedStatement ps = null;
private Connection connection = null;
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://10.0.81.151:3306/sungrow?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&serverTimezone=Asia/Shanghai&useSSL=false";
String username = "root";
String password = "1234";
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//加载驱动
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
}
@Override
public void run(SourceContext<Long> sourceContext) throws Exception {
String sql = "SELECT t1.ps_id FROM power_station t1 ORDER BY t1.ps_id";
ps = connection.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
Long ps_id = resultSet.getLong("ps_id");
sourceContext.collect(ps_id);
}
}
@Override
public void cancel() {
try {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.map处理数据
代码语言:javascript复制package com.shi.mysqlEsTest;
import com.mysql.jdbc.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* @author shiye
* @create 2023-02-22 14:56
*/
public class SqlMapFunction extends RichMapFunction<Long, PowerStationInfo> {
private PreparedStatement ps = null;
private Connection connection = null;
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://10.0.81.151:3306/sungrow?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&serverTimezone=Asia/Shanghai&useSSL=false";
String username = "root";
String password = "1234";
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//加载驱动
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
}
@Override
public void close() throws Exception {
try {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public PowerStationInfo map(Long ps_id) throws Exception {
PowerStationInfo psInfo = new PowerStationInfo();
String sql2 = "SELECTn"
" t1.ps_id,n"
" IFNULL(t1.ps_name,'') AS ps_name,n"
" t1.ps_type,n"
" t1.ps_location,n"
" t1.valid_flag,n"
" t1.division_code,n"
" t1.longitude,n"
" t1.latitude,n"
" IFNULL(t2.`PICTURE_URL`,'') AS images,n"
" t3.`dev_status` AS ps_status,n"
" t3.`update_time` AS offline_time,n"
" t4.`DESIGN_CAPACITY` AS design_capacity,n"
" t4.`MAX_PR` AS max_pr,n"
" t4.`min_pr` AS min_pr,n"
" t5.`p83023` AS p83023,n"
" t5.`p83033` AS current_power,n"
" t5.`p83022` AS today_energy,n"
" t5.`p83024` AS total_energy,n"
" t5.`p83025` AS equivalent_hour,n"
" t5.`p83012` AS radiation,n"
" t5.`p83013` AS daily_irradiation,n"
" GROUP_CONCAT(DISTINCT t6.`DEVICE_PRO_SN`) AS sn_array,n"
" GROUP_CONCAT(DISTINCT t8.`user_id`) AS userid_arrayn"
"FROMn"
" power_station t1n"
"LEFT JOIN power_station_picture t2 ON t1.ps_id = t2.`PS_ID`n"
"LEFT JOIN power_device_attr t6 ON t1.`PS_ID` = t6.`PS_ID`n"
"LEFT JOIN power_device_status t3 ON t6.uuid = t3.`uuid`n"
"LEFT JOIN power_station_config_ratio t4 ON t1.ps_id = t4.`PS_ID`n"
"LEFT JOIN power_statistical_data t5 ON t1.ps_id = t5.`ps_id`n"
"LEFT JOIN user_station_follow_config t8 ON t1.`PS_ID` = t8.`ps_id`n"
"WHERE t1.`PS_ID` = " ps_id
" GROUP BY t1.`PS_ID`";
ResultSet resultSet2 = connection.prepareStatement(sql2).executeQuery();
if (resultSet2.next()) {
psInfo.setPs_id(ps_id);
psInfo.setPs_name(resultSet2.getString("ps_name"));
psInfo.setPs_type(resultSet2.getInt("ps_type"));
psInfo.setPs_location(resultSet2.getString("ps_location"));
psInfo.setValid_flag(resultSet2.getInt("valid_flag"));
psInfo.setValid_flag(resultSet2.getInt("division_code"));
String longitude = resultSet2.getString("longitude");
String latitude = resultSet2.getString("latitude");
String[] lon_lat = new String[]{longitude, latitude};
psInfo.setLon_lat(lon_lat);
psInfo.setImages(resultSet2.getString("images"));
psInfo.setPs_status(resultSet2.getInt("ps_status"));
Date offlineTime = resultSet2.getDate("offline_time");
psInfo.setOffline_time(offlineTime);
psInfo.setDesign_capacity(resultSet2.getDouble("design_capacity"));
double max_pr = resultSet2.getDouble("max_pr");
double min_pr = resultSet2.getDouble("min_pr");
double p83023 = resultSet2.getDouble("p83023");
if (p83023 == 0) {
psInfo.setPr_scale("--");
} else if (p83023 > max_pr) {
psInfo.setPr_scale(String.format("%.2f", (max_pr * 100)));
} else if (p83023 < min_pr) {
psInfo.setPr_scale(String.format("%.2f", (min_pr * 100)));
} else {
psInfo.setPr_scale(String.format("%.2f", (p83023 * 100)));
}
psInfo.setCurrent_power(resultSet2.getDouble("current_power"));
psInfo.setToday_energy(resultSet2.getDouble("today_energy"));
psInfo.setToday_energy(resultSet2.getDouble("total_energy"));
psInfo.setEquivalent_hour(resultSet2.getDouble("equivalent_hour"));
psInfo.setRadiation(resultSet2.getDouble("radiation"));
String snStr = resultSet2.getString("sn_array");
if (!StringUtils.isNullOrEmpty(snStr)) {
psInfo.setSn_array(snStr.split(","));
}
String useridStr = resultSet2.getString("userid_array");
if (!StringUtils.isNullOrEmpty(useridStr)) {
String[] split = resultSet2.getString("userid_array").split(",");
psInfo.setUserid_array(Arrays.asList(split).stream().mapToLong(Long::parseLong).toArray());
}
String sql3 = "SELECTn"
"tu.user_id,n"
"tu.moble_tel,n"
"tu.email,n"
"tp.share_type,n"
"tu.org_idn"
"FROMn"
"power_station_org pn"
"LEFT JOIN sys_organization s ON p.org_id = s.org_idn"
"LEFT JOIN sys_user_org o ON o.org_id = s.org_id AND o.share_type = 0n"
"LEFT JOIN sys_user u ON o.user_id = u.user_idn"
"WHEREn"
" p.ps_id = " ps_id
" AND (p.root_org_id = 499 OR is_installer_ps_org = -1)n"
" AND s.share_type = 0n"
" AND p.share_type = 0n"
"UNIONn"
"SELECTn"
" s.user_id,n"
" s.moble_tel,n"
" s.email,n"
" '0' AS share_type,n"
" so.org_idn"
"FROMn"
"power_station_owner_user_rel pson"
"JOIN sys_user s ON s.user_id = pso.user_idn"
"LEFT JOIN sys_user_org suo ON suo.user_id = pso.user_idn"
"LEFT JOIN sys_organization so ON so.org_id = suo.org_idn"
"WHERE pso.ps_id = " ps_id
" AND suo.is_master_org != -1 "
" AND suo.share_type = 0";
ResultSet resultSet3 = connection.prepareStatement(sql3).executeQuery();
List<PowerStationInfo.OrgUserInfo> orgUserInfo_array = new ArrayList<>();
while (resultSet3.next()) {
PowerStationInfo.OrgUserInfo orgUserInfo = new PowerStationInfo.OrgUserInfo();
orgUserInfo.setOrg_id(resultSet3.getLong("org_id"));
orgUserInfo.setUser_id(resultSet3.getLong("user_id"));
orgUserInfo.setMoble_tel(resultSet3.getString("moble_tel"));
orgUserInfo.setEmail(resultSet3.getString("email"));
orgUserInfo.setShare_type(resultSet3.getInt("share_type"));
orgUserInfo_array.add(orgUserInfo);
}
psInfo.setOrgUserInfo_array(orgUserInfo_array);
}
return psInfo;
}
}
4.sink
代码语言:javascript复制package com.shi.mysqlEsTest;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.security.user.User;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.ArrayList;
import java.util.Map;
/**
* @author shiye
* @create 2023-02-21 13:55
*/
public class MysqlSourceToEsTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("10.0.81.167", 9201, "http"));
httpHosts.add(new HttpHost("10.0.81.168", 9201, "http"));
httpHosts.add(new HttpHost("10.0.81.169", 9201, "http"));
DataStreamSource<Long> mapDataStreamSource = env.addSource(new PowerStationSourceFromMysql());
SingleOutputStreamOperator<PowerStationInfo> map = mapDataStreamSource.map(new SqlMapFunction());
ElasticsearchSink.Builder<PowerStationInfo> esBuilder = new ElasticsearchSink.Builder<PowerStationInfo>(httpHosts, getEsSinkFunction());
//刷新前最大缓存的操作数。
esBuilder.setBulkFlushMaxActions(100);
//刷新前最大缓存的数据量(以兆字节为单位)。
esBuilder.setBulkFlushMaxSizeMb(100);
//刷新的时间间隔(不论缓存操作的数量或大小如何)
esBuilder.setBulkFlushInterval(1000);
//设置用户名密码
esBuilder.setRestClientFactory(getRestClientFactory());
map.addSink(esBuilder.build());
env.execute();
}
/**
* 定义ES Sink算子
*
* @return
*/
public static ElasticsearchSinkFunction<PowerStationInfo> getEsSinkFunction() {
ElasticsearchSinkFunction<PowerStationInfo> esSinkFunction = new ElasticsearchSinkFunction<PowerStationInfo>() {
@Override
public void process(PowerStationInfo event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
System.out.println("------> " event);
String string = JSON.toJSONString(event);
IndexRequest request = Requests.indexRequest()
.index("temp_ps_info2")
.id(event.getPs_id().toString())
.source(string, XContentType.JSON);// 放入数据json格式
requestIndexer.add(request);
}
};
return esSinkFunction;
}
/**
* es 用户名密码
*/
public static RestClientFactory getRestClientFactory() {
RestClientFactory restClientFactory = new RestClientFactory() {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "1111"));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.disableAuthCaching();
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
}
};
return restClientFactory;
}
}