GenesysValidator.java
/*
* Copyright 2024 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.service.genesys.validator.impl;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.CSVWriter;
import com.opencsv.exceptions.CsvMalformedLineException;
import com.querydsl.jpa.impl.JPAQueryFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.gringlobal.api.exception.InvalidApiUsageException;
import org.gringlobal.model.QAccession;
import org.gringlobal.model.QAccessionSource;
import org.gringlobal.model.QGeography;
import org.gringlobal.model.community.CommunityCodeValues;
import org.gringlobal.persistence.AccessionRepository;
import org.gringlobal.service.filter.AccessionFilter;
import org.gringlobal.worker.AccessionMCPDConverter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
/**
 * Spring component that sends accession coordinates to a remote validator service as
 * CSV and relays the rows the service did not mark as {@code OK} back to the caller.
 *
 * <p>The HTTP client is created in {@link #afterPropertiesSet()}; the validator base URL
 * is read from the {@code genesys.validator.url} property.</p>
 */
@Component
@Slf4j
public class GenesysValidator implements InitializingBean {
// Requests matching more accessions than this are rejected before anything is sent
private static final int MAX_ACCESSIONS_SIZE = 100000;
// Header row of the CSV sent to the validator
private static final String[] REQUEST_HEADERS = new String[] { "ACCESSIONID", "ACCENUMB", "DECLATITUDE", "DECLONGITUDE", "ORIGCTY" };
// CSV dialect: used for writing the request, parsing the response, and passed to the
// validator as query parameters
private static final Character CSV_SEPARATOR = '\t';
private static final Character CSV_QUOTECHAR = '"';
private static final Character CSV_ESCAPECHAR = '\\';
private static final String CSV_LINE_END = "\n";
// Response column whose value decides whether a row is relayed ("OK" rows are dropped)
private static final String ORIGCTY_CHECK_HEADER = "ORIGCTY_check";
// Character encoding of the request CSV, the response CSV and the servlet response
private static final Charset ENCODING = StandardCharsets.UTF_8;
// Number of rows fetched from the database per query while streaming the request
private static final int MCPD_PAGE_SIZE = 5000;
// Used to count the accessions matching the filter
@Autowired
private AccessionRepository accessionRepository;
// Used to build the paged projection query that feeds the request CSV
@Autowired
protected JPAQueryFactory jpaQueryFactory;
// Optional; handed to the HTTP request factory when present
@Autowired(required = false)
private AsyncListenableTaskExecutor taskExecutor;
// Base URL of the validator service; defaults to an empty string when the property is not set
@Value("${genesys.validator.url:}")
private String validatorUrl;
// HTTP client, initialized in afterPropertiesSet()
private RestTemplate restTemplate;
/**
 * Creates the {@link RestTemplate} used to talk to the validator once the
 * dependencies of this component have been injected.
 */
@Override
public void afterPropertiesSet() throws Exception {
    final var httpFactory = new SimpleClientHttpRequestFactory();
    if (taskExecutor != null) {
        httpFactory.setTaskExecutor(taskExecutor);
    }
    // Connect timeout in milliseconds
    httpFactory.setConnectTimeout(2000);
    // Do not buffer the request body: the CSV is written straight to the connection
    httpFactory.setBufferRequestBody(false);
    this.restTemplate = new RestTemplate(httpFactory);
}
/**
 * Sends the coordinates of the accessions selected by {@code filter} to the validator
 * service and writes the filtered validator output to {@code httpResponse}.
 *
 * <p>The request body is produced by {@code streamInputCsv} and the response body is
 * processed by {@code filterResponseLinesToOutputStream}.</p>
 *
 * @param filter selects the accessions to check
 * @param httpResponse servlet response that receives the filtered CSV; its character
 *        encoding is set to {@code ENCODING} before anything is written
 * @throws InvalidApiUsageException if the validator URL is not configured, if more than
 *         {@code MAX_ACCESSIONS_SIZE} accessions match the filter, or if the validator
 *         answers with a status other than 200 OK
 * @throws Exception kept in the signature for existing callers
 */
@Transactional(readOnly = true)
public void processAccessionGeo(AccessionFilter filter, HttpServletResponse httpResponse) throws Exception {
    if (StringUtils.isBlank(validatorUrl)) {
        throw new InvalidApiUsageException("Validator URL is not configured");
    }

    var acceCount = accessionRepository.count(filter.buildPredicate());
    if (acceCount > MAX_ACCESSIONS_SIZE) {
        throw new InvalidApiUsageException("The maximum number of accessions to be checked is " + MAX_ACCESSIONS_SIZE);
    }

    // Accept a base URL configured with or without a trailing slash (avoids "//process")
    var processUrl = StringUtils.removeEnd(validatorUrl.trim(), "/").concat("/process");

    // The CSV dialect used for the request body is announced to the validator as query parameters
    URI uri = UriComponentsBuilder.fromHttpUrl(processUrl)
        .queryParam("stream", "stream")
        .queryParam("validateType", "country")
        .queryParam("separator", CSV_SEPARATOR)
        .queryParam("quoteChar", CSV_QUOTECHAR)
        .queryParam("escapeChar", CSV_ESCAPECHAR)
        .queryParam("encoding", ENCODING)
        .build().encode().toUri();
    log.debug("URL {}", uri);

    restTemplate.execute(uri, HttpMethod.POST,
        request -> {
            HttpHeaders headers = request.getHeaders();
            headers.setContentType(MediaType.valueOf("text/csv"));
            headers.setAccept(List.of(MediaType.valueOf("text/csv"), MediaType.TEXT_PLAIN));
            streamInputCsv(filter, request.getBody());
            // Progress information, not a problem: logged at debug level
            log.debug("Done streaming CSV");
        },
        response -> {
            if (response.getStatusCode() == HttpStatus.OK) {
                httpResponse.setCharacterEncoding(ENCODING.toString());
                log.debug("OK Response");
                filterResponseLinesToOutputStream(response.getBody(), httpResponse.getOutputStream());
                log.debug("Finished filtering the response");
            } else {
                // NOTE(review): the RestTemplate is built without a custom error handler, so
                // 4xx/5xx responses are presumably raised before this extractor runs and this
                // branch only sees other non-200 statuses -- confirm.
                log.warn("API {} error {}", uri, response.getStatusCode());
                throw new InvalidApiUsageException("Genesys Validator API responded with " + response.getStatusCode().name());
            }
            return null;
        });
}
/**
 * Writes the request CSV for the validator to {@code outputStream}.
 *
 * <p>After the {@code REQUEST_HEADERS} row, one row is written per query result: accession
 * id, accession number, latitude and longitude of a source of type "collected" (only rows
 * where both coordinates are non-null), and the country code of the geography of a source
 * flagged as origin (left-joined, so it may be absent). Rows are loaded in pages of
 * {@code MCPD_PAGE_SIZE}. The writer, and with it {@code outputStream}, is closed on return.</p>
 *
 * @param filter selects the accessions to export
 * @param outputStream destination of the CSV (the validator request body)
 * @throws InvalidApiUsageException if writing to {@code outputStream} fails
 */
private void streamInputCsv(AccessionFilter filter, OutputStream outputStream) {
    final int maxPages = 100; // Sanity check

    // Aliases and the predicate are identical for every page, so build them once
    final var qAccession = QAccession.accession;
    final var collectedSource = new QAccessionSource("collectedSource");
    final var originSource = new QAccessionSource("originSource");
    final var originSourceGeography = QGeography.geography;
    final var where = filter.buildPredicate().and(collectedSource.longitude.isNotNull()).and(collectedSource.latitude.isNotNull());

    try (CSVWriter csvWriter = new CSVWriter(new OutputStreamWriter(outputStream, ENCODING), CSV_SEPARATOR, CSV_QUOTECHAR, CSV_ESCAPECHAR, CSV_LINE_END)) {
        csvWriter.writeNext(REQUEST_HEADERS);

        var reachedEnd = false;
        for (var page = 0; page < maxPages && !reachedEnd; page++) {
            log.debug("Loading page {}", page);
            var rows = jpaQueryFactory.select(qAccession.id, qAccession.accessionNumber, collectedSource.latitude, collectedSource.longitude, originSourceGeography.countryCode)
                .from(qAccession)
                .innerJoin(collectedSource).on(
                    collectedSource.accession().id.eq(qAccession.id)
                    // Validate all coordinates, web-visible or not
                    .and(collectedSource.sourceTypeCode.eq(CommunityCodeValues.ACCESSION_SOURCE_TYPE_COLLECTED.value))
                )
                .leftJoin(originSource).on(
                    originSource.accession().id.eq(qAccession.id)
                    .and(originSource.isOrigin.eq("Y"))
                )
                .leftJoin(originSourceGeography).on(originSource.geography().id.eq(originSourceGeography.id))
                .where(where)
                // The joins can yield several rows per accession; the source ids make the
                // order total so offset paging neither skips nor repeats rows
                .orderBy(qAccession.id.asc(), collectedSource.id.asc(), originSource.id.asc())
                .offset((long) page * MCPD_PAGE_SIZE)
                .limit(MCPD_PAGE_SIZE)
                .fetch();

            for (var row : rows) {
                var origCtyCode = row.get(originSourceGeography.countryCode);
                csvWriter.writeNext(new String[] {
                    String.valueOf(row.get(qAccession.id)),
                    String.valueOf(row.get(qAccession.accessionNumber)),
                    String.valueOf(row.get(collectedSource.latitude)),
                    String.valueOf(row.get(collectedSource.longitude)),
                    origCtyCode
                });
            }

            // writeNext() does not throw on I/O failure; checkError() flushes and reports it,
            // so a broken connection stops the export instead of loading the remaining pages
            if (csvWriter.checkError()) {
                throw new IOException("Writing CSV page " + page + " to the validator failed");
            }

            // A short (or empty) page means there is nothing left to load
            reachedEnd = rows.size() < MCPD_PAGE_SIZE;
        }

        if (reachedEnd) {
            log.debug("Reached the end of accession list.");
        } else {
            log.warn("Stopped after {} pages of {} rows; the CSV sent to the validator may be incomplete", maxPages, MCPD_PAGE_SIZE);
        }
    } catch (IOException e) {
        log.warn("Error streaming CSV request lines to validator", e);
        throw new InvalidApiUsageException("Error streaming CSV request lines to validator", e);
    }
}
/**
 * Copies the validator's CSV response to {@code outputStream}, keeping the header row and
 * only those data rows whose {@code ORIGCTY_check} column is not {@code "OK"}.
 *
 * <p>Rows that are too short to contain the {@code ORIGCTY_check} column are dropped.
 * Both streams are closed on return.</p>
 *
 * @param inputStreamToFilter CSV response body received from the validator
 * @param outputStream destination for the header and the retained rows
 * @throws InvalidApiUsageException if the response is empty, lacks the
 *         {@code ORIGCTY_check} column, is malformed, or cannot be read or written
 */
private void filterResponseLinesToOutputStream(InputStream inputStreamToFilter, OutputStream outputStream) {
    try (CSVReader csvReader = new CSVReaderBuilder(new InputStreamReader(inputStreamToFilter, ENCODING))
            .withCSVParser(new CSVParserBuilder().withSeparator(CSV_SEPARATOR).withQuoteChar(CSV_QUOTECHAR).withEscapeChar(CSV_ESCAPECHAR).build()).build();
        CSVWriter csvWriter = new CSVWriter(new OutputStreamWriter(outputStream, ENCODING), CSV_SEPARATOR, CSV_QUOTECHAR, CSV_ESCAPECHAR, CSV_LINE_END)
    ) {
        // readNext() returns null when the response has no content at all
        final String[] headers = csvReader.readNext();
        if (headers == null) {
            throw new InvalidApiUsageException("Validator response is empty");
        }
        log.debug(">> {}", Arrays.toString(headers));

        // Looking for the ORIGCTY_check header index
        final int origctyCheckIndex = Arrays.asList(headers).indexOf(ORIGCTY_CHECK_HEADER);
        if (origctyCheckIndex < 0) {
            throw new InvalidApiUsageException("Could not find " + ORIGCTY_CHECK_HEADER + " in response header");
        }

        // Only start writing once the response is known to be usable
        csvWriter.writeNext(headers);

        // Avoid formatting every row when debug logging is off
        final boolean debug = log.isDebugEnabled();

        // Remove all ORIGCTY_check == OK lines
        String[] line;
        while ((line = csvReader.readNext()) != null) {
            if (debug) {
                log.debug(">> {}", Arrays.toString(line));
            }
            if (line.length > origctyCheckIndex && !"OK".equals(line[origctyCheckIndex])) {
                csvWriter.writeNext(line);
                if (debug) {
                    log.debug("!! {}", Arrays.toString(line));
                }
            }
        }
    } catch (CsvMalformedLineException e) {
        log.warn("Malformed CSV in line {}: {}", e.getLineNumber(), e.getContext(), e);
        throw new InvalidApiUsageException("Error streaming CSV response lines from validator", e);
    } catch (InvalidApiUsageException e) {
        // Already carries a specific message; do not bury it under the generic one
        throw e;
    } catch (Exception e) {
        log.warn("Error streaming CSV response lines from validator", e);
        throw new InvalidApiUsageException("Error streaming CSV response lines from validator", e);
    }
}
}