| package httpclient
|
|
|
| import (
|
| "bytes"
|
| "context"
|
| "io"
|
| "maps"
|
| "testing"
|
|
|
| "github.com/stretchr/testify/require"
|
| )
|
|
|
|
|
| type mockStreamDecoder struct {
|
| rc io.ReadCloser
|
| events []*StreamEvent
|
| index int
|
| err error
|
| closed bool
|
| }
|
|
|
| func newMockStreamDecoder(ctx context.Context, rc io.ReadCloser, events []*StreamEvent) *mockStreamDecoder {
|
| return &mockStreamDecoder{
|
| rc: rc,
|
| events: events,
|
| index: -1,
|
| }
|
| }
|
|
|
| func (m *mockStreamDecoder) Next() bool {
|
| if m.err != nil {
|
| return false
|
| }
|
|
|
| m.index++
|
|
|
| return m.index < len(m.events)
|
| }
|
|
|
| func (m *mockStreamDecoder) Current() *StreamEvent {
|
| if m.index < 0 || m.index >= len(m.events) {
|
| return nil
|
| }
|
|
|
| return m.events[m.index]
|
| }
|
|
|
| func (m *mockStreamDecoder) Err() error {
|
| return m.err
|
| }
|
|
|
| func (m *mockStreamDecoder) Close() error {
|
| m.closed = true
|
| return m.rc.Close()
|
| }
|
|
|
|
|
// mockReadCloser wraps a bytes.Reader with a Close method that only records
// that it was called, so tests can assert the body was closed.
type mockReadCloser struct {
	*bytes.Reader

	// closed is set to true by Close.
	closed bool
}

// Close records the call and always succeeds.
func (rc *mockReadCloser) Close() error {
	rc.closed = true
	return nil
}

// newMockReadCloser returns a not-yet-closed mockReadCloser that reads from
// data.
func newMockReadCloser(data []byte) *mockReadCloser {
	// closed is left at its zero value (false).
	return &mockReadCloser{Reader: bytes.NewReader(data)}
}
|
|
|
| func TestRegisterDecoder(t *testing.T) {
|
|
|
| originalDecoders := make(map[string]StreamDecoderFactory)
|
| maps.Copy(originalDecoders, globalRegistry.decoders)
|
|
|
|
|
| defer func() {
|
| globalRegistry.mu.Lock()
|
| globalRegistry.decoders = originalDecoders
|
| globalRegistry.mu.Unlock()
|
| }()
|
|
|
|
|
| testContentType := "application/test"
|
| testFactory := func(ctx context.Context, rc io.ReadCloser) StreamDecoder {
|
| return newMockStreamDecoder(ctx, rc, []*StreamEvent{})
|
| }
|
|
|
| RegisterDecoder(testContentType, testFactory)
|
|
|
|
|
| factory, exists := GetDecoder(testContentType)
|
| require.True(t, exists)
|
| require.NotNil(t, factory)
|
|
|
|
|
| ctx := context.Background()
|
| rc := newMockReadCloser([]byte("test"))
|
| decoder := factory(ctx, rc)
|
| require.NotNil(t, decoder)
|
| require.Implements(t, (*StreamDecoder)(nil), decoder)
|
| }
|
|
|
| func TestGetDecoder(t *testing.T) {
|
|
|
| factory, exists := GetDecoder("text/event-stream")
|
| require.True(t, exists)
|
| require.NotNil(t, factory)
|
|
|
|
|
| factory, exists = GetDecoder("application/non-existent")
|
| require.False(t, exists)
|
| require.Nil(t, factory)
|
| }
|
|
|
| func TestDefaultSSEDecoder(t *testing.T) {
|
|
|
| sseData := "data: {\"type\": \"test\", \"message\": \"hello\"}\n\n"
|
| rc := newMockReadCloser([]byte(sseData))
|
|
|
|
|
| ctx := context.Background()
|
| decoder := NewDefaultSSEDecoder(ctx, rc)
|
| require.NotNil(t, decoder)
|
| require.Implements(t, (*StreamDecoder)(nil), decoder)
|
|
|
|
|
| hasNext := decoder.Next()
|
| require.True(t, hasNext)
|
| require.NoError(t, decoder.Err())
|
|
|
| event := decoder.Current()
|
| require.NotNil(t, event)
|
| require.Equal(t, "", event.Type)
|
| require.Contains(t, string(event.Data), "hello")
|
|
|
|
|
| err := decoder.Close()
|
| require.NoError(t, err)
|
| require.True(t, rc.closed)
|
| }
|
|
|
| func TestDefaultSSEDecoder_EmptyStream(t *testing.T) {
|
| ctx := context.Background()
|
| rc := newMockReadCloser([]byte(""))
|
| decoder := NewDefaultSSEDecoder(ctx, rc)
|
|
|
|
|
| hasNext := decoder.Next()
|
| require.False(t, hasNext)
|
|
|
|
|
| event := decoder.Current()
|
| require.Nil(t, event)
|
|
|
|
|
| err := decoder.Close()
|
| require.NoError(t, err)
|
| }
|
|
|
| func TestDefaultSSEDecoder_NextAfterClose(t *testing.T) {
|
|
|
| sseData := "data: {\"type\": \"test\", \"message\": \"hello\"}\n\n"
|
| rc := newMockReadCloser([]byte(sseData))
|
|
|
| ctx := context.Background()
|
| decoder := NewDefaultSSEDecoder(ctx, rc)
|
|
|
| err := decoder.Close()
|
| require.NoError(t, err)
|
| require.True(t, rc.closed)
|
|
|
| hasNext := decoder.Next()
|
| require.False(t, hasNext)
|
| require.NoError(t, decoder.Err())
|
| }
|
|
|
| func TestStreamDecoderInterface(t *testing.T) {
|
| ctx := context.Background()
|
|
|
| events := []*StreamEvent{
|
| {Type: "test1", Data: []byte("data1")},
|
| {Type: "test2", Data: []byte("data2")},
|
| }
|
|
|
| rc := newMockReadCloser([]byte("test"))
|
| decoder := newMockStreamDecoder(ctx, rc, events)
|
|
|
|
|
| require.True(t, decoder.Next())
|
| event1 := decoder.Current()
|
| require.Equal(t, "test1", event1.Type)
|
| require.Equal(t, []byte("data1"), event1.Data)
|
|
|
| require.True(t, decoder.Next())
|
| event2 := decoder.Current()
|
| require.Equal(t, "test2", event2.Type)
|
| require.Equal(t, []byte("data2"), event2.Data)
|
|
|
|
|
| require.False(t, decoder.Next())
|
| require.Nil(t, decoder.Current())
|
|
|
|
|
| require.NoError(t, decoder.Err())
|
|
|
|
|
| err := decoder.Close()
|
| require.NoError(t, err)
|
| require.True(t, decoder.closed)
|
| require.True(t, rc.closed)
|
| }
|
|
|