Add kafka producer new (#159)
* modify socketTimeout to 120s

* add regular match

* support rc + Pessimistic

* revert Fix pessimistic commit wait error

* revert revert

* add kafka producer
aressu1985 authored Jan 16, 2024
1 parent b9f687c commit b725c53
Showing 22 changed files with 378 additions and 34 deletions.
47 changes: 25 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ The Cases and Results are 1-1 correspondence, and they are actually `git submodu
name: "dump"
password: "111"
```

* In the `kafka.yml` file, configure the Kafka server address; it is only needed for cases that produce or consume messages from the Kafka server.
## 3. Run mo-tester

* With the simple command below, all the SQL test cases will automatically run and generate reports and error messages in *report/report.txt* and *report/error.txt*.
@@ -73,17 +73,18 @@ The Cases and Results are 1-1 correspondence, and they are actually `git submodu

If you'd like to adjust the test range, you can just change the `path` parameter in `run.yml`. You can also specify some parameters when executing `./run.sh`; the parameters are as follows:

|Parameters|Description|
|---|---|
|-p|set the path of test cases needed to be executed by mo-tester, the default value is configured by the `path` in `run.yaml`|
|-m|set the method that mo-tester will run with, the default value is configured by the `method` in `run.yaml`|
|-t|set the times that mo-tester will execute cases for, must be numeric, default is 1|
|-r|set The success rate that test cases should reach, the default value is configured by the `rate` in `run.yaml`|
|-i|set the including list, and only script files in the path whose name contains one of the lists will be executed, if more than one, separated by `,`, if not specified, refers to all cases included|
|-e|set the excluding list, and script files in the path whose name contains one of the lists will not be executed, if more than one, separated by `,`, if not specified, refers to none of the cases excluded|
|-g|means SQL commands which is marked with [bvt:issue] flag will not be executed,this flag starts with [-- @bvt:issue#{issueNO.}],and ends with [-- @bvt:issue],eg:<br>-- @bvt:issue#3236<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-10000:1" HOUR_MINUTE);<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-100 1" YEAR_MONTH);<br/><br>-- @bvt:issue<br/><br>Those two sql commands are associated with issue#3236, and they will not be executed in bvt test, until the flag is removed when issue#3236 is fixed.<br/>|
|-n|means the metadata of the resultset will be ignored when comparing the result|
|-c|only check whether the case file matches the related result file|
| Parameters | Description |
|---|---|
| -p | set the path of the test cases to be executed by mo-tester; the default value is configured by `path` in `run.yaml` |
| -m | set the method that mo-tester will run with; the default value is configured by `method` in `run.yaml` |
| -t | set the number of times mo-tester will execute the cases; must be numeric, default is 1 |
| -r | set the success rate that test cases should reach; the default value is configured by `rate` in `run.yaml` |
| -i | set the including list: only script files in the path whose name contains one of the list entries will be executed; if more than one, separate them with `,`; if not specified, all cases are included |
| -e | set the excluding list: script files in the path whose name contains one of the list entries will not be executed; if more than one, separate them with `,`; if not specified, no cases are excluded |
| -g | SQL commands marked with the [bvt:issue] flag will not be executed. This flag starts with [-- @bvt:issue#{issueNO.}] and ends with [-- @bvt:issue], e.g.:<br>-- @bvt:issue#3236<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-10000:1" HOUR_MINUTE);<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-100 1" YEAR_MONTH);<br/><br>-- @bvt:issue<br/><br>These two SQL commands are associated with issue#3236 and will not be executed in the bvt test until the flag is removed after issue#3236 is fixed.<br/> |
| -n | the metadata of the result set will be ignored when comparing results |
| -c | only check whether the case file matches the related result file |
| -s | set the resource path that mo-tester uses to store resources, which can be referred to in test files |

**Examples**:

@@ -100,16 +101,18 @@ Every time running `run.sh` will overwrite the report of the *error.txt* file,
## 4. Set tags in case scripts
Sometimes, to achieve a specific purpose such as pausing or creating a new connection, you can add special tags to the script file. The mo-tester provides the following tags:

| Tags | Description |
|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| -- @skip:issue#{IssueNo.} | If set, the whole script file will be skipped, and not be executed any more for issue{IssueNo.} |
| -- @bvt:issue#{IssueNo.}<br/>-- @bvt:issue | The sql statements between those two tags will be not executed for issue{IssueNo.} |
| -- @sleep:{time} | The mo-tester will wait for {time} s |
| -- @session:id=2&user=root&password=111<br/> -- @session | The mo-tester will create a new connetion to execute sql statements between those two tags.<br/>Default value of id is 1, max is 10.<br/>Defualt value of user and password is configured in `mo.yml`. |
| -- @sortkey: | If the result is sorted, need set this tag for the sql statement. e.g.<br/> -- @sortkey:0,1: means sort keys are first column and second colum. |
| -- @delimiter {C} | Set new delimeter to String C, C can any string expect that starts with [/,#,--] |
| -- @system {C} | Set System Command that will be executed by the runner system |
| -- @wait:{D}:[commit or wait] | means this command will be blocked until the connection[id={D}] commit or rollback |
| Tags | Description |
|---|---|
| -- @skip:issue#{IssueNo.} | If set, the whole script file will be skipped and no longer executed, because of issue{IssueNo.} |
| -- @bvt:issue#{IssueNo.}<br/>-- @bvt:issue | The SQL statements between these two tags will not be executed, because of issue{IssueNo.} |
| -- @sleep:{time} | The mo-tester will wait for {time} seconds |
| -- @session:id=2&user=root&password=111<br/> -- @session | The mo-tester will create a new connection to execute the SQL statements between these two tags.<br/>The default value of id is 1, and the max is 10.<br/>The default user and password are configured in `mo.yml`. |
| -- @sortkey: | If the result is sorted, set this tag for the SQL statement, e.g.<br/> -- @sortkey:0,1 means the sort keys are the first and second columns. |
| -- @delimiter {C} | Set the delimiter to string C; C can be any string except one starting with [/,#,--] |
| -- @system {C} | Set a system command C that will be executed on the runner system |
| -- @wait:{D}:[commit or rollback] | This command will be blocked until the connection[id={D}] commits or rolls back |
| -- @ignore:{num},...{num} | The designated columns whose indexes are in the {num} list will not be checked. |
| -- @kafka:produce:{topic}<br/>JSON ARRAY<br/>-- @kafka:produce | The mo-tester will send all the items in the JSON array to the designated topic of the Kafka server.<br/>The Kafka server is configured in `kafka.yml` |
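The `-- @session:id=2&user=root&password=111` tag packs its options in a query-string style. A small sketch of how such a tag line could be parsed (the class and method names are illustrative, not mo-tester's actual API):

```java
import java.util.HashMap;
import java.util.Map;

public class SessionTagParser {
    static final String SESSION_FLAG = "-- @session:";

    // Parse "-- @session:id=2&user=root&password=111" into a key/value map.
    public static Map<String, String> parse(String line) {
        Map<String, String> opts = new HashMap<>();
        if (!line.startsWith(SESSION_FLAG)) {
            return opts;
        }
        String body = line.substring(SESSION_FLAG.length());
        for (String pair : body.split("&")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                opts.put(kv[0].trim(), kv[1].trim());
            }
        }
        return opts;
    }

    public static void main(String[] args) {
        System.out.println(parse("-- @session:id=2&user=root&password=111"));
    }
}
```

Unspecified keys would then fall back to the defaults described above (id 1, user/password from `mo.yml`).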



5 changes: 4 additions & 1 deletion cases/template.result
@@ -1,4 +1,6 @@
select sleep(10);
sleep(10)
0
create table t1 ( a int not null default 1, int32 int primary key);
insert into t1 (int32) values (-1),(1234567890),(2147483647);
insert into t1 (int32) values (-1),(1234567890),(2147483647);
@@ -12,7 +14,8 @@ select min(int32),max(int32),max(int32)-1 from t1;
min(int32) max(int32) max(int32) - 1
-1 2147483647 2147483646
select min(int32),max(int32),max(int32)-1 from t1 group by a;

min(int32) max(int32) max(int32) - 1
-1 2147483647 2147483646
drop table t1;
CREATE TABLE NATION (
N_NATIONKEY INTEGER NOT NULL,
11 changes: 11 additions & 0 deletions cases/template.sql
@@ -1,5 +1,11 @@

select sleep(10);
create table t1 ( a int not null default 1, int32 int primary key);
-- @kafka:produce:mo-stream-test
["This is the first message",
"This is the second message",
"This is the third message"]
-- @kafka:produce
insert into t1 (int32) values (-1),(1234567890),(2147483647);
-- @pattern
insert into t1 (int32) values (-1),(1234567890),(2147483647);
@@ -45,3 +51,8 @@ use template;
select * from nation;
select * from nation limit 2;
-- @session
-- @kafka:produce:mo-stream-test
["That is the first message",
"That is the second message",
"That is the third message"]
-- @kafka:produce
3 changes: 3 additions & 0 deletions kafka.yml
@@ -0,0 +1,3 @@
server: "localhost:9092"
topic: "mo-stream-test"
client_id: "mo-kafka-client"
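The three keys above are flat key/value pairs. A dependency-free sketch of how they could be read (mo-tester itself presumably loads them through its `KafkaConfiUtil` class, likely via SnakeYAML from the pom; the reader class here is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaConfReader {
    // Parse flat `key: "value"` YAML lines such as those in kafka.yml.
    // Only splits on the first colon, so values like "localhost:9092" survive.
    public static Map<String, String> parse(String yaml) {
        Map<String, String> conf = new HashMap<>();
        for (String line : yaml.split("\n")) {
            int colon = line.indexOf(':');
            if (colon < 0) continue;
            String key = line.substring(0, colon).trim();
            String value = line.substring(colon + 1).trim().replaceAll("^\"|\"$", "");
            conf.put(key, value);
        }
        return conf;
    }

    public static void main(String[] args) {
        String yaml = "server: \"localhost:9092\"\n"
                    + "topic: \"mo-stream-test\"\n"
                    + "client_id: \"mo-kafka-client\"";
        System.out.println(parse(yaml).get("server"));
    }
}
```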
Binary file added lib/fastjson-1.2.83.jar
Binary file added lib/kafka-clients-3.6.0.jar
Binary file added lib/lz4-java-1.8.0.jar
Binary file modified lib/mo-tester-1.0-SNAPSHOT.jar
Binary file added lib/slf4j-api-2.0.9.jar
Binary file added lib/snappy-java-1.1.10.4.jar
Binary file added lib/zstd-jni-1.5.5-1.jar
44 changes: 39 additions & 5 deletions pom.xml
@@ -90,11 +90,6 @@
<artifactId>snakeyaml</artifactId>
<version>1.26</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>io.github.java-diff-utils</groupId>
<artifactId>java-diff-utils</artifactId>
@@ -110,5 +105,44 @@
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.9</version>
<type>pom</type>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j2-impl -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>2.22.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
</project>
19 changes: 19 additions & 0 deletions src/main/java/io/mo/cases/TestScript.java
@@ -1,10 +1,15 @@
package io.mo.cases;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import io.mo.constant.RESULT;
import io.mo.stream.TopicAndRecords;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestScript {

@@ -13,6 +18,8 @@ public class TestScript {
private ArrayList<SqlCommand> failedCommands = new ArrayList<>();
private ArrayList<SqlCommand> ignoredCommands = new ArrayList<>();
private ArrayList<SqlCommand> abnormalCommands = new ArrayList<>();

private Map<Integer, TopicAndRecords> produceRecords = new HashMap<Integer,TopicAndRecords>();

private String fileName;

@@ -134,4 +141,16 @@ public boolean isSkiped() {
public void setSkiped(boolean skiped) {
this.skiped = skiped;
}

public boolean isKafkaProduceCmd(int pos){
return produceRecords.containsKey(pos);
}

public void addKafkaProduceRecord(int pos, TopicAndRecords tar){
produceRecords.put(pos,tar);
}

public TopicAndRecords getTopicAndRecord(int pos){
return produceRecords.get(pos);
}
}
3 changes: 3 additions & 0 deletions src/main/java/io/mo/constant/COMMON.java
@@ -43,6 +43,9 @@ public class COMMON {
//if result is type of error, and not unique, can use this flag to regular match
public static String REGULAR_MATCH_FLAG = "-- @pattern";

public static String KAFKA_PRODUCE_START_FLAG = "-- @kafka:produce:";
public static String KAFKA_PRODUCE_END_FLAG = "-- @kafka:produce";

public static String IGNORE_COLUMN_FLAG = "-- @ignore:";

public static String LOG_DIR = "log";
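Note that `KAFKA_PRODUCE_START_FLAG` is the end flag plus a trailing `:{topic}`, so a parser must check for the longer prefix first and require something after it. A small sketch of how the topic could be pulled out of a tag line (illustrative, not the actual mo-tester parser):

```java
public class KafkaTagParser {
    // Same flag values as io.mo.constant.COMMON in this commit.
    static final String KAFKA_PRODUCE_START_FLAG = "-- @kafka:produce:";
    static final String KAFKA_PRODUCE_END_FLAG = "-- @kafka:produce";

    // Returns the topic from "-- @kafka:produce:{topic}", or null for the
    // bare end flag and unrelated lines.
    public static String topicOf(String line) {
        line = line.trim();
        if (line.startsWith(KAFKA_PRODUCE_START_FLAG)
                && line.length() > KAFKA_PRODUCE_START_FLAG.length()) {
            return line.substring(KAFKA_PRODUCE_START_FLAG.length());
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(topicOf("-- @kafka:produce:mo-stream-test"));
    }
}
```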
31 changes: 26 additions & 5 deletions src/main/java/io/mo/db/Executor.java
@@ -6,6 +6,9 @@
import io.mo.constant.RESULT;
import io.mo.result.RSSet;
import io.mo.result.StmtResult;
import io.mo.stream.KafkaManager;
import io.mo.stream.Producer;
import io.mo.stream.TopicAndRecords;
import io.mo.util.MoConfUtil;
import io.mo.util.ResultParser;
import org.apache.log4j.Logger;
@@ -60,7 +63,18 @@ public static void run(TestScript script){
ArrayList<SqlCommand> commands = script.getCommands();
long start = System.currentTimeMillis();

for (SqlCommand command : commands) {
for (int i = 0; i < commands.size(); i++) {
SqlCommand command = commands.get(i);

//if related to kafka stream record

if(script.isKafkaProduceCmd(i)){
Producer producer = KafkaManager.getProducer();
TopicAndRecords tar = script.getTopicAndRecord(i);
producer.send(tar);
LOG.info(String.format("Succeed to send the following messages to kafka server and topic[%s]:\n%s",tar.getTopic(),tar.getRecordsStr()));
}

//if need to sleep
if(command.getSleeptime() > 0){
@@ -328,6 +342,13 @@ public static boolean genRS(TestScript script){
try{
command = commands.get(j);

if(script.isKafkaProduceCmd(j)){
Producer producer = KafkaManager.getProducer();
TopicAndRecords tar = script.getTopicAndRecord(j);
producer.send(tar);
LOG.info(String.format("Succeed to send the following messages to kafka server and topic[%s]:\n%s",tar.getTopic(),tar.getRecordsStr()));
}

if(command.getSleeptime() > 0){
LOG.info(String.format("The tester will sleep for %s s, please wait....", command.getSleeptime()));
command.sleep();
@@ -656,13 +677,13 @@ public static void dropTestDB(Connection connection,String name){
public static void dropTestDB(Connection connection,TestScript script){
dropTestDB(connection,script.getUseDB());
}
public static void syncCommit(){

public static void syncCommit() {
Connection connection = ConnectionManager.getConnectionForSys();
if(connection == null){
if (connection == null) {
LOG.error("select mo_ctl('cn','synccommit','') failed. cause: Can not get a valid connection for sys user.");
return;
}

try {
Statement statement = connection.createStatement();
statement.execute("select mo_ctl('cn','synccommit','')");
11 changes: 11 additions & 0 deletions src/main/java/io/mo/stream/Consumer.java
@@ -0,0 +1,11 @@
package io.mo.stream;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
KafkaConsumer<String,String> consumer;

public Consumer(KafkaConsumer<String,String> consumer){
this.consumer = consumer;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/mo/stream/KafkaManager.java
@@ -0,0 +1,23 @@
package io.mo.stream;

import io.mo.util.KafkaConfiUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaManager {

public static Producer getProducer(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfiUtil.getServerAddr());
properties.put(ProducerConfig.CLIENT_ID_CONFIG,KafkaConfiUtil.getClientId());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);
return new Producer(kafkaProducer);
}
}
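`TopicAndRecords` itself is not shown in this diff. Judging from its use in `Executor` (`getTopic()`, `getRecordsStr()`, passed whole to `Producer.send`), a minimal shape consistent with that usage might be the following sketch (not the committed class; the method names beyond those two are assumptions):

```java
import java.util.ArrayList;
import java.util.List;

public class TopicAndRecords {
    private final String topic;
    private final List<String> records = new ArrayList<>();

    public TopicAndRecords(String topic) {
        this.topic = topic;
    }

    public void addRecord(String record) {
        records.add(record);
    }

    public String getTopic() {
        return topic;
    }

    public List<String> getRecords() {
        return records;
    }

    // One record per line, matching the multi-line form Executor logs
    // after a successful send.
    public String getRecordsStr() {
        return String.join("\n", records);
    }

    public static void main(String[] args) {
        TopicAndRecords tar = new TopicAndRecords("mo-stream-test");
        tar.addRecord("This is the first message");
        System.out.println(tar.getTopic());
    }
}
```

A `Producer` wrapping `KafkaProducer<String,String>` could then iterate `getRecords()` and send one `ProducerRecord` per item to `getTopic()`.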
