FirehoseActionListener.java

/*
 * Copyright 2024 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gringlobal.notification.action;

import java.time.Instant;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;

import com.google.common.collect.Lists;
import lombok.NoArgsConstructor;
import org.gringlobal.component.firehose.FirehoseDelayedEvent;
import org.gringlobal.component.firehose.FirehoseEvent.EventType;
import org.genesys.blocks.auditlog.service.ClassPKService;
import org.genesys.blocks.model.EmptyModel;
import org.genesys.blocks.security.model.AclSid;
import org.gringlobal.component.firehose.FirehoseAuditLogEvent;
import org.gringlobal.model.AbstractAction;
import org.gringlobal.model.QAbstractAction;
import org.gringlobal.model.notification.NotificationMessage;
import org.gringlobal.service.NotificationMessageService;
import org.gringlobal.service.UserService;
import org.gringlobal.service.WorkflowService;
import org.gringlobal.spring.TransactionHelper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.event.EventListener;
import org.springframework.core.type.filter.AssignableTypeFilter;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;

/**
 * Listen to Firehose events and generate notifications about actions (implementations of {@link AbstractAction}).
 */
@Component
@Slf4j
public class FirehoseActionListener implements InitializingBean {

	private static final String FIELD_IS_DONE = QAbstractAction.abstractAction.isDone.getMetadata().getName(); // "isDone";
	private static final String FIELD_ASSIGNEE = QAbstractAction.abstractAction.assignee().getMetadata().getName(); // "assignee";
	private static final String FIELD_COMPLETED_DATE = QAbstractAction.abstractAction.completedDate.getMetadata().getName(); // "completedDate";
	private static final String FIELD_NOT_BEFORE_DATE = QAbstractAction.abstractAction.notBeforeDate.getMetadata().getName(); // "notBeforeDate";

	private static final String ACTION_ASSIGNED = "notification.action.assigned";
	private static final String ACTION_REASSIGNED = "notification.action.reassigned";
	private static final String ACTION_UPDATED = "notification.action.updated";

	/** Names of classes we're monitoring for audit log changes */
	private Set<String> ACTION_CLASSNAMES;

	@Autowired
	private NotificationMessageService messageService;

	@Autowired
	private ClassPKService classPkService;
	
	@Autowired
	private WorkflowService workflowService;

	@Autowired
	protected UserService userService;


	/**
	 * Keep track assignees for actions. Key is action#class+id, value: assignee#id
	 */
	private Map<ActionId, Long> assigneeForAction = new LinkedHashMap<>(100);

	/** Tracking original action states with AuditLogs */
	private Map<ActionState, Optional<Object>> stateForAction = new LinkedHashMap<>();


	/*
	 * Find implementations of AbstractAction and put their names info ACTION_CLASSNAMES.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
		provider.addIncludeFilter(new AssignableTypeFilter(AbstractAction.class));

		var actionClasses = new HashSet<String>();
		Set<BeanDefinition> components = provider.findCandidateComponents(AbstractAction.class.getPackageName().replaceAll("\\.", "/")); // Scan 
		for (BeanDefinition component : components) {
			log.info("Handling actions of type {}", component.getBeanClassName());
			actionClasses.add(component.getBeanClassName());
		}
		ACTION_CLASSNAMES = Set.copyOf(actionClasses);
	}

	/**
	 * Get number of elements still queued for processing. A large number might
	 * indicate that elements are not cleared and we're holding on to data we don't
	 * need.
	 *
	 * @return number of actions that need processing.
	 */
	public int getQueueSize() {
		return assigneeForAction.size();
	}

	/**
	 * Listen to inserts of AuditLog data and extract assignee ID ({@link AbstractAction#getAssignee()}).
	 */
	@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
	public void rememberOriginalAssignee(FirehoseAuditLogEvent firehoseEvent) {
		var auditLog = firehoseEvent.getAuditLog();
		var auditedClassName = auditLog.getClassPk().getClassname();

		// if (auditLog.getAction() == AuditAction.DELETE) {}

		if (auditedClassName != null && ACTION_CLASSNAMES.contains(auditedClassName)) {
			if (Objects.equals(auditLog.getPropertyName(), FIELD_ASSIGNEE)) {
				var actionId = auditLog.getEntityId();
				// var assigneeIdStr = auditLog.getPreviousState();
				var originalAssignee = (EmptyModel) auditLog.getPreviousEntity();
				var assigneeId = originalAssignee == null ? null : originalAssignee.getId();
				log.debug("PRE-UPDATE {} id:{} had assignee:{}", auditedClassName, actionId, assigneeId);
				// If action belongs to someone already then ignore the change!
				assigneeForAction.putIfAbsent(new ActionId(actionId, auditedClassName), assigneeId);
			} else if (Objects.equals(auditLog.getPropertyName(), FIELD_COMPLETED_DATE)) {
				var actionId = auditLog.getEntityId();
				var completedDate = (Instant) auditLog.getPreviousEntity();
				log.debug("PRE-UPDATE {} id:{} had completedDate:{}", auditedClassName, actionId, completedDate);
				stateForAction.putIfAbsent(new ActionState(actionId, auditedClassName, FIELD_COMPLETED_DATE), Optional.ofNullable(completedDate));
			} else if (Objects.equals(auditLog.getPropertyName(), FIELD_IS_DONE)) {
				var actionId = auditLog.getEntityId();
				var isDoneYN = auditLog.getPreviousState();
				log.debug("PRE-UPDATE {} id:{} had isDone:{}", auditedClassName, actionId, isDoneYN);
				stateForAction.putIfAbsent(new ActionState(actionId, auditedClassName, FIELD_IS_DONE), Optional.ofNullable(isDoneYN));
			} else if (Objects.equals(auditLog.getPropertyName(), FIELD_NOT_BEFORE_DATE)) {
				var actionId = auditLog.getEntityId();
				var notBeforeDate = auditLog.getPreviousState();
				log.debug("PRE-UPDATE {} id:{} had notBeforeDate:{}", auditedClassName, actionId, notBeforeDate);
				stateForAction.putIfAbsent(new ActionState(actionId, auditedClassName, FIELD_NOT_BEFORE_DATE), Optional.ofNullable(notBeforeDate));
			}
		}
	}

	/**
	 * Check (delayed) messages sent by Firehose for {@link AbstractAction} types and
	 * generate notification messages to current and past assignees.
	 */
	@EventListener
	public void handleEvent(FirehoseDelayedEvent firehoseEvent) throws Exception {
		Class<?> clazz = firehoseEvent.getClazz();
		Object entity = firehoseEvent.getEntity();

		if (entity != null && AbstractAction.class.isAssignableFrom(clazz)) {
			// log.trace("Handling event:{}", firehoseEvent);
			AbstractAction<?> action = (AbstractAction<?>) entity;
			var actionClassName = action.getClass().getName();
			var actionId = new ActionId(action.getId(), actionClassName);
			var wasReassigned = assigneeForAction.containsKey(actionId);
			var originalCompletedDate = stateForAction.remove(new ActionState(action.getId(), actionClassName, FIELD_COMPLETED_DATE));
			var originalIsDone = stateForAction.remove(new ActionState(action.getId(), actionClassName, FIELD_IS_DONE));
			var originalNotBeforeDate = stateForAction.remove(new ActionState(action.getId(), actionClassName, FIELD_NOT_BEFORE_DATE));
			var previousAssigneeId = assigneeForAction.remove(actionId);
			var currentAssigneeId = action.getAssignee() == null ? null : action.getAssignee().getId();
			var causedBySid = firehoseEvent.getSid();

			if (wasReassigned && previousAssigneeId != null && !Objects.equals(previousAssigneeId, currentAssigneeId)) {
				log.debug("{} {} id:{} reassigned from:{} to assignee:{}", firehoseEvent.getEventType(), action.getClass().getSimpleName(), action.getId(), previousAssigneeId, currentAssigneeId);

				if (firehoseEvent.getEventType() == EventType.UPDATE) {
					// Send a message to previous and current assignee that action was reassigned
					log.debug(" Send a message to previous:{} and current assignee:{} that action was reassigned", previousAssigneeId, currentAssigneeId);
					if (previousAssigneeId != null) {
						messageService.removePendingMessages(new AclSid(previousAssigneeId), classPkService.getClassPk(action.getClass()), action.getId());
						if (!Objects.equals(causedBySid, previousAssigneeId)) {
							messageService.createFast(new NotificationMessage(new AclSid(previousAssigneeId), firehoseEvent.getTimestamp(), ACTION_REASSIGNED, classPkService.getClassPk(action.getClass()), action.getId()));
						} else {
							log.debug("  Not notifying previous:{} causedBy:{}", previousAssigneeId, causedBySid);
						}
					}
					if (currentAssigneeId != null && !Objects.equals(causedBySid, currentAssigneeId)) {
						messageService.createFast(new NotificationMessage(new AclSid(currentAssigneeId), firehoseEvent.getTimestamp(), ACTION_REASSIGNED, classPkService.getClassPk(action.getClass()), action.getId()));
					} else {
						log.debug("  Not notifying assignee:{} causedBy:{}", currentAssigneeId, causedBySid);
					}
				}

			} else if (wasReassigned) {
				log.debug("{} {} id:{} assigned to assignee:{}", firehoseEvent.getEventType(), action.getClass().getSimpleName(), action.getId(), currentAssigneeId);

				if (firehoseEvent.getEventType() == EventType.CREATE || firehoseEvent.getEventType() == EventType.UPDATE) {
					log.debug(" Send a message that action was assigned to current assignee:{}", currentAssigneeId);
					// Send a message that an action was assigned to current assignee
					if (currentAssigneeId != null && !Objects.equals(causedBySid, currentAssigneeId)) {
						messageService.createFast(new NotificationMessage(new AclSid(currentAssigneeId), firehoseEvent.getTimestamp(), ACTION_ASSIGNED, classPkService.getClassPk(action.getClass()), action.getId()));
					} else {
						log.debug("  Not notifying assignee:{} causedBy:{}", currentAssigneeId, causedBySid);
					}
				} else if (firehoseEvent.getEventType() == EventType.DELETE) {
					// Remove pending notification
					log.debug(" Remove pending messages for deleted action {}#{}!", actionClassName, action.getId());
					messageService.removePendingMessages(classPkService.getClassPk(action.getClass()), action.getId());
				}

			} else {
				if (firehoseEvent.getEventType() == EventType.CREATE) {
					log.debug(" Send a message that action was assigned to current assignee:{}", currentAssigneeId);
					// Send a message that action was updated 
					if (currentAssigneeId != null && !Objects.equals(causedBySid, currentAssigneeId)) {
						messageService.createFast(new NotificationMessage(new AclSid(currentAssigneeId), firehoseEvent.getTimestamp(), ACTION_ASSIGNED, classPkService.getClassPk(action.getClass()), action.getId()));
					} else {
						log.debug("  Not notifying assignee:{} causedBy:{}", currentAssigneeId, causedBySid);
					}

				} else if (firehoseEvent.getEventType() == EventType.CREATE || firehoseEvent.getEventType() == EventType.UPDATE) {
					log.debug(" Send a message that action was updated to current assignee:{}", currentAssigneeId);
					// Send a message that action was updated 
					if (currentAssigneeId != null && !Objects.equals(causedBySid, currentAssigneeId)) {
						messageService.createFast(new NotificationMessage(new AclSid(currentAssigneeId), firehoseEvent.getTimestamp(), ACTION_UPDATED, classPkService.getClassPk(action.getClass()), action.getId()));
					} else {
						log.debug("  Not notifying assignee:{} causedBy:{}", currentAssigneeId, causedBySid);
					}

				} else if (firehoseEvent.getEventType() == EventType.DELETE) {
					// Remove pending notification
					log.debug(" Remove pending messages for deleted action {}#{}!", actionClassName, action.getId());
					messageService.removePendingMessages(classPkService.getClassPk(action.getClass()), action.getId());
				}
			}

			if (action.getCompletedDate() != null) { // Action is closed
				if (originalCompletedDate == null && originalIsDone == null) {
					// date did not change
					log.debug(" completedDate and isDone did not change");
				} else {
					// Action was completed
					if (Objects.equals(action.getIsDone(), "Y")) {
						log.info("{} id:{} is now completed as isDone:{} on:{}", actionClassName, action.getId(), action.getIsDone(), action.getCompletedDate());
						if (action.getWorkflowStep() != null) {
							asAdmin(() -> TransactionHelper.executeInTransaction(false, () -> {
								workflowService.createNextStepAction(action);
								return true;
							}));
						} else {
							log.debug("Action is not part of workflow");
						}
					} else {
						log.info("{} id:{} is now canceled as isDone:{}	on:{}", actionClassName, action.getId(), action.getIsDone(), action.getCompletedDate());
					}
				}
			}
			// else { // Action is open
			//	 if (originalCompletedDate == null) {
			//		 // date did not change
			//		 log.debug(" completedDate and isDone did not change");
			//		 if (originalNotBeforeDate != null) {
			//			 // notBeforeChanged
			//			 log.info("{} id:{} is now rescheduled from:{} with assignee:{} notBeforeDate:{}", actionClassName, action.getId(), originalNotBeforeDate.get(), currentAssigneeId, action.getNotBeforeDate());
			//		 }
			//	 } else {
			//		 log.info("{} id:{} is now reopened with assignee:{} notBeforeDate:{}", actionClassName, action.getId(), currentAssigneeId, action.getNotBeforeDate());
			//	 }
			// }

			if (log.isTraceEnabled()) {
				log.trace("      assignee changed:{}\twas:{} current:{}", wasReassigned, previousAssigneeId, currentAssigneeId);
				log.trace(" completedDate changed:{}\twas:{} current:{}", originalCompletedDate != null, originalCompletedDate != null && originalCompletedDate.isPresent() ? originalCompletedDate.get() : "NULL", action.getCompletedDate());
				log.trace("        isDone changed:{}\twas:{} current:{}", originalIsDone != null, originalIsDone != null && originalIsDone.isPresent() ? originalIsDone.get() : "NULL", action.getIsDone());
				log.trace(" notBeforeDate changed:{}\twas:{} current:{}", originalNotBeforeDate != null, originalNotBeforeDate != null && originalNotBeforeDate.isPresent() ? originalNotBeforeDate.get() : "NULL", action.getNotBeforeDate());
			}

		} else {
			// log.trace("NOT handling event:{}", firehoseEvent);
		}
	}

	@Data
	@AllArgsConstructor
	@NoArgsConstructor
	private static class ActionId {
		private long id;
		private String className;
	}

	@Data
	@EqualsAndHashCode(callSuper = true)
	private static class ActionState extends ActionId {
		private String property;

		public ActionState(long id, String className, String property) {
			super(id, className);
			this.property = property;
		}
	}

	private <T> T asAdmin(Callable<T> callable) throws Exception {
		UserDetails administrator = userService.loadUserByUsername("administrator");
		List<GrantedAuthority> authorities = Lists.newArrayList(new SimpleGrantedAuthority("ROLE_ADMINISTRATOR"));
		authorities.addAll(administrator.getAuthorities());
		Authentication authentication = new UsernamePasswordAuthenticationToken(administrator, null, authorities);
		return TransactionHelper.asUser(authentication, callable);
	}
}