EmbeddedNodeFactoryBean.java

/*
 * Copyright 2020 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gringlobal.component.elastic;

import static java.util.Arrays.*;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.transport.TransportService;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.client.NodeClientFactoryBean;

/**
 * Spring {@link FactoryBean} that creates and starts an embedded Elasticsearch
 * {@link Node} in {@link #afterPropertiesSet()} and closes it in {@link #destroy()}.
 * <p>
 * Based on {@link NodeClientFactoryBean}.
 */
@Slf4j
public class EmbeddedNodeFactoryBean implements FactoryBean<EmbeddedNodeFactoryBean.EmbeddedNode>, InitializingBean, DisposableBean {

	/** Value of the {@code cluster.name} setting. */
	private String clusterName;
	/** The node started by {@link #afterPropertiesSet()}; {@code null} before start and after {@link #destroy()}. */
	private EmbeddedNode node;
	/** Value of the {@code path.data} setting. */
	private String pathData;
	/** Value of the {@code path.home} setting. */
	private String pathHome;
	/** Optional classpath resource with base settings; loaded before the hard-coded settings are applied. */
	private String pathConfiguration;
	/** Value of the {@code http.port} setting. */
	private String portRange = "9200-9300"; // default

	/**
	 * A {@link Node} that can be constructed from plain {@link Settings} and a
	 * list of plugins available on the classpath.
	 */
	public static class EmbeddedNode extends Node {

		/**
		 * @param preparedSettings node settings, turned into an environment by {@link InternalSettingsPreparer}
		 * @param classpathPlugins plugin classes to load from the classpath
		 */
		public EmbeddedNode(Settings preparedSettings, Collection<Class<? extends Plugin>> classpathPlugins) {

			super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), classpathPlugins, false);
		}

		/**
		 * Registers the node name with the Elasticsearch logging configuration.
		 * This is best-effort: a failure here must not prevent the node from being created.
		 *
		 * @param nodeName the node name to register
		 */
		protected void registerDerivedNodeNameWithLogger(String nodeName) {
			try {
				LogConfigurator.setNodeName(nodeName);
			} catch (Exception ignored) {
				// Deliberately not propagated; leave a trace for troubleshooting only.
				log.debug("Could not register node name [{}] with the logger: {}", nodeName, ignored.getMessage());
			}
		}
		
		/**
		 * Starts the node and returns it with the narrower {@link EmbeddedNode} type.
		 *
		 * @return this node
		 * @throws NodeValidationException if thrown by {@link Node#start()}
		 */
		@Override
		public EmbeddedNode start() throws NodeValidationException {
			super.start();
			return this;
		}

		/**
		 * Returns the port of the first bound HTTP address, logging the transport
		 * and HTTP addresses along the way.
		 *
		 * @return the first bound HTTP port, or {@code 9200} when no HTTP address is bound
		 */
		public int getPort() {
			var transportService = injector().getInstance(TransportService.class);
			for (var localAddr : transportService.getLocalAddresses()) {
				log.debug("Running ES Embedded node on: {}", localAddr);
			}
			var esHttpTransport = injector().getInstance(HttpServerTransport.class);
			// Only the first bound address is reported; the loop exits on its first iteration.
			for (var boundAddress : esHttpTransport.boundAddress().boundAddresses()) {
				log.warn("ES Embedded node listening on: {}", boundAddress.address());
				return boundAddress.getPort();
			}
			return 9200; // stick to defaults
		}
	}

	public EmbeddedNodeFactoryBean() {
	}

	/**
	 * @return the embedded node, or {@code null} if it has not been started or was already destroyed
	 */
	@Override
	public EmbeddedNode getObject() {
		return node;
	}

	@Override
	public Class<? extends Node> getObjectType() {
		return EmbeddedNode.class;
	}

	@Override
	public boolean isSingleton() {
		return true;
	}

	/**
	 * Builds the node settings and starts the embedded node. Settings loaded from
	 * {@code pathConfiguration} are applied first, so the values set explicitly
	 * below take precedence over them.
	 *
	 * @throws Exception if the configuration cannot be read or the node fails to start
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		log.info("Starting ES Embedded node on port: {}", this.portRange);

		/* @formatter:off */
		Settings settings = Settings.builder()
				.put(loadConfig())
				.put("discovery.type", "single-node")
				.put("transport.type", "netty4")
				.put("http.port", this.portRange) // change port
				.put("http.type", "netty4")
				.put("path.home", this.pathHome)
				.put("path.data", this.pathData)
				.put("cluster.name", this.clusterName)
				.put("cluster.routing.allocation.disk.threshold_enabled", false)
				.put("cluster.remote.connect", false)
				.put("cluster.persistent_tasks.allocation.enable", "none")
				.put("node.max_local_storage_nodes", 1)
			.build();
		/* @formatter:on */

		final EmbeddedNode created = new EmbeddedNode(settings, asList(Netty4Plugin.class, ReindexPlugin.class));
		try {
			node = created.start();
		} catch (final Exception e) {
			// The node was constructed but did not start: release what it holds before failing.
			try {
				created.close();
			} catch (final Exception closeError) {
				e.addSuppressed(closeError);
			}
			throw e;
		}
	}

	/**
	 * Loads settings from the classpath resource named by {@code pathConfiguration}.
	 *
	 * @return the loaded settings, or empty settings when no resource is configured or it cannot be found
	 * @throws IOException if reading the resource fails
	 */
	private Settings loadConfig() throws IOException {
		if (! StringUtils.isBlank(pathConfiguration)) {
			// Open the resource once and make sure it is closed; try-with-resources skips close() for null.
			try (InputStream stream = getClass().getClassLoader().getResourceAsStream(pathConfiguration)) {
				if (stream != null) {
					return Settings.builder().loadFromStream(pathConfiguration, stream, false).build();
				}
			}
			log.error("Unable to read node configuration from file [{}]", pathConfiguration);
		}
		return Settings.builder().build();
	}

	/**
	 * @param clusterName value for the {@code cluster.name} setting
	 */
	public void setClusterName(String clusterName) {
		this.clusterName = clusterName;
	}

	/**
	 * @param pathData value for the {@code path.data} setting
	 */
	public void setPathData(String pathData) {
		this.pathData = pathData;
	}

	/**
	 * @param pathHome value for the {@code path.home} setting
	 */
	public void setPathHome(String pathHome) {
		this.pathHome = pathHome;
	}

	/**
	 * @param configuration classpath resource name of a settings file to load as base settings
	 */
	public void setPathConfiguration(String configuration) {
		this.pathConfiguration = configuration;
	}

	/**
	 * @param portRange value for the {@code http.port} setting, e.g. {@code "9200-9300"}
	 */
	public void setPortRange(String portRange) {
		this.portRange = portRange;
	}

	/**
	 * Closes the embedded node, if one was started. Errors while closing are
	 * logged and not propagated.
	 */
	@Override
	public void destroy() {
		try {
			if (node != null) {
				log.info("Closing elasticSearch node");
				node.close();
				node = null;
			} else {
				log.info("Node is null");
			}
		} catch (final Exception e) {
			log.error("Error closing ElasticSearch node: ", e);
		}
	}

}