diff --git a/components/engine/engine/engine.go b/components/engine/engine/engine.go index aaf5c1f595..dc1984ccb5 100644 --- a/components/engine/engine/engine.go +++ b/components/engine/engine/engine.go @@ -43,6 +43,7 @@ func unregister(name string) { // containers by executing *jobs*. type Engine struct { handlers map[string]Handler + catchall Handler hack Hack // data for temporary hackery (see hack.go) id string Stdout io.Writer @@ -60,6 +61,10 @@ func (eng *Engine) Register(name string, handler Handler) error { return nil } +func (eng *Engine) RegisterCatchall(catchall Handler) { + eng.catchall = catchall +} + // New initializes a new engine. func New() *Engine { eng := &Engine{ @@ -113,9 +118,13 @@ func (eng *Engine) Job(name string, args ...string) *Job { if eng.Logging { job.Stderr.Add(utils.NopWriteCloser(eng.Stderr)) } - handler, exists := eng.handlers[name] - if exists { - job.handler = handler + if eng.catchall != nil { + job.handler = eng.catchall + } else { + handler, exists := eng.handlers[name] + if exists { + job.handler = handler + } } return job } diff --git a/components/engine/engine/remote.go b/components/engine/engine/remote.go new file mode 100644 index 0000000000..48638e4383 --- /dev/null +++ b/components/engine/engine/remote.go @@ -0,0 +1,120 @@ +package engine + +import ( + "fmt" + "github.com/dotcloud/docker/pkg/beam" + "github.com/dotcloud/docker/pkg/beam/data" + "io" + "os" + "strconv" + "sync" +) + +type Sender struct { + beam.Sender +} + +func NewSender(s beam.Sender) *Sender { + return &Sender{s} +} + +func (s *Sender) Install(eng *Engine) error { + // FIXME: this doesn't exist yet. + eng.RegisterCatchall(s.Handle) + return nil +} + +func (s *Sender) Handle(job *Job) Status { + msg := data.Empty().Set("cmd", append([]string{job.Name}, job.Args...)...) + peer, err := beam.SendConn(s, msg.Bytes()) + if err != nil { + return job.Errorf("beamsend: %v", err) + } + defer peer.Close() + var tasks sync.WaitGroup + defer tasks.Wait() + r := beam.NewRouter(nil) + r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error { + tasks.Add(1) + io.Copy(job.Stdout, stdout) + tasks.Done() + return nil + }) + r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error { + tasks.Add(1) + io.Copy(job.Stderr, stderr) + tasks.Done() + return nil + }) + r.NewRoute().KeyStartsWith("cmd", "log", "stdin").HasAttachment().Handler(func(p []byte, stdin *os.File) error { + tasks.Add(1) + io.Copy(stdin, job.Stdin) + tasks.Done() + return nil + }) + var status int + r.NewRoute().KeyStartsWith("cmd", "status").Handler(func(p []byte, f *os.File) error { + cmd := data.Message(p).Get("cmd") + if len(cmd) != 3 { + return fmt.Errorf("usage: %s <0-127>", cmd[0]) + } + s, err := strconv.ParseUint(cmd[2], 10, 8) + if err != nil { + return fmt.Errorf("usage: %s <0-127>", cmd[0]) + } + status = int(s) + return nil + + }) + if _, err := beam.Copy(r, peer); err != nil { + return job.Errorf("%v", err) + } + return Status(status) +} + +type Receiver struct { + *Engine + peer beam.Receiver +} + +func NewReceiver(peer beam.Receiver) *Receiver { + return &Receiver{Engine: New(), peer: peer} +} + +func (rcv *Receiver) Run() error { + r := beam.NewRouter(nil) + r.NewRoute().KeyExists("cmd").Handler(func(p []byte, f *os.File) error { + // Use the attachment as a beam return channel + peer, err := beam.FileConn(f) + if err != nil { + f.Close() + return err + } + cmd := data.Message(p).Get("cmd") + job := rcv.Engine.Job(cmd[0], cmd[1:]...) + stdout, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes()) + if err != nil { + return err + } + job.Stdout.Add(stdout) + stderr, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stderr").Bytes()) + if err != nil { + return err + } + job.Stderr.Add(stderr) + stdin, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stdin").Bytes()) + if err != nil { + return err + } + job.Stdin.Add(stdin) + // ignore error because we pass the raw status + job.Run() + err = peer.Send(data.Empty().Set("cmd", "status", fmt.Sprintf("%d", job.status)).Bytes(), nil) + if err != nil { + return err + } + return nil + }) + _, err := beam.Copy(r, rcv.peer) + return err +} diff --git a/components/engine/engine/remote_test.go b/components/engine/engine/remote_test.go new file mode 100644 index 0000000000..54092ec934 --- /dev/null +++ b/components/engine/engine/remote_test.go @@ -0,0 +1,3 @@ +package engine + +import () diff --git a/components/engine/engine/rengine/main.go b/components/engine/engine/rengine/main.go new file mode 100644 index 0000000000..b4fa01d39c --- /dev/null +++ b/components/engine/engine/rengine/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "fmt" + "github.com/dotcloud/docker/engine" + "github.com/dotcloud/docker/pkg/beam" + "net" + "os" +) + +func main() { + eng := engine.New() + + c, err := net.Dial("unix", "beam.sock") + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + return + } + defer c.Close() + f, err := c.(*net.UnixConn).File() + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + return + } + + child, err := beam.FileConn(f) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + return + } + defer child.Close() + + sender := engine.NewSender(child) + sender.Install(eng) + + cmd := eng.Job(os.Args[1], os.Args[2:]...) + cmd.Stdout.Add(os.Stdout) + cmd.Stderr.Add(os.Stderr) + if err := cmd.Run(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +}