ElasticsearchServiceImpl.java
/*
* Copyright 2020 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.service.impl;
import static org.elasticsearch.index.query.QueryBuilders.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SimpleQueryStringBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.blocks.model.filters.EmptyModelFilter;
import org.gringlobal.application.config.ElasticsearchConfig.GRINGlobalEntityMapper;
import org.gringlobal.component.elastic.ElasticLoader;
import org.gringlobal.component.elastic.ElasticReindex;
import org.gringlobal.component.elastic.ElasticTrigger;
import org.gringlobal.custom.elasticsearch.BoostedFields;
import org.gringlobal.custom.elasticsearch.CustomMapping;
import org.gringlobal.custom.elasticsearch.ElasticQueryBuilder;
import org.gringlobal.custom.elasticsearch.SearchException;
import org.gringlobal.service.ElasticsearchService;
import org.gringlobal.service.TransientMessageService;
import org.gringlobal.service.filter.IFullTextFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.data.elasticsearch.core.query.AliasQuery;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.data.querydsl.SimpleEntityPathResolver;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import com.fasterxml.jackson.module.jsonSchema.types.ObjectSchema;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.EntityPath;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.core.types.dsl.PathBuilderFactory;
import com.querydsl.jpa.JPQLQuery;
/**
* Manage Elasticsearch indexing, indices.
*
* @author Matija Obreza
*/
// Not @Service
@Transactional(readOnly = true)
@Slf4j
public class ElasticsearchServiceImpl implements ElasticsearchService, InitializingBean {
// Secondary logger named "org.gringlobal.test.Elasticsearch"; used next to the class logger for index/alias events.
private static final Logger LOGtest = LoggerFactory.getLogger("org.gringlobal.test.Elasticsearch");
// Suffix appended to an entity's index root (see toIndexName) to form its read alias name
private static final String INDEX_READ = "_read";
// Suffix appended to an entity's index root to form its write alias name
private static final String INDEX_WRITE = "_write";
// Appended to collectionPrefix to form the alias that is added to every newly created index
private static final String ALIAS_EVERYTHING = "everything";
// Type name passed to putMapping, IndexQuery#setType, delete and search requests
private static final String COMMON_TYPE_NAME = "data";
// Configured node list; the literal value "embedded" selects the single-shard, zero-replica index settings
@Value("${elasticsearch.cluster.nodes}")
private String esClusterNodes;
@Resource
private Lock elasticsearchReindexLock; // Is there another database scan running in the cluster?
/// Is this instance running a full database scan?
private AtomicBoolean databaseReindexRunning = new AtomicBoolean(false);
@Resource
private BlockingQueue<ElasticReindex> elasticReindexQueue; // JVM-bound queue
// Runs the deferred realias step of a full reindex and the asynchronous updates
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private EntityManager em;
@Autowired
private ElasticsearchOperations elasticsearchRestTemplate;
@Autowired
private RestHighLevelClient client;
// Serializes entities to JSON; afterPropertiesSet casts it to GRINGlobalEntityMapper
@Autowired
private EntityMapper mapper;
// Injected reference to this service, used by asyncUpdate to call update()
// NOTE(review): presumably the Spring proxy so that proxied behavior applies; confirm
@Autowired
private ElasticsearchService _self;
@Autowired
private TransientMessageService transientMessageService;
// Entity classes registered through indexEntity(...)
private final Set<Class<? extends EmptyModel>> indexedEntities = Collections.synchronizedSet(new HashSet<>());
// Per entity: the JSON field paths derived from its generated JSON schema (see buildJsonPaths)
private final Map<Class<? extends EmptyModel>, Set<String>> jsonSchemas = new HashMap<>();
// Per entity: ID page size for database scans; entities without an entry use batchSize
private final Map<Class<? extends EmptyModel>, Integer> reindexBatchSize = new HashMap<>();
/// Size of database batch scan for IDs
private int batchSize = 1000;
// Boosted fields
private Map<Class<? extends EmptyModel>, String[]> fieldBoosts = new HashMap<>();
// Index/alias name prefix; normalized in afterPropertiesSet to alphanumerics followed by "_"
@Value("${elasticsearch.collection.prefix:ggce}")
private String collectionPrefix;
/**
 * Finishes initialization once all properties are injected.
 * <ol>
 * <li>Normalizes {@code collectionPrefix} to ASCII letters and digits followed by {@code "_"},
 * falling back to {@code "ggce_"} when nothing usable remains.</li>
 * <li>If the cluster answers the health check, makes sure every registered entity has a write
 * alias and pushes the current mapping to its write index; an incompatible mapping is reported as
 * an admin alert instead of failing startup.</li>
 * <li>Builds the list of JSON field paths for every registered entity.</li>
 * <li>Caches the {@link BoostedFields} of every registered entity (empty array when absent).</li>
 * </ol>
 */
@Override
public void afterPropertiesSet() throws Exception {
    // A prefix made only of non-alphanumeric characters would sanitize to "", leaving "_" as the
    // prefix and producing index names that start with an underscore. Fall back to the default.
    final String sanitizedPrefix = StringUtils.isNotBlank(collectionPrefix) ? collectionPrefix.replaceAll("[^a-zA-Z0-9]", "") : "";
    collectionPrefix = (sanitizedPrefix.isEmpty() ? "ggce" : sanitizedPrefix) + "_";
    log.warn("ES index prefix: {}", collectionPrefix);
    if (withElasticsearch()) {
        for (Class<? extends EmptyModel> clazz : indexedEntities) {
            final String writeIndex = getFirstIndexForAlias(toIndexName(clazz) + INDEX_WRITE);
            if (writeIndex == null) {
                ensureWriteAlias(clazz);
            } else {
                log.info("Updating write index {} for {}", writeIndex, clazz.getName());
                Object mapping = CustomMapping.springDataMapping(elasticsearchRestTemplate.getElasticsearchConverter(), clazz, COMMON_TYPE_NAME);
                try {
                    elasticsearchRestTemplate.putMapping(writeIndex, COMMON_TYPE_NAME, mapping);
                } catch (Throwable e) {
                    // Keep the cause in the log: it names the conflicting field
                    log.warn("The ES mapping is no longer compatible for index={} of {}. Please regenerate.", writeIndex, clazz, e);
                    transientMessageService.addAdminAlert("ES-MAPPING-" + clazz, "The ES mapping is no longer compatible for index={} of {}. The index must be regenerated.", writeIndex, clazz);
                }
            }
        }
    } else {
        log.warn("Elasticsearch not accessible, not updating mappings");
    }
    JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(((GRINGlobalEntityMapper) mapper).getObjectMapper());
    for (Class<? extends EmptyModel> clazz : indexedEntities) {
        try {
            JsonSchema schema = schemaGen.generateSchema(clazz);
            jsonSchemas.put(clazz, buildJsonPaths(((ObjectSchema) schema).getProperties(), null));
        } catch (Throwable e) {
            log.error("The list of all {} fields is not created.", clazz.getSimpleName(), e);
        }
    }
    // Boosted fields
    for (Class<? extends EmptyModel> clazz : indexedEntities) {
        BoostedFields boostedFields = clazz.getAnnotation(BoostedFields.class);
        if (boostedFields != null) {
            fieldBoosts.put(clazz, boostedFields.value());
        } else {
            fieldBoosts.put(clazz, ArrayUtils.EMPTY_STRING_ARRAY);
        }
    }
}
/**
 * Asks the cluster for its health and reports whether the call answered with status OK.
 *
 * @return {@code true} when the health request succeeds with {@link RestStatus#OK};
 *         {@code false} on any other status or when the request throws
 */
private boolean withElasticsearch() {
    final RestStatus healthStatus;
    try {
        healthStatus = client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT).status();
    } catch (Throwable e) {
        // Any failure is treated as "Elasticsearch not available"
        log.warn(e.getMessage());
        return false;
    }
    return healthStatus == RestStatus.OK;
}
/**
 * Looks up the alias and returns the name of the first index the response lists for it.
 *
 * @param aliasName the alias to resolve
 * @return the first index name in the response, or {@code null} when the response is missing or
 *         empty, or when the lookup throws
 */
private String getFirstIndexForAlias(final String aliasName) {
    try {
        final GetAliasesResponse aliasesResponse = client.indices().getAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
        if (aliasesResponse == null || aliasesResponse.getAliases().isEmpty()) {
            return null;
        }
        if (LOGtest.isDebugEnabled()) {
            for (var indexAliases : aliasesResponse.getAliases().entrySet()) {
                LOGtest.debug("Alias {} meta={}", indexAliases.getKey(), indexAliases.getValue());
            }
        }
        // Keys of the alias map are index names
        return aliasesResponse.getAliases().keySet().iterator().next();
    } catch (Throwable e) {
        log.warn("Error while getting index for alias={}:", aliasName, e);
        return null;
    }
}
/**
 * Makes a list of all JSON paths for indexed entity and all related types.
 * <p>
 * Nested object schemas are walked recursively and contribute dot-separated paths
 * ({@code parent.child}); every other schema type contributes its own path. The properties
 * {@code _class} and {@code _permissions} are skipped at every level. The given map is only
 * read, never modified.
 *
 * @param properties properties of the (sub)schema; may be {@code null} or empty
 * @param parentPath path of the enclosing object, or blank for the top level
 * @return the set of field paths; empty when there are no properties
 */
private Set<String> buildJsonPaths(Map<String, JsonSchema> properties, String parentPath) {
    if (MapUtils.isEmpty(properties)) {
        return Collections.emptySet();
    }
    Set<String> fieldList = new HashSet<>();
    for (Map.Entry<String, JsonSchema> entry : properties.entrySet()) {
        String fieldName = entry.getKey();
        // Skip rather than remove: removing through the entry set would alter the schema itself
        if ("_class".equals(fieldName) || "_permissions".equals(fieldName)) {
            continue;
        }
        JsonSchema schema = entry.getValue();
        String fullPath = StringUtils.isBlank(parentPath) ? fieldName : parentPath + "." + fieldName;
        if (schema instanceof ObjectSchema) {
            fieldList.addAll(buildJsonPaths(((ObjectSchema) schema).getProperties(), fullPath));
        } else {
            fieldList.add(fullPath);
        }
    }
    return fieldList;
}
/**
 * Get index name for clazz: the collection prefix followed by the lower-cased simple class name.
 * <p>
 * Lower-casing uses {@code Locale.ROOT} so the name does not change with the JVM's default
 * locale (for example, a Turkish default locale maps {@code I} to a dotless {@code ı}).
 *
 * @param clazz the indexed entity type
 * @return the index root name for the type
 */
private String toIndexName(Class<?> clazz) {
    return collectionPrefix + clazz.getSimpleName().toLowerCase(java.util.Locale.ROOT);
}
/**
 * Makes sure the "write" alias of the entity exists. If it is missing, a fresh index is created
 * (which also receives the write alias). If the "read" alias is missing too, the new index is
 * additionally registered under the "everything" alias and the read alias.
 *
 * @param clazz for type mapping
 */
protected synchronized void ensureWriteAlias(Class<?> clazz) {
    final String root = toIndexName(clazz);
    if (aliasExists(root + INDEX_WRITE)) {
        // Nothing to do: the write alias is already in place
        return;
    }
    final String freshIndex = createIndexFor(clazz);
    final String readAlias = root + INDEX_READ;
    if (aliasExists(readAlias)) {
        // Readers keep using whatever the read alias currently points to
        return;
    }
    deleteAlias(collectionPrefix + ALIAS_EVERYTHING, root + "*");
    addAlias(collectionPrefix + ALIAS_EVERYTHING, freshIndex);
    addAlias(readAlias, freshIndex);
}
/**
 * Removes the alias from the given index (or index pattern). A missing alias is not an error; it
 * is only logged.
 *
 * @param aliasName the alias to remove
 * @param indexName index name or pattern the alias is removed from
 */
private void deleteAlias(String aliasName, String indexName) {
    final AliasQuery removal = new AliasQuery();
    removal.setIndexName(indexName);
    removal.setAliasName(aliasName);
    try {
        this.elasticsearchRestTemplate.removeAlias(removal);
    } catch (ElasticsearchStatusException | AliasesNotFoundException e) {
        log.info("Alias {} does not exist on index {}", aliasName, indexName);
        LOGtest.warn("Alias {} does not exist on index {}", aliasName, indexName);
        return;
    }
    log.info("Removed alias {} from index {}", aliasName, indexName);
    LOGtest.warn("Removed alias {} from index {}", aliasName, indexName);
}
/**
 * Checks whether the alias exists.
 *
 * @param aliasName the alias to look for
 * @return {@code true} when Elasticsearch reports the alias; {@code false} when it does not or
 *         when the request fails with an {@link IOException} (the failure is logged, because
 *         callers treat {@code false} as "missing" and may create a new index)
 */
private boolean aliasExists(final String aliasName) {
    try {
        return client.indices().existsAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
    } catch (IOException e) {
        // Do not hide connectivity problems: a false "missing" answer makes callers create indices
        log.warn("Error while checking existence of alias={}:", aliasName, e);
        return false;
    }
}
/**
 * Creates a new, timestamp-suffixed index for the entity, installs the entity mapping and points
 * the entity's _WRITE alias at the new index.
 *
 * @param clazz the clazz
 * @return the new index name
 */
private String createIndexFor(Class<?> clazz) {
    final String root = toIndexName(clazz);
    final String newIndexName = root + System.currentTimeMillis();
    log.info("Creating new index {} for {}", newIndexName, clazz.getName());
    LOGtest.warn("Creating new index {} for {}", newIndexName, clazz.getName());

    final MapBuilder<String, Object> indexSettings = new MapBuilder<>();
    if ("embedded".equals(esClusterNodes)) {
        // Embedded ES
        indexSettings.put("index.number_of_shards", 1);
        indexSettings.put("index.number_of_replicas", 0);
    }
    // Any other cluster gets no explicit settings here (a custom asciifolding analyzer
    // configuration existed at this point only as disabled, commented-out code).

    elasticsearchRestTemplate.createIndex(newIndexName, indexSettings.map());
    final Object entityMapping = CustomMapping.springDataMapping(elasticsearchRestTemplate.getElasticsearchConverter(), clazz, COMMON_TYPE_NAME);
    elasticsearchRestTemplate.putMapping(newIndexName, COMMON_TYPE_NAME, entityMapping);

    // Add _WRITE alias
    realias(root + INDEX_WRITE, root, newIndexName);
    return newIndexName;
}
/**
 * Registers the entity type for indexing. Registering the same type again only logs a warning.
 *
 * @param clazz the entity type to index
 */
@Override
public <R extends EmptyModel> void indexEntity(Class<R> clazz) {
    // add() on the synchronized set checks and inserts in one step, so two concurrent
    // registrations of the same class cannot both pass a separate contains() check.
    if (!indexedEntities.add(clazz)) {
        log.warn("Entity {} already set for indexing.", clazz.getName());
    }
}
/**
 * Registers the entity type for indexing and records the page size used when scanning the
 * database for its IDs. The page size is stored even if the type was already registered.
 *
 * @param clazz the entity type to index
 * @param reindexBatchSize number of IDs read per page during a database scan
 */
@Override
public <R extends EmptyModel> void indexEntity(Class<R> clazz, int reindexBatchSize) {
    indexEntity(clazz);
    final Integer scanPageSize = Integer.valueOf(reindexBatchSize);
    this.reindexBatchSize.put(clazz, scanPageSize);
}
/**
 * Empties the index of the entity by replacing it: a new empty index is created, the read and
 * "everything" aliases are moved to it and the previous index is deleted. Does nothing when the
 * type is not registered, Elasticsearch is not reachable, or the index holds no documents.
 * (An earlier delete-by-query variant of this method existed only as commented-out code.)
 *
 * @param clazz the entity type whose documents are removed
 * @throws SearchException declared by the interface
 */
@Override
public <R> void removeAll(Class<R> clazz) throws SearchException {
    if (!indexedEntities.contains(clazz)) {
        log.debug("Class {} is not indexed.", clazz.getName());
        return;
    }
    if (!withElasticsearch()) {
        log.warn("Elasticsearch not accessible, not reindexing {}", clazz.getName());
        return;
    }
    if (count(clazz, null) == 0) {
        LOGtest.warn("Index for {} is empty, count == 0. No need to recreate.", clazz);
        return;
    }

    log.info("Re-creating empty index for {}", clazz.getName());
    LOGtest.warn("Re-creating empty index for {}", clazz.getName());

    final String root = toIndexName(clazz);

    // Remember which index is live now: prefer the read alias, fall back to the write alias
    String previousIndex = getFirstIndexForAlias(root + INDEX_READ);
    if (previousIndex == null) {
        previousIndex = getFirstIndexForAlias(root + INDEX_WRITE);
    }

    // New timestamped index; it takes over the write alias on creation
    final String replacementIndex = createIndexFor(clazz);

    // Move _READ alias
    realias(root + INDEX_READ, root, replacementIndex);

    // Add to EVERYTHING
    deleteAlias(collectionPrefix + ALIAS_EVERYTHING, root + "*");
    addAlias(collectionPrefix + ALIAS_EVERYTHING, replacementIndex);

    if (previousIndex == null) {
        return;
    }
    log.info("Deleting old index {}", previousIndex);
    LOGtest.warn("Deleting old index {}", previousIndex);
    elasticsearchRestTemplate.deleteIndex(previousIndex);
}
/**
 * Rebuilds the index of an entity from the database.
 * <p>
 * A new index is created (taking over the write alias), all entity IDs are queued for indexing,
 * and a background task waits for the reindex queue to drain before moving the read and
 * "everything" aliases to the new index and deleting the previous one.
 * <p>
 * {@code databaseReindexRunning} is set for the duration. It is cleared by the background task
 * when it finishes, or right here if anything fails before that task was handed to the executor,
 * so a failed attempt cannot leave the flag set.
 *
 * @param clazz the entity type to reindex
 */
private <R> void internalReindex(Class<R> clazz) {
    if (!indexedEntities.contains(clazz)) {
        log.warn("Class {} is not indexed.", clazz.getName());
        return;
    }
    if (!withElasticsearch()) {
        log.warn("Elasticsearch not accessible, not reindexing {}", clazz.getName());
        return;
    }
    databaseReindexRunning.set(true); // We are now doing a database scan.
    boolean realiasScheduled = false;
    try {
        log.info("Reindexing {}", clazz.getName());
        final String indexRoot = toIndexName(clazz);
        // Figure out existing index name
        String currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_READ);
        if (currentIndexName == null) {
            currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_WRITE);
        }
        // Make new index with timestamp
        final String indexName = createIndexFor(clazz);
        // The old index name
        final String oldIndexName = currentIndexName;
        // Scan
        scanDatabase(clazz, null);
        // Schedule rename
        taskExecutor.execute(() -> {
            boolean interrupted = false;
            try {
                do {
                    var queueSize = elasticReindexQueue.size();
                    if (queueSize == 0) {
                        break;
                    }
                    log.trace("ES Reindex queue has {} elements, waiting to realias {}...", queueSize, indexName);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Keep waiting for the queue to drain, but remember the interrupt
                        interrupted = true;
                    }
                } while (true);
                log.info("ES Reindex queue is empty, realiasing {}", indexName);
                // Move _READ alias
                realias(indexRoot + INDEX_READ, indexRoot, indexName);
                // Add to EVERYTHING
                deleteAlias(collectionPrefix + ALIAS_EVERYTHING, indexRoot + "*");
                addAlias(collectionPrefix + ALIAS_EVERYTHING, indexName);
                // delete old index
                if (oldIndexName != null) {
                    log.info("Deleting old index {}", oldIndexName);
                    elasticsearchRestTemplate.deleteIndex(oldIndexName);
                }
            } catch (Throwable e) {
                log.warn("Realiasing after full scan threw an exception: {}", e.getMessage(), e);
            } finally {
                databaseReindexRunning.set(false); // Clear flag that we're running a scan.
                if (interrupted) {
                    // Restore the interrupt status for whoever owns this thread
                    Thread.currentThread().interrupt();
                }
            }
        });
        realiasScheduled = true;
    } finally {
        if (!realiasScheduled) {
            // Index creation, the scan or task submission failed: no task will clear the flag
            databaseReindexRunning.set(false);
        }
    }
}
/**
 * Pages through the IDs of the entity in ascending order and queues one {@link ElasticReindex}
 * per ID on the reindex queue.
 * <p>
 * The page size is the entity-specific value from {@code reindexBatchSize} or, when none is
 * registered, {@code batchSize}. Scanning stops at the first empty page.
 *
 * @param clazz the entity type to scan
 * @param filter optional filter restricting the scanned records; {@code null} scans everything
 */
private <R> void scanDatabase(Class<R> clazz, EmptyModelFilter<?, ?> filter) {
    PathBuilder<R> builder = new PathBuilderFactory().create(clazz);
    Querydsl querydsl = new Querydsl(em, builder);
    EntityPath<R> entityPath = SimpleEntityPathResolver.INSTANCE.createPath(clazz);
    PathBuilder<R> pathBuilder = new PathBuilder<R>(clazz, entityPath.getMetadata().getName());
    JPQLQuery<Long> query = querydsl.createQuery(entityPath)
        // select id only
        .select(pathBuilder.getNumber("id", Long.class))
        // and order by id
        .orderBy(pathBuilder.getNumber("id", Long.class).asc());
    if (filter != null) {
        // apply filter
        query.where(filter.buildPredicate());
    }
    Integer scanSize = reindexBatchSize.get(clazz);
    final int customBatchSize = scanSize == null ? batchSize : scanSize.intValue();
    int startPosition = 0;
    query.offset(startPosition);
    query.limit(customBatchSize);
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    List<Long> results;
    do {
        stopWatch.split();
        log.debug("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
        // Progress at INFO level on every 10th page. The remainder is what matters here: the
        // quotient of a value greater than the divisor is never 0, so a division test never fires.
        if (startPosition > 10 * customBatchSize && startPosition % (10 * customBatchSize) == 0) {
            log.info("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
        }
        results = query.fetch();
        // Schedule indexing
        elasticReindexQueue.addAll(results.stream().map((res) -> new ElasticReindex(clazz, res)).collect(Collectors.toList()));
        // Next page
        query.offset(startPosition += results.size());
    } while (results.size() > 0);
    stopWatch.stop();
    log.info("Scanning {} for reindex took {}ms", clazz.getName(), stopWatch.getTime());
}
/**
 * Synchronizes the index with the database for the given IDs: records found in the database are
 * (re)indexed in one bulk request, IDs without a database record are deleted from the index.
 * <p>
 * Unless a full database scan is running on this instance, every indexed record is also checked
 * for {@link ElasticTrigger} so that related entities get queued for reindexing.
 * <p>
 * Will not modify the list of IDs. A {@code null} or empty collection is a no-op.
 *
 * @param clazz the entity type
 * @param ids identifiers of the records to index or remove
 */
@Override
public <R> void update(final Class<R> clazz, final Collection<Long> ids) {
    if (!indexedEntities.contains(clazz)) {
        log.warn("Class {} is not indexed.", clazz.getName());
        return;
    }
    if (ids == null || ids.isEmpty()) {
        // Nothing to index or remove; also avoids running a query with an empty IN (...) list
        return;
    }
    var checkTriggers = ! databaseReindexRunning.get(); // Check triggers only if we are not in a full database scan
    HashSet<Long> notFoundIds = new HashSet<>(ids);
    final String indexName = toIndexName(clazz) + INDEX_WRITE;
    CriteriaBuilder cb = em.getCriteriaBuilder();
    CriteriaQuery<R> cq = cb.createQuery(clazz);
    Root<R> root = cq.from(clazz);
    cq.where(root.get("id").in(ids));
    TypedQuery<R> query = em.createQuery(cq);
    List<R> results = query.getResultList();
    List<IndexQuery> queries = new ArrayList<IndexQuery>();
    // JSON per document ID, kept only for diagnostics when indexing fails
    Map<String, String> jsons = new HashMap<>();
    for (R x : results) {
        if (log.isTraceEnabled()) log.trace("Indexing {} {}", clazz.getName(), x);
        if (LOGtest.isInfoEnabled()) LOGtest.info("Indexing {} {}", clazz.getName(), x);
        EmptyModel bm = (EmptyModel) x;
        if (x instanceof ElasticLoader) {
            // Prepare entity for indexing (e.g. lazy-load)
            ((ElasticLoader) x).prepareForIndexing();
        }
        try {
            jsons.put(bm.getId().toString(), mapper.mapToString(bm));
        } catch (Throwable e) {
            log.warn("Failed to prepare JSON for {} id={}: {}", bm.getClass(), bm.getId(), e.getMessage(), e);
        }
        if (LOGtest.isTraceEnabled()) LOGtest.trace("JSON: {}", jsons.get(bm.getId().toString()));
        // is found
        notFoundIds.remove(bm.getId());
        IndexQuery iq = new IndexQuery();
        iq.setIndexName(indexName);
        iq.setType(COMMON_TYPE_NAME);
        iq.setId(String.valueOf(bm.getId()));
        iq.setObject(bm);
        queries.add(iq);
        // check for ElasticTrigger if not in a full database scan
        if (checkTriggers) {
            checkElasticTriggers(bm);
        }
    }
    if (!log.isTraceEnabled() && log.isDebugEnabled() && jsons.size() > 0) {
        log.debug("JSON: {}", jsons.values().iterator().next());
    }
    if (!queries.isEmpty()) {
        log.debug("Indexing {} count={} of provided objects count={}", clazz.getName(), queries.size(), ids.size());
        if (LOGtest.isInfoEnabled()) LOGtest.info("Indexing {} count={} of provided objects count={}", clazz.getName(), queries.size(), ids.size());
        try {
            elasticsearchRestTemplate.bulkIndex(queries);
        } catch (org.springframework.data.elasticsearch.ElasticsearchException e) {
            log.error(e.getMessage(), e);
            // jsons is empty when JSON preparation failed for every record; taking the first
            // element unconditionally would throw from inside this handler.
            if (!jsons.isEmpty()) {
                log.error("JSON sample: {}", jsons.values().iterator().next());
            }
            Map<String, String> failedDocs = e.getFailedDocuments();
            if (failedDocs != null) {
                for (Map.Entry<String, String> entry : failedDocs.entrySet()) {
                    log.error("{} {}\n{}", entry.getKey(), entry.getValue(), jsons.get(entry.getKey()));
                }
            }
        } catch (ElasticsearchException e) {
            log.error("Could not index document", e);
        }
    }
    em.clear();
    queries.clear();
    // Whatever was requested but not found in the database must go from the index
    for (Long id : notFoundIds) {
        if (log.isTraceEnabled()) log.trace("Removing {} id={} from index {}/{}", clazz.getName(), id, indexName, COMMON_TYPE_NAME);
        if (LOGtest.isInfoEnabled()) LOGtest.info("Removing {} id={} from index {}/{}", clazz.getName(), id, indexName, COMMON_TYPE_NAME);
        String res = elasticsearchRestTemplate.delete(indexName, COMMON_TYPE_NAME, String.valueOf(id));
        log.trace("Deleted ES document id={}", res);
        LOGtest.warn("Deleted ES document id={} from {}", res, indexName);
    }
}
/**
 * If the entity is an {@link ElasticTrigger}, queues every {@link EmptyModel} it reports through
 * {@code reindexedEntities()} for reindexing. Failures are logged and never propagated.
 *
 * @param entity the entity that was just indexed
 */
private void checkElasticTriggers(EmptyModel entity) {
    // Is this an ElasticTrigger?
    if (!(entity instanceof ElasticTrigger)) {
        return;
    }
    if (LOGtest.isInfoEnabled()) LOGtest.info("Check related entities of {}", entity);
    try {
        final Object[] related = ((ElasticTrigger) entity).reindexedEntities();
        if (related == null) {
            return;
        }
        for (Object candidate : related) {
            if (!(candidate instanceof EmptyModel)) {
                continue;
            }
            final ElasticReindex queued = new ElasticReindex((EmptyModel) candidate);
            final boolean newlyAdded = elasticReindexQueue.add(queued);
            LOGtest.info("Related entity {} was (newly={}) queued.", queued, newlyAdded);
        }
    } catch (Throwable e) {
        log.error("Error inspecting reindexed entities of {} id={}: {}", entity.getClass(), entity.getId(), e.getMessage()); // , e);
    }
}
/**
 * Hands an index update for the given IDs to the task executor. The IDs are copied up front, so
 * later changes to the caller's collection do not affect the scheduled work. Nothing is
 * scheduled for an unregistered type or an empty ID collection.
 *
 * @param clazz Entity to reindex
 * @param entityIds Entity identifiers to reindex
 */
@Override
public <R> void asyncUpdate(final Class<R> clazz, final Collection<Long> entityIds) {
    if (!indexedEntities.contains(clazz)) {
        log.warn("Class {} is not indexed.", clazz.getName());
        return;
    }
    final HashSet<Long> idSnapshot = new HashSet<>(entityIds);
    if (idSnapshot.isEmpty()) {
        return;
    }
    final Runnable reindexJob = () -> {
        log.debug("Running scheduled reindex of {} size={}", clazz.getName(), idSnapshot.size());
        if (LOGtest.isInfoEnabled()) LOGtest.info("Running scheduled reindex of {} size={}", clazz, idSnapshot.size());
        try {
            // Call through the injected service reference rather than this
            _self.update(clazz, idSnapshot);
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    };
    taskExecutor.execute(reindexJob);
}
/**
 * Loads the entities of the given type whose {@code id} is in the given list.
 *
 * @param clazz the entity type
 * @param collect the identifiers to load
 * @return the matching entities as returned by the JPA query
 */
private <T extends EmptyModel> List<T> findEntities(Class<T> clazz, List<Long> collect) {
    final CriteriaQuery<T> criteria = em.getCriteriaBuilder().createQuery(clazz);
    final Root<T> entity = criteria.from(clazz);
    criteria.where(entity.get("id").in(collect));
    final TypedQuery<T> byIds = em.createQuery(criteria);
    return byIds.getResultList();
}
/**
 * Single-term convenience form of the multi-term {@code termStatisticsAuto}.
 * <p>
 * Intended to run a term facet and automatically increase size if #otherCount is more than 10%;
 * the multi-term variant it delegates to currently returns the plain term statistics.
 *
 * @return the result for {@code term}
 */
@Override
public TermResult termStatisticsAuto(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String term) throws SearchException {
    // Pass an explicit array: a bare String argument would select this very overload again
    final String[] singleTerm = { term };
    final Map<String, TermResult> resultsByTerm = termStatisticsAuto(clazz, filters, size, singleTerm);
    return resultsByTerm.get(term);
}
/**
 * Term statistics for several terms. Automatic enlargement of {@code size} is not implemented
 * yet (TBD); the result is exactly what {@code termStatistics} returns.
 *
 * @return results keyed by term
 */
@Override
public Map<String, TermResult> termStatisticsAuto(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
    // TBD
    return termStatistics(clazz, filters, size, terms);
}
/**
 * Single-term convenience form of the multi-term {@code termStatistics}.
 *
 * @return the result for {@code term}
 */
@Override
public TermResult termStatistics(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String term) throws SearchException {
    // Pass an explicit array: a bare String argument would select this very overload again
    final String[] singleTerm = { term };
    final Map<String, TermResult> resultsByTerm = termStatistics(clazz, filters, size, singleTerm);
    return resultsByTerm.get(term);
}
/**
 * Runs one terms aggregation per requested field against the read alias of the entity and
 * returns, per field, the top {@code size} buckets ordered by descending document count, along
 * with the total number of matching documents and the count of documents in other buckets.
 *
 * @param clazz the indexed entity type
 * @param filters filter restricting the documents; may be {@code null}
 * @param size maximum number of buckets per term
 * @param terms the fields to aggregate on; with no terms the result is an empty map
 * @return results keyed by term
 * @throws RuntimeException when the class is not indexed, or a term yields no usable terms
 *         aggregation (unmapped, missing or of an unexpected type)
 */
@Override
public Map<String, TermResult> termStatistics(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
    if (!indexedEntities.contains(clazz)) {
        throw new RuntimeException("Class is not indexed " + clazz);
    }
    Long total = count(clazz, filters);
    String indexName = toIndexName(clazz) + INDEX_READ;
    SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(toEsQuery(clazz, filters));
    for (String term : terms) {
        TermsAggregationBuilder aggregation = AggregationBuilders.terms(term).field(term).size(size).order(BucketOrder.count(false));
        searchSourceBuilder.aggregation(aggregation);
    }
    searchRequest.source(searchSourceBuilder);
    log.debug("ES query {}", searchRequest);
    SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);
    // A response without an aggregations section (e.g. no terms requested) is treated as "no results"
    var aggregations = response.getAggregations();
    Map<String, Aggregation> results = aggregations == null ? Collections.emptyMap() : aggregations.asMap();
    for (Map.Entry<String, Aggregation> e : results.entrySet()) {
        log.debug("FYI, ES results {} for {} of {}", e.getKey(), terms, e.getValue().getClass());
    }
    Map<String, TermResult> termResults = new HashMap<>();
    for (String term : terms) {
        Aggregation agg = results.get(term);
        if (agg instanceof ParsedLongTerms || agg instanceof ParsedStringTerms || agg instanceof ParsedDoubleTerms) {
            ParsedTerms topCounts = (ParsedTerms) agg;
            List<? extends Terms.Bucket> buckets = topCounts.getBuckets();
            TermResult tr = new TermResult(term, total, buckets.stream().map(bucket -> new Term(bucket.getKeyAsString(), bucket.getDocCount())).collect(Collectors.toList()),
                topCounts.getSumOfOtherDocCounts());
            termResults.put(term, tr);
        } else if (agg instanceof UnmappedTerms) {
            UnmappedTerms unmapped = (UnmappedTerms) agg;
            throw new RuntimeException("Unmapped term " + term + ": " + unmapped.getBuckets());
        } else {
            // Doesn't happen; report a missing aggregation explicitly rather than fail on agg.getClass()
            throw new RuntimeException("IDK for term " + term + ": " + (agg == null ? null : agg.getClass()));
        }
    }
    return termResults;
}
/**
 * Converts the filter to an Elasticsearch query without an extra predicate and without
 * caller-supplied boost fields.
 *
 * @param clazz the indexed entity type
 * @param filters the filter to convert; may be {@code null}
 * @return the query produced by the four-argument variant
 */
private QueryBuilder toEsQuery(Class<?> clazz, EmptyModelFilter<?, ?> filters) {
    final Predicate noExtraPredicate = null;
    final String[] noBoostFields = null;
    return toEsQuery(clazz, filters, noExtraPredicate, noBoostFields);
}
/**
 * Converts a filter (plus an optional extra predicate) to an Elasticsearch query.
 * <p>
 * The Querydsl predicate of the filter is translated through {@link ElasticQueryBuilder}. When
 * the filter is an {@link IFullTextFilter} with non-blank text, a full-text clause is added as a
 * {@code must}: a bool query with three {@code should} alternatives (fuzzy multi-match on
 * {@code _texts}, a lenient query-string query, and a simple-query-string query with field
 * boosts) and {@code minimumShouldMatch} of 70%.
 * <p>
 * NOTE(review): {@code predicate} is only applied when {@code filters} is not {@code null};
 * confirm that this is intended.
 *
 * @param clazz the indexed entity type; selects the class-level boosted fields
 * @param filters the filter to convert; may be {@code null}
 * @param predicate additional predicate AND-ed with the filter predicate; may be {@code null}
 * @param boostFields extra fields boosted (factor 3) in the simple-query-string clause; may be
 *        {@code null}
 * @return the Elasticsearch query
 */
private QueryBuilder toEsQuery(Class<?> clazz, EmptyModelFilter<?, ?> filters, Predicate predicate, String[] boostFields) {
    ElasticQueryBuilder esQb = new ElasticQueryBuilder();
    if (filters != null) {
        BooleanBuilder builder = new BooleanBuilder();
        if (predicate != null) {
            builder.and(predicate);
        }
        builder.and(filters.buildPredicate()).accept(esQb, null);
    }
    QueryBuilder esQuery = esQb.getQuery();
    if (filters instanceof IFullTextFilter) {
        String text = ((IFullTextFilter) filters).get_text();
        if (StringUtils.isNotBlank(text)) {
            BoolQueryBuilder fulltext = boolQuery()
                /*@formatter:off*/
                .should(multiMatchQuery(text, "_texts")
                    .fuzziness(Fuzziness.AUTO)
                    // .minimumShouldMatch("75%")
                    .boost(2.0f)
                )
                .should(queryStringQuery(text)
                    .lenient(true)
                    .tieBreaker(1.0f) // was .useDisMaX()
                    .fuzziness(Fuzziness.AUTO)
                    .boost(0.9f)
                );
                /*@formatter:on*/
            {
                SimpleQueryStringBuilder sqsQuery = simpleQueryStringQuery(text)
                    /*@formatter:off*/
                    .lenient(true)
                    .defaultOperator(Operator.AND)
                    .field("*").boost(1)
                    // .minimumShouldMatch("75%")
                    /*@formatter:on*/
                    ;
                if (boostFields != null && boostFields.length > 0) {
                    sqsQuery.field("_texts", 1);
                    for (String field : boostFields) {
                        sqsQuery.field(field, 3);
                    }
                }
                String[] boostedSearch = fieldBoosts.get(clazz);
                if (boostedSearch != null && boostedSearch.length > 0) {
                    for (String field : boostedSearch) {
                        sqsQuery.field(field, 10);
                    }
                }
                fulltext.should(sqsQuery);
            }
            fulltext.minimumShouldMatch("70%");
            if (esQuery instanceof BoolQueryBuilder) {
                esQuery = ((BoolQueryBuilder) esQuery).must(fulltext);
            } else {
                BoolQueryBuilder builder = new BoolQueryBuilder();
                esQuery = builder.filter(esQuery).must(fulltext);
            }
        }
    }
    if (log.isDebugEnabled()) {
        String filtersJson;
        try {
            filtersJson = new ObjectMapper().writeValueAsString(filters);
        } catch (JsonProcessingException e) {
            // Diagnostics only: still print the generated query when the filter cannot be serialized
            filtersJson = String.valueOf(filters) + " (not serializable to JSON: " + e.getMessage() + ")";
        }
        log.debug("Converted {} to\ncurl -XGET 'localhost:9200/{}{}/_search?pretty' -H 'Content-Type: application/json' -d '{ \"query\": {} }'", filtersJson,
            toIndexName(clazz), INDEX_READ, esQuery);
    }
    return esQuery;
}
/**
 * Points the alias at the target index only: the alias is first removed (from all indices
 * matching {@code indexPrefix*}, or from wherever it currently exists when no prefix is given)
 * and then added to the target index. Both names must start with the collection prefix.
 *
 * @param aliasName The alias name
 * @param indexPrefix used to remove existing aliases; may be {@code null}
 * @param targetIndexName The index the alias points to
 * @throws RuntimeException when the alias or the target index lacks the collection prefix
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void realias(final String aliasName, final String indexPrefix, final String targetIndexName) {
    final boolean aliasHasPrefix = aliasName.startsWith(collectionPrefix);
    if (!aliasHasPrefix) {
        throw new RuntimeException("Unable to move alias with a such prefix.");
    }
    final boolean targetHasPrefix = targetIndexName.startsWith(collectionPrefix);
    if (!targetHasPrefix) {
        throw new RuntimeException("Unable to move alias to index with a such prefix.");
    }
    if (indexPrefix == null) {
        deleteAlias(aliasName);
    } else {
        deleteAlias(aliasName, indexPrefix + "*");
    }
    addAlias(aliasName, targetIndexName);
}
/**
 * Adds the alias to the index and logs the aliases the index carries afterwards. Both names
 * must start with the collection prefix.
 *
 * @param aliasName the alias to add
 * @param indexName the index that receives the alias
 * @throws RuntimeException when the alias or the index lacks the collection prefix
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void addAlias(String aliasName, String indexName) {
    if (!(aliasName.startsWith(collectionPrefix) && indexName.startsWith(collectionPrefix))) {
        throw new RuntimeException("Unable to add alias with a such prefix of alias or index.");
    }
    final AliasQuery addition = new AliasQuery();
    addition.setIndexName(indexName);
    addition.setAliasName(aliasName);
    log.info("Adding alias {} to index {}", aliasName, indexName);
    LOGtest.info("Adding alias {} to index {}", aliasName, indexName);
    this.elasticsearchRestTemplate.addAlias(addition);

    // Report what the index looks like now
    final List<AliasMetaData> currentAliases = elasticsearchRestTemplate.queryForAlias(indexName);
    for (AliasMetaData meta : currentAliases) {
        log.debug(" Index {} has alias {} write={} {}", indexName, meta.getAlias(), meta.writeIndex(), meta.indexRouting());
        LOGtest.info(" Index {} has alias {} write={} {}", indexName, meta.getAlias(), meta.writeIndex(), meta.indexRouting());
    }
}
/**
 * Delete alias from every index that currently carries it. A failure to read the alias is
 * logged and otherwise ignored.
 *
 * @param aliasName the alias name; must start with the collection prefix
 * @throws RuntimeException when the alias lacks the collection prefix
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void deleteAlias(final String aliasName) {
    final boolean aliasHasPrefix = aliasName.startsWith(collectionPrefix);
    if (!aliasHasPrefix) {
        throw new RuntimeException("Unable to delete alias with a such prefix.");
    }
    try {
        final GetAliasesResponse lookup = client.indices().getAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
        if (lookup == null || lookup.getAliases().isEmpty()) {
            return;
        }
        // Keys of the alias map are the names of the indices holding the alias
        for (final String holder : lookup.getAliases().keySet()) {
            deleteAlias(aliasName, holder);
        }
    } catch (IOException e) {
        log.warn("Error while reading alias={}:", aliasName, e);
        LOGtest.warn("Error while reading alias={}:", aliasName, e);
    }
}
/**
 * Deletes the named index.
 *
 * @param indexName the index to delete; must start with the configured collection prefix
 * @throws RuntimeException when the index name lacks the collection prefix
 */
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void deleteIndex(final String indexName) {
    final boolean indexIsOurs = indexName.startsWith(collectionPrefix);
    if (!indexIsOurs) {
        throw new RuntimeException("Unable to delete index with a such prefix.");
    }

    elasticsearchRestTemplate.deleteIndex(indexName);

    // Sanity check; only evaluated when JVM assertions are enabled
    assert !elasticsearchRestTemplate.indexExists(indexName);
}
/**
 * Reindexes every registered indexed entity class, one class at a time, by calling
 * {@link #reindex(Class)} for each of them.
 */
@Override
public void reindexAll() {
    for (final Class<? extends EmptyModel> entityClass : indexedEntities) {
        reindex(entityClass);
    }
}
/**
 * Runs a reindex of the given class while holding the reindex lock.
 *
 * <p>The lock is acquired without waiting; if it is already held the call fails immediately.</p>
 *
 * @param clazz the class to reindex
 * @throws RuntimeException when the reindex lock cannot be acquired
 */
@Override
public <R> void reindex(Class<R> clazz) {
    if (!elasticsearchReindexLock.tryLock()) {
        throw new RuntimeException("Could not run reindex for " + clazz.getSimpleName() + ". Operation locked.");
    }

    // Lock is ours from here on: always release it
    try {
        internalReindex(clazz);
    } finally {
        elasticsearchReindexLock.unlock();
    }
}
/**
 * Scans the database for records of the given class matching the filter, while holding the
 * reindex lock.
 *
 * <p>The lock is acquired without waiting; if it is already held the call fails immediately.</p>
 *
 * @param clazz the entity class to process
 * @param filter the filter passed on to the database scan
 * @throws RuntimeException when the reindex lock cannot be acquired
 */
@Override
public <T extends EmptyModel> void reindex(Class<T> clazz, EmptyModelFilter<?, ?> filter) {
    if (!elasticsearchReindexLock.tryLock()) {
        throw new RuntimeException("Could not run reindex for " + clazz.getSimpleName() + ". Operation locked.");
    }

    // Lock is ours from here on: always release it
    try {
        scanDatabase(clazz, filter);
    } finally {
        elasticsearchReindexLock.unlock();
    }
}
/**
 * Lists the indexed entity classes ordered by their fully qualified class name.
 *
 * @return an unmodifiable, name-sorted copy of the registered indexed classes
 */
@Override
public List<Class<?>> getIndexedEntities() {
    final List<Class<?>> byName = new ArrayList<>(this.indexedEntities);
    Collections.sort(byName, (left, right) -> left.getName().compareTo(right.getName()));
    return ListUtils.unmodifiableList(byName);
}
/**
 * Counts the documents of the given class that match the filter, using the read alias.
 *
 * <p>Shard failures reported with the response are logged as warnings; the total reported by
 * the response is returned regardless.</p>
 *
 * @param clazz the indexed class
 * @param filter the filter converted to the ES query
 * @return total number of hits reported by ES
 * @throws SearchException when the search cannot be executed
 */
@Override
public long count(Class<?> clazz, EmptyModelFilter<?, ?> filter) throws SearchException {
    final String readAlias = toIndexName(clazz) + INDEX_READ;

    final SearchRequest countRequest = new SearchRequest(readAlias).types(COMMON_TYPE_NAME);
    // size(0): no documents are fetched, only the total is of interest
    countRequest.source(new SearchSourceBuilder().query(toEsQuery(clazz, filter)).size(0));

    final SearchResponse response = search(countRequest, RequestOptions.DEFAULT);
    for (var failure : response.getShardFailures()) {
        log.warn("Count of {} for {} failed shard {}.{} with: {}", clazz, filter, failure.shard().getIndex(), failure.shardId(), failure.reason());
    }
    return response.getHits().getTotalHits();
}
/**
 * Polls ES until the document count for the class and filter equals {@code mustHaveCount}.
 * This is mostly used in unit testing to assure index has settled.
 *
 * <p>On every attempt the read and write aliases of the class must resolve to the same index;
 * if they do not, the method sleeps 200ms and starts over with a fresh retry counter. When the
 * count does not match, ES is refreshed and its caches are cleared, every third retry also
 * triggers a synced flush, and the thread sleeps for a period that shrinks with each retry.</p>
 *
 * <p>If the thread is interrupted while sleeping, the interrupt flag is restored and the last
 * observed count is returned immediately.</p>
 *
 * @param clazz the indexed class
 * @param filter the filter used for counting
 * @param mustHaveCount the count to wait for
 * @return the matching count, or the last observed count when the thread was interrupted
 * @throws SearchException when counting fails
 * @throws RuntimeException when the count still differs after more than 20 consecutive retries
 */
@Override
public long waitForCount(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filter, int mustHaveCount) throws SearchException {
    long count = 0;
    int repeat = 0;
    var maxRepeats = 20;
    if (LOGtest.isInfoEnabled()) LOGtest.info("Need ES count for {}={}", clazz, mustHaveCount);
    do {
        try {
            var readIndexName = getFirstIndexForAlias(toIndexName(clazz) + INDEX_READ);
            var writeIndexName = getFirstIndexForAlias(toIndexName(clazz) + INDEX_WRITE);
            if (! Objects.equals(readIndexName, writeIndexName)) {
                LOGtest.warn("Read/write index mismatch {}!={}.", readIndexName, writeIndexName);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    // Stop waiting and keep the interrupt visible to the caller, same as the retry sleep below
                    Thread.currentThread().interrupt();
                    log.error("Thread interrupted, returning {}", count);
                    return count;
                }
                throw new ElasticsearchStatusException("Read/write index mismatch,", null); // This is caught later and will reset counts.
            }
            count = count(clazz, filter);
            if (count != mustHaveCount) {
                if (repeat++ >= maxRepeats) {
                    throw new RuntimeException("ES count did not settle in " + repeat + " retries. " + clazz.getName() + " " + count + "!=" + mustHaveCount);
                }
                clearEsCache(); // Clear caches
                if (repeat % 3 == 0) {
                    syncFlushEs(); // Force sync-flush, maybe that helps
                }
                try {
                    // Sleep gets shorter with every retry; the random part adds 10-99ms of jitter
                    var sleepTime = ((maxRepeats - repeat + 1) * 50) + RandomUtils.nextLong(10, 100);
                    if (LOGtest.isInfoEnabled()) LOGtest.info("ES wrong count #{} from {}/{} in {} is {}!={}, sleeping for {}ms...", repeat, clazz, readIndexName, writeIndexName, count, mustHaveCount, sleepTime);
                    printHealth(); // Log cluster info before retrying
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can still observe the interruption
                    Thread.currentThread().interrupt();
                    log.error("Thread interrupted, returning {}", count);
                    return count;
                }
            } else {
                // OK!
                if (LOGtest.isInfoEnabled()) LOGtest.info("ES good count #{} for {} from {}/{} is {}=={}!", repeat, clazz.getName(), readIndexName, writeIndexName, count, mustHaveCount);
                if (repeat > 0) {
                    printHealth(); // Log cluster info after successful retry!
                }
            }
        } catch (ElasticsearchStatusException e) {
            // Start over: forget the count and the retries used so far
            count = -1;
            repeat = 0;
            log.warn("Error while waiting for count: {}. Retrying...", e.getMessage());
            LOGtest.warn("Error while waiting for count: {}. Retrying...", e.getMessage());
        }
    } while (count != mustHaveCount);
    return count;
}
/**
 * Execute refresh on all indices and clear ES caches.
 *
 * <p>Afterwards the cluster health is queried; when the status is not GREEN a warning is
 * logged and {@code printHealth()} is called. An {@link IOException} from any of the calls
 * is logged and not propagated.</p>
 */
private void clearEsCache() {
    try {
        log.trace("Refreshing ES");
        final Response refreshResponse = client.getLowLevelClient().performRequest(new Request("POST", "/_refresh"));
        try (var content = refreshResponse.getEntity().getContent()) {
            final String refreshText = new String(content.readAllBytes(), StandardCharsets.UTF_8);
            log.debug("Refresh ES: {}", refreshText);
        }

        log.trace("Clearing ES caches");
        final Response clearResponse = client.getLowLevelClient().performRequest(new Request("POST", "/_cache/clear"));
        try (var content = clearResponse.getEntity().getContent()) {
            final String clearText = new String(content.readAllBytes(), StandardCharsets.UTF_8);
            log.debug("Clear ES cache: {}", clearText);
        }

        final ClusterHealthStatus health = client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT).getStatus();
        if (health != ClusterHealthStatus.GREEN) {
            LOGtest.warn("ES status {}", health);
            printHealth();
        }
    } catch (IOException e) {
        log.error("Could not execute ES operation: {}", e.getMessage(), e);
    }
}
/**
 * Requests a synced flush ({@code POST /_flush/synced}) and logs the response body.
 * An {@link IOException} is logged and not propagated.
 */
private void syncFlushEs() {
    try {
        log.trace("Flushing ES");
        final Response flushResponse = client.getLowLevelClient().performRequest(new Request("POST", "/_flush/synced"));
        try (var content = flushResponse.getEntity().getContent()) {
            final String flushText = new String(content.readAllBytes(), StandardCharsets.UTF_8);
            log.info("Synced ES flush:\n{}", flushText);
        }
    } catch (IOException e) {
        log.error("Could not execute synced ES flush: {}", e.getMessage());
    }
}
/**
 * Logs the output of {@code GET /_cat/segments?v}. This is called from
 * {@link #waitForCount(Class, EmptyModelFilter, int)} to try and debug ES cluster status.
 *
 * <p>An {@link IOException} from the request is logged and not propagated.</p>
 */
private void printHealth() {
    try {
        // Other _cat endpoints that can be queried the same way when more detail is needed:
        // /_cat/nodes?v, /_cat/health?v, /_cat/thread_pool?v, /_cat/pending_tasks?v,
        // /_cat/indices?v, /_cat/aliases?v
        final Response r = client.getLowLevelClient().performRequest(new Request("GET", "/_cat/segments?v"));
        try (var body = r.getEntity().getContent()) {
            // Read the stream exactly once: a second readAllBytes() on a drained stream yields
            // an empty array, which left the second log line without any content
            final String segments = new String(body.readAllBytes(), StandardCharsets.UTF_8);
            log.debug("ES segments:\n{}", segments);
            LOGtest.info("ES segments:\n{}", segments);
        }
    } catch (IOException e) {
        log.error("Could not execute cluster API call: {}", e.getMessage(), e);
    }
}
/**
 * Counts, for every known field of the indexed class, the documents matching the filter that
 * have no value for that field.
 *
 * <p>One {@code missing} aggregation is requested per field name registered for the class.
 * Only fields with at least one missing value are reported. The total number of matching
 * documents is added under the key {@code _totalCount}.</p>
 *
 * @param indexClass the indexed class
 * @param filter the filter converted to the ES query
 * @return field name to missing-value count, plus {@code _totalCount}; empty when no fields
 *         are registered for the class
 * @throws SearchException when the search cannot be executed
 */
@Override
public Map<String, Long> countMissingValues(Class<? extends EmptyModel> indexClass, EmptyModelFilter<?, ?> filter) throws SearchException {
    final Map<String, Long> results = new HashMap<>();
    final Set<String> fields = jsonSchemas.get(indexClass);
    if (CollectionUtils.isEmpty(fields)) {
        return results;
    }
    final String indexName = toIndexName(indexClass) + INDEX_READ;
    // size(0): only aggregations and the total are needed
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(toEsQuery(indexClass, filter)).size(0);
    for (String fieldName : fields) {
        sourceBuilder.aggregation(AggregationBuilders.missing(fieldName).field(fieldName));
    }
    SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
    final SearchResponse response = search(esQuery, RequestOptions.DEFAULT);
    log.debug("Counting missing values took {}s", response.getTook().getSeconds());
    for (Aggregation agg : response.getAggregations()) {
        // Cast to the Missing interface rather than InternalMissing: responses parsed by the
        // REST client carry ParsedMissing instances, which are not InternalMissing
        long missingCount = ((org.elasticsearch.search.aggregations.bucket.missing.Missing) agg).getDocCount();
        if (missingCount > 0) {
            results.put(agg.getName(), missingCount);
        }
    }
    results.put("_totalCount", response.getHits().getTotalHits());
    return results;
}
/**
 * Finds a page of entities matching the filter. The filter is converted to an ES query and
 * no custom entity loader is used.
 *
 * @param clazz the indexed entity class
 * @param filter the filter converted to the ES query
 * @param page the requested page
 * @return the page of matching entities
 * @throws SearchException when the search cannot be executed
 */
@Override
public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Pageable page) throws SearchException {
    final QueryBuilder filterQuery = toEsQuery(clazz, filter);
    return findAll(clazz, filterQuery, page, null);
}
/**
 * Finds a page of entities matching the filter and predicate. Delegates to the five-argument
 * {@code findAll} call with {@code null} passed in the entity loader position.
 *
 * @param clazz the indexed entity class
 * @param filter the filter to apply
 * @param predicate the additional predicate to apply
 * @param page the requested page
 * @return the page of matching entities
 * @throws SearchException when the search cannot be executed
 */
@Override
public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Predicate predicate, Pageable page) throws SearchException {
return findAll(clazz, filter, predicate, page, null);
}
/**
 * Finds a page of entities matching the filter and predicate, optionally loading the entities
 * through a custom loader.
 *
 * @param clazz the indexed entity class
 * @param filter the filter converted to the ES query
 * @param predicate the additional predicate passed to the query conversion
 * @param page the requested page
 * @param entityLoader loader used to fetch entities by ID; when {@code null} the default
 *        entity lookup is used
 * @param boostFields field names passed to the query conversion for boosting
 * @return the page of matching entities
 * @throws SearchException when the search cannot be executed
 */
@Override
public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Predicate predicate, Pageable page,
        ElasticsearchService.IEntityLoader<T> entityLoader, String... boostFields) throws SearchException {
    final QueryBuilder boostedQuery = toEsQuery(clazz, filter, predicate, boostFields);
    return findAll(clazz, boostedQuery, page, entityLoader);
}
/**
 * Deletes from ES all documents of the given class that match the filter. The number of
 * matching documents is counted and logged before the delete query is executed.
 *
 * @param clazz the indexed entity class
 * @param filter the filter selecting the documents to delete
 * @throws SearchException when counting the matching documents fails
 */
@Override
public <T extends EmptyModel> void remove(Class<T> clazz, EmptyModelFilter<?, ?> filter) throws SearchException {
    // Documents are removed through the READ alias
    final String readAlias = toIndexName(clazz) + INDEX_READ;

    final long matching = count(clazz, filter);
    log.warn("ES removing {} documents of {}", matching, clazz);

    final DeleteQuery deleteQuery = new DeleteQuery();
    deleteQuery.setQuery(toEsQuery(clazz, filter));
    deleteQuery.setType(COMMON_TYPE_NAME);
    deleteQuery.setIndex(readAlias);
    elasticsearchRestTemplate.delete(deleteQuery);
}
/**
 * Runs the ES query against the read alias and loads the matching entities.
 *
 * <p>Two searches are executed. The first fetches only the top hit to learn the maximum
 * score. The second fetches the requested page, sorted by score, keeping only hits that score
 * at least half of that maximum. Hit IDs are parsed as {@code long} values and handed to the
 * entity loader, or to {@code findEntities} when no loader is given.</p>
 *
 * @param clazz the indexed entity class
 * @param query the ES query to execute
 * @param page the requested page
 * @param entityLoader custom loader of entities by ID; may be {@code null}
 * @return page of loaded entities with the total taken from the second search
 * @throws SearchException when a search cannot be executed
 */
private <T extends EmptyModel> Page<T> findAll(Class<T> clazz, QueryBuilder query, Pageable page, ElasticsearchService.IEntityLoader<T> entityLoader) throws SearchException {
    final String readAlias = toIndexName(clazz) + INDEX_READ;

    if (page.getOffset() > 100000) {
        log.warn("Large offset={} for ES query", page.getOffset());
    }

    // Probe search: the single best hit tells us the maximum score
    final SearchSourceBuilder probeSource = new SearchSourceBuilder().query(query).size(1);
    probeSource.sort("_score", SortOrder.DESC);
    final SearchRequest probeRequest = new SearchRequest(readAlias).types(COMMON_TYPE_NAME).source(probeSource);
    final SearchResponse probeResponse = search(probeRequest, RequestOptions.DEFAULT);
    final float topScore = probeResponse.getHits().getMaxScore();
    final float scoreFloor = topScore * 0.5f;

    // Effective search
    final SearchSourceBuilder pageSource = new SearchSourceBuilder().query(query).from((int) page.getOffset()).size(page.getPageSize());
    pageSource.sort("_score", SortOrder.DESC);
    pageSource.minScore(scoreFloor);
    final SearchRequest pageRequest = new SearchRequest(readAlias).types(COMMON_TYPE_NAME).source(pageSource);
    final SearchResponse pageResponse = search(pageRequest, RequestOptions.DEFAULT);

    log.debug("Converted to\ncurl -XGET 'localhost:9200/{}{}/_search?pretty' -H 'Content-Type: application/json' -d '{ \"query\": {} }'", toIndexName(clazz), INDEX_READ,
        pageSource);

    // Document IDs of the hits, in hit order
    final List<Long> hitIds = new ArrayList<>();
    for (var hit : pageResponse.getHits()) {
        if (log.isTraceEnabled()) {
            log.trace("Hit score={} id={} _class={} _source={}", hit.getScore(), hit.getId(), clazz, hit.getSourceAsString());
        }
        hitIds.add(Long.parseLong(hit.getId()));
    }

    final List<T> content;
    if (entityLoader == null) {
        // Use simple entityManager loader
        content = findEntities(clazz, hitIds);
    } else {
        // Use custom entity loader
        content = entityLoader.loadEntities(hitIds);
    }

    return new PageImpl<>(content, PageRequest.of(page.getPageNumber(), page.getPageSize()), pageResponse.getHits().getTotalHits());
}
/**
 * Executes the search request and converts I/O failures to {@link SearchException}.
 *
 * @param searchRequest the request to execute
 * @param options the request options
 * @return the ES response
 * @throws SearchException when the client fails with an {@link IOException}; the original
 *         exception is kept as the cause
 */
private SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws SearchException {
    try {
        return client.search(searchRequest, options);
    } catch (IOException e) {
        log.error("Error occurred during search", e);
        // Chain the IOException itself: using e.getCause() dropped it from the chain
        throw new SearchException(e.getMessage(), e);
    }
}
/**
 * Lists the indices whose names start with the configured collection prefix.
 *
 * @return the index listing, or {@code null} when the existence check for the prefix pattern
 *         reports nothing
 * @throws IOException when an ES call fails
 */
@Override
public GetIndexResponse listIndices() throws IOException {
    final String prefixPattern = collectionPrefix + "*";

    final var existsRequest = new GetIndexRequest(prefixPattern);
    final boolean anyIndex = client.indices().exists(existsRequest, RequestOptions.DEFAULT);
    if (!anyIndex) {
        return null;
    }
    return client.indices().get(new GetIndexRequest(prefixPattern), RequestOptions.DEFAULT);
}
}