// ElasticReindexProcessor.java
/*
* Copyright 2020 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.component.elastic;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.gringlobal.service.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * ES Processor component uses Spring's @Scheduled annotation to scan queues
 * with a 1100 ms delay measured from the completion time of each preceding
 * invocation (see the fixedDelay value on {@code processQueues()}).
 */
@Component("elasticReindexProcessor")
@Slf4j
public class ElasticReindexProcessor {
    /** Dedicated diagnostics logger with its own category, separate from the Lombok class logger. */
    private static final Logger LOGtest = LoggerFactory.getLogger("org.gringlobal.test.Elasticsearch");

    /** Flush a per-class bucket to Elasticsearch once it accumulates this many IDs. */
    private static final int BATCH_SIZE = 100;

    /** Maximum number of queue entries drained per iteration of the scan loop. */
    private static final int DRAIN_SIZE = 200;

    /** Queue of pending reindex requests; producers elsewhere in the application fill it. */
    @Resource
    private BlockingQueue<ElasticReindex> elasticReindexQueue;

    @Autowired
    private ElasticsearchService elasticsearch;

    public ElasticReindexProcessor() {
        // Use the class logger rather than writing directly to stderr.
        log.info("Made ElasticReindexProcessor");
    }

    /**
     * Drains the reindex queue, groups entity IDs by entity class, and submits
     * them to Elasticsearch in batches of at most {@link #BATCH_SIZE}. Any
     * remainder smaller than a full batch is flushed at the end of the scan.
     */
    @Scheduled(fixedDelay = 1100)
    public void processQueues() {
        if (log.isTraceEnabled()) {
            log.trace("Scanning ES update queue");
        }
        List<ElasticReindex> forReindexing = new ArrayList<>(DRAIN_SIZE);
        Map<Class<?>, Set<Long>> buckets = new HashMap<>();
        while (elasticReindexQueue.drainTo(forReindexing, DRAIN_SIZE) > 0) {
            if (log.isDebugEnabled()) {
                log.debug("Remaining for reindex={} handling={}", elasticReindexQueue.size(), forReindexing.size());
            }
            if (LOGtest.isInfoEnabled()) {
                LOGtest.info("Remaining for reindex={} handling={}", elasticReindexQueue.size(), forReindexing.size());
            }
            forReindexing.forEach(er -> bucketize(buckets, er));
            forReindexing.clear();
        }
        // Flush partial buckets left over after the queue has been drained.
        if (!buckets.isEmpty()) {
            for (Map.Entry<Class<?>, Set<Long>> entry : buckets.entrySet()) {
                Set<Long> bucket = entry.getValue();
                if (bucket.isEmpty()) {
                    continue;
                }
                if (LOGtest.isInfoEnabled()) {
                    LOGtest.info("ElasticReindexProcessor remainder {}: {}", entry.getKey(), bucket);
                }
                // Hand a defensive copy to the async updater: clearing the live
                // set right after submitting would race with the background task.
                elasticsearch.asyncUpdate(entry.getKey(), new HashSet<>(bucket));
                bucket.clear();
            }
            buckets.clear();
        }
    }

    /**
     * Adds one reindex request to its per-class bucket, flushing the bucket to
     * Elasticsearch when it reaches {@link #BATCH_SIZE}.
     *
     * @param buckets   per-class accumulation map, confined to the scheduler thread
     * @param toReindex the queued request; {@code null} entries are ignored
     */
    private void bucketize(final Map<Class<?>, Set<Long>> buckets, final ElasticReindex toReindex) {
        if (toReindex == null) {
            return;
        }
        Class<?> clazz = toReindex.getClazz();
        // A plain HashSet suffices: buckets never leave the scheduler thread, and
        // the async updater only ever receives a defensive copy (see below).
        Set<Long> bucket = buckets.computeIfAbsent(clazz, k -> new HashSet<>());
        bucket.add(toReindex.getId());
        if (bucket.size() >= BATCH_SIZE) {
            if (LOGtest.isInfoEnabled()) {
                LOGtest.info("ElasticReindexProcessor bucket {}: {}", clazz, bucket);
            }
            // Copy before clearing so the async task keeps a stable snapshot.
            elasticsearch.asyncUpdate(clazz, new HashSet<>(bucket));
            bucket.clear();
        }
    }
}