Skip to content

Commit e988a2f

Browse files
committed
Made it possible to redo a clone operation
1 parent 6318f3c commit e988a2f

12 files changed

Lines changed: 300 additions & 20 deletions

File tree

core/src/main/java/dev/mhh/cloner/api/CloneError.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ public enum CloneError {
2222
CREATE_TEMP_TABLE_FAILED("Failed to create a temporary table for data transfer"),
2323
COPY_IN_FAILED("Failed to copy data into the temporary table"),
2424
MOVE_DATA_FAILED("Failed to move data from temporary table to the target table"),
25+
DELETE_STALE_DATA_FAILED("Failed to delete stale data from the target table"),
26+
GET_CONFLICT_COLUMNS_FAILED("Failed to get columns for the on conflict clause"),
27+
GET_CONFLICT_COLUMNS_RETURNED_NOTHING("The on conflict clause returned nothing"),
28+
CONFIG_FINGERPRINT_WAS_NOT_FIRST_ENTRY("The configuration fingerprint was not the first entry in the ZIP"),
29+
CONFIG_FINGERPRINT_READ_FAILED("Failed to read the configuration fingerprint from the ZIP"),
30+
CONFIG_FINGERPRINT_MISMATCH("The configuration fingerprint in the ZIP does not match the provided configuration"),
2531
;
2632

2733
private final String message;
@@ -34,6 +40,11 @@ public String message(final Object... args) {
3440
return String.format(message, args);
3541
}
3642

43+
44+
public CloneException toException() {
45+
return new CloneException(this);
46+
}
47+
3748
public Supplier<CloneException> toException(final Object... args) {
3849
return () -> new CloneException(this, args);
3950
}

core/src/main/java/dev/mhh/cloner/api/CloneException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public CloneError error() {
2222

2323
@FunctionalInterface
2424
public interface ThrowableSupplier<T> {
25-
T get() throws SQLException, IOException;
25+
T get() throws SQLException, IOException, CloneException;
2626
}
2727

2828
public static <T> T wrapKnownExceptions(

core/src/main/java/dev/mhh/cloner/api/Cloner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ void exportClone(
1515
) throws CloneException;
1616
void importClone(
1717
InputStream inputStream,
18-
Connection connection
18+
Connection connection,
19+
CloneConfiguration<T> configuration
1920
) throws CloneException;
2021
}

core/src/main/java/dev/mhh/cloner/impl/ClonerImpl.java

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
import dev.mhh.cloner.api.Cloner;
77
import dev.mhh.cloner.api.verification.ConfigurationVerifier;
88
import dev.mhh.cloner.impl.select.CloneConfigurationTransformer;
9+
import dev.mhh.cloner.impl.select.DeletePreviousTransformer;
910
import org.postgresql.PGConnection;
1011
import org.postgresql.copy.CopyManager;
1112

1213
import java.io.InputStream;
1314
import java.io.OutputStream;
1415
import java.io.Serializable;
16+
import java.nio.charset.StandardCharsets;
1517
import java.sql.Connection;
1618
import java.sql.SQLException;
1719
import java.util.List;
@@ -20,6 +22,8 @@
2022
import java.util.zip.ZipOutputStream;
2123

2224
public class ClonerImpl<T extends Serializable> implements Cloner<T> {
25+
private static final String CONFIG_ENTRY_NAME = "__config__.txt";
26+
2327
@Override
2428
public void exportClone(
2529
final OutputStream outputStream,
@@ -35,8 +39,21 @@ public void exportClone(
3539

3640
final var zos = new ZipOutputStream(outputStream);
3741

38-
for (final var select : selects) {
42+
final var configEntry = new ZipEntry(CONFIG_ENTRY_NAME);
43+
CloneException.wrapKnownExceptions(
44+
() -> zos.putNextEntry(configEntry),
45+
CloneError.ZIP_ENTRY_CREATION_FAILED
46+
);
47+
CloneException.wrapKnownExceptions(
48+
() -> zos.write(configuration.toString().getBytes(StandardCharsets.UTF_8)),
49+
CloneError.ZIP_ENTRY_CREATION_FAILED
50+
);
51+
CloneException.wrapKnownExceptions(
52+
zos::closeEntry,
53+
CloneError.ZIP_ENTRY_CLOSE_FAILED
54+
);
3955

56+
for (final var select : selects) {
4057
final var zipEntry = new ZipEntry(select.tableName() + ".csv");
4158
CloneException.wrapKnownExceptions(
4259
() -> zos.putNextEntry(zipEntry),
@@ -73,12 +90,28 @@ private static CopyManager getCopyManager(final Connection connection) throws Cl
7390
}
7491

7592
@Override
76-
public void importClone(final InputStream inputStream, final Connection connection) throws CloneException {
93+
public void importClone(
94+
final InputStream inputStream,
95+
final Connection connection,
96+
final CloneConfiguration<T> configuration
97+
) throws CloneException {
7798
final var copyManager = getCopyManager(connection);
7899

100+
final var deleteScopeByTable = DeletePreviousTransformer.transform(configuration);
101+
79102
final var zis = new ZipInputStream(inputStream);
80103

81-
ZipEntry entry;
104+
var entry = CloneException.wrapKnownExceptions(
105+
zis::getNextEntry,
106+
CloneError.ZIP_ENTRY_NEXT_FAILED
107+
);
108+
109+
if (entry.getName().equals(CONFIG_ENTRY_NAME)) {
110+
verifyConfigFingerprint(zis, configuration);
111+
CloneException.wrapKnownExceptions(zis::closeEntry, CloneError.ZIP_ENTRY_CLOSE_FAILED);
112+
} else {
113+
throw CloneError.CONFIG_FINGERPRINT_WAS_NOT_FIRST_ENTRY.toException();
114+
}
82115

83116
while ((entry = CloneException.wrapKnownExceptions(
84117
zis::getNextEntry,
@@ -88,7 +121,7 @@ public void importClone(final InputStream inputStream, final Connection connecti
88121

89122
final var tableName = fileName.substring(0, fileName.length() - 4);
90123

91-
final var tempTableSql = "create temp table temp_" + tableName + " (like " + tableName + " including defaults)";
124+
final var tempTableSql = "create temp table temp_" + tableName + " (like " + tableName + " including defaults) on commit drop";
92125
CloneException.wrapKnownExceptions(
93126
() -> execute(connection, tempTableSql),
94127
CloneError.CREATE_TEMP_TABLE_FAILED
@@ -100,7 +133,20 @@ public void importClone(final InputStream inputStream, final Connection connecti
100133
CloneError.COPY_IN_FAILED
101134
);
102135

103-
final var moveSql = "insert into " + tableName + " select * from temp_" + tableName;
136+
final var deleteScope = deleteScopeByTable.get(tableName);
137+
if (deleteScope != null) {
138+
final var idColumn = configuration.idColumnName();
139+
final var deleteSql = "delete from " + tableName
140+
+ " where " + idColumn + " not in (select " + idColumn + " from temp_" + tableName + ")"
141+
+ " and " + deleteScope;
142+
CloneException.wrapKnownExceptions(
143+
() -> execute(connection, deleteSql),
144+
CloneError.DELETE_STALE_DATA_FAILED
145+
);
146+
}
147+
148+
final var conflictClause = buildConflictClause(connection, tableName, configuration);
149+
final var moveSql = "insert into " + tableName + " select * from temp_" + tableName + " " + conflictClause;
104150
CloneException.wrapKnownExceptions(
105151
() -> execute(connection, moveSql),
106152
CloneError.MOVE_DATA_FAILED
@@ -113,6 +159,41 @@ public void importClone(final InputStream inputStream, final Connection connecti
113159
}
114160
}
115161

162+
private String buildConflictClause(final Connection connection, final String tableName, final CloneConfiguration<T> configuration) throws CloneException {
163+
final var idColumn = configuration.idColumnName();
164+
final var schema = configuration.schema();
165+
final var sql = "select 'on conflict (" + idColumn + ") do update set ' || "
166+
+ "string_agg(column_name || ' = excluded.' || column_name, ', ') "
167+
+ "from information_schema.columns "
168+
+ "where table_schema = '" + schema + "' "
169+
+ "and table_name = '" + tableName + "' "
170+
+ "and column_name != '" + idColumn + "'";
171+
172+
return CloneException.wrapKnownExceptions(() -> {
173+
try (final var stmt = connection.createStatement();
174+
final var rs = stmt.executeQuery(sql)) {
175+
if (rs.next()) {
176+
final var result = rs.getString(1);
177+
if (result != null) {
178+
return result;
179+
}
180+
}
181+
throw CloneError.GET_CONFLICT_COLUMNS_RETURNED_NOTHING.toException();
182+
}
183+
}, CloneError.GET_CONFLICT_COLUMNS_FAILED);
184+
}
185+
186+
private void verifyConfigFingerprint(final ZipInputStream zis, final CloneConfiguration<T> configuration) throws CloneException {
187+
final var storedFingerprint = CloneException.wrapKnownExceptions(
188+
() -> new String(zis.readAllBytes(), StandardCharsets.UTF_8),
189+
CloneError.CONFIG_FINGERPRINT_READ_FAILED
190+
);
191+
192+
if (!configuration.toString().equals(storedFingerprint)) {
193+
throw new CloneException(CloneError.CONFIG_FINGERPRINT_MISMATCH);
194+
}
195+
}
196+
116197
private void execute(final Connection connection, final String sql) throws SQLException {
117198
try (final var stmt = connection.createStatement()) {
118199
stmt.execute(sql);
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package dev.mhh.cloner.impl.select;
2+
3+
import dev.mhh.cloner.api.CloneConfiguration;
4+
import dev.mhh.cloner.api.table.JoinByJoinTableFk;
5+
import dev.mhh.cloner.api.table.JoinByMatchingColumns;
6+
import dev.mhh.cloner.api.table.JoinConfig;
7+
import dev.mhh.cloner.api.table.TableConfig;
8+
9+
import java.io.Serializable;
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
13+
/**
14+
* Produces a map of table name to delete scope condition fragment.
15+
* The condition is used to scope the DELETE statement during recloning:
16+
* DELETE FROM table WHERE id NOT IN (SELECT id FROM temp_table) AND <condition>
17+
*
18+
* Tables joined via joinByMainTableFk are excluded (null) because the FK lives in
19+
* the parent table, which hasn't been processed yet when the child is imported.
20+
*/
21+
public final class DeletePreviousTransformer<T extends Serializable> {
22+
private final CloneConfiguration<T> configuration;
23+
24+
private DeletePreviousTransformer(final CloneConfiguration<T> configuration) {
25+
this.configuration = configuration;
26+
}
27+
28+
public static <T extends Serializable> Map<String, String> transform(final CloneConfiguration<T> configuration) {
29+
return new DeletePreviousTransformer<>(configuration).transform();
30+
}
31+
32+
private Map<String, String> transform() {
33+
final var result = new HashMap<String, String>();
34+
transformRoot(configuration.rootTable(), result);
35+
return result;
36+
}
37+
38+
private void transformRoot(final TableConfig<?> table, final Map<String, String> result) {
39+
table.joins().forEach(join -> transformJoin(join, result));
40+
}
41+
42+
private void transformJoin(final JoinConfig join, final Map<String, String> result) {
43+
final var newIn = switch (join) {
44+
case final JoinByJoinTableFk j -> j.foreignKey() + " in (select " + j.foreignKey() + " from temp_" + j.tableName() + ")";
45+
case final JoinByMatchingColumns j -> j.joinedTableColumn() + " in (select " + j.joinedTableColumn() + " from temp_" + j.tableName() + ")";
46+
default -> null;
47+
};
48+
49+
if (newIn != null) {
50+
final var previous = result.get(join.tableName());
51+
final var condition = previous == null
52+
? newIn
53+
: ("(" + previous + " or " + newIn + ")");
54+
result.put(join.tableName(), condition);
55+
}
56+
57+
join.joins().forEach(j -> transformJoin(j, result));
58+
}
59+
}

core/src/test/java/dev/mhh/cloner/OverwriteDataTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ void runOverwriteTestCase(final Connection exportConn, final Connection importCo
5353
}
5454

5555
try (final var in = new ByteArrayInputStream(zipBytes)) {
56-
cloner.importClone(in, importConn);
56+
cloner.importClone(in, importConn, CLONE_CONFIGURATION);
5757
}
5858

5959
// Assert — stale body values should be gone, replaced by 'relevant'
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package dev.mhh.cloner;
2+
3+
import dev.mhh.cloner.api.CloneException;
4+
import dev.mhh.cloner.api.Cloner;
5+
import dev.mhh.cloner.impl.ClonerImpl;
6+
import org.junit.jupiter.api.Test;
7+
8+
import java.io.ByteArrayInputStream;
9+
import java.io.ByteArrayOutputStream;
10+
import java.io.IOException;
11+
import java.sql.Connection;
12+
import java.sql.SQLException;
13+
import java.util.List;
14+
15+
import static dev.mhh.cloner.TestDatabase.*;
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.junit.jupiter.api.Assertions.assertTrue;
18+
19+
class RecloneTest {
20+
Cloner<Long> cloner = new ClonerImpl<>();
21+
22+
@Test
23+
void recloningSameDataTwiceSucceeds() throws SQLException, IOException, CloneException {
24+
runTestWithScripts(
25+
List.of("V1__schema.sql", "V2__data.sql"),
26+
List.of("V1__schema.sql"),
27+
this::runRecloneTestCase
28+
);
29+
}
30+
31+
@Test
32+
void recloningSameDataTwiceSucceedsWithRLS() throws SQLException, IOException, CloneException {
33+
runTestWithScripts(
34+
List.of("V1__schema.sql", "V2__data.sql", "V3__rls.sql"),
35+
List.of("V1__schema.sql", "V3__rls.sql"),
36+
this::runRecloneTestCase
37+
);
38+
}
39+
40+
@Test
41+
void recloneDeletesStaleScopedRows() throws SQLException, IOException, CloneException {
42+
runTestWithScripts(
43+
List.of("V1__schema.sql", "V2__data.sql"),
44+
List.of("V1__schema.sql"),
45+
this::runDeleteStaleScopedRowsTestCase
46+
);
47+
}
48+
49+
void runRecloneTestCase(final Connection exportConn, final Connection importConn) throws SQLException, IOException, CloneException {
50+
final byte[] zipBytes;
51+
try (final var out = new ByteArrayOutputStream()) {
52+
cloner.exportClone(out, exportConn, CLONE_CONFIGURATION, List.of(1L));
53+
zipBytes = out.toByteArray();
54+
}
55+
56+
// First import
57+
try (final var in = new ByteArrayInputStream(zipBytes)) {
58+
cloner.importClone(in, importConn, CLONE_CONFIGURATION);
59+
importConn.commit();
60+
}
61+
62+
assertCount(importConn, MAIN_TABLE, RELEVANT, 1);
63+
assertCount(importConn, SUB_TABLE, RELEVANT, 2);
64+
65+
// Second import — must not fail with conflict errors
66+
try (final var in = new ByteArrayInputStream(zipBytes)) {
67+
cloner.importClone(in, importConn, CLONE_CONFIGURATION);
68+
}
69+
70+
// Data should be identical after reclone — no duplicates
71+
assertCount(importConn, MAIN_TABLE, RELEVANT, 1);
72+
assertCount(importConn, SUB_TABLE, RELEVANT, 2);
73+
assertCount(importConn, SUB_EQUALITY_TABLE, RELEVANT, 2);
74+
assertCount(importConn, SUB_SUB_TABLE, RELEVANT, 6);
75+
assertCount(importConn, SUB_SUB_SUB_TABLE, RELEVANT, 2);
76+
assertCount(importConn, SUB_SUB_POINTER_TABLE, RELEVANT, 3);
77+
}
78+
79+
void runDeleteStaleScopedRowsTestCase(final Connection exportConn, final Connection importConn) throws SQLException, IOException, CloneException {
80+
final byte[] zipBytes;
81+
try (final var out = new ByteArrayOutputStream()) {
82+
cloner.exportClone(out, exportConn, CLONE_CONFIGURATION, List.of(1L));
83+
zipBytes = out.toByteArray();
84+
}
85+
86+
// First import
87+
try (final var in = new ByteArrayInputStream(zipBytes)) {
88+
cloner.importClone(in, importConn, CLONE_CONFIGURATION);
89+
importConn.commit();
90+
}
91+
92+
assertCount(importConn, SUB_TABLE, RELEVANT, 2);
93+
94+
// Insert a stale row that is within the clone scope (main_id=1 is cloned)
95+
// but whose id won't appear in any future export — simulates a row deleted from source
96+
try (final var stmt = importConn.createStatement()) {
97+
stmt.execute("insert into " + SUB_TABLE + "(id, main_id, common_value, body) values (99, 1, 'deleted_value', 'stale_scoped')");
98+
}
99+
100+
assertCount(importConn, SUB_TABLE, "stale_scoped", 1);
101+
102+
// Reclone — the stale_scoped row (id=99, main_id=1) should be deleted
103+
// because main_id=1 is in scope but id=99 is not in the export
104+
try (final var in = new ByteArrayInputStream(zipBytes)) {
105+
cloner.importClone(in, importConn, CLONE_CONFIGURATION);
106+
}
107+
108+
assertCount(importConn, SUB_TABLE, "stale_scoped", 0);
109+
assertCount(importConn, SUB_TABLE, RELEVANT, 2);
110+
}
111+
112+
void assertCount(final Connection conn, final String table, final String body, final int expectedCount) throws SQLException {
113+
assertEquals(
114+
expectedCount,
115+
count(conn, "select count(*) from " + table + " where body = '" + body + "'"),
116+
"Counts of row in table '" + table + "' with body '" + body + "' do not match"
117+
);
118+
}
119+
120+
@SuppressWarnings("SqlSourceToSinkFlow")
121+
int count(final Connection conn, final String sql) throws SQLException {
122+
try (final var stmt = conn.prepareStatement(sql)) {
123+
final var rs = stmt.executeQuery();
124+
assertTrue(rs.next(), "Query returned no rows: " + sql);
125+
return rs.getInt(1);
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)