Skip to content

Commit

Permalink
Allow sending commands after sending an unsubscribe (#1036)
Browse files Browse the repository at this point in the history
* Add test of async commands after unsubscribe

Verify that commands are handled after unsubscribing from a channel.
A command is sent before the `unsubscribe` response is received,
which currently triggers an assert in async.c:567:

`redisProcessCallbacks: Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.`

* Handle async commands after an unsubscribe

When unsubscribing from the last channel we move from the `subscribe`
state to a normal state. These states uses different holders for the
command callback information.
By moving the callback info during the state change the callback order
can be maintained.
  • Loading branch information
bjosv authored Jan 18, 2022
1 parent ff860e5 commit f2ce598
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
9 changes: 8 additions & 1 deletion async.c
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,15 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* Unset subscribed flag only when no pipelined pending subscribe. */
if (reply->element[2]->integer == 0
&& dictSize(ac->sub.channels) == 0
&& dictSize(ac->sub.patterns) == 0)
&& dictSize(ac->sub.patterns) == 0) {
c->flags &= ~REDIS_SUBSCRIBED;

/* Move ongoing regular command callbacks. */
redisCallback cb;
while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
__redisPushCallback(&ac->replies,&cb);
}
}
}
}
sdsfree(sname);
Expand Down
41 changes: 24 additions & 17 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1454,8 +1454,15 @@ typedef struct TestState {
redisOptions *options;
int checkpoint;
int resp3;
int disconnect;
} TestState;

/* Helper to disconnect and stop event loop */
void async_disconnect(redisAsyncContext *ac) {
redisAsyncDisconnect(ac);
event_base_loopbreak(base);
}

/* Testcase timeout, will trigger a failure */
void timeout_cb(int fd, short event, void *arg) {
(void) fd; (void) event; (void) arg;
Expand All @@ -1480,9 +1487,18 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {
disconnect(c, 0);
}

/* Expect a reply of type INTEGER */
void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
state->checkpoint++;
if (state->disconnect) async_disconnect(ac);
}

/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
* - a published message triggers an unsubscribe
* - an unsubscribe response triggers a disconnect */
* - a command is sent before the unsubscribe response is received. */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
Expand All @@ -1505,13 +1521,13 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisAsyncCommand(ac,unexpected_cb,
(void*)"unsubscribe should call subscribe_cb()",
"unsubscribe");
/* Send a regular command after unsubscribing, then disconnect */
state->disconnect = 1;
redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");

} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
reply->element[2]->str == NULL);

/* Disconnect after unsubscribe */
redisAsyncDisconnect(ac);
event_base_loopbreak(base);
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
Expand All @@ -1520,11 +1536,11 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {

/* Expect a reply of type ARRAY */
void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
state->checkpoint++;
if (state->disconnect) async_disconnect(ac);
}

/* Expect a NULL reply */
Expand Down Expand Up @@ -1565,7 +1581,7 @@ static void test_pubsub_handling(struct config config) {
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 2);
assert(state.checkpoint == 3);
}

/* Unexpected push message, will trigger a failure */
Expand All @@ -1575,15 +1591,6 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
exit(1);
}

/* Expect a reply of type INTEGER */
void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
state->checkpoint++;
}

static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
Expand Down Expand Up @@ -1624,7 +1631,7 @@ static void test_pubsub_handling_resp3(struct config config) {
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 5);
assert(state.checkpoint == 6);
}

/* Subscribe callback for test_command_timeout_during_pubsub:
Expand Down

0 comments on commit f2ce598

Please sign in to comment.