GenesysDownloader.java

/*
 * Copyright 2020 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gringlobal.worker;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.gringlobal.api.exception.NotFoundElement;
import org.gringlobal.model.Accession;
import org.gringlobal.model.AccessionSource;
import org.gringlobal.model.Crop;
import org.gringlobal.model.Geography;
import org.gringlobal.model.Site;
import org.gringlobal.model.TaxonomyCropMap;
import org.gringlobal.model.TaxonomyFamily;
import org.gringlobal.model.TaxonomyGenus;
import org.gringlobal.model.TaxonomySpecies;
import org.gringlobal.model.community.CommunityCodeValues;
import org.gringlobal.persistence.AccessionRepository;
import org.gringlobal.persistence.GeographyRepository;
import org.gringlobal.persistence.SiteRepository;
import org.gringlobal.service.AccessionService;
import org.gringlobal.service.CodeValueService;
import org.gringlobal.service.CropService;
import org.gringlobal.service.SiteService;
import org.gringlobal.service.TaxonomySpeciesCRUDService;
import org.gringlobal.spring.TransactionHelper;
import org.gringlobal.util.MCPDDate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.util.Pair;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
 * The component downloads current data from Genesys.
 * 
 * <p>
 * Accession passport data of one institute is fetched page by page from the Genesys API. Each page is applied to
 * the local database in a separate task on the shared {@link ThreadPoolTaskExecutor}.
 * </p>
 * 
 * @author Matija Obreza
 */
@Component
@Slf4j
public class GenesysDownloader implements InitializingBean {
	/** Number of accessions requested per Genesys page. */
	private static final int DOWNLOAD_PAGE_SIZE = 200;

	/** Paging stops once {@code page * DOWNLOAD_PAGE_SIZE} exceeds this number of records. */
	private static final int MAX_DOWNLOAD_RECORDS = 250000;

	/** Number of consecutive network failures on the same page after which the download is aborted. */
	private static final int MAX_FETCH_ATTEMPTS = 5;

	/** Base pause before retrying a failed page fetch; multiplied by the number of failed attempts. */
	private static final long RETRY_DELAY_MS = 2000;

	@Autowired
	private ThreadPoolTaskExecutor taskExecutor;

	@Autowired
	private AccessionRepository accessionRepository;

	@Autowired
	private TaxonomySpeciesCRUDService taxonomySpeciesCRUDService;
	
	@Autowired
	private CropService cropService;

	private RestTemplate restTemplate;

	/**
	 * Target type for the paged JSON response of the Genesys accession list endpoint.
	 */
	protected static class FilteredPage<T> {
		public List<T> content;
		public List<?> sort;

		public boolean first;
		public boolean last;
		public int number; // page num

		public int totalElements;
		public int totalPages;
		public int numberOfElements;
		public Map<?, ?> filter;
		public String filterCode;
	}

	// Short-lived lookup caches shared by all page-processing tasks
	private final Cache<String, TaxonomyCropMap> cropTaxonomyLookup = CacheBuilder.newBuilder().maximumSize(200).expireAfterWrite(5, TimeUnit.MINUTES).build();
	private final Cache<String, Crop> cropLookup = CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(5, TimeUnit.MINUTES).build();
	private final Cache<String, TaxonomySpecies> taxonomyLookup = CacheBuilder.newBuilder().maximumSize(200).expireAfterWrite(5, TimeUnit.MINUTES).build();
	private final Cache<String, Site> siteWiewsLookup = CacheBuilder.newBuilder().maximumSize(20).expireAfterWrite(5, TimeUnit.MINUTES).build();
	private final Cache<String, Geography> geographyLookup = CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(5, TimeUnit.MINUTES).build();
	private final Cache<Pair<String, Integer>, Optional<String>> codeValueOfMcpdCache = CacheBuilder.newBuilder().maximumSize(200).expireAfterWrite(5, TimeUnit.MINUTES).build();

	@Autowired
	private SiteRepository siteRepository;

	@Autowired
	private SiteService siteService;

	@Autowired
	private AccessionService accessionService;

	@Autowired
	private GeographyRepository geographyRepository;

	@Autowired
	private CodeValueService codeValueService;

	/**
	 * Create the {@link RestTemplate} used for Genesys calls.
	 * The connect timeout is 5 seconds, the read timeout is 180 seconds, and request bodies are buffered.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		HttpComponentsClientHttpRequestFactory httpRequestFactory = new HttpComponentsClientHttpRequestFactory();
		httpRequestFactory.setConnectTimeout(5 * 1000);
		httpRequestFactory.setBufferRequestBody(true);
		httpRequestFactory.setReadTimeout(180 * 1000);
		this.restTemplate = new RestTemplate(httpRequestFactory);
	}

	/**
	 * Look up the local code value of code group {@code groupName} that maps to the MCPD code {@code mcpd}.
	 * Results, including "not found", are cached for 5 minutes.
	 *
	 * @param groupName the code value group
	 * @param mcpd the MCPD code, may be {@code null}
	 * @return the code value, or empty when {@code mcpd} is {@code null} or no mapping exists
	 */
	private Optional<String> findCodeValueOfMcpd(String groupName, Integer mcpd) {
		if (mcpd == null) {
			return Optional.empty();
		}
		try {
			return codeValueOfMcpdCache.get(Pair.of(groupName, mcpd), () -> {
				return Optional.ofNullable(codeValueService.findCodeValueOfMCPD(groupName, mcpd));
			});
		} catch (ExecutionException e) {
			throw new RuntimeException("Error reading MCPD code value", e);
		}
	}

	/**
	 * Download accession passport data of one institute from Genesys.
	 * Matching local {@link Accession} records are created or updated.
	 * The call blocks until every downloaded page has been processed.
	 *
	 * @param instituteCode the institute code used as the Genesys {@code institute.code} filter; must not be blank
	 * @param authorizationToken the value sent as the {@code Authorization} header to the Genesys API
	 * @throws IllegalArgumentException if {@code instituteCode} is blank
	 * @throws Exception if interrupted while waiting for the update tasks
	 */
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	public void download(String instituteCode, String authorizationToken) throws Exception {
		if (StringUtils.isBlank(instituteCode)) {
			throw new IllegalArgumentException("instituteCode must not be blank");
		}
		log.info("Updating from Genesys {}", instituteCode);
		updateLocalDatabase(instituteCode, authorizationToken);
	}

	/**
	 * Download all accession pages of {@code instituteCode} and wait for their local update to finish.
	 *
	 * <p>
	 * One task fetches the pages sequentially and submits a separate update task per page. These tasks run as the
	 * currently authenticated user. No records are removed from the local database.
	 * </p>
	 *
	 * @param instituteCode the institute code filter
	 * @param authorizationToken the Genesys {@code Authorization} header value
	 * @throws Exception if interrupted while waiting for the tasks
	 */
	private void updateLocalDatabase(String instituteCode, String authorizationToken) throws Exception {
		// The token is a credential and is deliberately not logged
		log.info("Downloading accessions of {} from Genesys", instituteCode);

		// Appended to by the download task (executor thread) while this thread drains it: must be thread-safe
		final List<Future<?>> futures = new CopyOnWriteArrayList<>();
		final Authentication prevAuth = SecurityContextHolder.getContext().getAuthentication();

		HttpHeaders headers = new HttpHeaders();
		headers.setAccept(List.of(MediaType.APPLICATION_JSON));
		headers.setContentType(MediaType.APPLICATION_JSON);
		headers.set("Origin", "https://www.genesys-pgr.org");
		headers.set("Referer", "https://www.genesys-pgr.org");
		headers.set("Authorization", authorizationToken);

		// Quote the code as a JSON string so special characters cannot break the filter document
		String body = "{\"institute\": { \"code\": [ " + toJsonString(instituteCode) + " ] } }";
		log.debug("Body:\n{}", body);

		HttpEntity<String> entity = new HttpEntity<>(body, headers);

		futures.add(taskExecutor.submit(() -> fetchAllPages(entity, prevAuth, futures)));

		awaitAll(futures);

		log.info("Done.");
	}

	/**
	 * Fetch the accession list page by page and submit one update task per non-empty page.
	 * Every submitted task's future is appended to {@code futures}.
	 *
	 * @param entity the request (filter body and headers) posted for every page
	 * @param prevAuth the authentication the update tasks run with
	 * @param futures receives the futures of the submitted update tasks
	 * @return {@code "Done!"} when paging stopped normally
	 * @throws ResourceAccessException if the same page failed {@link #MAX_FETCH_ATTEMPTS} times in a row
	 * @throws InterruptedException if interrupted while pausing before a retry
	 */
	private String fetchAllPages(HttpEntity<String> entity, Authentication prevAuth, List<Future<?>> futures) throws InterruptedException {
		final ParameterizedTypeReference<FilteredPage<ObjectNode>> pageType = new ParameterizedTypeReference<FilteredPage<ObjectNode>>() {
		};
		int page = 0;
		int failedAttempts = 0;

		while (true) {
			Map<String, Object> uriVars = new HashMap<>();
			uriVars.put("length", DOWNLOAD_PAGE_SIZE);
			uriVars.put("page", page);

			FilteredPage<ObjectNode> result;
			try {
				log.debug("Fetching page {}", page);
				result = restTemplate.exchange("https://api.genesys-pgr.org/api/v1/acn/list?l={length}&p={page}", HttpMethod.POST, entity, pageType, uriVars)
					.getBody();
			} catch (ResourceAccessException e) {
				failedAttempts++;
				log.error("Genesys {}", e.getMessage());
				if (failedAttempts >= MAX_FETCH_ATTEMPTS) {
					// Without a limit a persistent outage would retry this page forever
					log.error("Giving up on page {} after {} failed attempts", page, failedAttempts);
					throw e;
				}
				// Retry the same page after a growing pause instead of hammering the server
				Thread.sleep(RETRY_DELAY_MS * failedAttempts);
				continue;
			}
			failedAttempts = 0;
			page++;

			if (result == null || result.content == null) {
				log.warn("Genesys returned no page content, stopping at page {}", page - 1);
				break;
			}
			log.debug("Filter code: {}", result.filterCode);
			log.debug("Filters: {}", result.filter);
			log.info("Rows returned: {} of page {}/{}", result.numberOfElements, result.number, result.totalPages);

			if (result.content.isEmpty() || page * DOWNLOAD_PAGE_SIZE > MAX_DOWNLOAD_RECORDS) {
				break;
			}

			resolveTaxonomyReferences(result.content);

			final List<ObjectNode> pageContent = result.content;
			Future<List<Accession>> exec = taskExecutor.submit(() -> TransactionHelper.asUser(prevAuth, () -> TransactionHelper.executeInTransaction(true,
				() -> updateAccessions(pageContent))));
			futures.add(exec);
		}

		return "Done!";
	}

	/**
	 * Replace non-object {@code taxonomy} values with the taxonomy object of the same {@code id}.
	 * Only objects seen earlier in the same page are used.
	 * Unknown references are replaced with {@code null}.
	 *
	 * <p>
	 * NOTE(review): assumes Genesys serializes a repeated taxonomy as its id after the first full occurrence in a
	 * page — confirm against the API.
	 * </p>
	 *
	 * @param content the accession JSON objects of one page; modified in place
	 */
	private void resolveTaxonomyReferences(List<ObjectNode> content) {
		Map<Long, ObjectNode> taxaById = new HashMap<>();
		for (ObjectNode acce : content) {
			JsonNode taxa = acce.get("taxonomy");
			if (taxa instanceof ObjectNode) {
				JsonNode id = taxa.get("id");
				if (id != null) {
					taxaById.put(id.asLong(), (ObjectNode) taxa);
				}
			} else if (taxa != null && !taxa.isNull()) {
				acce.set("taxonomy", taxaById.get(taxa.asLong()));
			}
		}
	}

	/**
	 * Wait until every future in {@code futures} has completed, logging each outcome.
	 * The list may grow while waiting because the download task keeps appending page tasks,
	 * so it is re-read until it is empty.
	 *
	 * @param futures thread-safe list of pending tasks; completed entries are removed
	 * @throws InterruptedException if this thread is interrupted; pending tasks are cancelled first
	 */
	private void awaitAll(List<Future<?>> futures) throws InterruptedException {
		while (!futures.isEmpty()) {
			log.debug("Have {} pending tasks", futures.size());
			List<Future<?>> snapshot = new ArrayList<>(futures);
			// Check the most recently submitted tasks first
			for (int i = snapshot.size() - 1; i >= 0; i--) {
				Future<?> f = snapshot.get(i);
				try {
					Object r = f.get(5000, TimeUnit.MILLISECONDS);
					// A completed future is always removed; keeping it would make this loop spin forever
					futures.remove(f);
					if (r instanceof Collection) {
						log.info("Updated: {} accessions", ((Collection<?>) r).size());
					} else if (r != null) {
						log.info("Result: {}", r);
					} else {
						log.warn("Got null from {}", f);
					}
				} catch (TimeoutException e) {
					log.debug("Still busy {}", f);
				} catch (InterruptedException e) {
					futures.forEach(pending -> pending.cancel(true));
					throw e;
				} catch (Throwable e) {
					futures.remove(f);
					log.warn("Dead {} with {}", f, e.getMessage(), e);
				}
			}
		}
	}

	/**
	 * Quote {@code value} as a JSON string literal.
	 * Double quotes, backslashes and control characters are escaped.
	 *
	 * @param value the raw string, not {@code null}
	 * @return the quoted JSON string
	 */
	private static String toJsonString(String value) {
		StringBuilder sb = new StringBuilder(value.length() + 2).append('"');
		for (int i = 0; i < value.length(); i++) {
			char c = value.charAt(i);
			if (c == '"' || c == '\\') {
				sb.append('\\').append(c);
			} else if (c < 0x20) {
				sb.append(String.format("\\u%04x", (int) c));
			} else {
				sb.append(c);
			}
		}
		return sb.append('"').toString();
	}

	/**
	 * Map one page of Genesys accession JSON to local {@link Accession} entities and persist them.
	 *
	 * <p>
	 * Existing accessions are matched by DOI first, then by accession number parts. Accessions without a resolved
	 * {@link TaxonomySpecies} are skipped. All records are first saved in one transaction. If that fails with a
	 * {@link DataIntegrityViolationException}, they are saved one by one and individual failures are logged.
	 * </p>
	 *
	 * @param content the accession JSON objects of one page
	 * @return the stored accessions
	 * @throws NotFoundElement if no local site has the accession's {@code instituteCode}
	 */
	private List<Accession> updateAccessions(List<ObjectNode> content) {
		List<Accession> toSave = new ArrayList<>();
		
		if (content == null || content.isEmpty()) {
			return toSave;
		}
		
		log.debug("Processing {} nodes", content.size());
		log.debug("Sample JSON: {}", content.get(0));

		for (ObjectNode acce : content) {
			String doi = getText(acce, "doi");
			Long seqNo = acce.get("seqNo").asLong();
			String acceNumb = acce.get("accessionNumber").asText();

			// Split the accession number around its numeric part
			String[] parts = acceNumb.strip().split(seqNo.toString(), 2);
			String name1, name3;
			Long name2 = null;
			// NOTE(review): parts[0] is always a prefix of the stripped number, so this test only fails when
			// acceNumb has leading whitespace — confirm this is the intended "number first" condition
			if (acceNumb.startsWith(parts[0])) {
				// prefix
				name1 = parts[0].replaceAll("[\\-_\\. 0]*$", "");
				name2 = seqNo;
				name3 = parts.length < 2 || parts[1] == null ? null : StringUtils.defaultIfBlank(parts[1].replaceAll("^[\\-_\\. ]*", ""), null);
			} else {
				// number first
				name1 = seqNo.toString();
				name3 = parts.length < 2 ? null : StringUtils.defaultIfBlank(parts[1], null);
			}
			log.trace("Parts: doi={} {} with {} -> {} = {} {} {}", doi, acceNumb, seqNo, parts, name1, name2, name3);
			Accession a = null;
			if (doi != null) {
				a = accessionRepository.findByDoi(doi);
			}
			if (a == null) {
				a = accessionRepository.findOne(name1, name2, name3);
			}
			if (a == null) {
				a = new Accession();
				a.setAccessionNumberPart1(name1);
				a.setAccessionNumberPart2(name2);
				a.setAccessionNumberPart3(name3);
			}
			if (a.getDoi() == null) {
				// Set doi
				a.setDoi(doi);
			} else {
				// Update name
				a.setAccessionNumberPart1(name1);
				a.setAccessionNumberPart2(name2);
				a.setAccessionNumberPart3(name3);
			}

			String instituteCode = getText(acce, "instituteCode");
			Site site = siteRepository.findAllByFaoInstituteNumber(instituteCode).stream().findFirst()
				.orElseThrow(() -> new NotFoundElement("Site by instCode='" + instituteCode + "' not found"));
			a.setSite(site);

			JsonNode historic = acce.get("historic");
			a.setStatusCode(historic != null && historic.booleanValue() ? "INACTIVE" : "ACTIVE");

			// An unresolved taxonomy reference is a JSON null, not an object
			JsonNode taxonomy = acce.get("taxonomy");
			a.setTaxonomySpecies(taxonomy instanceof ObjectNode ? getTaxonomySpecies((ObjectNode) taxonomy) : null);
			
			JsonNode cropName = acce.get("cropName");
			if (cropName != null && cropName.isTextual()) {
				ensureTaxonomyCropMap(a.getTaxonomySpecies(), cropName.asText());
			}
			a.setImprovementStatusCode(findCodeValueOfMcpd("IMPROVEMENT_LEVEL", getInteger(acce, "sampStat")).orElse(null));

			String mcpdAcqDate = getText(acce, "acquisitionDate");
			MCPDDate.convert(mcpdAcqDate, a::setInitialReceivedDate, a::setInitialReceivedDateCode);

			JsonNode duplSite = acce.get("duplSite");
			handleBackup(a, duplSite instanceof ArrayNode ? (ArrayNode) duplSite : null);
			handleSource(a, acce);

			if (a.getTaxonomySpecies() != null) {
				toSave.add(a);
			} else {
				log.warn("Accession {} does not have TaxonomySpecies", a.getAccessionNumber());
			}
		}

		try {
			var multiOp = TransactionHelper.executeInTransaction(false, () -> accessionService.update(toSave));
			return multiOp.success;
		} catch (Throwable e) {
			if (!isDataIntegrityViolation(e)) {
				throw e;
			}
			// 1-by-1 so that one bad record does not prevent storing the others
			List<Accession> oneByOne = new ArrayList<>();
			for (Accession one : toSave) {
				try {
					if (one.getId() == null) {
						oneByOne.add(TransactionHelper.executeInTransaction(false, () -> accessionService.create(one)));
					} else {
						oneByOne.add(TransactionHelper.executeInTransaction(false, () -> accessionService.update(one)));
					}
				} catch (Throwable e2) {
					// The failure may or may not be wrapped; never dereference a missing cause
					Throwable cause = e2.getCause() != null ? e2.getCause() : e2;
					log.error("Could not store accession: {}", cause.getMessage(), cause);
				}
			}
			return oneByOne;
		}
	}

	/**
	 * Test whether {@code e} or its direct cause is a {@link DataIntegrityViolationException}.
	 *
	 * @param e the failure
	 * @return {@code true} for an integrity violation
	 */
	private static boolean isDataIntegrityViolation(Throwable e) {
		return e instanceof DataIntegrityViolationException || e.getCause() instanceof DataIntegrityViolationException;
	}

	/**
	 * Ensure a link between crop {@code cropName} and {@code taxonomySpecies} exists.
	 * The crop is created if needed, and the result is cached for 5 minutes.
	 *
	 * @param taxonomySpecies the species; ignored when {@code null} or not persisted
	 * @param cropName the Genesys crop name
	 * @return the link, or {@code null} when it could not be established (the error is logged)
	 */
	private TaxonomyCropMap ensureTaxonomyCropMap(TaxonomySpecies taxonomySpecies, String cropName) {
		if (taxonomySpecies == null || taxonomySpecies.getId() == null) {
			return null;
		}

		String key = cropName + "-" + taxonomySpecies.getId();
		try {
			return cropTaxonomyLookup.get(key, () -> {
				Crop crop = getCrop(cropName);
				if (crop == null) {
					throw new Exception("Crop not found"); // checked exception, reported as ExecutionException
				}
				TaxonomyCropMap ctm = TransactionHelper.executeInTransaction(false, () -> cropService.ensureCropTaxonomyLink(crop, taxonomySpecies));
				if (ctm == null) {
					throw new Exception("Crop taxonomy entry not inserted"); // checked exception
				}
				return ctm;
			});
		} catch (ExecutionException e) {
			log.error("Crop taxonomy map lookup for key {} error: {}", key, e.getMessage());
			return null;
		}
	}

	/**
	 * Update the donor and collecting {@link AccessionSource} entries of {@code a} from the Genesys JSON.
	 * A source entry is created only when the relevant data is present.
	 *
	 * @param a the accession being updated
	 * @param acce the Genesys accession JSON
	 */
	private void handleSource(Accession a, ObjectNode acce) {
		String donorName = getText(acce, "donorName");
		String donorWiews = getText(acce, "donorCode");
		// TODO: donorNumb is not imported yet

		if (donorWiews != null || donorName != null) {
			List<AccessionSource> sources = getOrCreateAccessionSources(a);
			AccessionSource donorSource = sources.stream().filter((source) -> CommunityCodeValues.ACCESSION_SOURCE_TYPE_DONATED.value.equals(source.getSourceTypeCode())).findFirst().orElse(null);
			if (donorSource == null) {
				donorSource = new AccessionSource();
				donorSource.setSourceTypeCode(CommunityCodeValues.ACCESSION_SOURCE_TYPE_DONATED.value);
				donorSource.setAccession(a);
				sources.add(donorSource);
			}
			// TODO: link donorName / donorWiews to the cooperators of the donor source
		}

		String origCty = getText(acce, "origCty");
		ObjectNode nodeCollecting = acce.hasNonNull("coll") ? (ObjectNode) acce.get("coll") : null;
		ObjectNode nodeGeo = acce.hasNonNull("geo") ? (ObjectNode) acce.get("geo") : null;
		if (origCty != null || nodeGeo != null || nodeCollecting != null) {
			List<AccessionSource> sources = getOrCreateAccessionSources(a);
			AccessionSource collectingSource = sources.stream().filter((source) -> CommunityCodeValues.ACCESSION_SOURCE_TYPE_COLLECTED.value.equals(source.getSourceTypeCode())).findFirst().orElse(null);
			if (collectingSource == null) {
				collectingSource = new AccessionSource();
				collectingSource.setSourceTypeCode(CommunityCodeValues.ACCESSION_SOURCE_TYPE_COLLECTED.value);
				collectingSource.setAccession(a);
				sources.add(collectingSource);
			}
			
			if (origCty != null) {
				try {
					Geography geography = geographyLookup.get(origCty, () -> {
						Geography g = geographyRepository.getGeographyForCountry(origCty);
						if (g == null) {
							throw new Exception("Geography not found"); // checked exception
						}
						return g;
					});
					collectingSource.setGeography(geography);
				} catch (Throwable e) {
					log.info("Geography lookup error {}: {}", origCty, e.getMessage());
				}
			}
			collectingSource.setCollectorVerbatimLocality(getText(nodeCollecting, "collSite"));
			String collDate = getText(nodeCollecting, "collDate");
			MCPDDate.convert(collDate, collectingSource::setSourceDate, collectingSource::setSourceDateCode);
			collectingSource.setLatitude(getDouble(nodeGeo, "latitude"));
			collectingSource.setLongitude(getDouble(nodeGeo, "longitude"));
			collectingSource.setElevationMeters(getInteger(nodeGeo, "elevation"));
			collectingSource.setUncertainty(getInteger(nodeGeo, "uncertainty"));
			collectingSource.setGeoreferenceDatum(getText(nodeGeo, "datum"));
			collectingSource.setGeoreferenceAnnotation(getText(nodeGeo, "method"));
			collectingSource.setAcquisitionSource(findCodeValueOfMcpd(CommunityCodeValues.ACCESSION_SOURCE_HABITAT_TYPE, getInteger(nodeCollecting, "collSrc")).orElse(null));
			collectingSource.setIsOrigin("Y");
		}
	}

	/**
	 * Return the accession's source list.
	 * An empty mutable list is created and attached to the accession when it has none.
	 *
	 * @param a the accession
	 * @return the accession's source list
	 */
	private List<AccessionSource> getOrCreateAccessionSources(Accession a) {
		List<AccessionSource> accessionSources = a.getAccessionSources(); // load
		if (accessionSources == null) {
			accessionSources = new ArrayList<>();
			a.setAccessionSources(accessionSources);
		}
		return accessionSources;
	}

	/**
	 * Set the first two safety-duplicate sites listed in {@code arrayNode} as the accession's backup locations.
	 * Sites are looked up by institute code; unknown codes create a new non-distribution {@link Site}.
	 *
	 * @param accession the accession being updated
	 * @param arrayNode the Genesys {@code duplSite} array of institute codes, may be {@code null}
	 */
	private void handleBackup(Accession accession, ArrayNode arrayNode) {
		if (arrayNode == null || arrayNode.size() == 0) {
			return;
		}
		log.trace("DUPLSITE: {}", arrayNode);
		List<Site> backupSites = new ArrayList<>();

		for (JsonNode wiewsNode : arrayNode) {
			String wiewsCode = wiewsNode.textValue();
			if (StringUtils.isBlank(wiewsCode)) {
				// Without a code the site can neither be looked up nor created meaningfully
				log.warn("Ignoring blank backup site code in {}", arrayNode);
				continue;
			}
			Site backupSite = null;
			try {
				backupSite = siteWiewsLookup.get(wiewsCode, () -> {
					Site s = siteRepository.findAllByFaoInstituteNumber(wiewsCode).stream().findFirst().orElse(null);
					if (s == null) {
						throw new Exception("Site by instCode='" + wiewsCode + "' not found"); // checked exception
					}
					return s;
				});
			} catch (Throwable e) {
				log.info("Site lookup error {}: {}", wiewsCode, e.getMessage());
			}
			if (backupSite == null) {
				Site newSite = new Site();
				newSite.setFaoInstituteNumber(wiewsCode);
				newSite.setSiteShortName(wiewsCode);
				newSite.setSiteLongName(wiewsCode);
				newSite.setIsInternal("Y");
				newSite.setIsDistributionSite("N");

				backupSite = TransactionHelper.executeInTransaction(false, () -> siteService.create(newSite));
				if (backupSite != null) {
					// Cache the new site so later lookups reuse it instead of creating another one
					siteWiewsLookup.put(wiewsCode, backupSite);
				}
			}
			if (backupSite != null) {
				backupSites.add(backupSite);
			}
		}

		if (backupSites.size() > 0) {
			log.trace("Accession has backup site: {}", backupSites.get(0).getSiteShortName());
			accession.setBackupLocation1Site(backupSites.get(0));
		}
		if (backupSites.size() > 1) {
			accession.setBackupLocation2Site(backupSites.get(1));
		}
		if (backupSites.size() > 2) {
			log.warn("Accession has many {} backup sites: {}", backupSites.size(), arrayNode);
		}
	}

	/**
	 * Resolve the local {@link TaxonomySpecies} for Genesys taxonomy JSON via
	 * {@link TaxonomySpeciesCRUDService#fromMCPD}. The result is cached for 5 minutes.
	 *
	 * @param taxonomyJson the Genesys taxonomy object
	 * @return the species, or {@code null} when it could not be resolved (the error is logged)
	 */
	private TaxonomySpecies getTaxonomySpecies(ObjectNode taxonomyJson) {
		String genus = getText(taxonomyJson, "genus");
		String specificEpithet = getText(taxonomyJson, "species");
		String spAuthor = getText(taxonomyJson, "spAuthor");
		String subTaxa = getText(taxonomyJson, "subtaxa");
		String subtAuthor = getText(taxonomyJson, "subtAuthor");
		String key = genus + "-" + specificEpithet + "-" + spAuthor + "-" + subTaxa + "-" + subtAuthor;
		try {
			return taxonomyLookup.get(key, () -> {
				TaxonomySpecies ts = TransactionHelper.executeInTransaction(false, () -> taxonomySpeciesCRUDService.fromMCPD(genus, specificEpithet, spAuthor, subTaxa, subtAuthor));
				if (ts == null) {
					throw new Exception("TaxonomySpecies not found"); // checked exception
				}
				return ts;
			});
		} catch (ExecutionException e) {
			log.error("Taxa lookup for key {} error: {}", key, e.getMessage());
			return null;
		}
	}

	/**
	 * Find the {@link Crop} named {@code cropName}.
	 * A crop noted as "Imported from Genesys" is created if it does not exist. The result is cached for 5 minutes.
	 *
	 * @param cropName the crop name
	 * @return the crop, or {@code null} on error (the error is logged)
	 */
	private Crop getCrop(String cropName) {
		String key = cropName;
		try {
			return cropLookup.get(key, () -> {
				Crop crop = TransactionHelper.executeInTransaction(false, () -> {
					Crop c = cropService.getCrop(cropName);
					if (c == null) {
						c = new Crop();
						c.setName(cropName);
						c.setNote("Imported from Genesys");
						c = cropService.create(c);
					}
					return c;
				});
				if (crop == null) {
					throw new Exception("Crop not found"); // checked exception
				}
				return crop;
			});
		} catch (ExecutionException e) {
			log.error("Crop lookup for key {} error: {}", key, e.getMessage());
			return null;
		}
	}

	/**
	 * Read a text property.
	 * Returns {@code null} when the node or property is missing, is JSON null, or is not textual.
	 */
	private String getText(ObjectNode objectNode, String prop) {
		if (objectNode == null) {
			return null;
		}
		JsonNode n = objectNode.get(prop);
		return n == null || n instanceof NullNode ? null : n.textValue();
	}

	/** Read a property as a Double; {@code null} when the node or property is missing or JSON null. */
	private Double getDouble(ObjectNode objectNode, String prop) {
		if (objectNode == null) {
			return null;
		}
		JsonNode n = objectNode.get(prop);
		return n == null || n instanceof NullNode ? null : n.asDouble();
	}

	/** Read a property as an Integer; {@code null} when the node or property is missing or JSON null. */
	private Integer getInteger(ObjectNode objectNode, String prop) {
		if (objectNode == null) {
			return null;
		}
		JsonNode n = objectNode.get(prop);
		return n == null || n instanceof NullNode ? null : n.asInt();
	}

}