AttachmentMigrationController.java
/*
* Copyright 2020 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.mvc.admin;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.servlet.http.HttpServletResponse;
import javax.validation.constraints.Size;
import org.apache.commons.lang3.StringUtils;
import org.genesys.filerepository.InvalidRepositoryFileDataException;
import org.genesys.filerepository.InvalidRepositoryPathException;
import org.genesys.filerepository.NoSuchRepositoryFileException;
import org.genesys.filerepository.model.RepositoryFile;
import org.genesys.filerepository.service.BytesStorageService;
import org.genesys.filerepository.service.RepositoryService;
import org.gringlobal.api.exception.InvalidApiUsageException;
import org.gringlobal.custom.elasticsearch.SearchException;
import org.gringlobal.model.CooperatorAttachment;
import org.gringlobal.service.CodeValueService;
import org.gringlobal.spring.TransactionHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import com.querydsl.core.types.ExpressionUtils;
import com.querydsl.core.types.Order;
import com.querydsl.core.types.OrderSpecifier;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.jpa.impl.JPAQueryFactory;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
/**
 * Admin tool that migrates attachment bytes referenced by an attachment's {@code virtualPath} into the file
 * repository and links the resulting {@link RepositoryFile} to the attachment.
 * <p>
 * Restricted to members of {@code GROUP_ADMINS}.
 */
@Controller
@RequestMapping("/admin/attach")
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
@Slf4j
public class AttachmentMigrationController {

	/** Line separator used in the plain-text progress log written to the HTTP response. */
	private static final String EOL = "\n";

	/** Number of attachment rows loaded per transaction. */
	private static final int BATCH_SIZE = 100;

	/** All entities */
	@Resource
	@Lazy
	private Set<Class<?>> entityClassSet;

	/** Code values */
	@Autowired
	@Lazy
	private CodeValueService codeValueService;

	@Autowired
	@Lazy
	protected JPAQueryFactory jpaQueryFactory;

	@Autowired
	@Lazy
	private RepositoryService repositoryService;

	@Autowired
	@Lazy
	private BytesStorageService bytesStorageService;

	@Autowired
	@Lazy
	private EntityManager entityManager;

	/**
	 * Render the migration form listing the simple names of all entity classes that extend
	 * {@link CooperatorAttachment}.
	 *
	 * @param model view model; receives the sorted {@code attachmentEntities} list
	 * @return the view name
	 * @throws SearchException declared by the signature; not thrown by the current implementation
	 */
	@GetMapping
	public String startMigration(Model model) throws SearchException {
		// Disabled: listing of CATEGORY_CODE values for the form
		// List<TranslatedCodeValue> categoryCodes = codeValueService.listFiltered(new CodeValueFilter().groupName(new StringFilter().eq(Set.of("CATEGORY_CODE"))), Pageable.ofSize(100)).getContent();
		// log.warn("Found CATEGORY_CODEs: {}", categoryCodes);
		// categoryCodes.forEach(code -> log.warn("CATEGORY_CODE: {} {}", code.title, code.description));
		// model.addAttribute("categoryCodes", categoryCodes);

		List<String> attachmentEntities = entityClassSet.stream()
			.filter(CooperatorAttachment.class::isAssignableFrom)
			.map(Class::getSimpleName)
			.sorted()
			.collect(Collectors.toList());
		log.warn("Found attachment entities: {}", attachmentEntities);
		model.addAttribute("attachmentEntities", attachmentEntities);
		return "/admin/attach";
	}

	/**
	 * Migrate the attachment bytes of one attachment entity, writing a plain-text progress log directly to the
	 * response stream.
	 * <p>
	 * The {@code params} condition of the mapping requires {@code entity}, {@code basePath} and {@code maxErrors}
	 * to be present. NOTE(review): Spring MVC does not enforce {@code required = true} for {@link Optional}
	 * parameters; a missing value becomes {@link Optional#empty()} and the defaults below apply.
	 *
	 * @param response the HTTP response; the progress log is written to its output stream
	 * @param entity simple class name of the {@link CooperatorAttachment} entity to migrate
	 * @param basePath local directory or URL prefix that attachment virtual paths are resolved against
	 * @param maxProcessed maximum number of rows to process, -1 for no limit (defaults to 10)
	 * @param maxErrors maximum number of errors to tolerate, -1 for no limit (defaults to 0)
	 * @throws IOException when writing to the response fails
	 */
	@PostMapping(value = "/migrate", params = { "entity", "basePath", "maxErrors" }, produces = { MediaType.TEXT_PLAIN_VALUE })
	public void migrateAttachments(
		HttpServletResponse response, // write logs directly
		@RequestParam(required = true) @NonNull @Size(min = 1) String entity, // entity to migrate
		@RequestParam(required = true) @NonNull @Size(min = 1) String basePath, // base path to data
		@RequestParam(required = true) Optional<Integer> maxProcessed, // maximum number of rows to process (-1 for no limit)
		@RequestParam(required = true) Optional<Integer> maxErrors // maximum number of errors to tolerate (-1 for no limit)
	) throws IOException {

		response.setContentType(MediaType.TEXT_PLAIN_VALUE);

		if (StringUtils.isBlank(basePath)) {
			throw new InvalidApiUsageException("basePath must be provided");
		}
		if (StringUtils.isBlank(entity)) {
			throw new InvalidApiUsageException("entity must be provided");
		}

		try (BufferedWriter logger = new BufferedWriter(new OutputStreamWriter(response.getOutputStream(), StandardCharsets.UTF_8), 1024)) {
			logger.append("Migrating ").append(entity).append(" attachments from ").append(basePath).append(EOL);
			logger.flush();

			// Find entity
			Class<?> selectedEntity = entityClassSet.stream()
				.filter(c -> CooperatorAttachment.class.isAssignableFrom(c) && Objects.equals(c.getSimpleName(), entity))
				.findFirst()
				.orElse(null);
			if (selectedEntity == null) {
				logger.append("Could not find ").append(entity).append(EOL);
				logger.flush();
				throw new RuntimeException("No entity for " + entity);
			}

			logger.append("Migrating ").append(selectedEntity.getName()).append(EOL);
			AtomicInteger processedCounter = new AtomicInteger();
			AtomicInteger errorCounter = new AtomicInteger();

			try {
				migrateAttachments(logger, processedCounter, maxProcessed.orElse(10), maxErrors.orElse(0), errorCounter, selectedEntity, basePath);
				logger.append("We're done here").append(EOL);
			} catch (Throwable e) {
				// The log has already been flushed to the client, so report the failure in the log itself
				logger.append("Error: ").append(e.getMessage()).append(EOL);
				log.error("Fatal error: {}", e.getMessage(), e);
			}
			logger.flush();

		} finally {
			response.flushBuffer();
		}
	}

	/**
	 * Migrate attachments of {@code entity} in batches of {@link #BATCH_SIZE}. Each batch runs in its own
	 * {@code TransactionHelper.executeInTransaction} call.
	 * <p>
	 * Successfully migrated rows get a repository file and therefore no longer match the
	 * {@link #fetchAttachRecords} query. Only skipped or failed rows advance the paging offset.
	 *
	 * @param logger progress log
	 * @param processedCounter running count of rows processed
	 * @param maxProcessed stop after this many rows (-1 for no limit)
	 * @param maxErrors stop once more than this many errors occurred (-1 for no limit)
	 * @param errorCounter running count of errors
	 * @param entity attachment entity class
	 * @param basePath local directory or URL prefix with the source bytes
	 * @throws IOException when writing to the log fails
	 */
	private void migrateAttachments(BufferedWriter logger, AtomicInteger processedCounter, final int maxProcessed, final int maxErrors, AtomicInteger errorCounter, Class<?> entity, String basePath) throws IOException {
		// Number of leading rows in the query result that were skipped and must not be loaded again
		final AtomicInteger offset = new AtomicInteger(0);

		while (TransactionHelper.executeInTransaction(false, () -> {
			int start = offset.get();
			logger.append("Loading ").append(entity.getName()).append(" rows ").append(Integer.toString(start)).append(" - ").append(Integer.toString(start + BATCH_SIZE - 1)).append(EOL);
			var attachments = fetchAttachRecords(entity, start, BATCH_SIZE);
			logger.append("Attachments found: ").append(Integer.toString(attachments.size())).append(EOL);
			if (attachments.isEmpty()) {
				return false; // Nothing left to migrate
			}

			for (var attachment : attachments) {
				int processed = processedCounter.incrementAndGet();
				if (maxProcessed >= 0 && processed > maxProcessed) {
					logger.append("Maximum number of ").append(Integer.toString(maxProcessed)).append(" rows processed! Ending migration").append(EOL);
					return false;
				}

				logger.append("\nProcessing #").append(Integer.toString(processed)).append(" ").append(attachment.getClass().getName()).append(EOL);
				if (!migrateAttachment(logger, errorCounter, (CooperatorAttachment) attachment, basePath)) {
					// Skipped rows still match the query, so move the offset past them.
					// Migrated rows now have a repositoryFile and are excluded by the query.
					offset.incrementAndGet();
				}
				logger.flush();

				if (maxErrors >= 0 && errorCounter.get() > maxErrors) {
					logger.append("Maximum number of ").append(Integer.toString(maxErrors)).append(" errors reached! Ending migration").append(EOL);
					return false;
				}
			}
			return true;
		})) {
			logger.append("\n\nStarting next batch\n--------------------").append(EOL);
			logger.flush();
		}

		log.warn("Done.");
	}

	/**
	 * Migrate the bytes of a single attachment into the repository and link the repository file to it.
	 * <p>
	 * If a repository file already exists at the target path, it is reused. Its bytes are re-uploaded first
	 * if the bytes storage does not have them.
	 *
	 * @return {@code true} if the attachment now references a repository file, {@code false} if it was skipped
	 *         or failed
	 */
	private boolean migrateAttachment(BufferedWriter logger, AtomicInteger errorCounter, CooperatorAttachment attach, String basePath) throws IOException, NoSuchRepositoryFileException, InvalidRepositoryPathException {
		if (attach.getRepositoryFile() != null && attach.getRepositoryFile().getId() != null) {
			logger.append("Skipping id=").append(attach.getId().toString()).append(" already has repository_file").append(EOL);
			return false;
		}

		logger.append("Processing id=").append(attach.getId().toString()).append(" virtual_path=").append(attach.getVirtualPath()).append(EOL);

		try {
			URL url = new URL(attach.getVirtualPath());
			logger.append("Source virtual path is a URL ").append(url.toString()).append(". Skipping.").append(EOL);
			return false;
		} catch (MalformedURLException e) {
			// Not a URL: the path is resolved against basePath below
		}

		Path repositoryPath = getPathFor(attach.getVirtualPath());
		Path repositoryFolder = repositoryPath.getParent();
		Path repositoryFileName = repositoryPath.getFileName();
		if (repositoryFolder == null || repositoryFileName == null) {
			// e.g. an empty virtual path resolves to the root, which does not name a file
			logger.append("Virtual path '").append(attach.getVirtualPath()).append("' does not name a file. Skipping.").append(EOL);
			countError(logger, errorCounter);
			return false;
		}
		String fileName = repositoryFileName.toString();

		try {
			// Check if we have this file!
			RepositoryFile existingFile = repositoryService.getFile(repositoryFolder, fileName);
			if (existingFile != null) {
				logger.append("Found an existing repository file at target path ").append(repositoryPath.toString()).append(EOL);

				if (!bytesStorageService.exists(existingFile.storagePath())) {
					logger.append("Bytes for the file do not exist at ").append(existingFile.storagePath().toString()).append(EOL);
					File sourceFile = getSourceFileForAttachment(logger, errorCounter, basePath, attach.getVirtualPath(), existingFile);
					if (sourceFile == null) {
						return false;
					}
					try {
						bytesStorageService.upsert(existingFile.storagePath(), sourceFile);
						logger.append("Updated file bytes ").append(attach.getVirtualPath()).append(" to ").append(existingFile.getStoragePath()).append(EOL);
					} finally {
						// Downloaded bytes are no longer needed once stored (or if storing failed)
						deleteTemporaryFile(logger, sourceFile);
					}
				}

				attach.setRepositoryFile(existingFile);
				entityManager.merge(attach); // Save!
				logger.append("Reusing existing repository file for ").append(attach.getVirtualPath()).append(EOL);
				return true; // We're done with success!
			}
		} catch (NoSuchRepositoryFileException | InvalidRepositoryPathException e) {
			// OK, no file yet
			logger.append("Note: Checking for existing repository file resulted in ").append(e.getMessage()).append(EOL);
		}

		RepositoryFile metadata = new RepositoryFile();
		File sourceFile = getSourceFileForAttachment(logger, errorCounter, basePath, attach.getVirtualPath(), metadata);
		if (sourceFile == null) {
			return false;
		}

		try {
			RepositoryFile uploadedFile = repositoryService.addFile(repositoryFolder, fileName, attach.getContentType(), sourceFile, metadata);
			attach.setRepositoryFile(uploadedFile);
			entityManager.merge(attach); // Save!
			logger.append("Migrated ").append(attach.getVirtualPath()).append(" to ").append(uploadedFile.getStoragePath()).append(EOL);
			return true; // We're done with success!
		} catch (InvalidRepositoryPathException | InvalidRepositoryFileDataException | IOException e) {
			logger.append("Could not ingest file ").append(e.getMessage()).append(EOL);
			log.warn("Could not ingest file {}: {}", attach.getVirtualPath(), e.getMessage(), e);
			countError(logger, errorCounter);
			return false;
		} finally {
			deleteTemporaryFile(logger, sourceFile);
		}
	}

	/**
	 * Increment the error counter and report the new error number to the progress log.
	 */
	private void countError(BufferedWriter logger, AtomicInteger errorCounter) throws IOException {
		logger.append("This is error #").append(Integer.toString(errorCounter.incrementAndGet())).append(EOL);
	}

	/**
	 * Obtain a local file with the bytes for {@code attachPath}.
	 * <p>
	 * A missing or empty source, or any failure while obtaining it, is counted as an error and yields
	 * {@code null}. A temporary file created in that case is deleted.
	 *
	 * @param metadata receives download metadata (original URL, retrieval time) when bytes are downloaded
	 * @return the source file (a {@link TemporaryFile} that the caller must delete when downloaded), or
	 *         {@code null}
	 */
	private File getSourceFileForAttachment(BufferedWriter logger, AtomicInteger errorCounter, @NonNull String basePath, @NonNull String attachPath, @NonNull RepositoryFile metadata) throws IOException {
		File sourceFile = null;
		try {
			sourceFile = getOriginalBytes(logger, basePath, attachPath, metadata);
			if (sourceFile == null || sourceFile.length() == 0) {
				logger.append("No bytes found for ").append(attachPath).append(". Nothing to do here.").append(EOL);
				countError(logger, errorCounter);
				deleteTemporaryFile(logger, sourceFile);
				return null;
			}
			return sourceFile;
		} catch (Throwable e) {
			log.warn("Could not obtain source bytes: {}", e.getMessage(), e);
			logger.append("Could not obtain source bytes. ").append(e.getMessage()).append(EOL);
			countError(logger, errorCounter);
			deleteTemporaryFile(logger, sourceFile);
			// Never hand out a file that may have just been deleted
			return null;
		}
	}

	/**
	 * Delete {@code sourceFile} if it is a {@link TemporaryFile} that still exists. Other files (for example
	 * original local source files) and {@code null} are left alone.
	 */
	private void deleteTemporaryFile(BufferedWriter logger, File sourceFile) {
		if (!(sourceFile instanceof TemporaryFile) || !sourceFile.exists()) {
			return;
		}
		try {
			logger.append("Deleting temporary file ").append(sourceFile.getAbsolutePath()).append(EOL);
		} catch (IOException ignored) {
			// Best effort: a failing progress log must not prevent cleanup
		}
		log.warn("Deleting temporary file {}", sourceFile.getAbsolutePath());
		if (!sourceFile.delete()) {
			log.warn("Temporary file {} was not deleted", sourceFile.getAbsolutePath());
		}
	}

	/**
	 * Get a local file with the bytes for a virtual path. The path is resolved against {@code basePath}.
	 * If the result is a URL, the bytes are downloaded to a temporary file.
	 *
	 * @param logger progress log
	 * @param basePath local directory or URL prefix
	 * @param virtualPath attachment virtual path
	 * @param metadata A place for repository file metadata
	 * @return the local source file, a {@link TemporaryFile} with downloaded bytes, or {@code null} if the
	 *         virtual path is itself a URL, the local file does not exist, or the download failed
	 * @throws IOException when writing to the log fails
	 */
	private File getOriginalBytes(BufferedWriter logger, @NonNull String basePath, @NonNull String virtualPath, @NonNull RepositoryFile metadata) throws IOException {
		try {
			URL url = new URL(virtualPath);
			logger.append("Source virtual path is a URL ").append(url.toString()).append(". Not migrating!").append(EOL);
			return null;
		} catch (MalformedURLException e) {
			// Not a URL: resolve it against basePath below
		}

		Path cleanSourcePath = getPathFor(virtualPath);
		logger.append("Cleaned '").append(virtualPath).append("' -> '").append(cleanSourcePath.toString()).append("'").append(EOL);

		// cleanSourcePath is rooted at "/", so drop trailing slashes of basePath to avoid "//" in the joined path
		String fullSourcePath = StringUtils.stripEnd(basePath, "/").concat(cleanSourcePath.toString());
		logger.append("Full source path is ").append(fullSourcePath).append(EOL);

		try {
			URL url = new URL(fullSourcePath);
			// Re-build through URI so that characters such as spaces in the path get encoded
			URI uri = new URI(url.getProtocol(), url.getUserInfo(), url.getHost(), url.getPort(), url.getPath(), url.getQuery(), url.getRef());
			url = uri.toURL();
			logger.append("Full source path is a URL ").append(url.toString()).append(". Will download.").append(EOL);
			return download(logger, url, metadata);

		} catch (MalformedURLException | URISyntaxException e) {
			logger.append("Using local file ").append(fullSourcePath).append(".").append(EOL);
			log.warn("Full source path {} is not a URL", fullSourcePath);
			File sourceFile = Paths.get(fullSourcePath).toFile();
			if (sourceFile.exists()) {
				return sourceFile;
			}
			logger.append("Local file ").append(sourceFile.toString()).append(" does not exist.").append(EOL);
			return null;
		}
	}

	/**
	 * Download {@code url} to a new {@link TemporaryFile}.
	 * <p>
	 * On success, {@code metadata} receives the original URL and the retrieval time. The caller is responsible
	 * for deleting the returned file. On failure the temporary file is deleted and {@code null} is returned.
	 */
	private TemporaryFile download(BufferedWriter logger, URL url, @NonNull RepositoryFile metadata) throws IOException {
		TemporaryFile tempFile = new TemporaryFile(File.createTempFile("attach-", ".bin"));
		try (ReadableByteChannel readableByteChannel = Channels.newChannel(url.openStream())) {
			try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
				FileChannel fileChannel = fileOutputStream.getChannel();
				long bytesTransferred = fileChannel.transferFrom(readableByteChannel, 0, Long.MAX_VALUE);
				if (bytesTransferred == 0) {
					logger.append("Zero bytes were downloaded.").append(EOL);
				}
			}
			if (!tempFile.exists()) {
				return null;
			}
			metadata.setOriginalUrl(url.toString());
			metadata.setDateRetrieved(Instant.now());
			return tempFile; // This must be deleted
		} catch (Throwable e) {
			log.warn("Error downloading {}: {}", url, e.getMessage(), e);
			logger.append("Error downloading ").append(url.toString()).append(": ").append(e.getMessage()).append(EOL);
			deleteTemporaryFile(logger, tempFile);
			return null;
		}
	}

	/**
	 * Convert a virtual path to a clean repository path. Backslashes become slashes, then the path is rooted
	 * at "/", normalized and made absolute.
	 *
	 * @param virtualPath source path string
	 * @return the full repository path of the file (folder and file name)
	 */
	private Path getPathFor(String virtualPath) {
		String slashed = virtualPath.replace('\\', '/').replace("//", "/");
		return Paths.get("/", slashed).normalize().toAbsolutePath();
	}

	/**
	 * Load a page of {@code entityClass} rows that have a {@code virtualPath} but no {@code repositoryFile} yet,
	 * ordered by {@code id}.
	 *
	 * @param entityClass attachment entity class
	 * @param offset number of matching rows to skip
	 * @param limit maximum number of rows to return
	 * @return the matching entities
	 */
	private List<?> fetchAttachRecords(Class<?> entityClass, int offset, int limit) {
		var root = new PathBuilder<>(entityClass, "t");
		PathBuilder<Long> pkField = root.get("id", Long.class);

		Collection<Predicate> filters = new ArrayList<>();
		filters.add(root.get("virtualPath").isNotNull());
		filters.add(root.get("repositoryFile").isNull());

		return jpaQueryFactory.selectFrom(root)
			.where(ExpressionUtils.allOf(filters))
			.orderBy(new OrderSpecifier<>(Order.ASC, pkField)) // always order by id so offset paging is stable
			.limit(limit)
			.offset(offset)
			.fetch();
	}

	/**
	 * Marker type for files created by this controller that may be deleted after use.
	 */
	private static class TemporaryFile extends File {
		private static final long serialVersionUID = -8725589313022779411L;

		public TemporaryFile(File sourceFile) {
			super(sourceFile.getPath());
		}
	}
}