Skip to content

Commit

Permalink
hierarchy cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Lefaudeux committed Oct 28, 2024
1 parent d2ab48c commit e33deb6
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 22 deletions.
35 changes: 13 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[![Build & Test](https://github.com/Photoroom/datago/actions/workflows/go.yml/badge.svg)](https://github.com/Photoroom/datago/actions/workflows/go.yml)
[![Gopy](https://github.com/Photoroom/datago/actions/workflows/gopy.yml/badge.svg)](https://github.com/Photoroom/datago/actions/workflows/gopy.yml)

datago
======
# datago

A golang-based data loader which can be used from Python. Compatible with a soon-to-be open sourced VectorDB-enabled data stack, which exposes HTTP requests.

Datago handles, outside of the Python GIL:

- per sample IO from object storage
- deserialization (jpg and png decompression)
- some optional vision processing (aligning different image payloads)
Expand All @@ -19,11 +19,9 @@ Datago is rank and world-size aware, in which case the samples are dispatched de

<img width="922" alt="Screenshot 2024-09-24 at 9 39 44 PM" src="https://github.com/user-attachments/assets/b58002ce-f961-438b-af72-9e1338527365">


<details> <summary><strong>Use it</strong></summary>

Use the package from Python
---------------------------
## Use the package from Python

```python
from datago import datago
Expand All @@ -40,24 +38,20 @@ for _ in range(10):

Please note that the image buffers are passed around as raw pointers; they can be re-interpreted in Python with the attached helpers.


Match the raw exported buffers with typical python types
--------------------------------------------------------
## Match the raw exported buffers with typical python types

See the helper functions provided in `polyglot.py`; they should be self-explanatory.

</details><details> <summary><strong>Build it</strong></summary>

Install deps
------------
## Install deps

```bash
$ sudo apt install golang libjpeg-turbo8-dev libvips-dev
$ sudo ldconfig
```

Build a benchmark CLI
---------------------
## Build a benchmark CLI

From the root of this project `datago_src`:

Expand All @@ -77,23 +71,20 @@ Running it with additional sanity checks
$ go run -race cmd/main/main.go
```

Run the go test suite
---------------------
## Run the go test suite

From the src folder

```bash
$ go test -v tests/client_test.go
```

Refresh the python package and its binaries
-------------------------------------------
## Refresh the python package and its binaries

- Install the dependencies as detailed in the next point
- Run the `generate_python_package.sh` script

Generate the python package binaries manually
---------------------------------------------
## Generate the python package binaries manually

```bash
$ python3 -m pip install pybindgen
Expand All @@ -103,6 +94,7 @@ $ go install golang.org/x/image/draw
```

NOTE:

- you may need to add `~/go/bin` to your PATH so that gopy is found.
- - Either `export PATH=$PATH:~/go/bin` or add it to your .bashrc
- you may need this to make sure that LDD looks at the current folder `export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:.`
Expand All @@ -115,18 +107,17 @@ $ gopy pkg -author="Photoroom" -email="[email protected]" -url="" -name="datago

then you can `pip install -e .` from here.

## Update the pypi release (maintainers)

Update the pypi release (maintainers)
-------------------------------------
```
python3 setup.py sdist
python3 -m twine upload dist/* --verbose
```
</details>
# License
License
=======
MIT License
Copyright (c) 2024 Photoroom
Expand Down
67 changes: 67 additions & 0 deletions src/pkg/backend_filesystem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package datago

import (
"fmt"
"os"
)

// BackendFileSystem is the backend whose collectSamples method turns
// fsSampleMetadata items into Samples by reading the referenced files with
// loadSample.
type BackendFileSystem struct {
// config is the DatagoConfig handed to loadSample; collectSamples also reads
// ConcurrentDownloads from it to size its worker pool.
config *DatagoConfig
}

// loadSample reads the file at filesystem_sample.filePath and decodes it into
// a Sample through imageFromBuffer. The sample ID is the file name.
//
// On a read or decode failure the error is reported on stdout, together with
// its cause, and nil is returned so that the caller can skip the sample.
//
// NOTE(review): _pre_encode_images is not used; config.PreEncodeImages is
// what gets forwarded to imageFromBuffer. Confirm this is intended.
func loadSample(config *DatagoConfig, filesystem_sample fsSampleMetadata, transform *ARAwareTransform, _pre_encode_images bool) *Sample {
	// Load the whole file into memory
	bytes_buffer, err := os.ReadFile(filesystem_sample.filePath)
	if err != nil {
		fmt.Println("Error reading file:", filesystem_sample.filePath, err)
		return nil
	}

	img_payload, _, err := imageFromBuffer(bytes_buffer, transform, -1., config.PreEncodeImages, false)
	if err != nil {
		fmt.Println("Error loading image:", filesystem_sample.fileName, err)
		return nil
	}

	return &Sample{
		ID:    filesystem_sample.fileName,
		Image: *img_payload,
	}
}

// collectSamples consumes chanSampleMetadata with b.config.ConcurrentDownloads
// worker goroutines, loads each item through loadSample and sends every
// successfully loaded Sample to chanSamples.
//
// The call blocks until chanSampleMetadata has been closed and all the workers
// have returned, then closes chanSamples. Items for which loadSample returns
// nil are dropped. It panics if a received item is not a fsSampleMetadata.
func (b BackendFileSystem) collectSamples(chanSampleMetadata chan SampleDataPointers, chanSamples chan Sample, transform *ARAwareTransform, pre_encode_images bool) {
	ack_channel := make(chan bool)

	sampleWorker := func() {
		// The range loop ends once the metadata channel is closed and drained
		for item_to_fetch := range chanSampleMetadata {
			// Cast the item to fetch to the correct type
			filesystem_sample, ok := item_to_fetch.(fsSampleMetadata)
			if !ok {
				panic("Failed to cast the item to fetch to fsSampleMetadata. This worker is probably misconfigured")
			}

			if sample := loadSample(b.config, filesystem_sample, transform, pre_encode_images); sample != nil {
				chanSamples <- *sample
			}
		}

		// Tell the parent that this worker is done
		ack_channel <- true
	}

	// Start the workers and work on the metadata channel
	for i := 0; i < b.config.ConcurrentDownloads; i++ {
		go sampleWorker()
	}

	// Wait for all the workers to be done
	for i := 0; i < b.config.ConcurrentDownloads; i++ {
		<-ack_channel
	}
	close(chanSamples)
	fmt.Println("No more items to serve, wrapping up")
}
52 changes: 52 additions & 0 deletions src/pkg/backend_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package datago

import (
"fmt"
"net/http"
"time"
)

// BackendHTTP is the backend whose collectSamples method turns
// dbSampleMetadata items into Samples by calling fetchSample with an
// http.Client.
type BackendHTTP struct {
// config is the GeneratorDBConfig handed to fetchSample; collectSamples also
// reads ConcurrentDownloads from it to size its worker pool.
config *GeneratorDBConfig
}

// collectSamples spawns b.config.ConcurrentDownloads goroutines which pull
// items from chanSampleMetadata, resolve them with fetchSample and forward the
// non-nil results to chanSamples.
//
// It returns, after closing chanSamples, once chanSampleMetadata is closed and
// every goroutine has acknowledged its exit. It panics if a received item is
// not a dbSampleMetadata.
func (b BackendHTTP) collectSamples(chanSampleMetadata chan SampleDataPointers, chanSamples chan Sample, transform *ARAwareTransform, pre_encode_images bool) {
	workers := b.config.ConcurrentDownloads
	done := make(chan bool)

	worker := func() {
		// Each goroutine owns its HTTP client, so that nothing is shared
		// between the workers
		client := http.Client{Timeout: 30 * time.Second}

		// Iterate until the metadata channel is closed and emptied
		for item := range chanSampleMetadata {
			meta, ok := item.(dbSampleMetadata)
			if !ok {
				panic("Failed to cast the item to fetch to dbSampleMetadata. This worker is probably misconfigured")
			}

			fetched := fetchSample(b.config, &client, meta, transform, pre_encode_images)
			if fetched == nil {
				// Nothing to forward for this item
				continue
			}
			chanSamples <- *fetched
		}

		// Acknowledge the exit of this goroutine
		done <- true
	}

	// Spawn the pool
	for spawned := 0; spawned < workers; spawned++ {
		go worker()
	}

	// Collect one acknowledgement per goroutine before closing the output
	for acked := 0; acked < workers; acked++ {
		<-done
	}
	close(chanSamples)
	fmt.Println("No more items to serve, wrapping up")
}
5 changes: 5 additions & 0 deletions src/pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,13 @@ func GetClient(config DatagoConfig) *DatagoClient {
backend = BackendHTTP{config: &db_config}
} else if config.SourceType == SourceTypeFileSystem {
fmt.Println("Creating a FileSystem-backed dataloader")
<<<<<<< Updated upstream
db_config := config.SourceConfig.(GeneratorFileSystemConfig)
generator = newDatagoGeneratorFileSystem(db_config)
=======
fs_config := config.SourceConfig.(DatagoGeneratorFileSystemConfig)
generator = newDatagoGeneratorFileSystem(fs_config)
>>>>>>> Stashed changes
backend = BackendFileSystem{config: &config}
} else {
// TODO: Handle other sources
Expand Down

0 comments on commit e33deb6

Please sign in to comment.