流式传输中处理文件分片
Streaming!!
为了提高文件传输的效率和增加容错,我们往往会选择在服务端将文件chunking成一个个分片,直接将这些分片通过传输管道传输到客户端,在客户端进行分片的组装
最理想的状态是:管道十分可靠,而且还可以保持分片传输的顺序,客户端可以很惬意得将分片按照顺序组装完成
不太好的状态是:管道虽然可靠,但是不能保证分片传输后还是保持原有的顺序
顺序可能变成:F1C1 → F1C3 → F2C2 → F3C1 → F1C2 这种不按套路的序列
为了保证传输的效率,我们还往往会采取异步处理的策略,同一段时间内,我们有很多线程不断试图保存保存分片数据
所以,我们面对的问题就很明显了:如何处理乱序分片+防止竞争
0x00
msg, err := sc.stream.Recv()
这是数据的入口,一个分片就这样进入了我们的某一个线程
我这里的处理思路是:
每一个文件就创建一个struct,用来handle所有属于这个文件的分片
为了方便地整合管理所有这些struct,要有一个map来做stuct的存取,当然map一定是要线程安全的,这里用sync.Map
所以,对于每一个进来的分片,我们都先从map里面找有没有相对应的文件的struct,如果有,拿取,如果没有,创建一个新的struct并存入,碰巧和sync.Map的LoadOrStore方法不谋而合
直接封装一个get or create方法
func (sc *StreamController) getOrCreateProcessor(msg *proto.DataChunk) (*processor.FileProcessor, error) {
entry := &ProcessorEntry{
Once: &sync.Once{},
}
entryInterface, _ := sc.FileProcessorMap.LoadOrStore(msg.FileName, entry)
actualEntry := entryInterface.(*ProcessorEntry) // 这里的类型断言是安全的,因为我们只存储一种类型
actualEntry.Once.Do(func() {
actualEntry.Processor = processor.NewFileProcessor(
msg.FileName,
msg.ChunkTotal,
msg.FileSize,
sc.recvErr,
)
logger.Info("处理器运作")
})
return actualEntry.Processor, nil
}
type ProcessorEntry struct {
Once *sync.Once
Processor *processor.FileProcessor
}
FileProcessorMap.LoadOrStore不必多说,有则取,无则存
判断一个struct是否存在,对于这种并发场景太昂贵了,直接采用Once.Do这种原子操作,无论执行多少次,都只执行最开始的一次,实现了只创建一次FileProcessor
这个函数实现了一个通用的方法,每一个线程都可以安全地取得匹配自己分片的struct
接下来只需要将分片信息传递给这个struct,由他处理此分片
0x01
// ProcessChunk 处理单个文件分片,并发安全
func (fp *FileProcessor) ProcessChunk(chunkIndex int, data []byte) {
fp.mu.Lock()
defer fp.mu.Unlock()
// 再次检查是否已完成,防止竞态
if fp.isComplete.Load() {
fp.recvErr <- fmt.Errorf("file %s processing already completed", fp.fileName)
return
}
// 检查是否已存储该分片
if _, exists := fp.chunks[chunkIndex]; exists {
return // 避免重复存储
}
// 存储分片
// fp.chunks[chunkIndex] = make([]byte, len(data))
// copy(fp.chunks[chunkIndex], data)
fp.chunks[chunkIndex] = data
/* if chunkIndex == 0 {
logger.Infof("%x", data)
}*/
// 增加计数
count := fp.receivedCount.Add(1)
// 确保 `WriteToFile` 只被调用一次
if count == atomic.LoadInt32(&fp.totalChunks) {
fp.isComplete.Store(true)
go fp.WriteToFile()
}
}
在并发场景,加锁是很重要的,尤其ProcessChunk是每一个线程都会调用的,竞争非常剧烈,不能不加锁
这里面的逻辑很简单:
检查是否是一个已经填充完毕的struct
如果不是,将自身填充进map
原子计数器,分片数加一,当计数==分片总数则将fp.isComplete设为True,收集完毕则开始将文件写入磁盘