From 830ab904d65e8ef4f2900a27efc38aaf75cd5f87 Mon Sep 17 00:00:00 2001 From: Trevor Berrange Sanchez Date: Tue, 4 Jun 2024 16:59:52 +0200 Subject: [PATCH] Fix invalid free This involves passing the "message allocator" as a more general "table allocator" at the initialisation site of gossip service, to allow the gossip table to properly free the data with the right allocator. --- src/cmd/cmd.zig | 12 ++++---- src/gossip/fuzz.zig | 2 +- src/gossip/service.zig | 66 ++++++++++++++++++++++-------------------- 3 files changed, 43 insertions(+), 37 deletions(-) diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index 169f84d6b..500cb2224 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -47,8 +47,14 @@ const config = @import("config.zig"); const ACCOUNT_INDEX_BINS = @import("../accountsdb/db.zig").ACCOUNT_INDEX_BINS; const socket_tag = @import("../gossip/data.zig").socket_tag; +// TODO: use better allocator, unless GPA becomes more performant. + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; const gpa_allocator = gpa.allocator(); + +var gossip_value_gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; +const gossip_value_gpa_allocator = gossip_value_gpa.allocator(); + const base58Encoder = base58.Encoder.init(.{}); const gossip_host = struct { @@ -532,6 +538,7 @@ fn initGossip( return try GossipService.init( gpa_allocator, + gossip_value_gpa_allocator, contact_info, my_keypair, entrypoints, @@ -572,13 +579,8 @@ fn initRepair( } fn runGossipWithConfigValues(gossip_service: *GossipService) !void { - // TODO: use better allocator, unless GPA becomes more performant. - var gp_message_allocator: std.heap.GeneralPurposeAllocator(.{}) = .{}; - defer _ = gp_message_allocator.deinit(); - const gossip_config = config.current.gossip; return gossip_service.run(.{ - .message_allocator = gp_message_allocator.allocator(), .spy_node = gossip_config.spy_node, .dump = gossip_config.dump, }); diff --git a/src/gossip/fuzz.zig b/src/gossip/fuzz.zig index 7f5581514..c538d625c 100644 --- a/src/gossip/fuzz.zig +++ b/src/gossip/fuzz.zig @@ -300,6 +300,7 @@ pub fn run() !void { var fuzz_exit = AtomicBool.init(false); var gossip_service_fuzzer = try GossipService.init( + allocator, allocator, fuzz_contact_info, fuzz_keypair, @@ -310,7 +311,6 @@ pub fn run() !void { const fuzz_handle = try std.Thread.spawn(.{}, GossipService.run, .{ &gossip_service_fuzzer, .{ - .message_allocator = allocator, .spy_node = true, .dump = false, }, diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 6c7159bc3..6b8186f41 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -94,6 +94,7 @@ pub const GOSSIP_VERIFY_PACKET_PARALLEL_TASKS = 4; pub const GossipService = struct { allocator: std.mem.Allocator, + gossip_value_allocator: std.mem.Allocator, // note: this contact info should not change gossip_socket: UdpSocket, @@ -132,7 +133,11 @@ pub const GossipService = struct { const Entrypoint = struct { addr: SocketAddr, info: ?ContactInfo = null }; pub fn init( + /// Must be thread-safe. allocator: std.mem.Allocator, + /// Can be supplied as a different allocator in order to reduce contention. + /// Must be thread safe. + gossip_value_allocator: std.mem.Allocator, my_contact_info: ContactInfo, my_keypair: KeyPair, entrypoints: ?[]const SocketAddr, @@ -156,7 +161,7 @@ pub const GossipService = struct { }); logger.debugf("using n_threads in gossip: {}", .{n_threads}); - var gossip_table = try GossipTable.init(allocator, thread_pool); + var gossip_table = try GossipTable.init(gossip_value_allocator, thread_pool); errdefer gossip_table.deinit(); const gossip_table_rw = RwMux(GossipTable).init(gossip_table); @@ -191,6 +196,9 @@ pub const GossipService = struct { ); return .{ + .allocator = allocator, + .gossip_value_allocator = gossip_value_allocator, + .my_contact_info = my_contact_info, .my_keypair = my_keypair, .my_pubkey = my_pubkey, @@ -201,7 +209,6 @@ pub const GossipService = struct { .packet_outgoing_channel = packet_outgoing_channel, .verified_incoming_channel = verified_incoming_channel, .gossip_table_rw = gossip_table_rw, - .allocator = allocator, .push_msg_queue_mux = Mux(ArrayList(SignedGossipData)).init(push_msg_q), .active_set_rw = RwMux(ActiveSet).init(active_set), .failed_pull_hashes_mux = Mux(HashTimeQueue).init(failed_pull_hashes), @@ -287,12 +294,6 @@ pub const GossipService = struct { }; pub const RunThreadsParams = struct { - /// Allocator used to allocate message metadata. - /// Helpful to use a dedicated allocator to reduce contention - /// during message allocation & deallocation. - /// Should be thread safe, and remain valid until calling `joinAll` on the result. - message_allocator: std.mem.Allocator, - spy_node: bool, dump: bool, }; @@ -309,7 +310,6 @@ pub const GossipService = struct { self: *Self, params: RunThreadsParams, ) std.Thread.SpawnError!RunHandles { - const message_allocator = params.message_allocator; const spy_node = params.spy_node; const dump = params.dump; @@ -334,10 +334,10 @@ pub const GossipService = struct { }); errdefer exitAndJoin(self.exit, receiver_thread); - const packet_verifier_thread = try Thread.spawn(.{}, verifyPackets, .{ self, message_allocator }); + const packet_verifier_thread = try Thread.spawn(.{}, verifyPackets, .{self}); errdefer exitAndJoin(self.exit, packet_verifier_thread); - const message_processor_thread = try Thread.spawn(.{}, processMessages, .{ self, message_allocator }); + const message_processor_thread = try Thread.spawn(.{}, processMessages, .{ self, self.gossip_value_allocator }); errdefer exitAndJoin(self.exit, message_processor_thread); const maybe_message_builder_thread: ?std.Thread = if (!spy_node) try Thread.spawn(.{}, buildMessages, .{self}) else null; @@ -382,7 +382,7 @@ pub const GossipService = struct { const VerifyMessageTask = ThreadPoolTask(VerifyMessageEntry); const VerifyMessageEntry = struct { - allocator: std.mem.Allocator, + gossip_value_allocator: std.mem.Allocator, packet_batch: ArrayList(Packet), verified_incoming_channel: *Channel(GossipMessageWithEndpoint), logger: Logger, @@ -392,7 +392,7 @@ pub const GossipService = struct { for (@as([]const Packet, self.packet_batch.items)) |*packet| { var message = bincode.readFromSlice( - self.allocator, + self.gossip_value_allocator, GossipMessage, packet.data[0..packet.size], bincode.Params.standard, @@ -403,7 +403,7 @@ pub const GossipService = struct { message.sanitize() catch { self.logger.errf("gossip: packet_verify: failed to sanitize", .{}); - bincode.free(self.allocator, message); + bincode.free(self.gossip_value_allocator, message); continue; }; @@ -412,7 +412,7 @@ pub const GossipService = struct { "gossip: packet_verify: failed to verify signature: {} from {}", .{ e, packet.addr }, ); - bincode.free(self.allocator, message); + bincode.free(self.gossip_value_allocator, message); continue; }; @@ -428,19 +428,14 @@ pub const GossipService = struct { /// main logic for deserializing Packets into GossipMessage messages /// and verifing they have valid values, and have valid signatures. /// Verified GossipMessagemessages are then sent to the verified_channel. - fn verifyPackets( - self: *Self, - /// Must be thread-safe. Can be a specific allocator which will - /// only be contended for by the tasks spawned by in function. - task_allocator: std.mem.Allocator, - ) !void { + fn verifyPackets(self: *Self) !void { const tasks = try VerifyMessageTask.init(self.allocator, GOSSIP_VERIFY_PACKET_PARALLEL_TASKS); defer self.allocator.free(tasks); // pre-allocate all the tasks for (tasks) |*task| { task.entry = .{ - .allocator = task_allocator, + .gossip_value_allocator = self.gossip_value_allocator, .verified_incoming_channel = self.verified_incoming_channel, .packet_batch = undefined, .logger = self.logger, @@ -512,7 +507,7 @@ pub const GossipService = struct { }; /// main logic for recieving and processing gossip messages. - pub fn processMessages(self: *Self, message_allocator: std.mem.Allocator) !void { + pub fn processMessages(self: *Self, gossip_value_allocator: std.mem.Allocator) !void { var timer = std.time.Timer.start() catch unreachable; var last_table_trim_ts: u64 = 0; var msg_count: usize = 0; @@ -581,7 +576,7 @@ pub const GossipService = struct { // would be safer. For more info, see: // - GossipTable.remove // - https://github.com/Syndica/sig/pull/69 - msg.message.shallowFree(message_allocator); + msg.message.shallowFree(gossip_value_allocator); } self.verified_incoming_channel.allocator.free(messages); } @@ -2180,6 +2175,7 @@ test "gossip.service: build messages startup and shutdown" { logger.spawn(); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, my_keypair, @@ -2233,6 +2229,7 @@ test "gossip.service: tests handling prune messages" { logger.spawn(); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, my_keypair, @@ -2307,6 +2304,7 @@ test "gossip.service: tests handling pull responses" { logger.spawn(); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, my_keypair, @@ -2366,6 +2364,7 @@ test "gossip.service: tests handle pull request" { logger.spawn(); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, my_keypair, @@ -2457,6 +2456,7 @@ test "gossip.service: test build prune messages and handle push messages" { logger.spawn(); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, my_keypair, @@ -2544,6 +2544,7 @@ test "gossip.service: test build pull requests" { logger.spawn(); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, my_keypair, @@ -2586,6 +2587,7 @@ test "gossip.service: test build push messages" { logger.spawn(); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, my_keypair, @@ -2657,6 +2659,7 @@ test "gossip.gossip_service: test packet verification" { // noop for this case because this tests error failed verification const logger: Logger = .noop; var gossip_service = try GossipService.init( + allocator, allocator, contact_info, keypair, @@ -2669,7 +2672,7 @@ test "gossip.gossip_service: test packet verification" { var packet_channel = gossip_service.packet_incoming_channel; var verified_channel = gossip_service.verified_incoming_channel; - const packet_verifier_handle = try Thread.spawn(.{}, GossipService.verifyPackets, .{ &gossip_service, gossip_service.allocator }); + const packet_verifier_handle = try Thread.spawn(.{}, GossipService.verifyPackets, .{&gossip_service}); var rng = std.rand.DefaultPrng.init(getWallclockMs()); var data = gossip.GossipData.randomFromIndex(rng.random(), 0); @@ -2786,7 +2789,7 @@ test "gossip.gossip_service: test packet verification" { test "gossip.gossip_service: process contact info push packet" { const allocator = std.testing.allocator; - const message_allocator = allocator; + const gossip_value_allocator = allocator; var exit = AtomicBool.init(false); var my_keypair = try KeyPair.create([_]u8{1} ** 32); const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key); @@ -2797,6 +2800,7 @@ test "gossip.gossip_service: process contact info push packet" { logger.spawn(); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, my_keypair, @@ -2815,7 +2819,7 @@ test "gossip.gossip_service: process contact info push packet" { var packet_handle = try Thread.spawn( .{}, GossipService.processMessages, - .{ &gossip_service, message_allocator }, + .{ &gossip_service, gossip_value_allocator }, ); // send a push message @@ -2827,7 +2831,7 @@ test "gossip.gossip_service: process contact info push packet" { .LegacyContactInfo = legacy_contact_info, }; const gossip_value = try gossip.SignedGossipData.initSigned(gossip_data, &kp); - const heap_values = try message_allocator.dupe(gossip.SignedGossipData, &.{gossip_value}); + const heap_values = try gossip_value_allocator.dupe(gossip.SignedGossipData, &.{gossip_value}); const msg = GossipMessage{ .PushMessage = .{ id, heap_values }, }; @@ -2888,6 +2892,7 @@ test "gossip.service: init, exit, and deinit" { logger.spawn(); var gossip_service = try GossipService.init( + std.testing.allocator, std.testing.allocator, contact_info, my_keypair, @@ -2898,7 +2903,6 @@ test "gossip.service: init, exit, and deinit" { const handle = try std.Thread.spawn(.{}, GossipService.run, .{ &gossip_service, .{ - .message_allocator = std.testing.allocator, .spy_node = true, .dump = false, }, @@ -2973,6 +2977,7 @@ pub const BenchmarkGossipServiceGeneral = struct { // process incoming packets/messsages var exit = AtomicBool.init(false); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, keypair, @@ -2987,7 +2992,6 @@ pub const BenchmarkGossipServiceGeneral = struct { const packet_handle = try Thread.spawn(.{}, GossipService.run, .{ &gossip_service, .{ - .message_allocator = allocator, .spy_node = true, // dont build any outgoing messages .dump = false, }, @@ -3104,6 +3108,7 @@ pub const BenchmarkGossipServicePullRequests = struct { var exit = AtomicBool.init(false); var gossip_service = try GossipService.init( + allocator, allocator, contact_info, keypair, @@ -3154,7 +3159,6 @@ pub const BenchmarkGossipServicePullRequests = struct { const packet_handle = try Thread.spawn(.{}, GossipService.run, .{ &gossip_service, .{ - .message_allocator = allocator, .spy_node = true, // dont build any outgoing messages .dump = false, },