FirehoseJPAListener.java
/*
* Copyright 2021 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.component.firehose;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.elasticsearch.common.inject.Singleton;
import org.genesys.blocks.auditlog.model.AuditLog;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.blocks.security.SecurityContextUtil;
import org.gringlobal.component.firehose.FirehoseEvent.EventType;
import org.springframework.aop.framework.Advised;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import com.google.common.collect.Streams;
/**
 * Aspect attached to the save and delete methods of JPA repositories.
 *
 * It determines whether records are inserted, updated or deleted and publishes
 * events through the {@link ApplicationEventPublisher}:
 * <ul>
 * <li>one {@link FirehoseEvent} (CREATE, UPDATE or DELETE) for every saved or
 * deleted entity whose class is listed in the included classes,</li>
 * <li>one {@link FirehoseAuditLogEvent} for every saved {@link AuditLog},</li>
 * <li>one {@link FirehoseDeleteAllEvent} per included entity type when a
 * repository-wide delete is executed.</li>
 * </ul>
 *
 * Each {@link FirehoseEvent} carries the entity class, its id, the timestamp
 * taken before the repository call, the event type, the entity itself and the
 * id of the current user (when one is available).
 *
 * NOTE(review): {@code @Singleton} below is the Elasticsearch (Guice) annotation,
 * not a Spring or javax.inject one -- confirm it is intended.
 *
 * @author Artem Hrybeniuk
 * @author Matija Obreza
 */
@Aspect
@Component("firehoseJPAListener")
@Singleton
@Slf4j
public class FirehoseJPAListener implements InitializingBean {

    /** Types handled by the dedicated audit-log branch of the save advices. */
    private static final Set<Class<?>> AUDITLOG = Set.of(AuditLog.class);

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    /**
     * Entity classes for which Firehose events are published. Defaults to an
     * empty set so that an unconfigured listener never fails with a
     * NullPointerException.
     */
    private Set<Class<?>> includedClasses = Set.of();

    /**
     * Creates the listener. The instance is reported through the logger (debug
     * level) instead of being printed to standard error.
     */
    public FirehoseJPAListener() {
        log.debug("Made an instance of FirehoseJPAListener: {}", this);
    }

    /**
     * Freezes the configured set of included classes into an immutable copy.
     *
     * @throws Exception declared by {@link InitializingBean}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        includedClasses = includedClasses == null ? Set.of() : Set.copyOf(includedClasses); // make it immutable
    }

    /**
     * Sets the entity classes for which Firehose events are published.
     *
     * @param includedClasses the classes of interest; {@code null} is treated as
     *        an empty set
     */
    public void setIncludedClasses(Set<Class<?>> includedClasses) {
        this.includedClasses = includedClasses == null ? Set.of() : includedClasses;
    }

    /**
     * Wraps {@code JpaRepository.save} and {@code saveAndFlush}.
     *
     * For an {@link AuditLog} argument a {@link FirehoseAuditLogEvent} is published
     * with the saved result. For an included entity the event type is decided
     * <em>before</em> the save (a new entity is a CREATE, otherwise an UPDATE) and a
     * {@link FirehoseEvent} is published with the saved result. Anything else is
     * passed straight through.
     *
     * @param joinPoint the intercepted repository call
     * @param saveArg the entity passed to the repository
     * @return whatever the repository call returned
     * @throws Throwable any error raised by the repository call or by event
     *         publishing; it is logged and rethrown
     */
    @Around(value = "(execution(* org.springframework.data.jpa.repository.JpaRepository.save(..)) || execution(* org.springframework.data.jpa.repository.JpaRepository.saveAndFlush(..))) && args(saveArg)")
    public Object aroundPersist(final ProceedingJoinPoint joinPoint, Object saveArg) throws Throwable {
        if (log.isTraceEnabled()) {
            log.trace("JPA around persist {} {} save:{}", joinPoint.getKind(), joinPoint.getSignature(), saveArg);
        }
        if (saveArg == null) {
            return joinPoint.proceed();
        }

        try {
            var timestamp = Instant.now();

            if (isIncluded(saveArg, AUDITLOG)) {
                log.trace("AuditLog is being saved!");
                // Monitor changes by looking at AuditLogs
                var result = joinPoint.proceed();
                var model = (AuditLog) result;
                applicationEventPublisher.publishEvent(new FirehoseAuditLogEvent(model));
                return result;
            }

            if (isIncluded(saveArg, includedClasses)) {
                log.trace("JPA persisting {}", saveArg);
                EmptyModel model = (EmptyModel) saveArg;
                // Must be evaluated before proceed(): afterwards the entity is no longer new
                var eventType = model.isNew() ? EventType.CREATE : EventType.UPDATE;
                var result = joinPoint.proceed();
                FirehoseEvent firehoseEvent = createFirehoseEvent((EmptyModel) result, timestamp, eventType);
                applicationEventPublisher.publishEvent(firehoseEvent);
                return result;
            }

            // Don't bother
            log.trace("Not considering {}", saveArg);
            return joinPoint.proceed();
        } catch (Throwable e) {
            log.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Wraps {@code saveAll(Iterable)} and {@code saveAllAndFlush(Iterable)}.
     *
     * The decision whether to process the call is made on the first element of
     * the argument only. Ids present before the save are remembered so that each
     * saved entity can be reported as UPDATE (id was known) or CREATE (id was
     * not known).
     *
     * @param joinPoint the intercepted repository call
     * @return whatever the repository call returned
     * @throws Throwable any error raised by the repository call or by event
     *         publishing; it is logged and rethrown
     */
    @Around(value = "(execution(* org.springframework.data.repository.*.saveAll(java.lang.Iterable)) || execution(* org.springframework.data.jpa.repository.JpaRepository.saveAllAndFlush(java.lang.Iterable)))")
    public Object aroundPersistAll(final ProceedingJoinPoint joinPoint) throws Throwable {
        Iterable<?> saveArg = (Iterable<?>) joinPoint.getArgs()[0];
        if (log.isDebugEnabled()) {
            log.debug("JPA around persist all {} {} save:{}", joinPoint.getKind(), joinPoint.getSignature(), saveArg);
        }
        if (saveArg == null) {
            return joinPoint.proceed();
        }

        try {
            var timestamp = Instant.now();

            // Monitor changes by looking at AuditLogs
            if (isIncludedIterable(saveArg, AUDITLOG)) {
                log.trace("AuditLogs are being saved!");
                var result = joinPoint.proceed();
                if (result instanceof Iterable) {
                    // Only the first argument element was type-checked, so skip anything that is not an AuditLog
                    Streams.stream((Iterable<?>) result)
                        .filter(AuditLog.class::isInstance)
                        .map(AuditLog.class::cast)
                        .forEach(auditLog -> applicationEventPublisher.publishEvent(new FirehoseAuditLogEvent(auditLog)));
                } else {
                    log.warn("JPA result is not Iterable but {}", result == null ? null : result.getClass());
                }
                return result;
            }

            if (!isIncludedIterable(saveArg, includedClasses)) {
                return joinPoint.proceed();
            }

            log.trace("JPA persisting {}", saveArg);

            // Ids that exist before the save identify the entities that are being updated
            var objectsIds = Streams.stream(saveArg)
                .filter(entity -> entity instanceof EmptyModel)
                .map(entity -> ((EmptyModel) entity).getId())
                .filter(Objects::nonNull)
                .collect(Collectors.toSet());
            log.trace("Have {} objects that have ID", objectsIds.size());

            var result = joinPoint.proceed();

            if (result instanceof Iterable) {
                // Publish every saved object separately as a FirehoseEvent
                Streams.stream((Iterable<?>) result)
                    .filter(entity -> entity instanceof EmptyModel)
                    .map(entity -> (EmptyModel) entity)
                    .map(entity -> createFirehoseEvent(entity, timestamp, objectsIds.contains(entity.getId()) ? EventType.UPDATE : EventType.CREATE))
                    .forEach(applicationEventPublisher::publishEvent);
            } else {
                log.warn("JPA result is not Iterable but {}", result == null ? null : result.getClass());
            }
            return result;
        } catch (Throwable e) {
            log.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Check if the iterable should be processed.
     *
     * @param iterable the elements passed to the repository
     * @param allowedClasses set of types we're interested in
     * @return true if the first element of the iterable is included in Firehose
     *         processing; false for an empty iterable
     */
    private boolean isIncludedIterable(Iterable<?> iterable, Set<Class<?>> allowedClasses) {
        var it = iterable.iterator();
        return it.hasNext() && isIncluded(it.next(), allowedClasses);
    }

    /**
     * Is the object included in Firehose processing?
     *
     * The check uses the exact runtime class of the object: subclasses of an
     * allowed class do not match.
     *
     * @param object the object
     * @param allowedClasses set of types we're interested in
     * @return true if the object is a non-null {@link EmptyModel} whose class is
     *         one of the allowed classes
     */
    private boolean isIncluded(Object object, Set<Class<?>> allowedClasses) {
        return object != null && allowedClasses.contains(object.getClass()) && (object instanceof EmptyModel);
    }

    /**
     * Publishes DELETE events after {@code delete}, {@code deleteAll(Iterable)}
     * and {@code deleteInBatch(Iterable)}.
     *
     * For an Iterable argument the decision is made on its first element; elements
     * that are null or not an {@link EmptyModel} are skipped instead of failing
     * with a ClassCastException after the delete has already been executed.
     *
     * NOTE(review): {@code @After} is "after (finally)" advice, so this presumably
     * also runs when the delete call throws -- consider {@code @AfterReturning}.
     *
     * @param joinPoint the intercepted repository call
     */
    @After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAll(java.lang.Iterable)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(java.lang.Iterable))")
    public void afterRemove(final JoinPoint joinPoint) {
        final Object[] args = joinPoint.getArgs();
        try {
            final Object toRemove = args[0];
            if (toRemove == null) {
                return;
            }

            var timestamp = Instant.now();

            if (toRemove instanceof Iterable<?>) {
                if (isIncludedIterable((Iterable<?>) toRemove, includedClasses)) {
                    if (log.isTraceEnabled()) {
                        log.trace("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
                    }
                    // send events!
                    Streams.stream((Iterable<?>) toRemove)
                        .filter(EmptyModel.class::isInstance)
                        .map(entity -> createFirehoseEvent((EmptyModel) entity, timestamp, EventType.DELETE))
                        .forEach(applicationEventPublisher::publishEvent);
                }
            } else if (isIncluded(toRemove, includedClasses)) {
                if (log.isTraceEnabled()) {
                    log.trace("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
                }
                FirehoseEvent firehoseEvent = createFirehoseEvent((EmptyModel) toRemove, timestamp, EventType.DELETE);
                // send event!
                applicationEventPublisher.publishEvent(firehoseEvent);
            }
        } catch (Throwable e) {
            log.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Publishes a {@link FirehoseDeleteAllEvent} after {@code deleteAll()} and
     * {@code deleteAllInBatch()}.
     *
     * The entity type is discovered reflectively: when the join point target is an
     * {@link Advised} proxy, the first type argument of every parameterized
     * super-interface of each proxied interface is inspected, and an event is
     * published for each one that is an included class.
     *
     * NOTE(review): {@code @After} is "after (finally)" advice, so this presumably
     * also runs when the delete call throws -- consider {@code @AfterReturning}.
     *
     * @param joinPoint the join point
     */
    @After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAll()) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAllInBatch())")
    public void afterDeleteAll(final JoinPoint joinPoint) {
        try {
            if (log.isTraceEnabled()) {
                log.trace("JPA afterDeleteAll: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
            }

            Object proxy = joinPoint.getTarget();
            if (!(proxy instanceof Advised)) {
                return;
            }

            Advised advised = (Advised) proxy;
            for (Class<?> proxiedInterface : advised.getProxiedInterfaces()) {
                for (Type generic : proxiedInterface.getGenericInterfaces()) {
                    if (!(generic instanceof ParameterizedType)) {
                        continue;
                    }
                    var typeArg = ((ParameterizedType) generic).getActualTypeArguments()[0];
                    if (typeArg instanceof Class<?> && includedClasses.contains(typeArg)) {
                        log.trace("Delete all documents for {}", typeArg);
                        applicationEventPublisher.publishEvent(new FirehoseDeleteAllEvent((Class<?>) typeArg, Instant.now()));
                    } else {
                        log.trace("Skipping {}", typeArg);
                    }
                }
            }
        } catch (Throwable e) {
            log.error("Error in FirehoseJPAListener: {}", e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Creates a FirehoseEvent instance and stamps it with the id of the current
     * user, or {@code null} when no user is available.
     *
     * @param <T> the type
     * @param object object of type EmptyModel, must not be null
     * @param timestamp the timestamp of JPA event
     * @param eventType the event type, can be null
     *
     * @return an event instance
     * @throws NullPointerException if {@code object} is null
     */
    private <T extends EmptyModel> FirehoseEvent createFirehoseEvent(T object, Instant timestamp, EventType eventType) {
        // An assert would be skipped when assertions are disabled; fail explicitly instead
        Objects.requireNonNull(object, "object");
        var aclSid = SecurityContextUtil.getCurrentUser();
        var fhe = new FirehoseEvent(object.getClass(), object.getId(), timestamp, eventType, object);
        fhe.setSid(aclSid == null ? null : aclSid.getId());
        return fhe;
    }
}