UsdaGeographyUpdater.java

/*
 * Copyright 2020 Global Crop Diversity Trust
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gringlobal.worker;

import java.io.File;
import java.nio.file.Files;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.genesys.taxonomy.gringlobal.component.CabReader;
import org.genesys.taxonomy.gringlobal.model.GeographyRegionMapRow;
import org.genesys.taxonomy.gringlobal.model.GeographyRow;
import org.genesys.taxonomy.gringlobal.model.RegionRow;
import org.gringlobal.model.CodeValue;
import org.gringlobal.model.Geography;
import org.gringlobal.model.GeographyRegionMap;
import org.gringlobal.model.QCodeValue;
import org.gringlobal.model.Region;
import org.gringlobal.model.TaxonomySpecies;
import org.gringlobal.model.community.CommunityCodeValues;
import org.gringlobal.persistence.CodeValueRepository;
import org.gringlobal.persistence.GeographyRegionMapRepository;
import org.gringlobal.persistence.GeographyRepository;
import org.gringlobal.persistence.RegionRepository;
import org.gringlobal.service.CodeValueService;
import org.gringlobal.worker.UsdaTaxonomyUpdater.LookupList;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.google.common.collect.Lists;
import com.opencsv.CSVReader;
import com.opencsv.bean.CsvToBean;

/**
 * The component downloads the latest .cab of the USDA installer (when it is not
 * already available locally) and updates the records in {@link Region},
 * {@link Geography} and the {@link GeographyRegionMap} links between them.
 * 
 * @author Matija Obreza
 */
@Component
@Slf4j
public class UsdaGeographyUpdater {

	/** Code value group listing the accepted {@code adm1TypeCode} values of a {@link Geography}. */
	private static final String GEOGRAPHY_ADMIN1_TYPE = "GEOGRAPHY_ADMIN1_TYPE";

	/** Number of entities passed to a single {@code saveAll} call. */
	private static final int BATCH_SIZE = 100;

	@Autowired
	private RegionRepository regionRepository;
	@Autowired
	private GeographyRepository geographyRepository;
	@Autowired
	private GeographyRegionMapRepository geoRegionRepository;
	@Autowired
	private CodeValueService codeValueService;
	@Autowired
	private CodeValueRepository codeValueRepository;

	/** Local folder holding the downloaded USDA GRIN-Global data files. */
	private final File downloadFolder = new File(FileUtils.getTempDirectory(), "grin-taxonomy-source");

	/**
	 * Update local Region, Geography and GeographyRegionMap records with data
	 * from USDA GRIN-Global. The source data is downloaded first if needed.
	 * 
	 * @throws Exception when the data cannot be downloaded, read or stored
	 */
	@PreAuthorize("hasAuthority('GROUP_ADMINS')")
	@Transactional
	public void update() throws Exception {
		log.info("Updating Region and Geography from USDA GRIN folder {}", downloadFolder.getAbsolutePath());
		UsdaTaxonomyUpdater.downloadDataIfNeeded(downloadFolder);
		updateLocalDatabase();
	}

	/**
	 * Synchronize local records with {@code region.txt}, {@code geography.txt}
	 * and {@code geography_region_map.txt}. The update runs in this order:
	 * {@link Region}, {@link Geography}, then {@link GeographyRegionMap}.
	 * 
	 * <p>
	 * Source entries are matched to existing local records by a
	 * case-insensitive comparison of their names. A new local record is created
	 * when there is not exactly one match. Names of matched records are not
	 * changed, and no records are removed from the local database.
	 * </p>
	 * 
	 * @throws Exception when a data file cannot be read
	 */
	private void updateLocalDatabase() throws Exception {
		log.warn("Loading region.txt");
		List<RegionRow> grinRegion = readRows("region.txt", RegionRow.class);
		grinRegion.forEach(regionRow -> log.trace("{} {} {}", regionRow.getContinent(), regionRow.getRegionId(), regionRow.getSubcontinent()));

		log.warn("Loading geography.txt");
		List<GeographyRow> ggGeo = readRows("geography.txt", GeographyRow.class);

		ensureCountryCodes(ggGeo);

		Map<Long, Region> regTheirsToOurs = updateRegions(grinRegion);
		Map<Long, Geography> geoTheirsToOurs = updateGeographies(ggGeo);
		updateCurrentGeographies(ggGeo, geoTheirsToOurs);

		log.warn("Loading geography_region_map.txt");
		List<GeographyRegionMapRow> grinGeoRegionMaps = readRows("geography_region_map.txt", GeographyRegionMapRow.class);
		updateGeographyRegionMaps(grinGeoRegionMaps, regTheirsToOurs, geoTheirsToOurs);

		log.warn("Done. Transaction will now be committed.");
	}

	/**
	 * Read all rows of one data file in {@link #downloadFolder} as beans.
	 * 
	 * @param fileName name of the file inside the download folder
	 * @param rowType bean class each row is mapped to
	 * @return all rows in file order
	 * @throws Exception when the file cannot be opened or parsed
	 */
	private <T> List<T> readRows(String fileName, Class<T> rowType) throws Exception {
		try (CSVReader reader = CabReader.openCsvReader(Files.newInputStream(new File(downloadFolder, fileName).toPath()), 0)) {
			CsvToBean<T> objectReader = CabReader.beanReader(rowType, reader);
			List<T> rows = new ArrayList<>();
			objectReader.forEach(rows::add);
			return rows;
		}
	}

	/**
	 * Register every non-blank country code found in the source geography as a
	 * GEOGRAPHY_COUNTRY_CODE code value, unless it already validates.
	 */
	private void ensureCountryCodes(List<GeographyRow> ggGeo) {
		ggGeo.stream().map(GeographyRow::getCountryCode).filter(StringUtils::isNotBlank).distinct().forEach(countryCode -> {
			if (!codeValueService.validate(CommunityCodeValues.GEOGRAPHY_COUNTRY_CODE, countryCode)) {
				codeValueService.ensureCodeValue(CommunityCodeValues.GEOGRAPHY_COUNTRY_CODE, countryCode, countryCode, null);
			}
		});
	}

	/**
	 * Match source regions to local regions (case-insensitive continent and
	 * subcontinent), update their attributes and save them.
	 * 
	 * @return source region id mapped to the saved local region
	 */
	private Map<Long, Region> updateRegions(List<RegionRow> grinRegion) {
		Map<Long, Region> regTheirsToOurs = new HashMap<>();
		List<Region> allRegions = regionRepository.findAll();

		for (RegionRow regionRow : grinRegion) {
			Region region = null;

			if (!allRegions.isEmpty()) {
				List<Region> narrow = allRegions.stream()
					.filter(m -> StringUtils.equalsIgnoreCase(m.getContinent(), regionRow.getContinent())
						&& StringUtils.equalsIgnoreCase(m.getSubcontinent(), regionRow.getSubcontinent()))
					.collect(Collectors.toList());
				narrow.forEach(m -> log.debug("{} {}", m.getContinent(), m.getSubcontinent()));

				if (narrow.size() == 1) {
					region = narrow.get(0);
				} else {
					log.debug("{} matches found! Will create new entry.", narrow.size());
				}
			}

			if (region == null) {
				region = new Region();
				region.setContinent(regionRow.getContinent());
				region.setSubcontinent(regionRow.getSubcontinent());
			}

			// Update anyway
			region.setSequenceNumber(regionRow.getSequenceNumber());
			region.setContinentAbbreviation(regionRow.getContinentAbbreviation());
			region.setSubcontinentAbbreviation(regionRow.getSubcontinentAbbreviation());
			region.setNote(regionRow.getNote());
			if (regionRow.getCreatedDate() != null) {
				region.setCreatedDate(regionRow.getCreatedDate().toInstant(ZoneOffset.UTC));
			}
			// The @Versioned modifiedDate is intentionally not copied from the source.

			region = regionRepository.save(region);
			regTheirsToOurs.put(regionRow.getRegionId(), region);
		}
		return regTheirsToOurs;
	}

	/**
	 * Match source geography rows to local geographies (case-insensitive on
	 * country code, adm1, adm1 type, adm2, adm3 and adm4), update their
	 * attributes and save them. Rows whose adm1 type code is not a known
	 * GEOGRAPHY_ADMIN1_TYPE value are logged and skipped.
	 * 
	 * @return source geography id mapped to the local geography
	 */
	private Map<Long, Geography> updateGeographies(List<GeographyRow> ggGeo) {
		Map<Long, Geography> geoTheirsToOurs = new HashMap<>();

		// Index existing geographies by country code so that each row only scans its own country
		LookupList<String, Geography> countryGeographies = new LookupList<>();
		geographyRepository.findAll().forEach(geography -> countryGeographies.add(geography.getCountryCode(), geography));

		// A set gives constant-time lookups in the loop below
		var adm1TypeCodeValues = StreamSupport.stream(codeValueRepository.findAll(QCodeValue.codeValue.groupName.eq(GEOGRAPHY_ADMIN1_TYPE)).spliterator(), false)
			.map(CodeValue::getValue)
			.collect(Collectors.toSet());

		List<Geography> toSave = new ArrayList<>();
		for (GeographyRow geographyRow : ggGeo) {
			Geography geography = new Geography();
			geography.setCountryCode(geographyRow.getCountryCode());
			geography.setAdm1(geographyRow.getAdm1());
			geography.setAdm1TypeCode(geographyRow.getAdm1TypeCode());
			if (StringUtils.isNotBlank(geography.getAdm1TypeCode()) && !adm1TypeCodeValues.contains(geography.getAdm1TypeCode())) {
				log.error("Invalid GEOGRAPHY_ADMIN1_TYPE: {}", geography.getAdm1TypeCode());
				continue;
			}
			geography.setAdm2(geographyRow.getAdm2());
			geography.setAdm3(geographyRow.getAdm3());
			geography.setAdm4(geographyRow.getAdm4());

			Geography compareTo = geography;
			List<Geography> narrow = countryGeographies.getOrDefault(geography.getCountryCode(), List.of()).stream()
				.filter(m -> StringUtils.equalsIgnoreCase(m.getCountryCode(), compareTo.getCountryCode()) && StringUtils.equalsIgnoreCase(m.getAdm1(), compareTo.getAdm1())
						&& StringUtils.equalsIgnoreCase(m.getAdm1TypeCode(), compareTo.getAdm1TypeCode()) && StringUtils.equalsIgnoreCase(m.getAdm2(), compareTo.getAdm2())
						&& StringUtils.equalsIgnoreCase(m.getAdm3(), compareTo.getAdm3()) && StringUtils.equalsIgnoreCase(m.getAdm4(), compareTo.getAdm4()))
				.collect(Collectors.toList());
			narrow.forEach(m -> log.debug("{} {} {} {} {} {}", m.getCountryCode(), m.getAdm1(), m.getAdm1TypeCode(), m.getAdm2(), m.getAdm3(), m.getAdm4()));

			if (narrow.size() == 1) {
				geography = narrow.get(0);
			} else {
				log.debug("{} matches found for {} {} {} {} {} {}! Will create new entry.", narrow.size(), geography.getCountryCode(), geography.getAdm1(), geography
					.getAdm1TypeCode(), geography.getAdm2(), geography.getAdm3(), geography.getAdm4());
			}

			geography.setIsValid(geographyRow.getIsValid());
			geography.setNote(geographyRow.getNote());
			if (geographyRow.getChangedDate() != null) {
				geography.setChangedDate(geographyRow.getChangedDate().toInstant(ZoneOffset.UTC));
			}
			if (geographyRow.getCreatedDate() != null) {
				geography.setCreatedDate(geographyRow.getCreatedDate().toInstant(ZoneOffset.UTC));
			}
			// The @Versioned modifiedDate is intentionally not copied from the source.

			toSave.add(geography);
			geoTheirsToOurs.put(geographyRow.getGeographyId(), geography);
		}

		// Saved before current-geography references are set, so that every target already exists
		saveGeographyBatches(toSave);
		return geoTheirsToOurs;
	}

	/**
	 * Link each geography to its current geography as given by the source data.
	 * Rows whose geography or current geography is unknown locally are logged
	 * and skipped. This includes rows skipped by {@link #updateGeographies(List)}.
	 */
	private void updateCurrentGeographies(List<GeographyRow> ggGeo, Map<Long, Geography> geoTheirsToOurs) {
		log.warn("Updating current geography references for {} records", ggGeo.size());
		List<Geography> toSave = new ArrayList<>();
		for (GeographyRow geographyRow : ggGeo) {
			Geography geography = geoTheirsToOurs.get(geographyRow.getGeographyId());
			Geography currentGeography = geoTheirsToOurs.get(geographyRow.getCurrentGeographyId());
			// geography is null for rows rejected earlier (e.g. invalid adm1 type); do not dereference it
			if (geography != null && currentGeography != null) {
				geography.setCurrentGeography(currentGeography);
				toSave.add(geography);
			} else {
				log.warn("No Geography for id={} current={}", geographyRow.getGeographyId(), geographyRow.getCurrentGeographyId());
			}
		}
		saveGeographyBatches(toSave);
	}

	/** Save geographies in batches of {@link #BATCH_SIZE}. */
	private void saveGeographyBatches(List<Geography> geographies) {
		Lists.partition(geographies, BATCH_SIZE).forEach(batch -> {
			log.warn("Saving geography batch");
			geographyRepository.saveAll(batch);
		});
	}

	/**
	 * Create the {@link GeographyRegionMap} links listed in the source data
	 * that do not exist locally yet. Each region/geography pair is created at
	 * most once, even when the source lists it repeatedly. Existing links are
	 * never removed.
	 */
	private void updateGeographyRegionMaps(List<GeographyRegionMapRow> grinGeoRegionMaps, Map<Long, Region> regTheirsToOurs, Map<Long, Geography> geoTheirsToOurs) {
		var allGeoRegionMaps = geoRegionRepository.findAll();
		log.warn("Loaded {} geo-region-maps", allGeoRegionMaps.size());

		// Region id -> geographies already linked to it
		LookupList<Long, Geography> regionGeographies = new LookupList<>();
		allGeoRegionMaps.forEach(geoRegionMap -> regionGeographies.add(geoRegionMap.getRegion().getId(), geoRegionMap.getGeography()));

		List<GeographyRegionMap> toSaveMap = new ArrayList<>();
		for (GeographyRegionMapRow geoRegionMapRow : grinGeoRegionMaps) {
			Geography geography = geoTheirsToOurs.get(geoRegionMapRow.getGeographyId());
			Region region = regTheirsToOurs.get(geoRegionMapRow.getRegionId());
			if (geography == null || region == null) {
				continue;
			}
			if (regionGeographies.getOrDefault(region.getId(), List.of()).contains(geography)) {
				continue;
			}

			var grm = new GeographyRegionMap();
			grm.setGeography(geography);
			grm.setRegion(region);
			toSaveMap.add(grm);
			// Remember the pending link so repeated source rows do not create duplicates
			regionGeographies.add(region.getId(), geography);
		}

		log.warn("Saving {} geo-region-maps", toSaveMap.size());
		Lists.partition(toSaveMap, BATCH_SIZE).forEach(batch -> {
			log.warn("Saving georegionmap batch");
			geoRegionRepository.saveAll(batch);
		});
	}
}