Skip to content

Commit

Permalink
Fix topic bug (#160)
Browse files Browse the repository at this point in the history
* modify socketTimeout to 120s

* add regular match

* support rc + Pessimistic

* revert Fix pessimistic commit wait error

* revert revert

* fix topic bug
  • Loading branch information
aressu1985 authored Jan 17, 2024
1 parent b725c53 commit de38ee4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
Binary file modified lib/mo-tester-1.0-SNAPSHOT.jar
Binary file not shown.
30 changes: 17 additions & 13 deletions src/main/java/io/mo/util/ScriptParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import freemarker.template.utility.NumberUtil;
import io.mo.cases.SqlCommand;
import io.mo.cases.TestScript;
Expand Down Expand Up @@ -90,14 +93,14 @@ public static void parseScript(String path){
command.setRegularMatch(true);
}

if(trimmedLine.startsWith(COMMON.KAFKA_PRODUCE_START_FLAG)){
String topic = trimmedLine.substring(COMMON.KAFKA_PRODUCE_START_FLAG.length());
if(topic == null || topic.equalsIgnoreCase("")){
LOG.error(String.format("[%s][row:%s]No topic info in kafka produce tag.",path,rowNum));
continue;
}
tar.setTopic(topic);
}
// if(trimmedLine.startsWith(COMMON.KAFKA_PRODUCE_START_FLAG)){
// String topic = trimmedLine.substring(COMMON.KAFKA_PRODUCE_START_FLAG.length());
// if(topic == null || topic.equalsIgnoreCase("")){
// LOG.error(String.format("[%s][row:%s]No topic info in kafka produce tag.",path,rowNum));
// continue;
// }
// tar.setTopic(topic);
// }

if(trimmedLine.startsWith(COMMON.KAFKA_PRODUCE_START_FLAG)){
String topic = trimmedLine.substring(COMMON.KAFKA_PRODUCE_START_FLAG.length());
Expand All @@ -113,11 +116,12 @@ public static void parseScript(String path){
isProduceRecord = false;
JSONArray array = JSON.parseArray(messages.toString());
for(int i = 0; i < array.size();i++){
tar.addRecord(array.get(i).toString());
tar.addRecord(JSON.toJSONString(array.get(i),SerializerFeature.NotWriteDefaultValue));
}
int index = testScript.getCommands().size();
testScript.addKafkaProduceRecord(index,tar);
messages.delete(0,messages.length());
tar = new TopicAndRecords();
}


Expand Down Expand Up @@ -284,12 +288,12 @@ public static TestScript getTestScript(){
}

public static void main(String[] args){
String str = "[\"This is the first message\",\n" +
"\"This is the second message\",\n" +
"\"This is the third message\"]";
String str = "[{\"c1\":\"yyjjuejf\",\"c2\":\"中国\",\"c3\":\"##$%^&@\",\"c4\":\"\"},\n" +
"{\"c1\":NULL,\"c2\":\"0xDERFW9883\",\"c3\":\"5727362\",\"c4\":\"x'612543'\"}]";
System.out.println(str);
JSONArray array = JSON.parseArray(str);
for(int i = 0 ; i < array.size();i++) {
System.out.println(array.get(i));
System.out.println(JSON.toJSONString(array.get(i),SerializerFeature.WriteMapNullValue));
}
}
}

0 comments on commit de38ee4

Please sign in to comment.