ElasticsearchServiceImpl.java

/*
 * Copyright 2020 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gringlobal.service.impl;

import static org.elasticsearch.index.query.QueryBuilders.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SimpleQueryStringBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.blocks.model.filters.EmptyModelFilter;
import org.gringlobal.application.config.ElasticsearchConfig.GRINGlobalEntityMapper;
import org.gringlobal.component.elastic.ElasticLoader;
import org.gringlobal.component.elastic.ElasticReindex;
import org.gringlobal.component.elastic.ElasticTrigger;
import org.gringlobal.custom.elasticsearch.BoostedFields;
import org.gringlobal.custom.elasticsearch.CustomMapping;
import org.gringlobal.custom.elasticsearch.ElasticQueryBuilder;
import org.gringlobal.custom.elasticsearch.SearchException;
import org.gringlobal.service.ElasticsearchService;
import org.gringlobal.service.TransientMessageService;
import org.gringlobal.service.filter.IFullTextFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.data.elasticsearch.core.query.AliasQuery;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.data.querydsl.SimpleEntityPathResolver;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import com.fasterxml.jackson.module.jsonSchema.types.ObjectSchema;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.EntityPath;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.core.types.dsl.PathBuilderFactory;
import com.querydsl.jpa.JPQLQuery;

/**
 * Manage Elasticsearch indexing, indices.
 *
 * @author Matija Obreza
 */
// Not @Service
@Transactional(readOnly = true)
@Slf4j
public class ElasticsearchServiceImpl implements ElasticsearchService, InitializingBean {

	private static final Logger LOGtest = LoggerFactory.getLogger("org.gringlobal.test.Elasticsearch");

	private static final String INDEX_READ = "_read";
	private static final String INDEX_WRITE = "_write";
	private static final String ALIAS_EVERYTHING = "everything";
	private static final String COMMON_TYPE_NAME = "data";

	@Value("${elasticsearch.cluster.nodes}")
	private String esClusterNodes;
	
	@Resource
	private Lock elasticsearchReindexLock; // Is there another database scan running in the cluster?

	/// Is this instance running a full database scan?
	private AtomicBoolean databaseReindexRunning = new AtomicBoolean(false);

	@Resource
	private BlockingQueue<ElasticReindex> elasticReindexQueue; // JVM-bound queue

	@Autowired
	private TaskExecutor taskExecutor;

	@Autowired
	private EntityManager em;

	@Autowired
	private ElasticsearchOperations elasticsearchRestTemplate;

	@Autowired
	private RestHighLevelClient client;

	@Autowired
	private EntityMapper mapper;

	@Autowired
	private ElasticsearchService _self;

	@Autowired
	private TransientMessageService transientMessageService;

	private final Set<Class<? extends EmptyModel>> indexedEntities = Collections.synchronizedSet(new HashSet<>());
	private final Map<Class<? extends EmptyModel>, Set<String>> jsonSchemas = new HashMap<>();
	private final Map<Class<? extends EmptyModel>, Integer> reindexBatchSize = new HashMap<>();

	/// Size of database batch scan for IDs
	private int batchSize = 1000;

	// Boosted fields
	private Map<Class<? extends EmptyModel>, String[]> fieldBoosts = new HashMap<>();

	@Value("${elasticsearch.collection.prefix:ggce}")
	private String collectionPrefix;

	@Override
	public void afterPropertiesSet() throws Exception {
		if (StringUtils.isNotBlank(collectionPrefix)) {
			collectionPrefix = collectionPrefix.replaceAll("[^a-zA-Z0-9]", "") + "_";
		} else {
			collectionPrefix = "ggce_";
		}
		log.warn("ES index prefix: {}", collectionPrefix);

		if (withElasticsearch()) {
			for (Class<? extends EmptyModel> clazz : indexedEntities) {
				final String writeIndex = getFirstIndexForAlias(toIndexName(clazz) + INDEX_WRITE);
				if (writeIndex == null) {
					ensureWriteAlias(clazz);
				} else {
					log.info("Updating write index {} for {}", writeIndex, clazz.getName());
					Object mapping = CustomMapping.springDataMapping(elasticsearchRestTemplate.getElasticsearchConverter(), clazz, COMMON_TYPE_NAME);
					try {
						elasticsearchRestTemplate.putMapping(writeIndex, COMMON_TYPE_NAME, mapping);
					} catch (Throwable e) {
						log.warn("The ES mapping is no longer compatible for index={} of {}. Please regenerate.", writeIndex, clazz);
						transientMessageService.addAdminAlert("ES-MAPPING-" + clazz, "The ES mapping is no longer compatible for index={} of {}. The index must be regenerated.", writeIndex, clazz);
					}
				}
			}
		} else {
			log.warn("Elasticsearch not accessible, not updating mappings");
		}

		JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(((GRINGlobalEntityMapper) mapper).getObjectMapper());
		for (Class<? extends EmptyModel> clazz : indexedEntities) {
			try {
				JsonSchema schema = schemaGen.generateSchema(clazz);
				jsonSchemas.put(clazz, buildJsonPaths(((ObjectSchema) schema).getProperties(), null));
			} catch (Throwable e) {
				log.error("The list of all {} fields is not created.", clazz.getSimpleName(), e);
			}
		}

		// Boosted fields
		for (Class<? extends EmptyModel> clazz : indexedEntities) {
			BoostedFields boostedFields = clazz.getAnnotation(BoostedFields.class);
			if (boostedFields != null) {
				fieldBoosts.put(clazz, boostedFields.value());
			} else {
				fieldBoosts.put(clazz, ArrayUtils.EMPTY_STRING_ARRAY);
			}
		}
	}

	private boolean withElasticsearch() {
		try {
			return RestStatus.OK == client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT).status();
		} catch (Throwable e) {
			log.warn(e.getMessage());
			return false;
		}
	}

	private String getFirstIndexForAlias(final String aliasName) {
		try {
			GetAliasesResponse response = client.indices().getAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
			if (LOGtest.isDebugEnabled() && response != null && !response.getAliases().isEmpty()) {
				for (var al : response.getAliases().entrySet())
					LOGtest.debug("Alias {} meta={}", al.getKey(), al.getValue());
			}
			return response != null && !response.getAliases().isEmpty() ? (String) response.getAliases().keySet().toArray()[0] : null;
		} catch (Throwable e) {
			log.warn("Error while getting index for alias={}:", aliasName, e);
			return null;
		}
	}

	/**
	 * Makes a list of all JSON paths for indexed entity and all related types.
	 *
	 * @param properties properties
	 * @param parentPath parentPath
	 */
	private Set<String> buildJsonPaths(Map<String, JsonSchema> properties, String parentPath) {
		if (MapUtils.isEmpty(properties)) {
			return Collections.emptySet();
		}

		Set<String> fieldList = new HashSet<>();
		Set<Map.Entry<String, JsonSchema>> entries = properties.entrySet();
		entries.removeIf(e -> e.getKey().equals("_class") || e.getKey().equals("_permissions"));

		for (Map.Entry<String, JsonSchema> entry : entries) {
			JsonSchema schema = entry.getValue();
			String fullPath = StringUtils.isBlank(parentPath) ? entry.getKey() : parentPath + "." + entry.getKey();

			if (schema instanceof ObjectSchema) {
				fieldList.addAll(buildJsonPaths(((ObjectSchema) schema).getProperties(), fullPath));
			} else {
				fieldList.add(fullPath);
			}
		}
		return fieldList;
	}

	/**
	 * Get index name for clazz
	 *
	 * @param clazz
	 * @return
	 */
	private String toIndexName(Class<?> clazz) {
		return collectionPrefix + clazz.getSimpleName().toLowerCase();
	}

	/**
	 * Checks that index "write" alias exists. When alias is not found, a new index
	 * is created and alias points to it.
	 *
	 * @param clazz for type mapping
	 */
	protected synchronized void ensureWriteAlias(Class<?> clazz) {
		String indexRoot = toIndexName(clazz);
		String aliasWrite = indexRoot + INDEX_WRITE;
		if (!aliasExists(aliasWrite)) {
			final String indexName = createIndexFor(clazz);
			String aliasRead = indexRoot + INDEX_READ;

			if (!aliasExists(aliasRead)) {
				deleteAlias(collectionPrefix + ALIAS_EVERYTHING, indexRoot + "*");
				addAlias(collectionPrefix + ALIAS_EVERYTHING, indexName);
				addAlias(aliasRead, indexName);
			}
		}
	}

	private void deleteAlias(String aliasName, String indexName) {
		final AliasQuery query = new AliasQuery();
		query.setAliasName(aliasName);
		query.setIndexName(indexName);

		try {
			this.elasticsearchRestTemplate.removeAlias(query);
			log.info("Removed alias {} from index {}", aliasName, indexName);
			LOGtest.warn("Removed alias {} from index {}", aliasName, indexName);
		} catch (ElasticsearchStatusException | AliasesNotFoundException e) {
			log.info("Alias {} does not exist on index {}", aliasName, indexName);
			LOGtest.warn("Alias {} does not exist on index {}", aliasName, indexName);
		}
	}

	private boolean aliasExists(final String aliasName) {
		try {
			return client.indices().existsAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);
		} catch (IOException e) {
			return false;
		}
	}

	/**
	 * Creates the index for the entity. It assigns it the _WRITE alias
	 *
	 * @param clazz the clazz
	 * @return the new index name
	 */
	private String createIndexFor(Class<?> clazz) {
		final String indexRoot = toIndexName(clazz);
		final String indexName = indexRoot + System.currentTimeMillis();

		log.info("Creating new index {} for {}", indexName, clazz.getName());
		LOGtest.warn("Creating new index {} for {}", indexName, clazz.getName());

		/*@formatter:off*/
		MapBuilder<String, Object> settingsBuilder = new MapBuilder<String, Object>()
			// .put("index.blocks.read_only_allow_delete", null)
//			.put("index.mapping.total_fields.limit", 1000)
//			.put("index.number_of_shards", 1)
		;
		/*@formatter:on*/

		if (! "embedded".equals(esClusterNodes)) {
//			/*@formatter:off*/
//			settingsBuilder
//				// Analyzer
//				.put("index.analysis.analyzer.default.tokenizer", "standard")
//				.put("index.analysis.analyzer.default.filter.0", "asciifolding2")
//				// Filter
//				.put("index.analysis.filter.asciifolding2.type", "asciifolding")
//				.put("index.analysis.filter.asciifolding2.preserve_original", true)
//			;
//			/*@formatter:on*/
		} else {
			// Embedded ES
			settingsBuilder
				.put("index.number_of_shards", 1)
				.put("index.number_of_replicas", 0);
		}

		elasticsearchRestTemplate.createIndex(indexName, settingsBuilder.map());
		Object mapping = CustomMapping.springDataMapping(elasticsearchRestTemplate.getElasticsearchConverter(), clazz, COMMON_TYPE_NAME);
		elasticsearchRestTemplate.putMapping(indexName, COMMON_TYPE_NAME, mapping);

		// Add _WRITE alias
		realias(indexRoot + INDEX_WRITE, indexRoot, indexName);
		return indexName;
	}

	@SuppressWarnings("unchecked")
	@Override
	public <R extends EmptyModel> void indexEntity(Class<R> clazz) {

		synchronized (this) {
			if (indexedEntities.contains(clazz)) {
				log.warn("Entity {} already set for indexing.", clazz.getName());
				return;
			}
		}

		indexedEntities.add(clazz);
	}

	@Override
	public <R extends EmptyModel> void indexEntity(Class<R> clazz, int reindexBatchSize) {
		indexEntity(clazz);
		this.reindexBatchSize.put(clazz, reindexBatchSize);
	}

	@Override
	public <R> void removeAll(Class<R> clazz) throws SearchException {
		if (!indexedEntities.contains(clazz)) {
			log.debug("Class {} is not indexed.", clazz.getName());
			return;
		}

		if (!withElasticsearch()) {
			log.warn("Elasticsearch not accessible, not reindexing {}", clazz.getName());
			return;
		}

//		LOG.info("Deleting all docs for {}", clazz.getName());
//		if (LOGtest.isInfoEnabled()) LOGtest.info("Deleting all docs for {}", clazz);
//		DeleteQuery dq = new DeleteQuery();
//		dq.setIndex(toIndexName(clazz) + INDEX_WRITE);
//		dq.setType(COMMON_TYPE_NAME);
//		dq.setQuery(new MatchAllQueryBuilder());
//		elasticsearchRestTemplate.delete(dq);

		if (count(clazz, null) == 0) {
			LOGtest.warn("Index for {} is empty, count == 0. No need to recreate.", clazz);
			return;
		}
		log.info("Re-creating empty index for {}", clazz.getName());
		LOGtest.warn("Re-creating empty index for {}", clazz.getName());
		final String indexRoot = toIndexName(clazz);

		// Figure out existing index name
		String currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_READ);
		if (currentIndexName == null) {
			currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_WRITE);
		}

		// Make new index with timestamp
		final String indexName = createIndexFor(clazz);
		// The old index name
		final String oldIndexName = currentIndexName;

		// Move _READ alias
		realias(indexRoot + INDEX_READ, indexRoot, indexName);
		// Add to EVERYTHING
		deleteAlias(collectionPrefix + ALIAS_EVERYTHING, indexRoot + "*");
		addAlias(collectionPrefix + ALIAS_EVERYTHING, indexName);

		// delete old index
		if (oldIndexName != null) {
			log.info("Deleting old index {}", oldIndexName);
			LOGtest.warn("Deleting old index {}", oldIndexName);
			elasticsearchRestTemplate.deleteIndex(oldIndexName);
		}

	}

	private <R> void internalReindex(Class<R> clazz) {
		if (!indexedEntities.contains(clazz)) {
			log.warn("Class {} is not indexed.", clazz.getName());
			return;
		}

		if (!withElasticsearch()) {
			log.warn("Elasticsearch not accessible, not reindexing {}", clazz.getName());
			return;
		}

		databaseReindexRunning.set(true); // We are now doing a database scan.

		log.info("Reindexing {}", clazz.getName());
		final String indexRoot = toIndexName(clazz);

		// Figure out existing index name
		String currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_READ);
		if (currentIndexName == null) {
			currentIndexName = getFirstIndexForAlias(indexRoot + INDEX_WRITE);
		}

		// Make new index with timestamp
		final String indexName = createIndexFor(clazz);
		// The old index name
		final String oldIndexName = currentIndexName;

		// Scan
		scanDatabase(clazz, null);

		// Schedule rename
		taskExecutor.execute(() -> {
			try {
				do {
					var queueSize = elasticReindexQueue.size();
					if (queueSize == 0) {
						break;
					}
					log.trace("ES Reindex queue has {} elements, waiting to realias {}...", queueSize, indexName);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						// Fine...
					}
				} while (true);
	
				log.info("ES Reindex queue is empty, realiasing {}", indexName);
				// Move _READ alias
				realias(indexRoot + INDEX_READ, indexRoot, indexName);
				// Add to EVERYTHING
				deleteAlias(collectionPrefix + ALIAS_EVERYTHING, indexRoot + "*");
				addAlias(collectionPrefix + ALIAS_EVERYTHING, indexName);
	
				// delete old index
				if (oldIndexName != null) {
					log.info("Deleting old index {}", oldIndexName);
					elasticsearchRestTemplate.deleteIndex(oldIndexName);
				}

			} catch (Throwable e) {
				log.warn("Realiasing after full scan threw an exception: {}", e.getMessage(), e);

			} finally {
				databaseReindexRunning.set(false); // Clear flag that we're running a scan.
			}
		});
	}

	private <R> void scanDatabase(Class<R> clazz, EmptyModelFilter<?, ?> filter) {

		PathBuilder<R> builder = new PathBuilderFactory().create(clazz);
		Querydsl querydsl = new Querydsl(em, builder);
		EntityPath<R> entityPath = SimpleEntityPathResolver.INSTANCE.createPath(clazz);
		PathBuilder<R> pathBuilder = new PathBuilder<R>(clazz, entityPath.getMetadata().getName());
		JPQLQuery<Long> query = querydsl.createQuery(entityPath)
			// select id only
			.select(pathBuilder.getNumber("id", Long.class))
			// and order by id
			.orderBy(pathBuilder.getNumber("id", Long.class).asc());

		if (filter != null) {
			// apply filter
			query.where(filter.buildPredicate());
		}

		Integer scanSize = reindexBatchSize.get(clazz);
		final int customBatchSize = scanSize == null ? batchSize : scanSize.intValue();

		int startPosition = 0;
		query.offset(startPosition);
		query.limit(customBatchSize);

		StopWatch stopWatch = new StopWatch();
		stopWatch.start();
		List<Long> results;
		do {
			stopWatch.split();
			log.debug("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
			if (startPosition > 10 * customBatchSize && startPosition / (10 * customBatchSize) == 0) {
				log.info("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
			}
			results = query.fetch();

			// Schedule indexing
			elasticReindexQueue.addAll(results.stream().map((res) -> new ElasticReindex(clazz, res)).collect(Collectors.toList()));

			// Next page
			query.offset(startPosition += results.size());
		} while (results.size() > 0);
		stopWatch.stop();
		log.info("Scanning {} for reindex took {}ms", clazz.getName(), stopWatch.getTime());
	}

	/**
	 * Will not modify the list of IDs
	 */
	@Override
	public <R> void update(final Class<R> clazz, final Collection<Long> ids) {
		if (!indexedEntities.contains(clazz)) {
			log.warn("Class {} is not indexed.", clazz.getName());
			return;
		}

		var checkTriggers = ! databaseReindexRunning.get(); // Check triggers only if we are not in a full database scan

		HashSet<Long> notFoundIds = new HashSet<>(ids);

		final String indexName = toIndexName(clazz) + INDEX_WRITE;

		CriteriaBuilder cb = em.getCriteriaBuilder();
		CriteriaQuery<R> cq = cb.createQuery(clazz);
		Root<R> root = cq.from(clazz);
		cq.where(root.get("id").in(ids));

		TypedQuery<R> query = em.createQuery(cq);
		List<R> results = query.getResultList();

		List<IndexQuery> queries = new ArrayList<IndexQuery>();

		Map<String, String> jsons = new HashMap<>();

		for (R x : results) {
			if (log.isTraceEnabled()) log.trace("Indexing {} {}", clazz.getName(), x);
			if (LOGtest.isInfoEnabled()) LOGtest.info("Indexing {} {}", clazz.getName(), x);

			EmptyModel bm = (EmptyModel) x;
			if (x instanceof ElasticLoader) {
				// Prepare entity for indexing (e.g. lazy-load)
				((ElasticLoader) x).prepareForIndexing();
			}
			try {
				jsons.put(bm.getId().toString(), mapper.mapToString(bm));
			} catch (Throwable e) {
				log.warn("Failed to prepare JSON for {} id={}: {}", bm.getClass(), bm.getId(), e.getMessage(), e);
			}
			if (LOGtest.isTraceEnabled()) LOGtest.trace("JSON: {}", jsons.get(bm.getId().toString()));
			// is found
			notFoundIds.remove(bm.getId());

			IndexQuery iq = new IndexQuery();
			iq.setIndexName(indexName);
			iq.setType(COMMON_TYPE_NAME);
			iq.setId(String.valueOf(bm.getId()));
			iq.setObject(bm);

			queries.add(iq);

			// check for ElasticTrigger if not in a full database scan
			if (checkTriggers) {
				checkElasticTriggers(bm);
			}
		}

		if (!log.isTraceEnabled() && log.isDebugEnabled() && jsons.size() > 0) {
			log.debug("JSON: {}", jsons.values().toArray()[0]);
		}

		if (!queries.isEmpty()) {
			log.debug("Indexing {} count={} of provided objects count={}", clazz.getName(), queries.size(), ids.size());
			if (LOGtest.isInfoEnabled()) LOGtest.info("Indexing {} count={} of provided objects count={}", clazz.getName(), queries.size(), ids.size());

			try {
				elasticsearchRestTemplate.bulkIndex(queries);
			} catch (org.springframework.data.elasticsearch.ElasticsearchException e) {
				log.error(e.getMessage(), e);
				log.error("JSON sample: {}", jsons.values().toArray()[0]);
				Map<String, String> failedDocs = e.getFailedDocuments();
				if (failedDocs != null) {
					for (Map.Entry<String, String> entry : failedDocs.entrySet()) {
						log.error("{} {}\n{}", entry.getKey(), entry.getValue(), jsons.get(entry.getKey()));
					}
				}
			} catch (ElasticsearchException e) {
				log.error("Could not index document", e);
			}
		}

		em.clear();
		queries.clear();

		for (Long id : notFoundIds) {
			if (log.isTraceEnabled()) log.trace("Removing {} id={} from index {}/{}", clazz.getName(), id, indexName, COMMON_TYPE_NAME);
			if (LOGtest.isInfoEnabled()) LOGtest.info("Removing {} id={} from index {}/{}", clazz.getName(), id, indexName, COMMON_TYPE_NAME);

			String res = elasticsearchRestTemplate.delete(indexName, COMMON_TYPE_NAME, String.valueOf(id));
			log.trace("Deleted ES document id={}", res);
			LOGtest.warn("Deleted ES document id={} from {}", res, indexName);
		}
	}


	private void checkElasticTriggers(EmptyModel entity) {
		// Is this an ElasticTrigger?
		if (entity instanceof ElasticTrigger) {
			if (LOGtest.isInfoEnabled()) LOGtest.info("Check related entities of {}", entity);
			try {
				Object[] reindexedEntities = ((ElasticTrigger) entity).reindexedEntities();
				if (reindexedEntities != null) {
					for (var m : reindexedEntities) {
						if (m instanceof EmptyModel) {
							var toReindex = new ElasticReindex((EmptyModel) m);
							var newlyAdded = elasticReindexQueue.add(toReindex);
							LOGtest.info("Related entity {} was (newly={}) queued.", toReindex, newlyAdded);
						}
					}
				}
			} catch (Throwable e) {
				log.error("Error inspecting reindexed entities of {} id={}: {}", entity.getClass(), entity.getId(), e.getMessage()); // , e);
			}
		}
	}

	/**
	 * Schedule a parallel update.
	 *
	 * @param clazz Entity to reindex
	 * @param entityIds Entity identifiers to reindex
	 */
	@Override
	public <R> void asyncUpdate(final Class<R> clazz, final Collection<Long> entityIds) {
		if (!indexedEntities.contains(clazz)) {
			log.warn("Class {} is not indexed.", clazz.getName());
			return;
		}

		final HashSet<Long> copy = new HashSet<>(entityIds);

		if (copy.size() == 0) {
			return;
		}

		taskExecutor.execute(() -> {
			log.debug("Running scheduled reindex of {} size={}", clazz.getName(), copy.size());
			if (LOGtest.isInfoEnabled()) LOGtest.info("Running scheduled reindex of {} size={}", clazz, copy.size());
			try {
				_self.update(clazz, copy);
			} catch (Throwable e) {
				log.error(e.getMessage(), e);
			}
		});
	}

	private <T extends EmptyModel> List<T> findEntities(Class<T> clazz, List<Long> collect) {
		CriteriaBuilder cb = em.getCriteriaBuilder();
		CriteriaQuery<T> cq = cb.createQuery(clazz);
		Root<T> root = cq.from(clazz);
		cq.where(root.get("id").in(collect));
		return em.createQuery(cq).getResultList();
	}

	/**
	 * Runs TermFacet, but will automatically increase size if #otherCount is more
	 * than 10%
	 */
	@Override
	public TermResult termStatisticsAuto(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String term) throws SearchException {
		return termStatisticsAuto(clazz, filters, size, new String[] { term }).get(term);
	}

	@Override
	public Map<String, TermResult> termStatisticsAuto(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
		Map<String, TermResult> termResult = termStatistics(clazz, filters, size, terms);

		// TBD

		return termResult;
	}

	@Override
	public TermResult termStatistics(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String term) throws SearchException {
		return termStatistics(clazz, filters, size, new String[] { term }).get(term);
	}

	@Override
	public Map<String, TermResult> termStatistics(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
		if (!indexedEntities.contains(clazz)) {
			throw new RuntimeException("Class is not indexed " + clazz);
		}
		Long total = count(clazz, filters);

		String indexName = toIndexName(clazz) + INDEX_READ;
		SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME);
		SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(toEsQuery(clazz, filters));

		for (String term : terms) {
			TermsAggregationBuilder aggregation = AggregationBuilders.terms(term).field(term).size(size).order(BucketOrder.count(false));
			searchSourceBuilder.aggregation(aggregation);
		}
		searchRequest.source(searchSourceBuilder);

		log.debug("ES query {}", searchRequest);
		SearchResponse response = search(searchRequest, RequestOptions.DEFAULT);

		Map<String, Aggregation> results = response.getAggregations().asMap();
		for (Map.Entry<String, Aggregation> e : results.entrySet()) {
			log.debug("FYI, ES results {} for {} of {}", e.getKey(), terms, e.getValue().getClass());
		}

		Map<String, TermResult> termResults = new HashMap<>();

		for (String term : terms) {
			Aggregation agg = results.get(term);

			if (agg instanceof ParsedLongTerms || agg instanceof ParsedStringTerms || agg instanceof ParsedDoubleTerms) {
				ParsedTerms topCounts = (ParsedTerms) agg;

				List<? extends Terms.Bucket> buckets = topCounts.getBuckets();

				TermResult tr = new TermResult(term, total, buckets.stream().map(bucket -> new Term(bucket.getKeyAsString(), bucket.getDocCount())).collect(Collectors.toList()),
					topCounts.getSumOfOtherDocCounts());

				termResults.put(term, tr);
			} else if (agg instanceof UnmappedTerms) {
				UnmappedTerms unmapped = (UnmappedTerms) agg;
				throw new RuntimeException("Unmapped term " + term + ": " + unmapped.getBuckets());
			} else {
				// Doesn't happen
				throw new RuntimeException("IDK for term " + term + ": " + agg.getClass());
			}
		}

		return termResults;
	}

	private QueryBuilder toEsQuery(Class<?> clazz, EmptyModelFilter<?, ?> filters) {
		return toEsQuery(clazz, filters, null, null);
	}

	private QueryBuilder toEsQuery(Class<?> clazz, EmptyModelFilter<?, ?> filters, Predicate predicate, String[] boostFields) {

		ElasticQueryBuilder esQb = new ElasticQueryBuilder();
		if (filters != null) {
			BooleanBuilder builder = new BooleanBuilder();
			if (predicate != null) {
				builder.and(predicate);
			}
			builder.and(filters.buildPredicate()).accept(esQb, null);
		}
		QueryBuilder esQuery = esQb.getQuery();

		if (filters instanceof IFullTextFilter) {
			String text = ((IFullTextFilter) filters).get_text();

			if (StringUtils.isNotBlank(text)) {
				BoolQueryBuilder fulltext = boolQuery()
				/*@formatter:off*/
					.should(multiMatchQuery(text, "_texts")
						.fuzziness(Fuzziness.AUTO)
						// .minimumShouldMatch("75%")
						.boost(2.0f)
					)
					.should(queryStringQuery(text)
						.lenient(true)
						.tieBreaker(1.0f) // was .useDisMaX()
						.fuzziness(Fuzziness.AUTO)
						.boost(0.9f)
					);
				/*@formatter:on*/

				{
					SimpleQueryStringBuilder sqsQuery = simpleQueryStringQuery(text)
					/*@formatter:off*/
						.lenient(true)
						.defaultOperator(Operator.AND)
						.field("*").boost(1)
						// .minimumShouldMatch("75%")
					/*@formatter:on*/
					;

					if (boostFields != null && boostFields.length > 0) {
						sqsQuery.field("_texts", 1);
						for (String field : boostFields) {
							sqsQuery.field(field, 3);
						}
					}

					String[] boostedSearch = fieldBoosts.get(clazz);
					if (boostedSearch != null && boostedSearch.length > 0) {
						for (String field : boostedSearch) {
							sqsQuery.field(field, 10);
						}
					}

					fulltext.should(sqsQuery);
				}

				fulltext.minimumShouldMatch("70%");

				if (esQuery instanceof BoolQueryBuilder) {
					esQuery = ((BoolQueryBuilder) esQuery).must(fulltext);
				} else {
					BoolQueryBuilder builder = new BoolQueryBuilder();
					esQuery = builder.filter(esQuery).must(fulltext);
				}
			}
		}

		if (log.isDebugEnabled()) {
			try {
				log.debug("Converted {} to\ncurl -XGET 'localhost:9200/{}{}/_search?pretty' -H 'Content-Type: application/json' -d '{ \"query\": {} }'", new ObjectMapper()
					.writeValueAsString(filters), toIndexName(clazz), INDEX_READ, esQuery);
			} catch (JsonProcessingException e) {
			}
		}
		return esQuery;
	}

	/**
	 * Make the alias point exclusively to the specified index
	 *
	 * @param aliasName The alias name
	 * @param indexPrefix used to remove existing aliases
	 * @param targetIndexName The index the alias points to
	 */
	@Override
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void realias(final String aliasName, final String indexPrefix, final String targetIndexName) {
		if (!aliasName.startsWith(collectionPrefix)) {
			throw new RuntimeException("Unable to move alias with a such prefix.");
		}
		if (!targetIndexName.startsWith(collectionPrefix)) {
			throw new RuntimeException("Unable to move alias to index with a such prefix.");
		}
		if (indexPrefix != null) {
			deleteAlias(aliasName, indexPrefix + "*");
		} else {
			deleteAlias(aliasName);
		}
		addAlias(aliasName, targetIndexName);
	}

	@Override
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void addAlias(String aliasName, String indexName) {
		if (!aliasName.startsWith(collectionPrefix) || !indexName.startsWith(collectionPrefix)) {
			throw new RuntimeException("Unable to add alias with a such prefix of alias or index.");
		}
		final AliasQuery query = new AliasQuery();
		query.setAliasName(aliasName);
		query.setIndexName(indexName);
		log.info("Adding alias {} to index {}", aliasName, indexName);
		LOGtest.info("Adding alias {} to index {}", aliasName, indexName);
		this.elasticsearchRestTemplate.addAlias(query);
		List<AliasMetaData> aliases = elasticsearchRestTemplate.queryForAlias(indexName);
		for (AliasMetaData alias : aliases) {
			log.debug(" Index {} has alias {} write={} {}", indexName, alias.getAlias(), alias.writeIndex(), alias.indexRouting());
			LOGtest.info(" Index {} has alias {} write={} {}", indexName, alias.getAlias(), alias.writeIndex(), alias.indexRouting());
		}
	}

	/**
	 * Delete alias.
	 *
	 * @param aliasName the alias name
	 */
	@Override
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void deleteAlias(final String aliasName) {
		if (!aliasName.startsWith(collectionPrefix)) {
			throw new RuntimeException("Unable to delete alias with a such prefix.");
		}
		try {
			GetAliasesResponse response = client.indices().getAlias(new GetAliasesRequest(aliasName), RequestOptions.DEFAULT);

			if (response != null && !response.getAliases().isEmpty()) {
				for (final String indexName : response.getAliases().keySet()) {
					deleteAlias(aliasName, indexName);
				}
			}
		} catch (IOException e) {
			log.warn("Error while reading alias={}:", aliasName, e);
			LOGtest.warn("Error while reading alias={}:", aliasName, e);
		}
	}

	@Override
	@PreAuthorize("hasRole('ADMINISTRATOR')")
	public void deleteIndex(final String indexName) {
		if (!indexName.startsWith(collectionPrefix)) {
			throw new RuntimeException("Unable to delete index with a such prefix.");
		}
		elasticsearchRestTemplate.deleteIndex(indexName);
		assert(! elasticsearchRestTemplate.indexExists(indexName));
	}

	@Override
	public void reindexAll() {
		for (Class<? extends EmptyModel> clazz : indexedEntities) {
			reindex(clazz);
		}
	}

	@Override
	public <R> void reindex(Class<R> clazz) {
		boolean isLockAcquired = elasticsearchReindexLock.tryLock();
		if (isLockAcquired) {
			try {
				internalReindex(clazz);
			} finally {
				elasticsearchReindexLock.unlock();
			}
		} else {
			throw new RuntimeException("Could not run reindex for " + clazz.getSimpleName() + ". Operation locked.");
		}
	}

	@Override
	public <T extends EmptyModel> void reindex(Class<T> clazz, EmptyModelFilter<?, ?> filter) {
		boolean isLockAcquired = elasticsearchReindexLock.tryLock();
		if (isLockAcquired) {
			try {
				scanDatabase(clazz, filter);
			} finally {
				elasticsearchReindexLock.unlock();
			}
		} else {
			throw new RuntimeException("Could not run reindex for " + clazz.getSimpleName() + ". Operation locked.");
		}
	}

	@Override
	public List<Class<?>> getIndexedEntities() {
		ArrayList<Class<?>> entities = new ArrayList<>(this.indexedEntities);
		entities.sort((a, b) -> a.getName().compareTo(b.getName()));
		return ListUtils.unmodifiableList(entities);
	}

	@Override
	public long count(Class<?> clazz, EmptyModelFilter<?, ?> filter) throws SearchException {
		String indexName = toIndexName(clazz) + INDEX_READ;
		SearchRequest searchRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(new SearchSourceBuilder().query(toEsQuery(clazz, filter)).size(0));
		SearchResponse hits = search(searchRequest, RequestOptions.DEFAULT);
		for (var shardFail : hits.getShardFailures()) {
			log.warn("Count of {} for {} failed shard {}.{} with: {}", clazz, filter, shardFail.shard().getIndex(), shardFail.shardId(), shardFail.reason());
		}
		return hits.getHits().getTotalHits();
	}

	/**
	 * This is mostly used in unit testing to assure index has settled
	 */
	@Override
	public long waitForCount(Class<? extends EmptyModel> clazz, EmptyModelFilter<?, ?> filter, int mustHaveCount) throws SearchException {
		long count = 0;
		int repeat = 0;
		var maxRepeats = 20;

		if (LOGtest.isInfoEnabled()) LOGtest.info("Need ES count for {}={}", clazz, mustHaveCount);

		do {
			try {
				var readIndexName = getFirstIndexForAlias(toIndexName(clazz) + INDEX_READ);
				var writeIndexName = getFirstIndexForAlias(toIndexName(clazz) + INDEX_WRITE);

				if (! Objects.equals(readIndexName, writeIndexName)) {
					LOGtest.warn("Read/write index mismatch {}!={}.", readIndexName, writeIndexName);
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
					}
					throw new ElasticsearchStatusException("Read/write index mismatch,", null); // This is caught later and will reset counts.
				}

				count = count(clazz, filter);
				if (count != mustHaveCount) {
//					LOG.warn("ES count #{} of {} is {}!={}", repeat, clazz.getName(), count, mustHaveCount);
					if (repeat++ >= maxRepeats) {
						throw new RuntimeException("ES count did not settle in " + repeat + " retries. " + clazz.getName() + " " + count + "!=" + mustHaveCount);
					}
					clearEsCache(); // Clear caches
					if (repeat % 3 == 0) {
						syncFlushEs(); // Force sync-flush, maybe that helps
					}
					try {
						var sleepTime = ((maxRepeats - repeat + 1) * 50) + RandomUtils.nextLong(10, 100);
						if (LOGtest.isInfoEnabled()) LOGtest.info("ES wrong count #{} from {}/{} in {} is {}!={}, sleeping for {}ms...", repeat, clazz, readIndexName, writeIndexName, count, mustHaveCount, sleepTime);

						printHealth(); // Log cluster info before retrying
						Thread.sleep(sleepTime);
					} catch (InterruptedException e) {
						log.error("Thread interrupted, returning {}", count);
						return count;
					}
				} else {
					// OK!
					if (LOGtest.isInfoEnabled()) LOGtest.info("ES good count #{} for {} from {}/{} is {}=={}!", repeat, clazz.getName(), readIndexName, writeIndexName, count, mustHaveCount);
					if (repeat > 0) {
						printHealth(); // Log cluster info after successful retry!
					}
				}
			} catch (ElasticsearchStatusException e) {
				count = -1;
				repeat = 0;
				log.warn("Error while waiting for count: {}. Retrying...", e.getMessage());
				LOGtest.warn("Error while waiting for count: {}. Retrying...", e.getMessage());
			}
		} while (count != mustHaveCount);
		return count;
	}

	/**
	 * Execute refresh on all indices and clear ES caches.
	 */
	private void clearEsCache() {
		try {
			Response r;

			log.trace("Refreshing ES");
			r = client.getLowLevelClient().performRequest(new Request("POST", "/_refresh"));
			try (var body = r.getEntity().getContent()) {
				log.debug("Refresh ES: {}", new String(body.readAllBytes(), StandardCharsets.UTF_8));
			}

			log.trace("Clearing ES caches");
			r = client.getLowLevelClient().performRequest(new Request("POST", "/_cache/clear"));
			try (var body = r.getEntity().getContent()) {
				log.debug("Clear ES cache: {}", new String(body.readAllBytes(), StandardCharsets.UTF_8));
			}

			var status = client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT).getStatus();
			if (status != ClusterHealthStatus.GREEN) {
				LOGtest.warn("ES status {}", status);
				printHealth();
			}
		} catch (IOException e) {
			log.error("Could not execute ES operation: {}", e.getMessage(), e);
		}
	}

	private void syncFlushEs() {
		try {
			Response r;
			log.trace("Flushing ES");
			r = client.getLowLevelClient().performRequest(new Request("POST", "/_flush/synced"));
			try (var body = r.getEntity().getContent()) {
				log.info("Synced ES flush:\n{}", new String(body.readAllBytes(), StandardCharsets.UTF_8));
			}
		} catch (IOException e) {
			log.error("Could not execute synced ES flush: {}", e.getMessage());
		}
	}

	/**
	 * This is called from {@link #waitForCount(Class, EmptyModelFilter, int) to try and debug
	 * ES cluster status.
	 */
	private void printHealth() {
		try {
			Response r;

//			r = client.getLowLevelClient().performRequest(new Request("GET", "/_cat/nodes?v"));
//			try (var body = r.getEntity().getContent()) {
//				LOG.error("ES nodes:\n{}", new String(body.readAllBytes()));
//			}

//			r = client.getLowLevelClient().performRequest(new Request("GET", "/_cat/health?v"));
//			try (var body = r.getEntity().getContent()) {
//				LOG.error("ES health:\n{}", new String(body.readAllBytes()));
//			}

//			r = client.getLowLevelClient().performRequest(new Request("GET", "/_cat/thread_pool?v"));
//			try (var body = r.getEntity().getContent()) {
//				LOG.error("ES thread pool:\n{}", new String(body.readAllBytes()));
//			}

//			r = client.getLowLevelClient().performRequest(new Request("GET", "/_cat/pending_tasks?v"));
//			try (var body = r.getEntity().getContent()) {
//				LOG.error("ES pending tasks:\n{}", new String(body.readAllBytes()));
//			}

//			r = client.getLowLevelClient().performRequest(new Request("GET", "/_cat/indices?v"));
//			try (var body = r.getEntity().getContent()) {
//				LOG.error("ES indices:\n{}", new String(body.readAllBytes()));
//			}

//			r = client.getLowLevelClient().performRequest(new Request("GET", "/_cat/aliases?v"));
//			try (var body = r.getEntity().getContent()) {
//				LOG.error("ES aliases:\n{}", new String(body.readAllBytes()));
//			}

			r = client.getLowLevelClient().performRequest(new Request("GET", "/_cat/segments?v"));
			try (var body = r.getEntity().getContent()) {
				log.debug("ES segments:\n{}", new String(body.readAllBytes(), StandardCharsets.UTF_8));
				LOGtest.info("ES segments:\n{}", new String(body.readAllBytes(), StandardCharsets.UTF_8));
			}

		} catch (IOException e) {
			log.error("Could not execute cluster API call: {}", e.getMessage(), e);
		}
	}

	@Override
	public Map<String, Long> countMissingValues(Class<? extends EmptyModel> indexClass, EmptyModelFilter<?, ?> filter) throws SearchException {
		final Map<String, Long> results = new HashMap<>();
		final Set<String> fields = jsonSchemas.get(indexClass);
		if (CollectionUtils.isEmpty(fields)) {
			return results;
		}

		final String indexName = toIndexName(indexClass) + INDEX_READ;
		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(toEsQuery(indexClass, filter)).size(0);
		for (String fieldName : fields) {
			sourceBuilder.aggregation(AggregationBuilders.missing(fieldName).field(fieldName));
		}
		SearchRequest esQuery = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);

		final SearchResponse response = search(esQuery, RequestOptions.DEFAULT);
		log.debug("Counting missing values took {}s", response.getTook().getSeconds());
		for (Aggregation agg : response.getAggregations()) {
			long missingCount = ((InternalMissing) agg).getDocCount();
			if (missingCount > 0) {
				results.put(agg.getName(), missingCount);
			}
		}
		results.put("_totalCount", response.getHits().getTotalHits());
		return results;
	}

	@Override
	public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Pageable page) throws SearchException {
		return findAll(clazz, toEsQuery(clazz, filter), page, null);
	}

	@Override
	public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Predicate predicate, Pageable page) throws SearchException {
		return findAll(clazz, filter, predicate, page, null);
	}

	@Override
	public <T extends EmptyModel> Page<T> findAll(Class<T> clazz, EmptyModelFilter<?, ?> filter, Predicate predicate, Pageable page,
			ElasticsearchService.IEntityLoader<T> entityLoader, String... boostFields) throws SearchException {
		return findAll(clazz, toEsQuery(clazz, filter, predicate, boostFields), page, entityLoader);
	}

	@Override
	public <T extends EmptyModel> void remove(Class<T> clazz, EmptyModelFilter<?, ?> filter) throws SearchException {
		String indexName = toIndexName(clazz) + INDEX_READ; // Remove from READ index
		long esCount = count(clazz, filter);

		log.warn("ES removing {} documents of {}", esCount, clazz);
		DeleteQuery dq = new DeleteQuery();
		dq.setIndex(indexName);
		dq.setType(COMMON_TYPE_NAME);
		dq.setQuery(toEsQuery(clazz, filter));
		elasticsearchRestTemplate.delete(dq);
	}

	private <T extends EmptyModel> Page<T> findAll(Class<T> clazz, QueryBuilder query, Pageable page, ElasticsearchService.IEntityLoader<T> entityLoader) throws SearchException {
		String indexName = toIndexName(clazz) + INDEX_READ;

		if (page.getOffset() > 100000) {
			log.warn("Large offset={} for ES query", page.getOffset());
		}

		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(query).size(1);
		sourceBuilder.sort("_score", SortOrder.DESC);

		SearchRequest esRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
		SearchResponse response = search(esRequest, RequestOptions.DEFAULT);

		float maxScore = response.getHits().getMaxScore();
		float minScore = maxScore * 0.5f;

		sourceBuilder = new SearchSourceBuilder().query(query).from((int) page.getOffset()).size(page.getPageSize());
		sourceBuilder.sort("_score", SortOrder.DESC);
		sourceBuilder.minScore(minScore);

		// Effective search
		esRequest = new SearchRequest(indexName).types(COMMON_TYPE_NAME).source(sourceBuilder);
		response = search(esRequest, RequestOptions.DEFAULT);

		log.debug("Converted to\ncurl -XGET 'localhost:9200/{}{}/_search?pretty' -H 'Content-Type: application/json' -d '{ \"query\": {} }'", toIndexName(clazz), INDEX_READ,
			sourceBuilder);
		List<T> content;

		if (entityLoader != null) {
			// Use custom entity loader
			content = entityLoader.loadEntities(StreamSupport.stream(response.getHits().spliterator(), false).map(searchHit -> {
				if (log.isTraceEnabled()) {
					log.trace("Hit score={} id={} _class={} _source={}", searchHit.getScore(), searchHit.getId(), clazz, searchHit.getSourceAsString());
				}
				return Long.parseLong(searchHit.getId());
			}).collect(Collectors.toList()));

		} else {
			// Use simple entityManager loader
			content = findEntities(clazz, StreamSupport.stream(response.getHits().spliterator(), false).map(searchHit -> {
				if (log.isTraceEnabled()) {
					log.trace("Hit score={} id={} _class={} _source={}", searchHit.getScore(), searchHit.getId(), clazz, searchHit.getSourceAsString());
				}
				return Long.parseLong(searchHit.getId());
			}).collect(Collectors.toList()));
		}

		return new PageImpl<>(content, PageRequest.of(page.getPageNumber(), page.getPageSize()), response.getHits().getTotalHits());
	}

	private SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws SearchException {
		try {
			return client.search(searchRequest, options);
		} catch (IOException e) {
			log.error("Error occurred during search", e);
			throw new SearchException(e.getMessage(), e.getCause());
		}
	}

	@Override
	public GetIndexResponse listIndices() throws IOException {
		String indexNames = collectionPrefix + "*";
		var request = new GetIndexRequest(indexNames);
		GetIndexResponse response = null;
		if (client.indices().exists(request, RequestOptions.DEFAULT)) {
			response = client.indices().get(new GetIndexRequest(indexNames), RequestOptions.DEFAULT);
		}
		return response;
	}
}