GenesysDownloader.java
/*
* Copyright 2020 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.worker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.gringlobal.api.exception.NotFoundElement;
import org.gringlobal.model.Accession;
import org.gringlobal.model.AccessionSource;
import org.gringlobal.model.Crop;
import org.gringlobal.model.Geography;
import org.gringlobal.model.Site;
import org.gringlobal.model.TaxonomyCropMap;
import org.gringlobal.model.TaxonomyFamily;
import org.gringlobal.model.TaxonomyGenus;
import org.gringlobal.model.TaxonomySpecies;
import org.gringlobal.model.community.CommunityCodeValues;
import org.gringlobal.persistence.AccessionRepository;
import org.gringlobal.persistence.GeographyRepository;
import org.gringlobal.persistence.SiteRepository;
import org.gringlobal.service.AccessionService;
import org.gringlobal.service.CodeValueService;
import org.gringlobal.service.CropService;
import org.gringlobal.service.SiteService;
import org.gringlobal.service.TaxonomySpeciesCRUDService;
import org.gringlobal.spring.TransactionHelper;
import org.gringlobal.util.MCPDDate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.util.Pair;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* The component downloads current data from Genesys.
*
* @author Matija Obreza
*/
@Component
@Slf4j
public class GenesysDownloader implements InitializingBean {
/** Number of rows requested per page; passed as the {@code length} URI variable of the Genesys list request. */
private static final int DOWNLOAD_PAGE_SIZE = 200;
/** Executor that runs the page-fetching task and the per-page update tasks. */
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
/** Used to find existing accessions by DOI or by the three accession number parts. */
@Autowired
private AccessionRepository accessionRepository;
/** Resolves a {@link TaxonomySpecies} from MCPD-style taxonomy fields. */
@Autowired
private TaxonomySpeciesCRUDService taxonomySpeciesCRUDService;
/** Looks up and creates crops and links them to taxonomy species. */
@Autowired
private CropService cropService;
/** HTTP client for Genesys; created in {@link #afterPropertiesSet()}, not injected. */
private RestTemplate restTemplate;
/**
 * Holder for one page of the response returned by the Genesys list request. The response body is
 * converted into this type, so the public field names are part of the contract and must not be renamed.
 *
 * @param <T> type of the page elements
 */
protected static class FilteredPage<T> {
// Elements of this page
public List<T> content;
// Not read by this class
public List<?> sort;
// Not read by this class; presumably marks the first page -- confirm against the Genesys API
public boolean first;
// Not read by this class; presumably marks the last page -- confirm against the Genesys API
public boolean last;
public int number; // page num
// Not read by this class
public int totalElements;
// Logged together with the page number
public int totalPages;
// Logged as the number of rows returned
public int numberOfElements;
// Logged at debug level only
public Map<?, ?> filter;
// Logged at debug level only
public String filterCode;
}
// All caches below are bounded in size and drop entries 5 minutes after they were written.
/** Crop-to-species links, keyed by crop name + "-" + taxonomy species id. */
private final Cache<String, TaxonomyCropMap> cropTaxonomyLookup = CacheBuilder.newBuilder().maximumSize(200).expireAfterWrite(5, TimeUnit.MINUTES).build();
/** Crops, keyed by crop name. */
private final Cache<String, Crop> cropLookup = CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(5, TimeUnit.MINUTES).build();
/** Taxonomy species, keyed by the dash-joined genus, species, species author, subtaxa and subtaxa author. */
private final Cache<String, TaxonomySpecies> taxonomyLookup = CacheBuilder.newBuilder().maximumSize(200).expireAfterWrite(5, TimeUnit.MINUTES).build();
/** Backup sites, keyed by the institute code found in the {@code duplSite} array. */
private final Cache<String, Site> siteWiewsLookup = CacheBuilder.newBuilder().maximumSize(20).expireAfterWrite(5, TimeUnit.MINUTES).build();
/** Country geographies, keyed by the {@code origCty} value. */
private final Cache<String, Geography> geographyLookup = CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(5, TimeUnit.MINUTES).build();
/** Code values keyed by (code group name, MCPD number); an empty Optional caches "no matching code value". */
private final Cache<Pair<String, Integer>, Optional<String>> codeValueOfMcpdCache = CacheBuilder.newBuilder().maximumSize(200).expireAfterWrite(5, TimeUnit.MINUTES).build();
/** Finds sites by their FAO institute number. */
@Autowired
private SiteRepository siteRepository;
/** Creates sites for backup locations that do not exist locally. */
@Autowired
private SiteService siteService;
/** Persists the downloaded accessions (bulk update, single create, single update). */
@Autowired
private AccessionService accessionService;
/** Resolves the geography record of a country of origin. */
@Autowired
private GeographyRepository geographyRepository;
/** Translates MCPD numeric codes to local code values. */
@Autowired
private CodeValueService codeValueService;
/**
 * Creates the {@link RestTemplate} used to talk to Genesys: 5 second connect timeout, 180 second read
 * timeout and request body buffering enabled.
 *
 * @throws Exception declared by {@link InitializingBean}; nothing in this method throws a checked exception
 */
@Override
public void afterPropertiesSet() throws Exception {
    final int connectTimeoutMillis = 5 * 1000;
    final int readTimeoutMillis = 180 * 1000;

    HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
    requestFactory.setConnectTimeout(connectTimeoutMillis);
    requestFactory.setBufferRequestBody(true);
    requestFactory.setReadTimeout(readTimeoutMillis);

    restTemplate = new RestTemplate(requestFactory);
}
/**
 * Translates an MCPD numeric code to the local code value of the given code group, using
 * {@link #codeValueOfMcpdCache} to avoid repeated service calls.
 *
 * @param groupName the code group name
 * @param mcpd the MCPD number, may be {@code null}
 * @return the matching code value, or an empty Optional when {@code mcpd} is {@code null} or the service returns {@code null}
 * @throws RuntimeException when loading the value into the cache fails with a checked exception
 */
private Optional<String> findCodeValueOfMcpd(String groupName, Integer mcpd) {
    if (mcpd != null) {
        try {
            Pair<String, Integer> cacheKey = Pair.of(groupName, mcpd);
            return codeValueOfMcpdCache.get(cacheKey, () -> Optional.ofNullable(codeValueService.findCodeValueOfMCPD(groupName, mcpd)));
        } catch (ExecutionException e) {
            throw new RuntimeException("Error reading MCPD code value", e);
        }
    }
    return Optional.empty();
}
/**
 * Downloads the accessions of one institute from Genesys and updates the local database with them.
 * Callers must hold the {@code GROUP_ADMINS} authority.
 *
 * @param instituteCode the institute code to download
 * @param authorizationToken the value sent as the {@code Authorization} header to Genesys
 * @throws Exception when the update fails
 */
@PreAuthorize("hasAuthority('GROUP_ADMINS')")
public void download(String instituteCode, String authorizationToken) throws Exception {
    log.info("Updating from Genesys {}", instituteCode);
    this.updateLocalDatabase(instituteCode, authorizationToken);
}
/**
 * Downloads the accessions of one institute from Genesys and stores them locally.
 *
 * <p>
 * One executor task pages through the Genesys accession list ({@link #DOWNLOAD_PAGE_SIZE} rows per
 * request). Every non-empty page is passed to {@link #updateAccessions(List)} in a further executor task
 * that runs with the caller's authentication. The calling thread blocks until the paging task and all
 * update tasks have finished or failed; failures of individual tasks are logged, not rethrown.
 * </p>
 *
 * @param instituteCode the institute code used as the Genesys filter
 * @param authorizationToken the value sent as the {@code Authorization} header
 *
 * @throws Exception if the calling thread is interrupted while waiting for the tasks
 */
private void updateLocalDatabase(String instituteCode, String authorizationToken) throws Exception {
    // The authorization token is a credential and is deliberately not written to the log
    log.warn("Downloading {}", instituteCode);

    // Abort paging after this many failed attempts in a row instead of retrying forever
    final int maxConsecutiveFailures = 5;

    // Appended to by the paging task while this thread polls and removes: must be thread-safe
    final List<Future<?>> futures = new CopyOnWriteArrayList<>();
    final Authentication prevAuth = SecurityContextHolder.getContext().getAuthentication();

    HttpHeaders headers = new HttpHeaders();
    headers.setAccept(List.of(MediaType.APPLICATION_JSON));
    headers.setContentType(MediaType.APPLICATION_JSON);
    headers.set("Origin", "https://www.genesys-pgr.org");
    headers.set("Referer", "https://www.genesys-pgr.org");
    headers.set("Authorization", authorizationToken);

    // Build the filter as a JSON tree so the institute code is always escaped correctly
    ObjectNode filter = JsonNodeFactory.instance.objectNode();
    filter.putObject("institute").putArray("code").add(instituteCode);
    String body = filter.toString();
    log.warn("Body:\n{}", body);
    HttpEntity<String> entity = new HttpEntity<>(body, headers);

    futures.add(taskExecutor.submit(() -> {
        int page = 0;
        int consecutiveFailures = 0;
        while (true) {
            Map<String, Object> uriVars = new HashMap<>();
            uriVars.put("length", DOWNLOAD_PAGE_SIZE);
            uriVars.put("page", page);

            final FilteredPage<ObjectNode> result;
            try {
                log.warn("Fetching page {}", page);
                result = restTemplate.exchange("https://api.genesys-pgr.org/api/v1/acn/list?l={length}&p={page}", HttpMethod.POST, entity,
                    new ParameterizedTypeReference<FilteredPage<ObjectNode>>() {
                    }, uriVars).getBody();
                consecutiveFailures = 0;
            } catch (ResourceAccessException e) {
                consecutiveFailures++;
                log.error("Genesys {}", e.getMessage());
                if (consecutiveFailures >= maxConsecutiveFailures) {
                    log.error("Giving up on page {} after {} failed attempts", page, consecutiveFailures);
                    throw e;
                }
                // Short linear back-off before the same page is requested again
                TimeUnit.SECONDS.sleep(consecutiveFailures);
                continue;
            }
            page++;

            if (result == null || result.content == null) {
                log.warn("Genesys returned no page content, stopping");
                break;
            }

            log.debug("Filter code: {}", result.filterCode);
            log.debug("Filters: {}", result.filter);
            log.info("Rows returned: {} of page {}/{}", result.numberOfElements, result.number, result.totalPages);

            // Stop on the first empty page, or once the row cap is exceeded
            if (result.content.size() == 0 || page * DOWNLOAD_PAGE_SIZE > 250000) {
                break;
            }

            // A taxonomy is sent in full the first time it appears in a page and as a bare id
            // afterwards: replace the ids with the full object seen earlier in the same page.
            Map<Long, ObjectNode> taxaMap = new HashMap<>();
            for (ObjectNode acce : result.content) {
                JsonNode taxa = acce.get("taxonomy");
                if (taxa instanceof ObjectNode) {
                    taxaMap.put(taxa.get("id").asLong(), (ObjectNode) taxa);
                } else if (taxa != null && !taxa.isNull()) {
                    acce.set("taxonomy", taxaMap.get(taxa.asLong()));
                }
            }

            Future<List<Accession>> exec = taskExecutor.submit(() -> TransactionHelper.asUser(prevAuth, () -> TransactionHelper.executeInTransaction(true,
                () -> updateAccessions(result.content))));
            futures.add(exec);
        }
        return "Done!";
    }));

    while (!futures.isEmpty()) {
        log.debug("Have {} pending tasks", futures.size());
        // Iterates over a snapshot, so removing from the list inside the loop is safe
        for (Future<?> f : futures) {
            try {
                Object r = f.get(5000, TimeUnit.MILLISECONDS);
                if (r instanceof Collection) {
                    log.info("Updated: {} accessions", ((Collection<?>) r).size());
                } else if (r != null) {
                    log.info("Result: {}", r);
                } else {
                    log.warn("Got null from {}", f);
                }
                // The task has completed, whatever it returned: stop tracking it
                futures.remove(f);
            } catch (TimeoutException e) {
                log.debug("Still busy {}", f);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of treating it as a failed task
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                futures.remove(f);
                log.warn("Dead {} with {}", f, e.getMessage(), e);
            }
        }
    }
    log.info("Done.");
}
/**
 * Converts one page of Genesys accession JSON into {@link Accession} entities and stores them.
 *
 * <p>
 * Existing accessions are looked up by DOI first and then by the three accession number parts;
 * otherwise a new accession is created. Accessions for which no taxonomy species can be resolved are
 * skipped with a warning. All accessions are saved in one call; if that fails with a
 * {@link DataIntegrityViolationException} as cause, they are stored one by one and individual failures
 * are logged.
 * </p>
 *
 * @param content the accession JSON objects of one page, may be {@code null} or empty
 * @return the stored accessions; an empty list when there is nothing to process
 * @throws NotFoundElement when no local site matches the institute code of an accession
 */
private List<Accession> updateAccessions(List<ObjectNode> content) {
    List<Accession> toSave = new ArrayList<>();
    if (content == null || content.size() == 0) {
        return toSave;
    }
    log.debug("Processing {} nodes", content.size());
    log.debug("Sample JSON: {}", content.get(0));

    // Sites resolved during this call, so the repository is queried once per institute code
    Map<String, Site> sitesByCode = new HashMap<>();

    for (ObjectNode acce : content) {
        String doi = getText(acce, "doi");
        Long seqNo = acce.get("seqNo").asLong();
        String acceNumb = acce.get("accessionNumber").asText();
        String[] parts = acceNumb.strip().split(seqNo.toString(), 2);
        String name1, name3;
        Long name2 = null;
        // NOTE(review): parts[0] is empty when the number comes first, and startsWith("") is true,
        // so the else branch is reached only when strip() removed leading whitespace -- confirm intent.
        if (acceNumb.startsWith(parts[0])) {
            // prefix
            name1 = parts[0].replaceAll("[\\-_\\. 0]*$", "");
            name2 = seqNo;
            name3 = parts.length < 2 || parts[1] == null ? null : StringUtils.defaultIfBlank(parts[1].replaceAll("^[\\-_\\. ]*", ""), null);
        } else {
            // number first
            name1 = seqNo.toString();
            // split() yields a single element when the number is not part of the accession number
            name3 = parts.length < 2 ? null : StringUtils.defaultIfBlank(parts[1], null);
        }
        log.trace("Parts: doi={} {} with {} -> {} = {} {} {}", doi, acceNumb, seqNo, parts, name1, name2, name3);

        Accession a = null;
        if (doi != null) {
            a = accessionRepository.findByDoi(doi);
        }
        if (a == null) {
            a = accessionRepository.findOne(name1, name2, name3);
        }
        if (a == null) {
            a = new Accession();
            a.setAccessionNumberPart1(name1);
            a.setAccessionNumberPart2(name2);
            a.setAccessionNumberPart3(name3);
        }
        if (a.getDoi() == null) {
            // Set doi
            a.setDoi(doi);
        } else {
            // Update name
            a.setAccessionNumberPart1(name1);
            a.setAccessionNumberPart2(name2);
            a.setAccessionNumberPart3(name3);
        }

        String instituteCode = getText(acce, "instituteCode");
        Site site = sitesByCode.computeIfAbsent(instituteCode, (code) -> siteRepository.findAllByFaoInstituteNumber(code).stream().findFirst()
            .orElseThrow(() -> new NotFoundElement("Site by instCode='" + code + "' not found")));
        a.setSite(site);

        // path() tolerates a missing property; a missing or null flag counts as "not historic"
        a.setStatusCode(acce.path("historic").booleanValue() ? "INACTIVE" : "ACTIVE");

        // The taxonomy may be absent or JSON null (e.g. an unresolved reference): treat it as missing
        JsonNode taxonomy = acce.get("taxonomy");
        a.setTaxonomySpecies(getTaxonomySpecies(taxonomy instanceof ObjectNode ? (ObjectNode) taxonomy : null));

        JsonNode cropName = acce.get("cropName");
        if (cropName != null && cropName.isTextual()) {
            ensureTaxonomyCropMap(a.getTaxonomySpecies(), cropName.asText());
        }

        a.setImprovementStatusCode(findCodeValueOfMcpd("IMPROVEMENT_LEVEL", getInteger(acce, "sampStat")).orElse(null));

        String mcpdAcqDate = getText(acce, "acquisitionDate");
        MCPDDate.convert(mcpdAcqDate, a::setInitialReceivedDate, a::setInitialReceivedDateCode);

        JsonNode duplSite = acce.get("duplSite");
        handleBackup(a, duplSite instanceof ArrayNode ? (ArrayNode) duplSite : null);
        handleSource(a, acce);

        if (a.getTaxonomySpecies() != null) {
            toSave.add(a);
        } else {
            log.warn("Accession {} does not have TaxonomySpecies", a.getAccessionNumber());
        }
    }

    try {
        var multiOp = TransactionHelper.executeInTransaction(false, () -> accessionService.update(toSave));
        return multiOp.success;
    } catch (Throwable e) {
        if (!(e.getCause() instanceof DataIntegrityViolationException)) {
            throw e;
        }
        // 1-by-1
        List<Accession> oneByOne = new ArrayList<>();
        for (Accession one : toSave) {
            try {
                if (one.getId() == null) {
                    oneByOne.add(TransactionHelper.executeInTransaction(false, () -> accessionService.create(one)));
                } else {
                    oneByOne.add(TransactionHelper.executeInTransaction(false, () -> accessionService.update(one)));
                }
            } catch (Throwable e2) {
                // Not every failure carries a cause; fall back to the exception itself
                Throwable reason = e2.getCause() != null ? e2.getCause() : e2;
                log.error("Could not store accession: {}", reason.getMessage(), reason);
            }
        }
        return oneByOne;
    }
}
/**
 * Makes sure the crop with the given name is linked to the taxonomy species, using
 * {@link #cropTaxonomyLookup} to skip links that were already ensured.
 *
 * @param taxonomySpecies the species to link; nothing is done when it is {@code null} or has no id
 * @param cropName the crop name
 * @return the crop-taxonomy link, or {@code null} when the species is not usable or the link could not be ensured
 */
private TaxonomyCropMap ensureTaxonomyCropMap(TaxonomySpecies taxonomySpecies, String cropName) {
    if (taxonomySpecies == null || taxonomySpecies.getId() == null) {
        return null;
    }
    String key = cropName + "-" + taxonomySpecies.getId();
    try {
        return cropTaxonomyLookup.get(key, () -> {
            Crop crop = getCrop(cropName);
            if (crop == null) {
                // getCrop() returns null when the crop lookup failed: never pass that on to the service
                throw new Exception("Crop not found"); // checked exception
            }
            TaxonomyCropMap ctm = TransactionHelper.executeInTransaction(false, () -> cropService.ensureCropTaxonomyLink(crop, taxonomySpecies));
            if (ctm == null) {
                throw new Exception("Crop taxonomy entry not inserted"); // checked exception
            }
            return ctm;
        });
    } catch (ExecutionException e) {
        log.error("Crop taxonomy map lookup for key {} error: {}", key, e.getMessage());
        return null;
    }
}
/**
 * Updates the accession sources of {@code a} from the Genesys JSON: first the donor source, then the
 * collecting source.
 *
 * @param a the accession being updated
 * @param acce the Genesys JSON of the accession
 */
private void handleSource(Accession a, ObjectNode acce) {
    importDonorSource(a, acce);
    importCollectingSource(a, acce);
}

/**
 * Ensures a source of type "donated" exists when the JSON names a donor (by name or institute code).
 * Donor details are not copied to the source yet.
 */
private void importDonorSource(Accession a, ObjectNode acce) {
    String donorNumb = getText(acce, "donorNumb");
    String donorName = getText(acce, "donorName");
    String donorWiews = getText(acce, "donorCode");

    if (donorNumb != null) {
        // TODO The donor's accession number is not imported yet
    }

    if (donorWiews == null && donorName == null) {
        return;
    }

    List<AccessionSource> sources = getOrCreateAccessionSources(a);
    for (AccessionSource existing : sources) {
        if (CommunityCodeValues.ACCESSION_SOURCE_TYPE_DONATED.value.equals(existing.getSourceTypeCode())) {
            // Already has a donor source, nothing to add
            return;
        }
    }

    AccessionSource donorSource = new AccessionSource();
    donorSource.setSourceTypeCode(CommunityCodeValues.ACCESSION_SOURCE_TYPE_DONATED.value);
    donorSource.setAccession(a);
    sources.add(donorSource);
    // TODO Cooperators of the donor source are not imported yet
}

/**
 * Creates or updates the source of type "collected" when the JSON has a country of origin, a
 * {@code geo} object or a {@code coll} object.
 */
private void importCollectingSource(Accession a, ObjectNode acce) {
    String origCty = getText(acce, "origCty");
    ObjectNode nodeCollecting = null;
    if (acce.hasNonNull("coll")) {
        nodeCollecting = (ObjectNode) acce.get("coll");
    }
    ObjectNode nodeGeo = null;
    if (acce.hasNonNull("geo")) {
        nodeGeo = (ObjectNode) acce.get("geo");
    }

    if (origCty == null && nodeGeo == null && nodeCollecting == null) {
        return;
    }

    List<AccessionSource> sources = getOrCreateAccessionSources(a);
    AccessionSource collectingSource = null;
    for (AccessionSource existing : sources) {
        if (CommunityCodeValues.ACCESSION_SOURCE_TYPE_COLLECTED.value.equals(existing.getSourceTypeCode())) {
            collectingSource = existing;
            break;
        }
    }
    if (collectingSource == null) {
        collectingSource = new AccessionSource();
        collectingSource.setSourceTypeCode(CommunityCodeValues.ACCESSION_SOURCE_TYPE_COLLECTED.value);
        collectingSource.setAccession(a);
        sources.add(collectingSource);
    }

    if (origCty != null) {
        // A failed lookup is only logged: the geography of the source is then left as it was
        try {
            Geography geography = geographyLookup.get(origCty, () -> {
                Geography found = geographyRepository.getGeographyForCountry(origCty);
                if (found != null) {
                    return found;
                }
                throw new Exception("Geography not found"); // checked exception
            });
            collectingSource.setGeography(geography);
        } catch (Throwable e) {
            log.info("Geography lookup error {}: {}", origCty, e.getMessage());
        }
    }

    // Collecting details
    collectingSource.setCollectorVerbatimLocality(getText(nodeCollecting, "collSite"));
    String collDate = getText(nodeCollecting, "collDate");
    MCPDDate.convert(collDate, collectingSource::setSourceDate, collectingSource::setSourceDateCode);

    // Georeference
    collectingSource.setLatitude(getDouble(nodeGeo, "latitude"));
    collectingSource.setLongitude(getDouble(nodeGeo, "longitude"));
    collectingSource.setElevationMeters(getInteger(nodeGeo, "elevation"));
    collectingSource.setUncertainty(getInteger(nodeGeo, "uncertainty"));
    collectingSource.setGeoreferenceDatum(getText(nodeGeo, "datum"));
    collectingSource.setGeoreferenceAnnotation(getText(nodeGeo, "method"));

    Integer collSrc = getInteger(nodeCollecting, "collSrc");
    collectingSource.setAcquisitionSource(findCodeValueOfMcpd(CommunityCodeValues.ACCESSION_SOURCE_HABITAT_TYPE, collSrc).orElse(null));
    collectingSource.setIsOrigin("Y");
}
// private Cooperator getCooperator(String donorName) {
// Cooperator cooperator = null;
// try {
// cooperator = cooperatorLookup.get(donorName, () -> {
// Cooperator s = cooperatorRepository.getByFaoInstituteNumber(wiewsCode).orElse(null);
// if (s == null) {
// throw new Exception("Site not found"); // checked exception
// }
// return s;
// });
// } catch (Throwable e) {
// LOG.info("Site lookup error {}: {}", wiewsCode, e.getMessage());
// }
// if (cooperator == null) {
// Cooperator cooperator1 = new Cooperator();
// cooperator1.setFaoInstituteNumber(wiewsCode);
// cooperator1.setSiteShortName(wiewsCode);
// cooperator1.setSiteLongName(wiewsCode);
// cooperator1.setIsInternal("Y");
// cooperator1.setIsDistributionSite("N");
//
// cooperator = TransactionHelper.executeInTransaction(false, () -> cooperatorService.create(cooperator1));
// }
// }
/**
 * Returns the source list of the accession, installing a new empty list on the accession when it has none.
 *
 * @param a the accession
 * @return the accession's list of sources, never {@code null}
 */
private List<AccessionSource> getOrCreateAccessionSources(Accession a) {
    List<AccessionSource> existing = a.getAccessionSources(); // load
    if (existing != null) {
        return existing;
    }
    List<AccessionSource> created = new ArrayList<>();
    a.setAccessionSources(created);
    return created;
}
/**
 * Sets the backup locations of the accession from the institute codes listed in {@code duplSite}.
 * A site that cannot be found locally is created with the code as its short and long name. The first
 * two usable sites become backup location 1 and 2; further sites are only reported in the log.
 *
 * @param accession the accession being updated
 * @param arrayNode the {@code duplSite} array of institute codes, may be {@code null} or empty
 */
private void handleBackup(Accession accession, ArrayNode arrayNode) {
    if (arrayNode == null || arrayNode.size() == 0) {
        return;
    }
    log.trace("DUPLSITE: {}", arrayNode);
    List<Site> backupSites = new ArrayList<>();
    for (JsonNode wiewsNode : arrayNode) {
        String wiewsCode = wiewsNode.textValue();
        if (StringUtils.isBlank(wiewsCode)) {
            // A null or blank code cannot identify a site and must not lead to creating an anonymous one
            log.warn("Ignoring blank backup site code in {}", arrayNode);
            continue;
        }
        Site backupSite = null;
        try {
            backupSite = siteWiewsLookup.get(wiewsCode, () -> {
                Site s = siteRepository.findAllByFaoInstituteNumber(wiewsCode).stream().findFirst().orElse(null);
                if (s == null) {
                    throw new Exception("Site by instCode='" + wiewsCode + "' not found"); // checked exception
                }
                return s;
            });
        } catch (Throwable e) {
            // Best effort: any lookup failure falls through to creating the site below
            log.info("Site lookup error {}: {}", wiewsCode, e.getMessage());
        }
        if (backupSite == null) {
            Site backupSite1 = new Site();
            backupSite1.setFaoInstituteNumber(wiewsCode);
            backupSite1.setSiteShortName(wiewsCode);
            backupSite1.setSiteLongName(wiewsCode);
            backupSite1.setIsInternal("Y");
            backupSite1.setIsDistributionSite("N");
            backupSite = TransactionHelper.executeInTransaction(false, () -> siteService.create(backupSite1));
        }
        if (backupSite != null) {
            backupSites.add(backupSite);
        } else {
            log.warn("Backup site {} could not be created", wiewsCode);
        }
    }
    if (backupSites.size() > 0) {
        log.trace("Accession has backup site: {}", backupSites.get(0).getSiteShortName());
        accession.setBackupLocation1Site(backupSites.get(0));
    }
    if (backupSites.size() > 1) {
        accession.setBackupLocation2Site(backupSites.get(1));
    }
    if (backupSites.size() > 2) {
        log.warn("Accession has many {} backup sites: {}", backupSites.size(), arrayNode);
    }
}
/**
 * Resolves the taxonomy species described by the Genesys taxonomy JSON, using {@link #taxonomyLookup}
 * to avoid resolving the same name repeatedly.
 *
 * @param taxonomyJson the taxonomy object of an accession, may be {@code null}
 * @return the matching species, or {@code null} when the lookup fails with a checked exception
 */
private TaxonomySpecies getTaxonomySpecies(ObjectNode taxonomyJson) {
    final String genus = getText(taxonomyJson, "genus");
    final String specificEpithet = getText(taxonomyJson, "species");
    final String spAuthor = getText(taxonomyJson, "spAuthor");
    final String subTaxa = getText(taxonomyJson, "subtaxa");
    final String subtAuthor = getText(taxonomyJson, "subtAuthor");

    // Missing parts appear as the text "null" in the key
    final String key = String.join("-", genus, specificEpithet, spAuthor, subTaxa, subtAuthor);

    try {
        return taxonomyLookup.get(key, () -> {
            TaxonomySpecies resolved = TransactionHelper.executeInTransaction(false, () -> taxonomySpeciesCRUDService.fromMCPD(genus, specificEpithet, spAuthor, subTaxa, subtAuthor));
            if (resolved != null) {
                return resolved;
            }
            // Nothing is cached for this key when the loader throws
            throw new Exception("TaxonomySpecies not found"); // checked exception
        });
    } catch (ExecutionException e) {
        log.error("Taxa lookup for key {} error: {}", key, e.getMessage());
        return null;
    }
}
/**
 * Returns the crop with the given name, creating it with the note "Imported from Genesys" when the
 * crop service does not know it. Results are kept in {@link #cropLookup}.
 *
 * @param cropName the crop name, also used as the cache key
 * @return the crop, or {@code null} when the lookup fails with a checked exception
 */
private Crop getCrop(String cropName) {
    try {
        return cropLookup.get(cropName, () -> {
            Crop loaded = TransactionHelper.executeInTransaction(false, () -> {
                Crop existing = cropService.getCrop(cropName);
                if (existing != null) {
                    return existing;
                }
                Crop fresh = new Crop();
                fresh.setName(cropName);
                fresh.setNote("Imported from Genesys");
                return cropService.create(fresh);
            });
            if (loaded != null) {
                return loaded;
            }
            throw new Exception("Crop not found"); // checked exception
        });
    } catch (ExecutionException e) {
        log.error("Crop lookup for key {} error: {}", cropName, e.getMessage());
        return null;
    }
}
/**
 * Reads a property of a JSON object as text.
 *
 * @param objectNode the JSON object, may be {@code null}
 * @param prop the property name
 * @return the text value; {@code null} when the object is {@code null}, or the property is missing, JSON null or not a text node
 */
private String getText(ObjectNode objectNode, String prop) {
    if (objectNode == null) {
        return null;
    }
    // path() never returns null, and textValue() is null for missing, null and non-text nodes
    return objectNode.path(prop).textValue();
}
/**
 * Reads a property of a JSON object as a double.
 *
 * @param objectNode the JSON object, may be {@code null}
 * @param prop the property name
 * @return the value converted with {@code asDouble()}; {@code null} when the object is {@code null} or the property is missing or JSON null
 */
private Double getDouble(ObjectNode objectNode, String prop) {
    if (objectNode == null || !objectNode.hasNonNull(prop)) {
        return null;
    }
    return objectNode.get(prop).asDouble();
}
/**
 * Reads a property of a JSON object as an integer.
 *
 * @param objectNode the JSON object, may be {@code null}
 * @param prop the property name
 * @return the value converted with {@code asInt()}; {@code null} when the object is {@code null} or the property is missing or JSON null
 */
private Integer getInteger(ObjectNode objectNode, String prop) {
    if (objectNode == null || !objectNode.hasNonNull(prop)) {
        return null;
    }
    return objectNode.get(prop).asInt();
}
}