Commit ae6b5149 authored by Eric Dagobert's avatar Eric Dagobert

client/server version

parent f7e35199
......@@ -15,6 +15,7 @@ from tabulate import tabulate
import makeclusters
from distribs import *
import pandas as pd
import io
class Graphs:
......@@ -44,22 +45,23 @@ class Graphs:
for i,z in enumerate(centroids):
plt.scatter(z[0][0],z[0][1],s=50,c=[colormap[i]],alpha=1.0)
plt.title(t)
plt.show()
plt.savefig(t)
plt.close(fig)
plt.title(t)
buf = io.BytesIO()
plt.savefig(buf,format='png')
buf.flush()
plt.savefig('graph.png',format='png')
return buf
def draw_clusters(self, file):
with open(file,mode='rb') as f:
cl = dill.load(f)
self.graph_(cl._embedding, cl._centroids, cl._clabels, 'clusters')
def draw_clusters(self):
cl = self._clusters
return self.graph_(cl._embedding, cl._centroids, cl._clabels, 'clusters')
def draw_regimes(self, file):
with open(file,mode='rb') as f:
r = dill.load(f)
self.graph_(r._clusters._embedding, r._rcentroids, r._regimes, 'regimes')
def draw_regimes(self):
r = self._regimes
return self.graph_(r._clusters._embedding, r._rcentroids, r._regimes, 'regimes')
def loaddist(self,ticker):
def loaddist_(self,ticker):
mfile = self._conf.dataref()
mfields = [ticker]
self._cmap = eval(self._conf.clmap())
......@@ -111,8 +113,10 @@ class Graphs:
c=0
curr_label=k
cx += 1
buf = io.BytesIO()
plt.savefig(buf,format='png')
return buf
def graph_data(self):
reg = self._reg
......@@ -137,26 +141,28 @@ class Graphs:
return xt,Y
def printusage():
print('usage: graphs.py -w <what=[clusters|regimes|ts]> -i <file>')
print('usage: graphs.py -w <what=[clusters|regimes|ticker]> -i <file>')
def atexit():
printusage()
sys.exit(2)
def action( func,arg):
def action(g,func,arg):
if func == 'regimes':
return g.draw_regimes()
elif func == 'clusters':
return g.draw_clusters()
elif func == 'ticker':
#g.loaddist(arg)
xt,yt = g.graph_data()
return g.annotate(xt,yt)
def init_graph():
conf = ConfigReader('regimes.ini')
g = Graphs(conf)
if func == 'draw_regimes':
g.draw_regimes(arg)
elif func == 'draw_clusters':
g.draw_clusters(arg)
elif func == 'draw_ticker':
g.loaddist(arg)
xt,yt = g.graph_data()
g.annotate(xt,yt)
plt.show()
return g
if __name__ == "__main__":
args = sys.argv[1:]
try:
......@@ -178,10 +184,10 @@ if __name__ == "__main__":
ARG = arg
if ARG is None or FUNC is None:
atexit()
action(FUNC,ARG)
plt.show()
except getopt.GetoptError:
atexit()
\ No newline at end of file
import distribs
import makeclusters
import dill
#todo: persistence
import os
class ObjectContainer:
_instance=None
_cache = {}
@classmethod
def getInstance(cls):
if cls._instance is None:
cls._instance = ObjectContainer()
return cls._instance
@classmethod
def get_cache(cls,what):
if what in cls._cache.keys():
return cls._cache[what]
return None
@classmethod
def set_cache(cls,what,v):
cls._cache[what]=v
def __init__(self):
self._container={}
self._depend = {}
self.load()
def get(self,name):
return self._container[name]
def __contains__(self,name):
return name in self._container.keys()
def set(self,name,value):
self._container[name]=value
def save(self):
with open ('objcont.plk','wb') as f:
dill.dump(self._container,f)
def load(self):
if os.path.isfile('objcont.plk'):
with open('objcont.plk','rb') as f:
self._container = dill.load(f)
if __name__ == "__main__":
ob = ObjectContainer.getInstance()
print (ob._container)
\ No newline at end of file
import asyncio
from utils import ProgressBar
import distribs
import makeclusters
import queue
from tcpip import Message
import os
import dill
from objects import ObjectContainer
import graphs
class Process:
def __init__(self,loop):
self._loop = loop
self._queue = asyncio.Queue(loop = self._loop)
self._running = False
self._completed = False
self._progress = None
self.name = 'noname'
async def scan(self,progress):
while progress._iteration < progress._total:
await self._queue.put(Message(Message.STRING,progress._display()))
await asyncio.sleep(1)
async def start(self):
self.run()
class ProcessCovar(Process):
def __init__(self,loop):
Process.__init__(self,loop)
self._running = False
self.name = 'covariances'
async def run(self):
self._running = True
args = distribs.init()
self._progress = args[-1]._pr
D = args[-1]
self._distrib = D
await distribs.run(*args)
self._completed = True
return 'ok'
def interrupt(self):
self._distrib._interrupt.set()
class ProcessClusters(Process):
def __init__(self,loop):
Process.__init__(self,loop)
self._running = False
self.name = 'clusters'
async def run(self):
self._running = True
args = makeclusters.init_clusters()
self._clusters = args
self._progress = args._pr
await makeclusters.run_clusters(args)
self.save()
return 'ok'
def save(self):
makeclusters.save_clusters(self._clusters)
self._completed = True
def interrupt(self):
self._clusters._interrupt.set()
class ProcessRegimes(Process):
def __init__(self,loop):
Process.__init__(self,loop)
self._running = False
self.name = 'regimes'
async def run(self):
self._running = True
args = makeclusters.init_regimes()
self._regimes = args
self._progress = None
if args is None:
return
ret = await makeclusters.run_regimes(args)
self.save()
return ret
def save(self):
ObjectContainer.getInstance().set('regimes',self._regimes)
ObjectContainer.getInstance().save()
self._completed = True
def interrupt(self):
pass
class ProcessFile(Process):
def __init__(self,loop,args):
Process.__init__(self,loop)
self._running = False
self._fname = args[0]
self._data = args[1]
self.name = 'send ' + self._fname
async def run(self):
self._running = True
with open(self._fname,'wb') as f:
f.write(self._data)
f.flush()
self._completed = True
return 'ok'
def interrupt(self):
pass
class ProcessGraphs(Process):
def __init__(self,loop,args):
Process.__init__(self,loop)
self._running = False
self._func = args[0]
self._args = None
if len(args) > 1:
self._args = args[1]
self.name = 'graph ' + self._func
async def run(self):
self._running = True
g = graphs.init_graph()
#todo reclalc if not there
g._clusters = ObjectContainer.getInstance().get('clusters')
g._regimes = ObjectContainer.getInstance().get('regimes')
buf = graphs.action(g, self._func, self._args)
print ('size', len(buf.getvalue()))
self._completed = True
return buf
def interrupt(self):
pass
if __name__=="__main__":
loop = asyncio.get_event_loop()
p = ProcessCovar(loop)
p._running=True
p._loop.run_until_complete(p.run())
\ No newline at end of file
#!/usr/bin/python3 -O
import asyncio
from tcpip import Message
import pickle
import sys,getopt
import io
from PIL import Image
import matplotlib.pyplot as plt
from config_manager import ConfigReader
class Client:
def __init__(self,i_o, callback):
self._conf = ConfigReader('regimes.ini')
self._server = self._conf.server()
self._port = self._conf.port()
self._loop = asyncio.get_event_loop()
self._callback = callback
self._poll = True
self._loop = asyncio.get_event_loop()
self._processes = set()
self._connections = {}
self._messages = asyncio.Queue(loop=self._loop)
self._i_o = i_o
self._th1 = asyncio.ensure_future(self.poll())
if self._i_o == 'i':
print('starting input thread')
self._th2 = asyncio.ensure_future(self.scan_input())
#self._th3 = asyncio.ensure_future(self.connect())
self._server = self._loop.run_until_complete(asyncio.gather(self._th1))
async def connect(self):
if self._i_o == 'i':
self._inreader = asyncio.StreamReader(loop=self._loop)
self._reader_protocol = asyncio.StreamReaderProtocol(self._inreader)
await self._loop.connect_read_pipe(lambda: self._reader_protocol, sys.stdin)
self._reader, self._writer = await asyncio.open_connection(self._server,self._port,loop=self._loop)
self._writer.transport.set_write_buffer_limits(0)
async def poll(self):
await self.connect()
while self._poll:
header = await self._reader.read(4)
size = int.from_bytes(header,byteorder='big')
s = 0
data = b''
while s < size:
toread = min(size - len(data),2**8)
chunk = await self._reader.read(toread)
s += len(chunk)
data += chunk
if data == '\n' or self._reader._eof:
return
else:
M = pickle.loads(data)
self._callback(self,M)
def run(self):
self._loop.run_forever()
self._loop.close()
def send(self, data):
self._writer.write(data)
self._writer.drain()
async def scan_input(self):
while True:
await asyncio.sleep(0)
try:
print ('>')
i = await self._inreader.readline()
self.input(i.decode())
except:
return
def input(self, line):
cmd = line.strip().split(' ')
if (cmd[0] == 'run'):
self.process_run_cmd(cmd[1])
elif (cmd[0] == 'send'):
self.process_send_cmd(cmd[1])
elif (cmd[0] == 'killall'):
self.process_killall()
elif (cmd[0] == 'graph'):
self.process_graph_cmd(cmd[1:])
elif (cmd[0] == 'quit'):
exit(0)
else :
print('unknown command:',cmd[0])
def process_run_cmd(self, cmd):
if cmd == "covariances":
m = Message(Message.RUN_COVARS,'')
self.send(m.tos())
elif cmd == "clusters":
m = Message(Message.RUN_CLUSTERS,'')
self.send(m.tos())
elif cmd == "regimes":
m = Message(Message.RUN_REGIMES,'')
self.send(m.tos())
else :
print ('unknown argument:',cmd)
def process_graph_cmd(self, args):
m = Message(Message.GRAPH,args)
self.send(m.tos())
def process_send_cmd(self, cmd):
try:
with open(cmd,'rb') as f:
data = f.read()
m = Message(Message.FILE, (cmd,data))
self.send(m.tos())
except Exception as e:
print (e)
def process_killall(self):
m = Message(Message.KILL_ALL,'')
self.send(m.tos())
def cb(c,M):
if c._i_o == 'o':
if M.cmd == Message.PROGRESS:
print(M.args,end='\r')
elif M.cmd == Message.STRING:
print('message from server:')
print (M.args)
elif M.cmd == Message.GRAPH:
im = Image.open(M.args)
im.show()
if __name__=="__main__":
args = sys.argv[1:]
try:
opts, args = getopt.getopt(args, 'i')
io = 'o'
for opt, arg in opts:
print (opt)
if opt == '-i':
io='i'
c = Client(io, cb)
c.run()
except Exception as e :
print (e)
#!/usr/bin/python3 -O
import asyncio
import tcpip
from tcpip import Message
import pickle
from config_manager import ConfigReader
from process import ProcessCovar
from process import ProcessClusters
from process import ProcessRegimes
from process import ProcessFile
from process import ProcessGraphs
from objects import ObjectContainer
import atexit
class Server:
def __init__(self):
ObjectContainer.getInstance().load()
self._conf = ConfigReader('regimes.ini')
self._server = self._conf.server()
self._port = self._conf.port()
self._loop = asyncio.get_event_loop()
self._processes = set()
self._connections = {}
self._messages = asyncio.Queue(loop=self._loop)
self._delconnections = asyncio.Queue(loop=self._loop)
self._core = asyncio.start_server(self.handle_connect, self._server, self._port, loop=self._loop)
self._th1 = asyncio.ensure_future(self.processing_thread())
self._th2 = asyncio.ensure_future(self.scan_thread())
self._th3 = asyncio.ensure_future(self.out_thread())
self._run = True
def run(self):
self._server = self._loop.run_until_complete(asyncio.gather(self._core,self._th1,self._th2,self._th3))
self._loop.run_forever()
async def out_thread(self):
while self._run:
await asyncio.sleep(0)
msg = await self._messages.get()
data = msg.tos()
for a,(r,w) in self._connections.items():
if w._transport.is_closing():
print ('disconnected')
self._delconnections.put_nowait(a)
else:
try:
size = len(data).to_bytes(4,byteorder='big')
w.write(size)
w.write(data)
#w.write(Message._SEP)
await w.drain()
await w.drain()
await asyncio.sleep(0)
except:
self._delconnections.put_nowait(a)
w.close()
print ('error connection:',a)
await asyncio.sleep(0)
while (not self._delconnections.empty()):
self._connections.pop(self._delconnections.get_nowait())
def close(self):
# Close the server
self._server.close()
self._loop.run_until_complete(server.wait_closed())
self._loop.close()
def kill_all(self):
for p in self._processes :
if (p._running):
p.interrupt()
m = Message(Message.STRING,'all tasks interrupted')
self._messages.put(m)
async def scan_thread(self):
while self._run:
await asyncio.sleep(1)
todel = []
for p in self._processes :
if p._completed:
if p._progress:
p._progress._iteration = p._progress._total
m = Message(Message.PROGRESS,p._progress._display())
await self._messages.put(m)
m = Message(Message.STRING,p.name + ' done')
await self._messages.put(m)
todel.append(p)
if (p._running):
if p._progress:
m = Message(Message.PROGRESS,p._progress._display())
await self._messages.put(m)
else:
m = Message(Message.STRING,p.name + ': in progress')
while todel:
self._processes.remove(todel.pop())
async def processing_thread(self):
while self._run:
for p in self._processes:
if not p._running and not p._completed:
r = await p.run()
print (p.name,'finished')
if not r:
m = Message(Message.STRING,p.name + ': cannot be started')
await self._messages.put(m)
elif r != 'ok':
if type(p) is ProcessRegimes:
m = Message(Message.STRING,r)
await self._messages.put(m)
elif type(p) is ProcessGraphs:
m = Message(Message.GRAPH,r)
await self._messages.put(m)
break
await asyncio.sleep(1)
async def handle_connect(self, reader, writer):
addr = writer.get_extra_info('peername')
#bug python 3.6
writer.transport.set_write_buffer_limits(0)
self._connections[addr] = (reader,writer)
print ('connection from ', addr)
if reader.exception() :
print (reader.exception())
while self._run:
try:
data = await reader.read(3096)
if len(data)==0:
return
else:
await self.process(data)
await asyncio.sleep(0)
except:
await asyncio.sleep(1)
continue
async def process(self, data):
if len(data) == 0:
return
message = pickle.loads(data)
if message.cmd == Message.STRING:
print ('processing message STRING')
rep = self.rString(*message.args)
elif message.cmd == Message.RUN_COVARS:
print ('processing message RUN_COVARS')
process = ProcessCovar(self._loop)
self._processes.add(process)
elif message.cmd == Message.KILL_ALL:
print ('processing message KILL_ALL')
self.kill_all()
elif message.cmd == Message.RUN_CLUSTERS:
print ('processing message RUN_CLUSTERS')
process = ProcessClusters(self._loop)
self._processes.add(process)
elif message.cmd == Message.RUN_REGIMES:
print ('processing message RUN_REGIMES')
process = ProcessRegimes(self._loop)
self._processes.add(process)
elif message.cmd == Message.FILE:
print ('processing message FILE')
process = ProcessFile(self._loop,message.args)
self._processes.add(process)
elif message.cmd == Message.GRAPH:
print ('processing message GRAPH')
process = ProcessGraphs(self._loop,message.args)
self._processes.add(process)
def clean_exit():
print ('saving ...')
ObjectContainer.getInstance().save()
if __name__ == "__main__":
atexit.register(clean_exit)
s = Server()
try:
s.run()
s.close()
except Exception as e:
ObjectContainer.getInstance().save()
print('exception ',e)
\ No newline at end of file
import pickle
class Message:
RUN_COVARS = 0
RUN_CLUSTERS = 1
STRING=2
PROGRESS=3
DONE = 4
KILL_ALL = 5
RUN_REGIMES = 6
FILE=7
GRAPH=8
_SEP = '$$'.encode()
def __init__(self, what,data):
self.cmd = what
self.args = data
def tos(self):
s = pickle.dumps(self)
return s
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment