NotificationMessageServiceImpl.java

/*
 * Copyright 2024 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gringlobal.service.impl;

import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.time.Period;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.genesys.blocks.model.ClassPK;
import org.genesys.blocks.security.model.AclSid;
import org.genesys.blocks.security.persistence.AclSidPersistence;
import org.gringlobal.model.AbstractAction;
import org.gringlobal.model.AppResource;
import org.gringlobal.model.SysGroup;
import org.gringlobal.model.SysUser;
import org.gringlobal.model.notification.NotificationMessage;
import org.gringlobal.model.notification.QNotificationMessage;
import org.gringlobal.persistence.notification.NotificationMessageRepository;
import org.gringlobal.service.AppResourceService;
import org.gringlobal.service.EMailService;
import org.gringlobal.service.NotificationMessageService;
import org.gringlobal.service.SysGroupService;
import org.gringlobal.service.TemplatingService;
import org.gringlobal.service.filter.NotificationMessageFilter;
import org.gringlobal.util.LocaleContextHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.data.domain.Pageable;
import org.springframework.data.util.Pair;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.jpa.impl.JPAQueryFactory;

/**
 * The NotificationScheduleServiceImpl.
 */
@Service
@Transactional(readOnly = true)
@EnableScheduling
@Validated
@Slf4j
public class NotificationMessageServiceImpl extends FilteredCRUDService2Impl<NotificationMessage, NotificationMessageFilter, NotificationMessageRepository> implements NotificationMessageService {

	@Autowired
	private JPAQueryFactory jpqQueryFactory;

	@Autowired
	protected AppResourceService appResourceService;

	@Autowired
	protected EMailService emailService;

	@Autowired
	protected TemplatingService templatingService;

	@Autowired
	private AclSidPersistence sidPersistence;

	@Autowired
	private SysGroupService sysGroupService;

	@Value("${frontend.url}")
	protected String frontendUrl;

	private final Cache<Pair<String, Locale>, AppResource> templateCache = CacheBuilder.newBuilder().maximumSize(20).expireAfterWrite(10, TimeUnit.MINUTES).build();

	@Override
	@Scheduled(initialDelay = 77, fixedDelay = 28, timeUnit = TimeUnit.MINUTES)
	@Transactional
	@SchedulerLock(name = "org.gringlobal.service.impl.NotificationMessageServiceImpl.cleanup")
	public void removeExpiredNotifications() {
		var oldestNotification = ZonedDateTime.now().minus(Period.parse("P2M")).toInstant();
		log.warn("Removing NotificationMessage with timestamp before:{}", oldestNotification);
		repository.deleteAll(repository.findAll(
			QNotificationMessage.notificationMessage.timestamp.before(oldestNotification)
		));
	}


	@Override
	public NotificationMessage create(NotificationMessage source) {
		return createFast(source);
	}

	@Override
	public NotificationMessage createFast(NotificationMessage source) {
		log.debug("Storing message: {}", source);
		var sid = sidPersistence.findById(source.getRecipient().getId());
		if (sid.isPresent() && sid.get() instanceof SysGroup) {
			var group = (SysGroup) sid.get();
			var members = sysGroupService.listMembers(group, Pageable.unpaged());
			if (members.hasContent()) {
				members.stream().forEach(member -> {
					var memberCopy = new NotificationMessage(member, source.getTimestamp(), source.getMessageType(), source.getRelatedClassPK(), source.getRelatedId());
					super.createFast(memberCopy);
				});
			}
		}
		return super.createFast(source);
	}

	@Override
	public NotificationMessage update(NotificationMessage updated, NotificationMessage target) {
		return updateFast(updated);
	}

	@Override
	public NotificationMessage updateFast(NotificationMessage updated, NotificationMessage target) {
		target.apply(updated);
		log.debug("Updating message: {}", target);
		return repository.save(target);
	}

	@Override
	@Transactional
	public void removePendingMessages(ClassPK classPk, long id) {
		log.info("Removing pending notifications for {}#{}", classPk.getClassname(), id);
		repository.deleteAll(repository.findAll(
			QNotificationMessage.notificationMessage.relatedClassPK().eq(classPk)
				.and(QNotificationMessage.notificationMessage.relatedId.eq(id))
				.and(QNotificationMessage.notificationMessage.sentDate.isNull())
		));
	}

	@Override
	public void removePendingMessages(AclSid aclSid, ClassPK classPk, Long id) {
		log.info("Removing pending notifications for {} for {}#{}", aclSid.getSid(),classPk.getClassname(), id);
		repository.deleteAll(repository.findAll(
			QNotificationMessage.notificationMessage.relatedClassPK().eq(classPk)
			.and(QNotificationMessage.notificationMessage.relatedId.eq(id))
				.and(QNotificationMessage.notificationMessage.recipient().eq(aclSid))
				.and(QNotificationMessage.notificationMessage.sentDate.isNull())
		));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW) // Need a new transaction
	public void sendPendingMessages(SysUser sysUser) {

		var timestamp = Instant.now();
		var maxAge = Instant.now().minus(Duration.ofDays(7)); // Don't send old messages

		var unsentMessages = jpqQueryFactory.selectFrom(QNotificationMessage.notificationMessage)
			.where(
				QNotificationMessage.notificationMessage.recipient().eq(sysUser)
				.and(QNotificationMessage.notificationMessage.sentDate.isNull())
				.and(QNotificationMessage.notificationMessage.timestamp.gt(maxAge))
			)
			.orderBy(QNotificationMessage.notificationMessage.messageType.asc(), QNotificationMessage.notificationMessage.timestamp.asc())
			.fetch();

		log.debug("Found {} pending messages for {}", unsentMessages.size(), sysUser.getSid());

		if (unsentMessages.size() == 0) return;

		var messagesByClassPk = unsentMessages.stream().filter(nm -> !Objects.isNull(nm.getRelatedClassPK())).collect(Collectors.groupingBy(NotificationMessage::getRelatedClassPK, Collectors.groupingBy(NotificationMessage::getMessageType)));
		var otherMessages = unsentMessages.stream().filter(nm -> Objects.isNull(nm.getRelatedClassPK())).collect(Collectors.toList());

		otherMessages.forEach(other -> {
			log.warn("Not processing {} {}", other.getId(), other.getMessageType());
		});

		StringBuilder messageBody = new StringBuilder(1024);
		var processedMessages = new AtomicInteger(0);

		messagesByClassPk.entrySet().stream().forEach(classPkMessages -> {
			var classPk = classPkMessages.getKey();
			Class<? extends AbstractAction<?>> actionTypeClass = null;
			var messagesByType = classPkMessages.getValue();
			var actionCodeGroup = "*_ACTION";

			try {
				Class<?> cl = Class.forName(classPk.getClassname());
				if (AbstractAction.class.isAssignableFrom(cl)) {
					actionTypeClass = (Class<? extends AbstractAction<?>>) cl;
					actionCodeGroup = actionTypeClass.getDeclaredConstructor().newInstance().getActionGroupName();
				}
			} catch (Throwable e) {
				log.warn("Cannot handle AbstractAction: {}", e.getMessage());
				// throw new RuntimeException("Cannot handle as AbstractAction", e);
				return;
			}

			final var finalActionTypeClass = actionTypeClass;
			final var finalActionCodeGroup = actionCodeGroup;
			
			log.info("Processing {} with {} types of messages for user:{}", finalActionTypeClass.getName(), messagesByType.size(), sysUser.getSid());

			messagesByType.entrySet().stream().forEach(typeMessages -> {
				var messageType = typeMessages.getKey();
				var messages = typeMessages.getValue();
				
				log.info(" Processing {} type {} with {} messages for user:{}", classPk.getClassname(), messageType, messages.size(), sysUser.getSid());
				processedMessages.addAndGet(messages.size());

				// Fetch related objects
				var relatedIds = messages.stream().map(NotificationMessage::getRelatedId).filter(Objects::nonNull).collect(Collectors.toSet());
				log.info("  The message is related to ids: {}", relatedIds);
				var relatedEntities = fetchRelatedActions(finalActionTypeClass, relatedIds);
				if (relatedEntities.size() == 0) {
					log.info(" Zero related action records were loaded. Nothing to process.");
					return;
				}

				try {
					Map<String, Object> templateParams = new HashMap<>();
					templateParams.put("frontendUrl", frontendUrl);
					templateParams.put("messageType", messageType);
					templateParams.put("actionClassName", classPk.getClassname());
					templateParams.put("records", relatedEntities.stream().collect(Collectors.groupingBy(AbstractAction::getActionNameCode)).entrySet());
					templateParams.put("actionCodeGroup", finalActionCodeGroup);

					LocaleContextHelper.withLocaleAndTimezone(LocaleContextHelper.getLocale(sysUser), LocaleContextHolder.getTimeZone(), () -> {

						var templateName = "notification/" + finalActionTypeClass.getSimpleName().toLowerCase() + "-message.template";
						var messageTemplate = templateCache.get(Pair.of(templateName, LocaleContextHolder.getLocale()), () -> {
							var resource = appResourceService.getResource(AppResourceService.APP_NAME_GGCE, templateName, LocaleContextHolder.getLocale());
							if (resource == null) {
								log.info("AppResource '" + templateName + "' not found, trying 'notification/action-message.mustache'");
								resource = appResourceService.getResource(AppResourceService.APP_NAME_GGCE, "notification/action-message.mustache", LocaleContextHolder.getLocale());
							}
							if (resource == null) {
								log.info("AppResource 'notification/action-message.mustache' not found, using bundled template");
								var fromDisk = new AppResource();
								fromDisk.setDisplayMember(Files.readString(Path.of(getClass().getResource("/notification/action-message.mustache").getPath())));
								return fromDisk;
							} else {
								return resource;
							}
						});

						String title = StringUtils.defaultIfBlank(messageTemplate.getValueMember(), messageType);
						templateParams.put("title", title);

						String template = messageTemplate.getDisplayMember();
						messageBody.append(templatingService.fillTemplate(template, templateParams));

						return Void.TYPE;
					});

					// Flag messages as sent
					messages.forEach(nm -> nm.setSentDate(timestamp));

				} catch (Throwable e) {
					log.error("Could not generate email notification", e);
				}

			});

		});

		if (messageBody.length() == 0 || processedMessages.get() == 0) {
			log.info("No messages were processed for {}", sysUser.getSid());
			return;
		}

		log.debug("The email is:\n{}", messageBody.toString());

		String mailTo = null;
		if (sysUser.getCooperator() != null) mailTo = StringUtils.trimToNull(sysUser.getCooperator().getEmail());
		if (mailTo == null) {
			log.warn("User {} ({}) does not have an email address!", sysUser.getSid(), sysUser.getFullName());
		} else {
			log.info("Sending email to {}", mailTo);
			emailService.sendMail("You have unread messages", messageBody.toString(), mailTo);
		}
	}

	private Collection<? extends AbstractAction<?>> fetchRelatedActions(Class<? extends AbstractAction<?>> actionType, Collection<Long> relatedIds) {
			var root = new PathBuilder<>(actionType, "t");
			PathBuilder<Long> pkField = root.get("id", Long.class);
			return jpaQueryFactory.selectFrom(root).where(pkField.in(relatedIds)).fetch();
	}

}