Flink-mysql源-esSink

2023-03-16 19:45:48 浏览数 (1)

1.实体对象

代码语言:java
package com.shi.mysqlEsTest;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * Elasticsearch document model describing one power station.
 *
 * <p>Field and accessor names are deliberately snake_case. The sink in this package serializes
 * instances with fastjson, so renaming an accessor would presumably rename the corresponding
 * document key — verify before changing any name.
 *
 * @author shiye
 * @create 2023-02-21 11:26
 */
public class PowerStationInfo implements Serializable {
    /**
     * Station id.
     */
    private Long ps_id;

    /**
     * Station name.
     */
    private String ps_name;

    /**
     * Link to the station picture.
     */
    private String images;

    /**
     * Station type.
     */
    private Integer ps_type;
    /**
     * Online/offline status of the station.
     */
    private Integer ps_status;
    /**
     * Installed (design) capacity.
     */
    private Double design_capacity;
    /**
     * Real-time power.
     */
    private Double current_power;
    /**
     * Cumulative energy generated.
     * NOTE(review): the original comment says "cumulative" while the field name says "today" —
     * confirm which one is intended.
     */
    private Double today_energy;
    /**
     * Equivalent hours.
     */
    private Double equivalent_hour;
    /**
     * Station address.
     */
    private String ps_location;

    /**
     * Station PR (performance ratio), already formatted as text.
     */
    private String pr_scale;
    /**
     * Instantaneous irradiance.
     */
    private Double radiation;
    /**
     * Daily irradiation.
     */
    private Double daily_irradiation;

    /**
     * Validity flag of the station.
     */
    private Integer valid_flag;
    /**
     * Time the station went offline.
     */
    private Date offline_time;
    /**
     * Administrative division code (the lowest level is usually the district).
     */
    private Integer division_code;
    /**
     * Longitude followed by latitude, both in WGS84,
     * e.g. {@code 117.18748542291920} and {@code 31.81492717972597}.
     */
    private String[] lon_lat;
    /**
     * Owner information of this station.
     */
    private List<OrgUserInfo> orgUserInfo_array;

    /**
     * Serial numbers of the station's devices.
     */
    private String[] sn_array;
    /**
     * Ids of the users following this station.
     */
    private long[] userid_array;

    /**
     * Owner (organization/user) record attached to a station.
     *
     * <p>Implements {@link Serializable} because the enclosing class is serializable and keeps a
     * list of these objects; without it, Java serialization of a station with owners fails with
     * {@code NotSerializableException}.
     */
    static class OrgUserInfo implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Share type of the station.
         */
        private Integer share_type;

        /**
         * Organization id.
         */
        private Long org_id;

        /**
         * User id.
         */
        private Long user_id;

        /**
         * Owner contact: mobile phone number.
         */
        private String moble_tel;
        /**
         * Owner contact: e-mail address.
         */
        private String email;

        public Integer getShare_type() {
            return share_type;
        }

        public void setShare_type(Integer share_type) {
            this.share_type = share_type;
        }

        public Long getOrg_id() {
            return org_id;
        }

        public void setOrg_id(Long org_id) {
            this.org_id = org_id;
        }

        public Long getUser_id() {
            return user_id;
        }

        public void setUser_id(Long user_id) {
            this.user_id = user_id;
        }

        public String getMoble_tel() {
            return moble_tel;
        }

        public void setMoble_tel(String moble_tel) {
            this.moble_tel = moble_tel;
        }

        public String getEmail() {
            return email;
        }

        public void setEmail(String email) {
            this.email = email;
        }

        /**
         * Renders every field as {@code name=value}; text fields are wrapped in single quotes.
         */
        @Override
        public String toString() {
            return "OrgUserInfo{" +
                    "share_type=" + share_type +
                    ", org_id=" + org_id +
                    ", user_id=" + user_id +
                    ", moble_tel='" + moble_tel + '\'' +
                    ", email='" + email + '\'' +
                    '}';
        }
    }

    // ---- plain accessors; names mirror the snake_case field names ----

    public List<OrgUserInfo> getOrgUserInfo_array() {
        return orgUserInfo_array;
    }

    public void setOrgUserInfo_array(List<OrgUserInfo> orgUserInfo_array) {
        this.orgUserInfo_array = orgUserInfo_array;
    }

    public Long getPs_id() {
        return ps_id;
    }

    public void setPs_id(Long ps_id) {
        this.ps_id = ps_id;
    }

    public String getPs_name() {
        return ps_name;
    }

    public void setPs_name(String ps_name) {
        this.ps_name = ps_name;
    }

    public String getImages() {
        return images;
    }

    public void setImages(String images) {
        this.images = images;
    }

    public Integer getPs_type() {
        return ps_type;
    }

    public void setPs_type(Integer ps_type) {
        this.ps_type = ps_type;
    }

    public Integer getPs_status() {
        return ps_status;
    }

    public void setPs_status(Integer ps_status) {
        this.ps_status = ps_status;
    }

    public Double getDesign_capacity() {
        return design_capacity;
    }

    public void setDesign_capacity(Double design_capacity) {
        this.design_capacity = design_capacity;
    }

    public Double getCurrent_power() {
        return current_power;
    }

    public void setCurrent_power(Double current_power) {
        this.current_power = current_power;
    }

    public Double getToday_energy() {
        return today_energy;
    }

    public void setToday_energy(Double today_energy) {
        this.today_energy = today_energy;
    }

    public Double getEquivalent_hour() {
        return equivalent_hour;
    }

    public void setEquivalent_hour(Double equivalent_hour) {
        this.equivalent_hour = equivalent_hour;
    }

    public String getPs_location() {
        return ps_location;
    }

    public void setPs_location(String ps_location) {
        this.ps_location = ps_location;
    }

    public String getPr_scale() {
        return pr_scale;
    }

    public void setPr_scale(String pr_scale) {
        this.pr_scale = pr_scale;
    }

    public Double getRadiation() {
        return radiation;
    }

    public void setRadiation(Double radiation) {
        this.radiation = radiation;
    }

    public Double getDaily_irradiation() {
        return daily_irradiation;
    }

    public void setDaily_irradiation(Double daily_irradiation) {
        this.daily_irradiation = daily_irradiation;
    }

    public Integer getValid_flag() {
        return valid_flag;
    }

    public void setValid_flag(Integer valid_flag) {
        this.valid_flag = valid_flag;
    }

    public Date getOffline_time() {
        return offline_time;
    }

    public void setOffline_time(Date offline_time) {
        this.offline_time = offline_time;
    }

    public Integer getDivision_code() {
        return division_code;
    }

    public void setDivision_code(Integer division_code) {
        this.division_code = division_code;
    }

    public String[] getLon_lat() {
        return lon_lat;
    }

    public void setLon_lat(String[] lon_lat) {
        this.lon_lat = lon_lat;
    }

    public String[] getSn_array() {
        return sn_array;
    }

    public void setSn_array(String[] sn_array) {
        this.sn_array = sn_array;
    }

    public long[] getUserid_array() {
        return userid_array;
    }

    public void setUserid_array(long[] userid_array) {
        this.userid_array = userid_array;
    }

    /**
     * Renders every field as {@code name=value}; text fields are wrapped in single quotes and
     * arrays are expanded with {@link Arrays#toString}.
     */
    @Override
    public String toString() {
        return "PowerStationInfo{" +
                "ps_id=" + ps_id +
                ", ps_name='" + ps_name + '\'' +
                ", images='" + images + '\'' +
                ", ps_type=" + ps_type +
                ", ps_status=" + ps_status +
                ", design_capacity=" + design_capacity +
                ", current_power=" + current_power +
                ", today_energy=" + today_energy +
                ", equivalent_hour=" + equivalent_hour +
                ", ps_location='" + ps_location + '\'' +
                ", pr_scale='" + pr_scale + '\'' +
                ", radiation=" + radiation +
                ", daily_irradiation=" + daily_irradiation +
                ", valid_flag=" + valid_flag +
                ", offline_time=" + offline_time +
                ", division_code=" + division_code +
                ", lon_lat=" + Arrays.toString(lon_lat) +
                ", orgUserInfo_array=" + orgUserInfo_array +
                ", sn_array=" + Arrays.toString(sn_array) +
                ", userid_array=" + Arrays.toString(userid_array) +
                '}';
    }
}

2.源获取数据

代码语言:java
package com.shi.mysqlEsTest;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Flink source that emits the {@code ps_id} of every row of {@code power_station}, ordered by id.
 *
 * <p>The JDBC connection is opened in {@link #open(Configuration)} and released in
 * {@link #close()}. {@link #cancel()} only raises a flag that the emit loop checks, so the
 * connection is never closed underneath a running query.
 *
 * @author shiye
 * @create 2023-02-21 11:54
 */
public class PowerStationSourceFromMysql extends RichSourceFunction<Long> {

    private static final String SELECT_STATION_IDS =
            "SELECT t1.ps_id FROM power_station t1 ORDER BY t1.ps_id";

    /** Created in open(); not part of the serialized function. */
    private transient Connection connection = null;

    /** Cleared by cancel(); volatile because cancel() may be called from a different thread than run(). */
    private volatile boolean running = true;

    // NOTE(review): connection settings and credentials are hard-coded in source; consider
    // moving them to job parameters or external configuration.
    String driver = "com.mysql.jdbc.Driver";
    String url = "jdbc:mysql://10.0.81.151:3306/sungrow?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&serverTimezone=Asia/Shanghai&useSSL=false";
    String username = "root";
    String password = "1234";

    /**
     * Loads the JDBC driver and opens the connection used by {@link #run}.
     *
     * @param parameters Flink configuration, forwarded to the superclass
     * @throws Exception if the driver class is missing or the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Load the driver
        Class.forName(driver);
        connection = DriverManager.getConnection(url, username, password);
    }

    /**
     * Runs the id query once and emits each id until the result is exhausted or the source is
     * cancelled. The statement and result set are closed when the method exits.
     *
     * @param sourceContext collector the ids are emitted to
     * @throws Exception if the query fails
     */
    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        try (PreparedStatement statement = connection.prepareStatement(SELECT_STATION_IDS);
             ResultSet resultSet = statement.executeQuery()) {
            while (running && resultSet.next()) {
                sourceContext.collect(resultSet.getLong("ps_id"));
            }
        }
    }

    /**
     * Asks the emit loop to stop. Resources are released in {@link #close()}.
     */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Releases the JDBC connection. Closing is best-effort: a failure is reported on stderr and
     * does not fail the job.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                connection = null;
            }
        }
    }
}

3.map处理数据

代码语言:java
package com.shi.mysqlEsTest;

import com.mysql.jdbc.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * @author shiye
 * @create 2023-02-22 14:56
 */
public class SqlMapFunction extends RichMapFunction<Long, PowerStationInfo> {
    private PreparedStatement ps = null;
    private Connection connection = null;
    String driver = "com.mysql.jdbc.Driver";
    String url = "jdbc:mysql://10.0.81.151:3306/sungrow?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&serverTimezone=Asia/Shanghai&useSSL=false";
    String username = "root";
    String password = "1234";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //加载驱动
        Class.forName(driver);
        connection = DriverManager.getConnection(url, username, password);
    }

    @Override
    public void close() throws Exception {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public PowerStationInfo map(Long ps_id) throws Exception {
        PowerStationInfo psInfo = new PowerStationInfo();

        String sql2 = "SELECTn"  
                "  t1.ps_id,n"  
                "  IFNULL(t1.ps_name,'') AS ps_name,n"  
                "  t1.ps_type,n"  
                "  t1.ps_location,n"  
                "  t1.valid_flag,n"  
                "  t1.division_code,n"  
                "  t1.longitude,n"  
                "  t1.latitude,n"  
                "  IFNULL(t2.`PICTURE_URL`,'') AS images,n"  
                "  t3.`dev_status` AS ps_status,n"  
                "  t3.`update_time` AS offline_time,n"  
                "  t4.`DESIGN_CAPACITY` AS design_capacity,n"  
                "  t4.`MAX_PR` AS max_pr,n"  
                "  t4.`min_pr` AS min_pr,n"  
                "  t5.`p83023` AS p83023,n"  
                "  t5.`p83033` AS current_power,n"  
                "  t5.`p83022` AS today_energy,n"  
                "  t5.`p83024` AS total_energy,n"  
                "  t5.`p83025` AS equivalent_hour,n"  
                "  t5.`p83012` AS radiation,n"  
                "  t5.`p83013` AS daily_irradiation,n"  
                "  GROUP_CONCAT(DISTINCT t6.`DEVICE_PRO_SN`) AS sn_array,n"  
                "  GROUP_CONCAT(DISTINCT t8.`user_id`) AS userid_arrayn"  
                "FROMn"  
                "  power_station t1n"  
                "LEFT JOIN power_station_picture t2 ON t1.ps_id = t2.`PS_ID`n"  
                "LEFT JOIN power_device_attr t6 ON t1.`PS_ID` = t6.`PS_ID`n"  
                "LEFT JOIN power_device_status t3 ON t6.uuid = t3.`uuid`n"  
                "LEFT JOIN power_station_config_ratio t4 ON t1.ps_id = t4.`PS_ID`n"  
                "LEFT JOIN power_statistical_data t5 ON t1.ps_id = t5.`ps_id`n"  
                "LEFT JOIN user_station_follow_config t8 ON t1.`PS_ID` = t8.`ps_id`n"  
                "WHERE t1.`PS_ID` = "   ps_id  
                " GROUP BY t1.`PS_ID`";
        ResultSet resultSet2 = connection.prepareStatement(sql2).executeQuery();

        if (resultSet2.next()) {
            psInfo.setPs_id(ps_id);
            psInfo.setPs_name(resultSet2.getString("ps_name"));
            psInfo.setPs_type(resultSet2.getInt("ps_type"));
            psInfo.setPs_location(resultSet2.getString("ps_location"));
            psInfo.setValid_flag(resultSet2.getInt("valid_flag"));
            psInfo.setValid_flag(resultSet2.getInt("division_code"));
            String longitude = resultSet2.getString("longitude");
            String latitude = resultSet2.getString("latitude");
            String[] lon_lat = new String[]{longitude, latitude};
            psInfo.setLon_lat(lon_lat);
            psInfo.setImages(resultSet2.getString("images"));
            psInfo.setPs_status(resultSet2.getInt("ps_status"));
            Date offlineTime = resultSet2.getDate("offline_time");
            psInfo.setOffline_time(offlineTime);
            psInfo.setDesign_capacity(resultSet2.getDouble("design_capacity"));
            double max_pr = resultSet2.getDouble("max_pr");
            double min_pr = resultSet2.getDouble("min_pr");
            double p83023 = resultSet2.getDouble("p83023");
            if (p83023 == 0) {
                psInfo.setPr_scale("--");
            } else if (p83023 > max_pr) {
                psInfo.setPr_scale(String.format("%.2f", (max_pr * 100)));
            } else if (p83023 < min_pr) {
                psInfo.setPr_scale(String.format("%.2f", (min_pr * 100)));
            } else {
                psInfo.setPr_scale(String.format("%.2f", (p83023 * 100)));
            }

            psInfo.setCurrent_power(resultSet2.getDouble("current_power"));
            psInfo.setToday_energy(resultSet2.getDouble("today_energy"));
            psInfo.setToday_energy(resultSet2.getDouble("total_energy"));
            psInfo.setEquivalent_hour(resultSet2.getDouble("equivalent_hour"));
            psInfo.setRadiation(resultSet2.getDouble("radiation"));
            String snStr = resultSet2.getString("sn_array");
            if (!StringUtils.isNullOrEmpty(snStr)) {
                psInfo.setSn_array(snStr.split(","));
            }
            String useridStr = resultSet2.getString("userid_array");
            if (!StringUtils.isNullOrEmpty(useridStr)) {
                String[] split = resultSet2.getString("userid_array").split(",");
                psInfo.setUserid_array(Arrays.asList(split).stream().mapToLong(Long::parseLong).toArray());
            }

            String sql3 = "SELECTn"  
                    "tu.user_id,n"  
                    "tu.moble_tel,n"  
                    "tu.email,n"  
                    "tp.share_type,n"  
                    "tu.org_idn"  
                    "FROMn"  
                    "power_station_org pn"  
                    "LEFT JOIN sys_organization s ON p.org_id = s.org_idn"  
                    "LEFT JOIN sys_user_org o ON o.org_id = s.org_id AND o.share_type = 0n"  
                    "LEFT JOIN sys_user u ON o.user_id = u.user_idn"  
                    "WHEREn"  
                    " p.ps_id = "   ps_id  
                    " AND (p.root_org_id = 499 OR is_installer_ps_org = -1)n"  
                    " AND s.share_type = 0n"  
                    " AND p.share_type = 0n"  
                    "UNIONn"  
                    "SELECTn"  
                    "    s.user_id,n"  
                    "    s.moble_tel,n"  
                    "    s.email,n"  
                    "    '0' AS share_type,n"  
                    "    so.org_idn"  
                    "FROMn"  
                    "power_station_owner_user_rel pson"  
                    "JOIN sys_user s ON s.user_id = pso.user_idn"  
                    "LEFT JOIN sys_user_org suo ON suo.user_id = pso.user_idn"  
                    "LEFT JOIN sys_organization so ON so.org_id = suo.org_idn"  
                    "WHERE pso.ps_id = "   ps_id  
                    " AND suo.is_master_org != -1 "  
                    " AND suo.share_type = 0";
            ResultSet resultSet3 = connection.prepareStatement(sql3).executeQuery();
            List<PowerStationInfo.OrgUserInfo> orgUserInfo_array = new ArrayList<>();
            while (resultSet3.next()) {
                PowerStationInfo.OrgUserInfo orgUserInfo = new PowerStationInfo.OrgUserInfo();
                orgUserInfo.setOrg_id(resultSet3.getLong("org_id"));
                orgUserInfo.setUser_id(resultSet3.getLong("user_id"));
                orgUserInfo.setMoble_tel(resultSet3.getString("moble_tel"));
                orgUserInfo.setEmail(resultSet3.getString("email"));
                orgUserInfo.setShare_type(resultSet3.getInt("share_type"));
                orgUserInfo_array.add(orgUserInfo);
            }
            psInfo.setOrgUserInfo_array(orgUserInfo_array);
        }
        return psInfo;
    }
}

4.sink

代码语言:java
package com.shi.mysqlEsTest;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.security.user.User;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.Map;

/**
 * @author shiye
 * @create 2023-02-21 13:55
 */
public class MysqlSourceToEsTest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("10.0.81.167", 9201, "http"));
        httpHosts.add(new HttpHost("10.0.81.168", 9201, "http"));
        httpHosts.add(new HttpHost("10.0.81.169", 9201, "http"));

        DataStreamSource<Long> mapDataStreamSource = env.addSource(new PowerStationSourceFromMysql());

        SingleOutputStreamOperator<PowerStationInfo> map = mapDataStreamSource.map(new SqlMapFunction());

        ElasticsearchSink.Builder<PowerStationInfo> esBuilder = new ElasticsearchSink.Builder<PowerStationInfo>(httpHosts, getEsSinkFunction());
        //刷新前最大缓存的操作数。
        esBuilder.setBulkFlushMaxActions(100);
        //刷新前最大缓存的数据量(以兆字节为单位)。
        esBuilder.setBulkFlushMaxSizeMb(100);
        //刷新的时间间隔(不论缓存操作的数量或大小如何)
        esBuilder.setBulkFlushInterval(1000);
        //设置用户名密码
        esBuilder.setRestClientFactory(getRestClientFactory());
        map.addSink(esBuilder.build());

        env.execute();
    }

    /**
     * 定义ES Sink算子
     *
     * @return
     */
    public static ElasticsearchSinkFunction<PowerStationInfo> getEsSinkFunction() {
        ElasticsearchSinkFunction<PowerStationInfo> esSinkFunction = new ElasticsearchSinkFunction<PowerStationInfo>() {
            @Override
            public void process(PowerStationInfo event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                System.out.println("------> "   event);
                String string = JSON.toJSONString(event);

                IndexRequest request = Requests.indexRequest()
                        .index("temp_ps_info2")
                        .id(event.getPs_id().toString())
                        .source(string, XContentType.JSON);// 放入数据json格式

                requestIndexer.add(request);
            }
        };
        return esSinkFunction;
    }

    /**
     * es 用户名密码
     */
    public static RestClientFactory getRestClientFactory() {

        RestClientFactory restClientFactory = new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "1111"));
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        httpAsyncClientBuilder.disableAuthCaching();
                        return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
            }
        };
        return restClientFactory;
    }
}

0 人点赞