Skip to content

Commit

Permalink
Fix invalid free
Browse files Browse the repository at this point in the history
This involves passing the "message allocator" as a more general
"table allocator" at the initialisation site of gossip service,
to allow the gossip table to properly free the data with the right
allocator.
  • Loading branch information
InKryption committed Jun 4, 2024
1 parent c2e60fb commit 830ab90
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 37 deletions.
12 changes: 7 additions & 5 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,14 @@ const config = @import("config.zig");
const ACCOUNT_INDEX_BINS = @import("../accountsdb/db.zig").ACCOUNT_INDEX_BINS;
const socket_tag = @import("../gossip/data.zig").socket_tag;

// TODO: use better allocator, unless GPA becomes more performant.

var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const gpa_allocator = gpa.allocator();

var gossip_value_gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
const gossip_value_gpa_allocator = gossip_value_gpa.allocator();

const base58Encoder = base58.Encoder.init(.{});

const gossip_host = struct {
Expand Down Expand Up @@ -532,6 +538,7 @@ fn initGossip(

return try GossipService.init(
gpa_allocator,
gossip_value_gpa_allocator,
contact_info,
my_keypair,
entrypoints,
Expand Down Expand Up @@ -572,13 +579,8 @@ fn initRepair(
}

fn runGossipWithConfigValues(gossip_service: *GossipService) !void {
// TODO: use better allocator, unless GPA becomes more performant.
var gp_message_allocator: std.heap.GeneralPurposeAllocator(.{}) = .{};
defer _ = gp_message_allocator.deinit();

const gossip_config = config.current.gossip;
return gossip_service.run(.{
.message_allocator = gp_message_allocator.allocator(),
.spy_node = gossip_config.spy_node,
.dump = gossip_config.dump,
});
Expand Down
2 changes: 1 addition & 1 deletion src/gossip/fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ pub fn run() !void {

var fuzz_exit = AtomicBool.init(false);
var gossip_service_fuzzer = try GossipService.init(
allocator,
allocator,
fuzz_contact_info,
fuzz_keypair,
Expand All @@ -310,7 +311,6 @@ pub fn run() !void {

const fuzz_handle = try std.Thread.spawn(.{}, GossipService.run, .{
&gossip_service_fuzzer, .{
.message_allocator = allocator,
.spy_node = true,
.dump = false,
},
Expand Down
66 changes: 35 additions & 31 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub const GOSSIP_VERIFY_PACKET_PARALLEL_TASKS = 4;

pub const GossipService = struct {
allocator: std.mem.Allocator,
gossip_value_allocator: std.mem.Allocator,

// note: this contact info should not change
gossip_socket: UdpSocket,
Expand Down Expand Up @@ -132,7 +133,11 @@ pub const GossipService = struct {
const Entrypoint = struct { addr: SocketAddr, info: ?ContactInfo = null };

pub fn init(
/// Must be thread-safe.
allocator: std.mem.Allocator,
/// Can be supplied as a different allocator in order to reduce contention.
/// Must be thread safe.
gossip_value_allocator: std.mem.Allocator,
my_contact_info: ContactInfo,
my_keypair: KeyPair,
entrypoints: ?[]const SocketAddr,
Expand All @@ -156,7 +161,7 @@ pub const GossipService = struct {
});
logger.debugf("using n_threads in gossip: {}", .{n_threads});

var gossip_table = try GossipTable.init(allocator, thread_pool);
var gossip_table = try GossipTable.init(gossip_value_allocator, thread_pool);
errdefer gossip_table.deinit();

const gossip_table_rw = RwMux(GossipTable).init(gossip_table);
Expand Down Expand Up @@ -191,6 +196,9 @@ pub const GossipService = struct {
);

return .{
.allocator = allocator,
.gossip_value_allocator = gossip_value_allocator,

.my_contact_info = my_contact_info,
.my_keypair = my_keypair,
.my_pubkey = my_pubkey,
Expand All @@ -201,7 +209,6 @@ pub const GossipService = struct {
.packet_outgoing_channel = packet_outgoing_channel,
.verified_incoming_channel = verified_incoming_channel,
.gossip_table_rw = gossip_table_rw,
.allocator = allocator,
.push_msg_queue_mux = Mux(ArrayList(SignedGossipData)).init(push_msg_q),
.active_set_rw = RwMux(ActiveSet).init(active_set),
.failed_pull_hashes_mux = Mux(HashTimeQueue).init(failed_pull_hashes),
Expand Down Expand Up @@ -287,12 +294,6 @@ pub const GossipService = struct {
};

pub const RunThreadsParams = struct {
/// Allocator used to allocate message metadata.
/// Helpful to use a dedicated allocator to reduce contention
/// during message allocation & deallocation.
/// Should be thread safe, and remain valid until calling `joinAll` on the result.
message_allocator: std.mem.Allocator,

spy_node: bool,
dump: bool,
};
Expand All @@ -309,7 +310,6 @@ pub const GossipService = struct {
self: *Self,
params: RunThreadsParams,
) std.Thread.SpawnError!RunHandles {
const message_allocator = params.message_allocator;
const spy_node = params.spy_node;
const dump = params.dump;

Expand All @@ -334,10 +334,10 @@ pub const GossipService = struct {
});
errdefer exitAndJoin(self.exit, receiver_thread);

const packet_verifier_thread = try Thread.spawn(.{}, verifyPackets, .{ self, message_allocator });
const packet_verifier_thread = try Thread.spawn(.{}, verifyPackets, .{self});
errdefer exitAndJoin(self.exit, packet_verifier_thread);

const message_processor_thread = try Thread.spawn(.{}, processMessages, .{ self, message_allocator });
const message_processor_thread = try Thread.spawn(.{}, processMessages, .{ self, self.gossip_value_allocator });
errdefer exitAndJoin(self.exit, message_processor_thread);

const maybe_message_builder_thread: ?std.Thread = if (!spy_node) try Thread.spawn(.{}, buildMessages, .{self}) else null;
Expand Down Expand Up @@ -382,7 +382,7 @@ pub const GossipService = struct {

const VerifyMessageTask = ThreadPoolTask(VerifyMessageEntry);
const VerifyMessageEntry = struct {
allocator: std.mem.Allocator,
gossip_value_allocator: std.mem.Allocator,
packet_batch: ArrayList(Packet),
verified_incoming_channel: *Channel(GossipMessageWithEndpoint),
logger: Logger,
Expand All @@ -392,7 +392,7 @@ pub const GossipService = struct {

for (@as([]const Packet, self.packet_batch.items)) |*packet| {
var message = bincode.readFromSlice(
self.allocator,
self.gossip_value_allocator,
GossipMessage,
packet.data[0..packet.size],
bincode.Params.standard,
Expand All @@ -403,7 +403,7 @@ pub const GossipService = struct {

message.sanitize() catch {
self.logger.errf("gossip: packet_verify: failed to sanitize", .{});
bincode.free(self.allocator, message);
bincode.free(self.gossip_value_allocator, message);
continue;
};

Expand All @@ -412,7 +412,7 @@ pub const GossipService = struct {
"gossip: packet_verify: failed to verify signature: {} from {}",
.{ e, packet.addr },
);
bincode.free(self.allocator, message);
bincode.free(self.gossip_value_allocator, message);
continue;
};

Expand All @@ -428,19 +428,14 @@ pub const GossipService = struct {
/// main logic for deserializing Packets into GossipMessage messages
/// and verifing they have valid values, and have valid signatures.
/// Verified GossipMessagemessages are then sent to the verified_channel.
fn verifyPackets(
self: *Self,
/// Must be thread-safe. Can be a specific allocator which will
/// only be contended for by the tasks spawned by in function.
task_allocator: std.mem.Allocator,
) !void {
fn verifyPackets(self: *Self) !void {
const tasks = try VerifyMessageTask.init(self.allocator, GOSSIP_VERIFY_PACKET_PARALLEL_TASKS);
defer self.allocator.free(tasks);

// pre-allocate all the tasks
for (tasks) |*task| {
task.entry = .{
.allocator = task_allocator,
.gossip_value_allocator = self.gossip_value_allocator,
.verified_incoming_channel = self.verified_incoming_channel,
.packet_batch = undefined,
.logger = self.logger,
Expand Down Expand Up @@ -512,7 +507,7 @@ pub const GossipService = struct {
};

/// main logic for recieving and processing gossip messages.
pub fn processMessages(self: *Self, message_allocator: std.mem.Allocator) !void {
pub fn processMessages(self: *Self, gossip_value_allocator: std.mem.Allocator) !void {
var timer = std.time.Timer.start() catch unreachable;
var last_table_trim_ts: u64 = 0;
var msg_count: usize = 0;
Expand Down Expand Up @@ -581,7 +576,7 @@ pub const GossipService = struct {
// would be safer. For more info, see:
// - GossipTable.remove
// - https://github.com/Syndica/sig/pull/69
msg.message.shallowFree(message_allocator);
msg.message.shallowFree(gossip_value_allocator);
}
self.verified_incoming_channel.allocator.free(messages);
}
Expand Down Expand Up @@ -2180,6 +2175,7 @@ test "gossip.service: build messages startup and shutdown" {
logger.spawn();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
Expand Down Expand Up @@ -2233,6 +2229,7 @@ test "gossip.service: tests handling prune messages" {
logger.spawn();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
Expand Down Expand Up @@ -2307,6 +2304,7 @@ test "gossip.service: tests handling pull responses" {
logger.spawn();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
Expand Down Expand Up @@ -2366,6 +2364,7 @@ test "gossip.service: tests handle pull request" {
logger.spawn();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
Expand Down Expand Up @@ -2457,6 +2456,7 @@ test "gossip.service: test build prune messages and handle push messages" {
logger.spawn();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
Expand Down Expand Up @@ -2544,6 +2544,7 @@ test "gossip.service: test build pull requests" {
logger.spawn();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
Expand Down Expand Up @@ -2586,6 +2587,7 @@ test "gossip.service: test build push messages" {
logger.spawn();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
Expand Down Expand Up @@ -2657,6 +2659,7 @@ test "gossip.gossip_service: test packet verification" {
// noop for this case because this tests error failed verification
const logger: Logger = .noop;
var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
keypair,
Expand All @@ -2669,7 +2672,7 @@ test "gossip.gossip_service: test packet verification" {
var packet_channel = gossip_service.packet_incoming_channel;
var verified_channel = gossip_service.verified_incoming_channel;

const packet_verifier_handle = try Thread.spawn(.{}, GossipService.verifyPackets, .{ &gossip_service, gossip_service.allocator });
const packet_verifier_handle = try Thread.spawn(.{}, GossipService.verifyPackets, .{&gossip_service});

var rng = std.rand.DefaultPrng.init(getWallclockMs());
var data = gossip.GossipData.randomFromIndex(rng.random(), 0);
Expand Down Expand Up @@ -2786,7 +2789,7 @@ test "gossip.gossip_service: test packet verification" {

test "gossip.gossip_service: process contact info push packet" {
const allocator = std.testing.allocator;
const message_allocator = allocator;
const gossip_value_allocator = allocator;
var exit = AtomicBool.init(false);
var my_keypair = try KeyPair.create([_]u8{1} ** 32);
const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key);
Expand All @@ -2797,6 +2800,7 @@ test "gossip.gossip_service: process contact info push packet" {
logger.spawn();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
Expand All @@ -2815,7 +2819,7 @@ test "gossip.gossip_service: process contact info push packet" {
var packet_handle = try Thread.spawn(
.{},
GossipService.processMessages,
.{ &gossip_service, message_allocator },
.{ &gossip_service, gossip_value_allocator },
);

// send a push message
Expand All @@ -2827,7 +2831,7 @@ test "gossip.gossip_service: process contact info push packet" {
.LegacyContactInfo = legacy_contact_info,
};
const gossip_value = try gossip.SignedGossipData.initSigned(gossip_data, &kp);
const heap_values = try message_allocator.dupe(gossip.SignedGossipData, &.{gossip_value});
const heap_values = try gossip_value_allocator.dupe(gossip.SignedGossipData, &.{gossip_value});
const msg = GossipMessage{
.PushMessage = .{ id, heap_values },
};
Expand Down Expand Up @@ -2888,6 +2892,7 @@ test "gossip.service: init, exit, and deinit" {
logger.spawn();

var gossip_service = try GossipService.init(
std.testing.allocator,
std.testing.allocator,
contact_info,
my_keypair,
Expand All @@ -2898,7 +2903,6 @@ test "gossip.service: init, exit, and deinit" {

const handle = try std.Thread.spawn(.{}, GossipService.run, .{
&gossip_service, .{
.message_allocator = std.testing.allocator,
.spy_node = true,
.dump = false,
},
Expand Down Expand Up @@ -2973,6 +2977,7 @@ pub const BenchmarkGossipServiceGeneral = struct {
// process incoming packets/messsages
var exit = AtomicBool.init(false);
var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
keypair,
Expand All @@ -2987,7 +2992,6 @@ pub const BenchmarkGossipServiceGeneral = struct {

const packet_handle = try Thread.spawn(.{}, GossipService.run, .{
&gossip_service, .{
.message_allocator = allocator,
.spy_node = true, // dont build any outgoing messages
.dump = false,
},
Expand Down Expand Up @@ -3104,6 +3108,7 @@ pub const BenchmarkGossipServicePullRequests = struct {
var exit = AtomicBool.init(false);

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
keypair,
Expand Down Expand Up @@ -3154,7 +3159,6 @@ pub const BenchmarkGossipServicePullRequests = struct {

const packet_handle = try Thread.spawn(.{}, GossipService.run, .{
&gossip_service, .{
.message_allocator = allocator,
.spy_node = true, // dont build any outgoing messages
.dump = false,
},
Expand Down

0 comments on commit 830ab90

Please sign in to comment.