ScheduledKPIExecutor.java

/*
 * Copyright 2021 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gringlobal.worker;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.apache.commons.lang3.time.StopWatch;
import org.gringlobal.model.kpi.Execution;
import org.gringlobal.persistence.kpi.ExecutionRepository;
import org.gringlobal.service.KPIService;
import org.gringlobal.service.UserService;
import org.gringlobal.spring.TransactionHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;


/**
 * Component to periodically run all KPI Executions
 */
@Component
@Slf4j
public class ScheduledKPIExecutor {

	/** The kpi service. */
	@Autowired
	private KPIService kpiService;

	@Autowired
	private UserService userService;

	@Autowired
	private ExecutionRepository executionRepository;

	/**
	 * Run executions.
	 *
	 * @throws Exception the exception
	 */
	@Scheduled(cron = "0 0 3 * * *") // nightly at 3:00:am
//	@Scheduled(cron = "0 0/1 * * * *") // Test every 1min
	@SchedulerLock(name = "org.gringlobal.service.worker.ScheduledKPIExecutor")
	public void runExecutions() throws Exception {
		log.info("Started scheduled executions run");
		StopWatch stopWatch = StopWatch.createStarted();

		asAdmin(() -> {
			Pageable pageable = PageRequest.of(0, 50);
			Page<Execution> executions;
			do {
				executions = kpiService.listExecutions(pageable);

				for (Execution execution : executions.getContent()) {
					if (!Objects.equals("Y", execution.getIsActive())) {
						log.info("Skipping deactivated execution {}.", execution.getName());
						continue;
					}
					try {
						log.info("Started execution {} after {}ms", execution.getName(), stopWatch.getTime());
						kpiService.executeAndSave(execution);
						log.info("Execution {} successful after {}ms", execution.getName(), stopWatch.getTime());
					} catch (Throwable e) {
						log.error("Error running KPI Execution {}: {}", execution.getName(), e.getMessage(), e);
					}
				}

				// Load next page
				pageable = pageable.next();
			} while (executions.hasNext());

			if (executions.getTotalElements() > 0) {
				log.info("Run of {} executions ended successfully after {}ms.", executions.getTotalElements(), stopWatch.getTime());
			}
			return true;
		});
	}

	@Transactional
	@Scheduled(fixedDelayString = "PT2H", initialDelayString = "PT5M")
	@SchedulerLock(name = "org.gringlobal.service.worker.ScheduledKPIExecutor")
	public void removeOldRuns() throws Exception {
		asAdmin(() -> {
			log.warn("Scheduled removal of old runs across all executions");
			executionRepository.findAll().forEach(execution -> {
				log.warn("Scheduled removal of old runs of execution {} id={}", execution.getTitle(), execution.getId());
				kpiService.purgeExecutionRuns(execution);
			});
			return true;
		});
	}

	private <T> void asAdmin(Callable<T> callable) throws Exception {
		UserDetails administrator = userService.loadUserByUsername("administrator");
		List<GrantedAuthority> authorities = Lists.newArrayList(new SimpleGrantedAuthority("ROLE_ADMINISTRATOR"));
		authorities.addAll(administrator.getAuthorities());
		Authentication authentication = new UsernamePasswordAuthenticationToken(administrator, null, authorities);
		TransactionHelper.asUser(authentication, callable);
	}

}