Skip to content

Commit

Permalink
client: support reading multiple audio tracks (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Dec 10, 2024
1 parent 6238240 commit a5eaa57
Show file tree
Hide file tree
Showing 10 changed files with 1,012 additions and 852 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Features:
* Client

* Read streams in MPEG-TS, fMP4 or Low-latency format
* Read a single video track and/or a single audio track
* Read a single video track and/or multiple audio tracks
* Read tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 Audio (AAC)
* Get absolute timestamp of incoming data

Expand Down
4 changes: 0 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ type ClientOnDataMPEG4AudioFunc func(pts int64, aus [][]byte)
// ClientOnDataOpusFunc is the prototype of the function passed to OnDataOpus().
type ClientOnDataOpusFunc func(pts int64, packets [][]byte)

type clientOnStreamTracksFunc func(ctx context.Context, isLeading bool, tracks []*Track) ([]*clientTrack, bool)

type clientOnDataFunc func(pts int64, dts int64, data [][]byte)

func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) {
u, err := url.Parse(relative)
if err != nil {
Expand Down
178 changes: 62 additions & 116 deletions client_primary_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func cloneURL(ur *url.URL) *url.URL {
}
}

func clientDownloadPlaylist(
func downloadPlaylist(
ctx context.Context,
httpClient *http.Client,
onRequest ClientOnRequestFunc,
Expand Down Expand Up @@ -94,34 +94,19 @@ func pickLeadingPlaylist(variants []*playlist.MultivariantVariant) *playlist.Mul
return leadingPlaylist
}

func pickAudioPlaylist(alternatives []*playlist.MultivariantRendition, groupID string) *playlist.MultivariantRendition {
candidates := func() []*playlist.MultivariantRendition {
var ret []*playlist.MultivariantRendition
for _, alt := range alternatives {
if alt.GroupID == groupID {
ret = append(ret, alt)
}
}
return ret
}()
if candidates == nil {
return nil
}
func getRenditionsByGroup(
renditions []*playlist.MultivariantRendition,
groupID string,
) []*playlist.MultivariantRendition {
var ret []*playlist.MultivariantRendition

// pick the default audio playlist
for _, alt := range candidates {
if alt.Default {
return alt
for _, alt := range renditions {
if alt.GroupID == groupID {
ret = append(ret, alt)
}
}

// alternatively, pick the first one
return candidates[0]
}

type streamTracksEntry struct {
isLeading bool
tracks []*Track
return ret
}

type clientPrimaryDownloader struct {
Expand All @@ -139,34 +124,24 @@ type clientPrimaryDownloader struct {
getLeadingTimeConv func(ctx context.Context) (clientTimeConv, bool)

clientTracks map[*Track]*clientTrack

// in
chStreamTracks chan streamTracksEntry
chStreamEnded chan struct{}

// out
startStreaming chan struct{}
}

func (d *clientPrimaryDownloader) initialize() {
d.chStreamTracks = make(chan streamTracksEntry)
d.chStreamEnded = make(chan struct{})
d.startStreaming = make(chan struct{})
}

func (d *clientPrimaryDownloader) run(ctx context.Context) error {
d.onDownloadPrimaryPlaylist(d.primaryPlaylistURL.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.onRequest, d.primaryPlaylistURL)
pl, err := downloadPlaylist(ctx, d.httpClient, d.onRequest, d.primaryPlaylistURL)
if err != nil {
return err
}

streamCount := 0
var streams []*clientStreamDownloader

switch plt := pl.(type) {
case *playlist.Media:
ds := &clientStreamDownloader{
stream := &clientStreamDownloader{
isLeading: true,
httpClient: d.httpClient,
onRequest: d.onRequest,
Expand All @@ -177,13 +152,12 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
playlistURL: d.primaryPlaylistURL,
firstPlaylist: plt,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setStreamEnded: d.setStreamEnded,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
d.rp.add(ds)
streamCount++
stream.initialize()
d.rp.add(stream)
streams = append(streams, stream)

case *playlist.Multivariant:
leadingPlaylist := pickLeadingPlaylist(plt.Variants)
Expand All @@ -197,7 +171,7 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
return err
}

ds := &clientStreamDownloader{
stream := &clientStreamDownloader{
isLeading: true,
httpClient: d.httpClient,
onRequest: d.onRequest,
Expand All @@ -208,44 +182,49 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
playlistURL: u,
firstPlaylist: nil,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setStreamEnded: d.setStreamEnded,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
d.rp.add(ds)
streamCount++
stream.initialize()
d.rp.add(stream)
streams = append(streams, stream)

if leadingPlaylist.Audio != "" {
audioPlaylist := pickAudioPlaylist(plt.Renditions, leadingPlaylist.Audio)
if audioPlaylist == nil {
return fmt.Errorf("audio playlist with id \"%s\" not found", leadingPlaylist.Audio)
audioPlaylists := getRenditionsByGroup(plt.Renditions, leadingPlaylist.Audio)
if audioPlaylists == nil {
return fmt.Errorf("no playlist with Group ID \"%s\" found", leadingPlaylist.Audio)

Check warning on line 195 in client_primary_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_primary_downloader.go#L195

Added line #L195 was not covered by tests
}

if audioPlaylist.URI != nil {
u, err = clientAbsoluteURL(d.primaryPlaylistURL, *audioPlaylist.URI)
if err != nil {
return err
for _, pl := range audioPlaylists {
// stream data already included in the leading playlist
if pl.URI == nil {
continue

Check warning on line 201 in client_primary_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_primary_downloader.go#L201

Added line #L201 was not covered by tests
}

ds := &clientStreamDownloader{
isLeading: false,
onRequest: d.onRequest,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDownloadPart: d.onDownloadPart,
onDecodeError: d.onDecodeError,
playlistURL: u,
firstPlaylist: nil,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
setStreamEnded: d.setStreamEnded,
if pl.URI != nil {
u, err = clientAbsoluteURL(d.primaryPlaylistURL, *pl.URI)
if err != nil {
return err
}

Check warning on line 208 in client_primary_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_primary_downloader.go#L207-L208

Added lines #L207 - L208 were not covered by tests

stream := &clientStreamDownloader{
isLeading: false,
onRequest: d.onRequest,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDownloadPart: d.onDownloadPart,
onDecodeError: d.onDecodeError,
playlistURL: u,
rendition: pl,
rp: d.rp,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
stream.initialize()
d.rp.add(stream)
streams = append(streams, stream)
}
d.rp.add(ds)
streamCount++
}
}

Expand All @@ -255,14 +234,10 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {

var tracks []*Track

for i := 0; i < streamCount; i++ {
for _, stream := range streams {
select {
case entry := <-d.chStreamTracks:
if entry.isLeading {
tracks = append(append([]*Track(nil), entry.tracks...), tracks...)
} else {
tracks = append(tracks, entry.tracks...)
}
case streamTracks := <-stream.chTracks:
tracks = append(tracks, streamTracks...)

case <-ctx.Done():
return fmt.Errorf("terminated")
Expand All @@ -278,50 +253,21 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
return err
}

close(d.startStreaming)

for i := 0; i < streamCount; i++ {
for _, stream := range streams {
select {
case <-d.chStreamEnded:
case stream.chStartStreaming <- d.clientTracks:
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}

return ErrClientEOS
}

func (d *clientPrimaryDownloader) setStreamTracks(
ctx context.Context,
isLeading bool,
tracks []*Track,
) ([]*clientTrack, bool) {
select {
case d.chStreamTracks <- streamTracksEntry{
isLeading: isLeading,
tracks: tracks,
}:
case <-ctx.Done():
return nil, false
}

select {
case <-d.startStreaming:
case <-ctx.Done():
return nil, false
}

streamClientTracks := make([]*clientTrack, len(tracks))
for i, track := range tracks {
streamClientTracks[i] = d.clientTracks[track]
for _, stream := range streams {
select {
case <-stream.chEnded:
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}

return streamClientTracks, true
}

func (d *clientPrimaryDownloader) setStreamEnded(ctx context.Context) {
select {
case d.chStreamEnded <- struct{}{}:
case <-ctx.Done():
}
return ErrClientEOS
}
58 changes: 49 additions & 9 deletions client_stream_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,27 @@ type clientStreamDownloader struct {
onDownloadPart ClientOnDownloadPartFunc
onDecodeError ClientOnDecodeErrorFunc
playlistURL *url.URL
rendition *playlist.MultivariantRendition
firstPlaylist *playlist.Media
rp *clientRoutinePool
setStreamTracks clientOnStreamTracksFunc
setStreamEnded func(context.Context)
setLeadingTimeConv func(clientTimeConv)
getLeadingTimeConv func(context.Context) (clientTimeConv, bool)

segmentQueue *clientSegmentQueue
curSegmentID *int

// out
chTracks chan []*Track
chEnded chan struct{}

// in
chStartStreaming chan map[*Track]*clientTrack
}

func (d *clientStreamDownloader) initialize() {
d.chTracks = make(chan []*Track)
d.chEnded = make(chan struct{})
d.chStartStreaming = make(chan map[*Track]*clientTrack)
}

func (d *clientStreamDownloader) run(ctx context.Context) error {
Expand All @@ -82,7 +94,7 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
d.segmentQueue.initialize()

if d.firstPlaylist.Map != nil && d.firstPlaylist.Map.URI != "" {
byts, err := d.downloadSegment(
initFile, err := d.downloadSegment(
ctx,
d.firstPlaylist.Map.URI,
d.firstPlaylist.Map.ByteRangeStart,
Expand All @@ -94,11 +106,12 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
proc := &clientStreamProcessorFMP4{
ctx: ctx,
isLeading: d.isLeading,
initFile: byts,
rendition: d.rendition,
initFile: initFile,
segmentQueue: d.segmentQueue,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setStreamEnded: d.setStreamEnded,
setTracks: d.setTracks,
setEnded: d.setEnded,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
Expand All @@ -110,8 +123,8 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
isLeading: d.isLeading,
segmentQueue: d.segmentQueue,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setStreamEnded: d.setStreamEnded,
setTracks: d.setTracks,
setEnded: d.setEnded,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
Expand Down Expand Up @@ -190,7 +203,7 @@ func (d *clientStreamDownloader) downloadPlaylist(

d.onDownloadStreamPlaylist(ur.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.onRequest, ur)
pl, err := downloadPlaylist(ctx, d.httpClient, d.onRequest, ur)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -346,3 +359,30 @@ func (d *clientStreamDownloader) fillSegmentQueue(

return nil
}

func (d *clientStreamDownloader) setTracks(ctx context.Context, tracks []*Track) ([]*clientTrack, bool) {
select {
case d.chTracks <- tracks:
case <-ctx.Done():
return nil, false

Check warning on line 367 in client_stream_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_stream_downloader.go#L366-L367

Added lines #L366 - L367 were not covered by tests
}

var allTracks map[*Track]*clientTrack

select {
case allTracks = <-d.chStartStreaming:
case <-ctx.Done():
return nil, false

Check warning on line 375 in client_stream_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_stream_downloader.go#L374-L375

Added lines #L374 - L375 were not covered by tests
}

streamTracks := make([]*clientTrack, len(tracks))
for i, track := range tracks {
streamTracks[i] = allTracks[track]
}

return streamTracks, true
}

func (d *clientStreamDownloader) setEnded() {
close(d.chEnded)
}
Loading

0 comments on commit a5eaa57

Please sign in to comment.