dvc890 commited on
Commit
88c4472
1 Parent(s): 6f0d20b

Update pkg/plugins/service/wsstostream/wsstostream.go

Browse files
pkg/plugins/service/wsstostream/wsstostream.go CHANGED
@@ -148,23 +148,26 @@ func (s *WssToStream) ReadMessage() (io.ReadCloser, error) {
148
  _, msg, err := s.Server.ReadMessage()
149
  if err != nil {
150
  logger.Log.Error("read message error:", err)
 
151
  }
152
  var response WsResponse
153
  if err = json.Unmarshal(msg, &response); err != nil {
154
  logger.Log.Error("unmarshal message error:", err)
 
155
  }
156
  if response.Data.ResponseId == s.ResponseId && response.Data.ConversationId == s.ConversationId {
157
- if response.Data.Body == "ZGF0YTogW0RPTkVdCgo=" {
158
- s.Server.Close()
159
- }
160
  data, err := base64.StdEncoding.DecodeString(response.Data.Body)
161
  if err != nil {
 
162
  return nil, err
163
  }
 
 
 
 
164
  return NewNopCloser(data), nil
165
- } else {
166
- return nil, nil
167
  }
 
168
  }
169
  func (s *WssToStream) Read(p []byte) (n int, err error) {
170
  logger.Log.Debug("Read")
@@ -175,23 +178,25 @@ func (s *WssToStream) Read(p []byte) (n int, err error) {
175
  var response WsResponse
176
  if err = json.Unmarshal(message, &response); err != nil {
177
  logger.Log.Error("unmarshal message error:", err)
 
178
  }
179
  if response.Data.ResponseId == s.ResponseId && response.Data.ConversationId == s.ConversationId {
180
- if response.Data.Body == "ZGF0YTogW0RPTkVdCgo=" {
181
- s.Server.Close()
182
- }
183
  data, err := base64.StdEncoding.DecodeString(response.Data.Body)
184
  if err != nil {
 
185
  return 0, err
186
  }
187
  copyLen := copy(p, data)
188
  if copyLen < len(data) {
189
- return copyLen, errors.New("buffer too small to hold message")
 
 
 
 
190
  }
191
  return copyLen, nil
192
- } else {
193
- return 0, nil
194
- }
195
  }
196
 
197
  func (s *WssToStream) Close() error {
 
148
  _, msg, err := s.Server.ReadMessage()
149
  if err != nil {
150
  logger.Log.Error("read message error:", err)
151
+ return nil, err
152
  }
153
  var response WsResponse
154
  if err = json.Unmarshal(msg, &response); err != nil {
155
  logger.Log.Error("unmarshal message error:", err)
156
+ return nil, err
157
  }
158
  if response.Data.ResponseId == s.ResponseId && response.Data.ConversationId == s.ConversationId {
 
 
 
159
  data, err := base64.StdEncoding.DecodeString(response.Data.Body)
160
  if err != nil {
161
+ logger.Log.Error("decode base64 message error:", err)
162
  return nil, err
163
  }
164
+ if response.Data.Body == "ZGF0YTogW0RPTkVdCgo=" {
165
+ s.Server.Close()
166
+ return NewNopCloser(data), io.EOF
167
+ }
168
  return NewNopCloser(data), nil
 
 
169
  }
170
+ return nil, errors.New("response id or conversation id does not match")
171
  }
172
  func (s *WssToStream) Read(p []byte) (n int, err error) {
173
  logger.Log.Debug("Read")
 
178
  var response WsResponse
179
  if err = json.Unmarshal(message, &response); err != nil {
180
  logger.Log.Error("unmarshal message error:", err)
181
+ return 0, err
182
  }
183
  if response.Data.ResponseId == s.ResponseId && response.Data.ConversationId == s.ConversationId {
 
 
 
184
  data, err := base64.StdEncoding.DecodeString(response.Data.Body)
185
  if err != nil {
186
+ logger.Log.Error("decode base64 message error:", err)
187
  return 0, err
188
  }
189
  copyLen := copy(p, data)
190
  if copyLen < len(data) {
191
+ return copyLen, io.ErrShortBuffer
192
+ }
193
+ if response.Data.Body == "ZGF0YTogW0RPTkVdCgo=" {
194
+ s.Server.Close()
195
+ return copyLen, io.EOF
196
  }
197
  return copyLen, nil
198
+ }
199
+ return 0, errors.New("response id or conversation id do not match")
 
200
  }
201
 
202
  func (s *WssToStream) Close() error {