additional functionalities to support packed-struct encoding #3187

Open
broccoliSpicy opened this issue Nov 29, 2024 · 1 comment

broccoliSpicy commented Nov 29, 2024

  1. During write, be able to write a struct with multiple fields to one page and record this in the metadata.

  2. During read, be able to detect that a column is a packed-struct column and that it maps to many column indices.

  3. During scheduling, be able to map one physical page to many fields of the struct.

We should also allow reading a subset of fields in a struct array.
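
Reading a subset of fields out of one packed page could look roughly like the sketch below. It is plain Python, not Lance's decoder: it assumes rows are stored back to back as fixed-width little-endian unsigned integers, and the names `FIELDS`, `pack_rows` and `unpack_fields` are made up for illustration.

```python
import struct

# Hypothetical field layout: name -> struct format code (Q = uint64, I = uint32).
FIELDS = [("x", "Q"), ("y", "I")]


def pack_rows(rows, fields=FIELDS):
    """Pack rows (dicts) into one row-major buffer, as a single page might hold them."""
    fmt = "<" + "".join(code for _, code in fields)  # "<" = little-endian, no padding
    return b"".join(
        struct.pack(fmt, *(row[name] for name, _ in fields)) for row in rows
    )


def unpack_fields(buf, wanted, fields=FIELDS):
    """Decode only the requested fields from the packed buffer."""
    fmt = "<" + "".join(code for _, code in fields)
    row_size = struct.calcsize(fmt)
    names = [name for name, _ in fields]
    out = {name: [] for name in wanted}
    for offset in range(0, len(buf), row_size):
        values = dict(zip(names, struct.unpack_from(fmt, buf, offset)))
        for name in wanted:
            out[name].append(values[name])
    return out


rows = [{"x": 1, "y": 2}, {"x": 4, "y": 5}, {"x": 7, "y": 8}]
page = pack_rows(rows)
print(len(page))                   # 3 rows of 12 bytes each
print(unpack_fields(page, ["y"]))  # only the y field is materialized
```

The point of the sketch is that one physical buffer serves every field of the struct, so field selection happens after the page is loaded rather than by picking a column.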

@broccoliSpicy broccoliSpicy self-assigned this Nov 29, 2024

broccoliSpicy commented Dec 6, 2024

issue with PR #3186
9a5a13f

some facts:

fact 1:

from lance.file import LanceFileReader, LanceFileWriter
import pyarrow as pa

# Define the fields for the struct column
fields = [
    pa.field('x', pa.uint32()),
    pa.field('y', pa.uint32()),
]

# Create the struct type
struct_type = pa.struct(fields)

# Create 8 rows of data for the struct column
data = [
    {'x': 1, 'y': 2},
    {'x': 4, 'y': 5},
    {'x': 7, 'y': 8},
    {'x': 10, 'y': 11},
    {'x': 13, 'y': 14},
    {'x': 16, 'y': 17},
    {'x': 19, 'y': 20},
    {'x': 22, 'y': 23}
]

lance_file_path = "/home/x/packed-struct.lance"

# Convert the data to a list of structs
struct_array = pa.array(data, type=struct_type)

# Define the new int32 column
int32_column = pa.array([1, 2, 3, 4, 5, 6, 7, 8], type=pa.int32())

# Define the metadata
metadata = {b'packed': b'true'}
struct_field = pa.field('struct_col', struct_type, metadata=metadata)
int32_field = pa.field('int_col', pa.int32())
second_struct_field = pa.field('second_struct_col', struct_type)

# Create a schema with the updated fields
schema = pa.schema([struct_field, second_struct_field])

# Create a table with the packed struct column and a second, non-packed copy of the same struct data
table = pa.Table.from_arrays([struct_array, struct_array], schema=schema)

# Write the table to a Lance file
with LanceFileWriter(lance_file_path, version="2.1") as writer:
    writer.write_batch(table)
print("Data written to Lance file successfully.")

# Read the Lance file and display the contents
tab_lance = LanceFileReader(lance_file_path).read_all().to_table()
print(tab_lance.to_pandas())

This produces:

           struct_col   second_struct_col
0    {'x': 1, 'y': 2}    {'x': 1, 'y': 1}
1    {'x': 4, 'y': 5}    {'x': 4, 'y': 4}
2    {'x': 7, 'y': 8}    {'x': 7, 'y': 7}
3  {'x': 10, 'y': 11}  {'x': 10, 'y': 10}
4  {'x': 13, 'y': 14}  {'x': 13, 'y': 13}
5  {'x': 16, 'y': 17}  {'x': 16, 'y': 16}
6  {'x': 19, 'y': 20}  {'x': 19, 'y': 19}
7  {'x': 22, 'y': 23}  {'x': 22, 'y': 22}

Observation: second_struct_col is read back incorrectly. Its y field contains the values of the x field.
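
A small helper can make this kind of mismatch explicit when comparing what was written with what was read back. This is only a sketch over plain Python dicts (the rows are the first four from the output above), and `mirrored_fields` is a hypothetical name, not part of Lance or pyarrow.

```python
def mirrored_fields(expected_rows, actual_rows):
    """Map each wrong field to the expected field whose values it mirrors (or None)."""
    result = {}
    field_names = list(expected_rows[0].keys())
    for name in field_names:
        expected = [row[name] for row in expected_rows]
        actual = [row[name] for row in actual_rows]
        if expected == actual:
            continue  # this field round-tripped correctly
        for other in field_names:
            if other != name and actual == [row[other] for row in expected_rows]:
                result[name] = other  # wrong field carries another field's data
                break
        else:
            result[name] = None  # wrong, but not a copy of any other field
    return result


expected = [{"x": 1, "y": 2}, {"x": 4, "y": 5}, {"x": 7, "y": 8}, {"x": 10, "y": 11}]
actual = [{"x": 1, "y": 1}, {"x": 4, "y": 4}, {"x": 7, "y": 7}, {"x": 10, "y": 10}]
print(mirrored_fields(expected, actual))
```

For the rows above it reports that y mirrors x, which matches the observation.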

fact 2:
When testing with a uint64 x field:

from lance.file import LanceFileReader, LanceFileWriter
import pyarrow as pa

# Define the fields for the struct column
fields = [
    pa.field('x', pa.uint64()),
    pa.field('y', pa.uint32()),
]

# Create the struct type
struct_type = pa.struct(fields)

# Create 8 rows of data for the struct column
data = [
    {'x': 1, 'y': 2},
    {'x': 4, 'y': 5},
    {'x': 7, 'y': 8},
    {'x': 10, 'y': 11},
    {'x': 13, 'y': 14},
    {'x': 16, 'y': 17},
    {'x': 19, 'y': 20},
    {'x': 22, 'y': 23}
]

lance_file_path = "/home/x/packed-struct.lance"

# Convert the data to a list of structs
struct_array = pa.array(data, type=struct_type)

# Define the new int32 column
int32_column = pa.array([1, 2, 3, 4, 5, 6, 7, 8], type=pa.int32())

# Define the metadata
metadata = {b'packed': b'true'}
struct_field = pa.field('struct_col', struct_type, metadata=metadata)
int32_field = pa.field('int_col', pa.int32())
second_struct_field = pa.field('second_struct_col', struct_type)
second_int32_field = pa.field('second_struct_col', pa.int32())

# Create a schema with the updated fields
schema = pa.schema([struct_field, second_struct_field])

# Create a table with the packed struct column and a second, non-packed copy of the same struct data
table = pa.Table.from_arrays([struct_array, struct_array], schema=schema)

# Write the table to a Lance file
with LanceFileWriter(lance_file_path, version="2.1") as writer:
    writer.write_batch(table)
print("Data written to Lance file successfully.")

# Read the Lance file and display the contents
tab_lance = LanceFileReader(lance_file_path).read_all().to_table()
print(tab_lance.to_pandas())

The loaded pages:

child: LoadedPage { decoder: MiniBlockDecoder { rep_decompressor: ConstantDecompressor { scalar: LanceBuffer::Borrowed(bytes=0x0000 #bytes=2), num_values: 8 }, def_decompressor: ConstantDecompressor { scalar: LanceBuffer::Borrowed(bytes=0x0000 #bytes=2), num_values: 8 }, value_decompressor: PackedStructFixedWidthMiniBlockDecompressor { bits_per_values: [64, 32], array_encoding: ValueDecompressor { bytes_per_value: 12 } }, def_meaning: [AllValidItem], data: [ScheduledChunk { data: LanceBuffer::Borrowed(bytes=0x00000000600000000100... #bytes=104), vals_in_chunk: 8, vals_targeted: 8, ranges: [0..8] }], offset_in_current_chunk: 0, num_rows: 8, dictionary: None }, path: [], page_index: 0 }
child: LoadedPage { decoder: MiniBlockDecoder { rep_decompressor: ConstantDecompressor { scalar: LanceBuffer::Borrowed(bytes=0x0000 #bytes=2), num_values: 8 }, def_decompressor: ConstantDecompressor { scalar: LanceBuffer::Borrowed(bytes=0x0000 #bytes=2), num_values: 8 }, value_decompressor: ValueDecompressor { bytes_per_value: 8 }, def_meaning: [AllValidItem, AllValidItem], data: [ScheduledChunk { data: LanceBuffer::Borrowed(bytes=0x00000000400000000100... #bytes=72), vals_in_chunk: 8, vals_targeted: 8, ranges: [0..8] }], offset_in_current_chunk: 0, num_rows: 8, dictionary: None }, path: [], page_index: 0 }
child: LoadedPage { decoder: MiniBlockDecoder { rep_decompressor: ConstantDecompressor { scalar: LanceBuffer::Borrowed(bytes=0x0000 #bytes=2), num_values: 8 }, def_decompressor: ConstantDecompressor { scalar: LanceBuffer::Borrowed(bytes=0x0000 #bytes=2), num_values: 8 }, value_decompressor: ValueDecompressor { bytes_per_value: 8 }, def_meaning: [AllValidItem, AllValidItem], data: [ScheduledChunk { data: LanceBuffer::Borrowed(bytes=0x00000000400000000100... #bytes=72), vals_in_chunk: 8, vals_targeted: 8, ranges: [0..8] }], offset_in_current_chunk: 0, num_rows: 8, dictionary: None }, path: [], page_index: 0 }

Observation: the second page's content is identical to the third page's, but they should differ, since x is uint64 and y is uint32. Either the same page is read twice or one page is written twice.
Printing the encoded pages gives:

description of encoded page: PageLayout { layout: Some(MiniBlockLayout(MiniBlockLayout { rep_compression: Some(ArrayEncoding { array_encoding: Some(Constant(Constant { value: [0, 0], num_values: 8 })) }), def_compression: Some(ArrayEncoding { array_encoding: Some(Constant(Constant { value: [0, 0], num_values: 8 })) }), value_compression: Some(ArrayEncoding { array_encoding: Some(Flat(Flat { bits_per_value: 64, buffer: Some(Buffer { buffer_index: 0, buffer_type: Page }), compression: None })) }), dictionary: None, layers: [RepdefAllValidItem, RepdefAllValidItem] })) }
description of encoded page: PageLayout { layout: Some(MiniBlockLayout(MiniBlockLayout { rep_compression: Some(ArrayEncoding { array_encoding: Some(Constant(Constant { value: [0, 0], num_values: 8 })) }), def_compression: Some(ArrayEncoding { array_encoding: Some(Constant(Constant { value: [0, 0], num_values: 8 })) }), value_compression: Some(ArrayEncoding { array_encoding: Some(Flat(Flat { bits_per_value: 32, buffer: Some(Buffer { buffer_index: 0, buffer_type: Page }), compression: None })) }), dictionary: None, layers: [RepdefAllValidItem, RepdefAllValidItem] })) }
description of encoded page: PageLayout { layout: Some(MiniBlockLayout(MiniBlockLayout { rep_compression: Some(ArrayEncoding { array_encoding: Some(Constant(Constant { value: [0, 0], num_values: 8 })) }), def_compression: Some(ArrayEncoding { array_encoding: Some(Constant(Constant { value: [0, 0], num_values: 8 })) }), value_compression: Some(ArrayEncoding { array_encoding: Some(PackedStructFixedWidthMiniBlock(PackedStructFixedWidthMiniBlock { flat: Some(ArrayEncoding { array_encoding: Some(Flat(Flat { bits_per_value: 96, buffer: Some(Buffer { buffer_index: 0, buffer_type: Page }), compression: None })) }), bits_per_values: [64, 32] })) }), dictionary: None, layers: [RepdefAllValidItem] })) }

The write path seems correct: the three encoded pages have different value widths (64, 32, and 96 bits).
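
The widths printed for the packed page can be sanity-checked with a bit of arithmetic. The sketch below is an illustration only; `check_packed_layout` is a hypothetical helper and the numbers are copied from the debug output above.

```python
def check_packed_layout(flat_bits_per_value, bits_per_values, num_rows):
    """Check that per-field widths add up to the flat width; return byte sizes."""
    total_bits = sum(bits_per_values)
    if total_bits != flat_bits_per_value:
        raise ValueError(
            f"field widths sum to {total_bits}, flat encoding says {flat_bits_per_value}"
        )
    if total_bits % 8 != 0:
        raise ValueError("packed row is not a whole number of bytes")
    bytes_per_value = total_bits // 8
    return bytes_per_value, bytes_per_value * num_rows


# Values from the encoded packed page: Flat bits_per_value 96, bits_per_values [64, 32], 8 rows.
bytes_per_value, payload_bytes = check_packed_layout(96, [64, 32], 8)
print(bytes_per_value, payload_bytes)
```

The 12 bytes per value agrees with `bytes_per_value: 12` in the loaded packed page. This only accounts for the value payload; the chunk sizes printed in the loaded pages (104 and 72 bytes) are larger, and the sketch does not model whatever else the chunk holds.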

fact 3:
A packed struct followed by an integer column reads back correctly.

fact 4:
With a packed struct followed by a normal struct, the first field of the second struct reads back correctly.

fact 5:
With this schema:

metadata = {b'packed': b'true'}
struct_field = pa.field('struct_col', struct_type, metadata=metadata)
int32_field = pa.field('int_col', pa.int32())
second_struct_field = pa.field('second_struct_col', struct_type)
second_int32_field = pa.field('second_struct_col', pa.int32())

# Create a schema with the updated fields
schema = pa.schema([struct_field, int32_field, second_struct_field])

This schema panics:

thread 'lance_background_thread' panicked at /home/x/packed-struct6/rust/lance-encoding/src/repdef.rs:1156:9: index out of bounds: the len is 1 but the index is 1

    pub fn is_all_valid(&self) -> bool {
        self.def_meaning[self.current_layer].is_all_valid()
    }
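
One way an index error of this shape could arise is sketched below in plain Python. This is a toy model, not Lance's code: `ToyLayers` and `count_valid_layers` are hypothetical, and whether this is the actual cause of the panic is not established here. The only grounded inputs are that the packed page above lists one layer while the normal struct's child pages list two.

```python
class ToyLayers:
    """Toy stand-in for the state behind is_all_valid (assumed shape, not Lance's type)."""

    def __init__(self, def_meaning):
        self.def_meaning = def_meaning
        self.current_layer = 0

    def is_all_valid(self):
        # Mirrors the indexing in the snippet above: fails if current_layer is past the end.
        return self.def_meaning[self.current_layer] == "AllValidItem"

    def next_layer(self):
        self.current_layer += 1


def count_valid_layers(page, layers_expected):
    """Check validity for as many layers as the caller believes the page has."""
    checked = 0
    for _ in range(layers_expected):
        if page.is_all_valid():
            checked += 1
        page.next_layer()
    return checked


# A page with two layers, walked with two layers expected: fine.
normal_child = ToyLayers(["AllValidItem", "AllValidItem"])
print(count_valid_layers(normal_child, 2))

# A page with one layer, walked as if it had two: index 1 on a list of length 1.
packed = ToyLayers(["AllValidItem"])
try:
    count_valid_layers(packed, 2)
except IndexError as err:
    print("IndexError:", err)
```

If the real failure is similar, the caller's idea of how many layers a page has and the page's own `def_meaning` length would need to be reconciled.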
