NotificationMessageServiceImpl.java
/*
* Copyright 2024 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.service.impl;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.time.Period;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.genesys.blocks.model.ClassPK;
import org.genesys.blocks.security.model.AclSid;
import org.genesys.blocks.security.persistence.AclSidPersistence;
import org.gringlobal.model.AbstractAction;
import org.gringlobal.model.AppResource;
import org.gringlobal.model.SysGroup;
import org.gringlobal.model.SysUser;
import org.gringlobal.model.notification.NotificationMessage;
import org.gringlobal.model.notification.QNotificationMessage;
import org.gringlobal.persistence.notification.NotificationMessageRepository;
import org.gringlobal.service.AppResourceService;
import org.gringlobal.service.EMailService;
import org.gringlobal.service.NotificationMessageService;
import org.gringlobal.service.SysGroupService;
import org.gringlobal.service.TemplatingService;
import org.gringlobal.service.filter.NotificationMessageFilter;
import org.gringlobal.util.LocaleContextHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.data.domain.Pageable;
import org.springframework.data.util.Pair;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.jpa.impl.JPAQueryFactory;
/**
* The NotificationScheduleServiceImpl.
*/
@Service
@Transactional(readOnly = true)
@EnableScheduling
@Validated
@Slf4j
public class NotificationMessageServiceImpl extends FilteredCRUDService2Impl<NotificationMessage, NotificationMessageFilter, NotificationMessageRepository> implements NotificationMessageService {
@Autowired
private JPAQueryFactory jpqQueryFactory;
@Autowired
protected AppResourceService appResourceService;
@Autowired
protected EMailService emailService;
@Autowired
protected TemplatingService templatingService;
@Autowired
private AclSidPersistence sidPersistence;
@Autowired
private SysGroupService sysGroupService;
@Value("${frontend.url}")
protected String frontendUrl;
private final Cache<Pair<String, Locale>, AppResource> templateCache = CacheBuilder.newBuilder().maximumSize(20).expireAfterWrite(10, TimeUnit.MINUTES).build();
@Override
@Scheduled(initialDelay = 77, fixedDelay = 28, timeUnit = TimeUnit.MINUTES)
@Transactional
@SchedulerLock(name = "org.gringlobal.service.impl.NotificationMessageServiceImpl.cleanup")
public void removeExpiredNotifications() {
var oldestNotification = ZonedDateTime.now().minus(Period.parse("P2M")).toInstant();
log.warn("Removing NotificationMessage with timestamp before:{}", oldestNotification);
repository.deleteAll(repository.findAll(
QNotificationMessage.notificationMessage.timestamp.before(oldestNotification)
));
}
@Override
public NotificationMessage create(NotificationMessage source) {
return createFast(source);
}
@Override
public NotificationMessage createFast(NotificationMessage source) {
log.debug("Storing message: {}", source);
var sid = sidPersistence.findById(source.getRecipient().getId());
if (sid.isPresent() && sid.get() instanceof SysGroup) {
var group = (SysGroup) sid.get();
var members = sysGroupService.listMembers(group, Pageable.unpaged());
if (members.hasContent()) {
members.stream().forEach(member -> {
var memberCopy = new NotificationMessage(member, source.getTimestamp(), source.getMessageType(), source.getRelatedClassPK(), source.getRelatedId());
super.createFast(memberCopy);
});
}
}
return super.createFast(source);
}
@Override
public NotificationMessage update(NotificationMessage updated, NotificationMessage target) {
return updateFast(updated);
}
@Override
public NotificationMessage updateFast(NotificationMessage updated, NotificationMessage target) {
target.apply(updated);
log.debug("Updating message: {}", target);
return repository.save(target);
}
@Override
@Transactional
public void removePendingMessages(ClassPK classPk, long id) {
log.info("Removing pending notifications for {}#{}", classPk.getClassname(), id);
repository.deleteAll(repository.findAll(
QNotificationMessage.notificationMessage.relatedClassPK().eq(classPk)
.and(QNotificationMessage.notificationMessage.relatedId.eq(id))
.and(QNotificationMessage.notificationMessage.sentDate.isNull())
));
}
@Override
public void removePendingMessages(AclSid aclSid, ClassPK classPk, Long id) {
log.info("Removing pending notifications for {} for {}#{}", aclSid.getSid(),classPk.getClassname(), id);
repository.deleteAll(repository.findAll(
QNotificationMessage.notificationMessage.relatedClassPK().eq(classPk)
.and(QNotificationMessage.notificationMessage.relatedId.eq(id))
.and(QNotificationMessage.notificationMessage.recipient().eq(aclSid))
.and(QNotificationMessage.notificationMessage.sentDate.isNull())
));
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW) // Need a new transaction
public void sendPendingMessages(SysUser sysUser) {
var timestamp = Instant.now();
var maxAge = Instant.now().minus(Duration.ofDays(7)); // Don't send old messages
var unsentMessages = jpqQueryFactory.selectFrom(QNotificationMessage.notificationMessage)
.where(
QNotificationMessage.notificationMessage.recipient().eq(sysUser)
.and(QNotificationMessage.notificationMessage.sentDate.isNull())
.and(QNotificationMessage.notificationMessage.timestamp.gt(maxAge))
)
.orderBy(QNotificationMessage.notificationMessage.messageType.asc(), QNotificationMessage.notificationMessage.timestamp.asc())
.fetch();
log.debug("Found {} pending messages for {}", unsentMessages.size(), sysUser.getSid());
if (unsentMessages.size() == 0) return;
var messagesByClassPk = unsentMessages.stream().filter(nm -> !Objects.isNull(nm.getRelatedClassPK())).collect(Collectors.groupingBy(NotificationMessage::getRelatedClassPK, Collectors.groupingBy(NotificationMessage::getMessageType)));
var otherMessages = unsentMessages.stream().filter(nm -> Objects.isNull(nm.getRelatedClassPK())).collect(Collectors.toList());
otherMessages.forEach(other -> {
log.warn("Not processing {} {}", other.getId(), other.getMessageType());
});
StringBuilder messageBody = new StringBuilder(1024);
var processedMessages = new AtomicInteger(0);
messagesByClassPk.entrySet().stream().forEach(classPkMessages -> {
var classPk = classPkMessages.getKey();
Class<? extends AbstractAction<?>> actionTypeClass = null;
var messagesByType = classPkMessages.getValue();
var actionCodeGroup = "*_ACTION";
try {
Class<?> cl = Class.forName(classPk.getClassname());
if (AbstractAction.class.isAssignableFrom(cl)) {
actionTypeClass = (Class<? extends AbstractAction<?>>) cl;
actionCodeGroup = actionTypeClass.getDeclaredConstructor().newInstance().getActionGroupName();
}
} catch (Throwable e) {
log.warn("Cannot handle AbstractAction: {}", e.getMessage());
// throw new RuntimeException("Cannot handle as AbstractAction", e);
return;
}
final var finalActionTypeClass = actionTypeClass;
final var finalActionCodeGroup = actionCodeGroup;
log.info("Processing {} with {} types of messages for user:{}", finalActionTypeClass.getName(), messagesByType.size(), sysUser.getSid());
messagesByType.entrySet().stream().forEach(typeMessages -> {
var messageType = typeMessages.getKey();
var messages = typeMessages.getValue();
log.info(" Processing {} type {} with {} messages for user:{}", classPk.getClassname(), messageType, messages.size(), sysUser.getSid());
processedMessages.addAndGet(messages.size());
// Fetch related objects
var relatedIds = messages.stream().map(NotificationMessage::getRelatedId).filter(Objects::nonNull).collect(Collectors.toSet());
log.info(" The message is related to ids: {}", relatedIds);
var relatedEntities = fetchRelatedActions(finalActionTypeClass, relatedIds);
if (relatedEntities.size() == 0) {
log.info(" Zero related action records were loaded. Nothing to process.");
return;
}
try {
Map<String, Object> templateParams = new HashMap<>();
templateParams.put("frontendUrl", frontendUrl);
templateParams.put("messageType", messageType);
templateParams.put("actionClassName", classPk.getClassname());
templateParams.put("records", relatedEntities.stream().collect(Collectors.groupingBy(AbstractAction::getActionNameCode)).entrySet());
templateParams.put("actionCodeGroup", finalActionCodeGroup);
LocaleContextHelper.withLocaleAndTimezone(LocaleContextHelper.getLocale(sysUser), LocaleContextHolder.getTimeZone(), () -> {
var templateName = "notification/" + finalActionTypeClass.getSimpleName().toLowerCase() + "-message.template";
var messageTemplate = templateCache.get(Pair.of(templateName, LocaleContextHolder.getLocale()), () -> {
var resource = appResourceService.getResource(AppResourceService.APP_NAME_GGCE, templateName, LocaleContextHolder.getLocale());
if (resource == null) {
log.info("AppResource '" + templateName + "' not found, trying 'notification/action-message.mustache'");
resource = appResourceService.getResource(AppResourceService.APP_NAME_GGCE, "notification/action-message.mustache", LocaleContextHolder.getLocale());
}
if (resource == null) {
log.info("AppResource 'notification/action-message.mustache' not found, using bundled template");
var fromDisk = new AppResource();
fromDisk.setDisplayMember(Files.readString(Path.of(getClass().getResource("/notification/action-message.mustache").getPath())));
return fromDisk;
} else {
return resource;
}
});
String title = StringUtils.defaultIfBlank(messageTemplate.getValueMember(), messageType);
templateParams.put("title", title);
String template = messageTemplate.getDisplayMember();
messageBody.append(templatingService.fillTemplate(template, templateParams));
return Void.TYPE;
});
// Flag messages as sent
messages.forEach(nm -> nm.setSentDate(timestamp));
} catch (Throwable e) {
log.error("Could not generate email notification", e);
}
});
});
if (messageBody.length() == 0 || processedMessages.get() == 0) {
log.info("No messages were processed for {}", sysUser.getSid());
return;
}
log.debug("The email is:\n{}", messageBody.toString());
String mailTo = null;
if (sysUser.getCooperator() != null) mailTo = StringUtils.trimToNull(sysUser.getCooperator().getEmail());
if (mailTo == null) {
log.warn("User {} ({}) does not have an email address!", sysUser.getSid(), sysUser.getFullName());
} else {
log.info("Sending email to {}", mailTo);
emailService.sendMail("You have unread messages", messageBody.toString(), mailTo);
}
}
private Collection<? extends AbstractAction<?>> fetchRelatedActions(Class<? extends AbstractAction<?>> actionType, Collection<Long> relatedIds) {
var root = new PathBuilder<>(actionType, "t");
PathBuilder<Long> pkField = root.get("id", Long.class);
return jpaQueryFactory.selectFrom(root).where(pkField.in(relatedIds)).fetch();
}
}