// FirehoseActionListener.java
/*
* Copyright 2024 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.notification.action;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;

import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.genesys.blocks.auditlog.service.ClassPKService;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.blocks.security.model.AclSid;
import org.gringlobal.component.firehose.FirehoseAuditLogEvent;
import org.gringlobal.component.firehose.FirehoseDelayedEvent;
import org.gringlobal.component.firehose.FirehoseEvent.EventType;
import org.gringlobal.model.AbstractAction;
import org.gringlobal.model.QAbstractAction;
import org.gringlobal.model.notification.NotificationMessage;
import org.gringlobal.service.NotificationMessageService;
import org.gringlobal.service.UserService;
import org.gringlobal.service.WorkflowService;
import org.gringlobal.spring.TransactionHelper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.event.EventListener;
import org.springframework.core.type.filter.AssignableTypeFilter;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
/**
* Listen to Firehose events and generate notifications about actions (implementations of {@link AbstractAction}).
*/
@Component
@Slf4j
public class FirehoseActionListener implements InitializingBean {
private static final String FIELD_IS_DONE = QAbstractAction.abstractAction.isDone.getMetadata().getName(); // "isDone";
private static final String FIELD_ASSIGNEE = QAbstractAction.abstractAction.assignee().getMetadata().getName(); // "assignee";
private static final String FIELD_COMPLETED_DATE = QAbstractAction.abstractAction.completedDate.getMetadata().getName(); // "completedDate";
private static final String FIELD_NOT_BEFORE_DATE = QAbstractAction.abstractAction.notBeforeDate.getMetadata().getName(); // "notBeforeDate";
private static final String ACTION_ASSIGNED = "notification.action.assigned";
private static final String ACTION_REASSIGNED = "notification.action.reassigned";
private static final String ACTION_UPDATED = "notification.action.updated";
/** Names of classes we're monitoring for audit log changes */
private Set<String> ACTION_CLASSNAMES;
@Autowired
private NotificationMessageService messageService;
@Autowired
private ClassPKService classPkService;
@Autowired
private WorkflowService workflowService;
@Autowired
protected UserService userService;
/**
* Keep track assignees for actions. Key is action#class+id, value: assignee#id
*/
private Map<ActionId, Long> assigneeForAction = new LinkedHashMap<>(100);
/** Tracking original action states with AuditLogs */
private Map<ActionState, Optional<Object>> stateForAction = new LinkedHashMap<>();
/*
* Find implementations of AbstractAction and put their names info ACTION_CLASSNAMES.
*/
@Override
public void afterPropertiesSet() throws Exception {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
provider.addIncludeFilter(new AssignableTypeFilter(AbstractAction.class));
var actionClasses = new HashSet<String>();
Set<BeanDefinition> components = provider.findCandidateComponents(AbstractAction.class.getPackageName().replaceAll("\\.", "/")); // Scan
for (BeanDefinition component : components) {
log.info("Handling actions of type {}", component.getBeanClassName());
actionClasses.add(component.getBeanClassName());
}
ACTION_CLASSNAMES = Set.copyOf(actionClasses);
}
/**
* Get number of elements still queued for processing. A large number might
* indicate that elements are not cleared and we're holding on to data we don't
* need.
*
* @return number of actions that need processing.
*/
public int getQueueSize() {
return assigneeForAction.size();
}
/**
* Listen to inserts of AuditLog data and extract assignee ID ({@link AbstractAction#getAssignee()}).
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void rememberOriginalAssignee(FirehoseAuditLogEvent firehoseEvent) {
var auditLog = firehoseEvent.getAuditLog();
var auditedClassName = auditLog.getClassPk().getClassname();
// if (auditLog.getAction() == AuditAction.DELETE) {}
if (auditedClassName != null && ACTION_CLASSNAMES.contains(auditedClassName)) {
if (Objects.equals(auditLog.getPropertyName(), FIELD_ASSIGNEE)) {
var actionId = auditLog.getEntityId();
// var assigneeIdStr = auditLog.getPreviousState();
var originalAssignee = (EmptyModel) auditLog.getPreviousEntity();
var assigneeId = originalAssignee == null ? null : originalAssignee.getId();
log.debug("PRE-UPDATE {} id:{} had assignee:{}", auditedClassName, actionId, assigneeId);
// If action belongs to someone already then ignore the change!
assigneeForAction.putIfAbsent(new ActionId(actionId, auditedClassName), assigneeId);
} else if (Objects.equals(auditLog.getPropertyName(), FIELD_COMPLETED_DATE)) {
var actionId = auditLog.getEntityId();
var completedDate = (Instant) auditLog.getPreviousEntity();
log.debug("PRE-UPDATE {} id:{} had completedDate:{}", auditedClassName, actionId, completedDate);
stateForAction.putIfAbsent(new ActionState(actionId, auditedClassName, FIELD_COMPLETED_DATE), Optional.ofNullable(completedDate));
} else if (Objects.equals(auditLog.getPropertyName(), FIELD_IS_DONE)) {
var actionId = auditLog.getEntityId();
var isDoneYN = auditLog.getPreviousState();
log.debug("PRE-UPDATE {} id:{} had isDone:{}", auditedClassName, actionId, isDoneYN);
stateForAction.putIfAbsent(new ActionState(actionId, auditedClassName, FIELD_IS_DONE), Optional.ofNullable(isDoneYN));
} else if (Objects.equals(auditLog.getPropertyName(), FIELD_NOT_BEFORE_DATE)) {
var actionId = auditLog.getEntityId();
var notBeforeDate = auditLog.getPreviousState();
log.debug("PRE-UPDATE {} id:{} had notBeforeDate:{}", auditedClassName, actionId, notBeforeDate);
stateForAction.putIfAbsent(new ActionState(actionId, auditedClassName, FIELD_NOT_BEFORE_DATE), Optional.ofNullable(notBeforeDate));
}
}
}
/**
* Check (delayed) messages sent by Firehose for {@link AbstractAction} types and
* generate notification messages to current and past assignees.
*/
@EventListener
public void handleEvent(FirehoseDelayedEvent firehoseEvent) throws Exception {
Class<?> clazz = firehoseEvent.getClazz();
Object entity = firehoseEvent.getEntity();
if (entity != null && AbstractAction.class.isAssignableFrom(clazz)) {
// log.trace("Handling event:{}", firehoseEvent);
AbstractAction<?> action = (AbstractAction<?>) entity;
var actionClassName = action.getClass().getName();
var actionId = new ActionId(action.getId(), actionClassName);
var wasReassigned = assigneeForAction.containsKey(actionId);
var originalCompletedDate = stateForAction.remove(new ActionState(action.getId(), actionClassName, FIELD_COMPLETED_DATE));
var originalIsDone = stateForAction.remove(new ActionState(action.getId(), actionClassName, FIELD_IS_DONE));
var originalNotBeforeDate = stateForAction.remove(new ActionState(action.getId(), actionClassName, FIELD_NOT_BEFORE_DATE));
var previousAssigneeId = assigneeForAction.remove(actionId);
var currentAssigneeId = action.getAssignee() == null ? null : action.getAssignee().getId();
var causedBySid = firehoseEvent.getSid();
if (wasReassigned && previousAssigneeId != null && !Objects.equals(previousAssigneeId, currentAssigneeId)) {
log.debug("{} {} id:{} reassigned from:{} to assignee:{}", firehoseEvent.getEventType(), action.getClass().getSimpleName(), action.getId(), previousAssigneeId, currentAssigneeId);
if (firehoseEvent.getEventType() == EventType.UPDATE) {
// Send a message to previous and current assignee that action was reassigned
log.debug(" Send a message to previous:{} and current assignee:{} that action was reassigned", previousAssigneeId, currentAssigneeId);
if (previousAssigneeId != null) {
messageService.removePendingMessages(new AclSid(previousAssigneeId), classPkService.getClassPk(action.getClass()), action.getId());
if (!Objects.equals(causedBySid, previousAssigneeId)) {
messageService.createFast(new NotificationMessage(new AclSid(previousAssigneeId), firehoseEvent.getTimestamp(), ACTION_REASSIGNED, classPkService.getClassPk(action.getClass()), action.getId()));
} else {
log.debug(" Not notifying previous:{} causedBy:{}", previousAssigneeId, causedBySid);
}
}
if (currentAssigneeId != null && !Objects.equals(causedBySid, currentAssigneeId)) {
messageService.createFast(new NotificationMessage(new AclSid(currentAssigneeId), firehoseEvent.getTimestamp(), ACTION_REASSIGNED, classPkService.getClassPk(action.getClass()), action.getId()));
} else {
log.debug(" Not notifying assignee:{} causedBy:{}", currentAssigneeId, causedBySid);
}
}
} else if (wasReassigned) {
log.debug("{} {} id:{} assigned to assignee:{}", firehoseEvent.getEventType(), action.getClass().getSimpleName(), action.getId(), currentAssigneeId);
if (firehoseEvent.getEventType() == EventType.CREATE || firehoseEvent.getEventType() == EventType.UPDATE) {
log.debug(" Send a message that action was assigned to current assignee:{}", currentAssigneeId);
// Send a message that an action was assigned to current assignee
if (currentAssigneeId != null && !Objects.equals(causedBySid, currentAssigneeId)) {
messageService.createFast(new NotificationMessage(new AclSid(currentAssigneeId), firehoseEvent.getTimestamp(), ACTION_ASSIGNED, classPkService.getClassPk(action.getClass()), action.getId()));
} else {
log.debug(" Not notifying assignee:{} causedBy:{}", currentAssigneeId, causedBySid);
}
} else if (firehoseEvent.getEventType() == EventType.DELETE) {
// Remove pending notification
log.debug(" Remove pending messages for deleted action {}#{}!", actionClassName, action.getId());
messageService.removePendingMessages(classPkService.getClassPk(action.getClass()), action.getId());
}
} else {
if (firehoseEvent.getEventType() == EventType.CREATE) {
log.debug(" Send a message that action was assigned to current assignee:{}", currentAssigneeId);
// Send a message that action was updated
if (currentAssigneeId != null && !Objects.equals(causedBySid, currentAssigneeId)) {
messageService.createFast(new NotificationMessage(new AclSid(currentAssigneeId), firehoseEvent.getTimestamp(), ACTION_ASSIGNED, classPkService.getClassPk(action.getClass()), action.getId()));
} else {
log.debug(" Not notifying assignee:{} causedBy:{}", currentAssigneeId, causedBySid);
}
} else if (firehoseEvent.getEventType() == EventType.CREATE || firehoseEvent.getEventType() == EventType.UPDATE) {
log.debug(" Send a message that action was updated to current assignee:{}", currentAssigneeId);
// Send a message that action was updated
if (currentAssigneeId != null && !Objects.equals(causedBySid, currentAssigneeId)) {
messageService.createFast(new NotificationMessage(new AclSid(currentAssigneeId), firehoseEvent.getTimestamp(), ACTION_UPDATED, classPkService.getClassPk(action.getClass()), action.getId()));
} else {
log.debug(" Not notifying assignee:{} causedBy:{}", currentAssigneeId, causedBySid);
}
} else if (firehoseEvent.getEventType() == EventType.DELETE) {
// Remove pending notification
log.debug(" Remove pending messages for deleted action {}#{}!", actionClassName, action.getId());
messageService.removePendingMessages(classPkService.getClassPk(action.getClass()), action.getId());
}
}
if (action.getCompletedDate() != null) { // Action is closed
if (originalCompletedDate == null && originalIsDone == null) {
// date did not change
log.debug(" completedDate and isDone did not change");
} else {
// Action was completed
if (Objects.equals(action.getIsDone(), "Y")) {
log.info("{} id:{} is now completed as isDone:{} on:{}", actionClassName, action.getId(), action.getIsDone(), action.getCompletedDate());
if (action.getWorkflowStep() != null) {
asAdmin(() -> TransactionHelper.executeInTransaction(false, () -> {
workflowService.createNextStepAction(action);
return true;
}));
} else {
log.debug("Action is not part of workflow");
}
} else {
log.info("{} id:{} is now canceled as isDone:{} on:{}", actionClassName, action.getId(), action.getIsDone(), action.getCompletedDate());
}
}
}
// else { // Action is open
// if (originalCompletedDate == null) {
// // date did not change
// log.debug(" completedDate and isDone did not change");
// if (originalNotBeforeDate != null) {
// // notBeforeChanged
// log.info("{} id:{} is now rescheduled from:{} with assignee:{} notBeforeDate:{}", actionClassName, action.getId(), originalNotBeforeDate.get(), currentAssigneeId, action.getNotBeforeDate());
// }
// } else {
// log.info("{} id:{} is now reopened with assignee:{} notBeforeDate:{}", actionClassName, action.getId(), currentAssigneeId, action.getNotBeforeDate());
// }
// }
if (log.isTraceEnabled()) {
log.trace(" assignee changed:{}\twas:{} current:{}", wasReassigned, previousAssigneeId, currentAssigneeId);
log.trace(" completedDate changed:{}\twas:{} current:{}", originalCompletedDate != null, originalCompletedDate != null && originalCompletedDate.isPresent() ? originalCompletedDate.get() : "NULL", action.getCompletedDate());
log.trace(" isDone changed:{}\twas:{} current:{}", originalIsDone != null, originalIsDone != null && originalIsDone.isPresent() ? originalIsDone.get() : "NULL", action.getIsDone());
log.trace(" notBeforeDate changed:{}\twas:{} current:{}", originalNotBeforeDate != null, originalNotBeforeDate != null && originalNotBeforeDate.isPresent() ? originalNotBeforeDate.get() : "NULL", action.getNotBeforeDate());
}
} else {
// log.trace("NOT handling event:{}", firehoseEvent);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
private static class ActionId {
private long id;
private String className;
}
@Data
@EqualsAndHashCode(callSuper = true)
private static class ActionState extends ActionId {
private String property;
public ActionState(long id, String className, String property) {
super(id, className);
this.property = property;
}
}
private <T> T asAdmin(Callable<T> callable) throws Exception {
UserDetails administrator = userService.loadUserByUsername("administrator");
List<GrantedAuthority> authorities = Lists.newArrayList(new SimpleGrantedAuthority("ROLE_ADMINISTRATOR"));
authorities.addAll(administrator.getAuthorities());
Authentication authentication = new UsernamePasswordAuthenticationToken(administrator, null, authorities);
return TransactionHelper.asUser(authentication, callable);
}
}