feat: remove stream support
This commit is contained in:
@ -83,7 +83,7 @@ func StartServer(addr *hsp.Adddress) {
|
|||||||
|
|
||||||
router := server.NewRouter()
|
router := server.NewRouter()
|
||||||
|
|
||||||
router.AddRoute("/", Index)
|
router.AddRoute("*", Index)
|
||||||
|
|
||||||
srv.SetListener(handler)
|
srv.SetListener(handler)
|
||||||
|
|
||||||
@ -116,7 +116,10 @@ func StartServer(addr *hsp.Adddress) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func StartSession(addr *hsp.Adddress) {
|
func StartSession(addr *hsp.Adddress) {
|
||||||
fmt.Println("Starting session on", addr.String())
|
url := addr.String() + addr.Route
|
||||||
|
fmt.Println("Starting session on", url)
|
||||||
|
|
||||||
|
c := client.NewClient()
|
||||||
|
|
||||||
rl, err := readline.New("> ")
|
rl, err := readline.New("> ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -125,15 +128,13 @@ func StartSession(addr *hsp.Adddress) {
|
|||||||
|
|
||||||
defer rl.Close()
|
defer rl.Close()
|
||||||
|
|
||||||
c := client.NewClient()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
line, err := rl.Readline()
|
line, err := rl.Readline()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
c.SendText(addr.String(), line)
|
c.SendText(url, line)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,23 +144,33 @@ func main() {
|
|||||||
|
|
||||||
var host string
|
var host string
|
||||||
var service string
|
var service string
|
||||||
|
var address string
|
||||||
|
|
||||||
flag.StringVar(&host, "host", "localhost", "specify target host (default: localhost)")
|
flag.StringVar(&host, "host", "localhost", "specify server host (default: localhost)")
|
||||||
flag.StringVar(&service, "port", "998", "specify target port (default: 998)")
|
flag.StringVar(&service, "port", "998", "specify server port (default: 998)")
|
||||||
|
flag.StringVar(&address, "addr", "localhost:998", "specify target address (default: :998)")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
a := fmt.Sprintf("%s:%s", host, service)
|
|
||||||
addr, err := hsp.ParseAddress(a)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("ERR: Invalid address %s: %v\n", a, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if listening {
|
if listening {
|
||||||
|
a := fmt.Sprintf("%s:%s", host, service)
|
||||||
|
addr, err := hsp.ParseAddress(a)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("ERR: Invalid address %s: %v\n", a, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
StartServer(addr)
|
StartServer(addr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addr, err := hsp.ParseAddress(address)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("ERR: Invalid address %s: %v\n", address, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
StartSession(addr)
|
StartSession(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -31,6 +31,7 @@ const (
|
|||||||
STATUS_SUCCESS = 0
|
STATUS_SUCCESS = 0
|
||||||
STATUS_NOTFOUND = 69
|
STATUS_NOTFOUND = 69
|
||||||
STATUS_INTERNALERR = 129
|
STATUS_INTERNALERR = 129
|
||||||
|
STATUS_RECEIVED = 1
|
||||||
)
|
)
|
||||||
|
|
||||||
var DATA_FORMATS map[string]string = map[string]string{
|
var DATA_FORMATS map[string]string = map[string]string{
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
@ -9,19 +8,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type RouteHandler func(req *hsp.Request) *hsp.Response
|
type RouteHandler func(req *hsp.Request) *hsp.Response
|
||||||
type StreamHandler func(req *hsp.Request, stream chan []byte)
|
|
||||||
|
|
||||||
type Router struct {
|
type Router struct {
|
||||||
routes map[string]RouteHandler
|
routes map[string]RouteHandler
|
||||||
streamers map[string]StreamHandler
|
|
||||||
streamMaxSize uint64
|
|
||||||
streamBufferSize uint16
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRouter() *Router {
|
func NewRouter() *Router {
|
||||||
return &Router{
|
return &Router{
|
||||||
routes: make(map[string]RouteHandler),
|
routes: make(map[string]RouteHandler),
|
||||||
streamers: make(map[string]StreamHandler),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -32,21 +26,6 @@ func (r *Router) AddRoute(pathname string, handler RouteHandler) {
|
|||||||
r.routes[pathname] = handler
|
r.routes[pathname] = handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Router) AddStreamer(pathname string, handler StreamHandler) {
|
|
||||||
if _, ok := r.streamers[pathname]; ok {
|
|
||||||
log.Printf("WARN: Rewriting existing streamer '%s'\n", pathname)
|
|
||||||
}
|
|
||||||
r.streamers[pathname] = handler
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) SetStreamMaxSize(size uint64) {
|
|
||||||
r.streamMaxSize = size
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) SetStreamBufferSize(size uint16) {
|
|
||||||
r.streamBufferSize = size
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) Handle(conn net.Conn) error {
|
func (r *Router) Handle(conn net.Conn) error {
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
@ -62,69 +41,14 @@ func (r *Router) Handle(conn net.Conn) error {
|
|||||||
if route, ok := packet.Headers["route"]; ok {
|
if route, ok := packet.Headers["route"]; ok {
|
||||||
req := hsp.NewRequest(conn, packet)
|
req := hsp.NewRequest(conn, packet)
|
||||||
|
|
||||||
switch req.GetRequestKind() {
|
if handler, ok := r.routes[route]; ok {
|
||||||
case "single-hit":
|
res := handler(req)
|
||||||
if handler, ok := r.routes[route]; ok {
|
_, err := dupl.WritePacket(res.ToPacket())
|
||||||
res := handler(req)
|
return err
|
||||||
_, err := dupl.WritePacket(res.ToPacket())
|
} else if fallback, ok := r.routes["*"]; ok {
|
||||||
return err
|
res := fallback(req)
|
||||||
} else if fallback, ok := r.routes["*"]; ok {
|
_, err := dupl.WritePacket(res.ToPacket())
|
||||||
res := fallback(req)
|
return err
|
||||||
_, err := dupl.WritePacket(res.ToPacket())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
case "stream":
|
|
||||||
if handler, ok := r.streamers[route]; ok {
|
|
||||||
info, err := req.GetStreamInfo()
|
|
||||||
if err != nil {
|
|
||||||
_, err = dupl.WritePacket(hsp.NewErrorResponse(err).ToPacket())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
streamSize := uint64(min(info.TotalBytes, r.streamMaxSize))
|
|
||||||
bufferSize := uint16(min(info.BufferSize, r.streamBufferSize))
|
|
||||||
|
|
||||||
res := hsp.NewStatusResponse(hsp.STATUS_SUCCESS)
|
|
||||||
res.AddHeader(hsp.H_XSTREAM, fmt.Sprintf("%d:%d", streamSize, bufferSize))
|
|
||||||
res.AddHeader(hsp.H_XSTREAM_KEY, "0") // TODO: generate id
|
|
||||||
|
|
||||||
_, err = dupl.WritePacket(res.ToPacket())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
req := hsp.NewRequest(conn, res.ToPacket())
|
|
||||||
bc := make(chan []byte)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
handler(req, bc)
|
|
||||||
}()
|
|
||||||
|
|
||||||
buf := make([]byte, bufferSize)
|
|
||||||
var totalReceived uint64
|
|
||||||
totalReceived = 0
|
|
||||||
for totalReceived < streamSize {
|
|
||||||
n, err := conn.Read(buf)
|
|
||||||
if err != nil || n <= 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if n > 0 {
|
|
||||||
totalReceived += uint64(n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res = hsp.NewStatusResponse(hsp.STATUS_SUCCESS)
|
|
||||||
res.AddHeader(hsp.H_XSTREAM, fmt.Sprintf("%d:0", streamSize - totalReceived))
|
|
||||||
res.AddHeader(hsp.H_XSTREAM_KEY, "0") // TODO: generate id
|
|
||||||
_, err = dupl.WritePacket(res.ToPacket())
|
|
||||||
|
|
||||||
conn.Close()
|
|
||||||
close(bc)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("Unsupported request kind: %s", req.GetRequestKind())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user