流式传输中处理文件分片

Streaming!!

为了提高文件传输的效率和增加容错,我们往往会选择在服务端将文件chunking成一个个分片,直接将这些分片通过传输管道传输到客户端,在客户端进行分片的组装

最理想的状态是:管道十分可靠,而且还可以保持分片传输的顺序,客户端可以很惬意得将分片按照顺序组装完成

不太好的状态是:管道虽然可靠,但是不能保证分片传输后还是保持原有的顺序

顺序可能变成:F1C1 → F1C3 → F2C2 → F3C1 → F1C2 这种不按套路的序列

为了保证传输的效率,我们还往往会采取异步处理的策略,同一段时间内,我们有很多线程不断试图保存保存分片数据

所以,我们面对的问题就很明显了:如何处理乱序分片+防止竞争

0x00

msg, err := sc.stream.Recv()

这是数据的入口,一个分片就这样进入了我们的某一个线程

我这里的处理思路是:

每一个文件就创建一个struct,用来handle所有属于这个文件的分片

为了方便地整合管理所有这些struct,要有一个map来做stuct的存取,当然map一定是要线程安全的,这里用sync.Map

所以,对于每一个进来的分片,我们都先从map里面找有没有相对应的文件的struct,如果有,拿取,如果没有,创建一个新的struct并存入,碰巧和sync.Map的LoadOrStore方法不谋而合

直接封装一个get or create方法

func (sc *StreamController) getOrCreateProcessor(msg *proto.DataChunk) (*processor.FileProcessor, error) {
	entry := &ProcessorEntry{
		Once: &sync.Once{},
	}

	entryInterface, _ := sc.FileProcessorMap.LoadOrStore(msg.FileName, entry)
	actualEntry := entryInterface.(*ProcessorEntry) // 这里的类型断言是安全的,因为我们只存储一种类型

	actualEntry.Once.Do(func() {
		actualEntry.Processor = processor.NewFileProcessor(
			msg.FileName,
			msg.ChunkTotal,
			msg.FileSize,
			sc.recvErr,
		)
		logger.Info("处理器运作")
	})

	return actualEntry.Processor, nil
}
type ProcessorEntry struct {
	Once      *sync.Once
	Processor *processor.FileProcessor
}

FileProcessorMap.LoadOrStore不必多说,有则取,无则存

判断一个struct是否存在,对于这种并发场景太昂贵了,直接采用Once.Do这种原子操作,无论执行多少次,都只执行最开始的一次,实现了只创建一次FileProcessor

这个函数实现了一个通用的方法,每一个线程都可以安全地取得匹配自己分片的struct

接下来只需要将分片信息传递给这个struct,由他处理此分片

0x01

// ProcessChunk 处理单个文件分片,并发安全
func (fp *FileProcessor) ProcessChunk(chunkIndex int, data []byte) {
	fp.mu.Lock()
	defer fp.mu.Unlock()

	// 再次检查是否已完成,防止竞态
	if fp.isComplete.Load() {
		fp.recvErr <- fmt.Errorf("file %s processing already completed", fp.fileName)
		return
	}

	// 检查是否已存储该分片
	if _, exists := fp.chunks[chunkIndex]; exists {
		return // 避免重复存储
	}

	// 存储分片
	// fp.chunks[chunkIndex] = make([]byte, len(data))
	// copy(fp.chunks[chunkIndex], data)
	fp.chunks[chunkIndex] = data

	/*	if chunkIndex == 0 {
		logger.Infof("%x", data)
	}*/

	// 增加计数
	count := fp.receivedCount.Add(1)

	// 确保 `WriteToFile` 只被调用一次
	if count == atomic.LoadInt32(&fp.totalChunks) {
		fp.isComplete.Store(true)
		go fp.WriteToFile()
	}
}

在并发场景,加锁是很重要的,尤其ProcessChunk是每一个线程都会调用的,竞争非常剧烈,不能不加锁

这里面的逻辑很简单:

检查是否是一个已经填充完毕的struct

如果不是,将自身填充进map

原子计数器,分片数加一,当计数==分片总数则将fp.isComplete设为True,收集完毕则开始将文件写入磁盘