133
pkg/logs/logs.go
Normal file
133
pkg/logs/logs.go
Normal file
@ -0,0 +1,133 @@
|
||||
package logs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
containerTypes "github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
dockerClient "github.com/docker/docker/client"
|
||||
)
|
||||
|
||||
type TailOpts struct {
|
||||
AppName string
|
||||
Services []string
|
||||
StdErr bool
|
||||
Since string
|
||||
Buffer *bytes.Buffer
|
||||
ToBuffer bool
|
||||
Filters filters.Args
|
||||
}
|
||||
|
||||
// TailLogs gathers logs for the given app with optional service names to be
|
||||
// filtered on. These logs can be printed to os.Stdout or gathered to a buffer.
|
||||
func TailLogs(
|
||||
cl *dockerClient.Client,
|
||||
opts TailOpts,
|
||||
) error {
|
||||
sigintChannel := make(chan os.Signal, 1)
|
||||
signal.Notify(sigintChannel, os.Interrupt)
|
||||
defer signal.Stop(sigintChannel)
|
||||
|
||||
f := filters.NewArgs()
|
||||
for _, name := range opts.Services {
|
||||
f.Add("name", name)
|
||||
}
|
||||
|
||||
services, err := cl.ServiceList(
|
||||
context.Background(),
|
||||
types.ServiceListOptions{Filters: f},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
waitCh := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
for _, service := range services {
|
||||
f := filters.NewArgs()
|
||||
f.Add("name", service.Spec.Name)
|
||||
|
||||
tasks, err := cl.TaskList(context.Background(), types.TaskListOptions{Filters: f})
|
||||
if err != nil {
|
||||
// TODO
|
||||
// return nil
|
||||
}
|
||||
|
||||
if len(tasks) > 0 {
|
||||
// Need to sort the tasks by the CreatedAt field in the inverse order.
|
||||
// Otherwise they are in the reversed order and not sorted properly.
|
||||
slices.SortFunc[[]swarm.Task](tasks, func(t1, t2 swarm.Task) int {
|
||||
return int(t2.Meta.CreatedAt.Unix() - t1.Meta.CreatedAt.Unix())
|
||||
})
|
||||
lastTask := tasks[0].Status
|
||||
if lastTask.State != swarm.TaskStateRunning {
|
||||
// for _, task := range tasks {
|
||||
// TODO
|
||||
// return fmt.Errorf("[%s] %s state %s: %s", service.Spec.Name, task.Meta.CreatedAt.Format(time.RFC3339), task.Status.State, task.Status.Err)
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
// Collect the logs in a go routine, so the logs from all services are
|
||||
// collected in parallel.
|
||||
wg.Add(1)
|
||||
go func(serviceID string) {
|
||||
logs, err := cl.ServiceLogs(context.Background(), serviceID, containerTypes.LogsOptions{
|
||||
ShowStderr: true,
|
||||
ShowStdout: !opts.StdErr,
|
||||
Since: opts.Since,
|
||||
Until: "",
|
||||
Timestamps: true,
|
||||
Follow: true,
|
||||
Tail: "50", // TODO: if buf, bump it up!
|
||||
Details: false, // TODO: if buf, yes!
|
||||
})
|
||||
if err != nil {
|
||||
// TODO
|
||||
// log.Debugf("unable to poll logs: %s", err)
|
||||
}
|
||||
defer logs.Close()
|
||||
|
||||
if opts.ToBuffer {
|
||||
_, err = io.Copy(opts.Buffer, logs)
|
||||
if err != nil && err != io.EOF {
|
||||
// TODO
|
||||
// log.Debugf("unable to copy buffer: %s", err)
|
||||
}
|
||||
logs.Close()
|
||||
return
|
||||
}
|
||||
|
||||
_, err = io.Copy(os.Stdout, logs)
|
||||
if err != nil && err != io.EOF {
|
||||
// TODO
|
||||
// log.Debugf("unable to copy buffer: %s", err)
|
||||
}
|
||||
logs.Close()
|
||||
|
||||
}(service.ID)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(waitCh)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-waitCh:
|
||||
return nil
|
||||
case <-sigintChannel:
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user