add kafka producer func
aressu1985 committed Nov 22, 2023
1 parent 2a95912 commit c4965c1
Showing 16 changed files with 352 additions and 25 deletions.
47 changes: 25 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ The Cases and Results are 1-1 correspondence, and they are actually `git submodu
name: "dump"
password: "111"
```

* In the `kafka.yml` file, configure the Kafka server address. This is only needed for cases that produce or consume messages from a Kafka server.
## 3. Run mo-tester

* With the simple command below, all the SQL test cases will automatically run and generate reports and error messages in *report/report.txt* and *report/error.txt*.
@@ -73,17 +73,18 @@ The Cases and Results are 1-1 correspondence, and they are actually `git submodu

If you'd like to adjust the test range, just change the `path` parameter in `run.yml`. You can also specify parameters when executing `./run.sh`; the parameters are as follows:

|Parameters|Description|
|---|---|
|-p|set the path of test cases needed to be executed by mo-tester, the default value is configured by the `path` in `run.yaml`|
|-m|set the method that mo-tester will run with, the default value is configured by the `method` in `run.yaml`|
|-t|set the times that mo-tester will execute cases for, must be numeric, default is 1|
|-r|set The success rate that test cases should reach, the default value is configured by the `rate` in `run.yaml`|
|-i|set the including list, and only script files in the path whose name contains one of the lists will be executed, if more than one, separated by `,`, if not specified, refers to all cases included|
|-e|set the excluding list, and script files in the path whose name contains one of the lists will not be executed, if more than one, separated by `,`, if not specified, refers to none of the cases excluded|
|-g|means SQL commands which is marked with [bvt:issue] flag will not be executed,this flag starts with [-- @bvt:issue#{issueNO.}],and ends with [-- @bvt:issue],eg:<br>-- @bvt:issue#3236<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-10000:1" HOUR_MINUTE);<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-100 1" YEAR_MONTH);<br/><br>-- @bvt:issue<br/><br>Those two sql commands are associated with issue#3236, and they will not be executed in bvt test, until the flag is removed when issue#3236 is fixed.<br/>|
|-n|means the metadata of the resultset will be ignored when comparing the result|
|-c|only check whether the case file matches the related result file|
| Parameters | Description |
|------------|-------------|
| -p | Set the path of the test cases to be executed by mo-tester; the default value is configured by `path` in `run.yaml`. |
| -m | Set the method that mo-tester will run with; the default value is configured by `method` in `run.yaml`. |
| -t | Set the number of times that mo-tester will execute the cases; must be numeric, default is 1. |
| -r | Set the success rate that test cases should reach; the default value is configured by `rate` in `run.yaml`. |
| -i | Set the including list: only script files in the path whose names contain one of the listed terms will be executed. Separate multiple terms with `,`. If not specified, all cases are included. |
| -e | Set the excluding list: script files in the path whose names contain one of the listed terms will not be executed. Separate multiple terms with `,`. If not specified, no cases are excluded. |
| -g | SQL commands marked with the [bvt:issue] flag will not be executed. The flag starts with [-- @bvt:issue#{issueNO.}] and ends with [-- @bvt:issue], e.g.:<br>-- @bvt:issue#3236<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-10000:1" HOUR_MINUTE);<br/><br>select date_add("1997-12-31 23:59:59",INTERVAL "-100 1" YEAR_MONTH);<br/><br>-- @bvt:issue<br/><br>These two SQL commands are associated with issue #3236 and will not be executed in the BVT test until the flag is removed once issue #3236 is fixed.<br/> |
| -n | The metadata of the result set will be ignored when comparing results. |
| -c | Only check whether the case file matches the related result file. |
| -s | Set the resource path that mo-tester uses to store resources; it can be referred to in test files. |
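The `-i`/`-e` filtering rules above can be sketched as a small selection function. This is an illustrative standalone sketch, not the actual mo-tester implementation (which is not shown in this commit); class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the -i / -e filtering rules: a script runs if its
// name contains one of the include terms (or the include list is empty)
// and contains none of the exclude terms.
class CaseFilterSketch {
    public static List<String> select(List<String> files,
                                      String includeCsv, String excludeCsv) {
        String[] includes = includeCsv.isEmpty() ? new String[0] : includeCsv.split(",");
        String[] excludes = excludeCsv.isEmpty() ? new String[0] : excludeCsv.split(",");
        List<String> selected = new ArrayList<>();
        for (String f : files) {
            // empty include list means every case is included
            boolean in = includes.length == 0;
            for (String s : includes) if (f.contains(s)) in = true;
            // any exclude match removes the case again
            for (String s : excludes) if (f.contains(s)) in = false;
            if (in) selected.add(f);
        }
        return selected;
    }
}
```

For example, `-i select,update -e update` would select only scripts whose names contain `select` and not `update`.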

**Examples**:

@@ -100,16 +101,18 @@ Every time running `run.sh` will overwrite the report of the *error.txt* file,
## 4. Set tags in case scripts
To achieve specific purposes, such as pausing or creating a new connection, you can add special tags to a script file. The mo-tester provides the following tags:

| Tags | Description |
|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| -- @skip:issue#{IssueNo.} | If set, the whole script file will be skipped, and not be executed any more for issue{IssueNo.} |
| -- @bvt:issue#{IssueNo.}<br/>-- @bvt:issue | The sql statements between those two tags will be not executed for issue{IssueNo.} |
| -- @sleep:{time} | The mo-tester will wait for {time} s |
| -- @session:id=2&user=root&password=111<br/> -- @session | The mo-tester will create a new connetion to execute sql statements between those two tags.<br/>Default value of id is 1, max is 10.<br/>Defualt value of user and password is configured in `mo.yml`. |
| -- @sortkey: | If the result is sorted, need set this tag for the sql statement. e.g.<br/> -- @sortkey:0,1: means sort keys are first column and second colum. |
| -- @delimiter {C} | Set new delimeter to String C, C can any string expect that starts with [/,#,--] |
| -- @system {C} | Set System Command that will be executed by the runner system |
| -- @wait:{D}:[commit or wait] | means this command will be blocked until the connection[id={D}] commit or rollback |
| Tags | Description |
|------|-------------|
| -- @skip:issue#{IssueNo.} | If set, the whole script file will be skipped and no longer executed, due to issue #{IssueNo.}. |
| -- @bvt:issue#{IssueNo.}<br/>-- @bvt:issue | The SQL statements between these two tags will not be executed, due to issue #{IssueNo.}. |
| -- @sleep:{time} | mo-tester will wait for {time} seconds. |
| -- @session:id=2&user=root&password=111<br/> -- @session | mo-tester will create a new connection to execute the SQL statements between these two tags.<br/>The default value of id is 1, and the max is 10.<br/>The default user and password are configured in `mo.yml`. |
| -- @sortkey: | If the result is sorted, set this tag for the SQL statement, e.g.<br/>-- @sortkey:0,1 means the sort keys are the first and second columns. |
| -- @delimiter {C} | Set the delimiter to the string {C}; {C} can be any string except one starting with [/,#,--]. |
| -- @system {C} | Set a system command that will be executed by the runner system. |
| -- @wait:{D}:[commit or wait] | This command will be blocked until the connection [id={D}] commits or rolls back. |
| -- @ignore:{num},...{num} | The columns whose indexes are listed in the {num}s will not be checked. |
| -- @kafka:produce:{topic}<br/>JSON ARRAY<br/>-- @kafka:produce | mo-tester will send all the items in this JSON array to the designated topic of the Kafka server.<br/>The Kafka server is configured in `kafka.yml`. |
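The `-- @session:id=2&user=root&password=111` tag carries its parameters as `&`-separated key/value pairs. The following is a hypothetical sketch of how such a tag line could be parsed; the actual mo-tester parser is not part of this commit, and all names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: split a "-- @session:k=v&k=v" tag line into
// key/value pairs; a bare "-- @session" (the closing tag) yields no params.
class SessionTagSketch {
    static final String PREFIX = "-- @session:";

    public static Map<String, String> parse(String line) {
        Map<String, String> params = new HashMap<>();
        String trimmed = line.trim();
        if (!trimmed.startsWith(PREFIX)) {
            return params; // closing tag or unrelated line: no parameters
        }
        for (String pair : trimmed.substring(PREFIX.length()).split("&")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) params.put(kv[0], kv[1]);
        }
        return params;
    }
}
```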



5 changes: 4 additions & 1 deletion cases/template.result
@@ -1,4 +1,6 @@
select sleep(10);
sleep(10)
0
create table t1 ( a int not null default 1, int32 int primary key);
insert into t1 (int32) values (-1),(1234567890),(2147483647);
insert into t1 (int32) values (-1),(1234567890),(2147483647);
@@ -12,7 +14,8 @@ select min(int32),max(int32),max(int32)-1 from t1;
min(int32) max(int32) max(int32) - 1
-1 2147483647 2147483646
select min(int32),max(int32),max(int32)-1 from t1 group by a;

min(int32) max(int32) max(int32) - 1
-1 2147483647 2147483646
drop table t1;
CREATE TABLE NATION (
N_NATIONKEY INTEGER NOT NULL,
11 changes: 11 additions & 0 deletions cases/template.sql
@@ -1,5 +1,11 @@

select sleep(10);
create table t1 ( a int not null default 1, int32 int primary key);
-- @kafka:produce:mo-stream-test
["This is the first message",
"This is the second message",
"This is the third message"]
-- @kafka:produce
insert into t1 (int32) values (-1),(1234567890),(2147483647);
-- @pattern
insert into t1 (int32) values (-1),(1234567890),(2147483647);
@@ -45,3 +51,8 @@ use template;
select * from nation;
select * from nation limit 2;
-- @session
-- @kafka:produce:mo-stream-test
["That is the first message",
"That is the second message",
"That is the third message"]
-- @kafka:produce
Empty file added cases/test.sql
3 changes: 3 additions & 0 deletions kafka.yml
@@ -0,0 +1,3 @@
server: "localhost:9092"
topic: "mo-stream-test"
client_id: "mo-kafka-client"
Binary file modified lib/mo-tester-1.0-SNAPSHOT.jar
17 changes: 17 additions & 0 deletions pom.xml
@@ -110,5 +110,22 @@
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
</project>
19 changes: 19 additions & 0 deletions src/main/java/io/mo/cases/TestScript.java
@@ -1,10 +1,15 @@
package io.mo.cases;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import io.mo.constant.RESULT;
import io.mo.stream.TopicAndRecords;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestScript {

@@ -13,6 +18,8 @@ public class TestScript {
private ArrayList<SqlCommand> failedCommands = new ArrayList<>();
private ArrayList<SqlCommand> ignoredCommands = new ArrayList<>();
private ArrayList<SqlCommand> abnormalCommands = new ArrayList<>();

private Map<Integer, TopicAndRecords> produceRecords = new HashMap<Integer,TopicAndRecords>();

private String fileName;

@@ -134,4 +141,16 @@ public boolean isSkiped() {
public void setSkiped(boolean skiped) {
this.skiped = skiped;
}

public boolean isKafkaProduceCmd(int pos){
return produceRecords.containsKey(pos);
}

public void addKafkaProduceRecord(int pos, TopicAndRecords tar){
produceRecords.put(pos,tar);
}

public TopicAndRecords getTopicAndRecord(int pos){
return produceRecords.get(pos);
}
}
3 changes: 3 additions & 0 deletions src/main/java/io/mo/constant/COMMON.java
@@ -43,6 +43,9 @@ public class COMMON {
//if result is type of error, and not unique, can use this flag to regular match
public static String REGULAR_MATCH_FLAG = "-- @pattern";

public static String KAFKA_PRODUCE_START_FLAG = "-- @kafka:produce:";
public static String KAFKA_PRODUCE_END_FLAG = "-- @kafka:produce";

public static String IGNORE_COLUMN_FLAG = "-- @ignore:";

public static String LOG_DIR = "log";
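The `KAFKA_PRODUCE_START_FLAG` / `KAFKA_PRODUCE_END_FLAG` constants delimit a JSON array of messages inside a case script (as in `cases/template.sql`). A hypothetical, dependency-free sketch of how such a block could be extracted is below; the real parser is not part of this diff (it uses fastjson for the JSON array), and all names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: scan script lines for a block delimited by
// "-- @kafka:produce:{topic}" ... "-- @kafka:produce" and return the topic
// followed by each record. The naive quote-splitting stands in for real
// JSON parsing and does not handle escaped quotes.
class KafkaProduceParserSketch {
    static final String START_FLAG = "-- @kafka:produce:";
    static final String END_FLAG = "-- @kafka:produce";

    public static List<String> parseBlock(List<String> lines) {
        List<String> result = new ArrayList<>();
        boolean inBlock = false;
        StringBuilder body = new StringBuilder();
        for (String line : lines) {
            String trimmed = line.trim();
            if (!inBlock && trimmed.startsWith(START_FLAG)) {
                result.add(trimmed.substring(START_FLAG.length())); // the topic
                inBlock = true;
            } else if (inBlock && trimmed.equals(END_FLAG)) {
                // items of the JSON array sit at the odd indices after
                // splitting on double quotes
                String[] parts = body.toString().split("\"");
                for (int i = 1; i < parts.length; i += 2) {
                    result.add(parts[i]);
                }
                return result;
            } else if (inBlock) {
                body.append(trimmed);
            }
        }
        return null; // no complete block found
    }
}
```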
23 changes: 22 additions & 1 deletion src/main/java/io/mo/db/Executor.java
@@ -6,6 +6,9 @@
import io.mo.constant.RESULT;
import io.mo.result.RSSet;
import io.mo.result.StmtResult;
import io.mo.stream.KafkaManager;
import io.mo.stream.Producer;
import io.mo.stream.TopicAndRecords;
import io.mo.util.MoConfUtil;
import io.mo.util.ResultParser;
import org.apache.log4j.Logger;
@@ -60,7 +63,18 @@ public static void run(TestScript script){
ArrayList<SqlCommand> commands = script.getCommands();
long start = System.currentTimeMillis();

for (SqlCommand command : commands) {
for(int i = 0; i < commands.size(); i++){
SqlCommand command = commands.get(i);

//if this position is associated with a kafka produce block
if(script.isKafkaProduceCmd(i)){
Producer producer = KafkaManager.getProducer();
TopicAndRecords tar = script.getTopicAndRecord(i);
if(producer.send(tar)){
LOG.info(String.format("Succeeded to send the following messages to the kafka topic [%s]:\n%s",tar.getTopic(),tar.getRecordsStr()));
} else {
LOG.error(String.format("Failed to send messages to the kafka topic [%s]",tar.getTopic()));
}
}

//if need to sleep
if(command.getSleeptime() > 0){
@@ -326,6 +340,13 @@ public static boolean genRS(TestScript script){
try{
command = commands.get(j);

if(script.isKafkaProduceCmd(j)){
Producer producer = KafkaManager.getProducer();
TopicAndRecords tar = script.getTopicAndRecord(j);
if(producer.send(tar)){
LOG.info(String.format("Succeeded to send the following messages to the kafka topic [%s]:\n%s",tar.getTopic(),tar.getRecordsStr()));
} else {
LOG.error(String.format("Failed to send messages to the kafka topic [%s]",tar.getTopic()));
}
}

if(command.getSleeptime() > 0){
LOG.info(String.format("The tester will sleep for %s s, please wait....", command.getSleeptime()));
command.sleep();
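Both `run` and `genRS` walk the commands by index so that a position-keyed map of kafka produce blocks can be consulted before each command executes. That dispatch pattern can be sketched standalone (no Kafka client involved); the names below are illustrative, and the `trace.add` calls stand in for the real `producer.send` and SQL execution.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the index-based dispatch used in Executor.run/genRS:
// before executing command i, check whether a produce block is registered
// at position i and fire it first.
class DispatchSketch {
    public static List<String> run(List<String> commands,
                                   Map<Integer, String> produceAt) {
        List<String> trace = new ArrayList<>();
        for (int i = 0; i < commands.size(); i++) {
            if (produceAt.containsKey(i)) {
                // in the real Executor: KafkaManager.getProducer().send(tar)
                trace.add("produce:" + produceAt.get(i));
            }
            // in the real Executor: execute the SQL command
            trace.add("exec:" + commands.get(i));
        }
        return trace;
    }
}
```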
11 changes: 11 additions & 0 deletions src/main/java/io/mo/stream/Consumer.java
@@ -0,0 +1,11 @@
package io.mo.stream;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
KafkaConsumer<String,String> consumer;

public Consumer(KafkaConsumer<String,String> consumer){
this.consumer = consumer;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/mo/stream/KafkaManager.java
@@ -0,0 +1,23 @@
package io.mo.stream;

import io.mo.util.KafkaConfiUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaManager {

public static Producer getProducer(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfiUtil.getServerAddr());
properties.put(ProducerConfig.CLIENT_ID_CONFIG,KafkaConfiUtil.getClientId());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);
return new Producer(kafkaProducer);
}
}
60 changes: 60 additions & 0 deletions src/main/java/io/mo/stream/Producer.java
@@ -0,0 +1,60 @@
package io.mo.stream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.concurrent.ExecutionException;

public class Producer {
private KafkaProducer<String,String> producer;

public Producer(KafkaProducer<String,String> producer){
this.producer = producer;
}

public boolean send(String topic, String message){

ProducerRecord<String,String> record = new ProducerRecord<>(topic, message);
try {
// block until the broker acknowledges the record
producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return false;
}
// the producer is closed after each send, so callers must obtain a
// fresh Producer from KafkaManager for every block of records
producer.close();

return true;
}

public boolean send(TopicAndRecords tar){
for(int i = 0; i < tar.size(); i++){
try {
ProducerRecord<String,String> record = new ProducerRecord<>(tar.getTopic(), tar.getRecord(i));
// block until each record is acknowledged before sending the next
producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return false;
}
}

// closed after the whole batch; obtain a new Producer for the next block
producer.close();
return true;
}

public void close(){
this.producer.close();
}


public static void main(String[] args){
Producer producer = KafkaManager.getProducer();
producer.send("mo-stream-test","First message to topic mo-stream-test");

}
}
57 changes: 57 additions & 0 deletions src/main/java/io/mo/stream/TopicAndRecords.java
@@ -0,0 +1,57 @@
package io.mo.stream;

import java.util.ArrayList;
import java.util.List;

public class TopicAndRecords {
private String topic;
private List<String> records = new ArrayList<>();

public TopicAndRecords(){

}

public TopicAndRecords(String topic){
this.topic = topic;
}

public void addRecord(String record){
records.add(record);
}

public int size(){
return records.size();
}

public String getRecord(int i){
return records.get(i);
}

public String getTopic(){
return topic;
}

public void setTopic(String topic){
this.topic = topic;
}

public String[] getRecords(){
if(!records.isEmpty()){
return records.toArray(new String[0]);
}

return null;
}

public String getRecordsStr(){
StringBuilder buffer = new StringBuilder();
for(String record : records){
buffer.append(record);
buffer.append("\n");
}

if(buffer.length() > 0)
return buffer.toString();
return null;
}
}