GenesysValidator.java

/*
 * Copyright 2024 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gringlobal.service.genesys.validator.impl;

import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.CSVWriter;
import com.opencsv.exceptions.CsvMalformedLineException;

import com.querydsl.jpa.impl.JPAQueryFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.gringlobal.api.exception.InvalidApiUsageException;
import org.gringlobal.model.QAccession;
import org.gringlobal.model.QAccessionSource;
import org.gringlobal.model.QGeography;
import org.gringlobal.model.community.CommunityCodeValues;
import org.gringlobal.persistence.AccessionRepository;
import org.gringlobal.service.filter.AccessionFilter;
import org.gringlobal.worker.AccessionMCPDConverter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/**
 * Sends accession coordinates and country-of-origin codes to the Genesys Validator service and streams back
 * only those rows that the validator did not mark as {@code OK} in its {@code ORIGCTY_check} column.
 * <p>
 * The request body is a tab-separated CSV produced page by page from the database; the validator response is
 * filtered on the fly and written directly to the servlet response.
 */
@Component
@Slf4j
public class GenesysValidator implements InitializingBean {

	/** Maximum number of accessions matched by the filter before the request is rejected. */
	private static final int MAX_ACCESSIONS_SIZE = 100000;
	/** Column headers of the CSV sent to the validator, in the order the row values are written. */
	private static final String[] REQUEST_HEADERS = new String[] { "ACCESSIONID", "ACCENUMB", "DECLATITUDE", "DECLONGITUDE", "ORIGCTY" };
	private static final Character CSV_SEPARATOR = '\t';
	private static final Character CSV_QUOTECHAR = '"';
	private static final Character CSV_ESCAPECHAR = '\\';
	private static final String CSV_LINE_END = "\n";
	/** Response column holding the validator verdict for ORIGCTY. */
	private static final String ORIGCTY_CHECK_HEADER = "ORIGCTY_check";
	/** Verdict value for rows that passed the country check; such rows are not returned to the client. */
	private static final String ORIGCTY_CHECK_OK = "OK";
	private static final Charset ENCODING = StandardCharsets.UTF_8;
	/** Number of rows fetched from the database per query. */
	private static final int MCPD_PAGE_SIZE = 5000;
	/** Sanity limit on the number of pages streamed to the validator. */
	private static final int MAX_PAGES = 100;

	@Autowired
	private AccessionRepository accessionRepository;

	@Autowired
	protected JPAQueryFactory jpaQueryFactory;

	@Autowired(required = false)
	private AsyncListenableTaskExecutor taskExecutor;

	@Value("${genesys.validator.url:}")
	private String validatorUrl;

	private RestTemplate restTemplate;

	/**
	 * Builds a {@link RestTemplate} that streams the request body (no buffering) and uses a 2 second connect
	 * timeout. The optional task executor is handed to the request factory when one is available.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
		// Stream the potentially large CSV instead of holding it in memory
		requestFactory.setBufferRequestBody(false);
		requestFactory.setConnectTimeout(2000);
		if (taskExecutor != null) {
			requestFactory.setTaskExecutor(taskExecutor);
		}
		restTemplate = new RestTemplate(requestFactory);
	}

	/**
	 * Streams the coordinates of accessions matching {@code filter} to the Genesys Validator and writes the rows
	 * that did not pass the country-of-origin check to {@code httpResponse}.
	 *
	 * @param filter selects the accessions to validate
	 * @param httpResponse receives the filtered validator response as tab-separated CSV
	 * @throws InvalidApiUsageException if the validator URL is not configured, too many accessions match the
	 *         filter, or the validator responds with a non-OK status or unusable data
	 */
	@Transactional(readOnly = true)
	public void processAccessionGeo(AccessionFilter filter, HttpServletResponse httpResponse) throws Exception {

		if (StringUtils.isBlank(validatorUrl)) {
			throw new InvalidApiUsageException("Validator URL is not configured");
		}

		var acceCount = accessionRepository.count(filter.buildPredicate());
		if (acceCount > MAX_ACCESSIONS_SIZE) {
			throw new InvalidApiUsageException("The maximum number of accessions to be checked is " + MAX_ACCESSIONS_SIZE);
		}

		// Drop a trailing slash so a base URL such as "https://host/validator/" does not produce "//process"
		String processUrl = StringUtils.removeEnd(validatorUrl.trim(), "/").concat("/process");

		URI uri = UriComponentsBuilder.fromHttpUrl(processUrl)
			.queryParam("stream", "stream")
			.queryParam("validateType", "country")
			.queryParam("separator", CSV_SEPARATOR)
			.queryParam("quoteChar", CSV_QUOTECHAR)
			.queryParam("escapeChar", CSV_ESCAPECHAR)
			.queryParam("encoding", ENCODING)
			.build().encode().toUri();

		log.debug("URL {}", uri);

		restTemplate.execute(uri, HttpMethod.POST,
			request -> {
				HttpHeaders headers = request.getHeaders();
				headers.setContentType(MediaType.valueOf("text/csv"));
				headers.setAccept(List.of(MediaType.valueOf("text/csv"), MediaType.TEXT_PLAIN));
				streamInputCsv(filter, request.getBody());
				log.debug("Done streaming CSV");
			},
			response -> {
				if (response.getStatusCode() == HttpStatus.OK) {
					httpResponse.setCharacterEncoding(ENCODING.toString());
					log.debug("OK Response");
					filterResponseLinesToOutputStream(response.getBody(), httpResponse.getOutputStream());
					log.debug("Finished filtering the response");
				} else {
					log.warn("API {} error {}", uri, response.getStatusCode());
					throw new InvalidApiUsageException("Genesys Validator API responded with " + response.getStatusCode().name());
				}
				return null;
			});
	}

	/**
	 * Writes the request CSV (header row plus one row per accession/collecting-source pair that has both latitude
	 * and longitude) to {@code outputStream}. Rows are fetched in pages of {@link #MCPD_PAGE_SIZE}, up to
	 * {@link #MAX_PAGES} pages.
	 *
	 * @param filter selects the accessions to include
	 * @param outputStream the validator request body
	 * @throws InvalidApiUsageException if writing to the validator fails
	 */
	private void streamInputCsv(AccessionFilter filter, OutputStream outputStream) {
		// Query paths and predicate are identical for every page; build them once
		var accession = QAccession.accession;
		var collectedSource = new QAccessionSource("collectedSource");
		var originSource = new QAccessionSource("originSource");
		var originSourceGeography = QGeography.geography;
		var predicate = filter.buildPredicate().and(collectedSource.longitude.isNotNull()).and(collectedSource.latitude.isNotNull());

		try (CSVWriter csvWriter = new CSVWriter(new OutputStreamWriter(outputStream, ENCODING), CSV_SEPARATOR, CSV_QUOTECHAR, CSV_ESCAPECHAR, CSV_LINE_END)) {
			csvWriter.writeNext(REQUEST_HEADERS);

			boolean reachedEnd = false;
			for (int page = 0; page < MAX_PAGES && !reachedEnd; page++) {
				log.debug("Loading page {}", page);

				var rows = jpaQueryFactory.select(accession.id, accession.accessionNumber, collectedSource.latitude, collectedSource.longitude, originSourceGeography.countryCode)
					.from(accession)
					.innerJoin(collectedSource).on(
						collectedSource.accession().id.eq(accession.id)
							// .and(collectedSource.isWebVisible.eq("Y")) // Validate all coordinates
							.and(collectedSource.sourceTypeCode.eq(CommunityCodeValues.ACCESSION_SOURCE_TYPE_COLLECTED.value))
					)
					.leftJoin(originSource).on(
						originSource.accession().id.eq(accession.id)
							// .and(originSource.isWebVisible.eq("Y"))
							.and(originSource.isOrigin.eq("Y"))
					)
					.leftJoin(originSourceGeography).on(originSource.geography().id.eq(originSourceGeography.id))
					.where(predicate)
					// An accession may join several collecting sources; the secondary key keeps offset paging stable
					.orderBy(accession.id.asc(), collectedSource.id.asc())
					.offset((long) page * MCPD_PAGE_SIZE)
					.limit(MCPD_PAGE_SIZE)
					.fetch();

				for (var row : rows) {
					csvWriter.writeNext(new String[] {
						String.valueOf(row.get(accession.id)),
						String.valueOf(row.get(accession.accessionNumber)),
						String.valueOf(row.get(collectedSource.latitude)),
						String.valueOf(row.get(collectedSource.longitude)),
						row.get(originSourceGeography.countryCode)
					});
				}

				// CSVWriter.writeNext swallows IOExceptions; checkError() flushes the page and reports them,
				// so we stop querying the database once the validator connection is broken
				if (csvWriter.checkError()) {
					throw new InvalidApiUsageException("Error streaming CSV request lines to validator");
				}

				if (rows.size() < MCPD_PAGE_SIZE) {
					log.debug("Reached the end of accession list.");
					reachedEnd = true;
				}
			}

			if (!reachedEnd) {
				log.warn("Stopped after {} pages of {} rows; remaining rows, if any, were not sent to the validator", MAX_PAGES, MCPD_PAGE_SIZE);
			}

		} catch (IOException e) {
			log.warn("Error streaming CSV request lines to validator", e);
			throw new InvalidApiUsageException("Error streaming CSV request lines to validator", e);
		}
	}

	/**
	 * Copies the validator's CSV response to {@code outputStream}, keeping the header row and dropping every data
	 * row whose {@value #ORIGCTY_CHECK_HEADER} value is {@code OK}. Rows too short to contain that column are
	 * dropped as well.
	 *
	 * @param inputStreamToFilter the validator response body
	 * @param outputStream destination for the filtered CSV
	 * @throws InvalidApiUsageException if the response is empty, lacks the {@value #ORIGCTY_CHECK_HEADER} column,
	 *         is malformed, or cannot be read or written
	 */
	private void filterResponseLinesToOutputStream(InputStream inputStreamToFilter, OutputStream outputStream) {

		try (CSVReader csvReader = new CSVReaderBuilder(new InputStreamReader(inputStreamToFilter, ENCODING))
			.withCSVParser(new CSVParserBuilder().withSeparator(CSV_SEPARATOR).withQuoteChar(CSV_QUOTECHAR).withEscapeChar(CSV_ESCAPECHAR).build()).build();
			 CSVWriter csvWriter = new CSVWriter(new OutputStreamWriter(outputStream, ENCODING), CSV_SEPARATOR, CSV_QUOTECHAR, CSV_ESCAPECHAR, CSV_LINE_END)
		) {

			final String[] headers = csvReader.readNext();
			if (headers == null) {
				throw new InvalidApiUsageException("Genesys Validator API returned an empty response");
			}
			csvWriter.writeNext(headers);
			log.debug(">> {}", Arrays.toString(headers));

			// Looking for the ORIGCTY_check header index (null-safe: indexOf calls equals on the constant)
			final int origctyCheckIndex = Arrays.asList(headers).indexOf(ORIGCTY_CHECK_HEADER);
			if (origctyCheckIndex < 0) {
				throw new InvalidApiUsageException("Could not find " + ORIGCTY_CHECK_HEADER + " in response header");
			}

			// Remove all ORIGCTY_check == OK lines
			String[] line;
			while ((line = csvReader.readNext()) != null) {
				log.debug(">> {}", Arrays.toString(line));
				if (line.length > origctyCheckIndex && !ORIGCTY_CHECK_OK.equals(line[origctyCheckIndex])) {
					csvWriter.writeNext(line);
					log.debug("!! {}", Arrays.toString(line));
				}
			}
		} catch (InvalidApiUsageException e) {
			throw e; // Already describes the problem; do not re-wrap it in a generic message
		} catch (CsvMalformedLineException e) {
			log.warn("Malformed CSV in line {}: {}", e.getLineNumber(), e.getContext(), e);
			throw new InvalidApiUsageException("Error streaming CSV response lines from validator", e);
		} catch (Exception e) {
			// Errors (OOM etc.) are intentionally not caught
			log.warn("Error streaming CSV response lines from validator", e);
			throw new InvalidApiUsageException("Error streaming CSV response lines from validator", e);
		}
	}
}