Skip to content

Commit

Permalink
Update Keyspace/Table name in prepared Query statement
Browse files Browse the repository at this point in the history
Previously TokenAwarePolicy always used Keyspace explicitly set in
cluster.Keyspace regardless of the keyspace in the Query. Now after
preparing statement Keyspace and Table names are transferred to the
Query and it can make use of that.

Fixes: #1621
  • Loading branch information
sylwiaszunejko committed Jul 14, 2023
1 parent 642e867 commit 85d3da3
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cass1batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestShouldPrepareFunction(t *testing.T) {
}

for _, test := range shouldPrepareTests {
q := &Query{stmt: test.Stmt}
q := &Query{stmt: test.Stmt, routingInfo: &queryRoutingInfo{}}
if got := q.shouldPrepare(); got != test.Result {
t.Fatalf("%q: got %v, expected %v\n", test.Stmt, got, test.Result)
}
Expand Down
6 changes: 6 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,12 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
params: params,
customPayload: qry.customPayload,
}

// Set "keyspace" and "table" property in the query if it is present in preparedMetadata
qry.routingInfo.mu.Lock()
qry.routingInfo.keyspace = info.request.keyspace
qry.routingInfo.table = info.request.table
qry.routingInfo.mu.Unlock()
} else {
frame = &writeQueryFrame{
statement: qry.stmt,
Expand Down
13 changes: 8 additions & 5 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,10 @@ type preparedMetadata struct {

// proto v4+
pkeyColumns []int

keyspace string

table string
}

func (r preparedMetadata) String() string {
Expand Down Expand Up @@ -952,26 +956,25 @@ func (f *framer) parsePreparedMetadata() preparedMetadata {
return meta
}

var keyspace, table string
globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec
if globalSpec {
keyspace = f.readString()
table = f.readString()
meta.keyspace = f.readString()
meta.table = f.readString()
}

var cols []ColumnInfo
if meta.colCount < 1000 {
// preallocate columninfo to avoid excess copying
cols = make([]ColumnInfo, meta.colCount)
for i := 0; i < meta.colCount; i++ {
f.readCol(&cols[i], &meta.resultMetadata, globalSpec, keyspace, table)
f.readCol(&cols[i], &meta.resultMetadata, globalSpec, meta.keyspace, meta.table)
}
} else {
// use append, huge number of columns usually indicates a corrupt frame or
// just a huge row.
for i := 0; i < meta.colCount; i++ {
var col ColumnInfo
f.readCol(&col, &meta.resultMetadata, globalSpec, keyspace, table)
f.readCol(&col, &meta.resultMetadata, globalSpec, meta.keyspace, meta.table)
cols = append(cols, col)
}
}
Expand Down
33 changes: 19 additions & 14 deletions integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ function run_tests() {
local keypath="$(pwd)/testdata/pki"

local conf=(
"client_encryption_options.enabled: true"
"client_encryption_options.keystore: $keypath/.keystore"
"client_encryption_options.keystore_password: cassandra"
"client_encryption_options.require_client_auth: true"
"client_encryption_options.truststore: $keypath/.truststore"
"client_encryption_options.truststore_password: cassandra"
"concurrent_reads: 2"
"concurrent_writes: 2"
"rpc_server_type: sync"
"rpc_min_threads: 2"
"rpc_max_threads: 2"
"write_request_timeout_in_ms: 5000"
"read_request_timeout_in_ms: 5000"
)
"client_encryption_options.enabled: true"
"client_encryption_options.keystore: $keypath/.keystore"
"client_encryption_options.keystore_password: cassandra"
"client_encryption_options.require_client_auth: true"
"client_encryption_options.truststore: $keypath/.truststore"
"client_encryption_options.truststore_password: cassandra"
"concurrent_reads: 2"
"concurrent_writes: 2"
"write_request_timeout_in_ms: 5000"
"read_request_timeout_in_ms: 5000"
)

conf+=(
"enable_user_defined_functions: true"
"enable_materialized_views: true"
)

ccm remove test || true

Expand Down Expand Up @@ -58,6 +60,9 @@ function run_tests() {
export JVM_EXTRA_OPTS=" -Dcassandra.test.fail_writes_ks=test -Dcassandra.custom_query_handler_class=org.apache.cassandra.cql3.CustomPayloadMirroringQueryHandler"
fi

proto=4
export JVM_EXTRA_OPTS=" -Dcassandra.test.fail_writes_ks=test -Dcassandra.custom_query_handler_class=org.apache.cassandra.cql3.CustomPayloadMirroringQueryHandler"

sleep 1s

ccm list
Expand Down
62 changes: 62 additions & 0 deletions keyspace_table_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package gocql

import (
"context"
"fmt"
"log"
"testing"
)

// Keyspace_table checks if Query.Keyspace() is updated based on prepared statement
func TestKeyspaceTable(t *testing.T) {
cluster := createCluster()

fallback := RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)

session, err := cluster.CreateSession()
if err != nil {
t.Fatal("createSession:", err)
}

cluster.Keyspace = "wrong_keyspace"

keyspace := "test"
table := "table1"

createKeyspace(t, cluster, keyspace)

err = createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, keyspace, table))

if err != nil {
panic(fmt.Sprintf("unable to create table: %v", err))
}

if err := session.control.awaitSchemaAgreement(); err != nil {
t.Fatal(err)
}

ctx := context.Background()

// insert a row
if err := session.Query(`INSERT INTO test.table1(pk, ck, v) VALUES (?, ?, ?)`,
1, 2, 3).WithContext(ctx).Exec(); err != nil {
log.Fatal(err)
}

var pk int

/* Search for a specific set of records whose 'pk' column matches
* the value of inserted row. */
qry := session.Query(`SELECT pk FROM test.table1 WHERE pk = ? LIMIT 1`,
1).WithContext(ctx).Consistency(One)
if err := qry.Scan(&pk); err != nil {
log.Fatal(err)
}

// cluster.Keyspace was set to "wrong_keyspace", but during prepering statement
// Keyspace in Query should be changed to "test" and Table should be changed to table1
assertEqual(t, "qry.Keyspace()", "test", qry.Keyspace())
assertEqual(t, "qry.Table()", "table1", qry.Table())
}
14 changes: 7 additions & 7 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) {
return nil, errors.New("not initalized")
}

query := &Query{}
query := &Query{routingInfo: &queryRoutingInfo{}}
query.getKeyspace = func() string { return keyspace }

iter := policy.Pick(nil)
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
}
policy.SetPartitioner("OrderedPartitioner")

query := &Query{}
query := &Query{routingInfo: &queryRoutingInfo{}}
query.getKeyspace = func() string { return "myKeyspace" }
query.RoutingKey([]byte("20"))

Expand Down Expand Up @@ -259,7 +259,7 @@ func TestCOWList_Add(t *testing.T) {

// TestSimpleRetryPolicy makes sure that we only allow 1 + numRetries attempts
func TestSimpleRetryPolicy(t *testing.T) {
q := &Query{}
q := &Query{routingInfo: &queryRoutingInfo{}}

// this should allow a total of 3 tries.
rt := &SimpleRetryPolicy{NumRetries: 2}
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestExponentialBackoffPolicy(t *testing.T) {

func TestDowngradingConsistencyRetryPolicy(t *testing.T) {

q := &Query{cons: LocalQuorum}
q := &Query{cons: LocalQuorum, routingInfo: &queryRoutingInfo{}}

rewt0 := &RequestErrWriteTimeout{
Received: 0,
Expand Down Expand Up @@ -478,7 +478,7 @@ func TestHostPolicy_TokenAware(t *testing.T) {
return nil, errors.New("not initialized")
}

query := &Query{}
query := &Query{routingInfo: &queryRoutingInfo{}}
query.getKeyspace = func() string { return keyspace }

iter := policy.Pick(nil)
Expand Down Expand Up @@ -580,7 +580,7 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
return nil, errors.New("not initialized")
}

query := &Query{}
query := &Query{routingInfo: &queryRoutingInfo{}}
query.getKeyspace = func() string { return keyspace }

iter := policy.Pick(nil)
Expand Down Expand Up @@ -707,7 +707,7 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) {
policyWithFallbackInternal.getKeyspaceName = policyInternal.getKeyspaceName
policyWithFallbackInternal.getKeyspaceMetadata = policyInternal.getKeyspaceMetadata

query := &Query{}
query := &Query{routingInfo: &queryRoutingInfo{}}
query.getKeyspace = func() string { return keyspace }

iter := policy.Pick(nil)
Expand Down
1 change: 1 addition & 0 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type ExecutableQuery interface {
speculativeExecutionPolicy() SpeculativeExecutionPolicy
GetRoutingKey() ([]byte, error)
Keyspace() string
Table() string
IsIdempotent() bool

withContext(context.Context) ExecutableQuery
Expand Down
Loading

0 comments on commit 85d3da3

Please sign in to comment.