|
25 | 25 | import java.util.List; |
26 | 26 |
|
27 | 27 | import redis.clients.jedis.Jedis; |
| 28 | +import redis.clients.jedis.StreamEntry; |
28 | 29 | import redis.clients.jedis.StreamEntryID; |
29 | 30 |
|
30 | 31 | import static java.util.Objects.requireNonNull; |
@@ -52,11 +53,12 @@ public class KvrocksDataProcess { |
52 | 53 | this.fields = fieldInfo.getFields(); |
53 | 54 |
|
54 | 55 | String type = jedis.type(tableName); |
55 | | - this.dataType = requireNonNull(KvrocksDataType.fromTypeName(type), |
56 | | - () -> "unsupported Kvrocks data type: " + type); |
57 | | - this.dataFormat = requireNonNull( |
58 | | - KvrocksDataFormat.fromTypeName(fieldInfo.getDataFormat()), |
59 | | - () -> "unsupported data format: " + fieldInfo.getDataFormat()); |
| 56 | + this.dataType = |
| 57 | + requireNonNull(KvrocksDataType.fromTypeName(type)); |
| 58 | + this.dataFormat = |
| 59 | + requireNonNull( |
| 60 | + KvrocksDataFormat.fromTypeName( |
| 61 | + fieldInfo.getDataFormat())); |
60 | 62 |
|
61 | 63 | this.objectMapper = new ObjectMapper(); |
62 | 64 | objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); |
@@ -99,17 +101,19 @@ private List<Object[]> readString() { |
99 | 101 | /** Reads entries from a Kvrocks Stream via XRANGE. */ |
100 | 102 | private List<Object[]> readStream() { |
101 | 103 | List<Object[]> rows = new ArrayList<>(); |
102 | | - jedis.xrange(tableName, (StreamEntryID) null, (StreamEntryID) null, |
103 | | - Integer.MAX_VALUE).forEach(entry -> { |
104 | | - // Each stream entry has an ID and a map of field-value pairs. |
105 | | - // We serialize the map as a JSON string and parse it. |
| 104 | + List<StreamEntry> entries = |
| 105 | + jedis.xrange(tableName, (StreamEntryID) null, |
| 106 | + (StreamEntryID) null, Integer.MAX_VALUE); |
| 107 | + for (StreamEntry entry : entries) { |
106 | 108 | try { |
107 | | - String json = objectMapper.writeValueAsString(entry.getFields()); |
| 109 | + String json = |
| 110 | + objectMapper.writeValueAsString(entry.getFields()); |
108 | 111 | rows.add(parseRow(json)); |
109 | 112 | } catch (Exception e) { |
110 | | - throw new RuntimeException("Failed to serialize stream entry", e); |
| 113 | + throw new RuntimeException( |
| 114 | + "Failed to serialize stream entry", e); |
111 | 115 | } |
112 | | - }); |
| 116 | + } |
113 | 117 | return rows; |
114 | 118 | } |
115 | 119 |
|
|
0 commit comments