axonhub / llm /httpclient /decoder_test.go
llzai's picture
Upload 1793 files
9853396 verified
package httpclient
import (
"bytes"
"context"
"io"
"maps"
"testing"
"github.com/stretchr/testify/require"
)
// mockStreamDecoder implements StreamDecoder for testing.
type mockStreamDecoder struct {
rc io.ReadCloser
events []*StreamEvent
index int
err error
closed bool
}
func newMockStreamDecoder(ctx context.Context, rc io.ReadCloser, events []*StreamEvent) *mockStreamDecoder {
return &mockStreamDecoder{
rc: rc,
events: events,
index: -1,
}
}
func (m *mockStreamDecoder) Next() bool {
if m.err != nil {
return false
}
m.index++
return m.index < len(m.events)
}
func (m *mockStreamDecoder) Current() *StreamEvent {
if m.index < 0 || m.index >= len(m.events) {
return nil
}
return m.events[m.index]
}
func (m *mockStreamDecoder) Err() error {
return m.err
}
func (m *mockStreamDecoder) Close() error {
m.closed = true
return m.rc.Close()
}
// mockReadCloser for testing.
type mockReadCloser struct {
*bytes.Reader
closed bool
}
func (m *mockReadCloser) Close() error {
m.closed = true
return nil
}
func newMockReadCloser(data []byte) *mockReadCloser {
return &mockReadCloser{
Reader: bytes.NewReader(data),
closed: false,
}
}
func TestRegisterDecoder(t *testing.T) {
// Save original state
originalDecoders := make(map[string]StreamDecoderFactory)
maps.Copy(originalDecoders, globalRegistry.decoders)
// Clean up after test
defer func() {
globalRegistry.mu.Lock()
globalRegistry.decoders = originalDecoders
globalRegistry.mu.Unlock()
}()
// Test registering a new decoder
testContentType := "application/test"
testFactory := func(ctx context.Context, rc io.ReadCloser) StreamDecoder {
return newMockStreamDecoder(ctx, rc, []*StreamEvent{})
}
RegisterDecoder(testContentType, testFactory)
// Verify decoder was registered
factory, exists := GetDecoder(testContentType)
require.True(t, exists)
require.NotNil(t, factory)
// Test that the factory works
ctx := context.Background()
rc := newMockReadCloser([]byte("test"))
decoder := factory(ctx, rc)
require.NotNil(t, decoder)
require.Implements(t, (*StreamDecoder)(nil), decoder)
}
func TestGetDecoder(t *testing.T) {
// Test getting existing decoder (text/event-stream should be registered by default)
factory, exists := GetDecoder("text/event-stream")
require.True(t, exists)
require.NotNil(t, factory)
// Test getting non-existent decoder
factory, exists = GetDecoder("application/non-existent")
require.False(t, exists)
require.Nil(t, factory)
}
func TestDefaultSSEDecoder(t *testing.T) {
// Create a simple SSE stream
sseData := "data: {\"type\": \"test\", \"message\": \"hello\"}\n\n"
rc := newMockReadCloser([]byte(sseData))
// Create decoder
ctx := context.Background()
decoder := NewDefaultSSEDecoder(ctx, rc)
require.NotNil(t, decoder)
require.Implements(t, (*StreamDecoder)(nil), decoder)
// Test Next() and Current()
hasNext := decoder.Next()
require.True(t, hasNext)
require.NoError(t, decoder.Err())
event := decoder.Current()
require.NotNil(t, event)
require.Equal(t, "", event.Type) // Default SSE type
require.Contains(t, string(event.Data), "hello")
// Test Close()
err := decoder.Close()
require.NoError(t, err)
require.True(t, rc.closed)
}
func TestDefaultSSEDecoder_EmptyStream(t *testing.T) {
ctx := context.Background()
rc := newMockReadCloser([]byte(""))
decoder := NewDefaultSSEDecoder(ctx, rc)
// Should return false for empty stream
hasNext := decoder.Next()
require.False(t, hasNext)
// Current should return nil
event := decoder.Current()
require.Nil(t, event)
// Close should work
err := decoder.Close()
require.NoError(t, err)
}
func TestDefaultSSEDecoder_NextAfterClose(t *testing.T) {
// Create a simple SSE stream
sseData := "data: {\"type\": \"test\", \"message\": \"hello\"}\n\n"
rc := newMockReadCloser([]byte(sseData))
ctx := context.Background()
decoder := NewDefaultSSEDecoder(ctx, rc)
err := decoder.Close()
require.NoError(t, err)
require.True(t, rc.closed)
hasNext := decoder.Next()
require.False(t, hasNext)
require.NoError(t, decoder.Err())
}
func TestStreamDecoderInterface(t *testing.T) {
ctx := context.Background()
// Test that our mock decoder implements the interface correctly
events := []*StreamEvent{
{Type: "test1", Data: []byte("data1")},
{Type: "test2", Data: []byte("data2")},
}
rc := newMockReadCloser([]byte("test"))
decoder := newMockStreamDecoder(ctx, rc, events)
// Test Next() and Current() for multiple events
require.True(t, decoder.Next())
event1 := decoder.Current()
require.Equal(t, "test1", event1.Type)
require.Equal(t, []byte("data1"), event1.Data)
require.True(t, decoder.Next())
event2 := decoder.Current()
require.Equal(t, "test2", event2.Type)
require.Equal(t, []byte("data2"), event2.Data)
// No more events
require.False(t, decoder.Next())
require.Nil(t, decoder.Current())
// Test error handling
require.NoError(t, decoder.Err())
// Test close
err := decoder.Close()
require.NoError(t, err)
require.True(t, decoder.closed)
require.True(t, rc.closed)
}