Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka producer new #159

Merged
merged 20 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 25 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ The Cases and Results are 1-1 correspondence, and they are actually `git submodu
name: "dump"
passwrod: "111"
```

* In `kafka.yml` file, configure the kafka server address, only for the cases that need to produce or consume messages from kafka server.
## 3. Run mo-tester

* With the simple below command, all the SQL test cases will automatically run and generate reports and error messages to *report/report.txt* and *report/error.txt*.
Expand All @@ -73,17 +73,18 @@ The Cases and Results are 1-1 correspondence, and they are actually `git submodu

If you'd like to adjust the test range, you can just change the `path` parameter of `run.yml`. And you can also specify some parameters when executing the command `./run.sh`, parameters are as followings:

|Parameters|Description|
|---|---|
|-p|set the path of test cases needed to be executed by mo-tester, the default value is configured by the `path` in `run.yaml`|
|-m|set the method that mo-tester will run with, the default value is configured by the `method` in `run.yaml`|
|-t|set the times that mo-tester will execute cases for, must be numeric, default is 1|
|-r|set The success rate that test cases should reach, the default value is configured by the `rate` in `run.yaml`|
|-i|set the including list, and only script files in the path whose name contains one of the lists will be executed, if more than one, separated by `,`, if not specified, refers to all cases included|
|-e|set the excluding list, and script files in the path whose name contains one of the lists will not be executed, if more than one, separated by `,`, if not specified, refers to none of the cases excluded|
|-g|means SQL commands which is marked with [bvt:issue] flag will not be executed,this flag starts with [-- @bvt:issue#{issueNO.}],and ends with [-- @bvt:issue],eg:<br>-- @bvt:issue#3236<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-10000:1" HOUR_MINUTE);<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-100 1" YEAR_MONTH);<br/><br>-- @bvt:issue<br/><br>Those two sql commands are associated with issue#3236, and they will not be executed in bvt test, until the flag is removed when issue#3236 is fixed.<br/>|
|-n|means the metadata of the resultset will be ignored when comparing the result|
|-c|only check whether the case file matches the related result file|
| Parameters |Description|
|------------|---|
| -p |set the path of test cases needed to be executed by mo-tester, the default value is configured by the `path` in `run.yaml`|
| -m |set the method that mo-tester will run with, the default value is configured by the `method` in `run.yaml`|
| -t |set the times that mo-tester will execute cases for, must be numeric, default is 1|
| -r |set The success rate that test cases should reach, the default value is configured by the `rate` in `run.yaml`|
| -i |set the including list, and only script files in the path whose name contains one of the lists will be executed, if more than one, separated by `,`, if not specified, refers to all cases included|
| -e |set the excluding list, and script files in the path whose name contains one of the lists will not be executed, if more than one, separated by `,`, if not specified, refers to none of the cases excluded|
| -g |means SQL commands which is marked with [bvt:issue] flag will not be executed,this flag starts with [-- @bvt:issue#{issueNO.}],and ends with [-- @bvt:issue],eg:<br>-- @bvt:issue#3236<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-10000:1" HOUR_MINUTE);<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-100 1" YEAR_MONTH);<br/><br>-- @bvt:issue<br/><br>Those two sql commands are associated with issue#3236, and they will not be executed in bvt test, until the flag is removed when issue#3236 is fixed.<br/>|
| -n |means the metadata of the resultset will be ignored when comparing the result|
| -c |only check whether the case file matches the related result file|
| -s |set the resource path that mo-tester use to store resources, and can be refered to in test file|

**Examples**:

Expand All @@ -100,16 +101,18 @@ Every time running `run.sh` will overwrite the report of the *error.txt* file,
## 4. Set tags in case scripts
Sometimes, to achieve some specific purposes, such as pausing or creating a new connection, you can add some special tags to the script file. The mo tester provides the following tags for use:

| Tags | Description |
|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| -- @skip:issue#{IssueNo.} | If set, the whole script file will be skipped, and not be executed any more for issue{IssueNo.} |
| -- @bvt:issue#{IssueNo.}<br/>-- @bvt:issue | The sql statements between those two tags will be not executed for issue{IssueNo.} |
| -- @sleep:{time} | The mo-tester will wait for {time} s |
| -- @session:id=2&user=root&password=111<br/> -- @session | The mo-tester will create a new connetion to execute sql statements between those two tags.<br/>Default value of id is 1, max is 10.<br/>Defualt value of user and password is configured in `mo.yml`. |
| -- @sortkey: | If the result is sorted, need set this tag for the sql statement. e.g.<br/> -- @sortkey:0,1: means sort keys are first column and second colum. |
| -- @delimiter {C} | Set new delimeter to String C, C can any string expect that starts with [/,#,--] |
| -- @system {C} | Set System Command that will be executed by the runner system |
| -- @wait:{D}:[commit or wait] | means this command will be blocked until the connection[id={D}] commit or rollback |
| Tags | Description |
|---------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| -- @skip:issue#{IssueNo.} | If set, the whole script file will be skipped, and not be executed any more for issue{IssueNo.} |
| -- @bvt:issue#{IssueNo.}<br/>-- @bvt:issue | The sql statements between those two tags will be not executed for issue{IssueNo.} |
| -- @sleep:{time} | The mo-tester will wait for {time} s |
| -- @session:id=2&user=root&password=111<br/> -- @session | The mo-tester will create a new connetion to execute sql statements between those two tags.<br/>Default value of id is 1, max is 10.<br/>Defualt value of user and password is configured in `mo.yml`. |
| -- @sortkey: | If the result is sorted, need set this tag for the sql statement. e.g.<br/> -- @sortkey:0,1: means sort keys are first column and second colum. |
| -- @delimiter {C} | Set new delimeter to String C, C can any string expect that starts with [/,#,--] |
| -- @system {C} | Set System Command that will be executed by the runner system |
| -- @wait:{D}:[commit or wait] | means this command will be blocked until the connection[id={D}] commit or rollback |
| -- @ignore:{num},...{num} | means the designated columns which index are in {num}s will not be check. |
| -- @kafka:produce:{topic}}<br/>JSON ARRAY<br/>-- @bvt:produce | means the mo-tester will send all the items in this json array to the designated topic of kafka server.<br/>The kafka server is configured in `kafka.yml` |



Expand Down
5 changes: 4 additions & 1 deletion cases/template.result
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
select sleep(10);
sleep(10)
0
create table t1 ( a int not null default 1, int32 int primary key);
insert into t1 (int32) values (-1),(1234567890),(2147483647);
insert into t1 (int32) values (-1),(1234567890),(2147483647);
Expand All @@ -12,7 +14,8 @@ select min(int32),max(int32),max(int32)-1 from t1;
min(int32) max(int32) max(int32) - 1
-1 2147483647 2147483646
select min(int32),max(int32),max(int32)-1 from t1 group by a;

min(int32) max(int32) max(int32) - 1
-1 2147483647 2147483646
drop table t1;
CREATE TABLE NATION (
N_NATIONKEY INTEGER NOT NULL,
Expand Down
11 changes: 11 additions & 0 deletions cases/template.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@

select sleep(10);
create table t1 ( a int not null default 1, int32 int primary key);
-- @kafka:produce:mo-stream-test
["This is the first message",
"This is the second message",
"This is the third message"]
-- @kafka:produce
insert into t1 (int32) values (-1),(1234567890),(2147483647);
-- @pattern
insert into t1 (int32) values (-1),(1234567890),(2147483647);
Expand Down Expand Up @@ -45,3 +51,8 @@ use template;
select * from nation;
select * from nation limit 2;
-- @session
-- @kafka:produce:mo-stream-test
["That is the first message",
"That is the second message",
"That is the third message"]
-- @kafka:produce
3 changes: 3 additions & 0 deletions kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
server: "localhost:9092"
topic: "mo-stream-test"
client_id: "mo-kafka-client"
Binary file added lib/fastjson-1.2.83.jar
Binary file not shown.
Binary file added lib/kafka-clients-3.6.0.jar
Binary file not shown.
Binary file added lib/lz4-java-1.8.0.jar
Binary file not shown.
Binary file modified lib/mo-tester-1.0-SNAPSHOT.jar
Binary file not shown.
Binary file added lib/slf4j-api-2.0.9.jar
Binary file not shown.
Binary file added lib/snappy-java-1.1.10.4.jar
Binary file not shown.
Binary file added lib/zstd-jni-1.5.5-1.jar
Binary file not shown.
44 changes: 39 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@
<artifactId>snakeyaml</artifactId>
<version>1.26</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>io.github.java-diff-utils</groupId>
<artifactId>java-diff-utils</artifactId>
Expand All @@ -110,5 +105,44 @@
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.9</version>
<type>pom</type>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/log4j-over-slf4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j2-impl -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>2.22.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
</project>
19 changes: 19 additions & 0 deletions src/main/java/io/mo/cases/TestScript.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.mo.cases;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import io.mo.constant.RESULT;
import io.mo.stream.TopicAndRecords;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestScript {

Expand All @@ -13,6 +18,8 @@ public class TestScript {
private ArrayList<SqlCommand> failedCommands = new ArrayList<>();
private ArrayList<SqlCommand> ignoredCommands = new ArrayList<>();
private ArrayList<SqlCommand> abnormalCommands = new ArrayList<>();

private Map<Integer, TopicAndRecords> produceRecords = new HashMap<Integer,TopicAndRecords>();

private String fileName;

Expand Down Expand Up @@ -134,4 +141,16 @@ public boolean isSkiped() {
public void setSkiped(boolean skiped) {
this.skiped = skiped;
}

public boolean isKafkaProduceCmd(int pos){
return produceRecords.containsKey(pos);
}

public void addKafkaProduceRecord(int pos, TopicAndRecords tar){
produceRecords.put(pos,tar);
}

public TopicAndRecords getTopicAndRecord(int pos){
return produceRecords.get(pos);
}
}
3 changes: 3 additions & 0 deletions src/main/java/io/mo/constant/COMMON.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class COMMON {
//if result is type of error, and not unique, can use this flag to regular match
public static String REGULAR_MATCH_FLAG = "-- @pattern";

public static String KAFKA_PRODUCE_START_FLAG = "-- @kafka:produce:";
public static String KAFKA_PRODUCE_END_FLAG = "-- @kafka:produce";

public static String IGNORE_COLUMN_FLAG = "-- @ignore:";

public static String LOG_DIR = "log";
Expand Down
31 changes: 26 additions & 5 deletions src/main/java/io/mo/db/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import io.mo.constant.RESULT;
import io.mo.result.RSSet;
import io.mo.result.StmtResult;
import io.mo.stream.KafkaManager;
import io.mo.stream.Producer;
import io.mo.stream.TopicAndRecords;
import io.mo.util.MoConfUtil;
import io.mo.util.ResultParser;
import org.apache.log4j.Logger;
Expand Down Expand Up @@ -60,7 +63,18 @@ public static void run(TestScript script){
ArrayList<SqlCommand> commands = script.getCommands();
long start = System.currentTimeMillis();

for (SqlCommand command : commands) {
//for (SqlCommand command : commands) {
for(int i = 0; i < commands.size(); i++){
SqlCommand command = commands.get(i);

//if related to kafka stream record

if(script.isKafkaProduceCmd(i)){
Producer producer = KafkaManager.getProducer();
TopicAndRecords tar = script.getTopicAndRecord(i);
producer.send(tar);
LOG.info(String.format("Succeed to send the following messages to kafka server and topic[%s]:\n%s",tar.getTopic(),tar.getRecordsStr()));
}

//if need to sleep
if(command.getSleeptime() > 0){
Expand Down Expand Up @@ -328,6 +342,13 @@ public static boolean genRS(TestScript script){
try{
command = commands.get(j);

if(script.isKafkaProduceCmd(j)){
Producer producer = KafkaManager.getProducer();
TopicAndRecords tar = script.getTopicAndRecord(j);
producer.send(tar);
LOG.info(String.format("Succeed to send the following messages to kafka server and topic[%s]:\n%s",tar.getTopic(),tar.getRecordsStr()));
}

if(command.getSleeptime() > 0){
LOG.info(String.format("The tester will sleep for %s s, please wait....", command.getSleeptime()));
command.sleep();
Expand Down Expand Up @@ -656,13 +677,13 @@ public static void dropTestDB(Connection connection,String name){
public static void dropTestDB(Connection connection,TestScript script){
dropTestDB(connection,script.getUseDB());
}
public static void syncCommit(){

public static void syncCommit() {
Connection connection = ConnectionManager.getConnectionForSys();
if(connection == null){
if (connection == null) {
LOG.error("select mo_ctl('cn','synccommit','') failed. cause: Can not get invalid connection for sys user.");
}

try {
Statement statement = connection.createStatement();
statement.execute("select mo_ctl('cn','synccommit','')");
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/mo/stream/Consumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.mo.stream;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
KafkaConsumer<String,String> consumer;

public Consumer(KafkaConsumer consumer){
this.consumer = consumer;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/mo/stream/KafkaManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.mo.stream;

import io.mo.util.KafkaConfiUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaManager {

public static Producer getProducer(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfiUtil.getServerAddr());
properties.put(ProducerConfig.CLIENT_ID_CONFIG,KafkaConfiUtil.getClientId());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
Producer producer = new Producer(kafkaProducer);
return producer;
}
}
Loading
Loading