FirehoseJPAListener.java

/*
 * Copyright 2021 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gringlobal.component.firehose;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.elasticsearch.common.inject.Singleton;
import org.genesys.blocks.auditlog.model.AuditLog;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.blocks.security.SecurityContextUtil;
import org.gringlobal.component.firehose.FirehoseEvent.EventType;
import org.springframework.aop.framework.Advised;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import com.google.common.collect.Streams;

/**
 * Aspect attached to the save and delete methods of Spring Data JPA
 * repositories.
 *
 * <p>
 * For every saved or deleted entity whose class was registered through
 * {@link #setIncludedClasses(Set)} it publishes one {@link FirehoseEvent}
 * (CREATE, UPDATE or DELETE) through the {@link ApplicationEventPublisher}.
 * Each event carries the entity class, the entity id, the timestamp taken when
 * the advice started, the event type and the entity itself.
 * </p>
 *
 * <p>
 * Saved {@link AuditLog} records are published as {@code FirehoseAuditLogEvent},
 * and a successful parameterless "delete all" call publishes a
 * {@code FirehoseDeleteAllEvent} for each included entity type found on the
 * repository proxy.
 * </p>
 *
 * @author Artem Hrybeniuk
 * @author Matija Obreza
 */
@Aspect
@Component("firehoseJPAListener")
// NOTE(review): this @Singleton is imported from org.elasticsearch.common.inject, not from
// Spring; presumably it has no effect on the Spring bean scope -- confirm and consider removing.
@Singleton
@Slf4j
public class FirehoseJPAListener implements InitializingBean {

	/** The only type handled by the audit-log branches of the save advices. */
	private static final Set<Class<?>> AUDITLOG = Set.of(AuditLog.class);

	@Autowired
	private ApplicationEventPublisher applicationEventPublisher;

	/** Entity classes for which Firehose events are published; empty until configured. */
	private Set<Class<?>> includedClasses = Set.of();

	public FirehoseJPAListener() {
		// Use the class logger instead of writing to System.err
		log.debug("Made an instance of FirehoseJPAListener: {}", this);
	}

	/**
	 * Freezes the configured set of included classes. When no set was configured
	 * the listener falls back to an empty set (nothing is included) instead of
	 * failing with a NullPointerException.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		includedClasses = includedClasses == null ? Set.of() : Set.copyOf(includedClasses); // make it immutable
	}

	/**
	 * Sets the entity classes for which Firehose events should be published.
	 *
	 * @param includedClasses the classes to monitor
	 */
	public void setIncludedClasses(Set<Class<?>> includedClasses) {
		this.includedClasses = includedClasses;
	}

	/**
	 * Wraps {@code JpaRepository.save} and {@code saveAndFlush}.
	 *
	 * <ul>
	 * <li>An {@link AuditLog} argument: the call proceeds and the result is
	 * published as a {@code FirehoseAuditLogEvent}.</li>
	 * <li>An argument of an included class: the event type is determined from
	 * {@code isNew()} <em>before</em> the call proceeds, then a
	 * {@link FirehoseEvent} is published for the result.</li>
	 * <li>Anything else (including {@code null}): the call simply proceeds.</li>
	 * </ul>
	 *
	 * @param joinPoint the intercepted repository call
	 * @param saveArg the entity passed to the repository
	 * @return whatever the repository call returned
	 * @throws Throwable anything thrown by the repository call or while publishing; it is logged and rethrown
	 */
	@Around(value = "(execution(* org.springframework.data.jpa.repository.JpaRepository.save(..)) || execution(* org.springframework.data.jpa.repository.JpaRepository.saveAndFlush(..))) && args(saveArg)")
	public Object aroundPersist(final ProceedingJoinPoint joinPoint, Object saveArg) throws Throwable {
		if (log.isTraceEnabled()) {
			log.trace("JPA around persist {} {} save:{}", joinPoint.getKind(), joinPoint.getSignature(), saveArg);
		}
		if (saveArg == null) {
			return joinPoint.proceed();
		}

		try {
			var timestamp = Instant.now();
			if (isIncluded(saveArg, AUDITLOG)) {

				log.trace("AuditLog is being saved!");

				// Monitor changes by looking at AuditLogs
				var result = joinPoint.proceed();
				var model = (AuditLog) result;
				applicationEventPublisher.publishEvent(new FirehoseAuditLogEvent(model));
				return result;

			} else if (isIncluded(saveArg, includedClasses)) {

				log.trace("JPA persisting {}", saveArg);

				EmptyModel model = (EmptyModel) saveArg;
				// Must be evaluated before proceeding: after the save the entity is no longer new
				var eventType = model.isNew() ? EventType.CREATE : EventType.UPDATE;
				var result = joinPoint.proceed();
				FirehoseEvent firehoseEvent = createFirehoseEvent((EmptyModel) result, timestamp, eventType);
				applicationEventPublisher.publishEvent(firehoseEvent);
				return result;

			} else {
				// Don't bother
				log.trace("Not considering {}", saveArg);
				return joinPoint.proceed();
			}
		} catch (Throwable e) {
			log.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
			throw e;
		}
	}

	/**
	 * Wraps {@code saveAll(Iterable)} and {@code saveAllAndFlush(Iterable)}.
	 *
	 * <p>
	 * Whether the batch is processed is decided by its first element only (see
	 * {@link #isIncludedIterable(Iterable, Set)}). For included entities the ids
	 * present before the save are remembered: a saved entity whose id was already
	 * in that set is reported as UPDATE, any other as CREATE. One event is
	 * published per saved entity.
	 * </p>
	 *
	 * @param joinPoint the intercepted repository call
	 * @return whatever the repository call returned
	 * @throws Throwable anything thrown by the repository call or while publishing; it is logged and rethrown
	 */
	@Around(value = "(execution(* org.springframework.data.repository.*.saveAll(java.lang.Iterable)) || execution(* org.springframework.data.jpa.repository.JpaRepository.saveAllAndFlush(java.lang.Iterable)))")
	public Object aroundPersistAll(final ProceedingJoinPoint joinPoint) throws Throwable {
		Iterable<?> saveArg = (Iterable<?>) joinPoint.getArgs()[0];
		if (log.isDebugEnabled()) {
			log.debug("JPA around persist all {} {} save:{}", joinPoint.getKind(), joinPoint.getSignature(), saveArg);
		}
		if (saveArg == null) {
			return joinPoint.proceed();
		}

		try {
			var timestamp = Instant.now();

			// Monitor changes by looking at AuditLogs
			if (isIncludedIterable(saveArg, AUDITLOG)) {
				log.trace("AuditLogs are being saved!");
				var result = joinPoint.proceed();
				if (result instanceof Iterable) {
					Streams.stream((Iterable<?>) result)
						// Only the first element was type-checked; skip nulls and foreign types
						.filter(AuditLog.class::isInstance)
						.forEach(entity -> applicationEventPublisher.publishEvent(new FirehoseAuditLogEvent((AuditLog) entity)));
				} else {
					log.warn("JPA result is not Iterable but {}", result == null ? null : result.getClass());
				}
				return result;
			}

			if (! isIncludedIterable(saveArg, includedClasses)) {
				return joinPoint.proceed();
			}
			log.trace("JPA persisting {}", saveArg);

			// Remember which entities already had an id before the save: those are updates
			var objectsIds = Streams.stream(saveArg)
				.filter(entity -> entity instanceof EmptyModel)
				.map(entity -> ((EmptyModel) entity).getId())
				.filter(Objects::nonNull)
				.collect(Collectors.toSet());
			log.trace("Have {} objects that have ID", objectsIds.size());

			var result = joinPoint.proceed();
			if (result instanceof Iterable) {
				// Create and send every saved object separately as a FirehoseEvent
				Streams.stream((Iterable<?>) result)
					.filter(entity -> entity instanceof EmptyModel)
					.map(entity -> (EmptyModel) entity)
					.map(model -> createFirehoseEvent(model, timestamp, objectsIds.contains(model.getId()) ? EventType.UPDATE : EventType.CREATE))
					.forEach(applicationEventPublisher::publishEvent);
			} else {
				log.warn("JPA result is not Iterable but {}", result == null ? null : result.getClass());
			}
			return result;

		} catch (Throwable e) {
			log.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
			throw e;
		}
	}

	/**
	 * Check if the iterable should be processed.
	 * 
	 * @param iterable the elements passed to the repository
	 * @param allowedClasses set of types we're interested in
	 * @return true if the iterable is not empty and its first element is included in Firehose processing
	 */
	private boolean isIncludedIterable(Iterable<?> iterable, Set<Class<?>> allowedClasses) {
		var it = iterable.iterator();
		return it.hasNext() && isIncluded(it.next(), allowedClasses);
	}
	
	/**
	 * Is the object included in Firehose processing?
	 * 
	 * @param object the object, may be null
	 * @param allowedClasses set of types we're interested in, may be null
	 * @return true if the object is an {@link EmptyModel} whose exact class is in {@code allowedClasses}
	 */
	private boolean isIncluded(Object object, Set<Class<?>> allowedClasses) {
		if (object == null || allowedClasses == null) {
			return false;
		}
		// Exact class match: subclasses of an allowed class are not included
		return allowedClasses.contains(object.getClass()) && (object instanceof EmptyModel);
	}

	/**
	 * Publishes DELETE events after {@code delete}, {@code deleteAll(Iterable)} or
	 * {@code deleteInBatch(Iterable)} returned normally. Using after-returning
	 * advice ensures no DELETE event is published when the repository call threw.
	 *
	 * <p>
	 * For an iterable argument the inclusion check looks at the first element
	 * only; one event is published for each {@link EmptyModel} element.
	 * </p>
	 *
	 * @param joinPoint the intercepted repository call
	 */
	@AfterReturning(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAll(java.lang.Iterable)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(java.lang.Iterable))")
	public void afterRemove(final JoinPoint joinPoint) {
		final Object[] args = joinPoint.getArgs();
		try {
			final Object toRemove = args[0];
			if (toRemove == null) {
				return;
			}
			var timestamp = Instant.now();

			if (toRemove instanceof Iterable<?>) {
				if (isIncludedIterable((Iterable<?>) toRemove, includedClasses)) {
					if (log.isTraceEnabled()) {
						log.trace("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
					}
					// send events!
					Streams.stream((Iterable<?>) toRemove)
						// Only the first element was type-checked; skip nulls and foreign types
						.filter(EmptyModel.class::isInstance)
						.map(entity -> createFirehoseEvent((EmptyModel) entity, timestamp, EventType.DELETE))
						.forEach(applicationEventPublisher::publishEvent);
				}

			} else if (isIncluded(toRemove, includedClasses)) {

				if (log.isTraceEnabled()) {
					log.trace("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
				}
				FirehoseEvent firehoseEvent = createFirehoseEvent((EmptyModel) toRemove, timestamp, EventType.DELETE);
				// send event!
				applicationEventPublisher.publishEvent(firehoseEvent);
			}

		} catch (Throwable e) {
			log.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
			throw e;
		}
	}


	/**
	 * Publishes a {@code FirehoseDeleteAllEvent} after the parameterless
	 * {@code deleteAll()} or {@code deleteAllInBatch()} returned normally.
	 *
	 * <p>
	 * The entity type is not available as an argument, so it is looked up by
	 * reflection: when the join point target implements {@link Advised}, the
	 * first type argument of every parameterized interface extended by its
	 * proxied interfaces is checked against the included classes.
	 * </p>
	 *
	 * @param joinPoint the join point
	 */
	@AfterReturning(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAll()) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAllInBatch())")
	public void afterDeleteAll(final JoinPoint joinPoint) {
		try {
			if (log.isTraceEnabled()) {
				log.trace("JPA afterDeleteAll: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
			}
			Object proxy = joinPoint.getTarget();
			if (proxy instanceof Advised) {
				Advised advised = (Advised) proxy;
				for (Class<?> proxiedInterface : advised.getProxiedInterfaces()) {
					for (Type generic : proxiedInterface.getGenericInterfaces()) {
						if (generic instanceof ParameterizedType) {
							var typeArg = ((ParameterizedType) generic).getActualTypeArguments()[0];
							if (typeArg instanceof Class<?> && includedClasses.contains(typeArg)) {
								log.trace("Delete all documents for {}", typeArg);
								applicationEventPublisher.publishEvent(new FirehoseDeleteAllEvent((Class<?>) typeArg, Instant.now()));
							} else {
								log.trace("Skipping {}", typeArg);
							}
						}
					}
				}
			}
		} catch (Throwable e) {
			log.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
			throw e;
		}
	}

	/**
	 * Creates a FirehoseEvent instance and stamps it with the id of the current
	 * user as returned by {@code SecurityContextUtil.getCurrentUser()}, or with
	 * {@code null} when there is no current user.
	 * 
	 * @param <T> the type
	 * @param object object of type EmptyModel, must not be null
	 * @param timestamp the timestamp of JPA event
	 * @param eventType the event type, can be null
	 * 
	 * @return an event instance
	 * @throws NullPointerException if {@code object} is null
	 */
	private <T extends EmptyModel> FirehoseEvent createFirehoseEvent(T object, Instant timestamp, EventType eventType) {
		// Explicit check: a plain assert is skipped unless the JVM runs with -ea
		Objects.requireNonNull(object, "object");
		var aclSid = SecurityContextUtil.getCurrentUser();
		var fhe = new FirehoseEvent(object.getClass(), object.getId(), timestamp, eventType, object);
		fhe.setSid(aclSid == null ? null : aclSid.getId());
		return fhe;
	}

}