feat: support streamer routes

This commit is contained in:
2025-04-19 13:31:08 +02:00
parent e3643ff330
commit 812562e096

View File

@ -1,7 +1,7 @@
package server package server
import ( import (
"errors" "fmt"
"log" "log"
"net" "net"
@ -9,22 +9,42 @@ import (
) )
type RouteHandler func(req *hsp.Request) *hsp.Response type RouteHandler func(req *hsp.Request) *hsp.Response
type StreamHandler func(req *hsp.Request, stream chan []byte)
type Router struct { type Router struct {
Routes map[string]RouteHandler routes map[string]RouteHandler
streamers map[string]StreamHandler
streamMaxSize uint64
streamBufferSize uint16
} }
func NewRouter() *Router { func NewRouter() *Router {
return &Router{ return &Router{
Routes: make(map[string]RouteHandler), routes: make(map[string]RouteHandler),
streamers: make(map[string]StreamHandler),
} }
} }
func (r *Router) AddRoute(pathname string, handler RouteHandler) { func (r *Router) AddRoute(pathname string, handler RouteHandler) {
if _, ok := r.Routes[pathname]; ok { if _, ok := r.routes[pathname]; ok {
log.Printf("WARN: Rewriting existing route '%s'\n", pathname) log.Printf("WARN: Rewriting existing route '%s'\n", pathname)
} }
r.Routes[pathname] = handler r.routes[pathname] = handler
}
func (r *Router) AddStreamer(pathname string, handler StreamHandler) {
if _, ok := r.streamers[pathname]; ok {
log.Printf("WARN: Rewriting existing streamer '%s'\n", pathname)
}
r.streamers[pathname] = handler
}
func (r *Router) SetStreamMaxSize(size uint64) {
r.streamMaxSize = size
}
func (r *Router) SetStreamBufferSize(size uint16) {
r.streamBufferSize = size
} }
func (r *Router) Handle(conn net.Conn) error { func (r *Router) Handle(conn net.Conn) error {
@ -43,12 +63,70 @@ func (r *Router) Handle(conn net.Conn) error {
if route, ok := packet.Headers["route"]; ok { if route, ok := packet.Headers["route"]; ok {
log.Printf("[ROUTER] New connection to '%s'", route) log.Printf("[ROUTER] New connection to '%s'", route)
if handler, ok := r.Routes[route]; ok {
req := hsp.NewRequest(conn, packet) req := hsp.NewRequest(conn, packet)
switch req.GetRequestKind() {
case "single-hit":
if handler, ok := r.routes[route]; ok {
res := handler(req) res := handler(req)
_, err := dupl.WritePacket(res.ToPacket()) _, err := dupl.WritePacket(res.ToPacket())
return err return err
} }
case "stream":
if handler, ok := r.streamers[route]; ok {
info, err := req.GetStreamInfo()
if err != nil {
_, err = dupl.WritePacket(hsp.NewErrorResponse(err).ToPacket())
return err
} }
return errors.New("Not Found")
streamSize := uint64(min(info.TotalBytes, r.streamMaxSize))
bufferSize := uint16(min(info.BufferSize, r.streamBufferSize))
res := hsp.NewStatusResponse(hsp.STATUS_SUCCESS)
res.AddHeader(hsp.H_XSTREAM, fmt.Sprintf("%d:%d", streamSize, bufferSize))
res.AddHeader(hsp.H_XSTREAM_KEY, "0") // TODO: generate id
_, err = dupl.WritePacket(res.ToPacket())
if err != nil {
return err
}
req := hsp.NewRequest(conn, res.ToPacket())
bc := make(chan []byte)
go func() {
handler(req, bc)
}()
buf := make([]byte, bufferSize)
var totalReceived uint64
totalReceived = 0
for totalReceived < streamSize {
n, err := conn.Read(buf)
if err != nil || n <= 0 {
break
}
if n > 0 {
totalReceived += uint64(n)
}
}
res = hsp.NewStatusResponse(hsp.STATUS_SUCCESS)
res.AddHeader(hsp.H_XSTREAM, fmt.Sprintf("%d:0", streamSize - totalReceived))
res.AddHeader(hsp.H_XSTREAM_KEY, "0") // TODO: generate id
_, err = dupl.WritePacket(res.ToPacket())
conn.Close()
close(bc)
return err
}
default:
return fmt.Errorf("Unsupported request kind: %s", req.GetRequestKind())
}
}
_, err = dupl.WritePacket(hsp.NewStatusResponse(hsp.STATUS_NOTFOUND).ToPacket())
return err
} }