用 Python 写了一个用户态的分布式存储系统(姑且这么称呼吧),用于存储文件。(今天在 Google 上看到有两个人做过同样的工作:一个是 PyGfs,用 Pyro 库封装了一些远程调用;另一个则模拟实现了 GFS 的功能。两份代码都不长。)
这个分布式文件系统整体分为 Client 端和 DataServer(DS)端。之所以不需要 NameServer(NS),是因为客户端对文件名做 hash 就能直接算出对应的 DS。存储粒度为整个文件,提供 write(写文件)和 fetch(获取文件)两种操作。读写过程:对完整路径(路径 + 文件名)计算 MD5,取摘要末 4 位十六进制数作为 hash 值,再以机器数量为模数取余,得到目的主机的编号,从配置文件中查出其地址,然后通过 socket 传输数据。DS 的机器数量在部署时配置,写在配置文件中。
TODO:文件分片;实现 NS(只保存目录树)。
配置文件(machineconf.txt)每行描述一台 DS,格式为 ip,port。机器编号按行序从 0 开始计算;下面各行开头的“1:”“2:”等只是展示用的行号,不属于文件内容:
1: 127.0.0.1,10001
2: 127.0.0.1,10002
3: 127.0.0.1,10003
DataServer 端:
1: #!/usr/bin/env python
2: #encoding=utf-8
3: import os,sys
4: import socket
5:
class DataServer:
    """Single-threaded TCP server that stores and serves whole files.

    Protocol (one request per connection):

    1. The client sends a 13-byte header, ``WRITFILE,NNNN`` or
       ``FETCFILE,NNNN``, where NNNN is the byte length of the file name.
    2. The server answers ``OK``.
    3. The client sends the file name (exactly NNNN bytes).
    4. WRITFILE: the client streams the file body and closes its side;
       FETCFILE: the server streams the stored file back and closes.

    Files live in the server's current directory under their base name only:
    any directory part of the client's path (backslash- or slash-separated)
    is dropped, so two client paths with the same base name share one file.
    """
    port = 10000       # default; setPort() overrides it per instance
    HEADER_LEN = 13    # len("WRITFILE,0000")
    CHUNK_SIZE = 4096

    def __init__(self):
        pass

    def run(self):
        """Listen on localhost:self.port and serve requests one at a time, forever."""
        dsoc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        dsoc.bind(('localhost', self.port))
        print("listening at %s" % self.port)
        # listen() sets up the backlog once; it need not be repeated per accept().
        dsoc.listen(1)
        while True:
            conn, addr = dsoc.accept()
            print("connected %s" % (addr,))
            try:
                self._handle(conn)
            except (IOError, OSError, ValueError) as err:
                # A malformed request, a dropped client or a missing file
                # must not take the whole server down.
                print("request failed: %s" % err)
            finally:
                conn.close()

    def _handle(self, conn):
        """Read one request header from *conn* and dispatch it."""
        databuf = self._recv_exact(conn, self.HEADER_LEN)
        print("received %s" % databuf)
        if databuf.startswith(b'WRITFILE'):
            print("OPS == write file")
            filename = self._read_filename(conn, databuf)
            with open(filename, 'wb') as fp:
                # The client signals end-of-file by closing its socket.
                while True:
                    data = conn.recv(1024)
                    if not data:
                        break
                    fp.write(data)
            print("finished! %s" % filename)
        elif databuf.startswith(b'FETCFILE'):
            print("OPS == fetch file")
            filename = self._read_filename(conn, databuf)
            # NOTE: "OK" has already been sent at this point, so a missing
            # file can only be reported by closing the connection (the
            # client then receives no data).
            with open(filename, 'rb') as fp:
                while True:
                    data = fp.read(self.CHUNK_SIZE)
                    if not data:
                        break
                    conn.sendall(data)
            print("Fished to send  %s" % filename)
        else:
            print("unknown request %s" % databuf)

    def _read_filename(self, conn, header):
        """Acknowledge *header* with OK, then read and sanitize the file name.

        Raises ValueError if the length field is not an integer or the name
        is unusable.
        """
        # partition() never raises, so a header without ',' reaches int('')
        # and fails with ValueError instead of an uncaught IndexError.
        _, _, lenfield = header.partition(b',')
        fnamelen = int(lenfield)
        conn.sendall(b'OK')
        print("filenamelen is %d" % fnamelen)
        raw = self._recv_exact(conn, fnamelen)
        if not isinstance(raw, str):
            raw = raw.decode('utf-8')  # Python 3: recv() returns bytes
        filename = self._base_name(raw)
        print("file is %s" % filename)
        return filename

    @staticmethod
    def _recv_exact(conn, size):
        """Return exactly *size* bytes from *conn*; raise ValueError on early EOF.

        recv() may legally return fewer bytes than requested, so keep reading.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                raise ValueError("connection closed after %d of %d bytes"
                                 % (size - remaining, size))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @staticmethod
    def _base_name(path):
        """Return the last backslash- or slash-separated component of *path*.

        Raises ValueError for names that do not denote a plain file in the
        current directory (empty, '.', '..', or containing NUL).
        """
        name = path.replace('\\', '/').rsplit('/', 1)[-1]
        if name in ('', '.', '..') or '\0' in name:
            raise ValueError("invalid file name: %r" % (path,))
        return name

    def setPort(self, port):
        """Set the TCP port that run() will listen on."""
        self.port = int(port)
59:
if __name__ == "__main__":
    ds = DataServer()
    # Usage: <script> [port]. Without an argument the class default port is
    # used instead of failing with IndexError on sys.argv[1].
    if len(sys.argv) > 1:
        ds.setPort(sys.argv[1])
    ds.run()
Client 端:
1: #!/usr/bin/env python
2: #encoding=utf-8
3: import os
4: import socket
5: import hashlib
6:
class Configuration:
    """List of DataServer addresses read from a text file.

    Each non-blank line describes one machine as "ip,port"; a machine's
    number is its 0-based position among those lines.
    """

    def __init__(self, path="machineconf.txt"):
        # Per-instance state: a class-level list would be shared by every
        # Configuration, so each new instance would append to the same list.
        self.count = 0
        self.clist = []
        self.readPath(path)

    def readPath(self, pStr):
        """Append the machine entries found in the file *pStr*."""
        self.pathStr = pStr
        with open(pStr, 'r') as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    # A blank (e.g. trailing) line must not count as a machine,
                    # otherwise it becomes a hash bucket with no address.
                    continue
                self.clist.append(line)
                self.count += 1

    def getCount(self):
        """Return the number of machine entries read so far."""
        return self.count

    def getList(self):
        """Return the "ip,port" strings in file order."""
        return self.clist
26:
class Client:
    """Store and fetch files on DataServers chosen by hashing the file path.

    The file goes to machine ``hash(path) % macNum``, a 0-based index into
    the "ip,port" list read by Configuration; no name server is involved.
    """
    maList = []   # replaced per instance in __init__
    macNum = 0
    MAX_NAME_LEN = 9999   # the request header carries the name length as 4 digits
    CHUNK_SIZE = 4096

    def __init__(self):
        conf = Configuration()
        self.maList = conf.getList()
        self.macNum = conf.getCount()

    def write(self, src):
        """Upload the local file *src* to the DataServer its path hashes to."""
        print("File is %s" % src)
        locatedNum, toIp, toPort = self._locate(src)
        print("Location machine number is %d" % locatedNum)
        print("Send to %s %s" % (toIp, toPort))
        self.send(src, toIp, toPort)

    def hash(self, src):
        """Return the last 16 bits of the MD5 digest of *src* as an int."""
        md5 = hashlib.md5()
        md5.update(self._to_bytes(src))
        return int(md5.hexdigest()[-4:], 16)

    def send(self, src, ip, port):
        """Send the file *src* to the DataServer at ip:port (WRITFILE request)."""
        # Open first, so a missing file fails before the server is contacted.
        with open(src, 'rb') as fp:
            clsoc = self._open_request('WRITFILE', src, ip, port)
            if clsoc is None:
                return
            try:
                print("sending data....")
                while True:
                    data = fp.read(self.CHUNK_SIZE)
                    if not data:
                        break
                    clsoc.sendall(data)
            finally:
                # Closing is the end-of-file signal: the server reads until EOF.
                clsoc.close()
        print("Fished to send  %s" % src)

    def fetch(self, src):
        """Download *src* from its DataServer into the current directory."""
        _, toIp, toPort = self._locate(src)
        print("fetch from %s %s" % (toIp, toPort))
        self.get(src, toIp, toPort)

    def get(self, src, ip, port):
        """Fetch *src* from ip:port and save it under its base name in the cwd."""
        ffile = self._base_name(src)
        clsoc = self._open_request('FETCFILE', src, ip, port)
        if clsoc is None:
            return
        try:
            print("fetching data....")
            with open(ffile, 'wb') as fp:
                # The server closes the connection after the last byte.
                while True:
                    data = clsoc.recv(self.CHUNK_SIZE)
                    if not data:
                        break
                    fp.write(data)
        finally:
            clsoc.close()
        print("finished! %s" % src)

    def _locate(self, src):
        """Return (machine_number, ip, port) of the DataServer for *src*.

        Raises ValueError when no machine is configured.
        """
        if self.macNum <= 0:
            raise ValueError("no DataServer configured")
        locatedNum = self.hash(src) % self.macNum
        fields = self.maList[locatedNum].split(',')
        return locatedNum, fields[0].strip(), fields[1].strip()

    def _open_request(self, opcode, src, ip, port):
        """Connect, send the header and the file name; return the open socket.

        Returns None (socket already closed) if the server does not reply
        "OK". Raises ValueError if the encoded name does not fit the 4-digit
        length field.
        """
        name = self._to_bytes(src)
        if len(name) > self.MAX_NAME_LEN:
            raise ValueError("file name longer than %d bytes: %r"
                             % (self.MAX_NAME_LEN, src))
        # The length is in bytes, because the server recv()s exactly that many.
        formStr = "%s,%04d" % (opcode, len(name))
        print("formStr %s" % formStr)
        clsoc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            clsoc.connect((ip, int(port)))
            clsoc.sendall(formStr.encode('ascii'))
            resdata = b''
            while len(resdata) < 2:   # the acknowledgement is exactly b"OK"
                chunk = clsoc.recv(2 - len(resdata))
                if not chunk:
                    break
                resdata += chunk
            if not resdata.startswith(b'OK'):
                print("server did not acknowledge: %r" % (resdata,))
                clsoc.close()
                return None
            print("OK")
            print("sending.... %s" % src)
            clsoc.sendall(name)
        except Exception:
            clsoc.close()
            raise
        return clsoc

    @staticmethod
    def _to_bytes(s):
        """Return *s* as bytes: text is UTF-8 encoded, bytes pass through."""
        return s if isinstance(s, bytes) else s.encode('utf-8')

    @staticmethod
    def _base_name(path):
        """Return the last backslash- or slash-separated component of *path*."""
        name = path.replace('\\', '/').rsplit('/', 1)[-1]
        if name in ('', '.', '..'):
            raise ValueError("invalid file name: %r" % (path,))
        return name
if __name__ == "__main__":
    import sys
    mac = Client()
    # Demo: upload a file, then fetch it back into the current directory.
    # The default is the author's Windows sample path; pass another path as
    # the first command-line argument to use a different file.
    if len(sys.argv) > 1:
        src = sys.argv[1]
    else:
        src = "e:\\pics\\tumblr_m0pvvmcIv41r9mp65o1_500.gif"
    mac.write(src)
    mac.fetch(src)