forked from taiyoh/graphqlws-subscription-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscribeservice.go
95 lines (82 loc) · 2.61 KB
/
subscribeservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package gss
import (
"context"
"net/http"
"github.com/functionalfoundry/graphqlws"
"github.com/graphql-go/graphql"
)
type GraphQLContextKey string
type CanSendToUserFunc func(conn *graphqlws.Connection, reqData *RequestData) bool
type SubscribeService struct {
graphqlws.SubscriptionManager
Schema *graphql.Schema
Pool graphqlws.SubscriptionManager
Filter SubscribeFilter
canSendToUser CanSendToUserFunc
}
func NewSubscribeService(schema *graphql.Schema, c CanSendToUserFunc) *SubscribeService {
return &SubscribeService{
Schema: schema,
Pool: graphqlws.NewSubscriptionManager(schema),
Filter: NewSubscribeFilter(),
canSendToUser: c,
}
}
func (s *SubscribeService) AddSubscription(conn graphqlws.Connection, sub *graphqlws.Subscription) []error {
errs := s.Pool.AddSubscription(conn, sub)
if errs != nil {
return errs
}
s.Filter.RegisterConnectionIDFromDocument(conn.ID(), sub.ID, sub.Document, sub.Variables)
return nil
}
func (s *SubscribeService) RemoveSubscription(conn graphqlws.Connection, sub *graphqlws.Subscription) {
s.Pool.RemoveSubscription(conn, sub)
s.Filter.RemoveSubscriptionIDFromConnectionID(conn.ID(), sub.ID)
}
func (s *SubscribeService) RemoveSubscriptions(conn graphqlws.Connection) {
s.Pool.RemoveSubscriptions(conn)
s.Filter.RemoveConnectionIDFromChannels(conn.ID())
}
func (s *SubscribeService) Subscriptions() graphqlws.Subscriptions {
return s.Pool.Subscriptions()
}
func (s *SubscribeService) Publish(reqData *RequestData) {
ctx := context.Background()
ctx = context.WithValue(ctx, GraphQLContextKey("payload"), reqData.Payload)
connIDsMap := s.Filter.GetChannelRegisteredConnectionIDs(reqData.Channel)
for conn, subsByID := range s.Pool.Subscriptions() {
if len(reqData.Users) > 0 && !s.canSendToUser(&conn, reqData) {
continue
}
subID, ok := connIDsMap[conn.ID()]
if !ok {
continue
}
for sid, sub := range subsByID {
if sid != subID {
continue
}
res := graphql.Do(graphql.Params{
Schema: *s.Schema, // The GraphQL schema
RequestString: sub.Query,
VariableValues: sub.Variables,
OperationName: sub.OperationName,
Context: ctx,
})
rest := map[string]interface{}{}
for k, v := range res.Data.(map[string]interface{}) {
if k == sub.Fields[0] {
rest[k] = v
}
}
sub.SendData(&graphqlws.DataMessagePayload{Data: rest})
}
}
}
func (s *SubscribeService) NewSubscriptionHandler(authCallback graphqlws.AuthenticateFunc) http.Handler {
return graphqlws.NewHandler(graphqlws.HandlerConfig{
SubscriptionManager: s,
Authenticate: authCallback,
})
}