部分代码参考之前的“写一个分布式存储系统有多简单?”,保持数据服务器端不变,修改客户端节点选择方式即可。
修改:1、添加 HashRing 类,用于管理一致性哈希环;该类保存在客户端(client),因此节点/数据服务器端基本不需要修改(仅添加了 hello 测试)。2、客户端修改节点选择方式,改用一致性哈希(consistent hashing)。
国内介绍一致性哈希的文章也比较多了,可以参考这里。
conshash.py 文件
import bisect
import hashlib
import md5
2:
class HashRing(object):
    """Consistent-hash ring that maps string keys onto storage nodes.

    Every node is placed on the ring ``replicas`` times (its "virtual
    nodes"), at the MD5 hash of ``"<node><index>"``.  A key belongs to the
    node that owns the first ring position strictly greater than the key's
    hash; past the highest position the lookup wraps around to the lowest.

    Attributes:
        ring: dict mapping a ring position (integer hash) to its node.
        replicas: number of virtual nodes created per real node.
        sorted_keys: ascending list of every ring position in ``ring``.
    """

    def __init__(self, nodes=None, replicas=3):
        """Create the ring and add the initial nodes.

        Args:
            nodes: optional iterable of node identifiers to add immediately.
            replicas: number of virtual nodes per real node.
        """
        self.ring = dict()
        self.replicas = replicas
        self.sorted_keys = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def add_node(self, node):
        """Place all virtual nodes of ``node`` on the ring.

        Adding a node that is already present leaves ``sorted_keys`` free of
        duplicates.
        """
        for i in range(self.replicas):
            key = self.gen_key('%s%s' % (node, i))
            if key not in self.ring:
                # insort keeps the list ordered without re-sorting it.
                bisect.insort(self.sorted_keys, key)
            self.ring[key] = node

    def get_node_pos(self, str_src):
        """Return the node that should store ``str_src``.

        Returns:
            The owning node, or None when the ring holds no nodes.
        """
        node_hash = self.upper_node_hash(self.gen_key(str_src))
        if node_hash is None:
            return None
        return self.ring[node_hash]

    def gen_key(self, str_node):
        """Return the MD5 digest of ``str_node`` as an integer.

        Text that is not already a byte string is encoded as UTF-8 first.
        """
        if not isinstance(str_node, bytes):
            str_node = str_node.encode('utf-8')
        return int(hashlib.md5(str_node).hexdigest(), 16)

    def upper_node_hash(self, xhash):
        """Return the first ring position strictly greater than ``xhash``.

        The ring is circular: when ``xhash`` is at or above the highest
        position, the lowest position is returned.

        Returns:
            A ring position, or None when the ring holds no nodes.
        """
        if not self.sorted_keys:
            return None
        idx = bisect.bisect_right(self.sorted_keys, xhash)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around to the start of the ring
        return self.sorted_keys[idx]
client 端:
1: #!/usr/bin/env python
2: #encoding=utf-8
3:
4: import os
5: import socket
6: import hashlib
7: from conshash import HashRing
8:
class Configuration:
    """Load the list of storage nodes from a plain-text file.

    The file holds one node entry per line.  Surrounding whitespace is
    stripped and blank lines are ignored.

    Attributes:
        count: number of node entries held by this instance.
        clist: node entries in the order they were read.
        pathStr: path of the most recently read file.
    """

    def __init__(self, path="machineconf.txt"):
        """Read the configuration file.

        Args:
            path: file to read; defaults to ``machineconf.txt`` in the
                current working directory.
        """
        # State lives on the instance: a class-level list would be shared by
        # (and keep growing across) every Configuration object.
        self.count = 0
        self.clist = []
        self.readPath(path)

    def readPath(self, pStr):
        """Append the node entries found in ``pStr`` to this configuration.

        Raises:
            IOError/OSError: if the file cannot be opened.
        """
        self.pathStr = pStr
        with open(self.pathStr, 'r') as fp:
            for line in fp:
                line = line.strip()
                if line:  # skip blank lines instead of adding empty nodes
                    self.clist.append(line)
        self.count = len(self.clist)

    def get_count(self):
        """Return the number of node entries."""
        return self.count

    def get_list(self):
        """Return the list of node entries."""
        return self.clist
35:
class Client:
    """Store files on, and fetch them from, nodes picked by consistent hashing.

    Wire protocol used by ``send`` and ``get``: the client sends a 13-byte
    header ``<OPCODE>,<4-digit name length>``, waits for a reply starting
    with ``OK``, sends the file name, and then either streams the file
    content (``WRITFILE``) or reads content until the peer closes the
    connection (``FETCFILE``).

    Attributes:
        malist: list of node entries read from the configuration.
        macnum: number of node entries.
        hring: HashRing built over ``malist``.
    """

    def __init__(self):
        """Load the node list and build the hash ring over it."""
        conf = Configuration()
        self.malist = conf.get_list()
        self.macnum = conf.get_count()
        print("nodelist: %s" % (self.malist,))
        self.hring = HashRing(self.malist)

    def write(self, src):
        """Send the file at path ``src`` to the node responsible for it."""
        print("File is %s" % src)
        toIP, toPort = self.src_to_node(src)
        print("Send to %s %s" % (toIP, toPort))
        self.send(src, toIP, toPort)

    def src_to_node(self, src):
        """Return ``(ip, port)`` (both strings) of the node owning ``src``.

        Raises:
            LookupError: if the ring returns no node for ``src``.
        """
        des = self.hring.get_node_pos(src)
        if des is None:
            raise LookupError("no storage node available for %r" % (src,))
        # Split on the last colon only, so the host part may contain colons.
        host, port = des.rsplit(':', 1)
        return host, port

    @staticmethod
    def _to_bytes(text):
        """Return ``text`` as a byte string (UTF-8 encoded when it is text)."""
        if isinstance(text, bytes):
            return text
        return text.encode('utf-8')

    @staticmethod
    def _local_name(src):
        """Return the final path component of ``src`` (after ``\\`` or ``/``)."""
        cut = max(src.rfind('\\'), src.rfind('/')) + 1
        return src[cut:]

    def _header(self, opcode, src):
        """Build the request header for ``opcode`` and the name ``src``.

        Raises:
            ValueError: if the encoded name does not fit the 4-digit field.
        """
        name_len = len(self._to_bytes(src))
        if name_len > 9999:
            raise ValueError("file name too long for the protocol: %d bytes"
                             % name_len)
        return "%s,%04d" % (opcode, name_len)

    def _request(self, sock, header, src):
        """Send ``header``, wait for ``OK``, then send the file name.

        Returns:
            True when the node accepted the request, False otherwise.
        """
        # sendall retries until every byte is written; send() may stop early.
        sock.sendall(self._to_bytes(header))
        resdata = sock.recv(1024)
        if not resdata.startswith(b'OK'):
            print("unexpected reply %r" % (resdata,))
            return False
        print("OK")
        print("sending.... %s" % src)
        sock.sendall(self._to_bytes(src))
        return True

    def send(self, src, ip, port):
        """Upload the file at ``src`` to the node at ``ip``:``port``."""
        clsoc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            clsoc.connect((ip, int(port)))
            with open(src, 'rb') as fp:
                formStr = self._header("WRITFILE", src)
                print("formStr %s %s" % (formStr, len(formStr)))
                if self._request(clsoc, formStr, src):
                    print("sending data....")
                    while True:
                        data = fp.read(4096)
                        if not data:
                            break
                        clsoc.sendall(data)
                    print("Fished to send  %s" % src)
        finally:
            # Closing the socket is what tells the node the upload is over.
            clsoc.close()

    def fetch(self, src):
        """Download the file named by ``src`` from the node responsible for it."""
        toIP, toPort = self.src_to_node(src)
        print("fetch from %s %s" % (toIP, toPort))
        self.get(src, toIP, toPort)

    def get(self, src, ip, port):
        """Download ``src`` from ``ip``:``port`` into the current directory.

        The local file is named after the last path component of ``src``.
        """
        clsoc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            clsoc.connect((ip, int(port)))
            formStr = self._header("FETCFILE", src)
            print("formStr %s" % formStr)
            if self._request(clsoc, formStr, src):
                print("fetching data....")
                ffile = self._local_name(src)
                with open(ffile, 'wb') as fp:
                    while True:
                        data = clsoc.recv(1024)
                        if not data:
                            break
                        fp.write(data)
                        # %r escapes the raw bytes so binary content cannot
                        # garble the terminal.
                        print("fetching %r" % (data[-8:],))
                print("finished! %s" % src)
        finally:
            clsoc.close()
109:
if __name__ == "__main__":
    # Demo round trip: upload one file, then download it again from the
    # node the hash ring selects for that path.
    sample_path = "E:\\paper\\key-value\\Dynam_A_Transparent_Dynamic_Optimization_System.pdf"
    client = Client()
    client.write(sample_path)
    client.fetch(sample_path)
node 端:
1: #!/usr/bin/env python
2: #encoding=utf-8
3:
4: import os,sys
5: import socket
6:
class Node:
    """Single-threaded storage node serving one connection at a time.

    Each request starts with an 8-byte opcode.  ``WRITFILE`` and
    ``FETCFILE`` requests carry a 13-byte header
    ``<OPCODE>,<4-digit name length>``; the node answers ``OK``, reads the
    file name, and then receives the file content until the client closes
    the connection (write) or streams the stored file back (fetch).
    ``HELLNODE`` is only logged.  Files are stored in the current working
    directory under the last path component of the name sent by the client.
    """

    HEADER_LEN = 13  # len("WRITFILE") + len(",") + 4 length digits

    def __init__(self):
        """Create a node that will listen on the default port 10000."""
        self.port = 10000

    def run(self):
        """Accept and serve connections forever.

        A request that fails is reported and its connection closed; the
        node keeps serving later clients.
        """
        dsoc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            dsoc.bind(('localhost', self.port))
            dsoc.listen(1)
            print("listening at %s" % self.port)
            while True:
                conn, addr = dsoc.accept()
                print("connected %s" % (addr,))
                try:
                    self._handle(conn)
                except (IOError, OSError, ValueError) as err:
                    print("request failed: %s" % err)
                finally:
                    conn.close()
        finally:
            dsoc.close()

    def setPort(self, port):
        """Set the listening port; ``port`` may be an int or a numeric string."""
        self.port = int(port)

    def _handle(self, conn):
        """Serve the single request arriving on ``conn``."""
        databuf = conn.recv(self.HEADER_LEN)
        opcode = databuf[:8]
        if opcode in (b'WRITFILE', b'FETCFILE'):
            # recv may return fewer bytes than asked for; top the header up.
            databuf += self._recv_exact(conn, self.HEADER_LEN - len(databuf))
        print("received %s" % self._text(databuf))
        if opcode == b'HELLNODE':
            print("client say hello!")
            return
        if opcode == b'WRITFILE':
            print("OPS == write file")
            transfer = self._receive_file
        elif opcode == b'FETCFILE':
            print("OPS == fetch file")
            transfer = self._send_file
        else:
            return
        filename = self._read_filename(conn, databuf)
        print("file is %s" % filename)
        transfer(conn, filename)

    def _read_filename(self, conn, databuf):
        """Acknowledge the header and return the sanitised file name.

        Raises:
            ValueError: on a malformed header, a truncated name, or a name
                with no usable final component.
        """
        if len(databuf) != self.HEADER_LEN or databuf[8:9] != b',':
            raise ValueError("malformed header %r" % (databuf,))
        fnamelen = int(databuf[9:])
        if fnamelen <= 0:
            raise ValueError("invalid file name length %d" % fnamelen)
        conn.sendall(b'OK')
        print("filenamelen is %s" % fnamelen)
        raw_name = self._recv_exact(conn, fnamelen)
        if len(raw_name) != fnamelen:
            raise ValueError("connection closed before the file name arrived")
        return self._safe_name(self._text(raw_name))

    def _receive_file(self, conn, filename):
        """Write everything the client sends into ``filename`` until EOF."""
        with open(filename, 'wb') as fp:
            while True:
                data = conn.recv(1024)
                if not data:
                    break
                fp.write(data)
        print("finished! %s" % filename)

    def _send_file(self, conn, filename):
        """Stream the content of ``filename`` to the client."""
        with open(filename, 'rb') as fp:
            while True:
                data = fp.read(4096)
                if not data:
                    break
                conn.sendall(data)
        print("Fished to send  %s" % filename)

    @staticmethod
    def _recv_exact(conn, count):
        """Read ``count`` bytes from ``conn``; return fewer only on EOF."""
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @staticmethod
    def _safe_name(name):
        """Return the last path component of ``name``.

        The name comes from the network, so both ``\\`` and ``/`` are
        treated as separators and directory references are rejected; this
        keeps every file inside the current working directory.

        Raises:
            ValueError: if the last component is empty, ``.`` or ``..``.
        """
        base = name[max(name.rfind('\\'), name.rfind('/')) + 1:]
        if base in ('', '.', '..'):
            raise ValueError("invalid file name %r" % (name,))
        return base

    @staticmethod
    def _text(data):
        """Return ``data`` as ``str`` (byte strings are decoded as UTF-8)."""
        if isinstance(data, str):
            return data
        return data.decode('utf-8', 'replace')
61:
if __name__ == "__main__":
    nd = Node()
    # The port argument is optional: without it the node keeps the default
    # port set by Node.__init__ instead of failing with IndexError.
    if len(sys.argv) > 1:
        nd.setPort(sys.argv[1])
    nd.run()
- 存储node 输出:
E:\python\consistent hash\myDHT>python node.py 10003
listening at 10003
connected ('127.0.0.1', 54992)
received WRITFILE,0070
OPS == write file
filenamelen is 70
file is Dynam_A_Transparent_Dynamic_Optimization_System.pdf
finished! Dynam_A_Transparent_Dynamic_Optimization_System.pdf
connected ('127.0.0.1', 54993)
received FETCFILE,0070
OPS == fetch file
filenamelen is 70
file is Dynam_A_Transparent_Dynamic_Optimization_System.pdf
Fished to send Dynam_A_Transparent_Dynamic_Optimization_System.pdf
- client 输出:
E:\python\consistent hash\myDHT>python client.py
nodelist: ['127.0.0.1:10001', '127.0.0.1:10002', '127.0.0.1:10003']
File is E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_System.pdf
Send to 127.0.0.1 10003
formStr WRITFILE,0070 13
OK
sending…. E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_System.p
df
sending data….
Fished to send E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_Syst
em.pdf
fetch from 127.0.0.1 10003
formStr FETCFILE,0070
OK
sending…. E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_System.p
df
fetching data….
/Tyching
fetching /Type /F
fetching 鰧;倖罕
fetching astChar
fetching 1Z改Y_
fetching 芡?兩
fetching 粷?^?
fetching 礥\毻
fetching 鮨?琯
;xE>hing 潵D
1etching 0 obj
fetching <暉o
fetching ?讲玌
fetching Qr&$c
fetching 怯熸刭:)
fetching 袏@?_靬
fetching 鐩?}F?
fetching O74s札c
fetching 疵鞕a腂
fetching O獫RX@昑
fetching j3u翰6o|
fetching 涿?b翡
fetching 姩*K痶?
fetching _9敌瞩踱
fetching ph滍w
fetching ?K?A
fetching ?攁譹保
fetching < /GS2 1
finished! E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_System.pdf
Pingback引用通告: Dynamo: Amazon’s highly available key-value store | dullgull