AttachmentMigrationController.java

/*
 * Copyright 2020 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gringlobal.mvc.admin;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.servlet.http.HttpServletResponse;
import javax.validation.constraints.Size;

import org.apache.commons.lang3.StringUtils;
import org.genesys.filerepository.InvalidRepositoryFileDataException;
import org.genesys.filerepository.InvalidRepositoryPathException;
import org.genesys.filerepository.NoSuchRepositoryFileException;
import org.genesys.filerepository.model.RepositoryFile;
import org.genesys.filerepository.service.BytesStorageService;
import org.genesys.filerepository.service.RepositoryService;
import org.gringlobal.api.exception.InvalidApiUsageException;
import org.gringlobal.custom.elasticsearch.SearchException;
import org.gringlobal.model.CooperatorAttachment;
import org.gringlobal.service.CodeValueService;
import org.gringlobal.spring.TransactionHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;

import com.querydsl.core.types.ExpressionUtils;
import com.querydsl.core.types.Order;
import com.querydsl.core.types.OrderSpecifier;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.jpa.impl.JPAQueryFactory;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

/**
 * Administrative controller that migrates attachment bytes into the file repository.
 *
 * <p>
 * For every attachment record (any entity assignable to {@link CooperatorAttachment}) that has a
 * virtual path but no repository file, the source bytes are located under a caller-supplied base
 * path (a local directory or a URL prefix), ingested through {@link RepositoryService}, and the
 * resulting {@link RepositoryFile} is linked to the attachment record. Progress is written as
 * plain text directly to the HTTP response.
 * </p>
 */
@Controller
@RequestMapping("/admin/attach")
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
@Slf4j
public class AttachmentMigrationController {

	/** Line terminator used in the plain-text migration log. */
	private static final String EOL = "\n";

	/** All entities */
	@Resource
	@Lazy
	private Set<Class<?>> entityClassSet;

	/** Code values */
	@Autowired
	@Lazy
	private CodeValueService codeValueService;

	@Autowired
	@Lazy
	protected JPAQueryFactory jpaQueryFactory;

	@Autowired
	@Lazy
	private RepositoryService repositoryService;

	@Autowired
	@Lazy
	private BytesStorageService bytesStorageService;

	@Autowired
	@Lazy
	private EntityManager entityManager;

	/**
	 * Render the migration start page listing all attachment entity types.
	 *
	 * @param model MVC model; receives the sorted simple class names as {@code attachmentEntities}
	 * @return the view name of the migration page
	 * @throws SearchException declared for interface compatibility
	 */
	@GetMapping
	public String startMigration(Model model) throws SearchException {
		// Category selection is currently disabled:
//		List<TranslatedCodeValue> categoryCodes = codeValueService.listFiltered(new CodeValueFilter().groupName(new StringFilter().eq(Set.of("CATEGORY_CODE"))), Pageable.ofSize(100)).getContent();
//		log.warn("Found CATEGORY_CODEs: {}", categoryCodes);
//		categoryCodes.forEach(code -> log.warn("CATEGORY_CODE: {} {}", code.title, code.description));
//		model.addAttribute("categoryCodes", categoryCodes);

		List<String> attachmentEntities = entityClassSet.stream()
			.filter(CooperatorAttachment.class::isAssignableFrom)
			.map(Class::getSimpleName)
			.sorted(Comparator.naturalOrder())
			.collect(Collectors.toList());
		log.warn("Found attachment entities: {}", attachmentEntities);
		model.addAttribute("attachmentEntities", attachmentEntities);

		return "/admin/attach";
	}

	/**
	 * Migrate attachment bytes and log directly to the response stream.
	 *
	 * @param response HTTP response; the migration log is streamed to it as plain text
	 * @param entity simple class name of the attachment entity to migrate
	 * @param basePath base path (local directory or URL prefix) prepended to each cleaned virtual path
	 * @param maxProcessed maximum number of rows to process (-1 for no limit); 10 when not provided
	 * @param maxErrors maximum number of errors to tolerate (-1 for no limit); 0 when not provided
	 * @throws IOException when writing to the response fails
	 */
	@PostMapping(value = "/migrate", params = { "entity", "basePath", "maxErrors" }, produces = { MediaType.TEXT_PLAIN_VALUE })
	public void migrateAttachments(
		HttpServletResponse response, // write logs directly
		@RequestParam(required = true) @NonNull @Size(min = 1) String entity, // entity to migrate
		@RequestParam(required = true) @NonNull @Size(min = 1) String basePath, // base path to data
		@RequestParam(required = true) Optional<Integer> maxProcessed, // maximum number of rows to process (-1 for no limit)
		@RequestParam(required = true) Optional<Integer> maxErrors // maximum number of errors to tolerate (-1 for no limit)
	) throws IOException {
		response.setContentType(MediaType.TEXT_PLAIN_VALUE);

		if (StringUtils.isBlank(basePath)) {
			throw new InvalidApiUsageException("basePath must be provided");
		}
		if (StringUtils.isBlank(entity)) {
			throw new InvalidApiUsageException("entity must be provided");
		}
		try (BufferedWriter logger = new BufferedWriter(new OutputStreamWriter(response.getOutputStream(), StandardCharsets.UTF_8), 1024)) {

			logger.append("Migrating ").append(entity).append(" attachments from ").append(basePath).append(EOL);
			logger.flush();

			// Find entity: only attachment entities are eligible, matched by simple class name
			Class<?> selectedEntity = entityClassSet.stream().filter(c -> CooperatorAttachment.class.isAssignableFrom(c) && Objects.equals(c.getSimpleName(), entity)).findFirst().orElse(null);
			if (selectedEntity == null) {
				logger.append("Could not find ").append(entity).append(EOL);
				logger.flush();
				throw new RuntimeException("No entity for " + entity);
			}

			logger.append("Migrating ").append(selectedEntity.getName()).append(EOL);

			AtomicInteger processedCounter = new AtomicInteger();
			AtomicInteger errorCounter = new AtomicInteger();
			try {
				migrateAttachments(logger, processedCounter, maxProcessed.orElse(10), maxErrors.orElse(0), errorCounter, selectedEntity, basePath);
				logger.write("We're done here");
			} catch (Throwable e) {
				// Report any failure to the caller in the streamed log instead of breaking the response
				logger.append("Error: ").append(e.getMessage());
				log.error("Fatal error: {}", e.getMessage(), e);
			}

			logger.flush();
		} finally {
			response.flushBuffer();
		}
	}

	/**
	 * Process attachment records in batches, one transaction per batch, until there are no more
	 * records or one of the limits is reached.
	 *
	 * @param logger writer receiving the migration log
	 * @param processedCounter running count of rows handed to {@link #migrateAttachment}
	 * @param maxProcessed maximum number of rows to process; a negative value disables the limit
	 * @param maxErrors maximum number of tolerated errors; a negative value disables the limit
	 * @param errorCounter running count of errors
	 * @param entity attachment entity class to query
	 * @param basePath base path to the source data
	 * @throws IOException when writing to the log fails
	 */
	private void migrateAttachments(BufferedWriter logger, AtomicInteger processedCounter, final int maxProcessed, final int maxErrors, AtomicInteger errorCounter, Class<?> entity, String basePath) throws IOException {
		// Number of leading rows to step over: rows that were skipped remain in the query result
		final AtomicInteger offset = new AtomicInteger(0);
		final int limit = 100;

		while (TransactionHelper.executeInTransaction(false, () -> {
			logger.append("Loading ").append(entity.getName()).append(" rows ").append(Integer.toString(offset.get())).append(" - ").append(Integer.toString(offset.get() + limit -1)).append(EOL);
			var attachments = fetchAttachRecords(entity, offset.get(), limit);
			logger.append("Attachments found: ").append(Objects.toString(attachments.size())).append(EOL);
			if (attachments.isEmpty()) {
				return false;
			}
			for (var attachment : attachments) {
				// Check the limit before counting so the counter never exceeds the number of rows actually processed
				if (maxProcessed >= 0 && processedCounter.get() >= maxProcessed) {
					logger.append("Maximum number of ").append(Integer.toString(maxProcessed)).append(" rows processed! Ending migration").append(EOL);
					return false;
				}
				int rowNumber = processedCounter.incrementAndGet();
				logger.append("\nProcessing #").append(Integer.toString(rowNumber)).append(" ").append(attachment.getClass().getName()).append(EOL);
				CooperatorAttachment attach = (CooperatorAttachment) attachment;
				if (!migrateAttachment(logger, errorCounter, attach, basePath)) {
					// This attachment was skipped, so we increment offset to skip a row.
					// Migrated rows need no offset: the query excludes them because repositoryFile is not null.
					offset.incrementAndGet();
				}
				logger.flush();
				if (maxErrors >= 0 && errorCounter.get() > maxErrors) {
					logger.append("Maximum number of ").append(Integer.toString(maxErrors)).append(" errors reached! Ending migration").append(EOL);
					return false;
				}
			}
			return true;
		})) {
			logger.append("\n\nStarting next batch\n--------------------").append(EOL);
			logger.flush();
		}
		log.warn("Done.");
	}

	/**
	 * Migrate the bytes of one attachment into the repository and link the repository file to it.
	 *
	 * @param logger writer receiving the migration log
	 * @param errorCounter running count of errors, incremented on failures
	 * @param attach the attachment record
	 * @param basePath base path to the source data
	 * @return {@code true} when the attachment now references a repository file, {@code false} when it was skipped or failed
	 * @throws IOException when writing to the log or storing bytes fails
	 * @throws NoSuchRepositoryFileException declared by the original signature
	 * @throws InvalidRepositoryPathException declared by the original signature
	 */
	private boolean migrateAttachment(BufferedWriter logger, AtomicInteger errorCounter, CooperatorAttachment attach, String basePath) throws IOException, NoSuchRepositoryFileException, InvalidRepositoryPathException {
		if (attach.getRepositoryFile() != null && attach.getRepositoryFile().getId() != null) {
			logger.append("Skipping id=").append(attach.getId().toString()).append(" already has repository_file").append(EOL);
			return false;
		}
		logger.append("Processing id=").append(attach.getId().toString()).append(" virtual_path=").append(attach.getVirtualPath()).append(EOL);

		try {
			// Attachments whose virtual path is itself a URL are not migrated
			URL url = new URL(attach.getVirtualPath());
			logger.append("Source virtual path is a URL ").append(url.toString()).append(". Skipping.").append(EOL);
			return false;
		} catch (MalformedURLException e) {
			// Fine
		}

		Path repositoryPath = getPathFor(attach.getVirtualPath());
		if (repositoryPath.getFileName() == null) {
			// A blank or "/"-only virtual path resolves to the root, which has neither a file name nor a parent
			logger.append("Virtual path '").append(attach.getVirtualPath()).append("' does not name a file. Skipping.").append(EOL);
			errorCounter.incrementAndGet();
			logger.append("This is error #").append(Objects.toString(errorCounter.get())).append(EOL);
			return false;
		}

		try {
			// Check if we have this file!
			RepositoryFile existingFile = repositoryService.getFile(repositoryPath.getParent(), repositoryPath.getFileName().toString());
			if (existingFile != null) {
				logger.append("Found an existing repository file at target path ").append(repositoryPath.toString()).append(EOL);
				if (! bytesStorageService.exists(existingFile.storagePath())) {
					logger.append("Bytes for the file do not exist at ").append(existingFile.storagePath().toString()).append(EOL);
					File sourceFile = getSourceFileForAttachment(logger, errorCounter, basePath, attach.getVirtualPath(), existingFile);
					if (sourceFile == null) {
						return false;
					}
					try {
						bytesStorageService.upsert(existingFile.storagePath(), sourceFile);
					} finally {
						// Downloaded bytes live in a temporary file that must not be left behind
						deleteTemporaryFile(logger, sourceFile);
					}
					logger.append("Updated file bytes ").append(attach.getVirtualPath()).append(" to ").append(existingFile.getStoragePath()).append(EOL);
				}
				attach.setRepositoryFile(existingFile);
				entityManager.merge(attach); // Save!
				logger.append("Reusing existing repository file for ").append(attach.getVirtualPath()).append(EOL);
				return true; // We're done with success!
			}
		} catch (NoSuchRepositoryFileException | InvalidRepositoryPathException e) {
			// OK, no file yet
			logger.append("Note: Checking for existing repository file resulted in ").append(e.getMessage()).append(EOL);
		}

		RepositoryFile metadata = new RepositoryFile();
		File sourceFile = getSourceFileForAttachment(logger, errorCounter, basePath, attach.getVirtualPath(), metadata);
		if (sourceFile == null) {
			return false;
		}

		try {
			RepositoryFile uploadedFile = repositoryService.addFile(repositoryPath.getParent(), repositoryPath.getFileName().toString(), attach.getContentType(), sourceFile, metadata);
			attach.setRepositoryFile(uploadedFile);
			entityManager.merge(attach); // Save!
			logger.append("Migrated ").append(attach.getVirtualPath()).append(" to ").append(uploadedFile.getStoragePath()).append(EOL);
			return true; // We're done with success!

		} catch (InvalidRepositoryPathException | InvalidRepositoryFileDataException | IOException e) {
			logger.append("Could not ingest file ").append(e.getMessage()).append(EOL);
			log.warn("Could not ingest file {}: {}", attach.getVirtualPath(), e.getMessage(), e);
			errorCounter.incrementAndGet();
			logger.append("This is error #").append(Objects.toString(errorCounter.get())).append(EOL);
			return false;
		} finally {
			deleteTemporaryFile(logger, sourceFile);
		}
	}

	/**
	 * Obtain a non-empty source file for the attachment, counting an error when that is not possible.
	 *
	 * @param logger writer receiving the migration log
	 * @param errorCounter running count of errors, incremented when no usable file is obtained
	 * @param basePath base path to the source data
	 * @param attachPath virtual path of the attachment
	 * @param metadata repository file metadata that may be updated with download details
	 * @return the source file, or {@code null} when the bytes are missing, empty or could not be obtained
	 * @throws IOException when writing to the log fails
	 */
	private File getSourceFileForAttachment(BufferedWriter logger, AtomicInteger errorCounter, @NonNull String basePath, @NonNull String attachPath, @NonNull RepositoryFile metadata) throws IOException {
		File sourceFile;
		try {
			sourceFile = getOriginalBytes(logger, basePath, attachPath, metadata);
		} catch (Throwable e) {
			// Nothing was assigned, so there is no file to clean up here
			log.warn("Could not obtain source bytes: {}", e.getMessage(), e);
			logger.append("Could not obtain source bytes. ").append(e.getMessage()).append(EOL);
			errorCounter.incrementAndGet();
			logger.append("This is error #").append(Objects.toString(errorCounter.get())).append(EOL);
			return null;
		}

		if (sourceFile == null || sourceFile.length() == 0) {
			try {
				logger.append("No bytes found for ").append(attachPath).append(". Nothing to do here.").append(EOL);
				errorCounter.incrementAndGet();
				logger.append("This is error #").append(Objects.toString(errorCounter.get())).append(EOL);
			} finally {
				// An empty download must still be removed, even if logging fails
				deleteTemporaryFile(logger, sourceFile);
			}
			return null;
		}
		return sourceFile;
	}

	/**
	 * Delete the file if it is a {@link TemporaryFile}; any other file (or {@code null}) is left alone.
	 *
	 * @param logger writer receiving the migration log
	 * @param sourceFile candidate file, may be {@code null}
	 */
	private void deleteTemporaryFile(BufferedWriter logger, File sourceFile) {
		if (!(sourceFile instanceof TemporaryFile) || !sourceFile.exists()) {
			return;
		}
		try {
			logger.append("Deleting temporary file ").append(sourceFile.getAbsolutePath()).append(EOL);
		} catch (IOException ignored) {
			// Cleanup must proceed even when the log cannot be written
		}
		log.warn("Deleting temporary file {}", sourceFile.getAbsolutePath());
		if (!sourceFile.delete()) {
			log.warn("Temporary file {} was not deleted", sourceFile.getAbsolutePath());
		}
	}

	/**
	 * Get a local file with the bytes for the virtual path.
	 * Will download using HTTP to a temporary file if the full source path is a URL.
	 *
	 * @param logger writer receiving the migration log
	 * @param basePath base path prepended to the cleaned virtual path
	 * @param virtualPath attachment virtual path
	 * @param metadata A place for repository file metadata
	 * @return an existing local file, a temporary file with downloaded bytes, or {@code null} when
	 *         the virtual path is itself a URL, the local file does not exist or the download failed
	 * @throws IOException when writing to the log fails
	 */
	private File getOriginalBytes(BufferedWriter logger, @NonNull String basePath, @NonNull String virtualPath, @NonNull RepositoryFile metadata) throws IOException {
		try {
			URL url = new URL(virtualPath);
			logger.append("Source virtual path is a URL ").append(url.toString()).append(". Not migrating!").append(EOL);
			return null;
		} catch (MalformedURLException e) {
			// Fine
		}
		Path cleanSourcePath = getPathFor(virtualPath);
		logger.append("Cleaned '").append(virtualPath).append("' -> '").append(cleanSourcePath.toString()).append("'").append(EOL);
		String fullSourcePath = basePath.concat(cleanSourcePath.toString());
		logger.append("Full source path is ").append(fullSourcePath).append(EOL);

		try {
			URL url = new URL(fullSourcePath);
			// Rebuild through the multi-argument URI constructor so illegal characters in the components get quoted
			URI uri = new URI(url.getProtocol(), url.getUserInfo(), url.getHost(), url.getPort(), url.getPath(), url.getQuery(), url.getRef());
			url = uri.toURL();
			logger.append("Full source path is a URL ").append(url.toString()).append(". Will download.").append(EOL);
			return download(logger, url, metadata);
		} catch (MalformedURLException | URISyntaxException e) {
			// Not a URL: treat the full source path as a local file
			logger.append("Using local file ").append(fullSourcePath).append(".").append(EOL);
			log.warn("Full source path {} is not a URL", fullSourcePath);
			File sourceFile = Paths.get(fullSourcePath).toFile();
			if (sourceFile.exists()) {
				return sourceFile;
			} else {
				logger.append("Local file ").append(sourceFile.toString()).append(" does not exist.").append(EOL);
				return null;
			}
		}
	}

	/**
	 * Download the URL to a new temporary file and record the origin in the metadata.
	 * On success the caller owns the returned file and must delete it; on failure the
	 * temporary file is deleted here.
	 *
	 * @param logger writer receiving the migration log
	 * @param url source URL
	 * @param metadata receives the original URL and retrieval date on success
	 * @return the temporary file with the downloaded bytes, or {@code null} when the download failed
	 * @throws IOException when the temporary file cannot be created or the log cannot be written
	 */
	private TemporaryFile download(BufferedWriter logger, URL url, @NonNull RepositoryFile metadata) throws IOException {

		TemporaryFile tempFile = new TemporaryFile(File.createTempFile("attach-", ".bin"));
		try (ReadableByteChannel readableByteChannel = Channels.newChannel(url.openStream())) {
			try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
				FileChannel fileChannel = fileOutputStream.getChannel();
				long bytesTransferred = fileChannel.transferFrom(readableByteChannel, 0, Long.MAX_VALUE);
				if (bytesTransferred == 0) {
					logger.append("Zero bytes were downloaded.").append(EOL);
				}
			}

			if (tempFile.exists()) {
				metadata.setOriginalUrl(url.toString());
				metadata.setDateRetrieved(Instant.now());

				return tempFile; // This must be deleted
			} else {
				return null;
			}

		} catch (Throwable e) {
			log.warn("Error downloading {}: {}", url, e.getMessage(), e);
			logger.append("Error downloading ").append(url.toString()).append(": ").append(e.getMessage()).append(EOL);

			deleteTemporaryFile(logger, tempFile);
			return null;
		}
	}

	/**
	 * Convert virtualPath to clean repository path: backslashes become forward slashes,
	 * repeated slashes are collapsed and the result is normalized against the root.
	 *
	 * @param virtualPath source path string
	 * @return the absolute, normalized path of the file; its parent is the repository folder
	 */
	private Path getPathFor(String virtualPath) {
		String unixStyle = virtualPath.replace('\\', '/').replaceAll("/{2,}", "/");
		return Paths.get("/", unixStyle).normalize().toAbsolutePath();
	}

	/**
	 * Load one page of records that have a virtual path but no repository file, ordered by id.
	 *
	 * @param entityClass attachment entity class to query
	 * @param offset number of matching rows to skip
	 * @param limit maximum number of rows to return
	 * @return the matching entities
	 */
	private List<?> fetchAttachRecords(Class<?> entityClass, int offset, int limit) {
		var root = new PathBuilder<>(entityClass, "t");
		PathBuilder<Long> pkField = root.get("id", Long.class);

		// Only records that still need migration
		Collection<Predicate> filters = new ArrayList<>();
		filters.add(root.get("virtualPath").isNotNull());
		filters.add(root.get("repositoryFile").isNull());

		var q = jpaQueryFactory.selectFrom(root);
		q.where(ExpressionUtils.allOf(filters));

		// always order by id so that paging is stable
		q.orderBy(new OrderSpecifier<>(Order.ASC, pkField));

		// Apply limits
		q.limit(limit);
		q.offset(offset);

		// Get entities
		return q.fetch();
	}

	/**
	 * Utility type so we know what can be removed: only files of this type are
	 * deleted by {@link #deleteTemporaryFile}.
	 */
	private static class TemporaryFile extends File {
		private static final long serialVersionUID = -8725589313022779411L;

		/**
		 * @param sourceFile the temporary file to wrap; its path is reused
		 */
		public TemporaryFile(File sourceFile) {
			super(sourceFile.getPath());
		}
	}
}