Commit f2196adb authored by Daniel Eggert
Browse files

moved CloudCoverUpdate from gms-testing to gms-metadatacrawler

parent b1734c90
package de.potsdam.gfz.gms.metadatacrawler;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import de.potsdam.gfz.gms.database.SceneDatabase;
/**
 * Command-line tool that fills in missing cloud cover values in the {@code scenes} table.
 *
 * <p>For every configured dataset id, all scenes with {@code cloudcover<0} are selected, their
 * metadata document is downloaded from the stored {@code metadataurl}, the cloud cover value is
 * extracted with an XPath expression and written back to the database by a single updater task.
 * Scenes whose metadata could not be retrieved after one retry are appended to {@code failed.txt}
 * (tab separated: id, url, HTTP response message).
 */
public class CloudCoverUpdate {

    /** Scene id paired with the cloud cover value parsed from its metadata document. */
    private static class Scene {
        final long id;
        final double cloudcover;

        Scene(long id, double cloudcover) {
            this.id = id;
            this.cloudcover = cloudcover;
        }
    }

    /** Sentinel put on the queue to tell the updater task to stop; compared by identity. */
    private static final Scene FINAL_SCENE = new Scene(-1, -1);

    /** Size of the worker pool; one of these threads is occupied by the database updater. */
    private static final int NUM_THREADS = 20;

    /** Timeout for establishing the HTTP connection, in milliseconds. */
    private static final int CONNECT_TIMEOUT_MS = 5000;

    /** Timeout for reading from the HTTP connection, so a stalled server cannot block a worker forever. */
    private static final int READ_TIMEOUT_MS = 60000;

    /** Lower bound and jitter of the short pause taken before every request, in milliseconds. */
    private static final int REQUEST_DELAY_MS = 50;
    private static final int REQUEST_DELAY_JITTER_MS = 150;

    /** Lower bound and jitter of the back-off taken before the single retry, in milliseconds. */
    private static final long BACKOFF_MS = 15000L;
    private static final int BACKOFF_JITTER_MS = 10000;

    /** Selects the text of the child element(s) of any element whose {@code name} attribute contains "Cloud Cover". */
    private static final String CLOUD_COVER_XPATH = "//*[contains(@name,'Cloud Cover')]/*/text()";

    /** Shared source of jitter for the request delays ({@link Random} is safe for concurrent use). */
    private static final Random RANDOM = new Random();

    /**
     * Runs the update for the hard-coded list of dataset ids.
     *
     * @param args ignored
     * @throws Exception if the database query fails, a download task dies with an error, or the
     *                   failure log cannot be opened
     */
    public static void main(String[] args) throws Exception {
        // landsat_tm(108), landsat_mss(107), landsat_8(104), landsat_7(112)
        final short[] datasetids = new short[] { 104, 112, 189 };

        final SceneDatabase db = SceneDatabase.getInstance();
        final BlockingQueue<Scene> sceneQueue = new LinkedBlockingQueue<>();

        try (BufferedWriter writer = new BufferedWriter(new FileWriter("failed.txt"))) {
            final ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
            pool.execute(() -> runUpdater(db, sceneQueue));

            try (PreparedStatement pst = db
                    .prepareCustomStatement("SELECT id, metadataurl FROM scenes WHERE datasetid=? AND cloudcover<0;")) {
                for (short datasetid : datasetids) {
                    System.out.println("processing dataset " + datasetid);
                    final List<Future<?>> fList = new ArrayList<>();

                    pst.setShort(1, datasetid);
                    try (ResultSet rs = pst.executeQuery()) {
                        while (rs.next()) {
                            try {
                                final long id = rs.getLong(1);
                                final String urlStr = rs.getString(2);
                                fList.add(pool.submit(() -> fetchAndQueue(id, urlStr, writer, sceneQueue)));
                            } catch (SQLException e) {
                                e.printStackTrace();
                                System.exit(1);
                            }
                        }
                    }

                    // wait for all downloads of this dataset before moving on to the next one
                    for (Future<?> f : fList) {
                        f.get();
                    }
                }
            } finally {
                // Always release the updater and the pool, otherwise a failure above would leave
                // the non-daemon updater thread blocked on the queue and the JVM would never exit.
                // On the regular path all downloads have completed here, so the sentinel is the
                // last element the updater sees.
                sceneQueue.add(FINAL_SCENE);
                pool.shutdown();
                pool.awaitTermination(1, TimeUnit.HOURS);
            }
        }
    }

    /**
     * Drains the queue and writes every scene's cloud cover to the database, committing after each
     * update, until {@link #FINAL_SCENE} is received or the thread is interrupted.
     */
    private static void runUpdater(SceneDatabase db, BlockingQueue<Scene> sceneQueue) {
        int count = 0;
        try (PreparedStatement pst = db.prepareCustomStatement("UPDATE scenes SET cloudcover=? WHERE id=?;")) {
            while (true) {
                final Scene s;
                try {
                    s = sceneQueue.take();
                } catch (InterruptedException e) {
                    // preserve the interrupt status and stop instead of spinning on take()
                    Thread.currentThread().interrupt();
                    return;
                }
                if (s == FINAL_SCENE) {
                    return;
                }
                try {
                    pst.setDouble(1, s.cloudcover);
                    pst.setLong(2, s.id);
                    pst.executeUpdate();
                    db.commit();
                    if (++count % 100 == 0) {
                        System.out.println(count + " scenes updated");
                    }
                } catch (Exception e) {
                    // a single failed update must not stop the remaining ones
                    e.printStackTrace();
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Downloads the metadata document of one scene, extracts its cloud cover and queues the result
     * for the updater. The request is retried once after a back-off; if the retry fails with a
     * non-200 response or the response body cannot be opened, the scene is logged to the failure file.
     * All other errors are printed and the scene is skipped.
     */
    private static void fetchAndQueue(long id, String urlStr, BufferedWriter failedWriter,
            BlockingQueue<Scene> sceneQueue) {
        HttpURLConnection con = null;
        InputStream stream = null;
        try {
            // short random pause to spread the requests
            Thread.sleep(REQUEST_DELAY_MS + RANDOM.nextInt(REQUEST_DELAY_JITTER_MS));
            final URL url = new URL(urlStr);

            // first attempt: the body is read from the same connection that delivered the status
            // code, so every document is requested only once
            try {
                con = openConnection(url);
                if (con.getResponseCode() == HttpURLConnection.HTTP_OK) {
                    stream = con.getInputStream();
                }
            } catch (IOException e) {
                System.err.println("Request for scene " + id + " failed, retrying: " + e);
            }

            if (stream == null) {
                // connection or stream error - release the failed connection, back off and retry once
                if (con != null) {
                    con.disconnect();
                }
                Thread.sleep(BACKOFF_MS + RANDOM.nextInt(BACKOFF_JITTER_MS));
                con = openConnection(url);
                final int code = con.getResponseCode();
                final String message = con.getResponseMessage();
                if (code != HttpURLConnection.HTTP_OK) {
                    // connection error - again - skip scene
                    System.err.println("Connection error: " + code + " - " + message);
                    recordFailure(failedWriter, id, url, message);
                    return;
                }
                try {
                    stream = con.getInputStream();
                } catch (IOException ex) {
                    System.err.println("Stream error: " + code + " - " + message + "\t" + ex.getMessage());
                    recordFailure(failedWriter, id, url, message);
                    return;
                }
            }

            // NOTE(review): the parser runs with default settings (DTDs / external entities are not
            // disabled); consider hardening if the metadata URLs are not fully trusted.
            final DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            final XPath xpath = XPathFactory.newInstance().newXPath();
            final XPathExpression expr = xpath.compile(CLOUD_COVER_XPATH);
            final Document doc = builder.parse(stream);

            // The XPath number conversion yields NaN when nothing matches; negative values are
            // normalised to NaN as well.
            double cc = (Double) expr.evaluate(doc, XPathConstants.NUMBER);
            if (cc < 0) {
                cc = Double.NaN;
            }
            sceneQueue.add(new Scene(id, cc));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // runs on every exit path, including the early returns above
            if (stream != null) {
                try {
                    stream.close();
                } catch (IOException ignored) {
                    // best effort - nothing sensible left to do with the stream
                }
            }
            if (con != null) {
                con.disconnect();
            }
        }
    }

    /** Opens (but does not yet connect) an HTTP connection with connect and read timeouts applied. */
    private static HttpURLConnection openConnection(URL url) throws IOException {
        final HttpURLConnection con = (HttpURLConnection) url.openConnection();
        con.setConnectTimeout(CONNECT_TIMEOUT_MS);
        con.setReadTimeout(READ_TIMEOUT_MS);
        return con;
    }

    /** Appends one tab separated line (id, url, response message) to the failure log and flushes it. */
    private static void recordFailure(BufferedWriter failedWriter, long id, URL url, String message)
            throws IOException {
        failedWriter.write(id + "\t" + url + "\t" + message + "\n");
        failedWriter.flush();
    }
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment