NotificationScheduleServiceImpl.java

/*
 * Copyright 2024 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gringlobal.service.impl;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.ClockProvider;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockingTaskExecutor;

import org.genesys.blocks.security.model.AclSid;
import org.genesys.blocks.security.persistence.AclSidPersistence;
import org.genesys.blocks.util.CurrentApplicationContext;
import org.gringlobal.model.SysGroup;
import org.gringlobal.model.SysUser;
import org.gringlobal.model.notification.NotificationSchedule;
import org.gringlobal.model.notification.NotificationScheduleSubscriber;
import org.gringlobal.model.notification.QNotificationSchedule;
import org.gringlobal.model.notification.QNotificationScheduleSubscriber;
import org.gringlobal.notification.AccessionActionNotifications;
import org.gringlobal.notification.InventoryActionNotifications;
import org.gringlobal.notification.KPINotifications;
import org.gringlobal.notification.OrderRequestNotifications;
import org.gringlobal.notification.action.NotificationMessageEmailSender;
import org.gringlobal.notification.schedule.ScheduledNotification;
import org.gringlobal.persistence.NotificationScheduleSubscriberRepository;
import org.gringlobal.persistence.notification.NotificationScheduleRepository;
import org.gringlobal.service.NotificationScheduleService;
import org.gringlobal.service.SysGroupService;
import org.gringlobal.service.filter.NotificationScheduleFilter;
import org.gringlobal.spring.TransactionHelper;
import org.hibernate.Hibernate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.MessageSource;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ReflectionUtils;
import org.springframework.validation.annotation.Validated;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.gringlobal.application.config.HazelcastConfig.DEF_LOCK_AT_MOST_FOR;

/**
 * Manages {@link NotificationSchedule} records and the cron tasks registered for them with the
 * local {@link TaskScheduler}.
 * <p>
 * The list of available generators is built once in {@link #afterPropertiesSet()} from methods
 * annotated with {@link ScheduledNotification}. Active schedules are registered with the task
 * scheduler and each run is executed through {@link LockingTaskExecutor#executeWithLock}.
 * {@link #checkUpdates()} periodically re-reads the active schedules from the repository and
 * reconciles the locally registered tasks with them.
 * <p>
 * The registry of running tasks is accessed both from the scheduler thread ({@link #checkUpdates()})
 * and from callers of the CRUD methods, therefore it is kept in a concurrent map.
 */
@Service
@EnableScheduling
@Validated
@Slf4j
public class NotificationScheduleServiceImpl extends FilteredCRUDService2Impl<NotificationSchedule, NotificationScheduleFilter, NotificationScheduleRepository>
	implements NotificationScheduleService {

	/** Value of {@code isActive} used for schedules that should be running. */
	private static final String ACTIVE = "Y";
	/** Value of {@code isActive} used for schedules that are switched off. */
	private static final String INACTIVE = "N";

	/** Upper bound for holding the per-notification lock. */
	private static final Duration LOCK_AT_MOST_FOR_DURATION = Duration.parse(DEF_LOCK_AT_MOST_FOR);
	/**
	 * Lower bound for holding the per-notification lock.
	 * NOTE(review): a comment in registerTask says this should be greater than the checkUpdates()
	 * interval, but this is 30 seconds while checkUpdates() runs with a 2 minute delay -- confirm.
	 */
	private static final Duration LOCK_AT_LEAST_FOR_DURATION = Duration.parse("PT30S");

	@Autowired
	private TaskScheduler taskScheduler;

	@Autowired
	private LockingTaskExecutor lockingTaskExecutor;

	@Autowired
	private MessageSource messageSource;

	@Autowired
	private SysGroupService sysGroupService;

	@Autowired
	private AclSidPersistence aclSidPersistence;

	@Autowired
	private NotificationScheduleSubscriberRepository subscriberRepository;

	@PersistenceContext
	protected EntityManager entityManager;

	/**
	 * Locally registered tasks, keyed by the (detached) schedule they were registered for.
	 * Concurrent because it is modified by the scheduler thread and by service callers.
	 */
	private final Map<NotificationSchedule, ScheduledFuture<?>> scheduledNotificationMap = new ConcurrentHashMap<>();

	/** Templates describing the available generator methods; filled once in {@link #afterPropertiesSet()}. */
	private final List<NotificationSchedule> notificationGenerators = new ArrayList<>();

	/**
	 * Builds the list of available notification generators.
	 * <p>
	 * Every public method annotated with {@link ScheduledNotification} on one of the known generator
	 * classes becomes a template {@link NotificationSchedule} without a cron expression. Title and
	 * description are looked up in the message source under
	 * {@code notification.<SimpleClassName>.<method>.title} and {@code .description}; they are
	 * {@code null} when no such message exists.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		// Create generators list
		// TODO scan context instead!
		var generators = List.of(
			KPINotifications.class, AccessionActionNotifications.class,
			InventoryActionNotifications.class, OrderRequestNotifications.class,
			NotificationMessageEmailSender.class
		);
		for (Class<?> generatorClass : generators) {
			for (Method beanMethod : generatorClass.getMethods()) {
				if (beanMethod.getAnnotation(ScheduledNotification.class) == null) {
					continue;
				}
				var name = generatorClass.getSimpleName() + "." + beanMethod.getName();
				var title = messageSource.getMessage("notification." + name + ".title", null, null, LocaleContextHolder.getLocale());
				var description = messageSource.getMessage("notification." + name + ".description", null, null, LocaleContextHolder.getLocale());
				var notification = new NotificationSchedule();
				notification.setMethod(beanMethod.getName());
				notification.setGenerator(generatorClass.getName());
				notification.setTitle(title);
				notification.setDescription(description);
				notification.setCron(null);
				notification.setIsActive(ACTIVE);
				notificationGenerators.add(notification);
			}
		}
	}

	/**
	 * Reconciles the locally registered tasks with the schedules stored in the repository.
	 * <p>
	 * Tasks whose schedule is no longer active (or no longer has a cron) are cancelled, schedules
	 * that were modified after they were registered are re-registered, and schedules that are not
	 * yet registered are added. A schedule that cannot be registered is marked inactive and saved.
	 */
	@Scheduled(fixedDelay = 2, timeUnit = TimeUnit.MINUTES)
	@Transactional
	public void checkUpdates() {
		var qSchedule = QNotificationSchedule.notificationSchedule;
		var requiredNotifications = Lists.newArrayList(repository.findAll(
			qSchedule.isActive.eq(ACTIVE).and(qSchedule.cron.isNotNull())
		));

		// Work on a snapshot so that registrations made by other threads cannot disturb the iteration
		List<NotificationSchedule> runningNotifications = new ArrayList<>(scheduledNotificationMap.keySet());

		// Cancel tasks that are no longer required
		for (NotificationSchedule running : runningNotifications) {
			if (!requiredNotifications.contains(running)) {
				removeTask(running);
			}
		}

		Set<NotificationSchedule> toUpdate = new HashSet<>();
		for (NotificationSchedule required : requiredNotifications) {
			var runningIndex = runningNotifications.indexOf(required);
			if (runningIndex < 0) {
				// New task, not registered here yet
				toUpdate.add(required);
			} else if (isModifiedAfter(required, runningNotifications.get(runningIndex))) {
				// Updated elsewhere (e.g. on another cluster node) since it was registered here
				toUpdate.add(required);
			}
		}

		// Restart the updated tasks and add new ones.
		toUpdate.forEach(notificationSchedule -> {
			if (!registerTask(notificationSchedule)) {
				notificationSchedule.setIsActive(INACTIVE);
				repository.saveAndFlush(notificationSchedule);
			}
		});
	}

	/**
	 * Tests whether {@code candidate} was modified later than {@code reference}, comparing at
	 * millisecond precision. A missing modification date on {@code candidate} means "not modified";
	 * a missing date on {@code reference} with a present date on {@code candidate} means "modified".
	 */
	private static boolean isModifiedAfter(NotificationSchedule candidate, NotificationSchedule reference) {
		var candidateModified = candidate.getModifiedDate();
		if (candidateModified == null) {
			return false;
		}
		var referenceModified = reference.getModifiedDate();
		return referenceModified == null
			|| candidateModified.truncatedTo(ChronoUnit.MILLIS).isAfter(referenceModified.truncatedTo(ChronoUnit.MILLIS));
	}

	/**
	 * Lists the generator templates discovered at startup.
	 *
	 * @return a new list with the generator templates; changing the list does not affect this service
	 */
	@Override
	public List<NotificationSchedule> listNotificationGenerators() {
		return new ArrayList<>(notificationGenerators);
	}

	/**
	 * Creates a schedule and returns it with lazy associations loaded.
	 *
	 * @param source the data for the new schedule; must have no id and must name generator and method
	 * @return the persisted schedule
	 */
	@Override
	@Transactional
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	public NotificationSchedule create(NotificationSchedule source) {
		// Note: these checks only run when the JVM has assertions enabled (-ea)
		assert (source.getId() == null);
		assert (source.getGenerator() != null);
		assert (source.getMethod() != null);

		return _lazyLoad(createFast(source));
	}

	/**
	 * Creates a schedule and, when it is active, registers its task with the scheduler. If the
	 * task cannot be registered the schedule is stored as inactive.
	 *
	 * @param source the data for the new schedule; must have no id and must name generator and method
	 * @return the persisted schedule
	 */
	@Override
	@Transactional
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	public NotificationSchedule createFast(NotificationSchedule source) {
		// Note: these checks only run when the JVM has assertions enabled (-ea)
		assert (source.getId() == null);
		assert (source.getGenerator() != null);
		assert (source.getMethod() != null);

		var target = new NotificationSchedule();
		target.apply(source);

		// Flush before registering (as updateFast does): registerTask detaches the instance
		return registerIfActive(repository.saveAndFlush(target));
	}

	/**
	 * Updates a schedule and returns it with lazy associations loaded.
	 *
	 * @param updated the new data
	 * @param target the persisted schedule to update
	 * @return the updated schedule
	 */
	@Override
	@Transactional
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	public NotificationSchedule update(NotificationSchedule updated, NotificationSchedule target) {
		return _lazyLoad(updateFast(updated, target));
	}

	/**
	 * Cancels the task currently registered for {@code target}, applies the changes and, when the
	 * result is active, registers the task again. If the task cannot be registered the schedule is
	 * stored as inactive.
	 *
	 * @param updated the new data
	 * @param target the persisted schedule to update
	 * @return the updated schedule
	 */
	@Override
	@Transactional
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	public NotificationSchedule updateFast(NotificationSchedule updated, NotificationSchedule target) {
		removeTask(target);
		target.apply(updated);

		return registerIfActive(repository.saveAndFlush(target));
	}

	/**
	 * Registers the task of an active schedule. When registration fails the schedule is marked
	 * inactive and saved.
	 *
	 * @param saved a persisted (flushed) schedule
	 * @return {@code saved}, or the deactivated schedule when registration failed
	 */
	private NotificationSchedule registerIfActive(NotificationSchedule saved) {
		if (ACTIVE.equals(saved.getIsActive()) && !registerTask(saved)) {
			var toDeactivate = repository.getReferenceById(saved.getId());
			toDeactivate.setIsActive(INACTIVE);
			return repository.saveAndFlush(toDeactivate);
		}
		return saved;
	}

	/**
	 * Removes the schedule and cancels its registered task.
	 *
	 * @param entity the schedule to remove
	 * @return the removed schedule
	 */
	@Override
	@Transactional
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	public NotificationSchedule remove(NotificationSchedule entity) {
		var removed = super.remove(entity);
		removeTask(removed);
		return removed;
	}

	/**
	 * Subscribes the given SIDs to a schedule. SIDs that are already subscribed, that cannot be
	 * resolved, or that are not an active {@link SysUser} or {@link SysGroup} are ignored.
	 *
	 * @param entity the schedule
	 * @param sids SID names to subscribe
	 * @return the schedule, re-read after the subscribers were stored
	 */
	@Override
	@Transactional
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	public NotificationSchedule subscribe(NotificationSchedule entity, List<String> sids) {
		entity = get(entity.getId());

		var existingSubscribers = entity.getSubscribers().stream().map(subscriber -> subscriber.getSid().getSid()).collect(Collectors.toSet());
		// Skip SIDs that are already subscribed and those that do not resolve to an id
		var sidIds = sids.stream()
			.filter(sid -> !existingSubscribers.contains(sid))
			.map(aclSidPersistence::getSidId)
			.filter(sidId -> sidId != null)
			.collect(Collectors.toSet());
		if (sidIds.isEmpty()) {
			// Nothing to look up; avoids querying with an empty id list
			return entity;
		}

		Set<AclSid> aclSids = aclSidPersistence.listById(sidIds).stream()
			.map(Hibernate::unproxy)
			.map(sid -> (AclSid) sid)
			.filter(sid -> (sid instanceof SysUser || sid instanceof SysGroup) && sid.isActive()) // Keep only active
			.collect(Collectors.toSet());
		if (aclSids.isEmpty()) {
			return entity;
		}

		List<NotificationScheduleSubscriber> subscribers = new ArrayList<>();
		for (AclSid aclSid : aclSids) {
			var subscriber = new NotificationScheduleSubscriber();
			subscriber.setNotificationSchedule(entity);
			subscriber.setSid(aclSid);
			subscribers.add(subscriber);
		}
		subscriberRepository.saveAll(subscribers);
		// Write the new subscribers before the schedule is detached and re-read (as unsubscribe does)
		subscriberRepository.flush();
		entityManager.detach(entity);
		return get(entity.getId());
	}

	/**
	 * Removes the subscriptions of the given SIDs from a schedule. SIDs that cannot be resolved
	 * are ignored.
	 *
	 * @param entity the schedule
	 * @param sids SID names to unsubscribe
	 * @return the reloaded schedule
	 */
	@Override
	@Transactional
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	public NotificationSchedule unsubscribe(NotificationSchedule entity, List<String> sids) {
		entity = get(entity.getId());

		var sidIds = sids.stream()
			.map(aclSidPersistence::getSidId)
			.filter(sidId -> sidId != null)
			.collect(Collectors.toSet());

		if (!sidIds.isEmpty()) {
			var qSubscriber = QNotificationScheduleSubscriber.notificationScheduleSubscriber;
			var toRemove = subscriberRepository.findAll(qSubscriber.notificationSchedule().id.eq(entity.getId()).and(qSubscriber.sid().id.in(sidIds)), Pageable.unpaged()).getContent();
			subscriberRepository.deleteAll(toRemove);
		}
		subscriberRepository.flush();

		return reload(entity);
	}

	/**
	 * Registers (or re-registers) the cron task for a schedule with the local task scheduler.
	 * <p>
	 * Any task already registered for the schedule is cancelled first. The generator class is
	 * loaded by name, its bean is taken from the application context and the target method is
	 * looked up with the signature {@code (NotificationSchedule, List)}.
	 *
	 * @param notificationSchedule a persisted schedule
	 * @return {@code true} when the task was scheduled; {@code false} when the generator, the
	 *         method or the cron expression is missing or invalid, when the trigger never fires,
	 *         or on any other failure
	 */
	private boolean registerTask(NotificationSchedule notificationSchedule) {
		assert notificationSchedule != null && notificationSchedule.getId() != null;
		try {
			var toRegister = repository.getReferenceById(notificationSchedule.getId());
			// The instance is kept as map key beyond this transaction, so detach it
			entityManager.detach(toRegister);
			removeTask(toRegister);
			var scheduleId = toRegister.getId();
			var generator = toRegister.getGenerator();
			var method = toRegister.getMethod();
			var cron = toRegister.getCron();
			if (cron == null) {
				log.warn("Notification schedule id={} {}#{} has no cron expression", scheduleId, generator, method);
				return false;
			}
			Class<?> beanClass;
			try {
				beanClass = Class.forName(generator);
			} catch (ClassNotFoundException e) {
				log.warn("Notification schedule generator {} not found", generator, e);
				return false;
			}
			var generatorBean = CurrentApplicationContext.getContext().getBean(beanClass);
			var scheduledMethod = ReflectionUtils.findMethod(beanClass, method, NotificationSchedule.class, List.class); // Signature!

			if (scheduledMethod == null) {
				log.warn("Notification schedule generator method {}#{} not found", generator, method);
				return false;
			}

			var notificationLockKey = generator + "." + method + "." + scheduleId;
			var timezone = toRegister.getTimezone() != null ? ZoneId.of(toRegister.getTimezone()) : ZoneId.of("UTC");

			var scheduledTask = taskScheduler.schedule(() -> {
				log.info("Attempting to run scheduled task id={} {}#{} cron {} in {}", scheduleId, generator, method, cron, timezone);
				try {
					// execute with LockingTaskExecutor
					// lockAtLeastFor should be greater than @Scheduled checkUpdates()
					var lockConfig = new LockConfiguration(ClockProvider.now(), notificationLockKey, LOCK_AT_MOST_FOR_DURATION, LOCK_AT_LEAST_FOR_DURATION);
					var lockResult = lockingTaskExecutor.executeWithLock(() -> {
						log.debug("Lock acquired. Running scheduled task id={} {}#{} cron {} in {}", scheduleId, generator, method, cron, timezone);
						var result = executeScheduledTask(scheduleId, (schedule, users) -> {
							try {
								return scheduledMethod.invoke(generatorBean, schedule, users);
							} catch (Exception e) {
								log.error("Scheduled task id={} {}#{} execution fail: {}", scheduleId, generator, method, e.getMessage(), e);
								return null;
							}
						});
						log.debug("Scheduled task id={} {}#{} completed successfully with result:{}", scheduleId, generator, method, result);
						return result;
					}, lockConfig);
					log.info("Scheduled task id={} {}#{} was executed:{}", scheduleId, generator, method, lockResult.wasExecuted());
				} catch (Throwable e) {
					// Deliberately broad: a failing run must not propagate into the scheduler
					log.warn("Exception in scheduled task execution", e);
				}
			}, new CronTrigger(cron, timezone));

			if (scheduledTask == null) {
				// TaskScheduler#schedule returns null when the trigger never fires
				log.warn("Task id={} {}#{} was not scheduled, cron {} in {} never fires", scheduleId, generator, method, cron, timezone);
				return false;
			}

			var displaced = scheduledNotificationMap.put(toRegister, scheduledTask);
			if (displaced != null) {
				// Another thread registered the same schedule in the meantime; keep only one task
				displaced.cancel(true);
			}
			log.info("Task id={} {}#{} added to scheduler with {} in {}", scheduleId, generator, method, cron, timezone);

			return true;
		} catch (Throwable t) {
			// Deliberately broad: any failure (including class loading errors) means "not registered"
			log.warn("Exception when registering NotificationSchedule", t);
			return false;
		}
	}

	/**
	 * Loads the schedule and its recipients inside a transaction and runs the task with them.
	 * <p>
	 * Recipients are the active {@link SysUser} subscribers plus the active members of active
	 * {@link SysGroup} subscribers; each user is passed only once. When the schedule has no
	 * subscribers the task is not run.
	 *
	 * @param notificationId id of the schedule to run
	 * @param task the generator invocation, receiving the schedule and the recipients
	 * @return the result of {@code task}, or {@code null} when there are no subscribers
	 */
	private Object executeScheduledTask(Long notificationId, BiFunction<NotificationSchedule, List<SysUser>, Object> task) {
		return TransactionHelper.executeInTransaction(true, () -> {
			var notificationSchedule = repository.findById(notificationId).orElseThrow(() -> new RuntimeException("NotificationSchedule not found"));
			var subscribers = notificationSchedule.getSubscribers();

			if (subscribers.isEmpty()) {
				log.info("Task has no subscribers, nothing to do.");
				return null;
			}
			log.debug("Task has {} subscribers, will execute.", subscribers.size());

			var users = subscribers.stream()
				.map(NotificationScheduleSubscriber::getSid)
				.map(Hibernate::unproxy) // Causes log message HH000179: Narrowing proxy to class org.gringlobal.model.SysUser - this operation breaks ==
				.peek(sid -> log.debug("Notification {} subscriber: {}", notificationId, sid))
				.filter(sid -> (sid instanceof SysUser || sid instanceof SysGroup) && ((AclSid) sid).isActive()) // Keep only active
				.flatMap(sid -> { // Unpack groups
					if (sid instanceof SysUser) {
						return Stream.of((SysUser) sid);
					} else {
						return sysGroupService.listMembers((SysGroup) sid, Pageable.unpaged()).getContent().stream();
					}
				})
				.filter(sid -> (sid instanceof SysUser) && ((AclSid) sid).isActive()) // Keep only active
				.distinct() // A user subscribed directly and through a group is notified once
				.peek(sid -> log.info("Notification {} will be sent to: {}", notificationId, sid))
				.collect(Collectors.toList());

			return task.apply(notificationSchedule, users);
		});
	}

	/**
	 * Cancels and forgets the task registered for the schedule, if there is one.
	 *
	 * @param notificationSchedule the schedule whose task should be cancelled
	 */
	private void removeTask(NotificationSchedule notificationSchedule) {
		var scheduledTask = scheduledNotificationMap.remove(notificationSchedule);
		if (scheduledTask != null) {
			log.debug("Removing scheduled task for id={} {}#{}", notificationSchedule.getId(), notificationSchedule.getGenerator(), notificationSchedule.getMethod());
			scheduledTask.cancel(true);
		} else {
			log.debug("Scheduled task for id={} {}#{} not found and cannot be removed", notificationSchedule.getId(), notificationSchedule.getGenerator(), notificationSchedule.getMethod());
		}
	}
}