ES更新嵌套数组(使用Java API)

2022-01-06 15:18:24 浏览数 (1)

最近在做一个需求,一开始的时候以为用es脚本能搞定,耽搁了一天半时间。

后来改用 Java client 的 API 来做,效率高多了。

代码语言:java
package com.XXX.XXXX.XXX;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.assertj.core.util.Arrays;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class HestiaApplicationTests {

    @Resource
    protected RestHighLevelClient client;

    @Test
    public void contextLoads() {
    }

    /**
     * *先检索、再更新文档
     **/
    @Test
    public void search() throws IOException {

		try {
			SearchRequest searchRequest = new SearchRequest("zm_prod");
			SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
			// 第几页
			searchSourceBuilder.from(0);
			Map<String, Object> params = new HashMap<>();
			//课程标签:1.大班课;2.小班课;3.AI课;4.磨课
//        Script script = new Script(ScriptType.INLINE, "painless", "ctx._source.values.base_info.course_mark == 2", params);
//        searchSourceBuilder.query(QueryBuilders.scriptQuery(script));
//        searchSourceBuilder.query(QueryBuilders.matchQuery("_source.values.base_info.course_mark",2));


			searchSourceBuilder.query(QueryBuilders.termQuery("_id", 55074));
//			searchSourceBuilder.query(QueryBuilders.matchAllQuery());
			// 每页多少条数据
			searchSourceBuilder.size(1000);
			// 设置超时时间为2s
			searchSourceBuilder.timeout(new TimeValue(2000));

			// 设置request要搜索的索引和类型
//		searchRequest.indices("zm_prod").types("news");
			// 设置SearchSourceBuilder查询属性
			searchRequest.source(searchSourceBuilder);

			SearchResponse searchResponse = client.search(searchRequest);
			SearchHit[] searchHits = searchResponse.getHits().getHits();
			if (ArrayUtils.isEmpty(searchHits)) {
				log.info("searchHits = 666666666666666666666");
			}

			for (SearchHit hit : searchHits) {
				//文档的主键
				String id = hit.getId();
				String index = hit.getIndex();
				String type = hit.getType();
				String sourceAsString = hit.getSourceAsString();
				if(!sourceAsString.contains("course_mark")){
					continue;
				}
				//源文档内容
				Map<String, Object> sourceAsMap = hit.getSource();
//				String s = JSON.toJSONString(sourceAsMap);
				Map<String, Object> values = (Map)sourceAsMap.get("values");
				Map<String, Object>  base_info = (Map)values.get("base_info");

				Integer course_mark = (Integer) base_info.get("course_mark");
				String base_infostr = JSON.toJSONString(base_info);
				if(null!=course_mark){
					if(course_mark==1||course_mark==2||course_mark==3){
						log.info(" base_infostr = "   base_infostr);

                         //TODO 这个地方反向的在map里面添加字段,然后重新组装,然后更新当前的数据
						base_info.put("BBB",7777);
						values.put("base_info",base_info);

						UpdateRequest uRequest = new UpdateRequest()
								.index(index)
								.type(type)
								.id(id)
								.doc(XContentFactory.jsonBuilder()
										.startObject().field("values", values).endObject());
						BulkRequest blkRequest = new BulkRequest();
						blkRequest.add(uRequest);
                       // 执行
						BulkResponse bulkResponse = client.bulk(blkRequest);
//						for (UpdateRequest uprequest : list) {
//							bulkResponse.add(uprequest);
//						}

//						BulkResponse bulkResponse = bulkResponse.execute().actionGet();

						if (bulkResponse.hasFailures()) {
							System.out.println("批量错误!");
						}

					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
			log.error(ExceptionUtils.getMessage(e));
		}
    }

}

以下是参考别的博主的内容;

最近在学习ElasticSearch,前些天在工作中遇到一个难以解决的问题。问题正如标题所示:如何使用Java TransportClient更新ES中复杂数据结构的数组。最后请教大佬,问题得以解决。此文章将详细描述问题并提供解决办法。

博主要更新的数据格式大致如下:

原数据:一个嵌套类型的数组;更新后的数据:将"商场01"对应的元素从数组中删除。

代码语言:json
"list":[
    {
    "code": "9111364",
    "name": "企业01"
    },
    {
     "code": "900662",
    "name": "智能01"
   },
   {
   "code": "9000300",
   "name": "商场01"
    }
]

博主是ES小白,对于此类型的数据不知道如何正确使用 UpdateRequest进行更新。

于是乎使用如下方法,value表示更新的数据(也就是没有"商场01"的list数据),对于value的类型博主尝试过Object和List<>,甚至将list转成Json格式结果都不可以。

// XXXXXXXXX表示要更新的数据
// List<Xxx> value = XXXXXXXXX;
// XXXXXXXXX表示要更新的数据
// Object value = XXXXXXXXX;
List<Map<String, Object>> value = XXXXXXXXX; // 这个好用
updateRequest.doc(XContentFactory.jsonBuilder()
        .startObject()
        .field("name", value)
        .field(flag, 1)
        .endObject());

最后大佬告诉我要将 List<Xxx> value(Xxx 为自定义对象类型)转成 List<Map<String, Object>> value。也就是说,使用 updateRequest 时,对于类型是对象数组的字段,直接传入对象列表 ES 是无法正常更新的,要将 List 中的泛型转成 Map 类型,ES 才会识别。

// 将嵌套数组对象转Set格式(List也可以),否则无法进行更新(会报错)
List<Map<String, Object>> set = Lists.newArrayList();
Map<String, Object> map = Maps.newHashMap();
Class<?> clazz;
// 使用反射动态将Set中的属性值放入Map中
for (Object obj : setArry) {
    clazz = obj.getClass();
    // 遍历当前对象的属性值
    for (Field field : clazz.getDeclaredFields()) {
        field.setAccessible(true);
        String name = field.getName();
        Object value = field.get(obj);
        map.put(name, value);
    }
    set.add(map);
    map = Maps.newHashMap();
}

由于此种数组类型较多,博主使用反射,可以兼容每种数组类型。最后成功更新数据。

同时参考了

如果更新一条文档,而且知道文档id的前提下可以使用UpdateRequest即可实现,代码如下:

代码语言:java
/**
     * 根据文档id更新
     * @throws IOException
     */
    @Test
    public void test() throws IOException {
        UpdateRequest request = new UpdateRequest("sub_bank1031","sub_bank","SvjgP24BndtcmnpzbiuL");
        request.doc("{"aliasName":"中国农业发展银行林州市支行444","bankType":"ADB","bankTypeName":"中国农业发展银行","cityId":"410500","cityName":"安阳市","createTime":1515719190000,"createUser":"system","id":"000238a326b044e9ae10cfe4298f4c44","isEnabled":"1","name":"中国农业发展银行林州市支行","provinceId":"410000","provinceName":"河南省","unionNumber":"203496100010"}", XContentType.JSON);
        UpdateResponse resp = highLevelClient.update(request, RequestOptions.DEFAULT);
        println(resp.getResult());
    }

但是在不知道文档id的情况下,如果还想使用UpdateRequest更新文档,就需要先使用SearchRequest根据某个条件查询出符合条件的文档,然后再循环更新这些文档。

代码语言:java
    /**
    **先检索、再更新文档
    **/
   	@Test
	public void search() throws IOException{
		SearchRequest searchRequest = new SearchRequest("sub_bank1031");
		SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
		searchSourceBuilder.query(QueryBuilders.matchQuery("_id", "SvjgP24BndtcmnpzbiuL"));
		searchSourceBuilder.size(2);
		searchRequest.source(searchSourceBuilder);
		SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT); 
		SearchHit[] searchHits = searchResponse.getHits().getHits();
	    for(SearchHit s:searchHits){
           String docId = s.getId();
    		UpdateRequest request = new UpdateRequest("sub_bank1031","sub_bank",docId);
		request.doc("{"aliasName":"中国农业发展银行林州市支行444","bankType":"ADB","bankTypeName":"中国农业发展银行","cityId":"410500","cityName":"安阳市","createTime":1515719190000,"createUser":"system","id":"000238a326b044e9ae10cfe4298f4c44","isEnabled":"1","name":"中国农业发展银行林州市支行","provinceId":"410000","provinceName":"河南省","unionNumber":"203496100010"}", XContentType.JSON);
		UpdateResponse resp = highLevelClient.update(request, RequestOptions.DEFAULT);
		println(resp.getResult());              
	    }
	}

上面操作略显麻烦,需要多条http请求才能完成,要更新的文档数量很多时将大大降低系统响应速度,这时候我们可以使用es的UpdateByQueryRequest来实现该功能。

代码语言:java
/**
     * Updates documents in place with a script, via {@code UpdateByQueryRequest}.
     *
     * <p>Selects documents of type {@code sub_bank} in index {@code sub_bank1031} whose
     * {@code cityId} term equals {@code 511000}, with the request size set to 2, and runs an
     * inline painless script that sets {@code aliasName} to {@code hello} when
     * {@code bankType} is {@code BOC}.
     *
     * @throws IOException if the update-by-query call fails at the transport level
     */
    @Test
    public void updateByQueryRequest() throws IOException {
        // Inline painless script applied to each selected document; it takes no parameters.
        Script conditionalAlias = new Script(
                ScriptType.INLINE,
                "painless",
                "if (ctx._source.bankType == 'BOC') {ctx._source.aliasName='hello'}",
                Collections.emptyMap());

        UpdateByQueryRequest updateByQuery = new UpdateByQueryRequest("sub_bank1031");
        updateByQuery.setDocTypes("sub_bank");
        updateByQuery.setQuery(new TermQueryBuilder("cityId", "511000"));
        updateByQuery.setSize(2);
        updateByQuery.setScript(conditionalAlias);

        BulkByScrollResponse outcome = highLevelClient.updateByQuery(updateByQuery, RequestOptions.DEFAULT);
    }

PS:

pom文件es相关依赖如下:

代码语言:xml
  <!-- Elasticsearch core library -->
  <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>6.8.0</version>
  </dependency>
  <!-- Elasticsearch high-level REST client. Its transitive org.elasticsearch:elasticsearch
       dependency is excluded; presumably so that only the core artifact declared above is
       resolved (TODO confirm). -->
  <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>6.8.0</version>
      <exclusions>
          <exclusion>
              <groupId>org.elasticsearch</groupId>
              <artifactId>elasticsearch</artifactId>
          </exclusion>
      </exclusions>
  </dependency>

0 人点赞