Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion runtime/datax/apireader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>httpclient5</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -78,4 +83,4 @@
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import java.net.URI;
import java.util.Collections;
import java.util.List;

Expand All @@ -50,9 +51,7 @@ public void prepare() {
String api = this.jobConfig.getString("api");
Object schema = this.jobConfig.get("schema");

if (StringUtils.isBlank(api)) {
throw new RuntimeException("api is required for APIReader");
}
HttpEndpointSecurity.validateExternalHttpUri(api, "api");
if (schema == null) {
throw new RuntimeException("schema configuration is required for APIReader");
}
Expand Down Expand Up @@ -211,13 +210,17 @@ private JSONArray parseResponse(String responseBody) {
* 简单的HTTP请求实现
*/
private String doHttpRequest(String urlStr, String method, String body, Configuration headers) throws Exception {
RequestConfig requestConfig = RequestConfig.custom().build();
URI safeUri = HttpEndpointSecurity.validateExternalHttpUri(urlStr, "api");
RequestConfig requestConfig = RequestConfig.custom()
.setRedirectsEnabled(false)
.build();

try (CloseableHttpClient client = HttpClients.custom()
.disableRedirectHandling()
.setDefaultRequestConfig(requestConfig)
.build()) {

HttpUriRequestBase request = buildRequest(urlStr, method, body);
HttpUriRequestBase request = buildRequest(safeUri, method, body);

if (headers != null && !headers.getKeys().isEmpty()) {
for (String key : headers.getKeys()) {
Expand All @@ -238,21 +241,21 @@ private String doHttpRequest(String urlStr, String method, String body, Configur
}
}

private HttpUriRequestBase buildRequest(String urlStr, String method, String body) {
private HttpUriRequestBase buildRequest(URI uri, String method, String body) {
String verb = StringUtils.defaultIfBlank(method, "GET").toUpperCase();
HttpUriRequestBase request;
switch (verb) {
case "POST":
request = new HttpPost(urlStr);
request = new HttpPost(uri);
break;
case "PUT":
request = new HttpPut(urlStr);
request = new HttpPut(uri);
break;
case "DELETE":
request = new HttpDelete(urlStr);
request = new HttpDelete(uri);
break;
default:
request = new HttpGet(urlStr);
request = new HttpGet(uri);
break;
}

Expand All @@ -269,4 +272,4 @@ public void destroy() {

}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package com.datamate.plugin.reader.apireader;

import java.net.IDN;
import java.net.InetAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

final class HttpEndpointSecurity {
private static final String ALLOWED_HOSTS_PROPERTY = "datamate.ssrf.allowedHosts";
private static final String ALLOWED_HOSTS_ENV = "DATAMATE_SSRF_ALLOWED_HOSTS";

private HttpEndpointSecurity() {
}

static URI validateExternalHttpUri(String rawUri, String fieldName) {
if (isBlank(rawUri)) {
throw new IllegalArgumentException(fieldName + " is required");
}
if (rawUri.indexOf('\r') >= 0 || rawUri.indexOf('\n') >= 0) {
throw new IllegalArgumentException(fieldName + " contains invalid control characters");
}

URI uri = URI.create(rawUri.trim()).normalize();
String scheme = uri.getScheme() == null ? null : uri.getScheme().toLowerCase(Locale.ROOT);
if (!"http".equals(scheme) && !"https".equals(scheme)) {
throw new IllegalArgumentException(fieldName + " only supports http or https");
}
if (!isBlank(uri.getUserInfo())) {
throw new IllegalArgumentException(fieldName + " must not contain user info");
}
if (uri.getFragment() != null) {
throw new IllegalArgumentException(fieldName + " must not contain fragment");
}
int port = uri.getPort();
if (port == 0 || port < -1 || port > 65535) {
throw new IllegalArgumentException(fieldName + " contains invalid port");
}

String asciiHost = toAsciiHost(uri.getHost(), fieldName);
if (isAllowedHost(asciiHost)) {
return uri;
}

InetAddress[] addresses;
try {
addresses = InetAddress.getAllByName(asciiHost);
} catch (Exception e) {
throw new IllegalArgumentException(fieldName + " host cannot be resolved", e);
}
if (addresses.length == 0) {
throw new IllegalArgumentException(fieldName + " host cannot be resolved");
}
for (InetAddress address : addresses) {
if (!isPublicRoutableAddress(address)) {
throw new IllegalArgumentException(fieldName + " points to a restricted network address");
}
}
return uri;
}

private static String toAsciiHost(String host, String fieldName) {
if (isBlank(host)) {
throw new IllegalArgumentException(fieldName + " must contain a valid host");
}
try {
String normalized = host.endsWith(".") ? host.substring(0, host.length() - 1) : host;
return IDN.toASCII(normalized, IDN.USE_STD3_ASCII_RULES).toLowerCase(Locale.ROOT);
} catch (Exception e) {
throw new IllegalArgumentException(fieldName + " contains invalid host", e);
}
}

private static boolean isAllowedHost(String host) {
return configuredAllowedHosts().contains(host);
}

private static Set<String> configuredAllowedHosts() {
Set<String> hosts = new HashSet<String>();
addConfiguredHosts(hosts, System.getProperty(ALLOWED_HOSTS_PROPERTY));
addConfiguredHosts(hosts, System.getenv(ALLOWED_HOSTS_ENV));
return hosts;
}

private static void addConfiguredHosts(Set<String> hosts, String rawHosts) {
if (isBlank(rawHosts)) {
return;
}
String[] parts = rawHosts.split(",");
for (String part : parts) {
if (!isBlank(part)) {
hosts.add(toAsciiHost(part.trim(), "allowed host"));
}
}
}

private static boolean isPublicRoutableAddress(InetAddress address) {
if (address.isAnyLocalAddress() || address.isLoopbackAddress() || address.isLinkLocalAddress()
|| address.isSiteLocalAddress() || address.isMulticastAddress()) {
return false;
}
byte[] bytes = address.getAddress();
if (bytes.length == 4) {
return isPublicRoutableIpv4(bytes);
}
if (bytes.length == 16) {
return isPublicRoutableIpv6(bytes);
}
return false;
}

private static boolean isPublicRoutableIpv4(byte[] bytes) {
int first = bytes[0] & 0xff;
int second = bytes[1] & 0xff;
int third = bytes[2] & 0xff;
if (first == 0 || first == 10 || first == 127 || first >= 224) {
return false;
}
if (first == 100 && second >= 64 && second <= 127) {
return false;
}
if (first == 169 && second == 254) {
return false;
}
if (first == 172 && second >= 16 && second <= 31) {
return false;
}
if (first == 192 && second == 168) {
return false;
}
if (first == 192 && second == 0 && (third == 0 || third == 2)) {
return false;
}
if (first == 198 && (second == 18 || second == 19 || (second == 51 && third == 100))) {
return false;
}
if (first == 203 && second == 0 && third == 113) {
return false;
}
return true;
}

private static boolean isPublicRoutableIpv6(byte[] bytes) {
int first = bytes[0] & 0xff;
int second = bytes[1] & 0xff;
if ((first & 0xfe) == 0xfc) {
return false;
}
if (first == 0xfe && (second & 0xc0) == 0x80) {
return false;
}
if (first == 0x20 && second == 0x01 && (bytes[2] & 0xff) == 0x0d && (bytes[3] & 0xff) == 0xb8) {
return false;
}
return !isIpv4MappedPrivateAddress(bytes);
}

private static boolean isIpv4MappedPrivateAddress(byte[] bytes) {
for (int i = 0; i < 10; i++) {
if (bytes[i] != 0) {
return false;
}
}
if ((bytes[10] & 0xff) != 0xff || (bytes[11] & 0xff) != 0xff) {
return false;
}
byte[] ipv4 = new byte[] {bytes[12], bytes[13], bytes[14], bytes[15]};
return !isPublicRoutableIpv4(ipv4);
}

private static boolean isBlank(String value) {
return value == null || value.trim().isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "apireader",
"parameter": {
"api": "http://127.0.0.1:9000",
"api": "https://api.example.com/data",
"method": "GET",
"body": "{\"name\": \"value\"}",
"headers": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.datamate.plugin.reader.apireader;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class HttpEndpointSecurityTest {
@Test(expected = IllegalArgumentException.class)
public void shouldRejectLoopbackHost() {
HttpEndpointSecurity.validateExternalHttpUri("http://127.0.0.1:8080/internal", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectLocalhost() {
HttpEndpointSecurity.validateExternalHttpUri("http://localhost:8080/internal", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectIpv6LoopbackHost() {
HttpEndpointSecurity.validateExternalHttpUri("http://[::1]:8080/internal", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectIpv4MappedIpv6LoopbackHost() {
HttpEndpointSecurity.validateExternalHttpUri("http://[::ffff:127.0.0.1]:8080/internal", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectIntegerEncodedLoopbackHost() {
HttpEndpointSecurity.validateExternalHttpUri("http://2130706433/internal", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectHexEncodedLoopbackHost() {
HttpEndpointSecurity.validateExternalHttpUri("http://0x7f000001/internal", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectOctalEncodedLoopbackHost() {
HttpEndpointSecurity.validateExternalHttpUri("http://017700000001/internal", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectMetadataServiceAddress() {
HttpEndpointSecurity.validateExternalHttpUri("http://169.254.169.254/latest/meta-data", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectPrivateNetworkAddress() {
HttpEndpointSecurity.validateExternalHttpUri("http://10.0.0.1/admin", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectUnsupportedScheme() {
HttpEndpointSecurity.validateExternalHttpUri("file:///etc/passwd", "api");
}

@Test(expected = IllegalArgumentException.class)
public void shouldRejectUserInfo() {
HttpEndpointSecurity.validateExternalHttpUri("http://user@8.8.8.8/path", "api");
}

@Test
public void shouldAllowPublicHttpAddress() {
assertEquals("8.8.8.8", HttpEndpointSecurity.validateExternalHttpUri("https://8.8.8.8/data", "api").getHost());
}

@Test
public void shouldAllowExplicitlyConfiguredHost() {
String previous = System.getProperty("datamate.ssrf.allowedHosts");
try {
System.setProperty("datamate.ssrf.allowedHosts", "localhost");
assertEquals("localhost",
HttpEndpointSecurity.validateExternalHttpUri("http://localhost:9000/data", "api").getHost());
} finally {
if (previous == null) {
System.clearProperty("datamate.ssrf.allowedHosts");
} else {
System.setProperty("datamate.ssrf.allowedHosts", previous);
}
}
}
}
5 changes: 5 additions & 0 deletions runtime/datax/s3reader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading
Loading