Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apply clock_error for data store #573

Merged
merged 1 commit into from
Oct 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func GetDefaultConfig() *Config {
}
}

func (config *Config) Now() *time.Time {
return lo.ToPtr(time.Now().Add(config.Server.ClockError))
}

func (config *Config) UserNeighborDigest() string {
var builder strings.Builder
builder.WriteString(fmt.Sprintf("%v-%v", config.Recommend.UserNeighbors.NeighborType, config.Recommend.UserNeighbors.EnableIndex))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/orcaman/concurrent-map v1.0.0
github.com/prometheus/client_golang v1.13.0
github.com/rakyll/statik v0.1.7
github.com/samber/lo v1.27.0
github.com/samber/lo v1.33.0
github.com/schollz/progressbar/v3 v3.9.0
github.com/scylladb/go-set v1.0.2
github.com/sijms/go-ora/v2 v2.4.27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.27.0 h1:GOyDWxsblvqYobqsmUuMddPa2/mMzkKyojlXol4+LaQ=
github.com/samber/lo v1.27.0/go.mod h1:it33p9UtPMS7z72fP4gw/EIfQB2eI8ke7GR2wc6+Rhg=
github.com/samber/lo v1.33.0 h1:2aKucr+rQV6gHpY3bpeZu69uYoQOzVhGT3J22Op6Cjk=
github.com/samber/lo v1.33.0/go.mod h1:HLeWcJRRyLKp3+/XBJvOrerCQn9mhdKMHyd7IRlgeQ8=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/schollz/progressbar/v3 v3.9.0 h1:k9SRNQ8KZyibz1UZOaKxnkUE3iGtmGSDt1YY9KlCYQk=
github.com/schollz/progressbar/v3 v3.9.0/go.mod h1:W5IEwbJecncFGBvuEh4A7HT1nZZ6WNIL2i3qbnI0WKY=
Expand Down
4 changes: 2 additions & 2 deletions master/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ type Feedback struct {
func (m *Master) getTypedFeedbackByUser(request *restful.Request, response *restful.Response) {
feedbackType := request.PathParameter("feedback-type")
userId := request.PathParameter("user-id")
feedback, err := m.DataClient.GetUserFeedback(userId, false, feedbackType)
feedback, err := m.DataClient.GetUserFeedback(userId, m.Config.Now(), feedbackType)
if err != nil {
server.InternalServerError(response, err)
return
Expand Down Expand Up @@ -1143,7 +1143,7 @@ func (m *Master) importExportFeedback(response http.ResponseWriter, request *htt
return
}
// write rows
feedbackChan, errChan := m.DataClient.GetFeedbackStream(batchSize, nil)
feedbackChan, errChan := m.DataClient.GetFeedbackStream(batchSize, nil, m.Config.Now())
for feedback := range feedbackChan {
for _, v := range feedback {
if _, err = response.Write([]byte(fmt.Sprintf("%s,%s,%s,%v\r\n",
Expand Down
8 changes: 4 additions & 4 deletions master/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func TestMaster_ImportFeedback(t *testing.T) {
// check
assert.Equal(t, http.StatusOK, w.Result().StatusCode)
assert.JSONEq(t, marshal(t, server.Success{RowAffected: 3}), w.Body.String())
_, feedback, err := s.DataClient.GetFeedback("", 100, nil)
_, feedback, err := s.DataClient.GetFeedback("", 100, nil, lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Equal(t, []data.Feedback{
{FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "2"}},
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestMaster_ImportFeedback_Default(t *testing.T) {
// check
assert.Equal(t, http.StatusOK, w.Result().StatusCode)
assert.JSONEq(t, marshal(t, server.Success{RowAffected: 3}), w.Body.String())
_, feedback, err := s.DataClient.GetFeedback("", 100, nil)
_, feedback, err := s.DataClient.GetFeedback("", 100, nil, lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Equal(t, []data.Feedback{
{FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "2"}},
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestMaster_Purge(t *testing.T) {
_, items, err := s.DataClient.GetItems("", 100, nil)
assert.NoError(t, err)
assert.Equal(t, 100, len(items))
_, feedbacks, err := s.DataClient.GetFeedback("", 100, nil)
_, feedbacks, err := s.DataClient.GetFeedback("", 100, nil, lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Equal(t, 100, len(feedbacks))

Expand Down Expand Up @@ -776,7 +776,7 @@ func TestMaster_Purge(t *testing.T) {
_, items, err = s.DataClient.GetItems("", 100, nil)
assert.NoError(t, err)
assert.Empty(t, items)
_, feedbacks, err = s.DataClient.GetFeedback("", 100, nil)
_, feedbacks, err = s.DataClient.GetFeedback("", 100, nil, lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Empty(t, feedbacks)
}
4 changes: 2 additions & 2 deletions master/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ func (m *Master) LoadDataFromDatabase(database data.Database, posFeedbackTypes,
// STEP 3: pull positive feedback
var feedbackCount float64
start = time.Now()
feedbackChan, errChan := database.GetFeedbackStream(batchSize, feedbackTimeLimit, posFeedbackTypes...)
feedbackChan, errChan := database.GetFeedbackStream(batchSize, feedbackTimeLimit, m.Config.Now(), posFeedbackTypes...)
for feedback := range feedbackChan {
for _, f := range feedback {
feedbackCount++
Expand Down Expand Up @@ -1546,7 +1546,7 @@ func (m *Master) LoadDataFromDatabase(database data.Database, posFeedbackTypes,

// STEP 4: pull negative feedback
start = time.Now()
feedbackChan, errChan = database.GetFeedbackStream(batchSize, feedbackTimeLimit, readTypes...)
feedbackChan, errChan = database.GetFeedbackStream(batchSize, feedbackTimeLimit, m.Config.Now(), readTypes...)
for feedback := range feedbackChan {
for _, f := range feedback {
feedbackCount++
Expand Down
12 changes: 6 additions & 6 deletions server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func (s *RestServer) requireUserFeedback(ctx *recommendContext) error {
if ctx.userFeedback == nil {
start := time.Now()
var err error
ctx.userFeedback, err = s.DataClient.GetUserFeedback(ctx.userId, false)
ctx.userFeedback, err = s.DataClient.GetUserFeedback(ctx.userId, s.Config.Now())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -822,7 +822,7 @@ func (s *RestServer) RecommendUserBased(ctx *recommendContext) error {
}
for _, user := range similarUsers {
// load historical feedback
feedbacks, err := s.DataClient.GetUserFeedback(user.Id, false, s.Config.Recommend.DataSource.PositiveFeedbackTypes...)
feedbacks, err := s.DataClient.GetUserFeedback(user.Id, s.Config.Now(), s.Config.Recommend.DataSource.PositiveFeedbackTypes...)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1232,7 +1232,7 @@ func (s *RestServer) deleteUser(request *restful.Request, response *restful.Resp
func (s *RestServer) getTypedFeedbackByUser(request *restful.Request, response *restful.Response) {
feedbackType := request.PathParameter("feedback-type")
userId := request.PathParameter("user-id")
feedback, err := s.DataClient.GetUserFeedback(userId, false, feedbackType)
feedback, err := s.DataClient.GetUserFeedback(userId, s.Config.Now(), feedbackType)
if err != nil {
InternalServerError(response, err)
return
Expand All @@ -1243,7 +1243,7 @@ func (s *RestServer) getTypedFeedbackByUser(request *restful.Request, response *
// get feedback by user-id
func (s *RestServer) getFeedbackByUser(request *restful.Request, response *restful.Response) {
userId := request.PathParameter("user-id")
feedback, err := s.DataClient.GetUserFeedback(userId, false)
feedback, err := s.DataClient.GetUserFeedback(userId, s.Config.Now())
if err != nil {
InternalServerError(response, err)
return
Expand Down Expand Up @@ -1627,7 +1627,7 @@ func (s *RestServer) getFeedback(request *restful.Request, response *restful.Res
BadRequest(response, err)
return
}
cursor, feedback, err := s.DataClient.GetFeedback(cursor, n, nil)
cursor, feedback, err := s.DataClient.GetFeedback(cursor, n, nil, s.Config.Now())
if err != nil {
InternalServerError(response, err)
return
Expand All @@ -1644,7 +1644,7 @@ func (s *RestServer) getTypedFeedback(request *restful.Request, response *restfu
BadRequest(response, err)
return
}
cursor, feedback, err := s.DataClient.GetFeedback(cursor, n, nil, feedbackType)
cursor, feedback, err := s.DataClient.GetFeedback(cursor, n, nil, s.Config.Now(), feedbackType)
if err != nil {
InternalServerError(response, err)
return
Expand Down
4 changes: 2 additions & 2 deletions server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func TestServer_Feedback(t *testing.T) {
Status(http.StatusOK).
Body(`{"RowAffected": 1}`).
End()
ret, err := s.DataClient.GetUserFeedback("0", false, "click")
ret, err := s.DataClient.GetUserFeedback("0", s.Config.Now(), "click")
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
assert.Equal(t, "override", ret[0].Comment)
Expand All @@ -741,7 +741,7 @@ func TestServer_Feedback(t *testing.T) {
Status(http.StatusOK).
Body(`{"RowAffected": 1}`).
End()
ret, err = s.DataClient.GetUserFeedback("0", false, "click")
ret, err = s.DataClient.GetUserFeedback("0", s.Config.Now(), "click")
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
assert.Equal(t, "override", ret[0].Comment)
Expand Down
8 changes: 4 additions & 4 deletions storage/data/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,21 @@ type Database interface {
DeleteItem(itemId string) error
GetItem(itemId string) (Item, error)
ModifyItem(itemId string, patch ItemPatch) error
GetItems(cursor string, n int, timeLimit *time.Time) (string, []Item, error)
GetItems(cursor string, n int, beginTime *time.Time) (string, []Item, error)
GetItemFeedback(itemId string, feedbackTypes ...string) ([]Feedback, error)
BatchInsertUsers(users []User) error
DeleteUser(userId string) error
GetUser(userId string) (User, error)
ModifyUser(userId string, patch UserPatch) error
GetUsers(cursor string, n int) (string, []User, error)
GetUserFeedback(userId string, withFuture bool, feedbackTypes ...string) ([]Feedback, error)
GetUserFeedback(userId string, endTime *time.Time, feedbackTypes ...string) ([]Feedback, error)
GetUserItemFeedback(userId, itemId string, feedbackTypes ...string) ([]Feedback, error)
DeleteUserItemFeedback(userId, itemId string, feedbackTypes ...string) (int, error)
BatchInsertFeedback(feedback []Feedback, insertUser, insertItem, overwrite bool) error
GetFeedback(cursor string, n int, timeLimit *time.Time, feedbackTypes ...string) (string, []Feedback, error)
GetFeedback(cursor string, n int, beginTime, endTime *time.Time, feedbackTypes ...string) (string, []Feedback, error)
GetUserStream(batchSize int) (chan []User, chan error)
GetItemStream(batchSize int, timeLimit *time.Time) (chan []Item, chan error)
GetFeedbackStream(batchSize int, timeLimit *time.Time, feedbackTypes ...string) (chan []Feedback, chan error)
GetFeedbackStream(batchSize int, beginTime, endTime *time.Time, feedbackTypes ...string) (chan []Feedback, chan error)
}

// Open a connection to a database.
Expand Down
48 changes: 24 additions & 24 deletions storage/data/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func getItemStream(t *testing.T, db Database, batchSize int) []Item {
return items
}

func getFeedback(t *testing.T, db Database, batchSize int, timeLimit *time.Time, feedbackTypes ...string) []Feedback {
func getFeedback(t *testing.T, db Database, batchSize int, beginTime, endTime *time.Time, feedbackTypes ...string) []Feedback {
feedback := make([]Feedback, 0)
var err error
var data []Feedback
cursor := ""
for {
cursor, data, err = db.GetFeedback(cursor, batchSize, timeLimit, feedbackTypes...)
cursor, data, err = db.GetFeedback(cursor, batchSize, beginTime, endTime, feedbackTypes...)
assert.NoError(t, err)
feedback = append(feedback, data...)
if cursor == "" {
Expand All @@ -118,9 +118,9 @@ func getFeedback(t *testing.T, db Database, batchSize int, timeLimit *time.Time,
}
}

func getFeedbackStream(t *testing.T, db Database, batchSize int, timeLimit *time.Time, feedbackTypes ...string) []Feedback {
func getFeedbackStream(t *testing.T, db Database, batchSize int, beginTime, endTime *time.Time, feedbackTypes ...string) []Feedback {
var feedbacks []Feedback
feedbackChan, errChan := db.GetFeedbackStream(batchSize, timeLimit, feedbackTypes...)
feedbackChan, errChan := db.GetFeedbackStream(batchSize, beginTime, endTime, feedbackTypes...)
for batchFeedback := range feedbackChan {
feedbacks = append(feedbacks, batchFeedback...)
}
Expand Down Expand Up @@ -226,18 +226,18 @@ func testFeedback(t *testing.T, db Database) {
err = db.BatchInsertFeedback(futureFeedback, true, true, true)
assert.NoError(t, err)
// Get feedback
ret := getFeedback(t, db, 3, nil, positiveFeedbackType)
ret := getFeedback(t, db, 3, nil, lo.ToPtr(time.Now()), positiveFeedbackType)
assert.Equal(t, feedback, ret)
ret = getFeedback(t, db, 2, nil)
ret = getFeedback(t, db, 2, nil, lo.ToPtr(time.Now()))
assert.Equal(t, len(feedback)+2, len(ret))
ret = getFeedback(t, db, 2, lo.ToPtr(timestamp.Add(time.Second)))
ret = getFeedback(t, db, 2, lo.ToPtr(timestamp.Add(time.Second)), lo.ToPtr(time.Now()))
assert.Empty(t, ret)
// Get feedback stream
feedbackFromStream := getFeedbackStream(t, db, 3, nil, positiveFeedbackType)
feedbackFromStream := getFeedbackStream(t, db, 3, nil, lo.ToPtr(time.Now()), positiveFeedbackType)
assert.ElementsMatch(t, feedback, feedbackFromStream)
feedbackFromStream = getFeedbackStream(t, db, 3, nil)
feedbackFromStream = getFeedbackStream(t, db, 3, nil, lo.ToPtr(time.Now()))
assert.Equal(t, len(feedback)+2, len(feedbackFromStream))
feedbackFromStream = getFeedbackStream(t, db, 3, lo.ToPtr(timestamp.Add(time.Second)))
feedbackFromStream = getFeedbackStream(t, db, 3, lo.ToPtr(timestamp.Add(time.Second)), lo.ToPtr(time.Now()))
assert.Empty(t, feedbackFromStream)
// Get items
err = db.Optimize()
Expand Down Expand Up @@ -277,13 +277,13 @@ func testFeedback(t *testing.T, db Database) {
assert.NoError(t, err)
assert.Equal(t, Item{ItemId: "0", Labels: []string{"b"}, Timestamp: time.Date(1996, 4, 8, 10, 0, 0, 0, time.UTC)}, item)
// Get typed feedback by user
ret, err = db.GetUserFeedback("2", false, positiveFeedbackType)
ret, err = db.GetUserFeedback("2", lo.ToPtr(time.Now()), positiveFeedbackType)
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
assert.Equal(t, "2", ret[0].UserId)
assert.Equal(t, "4", ret[0].ItemId)
// Get all feedback by user
ret, err = db.GetUserFeedback("2", false)
ret, err = db.GetUserFeedback("2", lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Equal(t, 2, len(ret))
// Get typed feedback by item
Expand All @@ -304,7 +304,7 @@ func testFeedback(t *testing.T, db Database) {
assert.NoError(t, err)
err = db.Optimize()
assert.NoError(t, err)
ret, err = db.GetUserFeedback("0", false, positiveFeedbackType)
ret, err = db.GetUserFeedback("0", lo.ToPtr(time.Now()), positiveFeedbackType)
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
assert.Equal(t, "override", ret[0].Comment)
Expand All @@ -316,7 +316,7 @@ func testFeedback(t *testing.T, db Database) {
assert.NoError(t, err)
err = db.Optimize()
assert.NoError(t, err)
ret, err = db.GetUserFeedback("0", false, positiveFeedbackType)
ret, err = db.GetUserFeedback("0", lo.ToPtr(time.Now()), positiveFeedbackType)
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
assert.Equal(t, "override", ret[0].Comment)
Expand Down Expand Up @@ -487,10 +487,10 @@ func testDeleteUser(t *testing.T, db Database) {
assert.NoError(t, err)
_, err = db.GetUser("a")
assert.NotNil(t, err, "failed to delete user")
ret, err := db.GetUserFeedback("a", false, positiveFeedbackType)
ret, err := db.GetUserFeedback("a", lo.ToPtr(time.Now()), positiveFeedbackType)
assert.NoError(t, err)
assert.Equal(t, 0, len(ret))
_, ret, err = db.GetFeedback("", 100, nil, positiveFeedbackType)
_, ret, err = db.GetFeedback("", 100, nil, lo.ToPtr(time.Now()), positiveFeedbackType)
assert.NoError(t, err)
assert.Empty(t, ret)
}
Expand All @@ -514,7 +514,7 @@ func testDeleteItem(t *testing.T, db Database) {
ret, err := db.GetItemFeedback("b", positiveFeedbackType)
assert.NoError(t, err)
assert.Equal(t, 0, len(ret))
_, ret, err = db.GetFeedback("", 100, nil, positiveFeedbackType)
_, ret, err = db.GetFeedback("", 100, nil, lo.ToPtr(time.Now()), positiveFeedbackType)
assert.NoError(t, err)
assert.Empty(t, ret)
}
Expand Down Expand Up @@ -610,11 +610,11 @@ func testTimeLimit(t *testing.T, db Database) {
}
err = db.BatchInsertFeedback(feedbacks, true, true, true)
assert.NoError(t, err)
_, retFeedback, err := db.GetFeedback("", 100, &timeLimit)
_, retFeedback, err := db.GetFeedback("", 100, &timeLimit, lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Equal(t, []Feedback{feedbacks[4], feedbacks[3], feedbacks[2]}, retFeedback)
typeFilter := "type1"
_, retFeedback, err = db.GetFeedback("", 100, &timeLimit, typeFilter)
_, retFeedback, err = db.GetFeedback("", 100, &timeLimit, lo.ToPtr(time.Now()), typeFilter)
assert.NoError(t, err)
assert.Equal(t, []Feedback{feedbacks[4], feedbacks[3]}, retFeedback)
}
Expand All @@ -633,14 +633,14 @@ func testTimeZone(t *testing.T, db Database) {
}, true, true, true)
assert.NoError(t, err)
// get feedback stream
feedback := getFeedback(t, db, 10, nil)
feedback := getFeedback(t, db, 10, nil, lo.ToPtr(time.Now()))
assert.Equal(t, 3, len(feedback))
// get feedback
_, feedback, err = db.GetFeedback("", 10, nil)
_, feedback, err = db.GetFeedback("", 10, nil, lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Equal(t, 3, len(feedback))
// get user feedback
feedback, err = db.GetUserFeedback("1", false)
feedback, err = db.GetUserFeedback("1", lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Equal(t, 2, len(feedback))
// get item feedback
Expand Down Expand Up @@ -723,7 +723,7 @@ func testPurge(t *testing.T, db Database) {
_, items, err := db.GetItems("", 100, nil)
assert.NoError(t, err)
assert.Equal(t, 100, len(items))
_, feedbacks, err := db.GetFeedback("", 100, nil)
_, feedbacks, err := db.GetFeedback("", 100, nil, lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Equal(t, 100, len(feedbacks))
// purge data
Expand All @@ -735,7 +735,7 @@ func testPurge(t *testing.T, db Database) {
_, items, err = db.GetItems("", 100, nil)
assert.NoError(t, err)
assert.Empty(t, items)
_, feedbacks, err = db.GetFeedback("", 100, nil)
_, feedbacks, err = db.GetFeedback("", 100, nil, lo.ToPtr(time.Now()))
assert.NoError(t, err)
assert.Empty(t, feedbacks)
// purge empty database
Expand Down
Loading