NotificationScheduleServiceImpl.java
/*
* Copyright 2024 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.service.impl;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.ClockProvider;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockingTaskExecutor;
import org.genesys.blocks.security.model.AclSid;
import org.genesys.blocks.security.persistence.AclSidPersistence;
import org.genesys.blocks.util.CurrentApplicationContext;
import org.gringlobal.model.SysGroup;
import org.gringlobal.model.SysUser;
import org.gringlobal.model.notification.NotificationSchedule;
import org.gringlobal.model.notification.NotificationScheduleSubscriber;
import org.gringlobal.model.notification.QNotificationSchedule;
import org.gringlobal.model.notification.QNotificationScheduleSubscriber;
import org.gringlobal.notification.AccessionActionNotifications;
import org.gringlobal.notification.InventoryActionNotifications;
import org.gringlobal.notification.KPINotifications;
import org.gringlobal.notification.OrderRequestNotifications;
import org.gringlobal.notification.action.NotificationMessageEmailSender;
import org.gringlobal.notification.schedule.ScheduledNotification;
import org.gringlobal.persistence.NotificationScheduleSubscriberRepository;
import org.gringlobal.persistence.notification.NotificationScheduleRepository;
import org.gringlobal.service.NotificationScheduleService;
import org.gringlobal.service.SysGroupService;
import org.gringlobal.service.filter.NotificationScheduleFilter;
import org.gringlobal.spring.TransactionHelper;
import org.hibernate.Hibernate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.MessageSource;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ReflectionUtils;
import org.springframework.validation.annotation.Validated;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.gringlobal.application.config.HazelcastConfig.DEF_LOCK_AT_MOST_FOR;
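/**
* Service that manages {@link NotificationSchedule} records and keeps the local {@link TaskScheduler}
* in sync with them: active schedules that have a cron expression are registered as cron-triggered
* tasks, and each run invokes the configured generator method through the ShedLock
* {@link LockingTaskExecutor}.
*/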
@Service
@EnableScheduling
@Validated
@Slf4j
public class NotificationScheduleServiceImpl extends FilteredCRUDService2Impl<NotificationSchedule, NotificationScheduleFilter, NotificationScheduleRepository>
implements NotificationScheduleService {
// Passed as lockAtMostFor when building the lock configuration of a notification task;
// parsed from HazelcastConfig.DEF_LOCK_AT_MOST_FOR.
private static final Duration LOCK_AT_MOST_FOR_DURATION = Duration.parse(DEF_LOCK_AT_MOST_FOR);
// Passed as lockAtLeastFor when building the lock configuration of a notification task (30 seconds).
// NOTE(review): the comment inside registerTask says this should be greater than the checkUpdates()
// interval, which is 2 minutes -- confirm which of the two is intended.
private static final Duration LOCK_AT_LEAST_FOR_DURATION = Duration.parse("PT30S");
// Schedules the cron-triggered notification tasks.
@Autowired
private TaskScheduler taskScheduler;
// Wraps each task run in a named lock.
@Autowired
private LockingTaskExecutor lockingTaskExecutor;
// Resolves the title and description of notification generators.
@Autowired
private MessageSource messageSource;
// Used to expand subscribed groups into their member users.
@Autowired
private SysGroupService sysGroupService;
// Resolves SID names to ids and loads the SID entities.
@Autowired
private AclSidPersistence aclSidPersistence;
// Persists the subscribers of a notification schedule.
@Autowired
private NotificationScheduleSubscriberRepository subscriberRepository;
@PersistenceContext
protected EntityManager entityManager;
// Tasks currently registered with the task scheduler by this instance, keyed by the detached
// schedule they were registered for.
// NOTE(review): this is a plain HashMap that is used by the @Scheduled checkUpdates() as well as by
// the create/update/remove service methods -- confirm these cannot run concurrently.
private final Map<NotificationSchedule, ScheduledFuture<?>> scheduledNotificationMap = new HashMap<>();
// Template schedules, one for every @ScheduledNotification method found by afterPropertiesSet().
private final List<NotificationSchedule> notificationGenerators = new ArrayList<>();
/**
 * Builds the catalogue of available notification generators.
 * <p>
 * Every public method annotated with {@link ScheduledNotification} on one of the known generator
 * classes becomes an active template {@link NotificationSchedule} without a cron expression. Title
 * and description are looked up in the message source under
 * {@code notification.<SimpleClassName>.<method>.title} and {@code .description}.
 */
@Override
public void afterPropertiesSet() throws Exception {
    // Create generators list
    // TODO scan context instead!
    List<Class<?>> generatorClasses = List.of(
        KPINotifications.class, AccessionActionNotifications.class,
        InventoryActionNotifications.class, OrderRequestNotifications.class,
        NotificationMessageEmailSender.class
    );

    for (Class<?> generatorClass : generatorClasses) {
        for (Method candidate : generatorClass.getMethods()) {
            if (candidate.getAnnotation(ScheduledNotification.class) == null) {
                // Not a notification generator method
                continue;
            }

            var messageKey = generatorClass.getSimpleName() + "." + candidate.getName();

            var template = new NotificationSchedule();
            template.setMethod(candidate.getName());
            template.setGenerator(generatorClass.getName());
            template.setTitle(messageSource.getMessage("notification." + messageKey + ".title", null, null, LocaleContextHolder.getLocale()));
            template.setDescription(messageSource.getMessage("notification." + messageKey + ".description", null, null, LocaleContextHolder.getLocale()));
            template.setCron(null);
            template.setIsActive("Y");

            notificationGenerators.add(template);
        }
    }
}
/**
 * Synchronizes the locally scheduled tasks with the schedules stored in the repository.
 * <p>
 * Runs with a fixed delay of two minutes. Tasks whose schedule is no longer active (or no longer has
 * a cron expression) are cancelled; schedules that are new, or whose modification date is later than
 * the one of the locally registered copy, are (re)registered. A schedule that cannot be registered is
 * deactivated and saved.
 */
@Scheduled(fixedDelay = 2, timeUnit = TimeUnit.MINUTES)
@Transactional
public void checkUpdates() {
    var requiredNotifications = Lists.newArrayList(repository.findAll(
        QNotificationSchedule.notificationSchedule.isActive.eq("Y").and(QNotificationSchedule.notificationSchedule.cron.isNotNull())
    ));

    // Live view of the map keys: it reflects the removals done below
    var runningNotifications = scheduledNotificationMap.keySet();

    // Collect first, removeTask modifies the map
    var toRemove = runningNotifications.stream()
        .filter(existed -> !requiredNotifications.contains(existed))
        .collect(Collectors.toList());

    // It removes also from ScheduledNotificationMap
    for (NotificationSchedule notificationSchedule : toRemove) {
        removeTask(notificationSchedule);
    }

    Set<NotificationSchedule> toUpdate = new HashSet<>();

    // Searching for updated in other clusters
    for (NotificationSchedule required : requiredNotifications) {
        for (NotificationSchedule running : runningNotifications) {
            if (required.equals(running)) {
                var requiredModified = required.getModifiedDate();
                var runningModified = running.getModifiedDate();
                // Null-safe: a missing date on the stored record means "not modified", a missing
                // date on the registered copy means it is older than any recorded modification.
                if (requiredModified != null
                    && (runningModified == null
                        || requiredModified.truncatedTo(ChronoUnit.MILLIS).isAfter(runningModified.truncatedTo(ChronoUnit.MILLIS)))) {
                    toUpdate.add(required);
                }
                break;
            }
        }
    }

    // Add all new missing tasks
    requiredNotifications.stream()
        .filter(required -> !runningNotifications.contains(required))
        .forEach(toUpdate::add);

    // Restart the updated tasks and add new ones.
    toUpdate.forEach(notificationSchedule -> {
        if (!registerTask(notificationSchedule)) {
            // Registration failed: deactivate so it is not retried on every check
            notificationSchedule.setIsActive("N");
            repository.saveAndFlush(notificationSchedule);
        }
    });
}
/**
 * Lists the template schedules of all notification generators found at startup.
 *
 * @return an unmodifiable copy of the generator list, so callers cannot alter the catalogue kept by
 *         this service (the template objects themselves are shared)
 */
@Override
public List<NotificationSchedule> listNotificationGenerators() {
    return List.copyOf(notificationGenerators);
}
/**
 * Creates a new notification schedule and returns it after passing it through {@code _lazyLoad}.
 * Requires the {@code GROUP_ADMINS} authority.
 *
 * @param source the schedule to create; must not have an id and must name a generator and a method
 * @return the created schedule
 */
@Override
@Transactional
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
public NotificationSchedule create(NotificationSchedule source) {
    assert (source.getId() == null);
    assert (source.getGenerator() != null);
    assert (source.getMethod() != null);

    var created = createFast(source);
    return _lazyLoad(created);
}
/**
 * Creates a new notification schedule and, when it is active, registers its task with the scheduler.
 * If the task cannot be registered, the schedule is saved again as inactive.
 * Requires the {@code GROUP_ADMINS} authority.
 *
 * @param source the schedule to create; must not have an id and must name a generator and a method
 * @return the saved schedule
 */
@Override
@Transactional
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
public NotificationSchedule createFast(NotificationSchedule source) {
    assert (source.getId() == null);
    assert (source.getGenerator() != null);
    assert (source.getMethod() != null);

    var target = new NotificationSchedule();
    target.apply(source);

    // Flush before registering (as updateFast does): registerTask detaches the entity it obtains
    // from the repository, and detaching discards changes that have not been flushed yet.
    var saved = repository.saveAndFlush(target);

    // Constant-first comparison: a missing isActive value is treated as "not active"
    if ("Y".equals(saved.getIsActive())) {
        if (!registerTask(saved)) {
            var toDeactivate = repository.getReferenceById(saved.getId());
            toDeactivate.setIsActive("N");
            saved = repository.saveAndFlush(toDeactivate);
        }
    }
    return saved;
}
/**
 * Updates a notification schedule and returns it after passing it through {@code _lazyLoad}.
 * Requires the {@code GROUP_ADMINS} authority.
 *
 * @param updated the new values
 * @param target the schedule to update
 * @return the updated schedule
 */
@Override
@Transactional
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
public NotificationSchedule update(NotificationSchedule updated, NotificationSchedule target) {
    var result = updateFast(updated, target);
    return _lazyLoad(result);
}
/**
 * Applies the new values to the target schedule, saves it and re-registers its task.
 * <p>
 * The currently registered task (if any) is cancelled first. When the saved schedule is active its
 * task is registered again; if that fails, the schedule is saved as inactive.
 * Requires the {@code GROUP_ADMINS} authority.
 *
 * @param updated the new values
 * @param target the schedule to update
 * @return the saved schedule
 */
@Override
@Transactional
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
public NotificationSchedule updateFast(NotificationSchedule updated, NotificationSchedule target) {
    removeTask(target);
    target.apply(updated);
    var saved = repository.saveAndFlush(target);

    // Constant-first comparison: a missing isActive value is treated as "not active"
    if ("Y".equals(saved.getIsActive())) {
        if (!registerTask(saved)) {
            var toDeactivate = repository.getReferenceById(saved.getId());
            toDeactivate.setIsActive("N");
            saved = repository.saveAndFlush(toDeactivate);
        }
    }
    return saved;
}
/**
 * Removes the notification schedule and cancels its scheduled task, if one is registered.
 * Requires the {@code GROUP_ADMINS} authority.
 *
 * @param entity the schedule to remove
 * @return the removed schedule as returned by the superclass
 */
@Override
@Transactional
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
public NotificationSchedule remove(NotificationSchedule entity) {
    var deleted = super.remove(entity);
    // Stop the task only after the record was removed
    removeTask(deleted);
    return deleted;
}
/**
 * Subscribes the given SIDs to the notification schedule.
 * <p>
 * SIDs that are already subscribed, that cannot be resolved to an id, or that are not an active
 * {@link SysUser} or {@link SysGroup} are ignored. Requires the {@code GROUP_ADMINS} authority.
 *
 * @param entity the schedule to subscribe to (reloaded by id)
 * @param sids SID names to subscribe; {@code null} or empty means nothing to do
 * @return the schedule, reloaded when subscribers were added
 */
@Override
@Transactional
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
public NotificationSchedule subscribe(NotificationSchedule entity, List<String> sids) {
    entity = get(entity.getId());
    if (sids == null || sids.isEmpty()) {
        // Nothing requested
        return entity;
    }

    var existingSubscribers = entity.getSubscribers().stream().map(subscriber -> subscriber.getSid().getSid()).collect(Collectors.toSet());

    // Skip SIDs that are already subscribed and SIDs that do not resolve to an id
    var sidIds = sids.stream()
        .filter(sid -> !existingSubscribers.contains(sid))
        .map(aclSidPersistence::getSidId)
        .filter(sidId -> sidId != null)
        .collect(Collectors.toSet());
    if (sidIds.isEmpty()) {
        // Do not query with an empty id set
        return entity;
    }

    Set<AclSid> aclSids = aclSidPersistence.listById(sidIds).stream()
        .map(Hibernate::unproxy)
        .map(sid -> (AclSid) sid)
        .filter(sid -> (sid instanceof SysUser || sid instanceof SysGroup) && sid.isActive()) // Keep only active
        .collect(Collectors.toSet());

    if (aclSids.isEmpty()) {
        return entity;
    }

    List<NotificationScheduleSubscriber> subscribers = new ArrayList<>();
    for (AclSid aclSid : aclSids) {
        var subscriber = new NotificationScheduleSubscriber();
        subscriber.setNotificationSchedule(entity);
        subscriber.setSid(aclSid);
        subscribers.add(subscriber);
    }
    subscriberRepository.saveAll(subscribers);
    // Write the new subscribers before re-reading the schedule (unsubscribe flushes as well)
    subscriberRepository.flush();

    // Detach so that the following get() returns a fresh instance
    entityManager.detach(entity);
    return get(entity.getId());
}
/**
 * Removes the subscriptions of the given SIDs from the notification schedule.
 * <p>
 * SIDs that cannot be resolved to an id are ignored. Requires the {@code GROUP_ADMINS} authority.
 *
 * @param entity the schedule to unsubscribe from (reloaded by id)
 * @param sids SID names to unsubscribe; {@code null} or empty means nothing to do
 * @return the reloaded schedule
 */
@Override
@Transactional
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
public NotificationSchedule unsubscribe(NotificationSchedule entity, List<String> sids) {
    entity = get(entity.getId());

    Set<Long> sidIds = new HashSet<>();
    if (sids != null) {
        for (String sid : sids) {
            var sidId = aclSidPersistence.getSidId(sid);
            if (sidId != null) {
                sidIds.add(sidId);
            }
        }
    }

    // Only query when there is something to look for: avoids an IN clause over an empty set
    if (!sidIds.isEmpty()) {
        var qSubscriber = QNotificationScheduleSubscriber.notificationScheduleSubscriber;
        var toRemove = subscriberRepository.findAll(qSubscriber.notificationSchedule().id.eq(entity.getId()).and(qSubscriber.sid().id.in(sidIds)), Pageable.unpaged()).getContent();
        subscriberRepository.deleteAll(toRemove);
        subscriberRepository.flush();
    }
    return reload(entity);
}
/**
 * Registers (or re-registers) the cron-triggered task of a notification schedule.
 * <p>
 * Any task already registered for the schedule is cancelled first. The generator class is loaded by
 * name, its bean is taken from the application context and the method with the signature
 * {@code (NotificationSchedule, List)} is looked up. Each run of the task is executed through the
 * {@link LockingTaskExecutor} with a lock named {@code <generator>.<method>.<id>}.
 *
 * @param notificationSchedule the persisted schedule to register; must have an id
 * @return {@code true} when the task was added to the scheduler; {@code false} when the generator
 *         class or method was not found, the trigger never fires, or any other error occurred
 */
private boolean registerTask(NotificationSchedule notificationSchedule) {
    assert notificationSchedule != null && notificationSchedule.getId() != null;
    try {
        var toRegister = repository.getReferenceById(notificationSchedule.getId());
        // The detached instance is used as the key of scheduledNotificationMap.
        // NOTE(review): presumably this keeps a snapshot for the modification date check in
        // checkUpdates(); it relies on the entity already being loaded in the persistence context.
        entityManager.detach(toRegister);
        removeTask(toRegister);

        var generator = toRegister.getGenerator();
        var method = toRegister.getMethod();
        var cron = toRegister.getCron();

        Class<?> beanClass;
        try {
            beanClass = Class.forName(generator);
        } catch (ClassNotFoundException e) {
            log.warn("Notification schedule generator {} not found", generator, e);
            return false;
        }

        var generatorBean = CurrentApplicationContext.getContext().getBean(beanClass);
        // NOTE(review): the method is matched by name and signature only; it is not checked for the
        // @ScheduledNotification annotation -- confirm that this is intended.
        var scheduledMethod = ReflectionUtils.findMethod(beanClass, method, NotificationSchedule.class, List.class); // Signature!
        if (scheduledMethod == null) {
            log.warn("Notification schedule generator method {}#{} not found", generator, method);
            return false;
        }

        Method methodToRun = scheduledMethod;
        var notificationLockKey = generator + "." + method + "." + toRegister.getId();
        var timezone = toRegister.getTimezone() != null ? ZoneId.of(toRegister.getTimezone()) : ZoneId.of("UTC");

        var scheduledTask = taskScheduler.schedule(() -> {
            log.info("Attempting to run scheduled task id={} {}#{} cron {} in {}", toRegister.getId(), generator, method, cron, timezone);
            try {
                // execute with LockingTaskExecutor
                // lockAtLeastFor should be greater than @Scheduled checkUpdates()
                var lockConfig = new LockConfiguration(ClockProvider.now(), notificationLockKey, LOCK_AT_MOST_FOR_DURATION, LOCK_AT_LEAST_FOR_DURATION);
                var lockResult = lockingTaskExecutor.executeWithLock(() -> {
                    log.debug("Lock acquired. Running scheduled task id={} {}#{} cron {} in {}", toRegister.getId(), generator, method, cron, timezone);
                    var result = executeScheduledTask(toRegister.getId(), (schedule, users) -> {
                        try {
                            return methodToRun.invoke(generatorBean, schedule, users);
                        } catch (Exception e) {
                            // Reflection wraps what the generator threw: report the message of the cause
                            var cause = e.getCause() != null ? e.getCause() : e;
                            log.error("Scheduled task id={} {}#{} execution fail: {}", toRegister.getId(), generator, method, cause.getMessage(), e);
                            return null;
                        }
                    });
                    log.debug("Scheduled task id={} {}#{} completed successfully with result:{}", toRegister.getId(), generator, method, result);
                    return result;
                }, lockConfig);
                log.info("Scheduled task id={} {}#{} was executed:{}", toRegister.getId(), generator, method, lockResult.wasExecuted());
            } catch (Throwable e) {
                // Keep the scheduler thread alive whatever the task does
                log.warn("Exception in scheduled task execution", e);
            }
        }, new CronTrigger(cron, timezone));

        if (scheduledTask == null) {
            // The scheduler returns no future when the trigger never fires
            log.warn("Task id={} {}#{} was not scheduled, cron {} in {} never fires", toRegister.getId(), generator, method, cron, timezone);
            return false;
        }

        scheduledNotificationMap.put(toRegister, scheduledTask);
        log.info("Task id={} {}#{} added to scheduler with {} in {}", toRegister.getId(), generator, method, cron, timezone);
        return true;
    } catch (Throwable t) {
        // Covers invalid cron expressions, unknown time zones, missing beans, ...
        log.warn("Exception when registering NotificationSchedule", t);
        return false;
    }
}
/**
 * Loads the notification schedule, resolves its subscribers to users and runs the task for them.
 * <p>
 * Everything runs inside {@code TransactionHelper.executeInTransaction(true, ...)}. Only active
 * {@link SysUser} and {@link SysGroup} subscribers are considered; groups are expanded to their
 * members and inactive members are dropped. Each user is passed to the task only once, even when
 * subscribed directly and through one or more groups.
 *
 * @param notificationId id of the notification schedule
 * @param task the task to run with the schedule and the resolved users
 * @return the result of the task, or {@code null} when the schedule has no subscribers
 * @throws RuntimeException when the schedule does not exist
 */
private Object executeScheduledTask(Long notificationId, BiFunction<NotificationSchedule, List<SysUser>, Object> task) {
    return TransactionHelper.executeInTransaction(true, () -> {
        var notificationSchedule = repository.findById(notificationId).orElseThrow(() -> new RuntimeException("NotificationSchedule not found"));
        var subscribers = notificationSchedule.getSubscribers();
        if (subscribers.isEmpty()) {
            log.info("Task has no subscribers, nothing to do.");
            return null;
        }
        log.debug("Task has {} subscribers, will execute.", subscribers.size());

        var users = subscribers.stream()
            .map(NotificationScheduleSubscriber::getSid)
            .map(Hibernate::unproxy) // Causes log message HH000179: Narrowing proxy to class org.gringlobal.model.SysUser - this operation breaks ==
            .peek(sid -> log.debug("Notification {} subscriber: {}", notificationId, sid))
            .filter(sid -> (sid instanceof SysUser || sid instanceof SysGroup) && ((AclSid) sid).isActive()) // Keep only active
            .flatMap(sid -> { // Unpack groups
                if (sid instanceof SysUser) {
                    return Stream.of((SysUser) sid);
                } else {
                    return sysGroupService.listMembers((SysGroup) sid, Pageable.unpaged()).getContent().stream();
                }
            })
            .filter(sid -> (sid instanceof SysUser) && ((AclSid) sid).isActive()) // Keep only active
            .distinct() // A user may be subscribed directly and through groups: notify once
            .peek(sid -> log.info("Notification {} will be sent to: {}", notificationId, sid))
            .collect(Collectors.toList());

        return task.apply(notificationSchedule, users);
    });
}
/**
 * Cancels the task registered for the notification schedule and forgets it.
 * Does nothing (except logging) when no task is registered for the schedule.
 *
 * @param notificationSchedule the schedule whose task should be removed
 */
private void removeTask(NotificationSchedule notificationSchedule) {
    var future = scheduledNotificationMap.remove(notificationSchedule);
    if (future == null) {
        log.debug("Scheduled task for id={} {}#{} not found and cannot be removed", notificationSchedule.getId(), notificationSchedule.getGenerator(), notificationSchedule.getMethod());
        return;
    }

    log.debug("Removing scheduled task for id={} {}#{}", notificationSchedule.getId(), notificationSchedule.getGenerator(), notificationSchedule.getMethod());
    // Interrupt the task if it is running right now
    future.cancel(true);
}
}