// ElasticReindexProcessor.java

/*
 * Copyright 2020 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gringlobal.component.elastic;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.gringlobal.service.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * ES Processor component uses Spring's @Scheduled annotation to scan queues
 * with a 1100ms delay measured from the completion time of each preceding
 * invocation.
 */
@Component("elasticReindexProcessor")
@Slf4j
public class ElasticReindexProcessor {
	/** Dedicated test/observability logger, separate from the Lombok-generated class logger. */
	private static final Logger LOGtest = LoggerFactory.getLogger("org.gringlobal.test.Elasticsearch");

	/** Flush a per-class bucket to Elasticsearch once it holds this many IDs. */
	private static final int BATCH_SIZE = 100;

	/** Maximum number of queue entries drained per iteration of {@link #processQueues()}. */
	private static final int DRAIN_SIZE = 200;

	/** Queue of pending reindex requests, populated elsewhere in the application. */
	@Resource
	private BlockingQueue<ElasticReindex> elasticReindexQueue;

	@Autowired
	private ElasticsearchService elasticsearch;

	public ElasticReindexProcessor() {
		// Log through SLF4J instead of System.err so startup output honors the logging configuration.
		log.info("Made ElasticReindexProcessor");
	}

	/**
	 * Process queues: drains the reindex queue, groups entity IDs into
	 * per-class buckets and submits them to Elasticsearch in batches of at
	 * most {@link #BATCH_SIZE}, flushing any remainder at the end.
	 */
	@Scheduled(fixedDelay = 1100)
	public void processQueues() {
		if (log.isTraceEnabled()) {
			log.trace("Scanning ES update queue");
		}

		List<ElasticReindex> forReindexing = new ArrayList<>(DRAIN_SIZE);
		Map<Class<?>, Set<Long>> buckets = new HashMap<>();

		while (elasticReindexQueue.drainTo(forReindexing, DRAIN_SIZE) > 0) {
			if (log.isDebugEnabled()) log.debug("Remaining for reindex={} handling={}", elasticReindexQueue.size(), forReindexing.size());
			if (LOGtest.isInfoEnabled()) LOGtest.info("Remaining for reindex={} handling={}", elasticReindexQueue.size(), forReindexing.size());
			forReindexing.forEach(er -> bucketize(buckets, er));
			forReindexing.clear();
		}

		// Flush buckets that never reached BATCH_SIZE during draining.
		if (!buckets.isEmpty()) {
			for (Map.Entry<Class<?>, Set<Long>> entry : buckets.entrySet()) {
				Set<Long> bucket = entry.getValue();
				if (bucket.isEmpty()) {
					// Bucket was already flushed (and cleared) inside bucketize(); don't submit an empty update.
					continue;
				}
				if (LOGtest.isInfoEnabled()) LOGtest.info("ElasticReindexProcessor remainder {}: {}", entry.getKey(), bucket);
				// Hand a defensive copy to the async updater: clearing the live set right after an
				// asynchronous call could otherwise race with the consumer and drop IDs.
				elasticsearch.asyncUpdate(entry.getKey(), new HashSet<>(bucket));
				bucket.clear();
			}
			buckets.clear();
		}
	}

	/**
	 * Adds the entry's ID to its per-class bucket, flushing the bucket to
	 * Elasticsearch whenever it reaches {@link #BATCH_SIZE}.
	 *
	 * @param buckets per-class ID buckets, owned by the scheduler thread
	 * @param toReindex the queued reindex request; {@code null} entries are ignored
	 */
	private void bucketize(final Map<Class<?>, Set<Long>> buckets, final ElasticReindex toReindex) {
		if (toReindex == null)
			return;

		Class<?> clazz = toReindex.getClazz();
		// Plain HashSet: buckets are only touched by the scheduler thread, since
		// async consumers now receive defensive copies (NOTE(review): confirm no
		// other thread shares these sets before relying on this).
		Set<Long> bucket = buckets.computeIfAbsent(clazz, k -> new HashSet<>());
		bucket.add(toReindex.getId());

		if (bucket.size() >= BATCH_SIZE) {
			if (LOGtest.isInfoEnabled()) LOGtest.info("ElasticReindexProcessor bucket {}: {}", clazz, bucket);
			// Copy before clearing for the same async-race reason as in processQueues().
			elasticsearch.asyncUpdate(clazz, new HashSet<>(bucket));
			bucket.clear();
		}
	}

}