Consistent hashing 的python 实现

部分代码参考之前的“写一个分布式存储系统有多简单?”,保持数据服务器端不变,修改客户端节点选择方式即可。

修改:1、添加HashRing 类,进行一致性哈希环的管理,保存在客户端client ,因此节点/数据服务器端可以不做任何修改(添加hello 测试)。2、客户端修改节点选择方式,使用consistent hashing。

介绍一致性哈希的国内文章也比较多了,可以参考这里

conshash.py 文件

 1: import md5
 2:
 3: class HashRing(object):
 4:     def __init__(self,nodes=None,replicas=3):
 5:         """initial HashRing, 
 6:         ring store pairs of (key,node),
 7:         replicas represent the number of virtual nodes
 8:         sorted_keys saves sorted keys"""
 9:         self.ring=dict()
 10:         self.replicas=replicas
 11:         self.sorted_keys=[]
 12:         if nodes:
 13:             for node in nodes:
 14:                 self.add_node(node)
 15:
 16:     def add_node(self,node):
 17:         """add each (virtual)node, sorted"""
 18:         for i in xrange(0,self.replicas):
 19:             key=self.gen_key('%s%s'%(node,i))
 20:             self.ring[key]=node
 21:             self.sorted_keys.append(key)
 22:         self.sorted_keys.sort()
 23:
 24:     def get_node_pos(self,str_src):
 25:         """return the node to store src file"""
 26:         fHash=self.gen_key(str_src)
 27:         node_hash=self.upper_node_hash(fHash)
 28:         for each in self.ring:
 29:             if each==node_hash:
 30:                 return self.ring[each]
 31:
 32:     def gen_key(self,str_node):
 33:         """return (virtual)node's hash result"""
 34:         m=md5.new()
 35:         m.update(str_node)
 36:         return long(m.hexdigest(),16)
 37:
 38:     def upper_node_hash(self,xhash):
 39:         """return upper node hash for xhash"""
 40:         for each in self.sorted_keys:
 41:             if each>xhash:
 42:                 return each

 

 

client 端:

 1: #!/usr/bin/env python
 2: #encoding=utf-8
 3:
 4: import os
 5: import socket
 6: import hashlib
 7: from conshash import HashRing
 8:
 9: class Configuration:
 10:     count=0
 11:     clist=[]
 12:     def __init__(self):
 13:         """read the configuration file"""
 14:         self.readPath("machineconf.txt")
 15:
 16:     def readPath(self,pStr):
 17:         """read the machines from the file"""
 18:         self.pathStr=pStr
 19:         fp=open(self.pathStr,'r')
 20:         while True:
 21:             line=fp.readline()
 22:             if not line:
 23:                 break
 24:             line=line.rstrip()
 25:             self.clist.append(line)
 26:             self.count=self.count+1
 27:
 28:     def get_count(self):
 29:         """return the number of the nodes"""
 30:         return self.count
 31:
 32:     def get_list(self):
 33:         """return the list of the nodes"""
 34:         return self.clist
 35:
 36: class Client:
 37:     def __init__(self):
 38:         """get nodes"""
 39:         self.malist=[]
 40:         self.macnum=0
 41:         conf=Configuration()
 42:         self.malist=conf.get_list()
 43:         self.macnum=conf.get_count()
 44:         print "nodelist:",self.malist
 45:         self.hring=HashRing(self.malist)
 46:
 47:     def write(self,src):
 48:         """write src file into node"""
 49:         print "File is",src
 50:         toIP,toPort=self.src_to_node(src)
 51:         print "Send to",toIP,toPort
 52:         self.send(src,toIP,toPort)
 53:
 54:     def src_to_node(self,src):
 55:         des=self.hring.get_node_pos(src)
 56:         iPort=des.split(':')
 57:         return iPort[0],iPort[1]
 58:
 59:     def send(self,src,ip,port):
 60:         clsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 61:         clsoc.connect((ip,int(port)))
 62:         fp=open(src,'rb')
 63:         formStr="WRITFILE,%04d"%len(src)
 64:         print "formStr",formStr,len(formStr)
 65:         clsoc.send(formStr)
 66:         resdata=clsoc.recv(1024)
 67:         if resdata.startswith('OK'):
 68:             print "OK"
 69:         print "sending....",src
 70:         clsoc.send(src)
 71:         print "sending data...."
 72:         while True:
 73:             data=fp.read(4096)
 74:             if not data:
 75:                 break
 76:             while len(data)>0:
 77:                 sent=clsoc.send(data)
 78:                 data=data[sent:]
 79:         print "Fished to send ",src
 80:         fp.close()
 81:
 82:     def fetch(self,src):
 83:         toIP,toPort=self.src_to_node(src)
 84:         print "fetch from",toIP,toPort
 85:         self.get(src,toIP,toPort)
 86:
 87:     def get(self,src,ip,port):
 88:         clsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 89:         clsoc.connect((ip,int(port)))
 90:         formStr="FETCFILE,%04d"%len(src)
 91:         print "formStr",formStr
 92:         clsoc.send(formStr)
 93:         resdata=clsoc.recv(1024)
 94:         if resdata.startswith('OK'):
 95:             print "OK"
 96:         print "sending....",src
 97:         clsoc.send(src)
 98:         print "fetching data...."
 99:         ffile=src[src.rindex('\\')+1:]
 100:         fp=open(ffile,'wb')
 101:         while True:
 102:             data=clsoc.recv(1024)
 103:             if not data:break
 104:             fp.write(data)
 105:             print "fetching",data[-8:]
 106:         fp.flush()
 107:         fp.close()
 108:         print "finished!",src
 109:
 110: if __name__=="__main__":
 111:     mac=Client()
 112:     src="E:\\paper\\key-value\\Dynam_A_Transparent_Dynamic_Optimization_System.pdf"
 113:     mac.write(src)
 114:     mac.fetch(src)

 

 

node  端:

 1: #!/usr/bin/env python
 2: #encoding=utf-8
 3:
 4: import os,sys
 5: import socket
 6:
 7: class Node:
 8:     def __init__(self):
 9:         self.port=10000
 10:     def run(self):
 11:         dsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 12:         dsoc.bind(('localhost',self.port))
 13:         print "listening at",self.port
 14:         while True:
 15:             dsoc.listen(1)
 16:             conn,addr=dsoc.accept()
 17:             print "connected",addr
 18:             databuf=conn.recv(13)
 19:             print "received",databuf
 20:             if databuf.startswith('WRITFILE'):
 21:                 print "OPS == write file"
 22:                 dlist=databuf.split(',')
 23:                 fnamelen=int(dlist[1])
 24:                 conn.send('OK')
 25:                 print "filenamelen is",fnamelen
 26:                 filename=conn.recv(fnamelen)
 27:                 filename=filename[filename.rindex('\\')+1:]
 28:                 print "file is",filename
 29:                 fp=open(filename,'wb')
 30:                 while True:
 31:                     data=conn.recv(1024)
 32:                     if not data:break
 33:                     fp.write(data)
 34:                 fp.flush()
 35:                 fp.close()
 36:                 print "finished!",filename
 37:             if databuf.startswith('FETCFILE'):
 38:                 print "OPS == fetch file"
 39:                 dlist=databuf.split(',')
 40:                 fnamelen=int(dlist[1])
 41:                 conn.send('OK')
 42:                 print "filenamelen is",fnamelen
 43:                 filename=conn.recv(fnamelen)
 44:                 filename=filename[filename.rindex('\\')+1:]
 45:                 print "file is",filename
 46:                 fp=open(filename,'rb')
 47:                 while True:
 48:                     data=fp.read(4096)
 49:                     if not data:
 50:                         break
 51:                     while len(data)>0:
 52:                         sent=conn.send(data)
 53:                         data=data[sent:]
 54:                 print "Fished to send ",filename
 55:             if databuf.startswith('HELLNODE'):
 56:                 print "client say hello!"
 57:             conn.close()
 58:
 59:     def setPort(self,port):
 60:         self.port=int(port)
 61:
 62: if __name__=="__main__":
 63:     nd=Node()
 64:     nd.setPort(sys.argv[1])
 65:     nd.run()

 


  • 存储node 输出:

E:\python\consistent hash\myDHT>python node.py 10003
listening at 10003
connected ('127.0.0.1', 54992)
received WRITFILE,0070
OPS == write file
filenamelen is 70
file is Dynam_A_Transparent_Dynamic_Optimization_System.pdf
finished! Dynam_A_Transparent_Dynamic_Optimization_System.pdf
connected ('127.0.0.1', 54993)
received FETCFILE,0070
OPS == fetch file
filenamelen is 70
file is Dynam_A_Transparent_Dynamic_Optimization_System.pdf
Fished to send  Dynam_A_Transparent_Dynamic_Optimization_System.pdf

  • client 输出:

E:\python\consistent hash\myDHT>python client.py
nodelist: ['127.0.0.1:10001', '127.0.0.1:10002', '127.0.0.1:10003']
File is E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_System.pdf
Send to 127.0.0.1 10003
formStr WRITFILE,0070 13
OK
sending…. E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_System.p
df
sending data….
Fished to send  E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_Syst
em.pdf
fetch from 127.0.0.1 10003
formStr FETCFILE,0070
OK
sending…. E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_System.p
df
fetching data….
/Tyching
fetching /Type /F
fetching 鰧;倖罕
fetching astChar
fetching 1Z改Y_
fetching 芡?兩
fetching 粷?^?
fetching 礥\毻
fetching 鮨?琯
;xE>hing 潵D
1etching  0 obj
fetching <暉o
fetching ?讲玌
fetching Qr&$c
fetching 怯熸刭:)
fetching 袏@?_靬
fetching 鐩?}F?
fetching O74s札c
fetching 疵鞕a腂
fetching O獫[email protected]
fetching j3u翰6o|
fetching 涿?b翡
fetching 姩*K痶?
fetching _9敌瞩踱
fetching ph滍w
fetching ?K?A
fetching ?攁譹保
fetching < /GS2 1
finished! E:\paper\key-value\Dynam_A_Transparent_Dynamic_Optimization_System.pdf

《Consistent hashing 的python 实现》上有1条评论

  1. Pingback引用通告: Dynamo: Amazon’s highly available key-value store | dullgull

发表评论

电子邮件地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据