ElasticSearch5.6.2常用transport client Java API操作代码实例

2024-07-25 15:30:04 浏览数 (1)

版本及环境

windows 11 ElasticSearch 5.6.2 Idea 2020

请注意,5.6.2是已经停止维护的版本,官方文档好多已经404,transport client客户端在6.x的版本中被官方弃用,但是现在仍然有老服务在用。最新的elasticsearch 7.x和8.x不再支持jdk1.8,但是我的服务器在2023年仍不能升级高版本java,所以只能学习老版本。

1 Maven依赖

代码语言:xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Elasticsearch 5.6.2 transport-client examples in this article. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ES_Client</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Transport client; the version is pinned to the 5.6.2 release this article targets. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.6.2</version>
        </dependency>

        <!-- REST high-level client, left disabled: every example below uses the transport client. -->
<!--        <dependency>-->
<!--            <groupId>org.elasticsearch.client</groupId>-->
<!--            <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
<!--            <version>5.6.2</version>-->
<!--        </dependency>-->

        <!-- CSV parsing for the bulk-import example (CSVFormat / CSVParser / CSVRecord). -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!-- Log4j 2 API and implementation, kept on the same version. -->
        <!-- NOTE(review): presumably required by the transport client's logging; confirm the
             version against the one the 5.6.2 client expects before changing it. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- NOTE(review): JUnit is declared with compile scope, but none of the examples in this
             article reference it; consider test scope if it is only used for tests. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

2 创建索引并插入单条数据

代码语言:java
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class IndexCreated {

    public static void main(String[] args) {
        try {
            // 设置集群名称
            Settings settings = Settings.builder()
                    .put("cluster.name", "elasticsearch")
                    .build();

            // 创建 TransportClient
            try (TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {

                // 准备索引数据
                String jsonDocument = "{"  
                        ""user":"Marry Doe","  
                        ""postDate":"2023-12-01","  
                        ""message":"Hello, Index!""  
                        "}";

                // 创建索引请求
                IndexResponse response = client.prepareIndex("twitter", "tweet", "2")
                        .setSource(jsonDocument)
                        .get();

                // 输出索引结果
                System.out.println("Index created: "   response.getId());
            }

        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }
}

2 删除索引

代码语言:java
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class DeleteIndexExample {

    public static void main(String[] args) {
        try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {

            // 删除索引请求
            DeleteIndexRequest request = new DeleteIndexRequest("geo_index");

            // 执行删除索引请求
            DeleteIndexResponse response = client.admin().indices().delete(request).actionGet();

            // 打印删除索引响应
            System.out.println("Index deleted: "   response.isAcknowledged());

        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }
}

3 打印所有创建的索引的名称

代码语言:java
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

public class ListAllIndicesExample {

    public static void main(String[] args) {
        try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {

            // 获取所有索引请求
            GetIndexRequest request = new GetIndexRequest();

            // 执行获取所有索引请求
            GetIndexResponse response = client.admin().indices().getIndex(request).actionGet();

            // 获取所有索引的名称
            String[] indices = response.getIndices();

            // 打印所有索引的名称
            System.out.println("All Indices:");
            for (String index : indices) {
                System.out.println(index);

                // 获取索引的设置
                try {
                    Settings indexSettings = client.admin().indices().prepareGetSettings(index).get().getIndexToSettings().get(index);
                    System.out.println("Settings: "   indexSettings);
                } catch (Exception e) {
                    if (e.getMessage().contains("Index not exist")) {
                        System.out.println("Index not found: "   index);
                    } else {
                        e.printStackTrace();
                    }
                }

                // 获取索引的映射
                try {
                    GetMappingsResponse mappingsResponse = client.admin().indices().getMappings(new GetMappingsRequest().indices(index)).get();
                    ImmutableOpenMap<String, MappingMetaData> indexMappings = mappingsResponse.mappings().get(index);
                    System.out.println("Mappings: "   indexMappings);
                } catch (Exception e) {
                    if (e.getMessage().contains("Index not exist")) {
                        System.out.println("Index not found: "   index);
                    } else {
                        e.printStackTrace();
                    }
                }

                System.out.println();
            }

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4 查询索引中数据

代码语言:java
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Runs a match-all search against the {@code twitter} index and prints the
 * response as text.
 */
public class QueryIndexExample {

    /**
     * Connects to {@code localhost:9300}, executes the search and prints the result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (TransportClient transport = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {

            // Build the match-all request and execute it in one fluent chain.
            SearchResponse result = transport.prepareSearch("twitter")
                    .setQuery(QueryBuilders.matchAllQuery())
                    .get();

            // Print a header line followed by the response text.
            System.out.println("Search results:");
            System.out.println(result.toString());

        } catch (UnknownHostException unresolved) {
            unresolved.printStackTrace();
        }
    }
}

5 删除索引

代码语言:java
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class DeleteIndexExample {

    public static void main(String[] args) {
        try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {

            // 删除索引请求
            DeleteIndexRequest request = new DeleteIndexRequest("geo_index");

            // 执行删除索引请求
            DeleteIndexResponse response = client.admin().indices().delete(request).actionGet();

            // 打印删除索引响应
            System.out.println("Index deleted: "   response.isAcknowledged());

        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }
}

6 创建索引,并批量插入本地csv数据

将以下数据存入前文创建的索引geo_index,csv文件一共20万行。

以下为数据示例,第二列为地理数据。每行第二列的 loc 字段由双引号(")包围,且字段内部还含有逗号(,),因此不能简单地按逗号切分每一行,需要更完善的逻辑来解析 CSV 文件。以下使用 Apache Commons CSV 库来处理包含引号和逗号的 CSV 数据:

代码语言:csv
12345678901234567890,"23.123456,113.123456"
23456789012345678901,"24.234567,114.234567"
34567890123456789012,"25.345678,115.345678"
45678901234567890123,"26.456789,116.456789"

代码假设csv文件位置在 D:\geo_clean.csv(在 Java 字符串中需写作 "D:\\geo_clean.csv")

代码语言:java
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.FileReader;
import java.net.InetAddress;

public class BulkInsertCsvToElasticsearch {

    public static void main(String[] args) {
        try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {

            // 读取 CSV 文件
            String csvFile = "D:\geo_clean.csv";
            CSVParser csvParser = CSVParser.parse(new FileReader(csvFile), CSVFormat.DEFAULT);

            // 创建批量请求构建器
            BulkRequestBuilder bulkRequest = client.prepareBulk();

            // 读取 CSV 文件的每一行
            for (CSVRecord csvRecord : csvParser) {
                // 获取 CSV 数据中的两列
                String did = csvRecord.get(0);
                String loc = csvRecord.get(1);

                // 创建索引请求
                IndexRequest indexRequest = client.prepareIndex("geo_index", "doc")
                        .setSource(XContentFactory.jsonBuilder()
                                .startObject()
                                .field("did", did)
                                .field("loc", loc)
                                .endObject())
                        .request();

                // 添加索引请求到批量请求
                bulkRequest.add(indexRequest);
            }

            // 执行批量请求
            BulkResponse bulkResponse = bulkRequest.get();

            // 处理批量响应
            if (bulkResponse.hasFailures()) {
                System.out.println("Bulk request failed with failures: "   bulkResponse.buildFailureMessage());
            } else {
                System.out.println("Bulk request successfully processed.");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

7 查看索引中的前10条数据

代码语言:java
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Fetches up to ten documents from {@code geo_index} with a match-all query and
 * prints the response as text.
 */
public class QueryGeoIndexExample {

    /**
     * Connects to {@code localhost:9300}, runs the size-limited search and prints the result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (TransportClient transport = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {

            // Search body: match everything, return at most 10 hits.
            SearchSourceBuilder body = new SearchSourceBuilder()
                    .query(QueryBuilders.matchAllQuery())
                    .size(10);

            // Attach the body to a request that targets geo_index.
            SearchRequest geoSearch = new SearchRequest("geo_index").source(body);

            // Execute and wait for the response.
            SearchResponse result = transport.search(geoSearch).actionGet();

            System.out.println("Search Results:");
            System.out.println(result.toString());

        } catch (Exception failure) {
            // Single handler: host-resolution errors and all other failures are reported alike.
            failure.printStackTrace();
        }
    }
}

0 人点赞