StarrySkyWorld commited on
Commit
2e72a7f
·
1 Parent(s): 859f406

Add multi-part concurrent download with single-stream fallback

Browse files
Files changed (2) hide show
  1. README.md +1 -0
  2. backend/internal/services/tasks.go +166 -3
README.md CHANGED
@@ -13,6 +13,7 @@ pinned: false
13
 
14
  - 在沙箱根目录内浏览、上传、删除、移动、重命名文件
15
  - 发起服务端 URL 下载任务
 
16
  - 发起压缩包解压任务(支持 `.zip`、`.tar.gz`、`.tgz`)
17
  - 使用 SQLite 持久化任务状态并展示进度
18
 
 
13
 
14
  - 在沙箱根目录内浏览、上传、删除、移动、重命名文件
15
  - 发起服务端 URL 下载任务
16
+ - URL 下载支持 Range 分片并发加速(服务器不支持时自动回退单线程)
17
  - 发起压缩包解压任务(支持 `.zip`、`.tar.gz`、`.tgz`)
18
  - 使用 SQLite 持久化任务状态并展示进度
19
 
backend/internal/services/tasks.go CHANGED
@@ -14,6 +14,7 @@ import (
14
  "path/filepath"
15
  "strings"
16
  "sync"
 
17
  "time"
18
 
19
  "fastfileviewer/backend/internal/models"
@@ -38,6 +39,12 @@ type TaskService struct {
38
  subscribers map[uint]map[chan taskEvent]struct{}
39
  }
40
 
 
 
 
 
 
 
41
  func NewTaskService(database *gorm.DB, fs *FSService, workers int) *TaskService {
42
  if workers < 1 {
43
  workers = 1
@@ -170,7 +177,52 @@ func (s *TaskService) handleDownload(task *models.Task) error {
170
  }
171
 
172
  s.appendLog(task, "starting download: "+task.Source)
173
- resp, err := http.Get(task.Source) // #nosec G107 user-provided URLs are expected in this app design
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
174
  if err != nil {
175
  return err
176
  }
@@ -191,7 +243,6 @@ func (s *TaskService) handleDownload(task *models.Task) error {
191
  reader := bufio.NewReader(resp.Body)
192
  buf := make([]byte, 256*1024)
193
  lastTick := time.Now()
194
-
195
  for {
196
  n, readErr := reader.Read(buf)
197
  if n > 0 {
@@ -215,8 +266,120 @@ func (s *TaskService) handleDownload(task *models.Task) error {
215
  return readErr
216
  }
217
  }
 
 
218
 
219
- s.appendLog(task, "download complete")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
  return nil
221
  }
222
 
 
14
  "path/filepath"
15
  "strings"
16
  "sync"
17
+ "sync/atomic"
18
  "time"
19
 
20
  "fastfileviewer/backend/internal/models"
 
39
  subscribers map[uint]map[chan taskEvent]struct{}
40
  }
41
 
42
+ const (
43
+ minMultiPartSize = 16 * 1024 * 1024
44
+ maxDownloadParts = 8
45
+ partRetryLimit = 3
46
+ )
47
+
48
  func NewTaskService(database *gorm.DB, fs *FSService, workers int) *TaskService {
49
  if workers < 1 {
50
  workers = 1
 
177
  }
178
 
179
  s.appendLog(task, "starting download: "+task.Source)
180
+ size, supportsRange, err := probeDownload(task.Source)
181
+ if err == nil && supportsRange && size >= minMultiPartSize {
182
+ parts := int(size / (8 * 1024 * 1024))
183
+ if parts < 2 {
184
+ parts = 2
185
+ }
186
+ if parts > maxDownloadParts {
187
+ parts = maxDownloadParts
188
+ }
189
+ s.appendLog(task, fmt.Sprintf("range download enabled: parts=%d, size=%d bytes", parts, size))
190
+ if err := s.multiPartDownload(task, task.Source, target, size, parts); err == nil {
191
+ s.appendLog(task, "download complete")
192
+ return nil
193
+ } else {
194
+ s.appendLog(task, "multi-part failed, fallback to single stream: "+err.Error())
195
+ _ = os.Remove(target)
196
+ }
197
+ }
198
+
199
+ if err := s.singlePartDownload(task, task.Source, target); err != nil {
200
+ return err
201
+ }
202
+ s.appendLog(task, "download complete")
203
+ return nil
204
+ }
205
+
206
+ func probeDownload(url string) (size int64, supportsRange bool, err error) {
207
+ req, err := http.NewRequest(http.MethodHead, url, nil)
208
+ if err != nil {
209
+ return 0, false, err
210
+ }
211
+ resp, err := http.DefaultClient.Do(req) // #nosec G107 user-provided URLs are expected in this app design
212
+ if err != nil {
213
+ return 0, false, err
214
+ }
215
+ defer resp.Body.Close()
216
+ if resp.StatusCode < 200 || resp.StatusCode >= 400 {
217
+ return 0, false, fmt.Errorf("probe failed: %s", resp.Status)
218
+ }
219
+ size = resp.ContentLength
220
+ supportsRange = strings.Contains(strings.ToLower(resp.Header.Get("Accept-Ranges")), "bytes")
221
+ return size, supportsRange, nil
222
+ }
223
+
224
+ func (s *TaskService) singlePartDownload(task *models.Task, url, target string) error {
225
+ resp, err := http.Get(url) // #nosec G107 user-provided URLs are expected in this app design
226
  if err != nil {
227
  return err
228
  }
 
243
  reader := bufio.NewReader(resp.Body)
244
  buf := make([]byte, 256*1024)
245
  lastTick := time.Now()
 
246
  for {
247
  n, readErr := reader.Read(buf)
248
  if n > 0 {
 
266
  return readErr
267
  }
268
  }
269
+ return nil
270
+ }
271
 
272
+ func (s *TaskService) multiPartDownload(task *models.Task, url, target string, total int64, parts int) error {
273
+ file, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
274
+ if err != nil {
275
+ return err
276
+ }
277
+ defer file.Close()
278
+ if err := file.Truncate(total); err != nil {
279
+ return err
280
+ }
281
+
282
+ var downloaded atomic.Int64
283
+ stopProgress := make(chan struct{})
284
+ go func() {
285
+ ticker := time.NewTicker(500 * time.Millisecond)
286
+ defer ticker.Stop()
287
+ for {
288
+ select {
289
+ case <-stopProgress:
290
+ return
291
+ case <-ticker.C:
292
+ progress := int((downloaded.Load() * 100) / total)
293
+ if progress > 99 {
294
+ progress = 99
295
+ }
296
+ s.setProgress(task, progress)
297
+ }
298
+ }
299
+ }()
300
+
301
+ var wg sync.WaitGroup
302
+ errCh := make(chan error, parts)
303
+ chunkSize := total / int64(parts)
304
+
305
+ for i := 0; i < parts; i++ {
306
+ start := int64(i) * chunkSize
307
+ end := start + chunkSize - 1
308
+ if i == parts-1 {
309
+ end = total - 1
310
+ }
311
+
312
+ wg.Add(1)
313
+ go func(partID int, rangeStart, rangeEnd int64) {
314
+ defer wg.Done()
315
+ var lastErr error
316
+ for attempt := 1; attempt <= partRetryLimit; attempt++ {
317
+ err := downloadRange(url, file, rangeStart, rangeEnd, &downloaded)
318
+ if err == nil {
319
+ return
320
+ }
321
+ lastErr = err
322
+ time.Sleep(time.Duration(attempt) * 400 * time.Millisecond)
323
+ }
324
+ errCh <- fmt.Errorf("part %d failed: %w", partID, lastErr)
325
+ }(i+1, start, end)
326
+ }
327
+
328
+ wg.Wait()
329
+ close(stopProgress)
330
+ close(errCh)
331
+ for err := range errCh {
332
+ if err != nil {
333
+ return err
334
+ }
335
+ }
336
+ s.setProgress(task, 99)
337
+ return nil
338
+ }
339
+
340
+ func downloadRange(url string, out *os.File, start, end int64, downloaded *atomic.Int64) error {
341
+ req, err := http.NewRequest(http.MethodGet, url, nil)
342
+ if err != nil {
343
+ return err
344
+ }
345
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
346
+
347
+ resp, err := http.DefaultClient.Do(req) // #nosec G107 user-provided URLs are expected in this app design
348
+ if err != nil {
349
+ return err
350
+ }
351
+ defer resp.Body.Close()
352
+
353
+ if resp.StatusCode != http.StatusPartialContent {
354
+ return fmt.Errorf("server does not honor range request: %s", resp.Status)
355
+ }
356
+
357
+ reader := bufio.NewReader(resp.Body)
358
+ buf := make([]byte, 256*1024)
359
+ offset := start
360
+ expected := end - start + 1
361
+ var written int64
362
+
363
+ for {
364
+ n, readErr := reader.Read(buf)
365
+ if n > 0 {
366
+ if _, err := out.WriteAt(buf[:n], offset); err != nil {
367
+ return err
368
+ }
369
+ offset += int64(n)
370
+ written += int64(n)
371
+ downloaded.Add(int64(n))
372
+ }
373
+ if readErr == io.EOF {
374
+ break
375
+ }
376
+ if readErr != nil {
377
+ return readErr
378
+ }
379
+ }
380
+ if written != expected {
381
+ return fmt.Errorf("range incomplete: expected %d got %d", expected, written)
382
+ }
383
  return nil
384
  }
385