Scalable Consistency in Scatter

原文地址

  • idea:可扩展、一致性的分布式key-value 存储(evaluation of Scatter, a scalable and consistent distributed key-value storage system)
  • key insight:基于DHT 的一致性组
  • DHT 功能简介:传统DHT 中,应用数据和节点ID 都哈希为键值key,数值保存在先于或紧接着键值key 的那个节点
  • 为什么会发生一致性问题:1.Assignment violation 节点加入或者离开时对keyID 范围的声明的重叠。2.节点加入和离开对临近指针的影响。

Scatter Design

goal:一致性,可扩展性,自适应性

方法:用组来代替单个节点,RSM 基于paxos 一致性算法,组内使用一致性协议维持内部一致性,但是如静态指定组会存在一些问题:

  1. 当大量nodes 离开或者加入会使得该组失效
  2. 健壮性、可扩展性问题
  3. 当组内节点增多,一致性算法性能下降
  4. 热点问题

因此Scatter 允许多组操作:

  1. split       组一分为二
  2. merge   两临近组合二为一
  3. migrate 将组员从一组移动到另一组
  4. repartition 临近两组交换键值空间

强一致性在拓扑结构不是原子变化的情况下是很难保证的,因此文章引出“自创”的nested consensus 老保证原子性,但实际上nested consensus 就是组内的paxos 算法保证强一致性,组间两阶段提交保证弱一致性。

接着文章从发起方coordinator group,参与方participant group 解释了nested consensus 过程,实际上就是两阶段提交,文章也指出这样的交互过程需要大量的广播和消息,可以作为future work 进行改进。

整个Scatter 系统对外提供存储服务,每个组中会利用paxos 算法选举出一个master,组间交互只通过master,而具体数据时存在组内所有成员,存储对应数据的称为primary。文章谈到了所做的一些性能优化,lease,diskless paxos(信息不用存盘)、relaxed reads 等。

最后进行了测试并与openDHT 比较了consistency。

 

个人体会:

文章主要是提出了可扩展和一致性的key-value 存储系统,最大的创新在于把传统DHT 中单个节点的方法改成了一个组,节点的加入和退出就限制在了组内,对外这个组的状态和功能还是不变的。为了满足可扩展性,组可以进行分裂合并操作,通过形式上说明和实验组操作的OPs/sec、延时说明这个方法是可行、高效的。一致性是通过paxos 算法和两阶段提交实现,也不能说有新意。文章通过与openDHT 的比较,说明Scatter 不损失性能和可用性的情况下,一致性上比openDHT要好。但我有个小问题,组操作如果是手动的话就谈不上adaptive 了,如果是自动的话,这种组操作在节点加入和离开的时候频不频繁,虽然文章也给出了组操作的thoughput 和latency ,但我觉得这个应该同时进行读写操作才能说明问题,如果在现实情况中组操作严重干扰了用户的正常读写的话,试验中的高性能和可用性就不好说了。

写一个分布式存储系统有多简单? (How easy to write a Distributed Storage System ?)

用python 写了个用户态分布式存储系统(且这么称吧),实现文件存储。(今天在google 上看到两个人和我干了同样工作,PyGfs 使用了pyro 库封装了一些远程调用,后者则是模拟实现了GFS 功能,两份代码都不长。)

自己的分布式文件系统总体分为Client 端和DataServer 端,没有NameServer 是因为通过文件名hash 得到对应的DS 服务器。存储的粒度为文件,提供write 和fetch,写文件和获取文件功能,读写过程:通过对路径+文件名进行hash,用机器数量为模数,得到目的主机编号和地址,数据通过socket 传过去,DS 的机器数在部署时可配置,写在配置文件。

TODO:文件分片,NS 实现(只保存目录树)

配置文件为ip , port 格式:

 1: 127.0.0.1,10001
 2: 127.0.0.1,10002
 3: 127.0.0.1,10003

 

DataServer 端:

 1: #!/usr/bin/env python
 2: #encoding=utf-8
 3: import os,sys
 4: import socket
 5:
 6: class DataServer:
 7:     port=10000
 8:     def __init__(self):
 9:         pass
 10:     def run(self):
 11:         dsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 12:         dsoc.bind(('localhost',self.port))
 13:         print "listening at",self.port
 14:         while True:
 15:             dsoc.listen(1)
 16:             conn,addr=dsoc.accept()
 17:             print "connected",addr
 18:             databuf=conn.recv(13)
 19:             print "received",databuf
 20:             if databuf.startswith('WRITFILE'):
 21:                 print "OPS == write file"
 22:                 dlist=databuf.split(',')
 23:                 fnamelen=int(dlist[1])
 24:                 conn.send('OK')
 25:                 print "filenamelen is",fnamelen
 26:                 filename=conn.recv(fnamelen)
 27:                 filename=filename[filename.rindex('\\')+1:]
 28:                 print "file is",filename
 29:                 fp=open(filename,'wb')
 30:                 while True:
 31:                     data=conn.recv(1024)
 32:                     if not data:break
 33:                     fp.write(data)
 34:                 fp.flush()
 35:                 fp.close()
 36:                 print "finished!",filename
 37:             if databuf.startswith('FETCFILE'):
 38:                 print "OPS == fetch file"
 39:                 dlist=databuf.split(',')
 40:                 fnamelen=int(dlist[1])
 41:                 conn.send('OK')
 42:                 print "filenamelen is",fnamelen
 43:                 filename=conn.recv(fnamelen)
 44:                 filename=filename[filename.rindex('\\')+1:]
 45:                 print "file is",filename
 46:                 fp=open(filename,'rb')
 47:                 while True:
 48:                     data=fp.read(4096)
 49:                     if not data:
 50:                         break
 51:                     while len(data)>0:
 52:                         sent=conn.send(data)
 53:                         data=data[sent:]
 54:                 print "Fished to send ",filename
 55:             conn.close()
 56:
 57:     def setPort(self,port):
 58:         self.port=int(port)
 59:
 60: if __name__=="__main__":
 61:     ds=DataServer()
 62:     ds.setPort(sys.argv[1])
 63:     ds.run()

 

Client 端:

 1: #!/usr/bin/env python
 2: #encoding=utf-8
 3: import os
 4: import socket
 5: import hashlib
 6:
 7: class Configuration:
 8:     count=0
 9:     clist=[]
 10:     def __init__(self):
 11:         self.readPath("machineconf.txt")
 12:     def readPath(self,pStr):
 13:         self.pathStr=pStr
 14:         fp=open(self.pathStr,'r')
 15:         while True:
 16:             line=fp.readline()
 17:             if not line:
 18:                 break
 19:             line=line.rstrip()
 20:             self.clist.append(line)
 21:             self.count=self.count+1
 22:     def getCount(self):
 23:         return self.count
 24:     def getList(self):
 25:         return self.clist
 26:
 27: class Client:
 28:     maList=[]
 29:     macNum=0
 30:     def __init__(self):
 31:         conf=Configuration()
 32:         self.maList=conf.getList()
 33:         self.macNum=conf.getCount()
 34:     def write(self,src):
 35:         srcHash=self.hash(src)
 36:         print "File is",src
 37:         locatedNum=srcHash%self.macNum
 38:         print "Location machine number is",locatedNum
 39:         IPort=self.maList[locatedNum].split(',')
 40:         toIp=IPort[0]
 41:         toPort=IPort[1]
 42:         print "Send to",toIp,toPort
 43:         self.send(src,toIp,toPort)
 44:     def hash(self,src):
 45:         md5=hashlib.md5()
 46:         md5.update(src)
 47:         return int(md5.hexdigest()[-4:],16)
 48:     def send(self,src,ip,port):
 49:         clsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 50:         clsoc.connect((ip,int(port)))
 51:         fp=open(src,'rb')
 52:         formStr="WRITFILE,%04d"%len(src)
 53:         print "formStr",formStr,len(formStr)
 54:         clsoc.send(formStr)
 55:         resdata=clsoc.recv(1024)
 56:         if resdata.startswith('OK'):
 57:             print "OK"
 58:         print "sending....",src
 59:         clsoc.send(src)
 60:         print "sending data...."
 61:         while True:
 62:             data=fp.read(4096)
 63:             if not data:
 64:                 break
 65:             while len(data)>0:
 66:                 sent=clsoc.send(data)
 67:                 data=data[sent:]
 68:         print "Fished to send ",src
 69:         fp.close()
 70:     def fetch(self,src):
 71:         srcHash=self.hash(src)
 72:         locatedNum=srcHash%self.macNum
 73:         IPort=self.maList[locatedNum].split(',')
 74:         toIp=IPort[0]
 75:         toPort=IPort[1]
 76:         print "fetch from",toIp,toPort
 77:         self.get(src,toIp,toPort)
 78:     def get(self,src,ip,port):
 79:         clsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 80:         clsoc.connect((ip,int(port)))
 81:         formStr="FETCFILE,%04d"%len(src)
 82:         print "formStr",formStr
 83:         clsoc.send(formStr)
 84:         resdata=clsoc.recv(1024)
 85:         if resdata.startswith('OK'):
 86:             print "OK"
 87:         print "sending....",src
 88:         clsoc.send(src)
 89:         print "fetching data...."
 90:         ffile=src[src.rindex('\\')+1:]
 91:         fp=open(ffile,'wb')
 92:         while True:
 93:             data=clsoc.recv(1024)
 94:             if not data:break
 95:             fp.write(data)
 96:             print "fetching",data[-8:]
 97:         fp.flush()
 98:         fp.close()
 99:         print "finished!",src
 100: if __name__=="__main__":
 101:     mac=Client()
 102:     src="e:\\pics\\tumblr_m0pvvmcIv41r9mp65o1_500.gif"
 103:     mac.write(src)
 104:     mac.fetch(src)

HDFS---Block

块大小是在配置时改变。参考http://serverfault.com/questions/300135/hadoop-hdfs-set-file-block-size-from-commandline

Block 的分析 http://blog.sina.com.cn/s/blog_698853500100rxwu.html

  • org.apache.hadoop.hdfs.protocol
    • Block.java 实现Block 类,Block 文件名为”blk_”+BlockId,
    • BlockListAsLongs.java 提供了一个long[] 类型的接口用于访问list of blocks,在进行块报告的时候有用,返回的是long[] 型,而不是Block[] 型。里面就讲到了Block 有三个longs:block_id,block length 和 genration stamp(http://hi.baidu.com/jay23jack/blog/item /388d770bb4db20b72fddd40f.html 谈到了stamp 和BlockId 的作用和区别)。
    • BlockLocalPathInfo.java 本地文件系统数据块和它在本地元数据的路径。
    • LocateBlock.java/LocateBlocks.java 非常重要的基础类,LocatedBlock 的内容有如下:

LocatedBlock

数据块(Block 类型,offset of the first byte of the block in the file),

 数据块在文件的偏移(long),

 数据块指针(DatanodeInfo[]),

 是否损坏(boolean)。

LocatedBlocks

fileLength(文件长度,即块组长度)

 List<LocatedBlock> blocks (块组列表)

 underConstruction (是否在租约中)

  • org.apache.hadoop.hdfs.server.namenode
    • BlocksMap.java 块到块元数据(块属于哪个INode以及块存在哪些DataNodes上)的映射。BlockInfo 是BlocksMap 中的重要结构,保存了上面所说的映射关系, 继承了Block 类并包含了INodeFile 对象和Block[] 结构,该结构包含了(1)数据块所在DataNode 描述符,即triplets[3*i](2)triplets[3*i+1] 是前一个数据块(3)triplets[3*i+2]是后一个数据块。如下:

Block

blockId (long)

numBytes (long)

generationStamp(long)

BlockInfo

blockId (long)

numBytes (long)

generationStamp(long)

inode(INodeFile)

DN1|prev|next< >DN2|prev|next< >DN3|prev|next

  • org.apache.hadoop.hdfs.server.datanode
    • BlockAlreadyExistException.java 当块已经存在时不可写
    • BlockReceiver.java 接收一个块并写入到磁盘
    • BlockSender.java 从本地读取一个块发送给接收者
    • DataBlockScanner 扫描块,但不改变元数据。扫描速度限制在1MB 8MB,默认扫描时间为三周,块内包含了扫描类型时间和结果

参考:http://blog.csdn.net/AE86_FC/article/details/5842020 NameNode启动过程详细剖析

 

Brewer 的 CAP 理论

Brewer 在 2000 年基于他在伯克利大学的工作以及对 Inktomi 的观察上提出了 CAP 理论(牛人就是观察出的理论啊!),这之前(1997 年SOSP Cluster-Based Scalable Network Services 和1999年 Cluster-Based Scalable Network Services)他和他的同事也提出了应该在高扩展性系统做出取舍权衡,所以在 2000 年提出的这个理论也不是一个特别意外新颖的观点,和许多著名的理论相同,他们都是建立大量工作和牛人基础之上的。

继续阅读