diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a2d85f5..6e4a1c3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -28,12 +28,28 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Setup Go uses: actions/setup-go@v5 with: - go-version: "~1.22" + go-version: "~1.24" - name: Test run: go test ./... -race + + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v5 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: "~1.24" + + - name: golangci-lint + uses: golangci/golangci-lint-action@v7 + with: + version: v2.1.6 diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml new file mode 100644 index 0000000..b48f889 --- /dev/null +++ b/.github/workflows/integration.yaml @@ -0,0 +1,46 @@ +name: integration + +on: + push: + branches: + - main + pull_request: + +permissions: + actions: none + checks: none + contents: read + deployments: none + issues: none + packages: none + pull-requests: none + repository-projects: none + security-events: none + statuses: none + +# Cancel in-progress runs for pull requests when developers push +# additional changes +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + +jobs: + integration-test: + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: Checkout + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + + - name: Setup Go + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 + with: + go-version: "~1.24" + + - name: Create KinD cluster + uses: helm/kind-action@92086f6be054225fa813e0a4b13787fc9088faab # v1.13.0 + with: + cluster_name: integration-test + + - name: Run integration tests + run: go test -tags=integration -v -timeout=8m ./... diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 52e11b4..c943ad6 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Setup Go uses: actions/setup-go@v5 diff --git a/.gitignore b/.gitignore index 5278fea..bda1548 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +coder-logstream-kube coder-logstream-kube-* -build \ No newline at end of file +build/ diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..bec718e --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,29 @@ +# See https://golangci-lint.run/usage/configuration/ +version: "2" + +linters: + enable: + - govet + - errcheck + - staticcheck + - unused + - ineffassign + - misspell + - revive + settings: + govet: + enable-all: true + disable: + - fieldalignment + - shadow + misspell: + locale: US + revive: + rules: + - name: exported + arguments: + - "checkPrivateReceivers" + +formatters: + enable: + - goimports diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0f04106 --- /dev/null +++ b/Makefile @@ -0,0 +1,87 @@ +# Colors for output +GREEN := $(shell printf '\033[32m') +RESET := $(shell printf '\033[0m') +BOLD := $(shell printf '\033[1m') + +# Shell source files - use shfmt to find them (respects .editorconfig) +SHELL_SRC_FILES := $(shell shfmt -f .) + +.PHONY: all +all: build + +.PHONY: build +build: + go build ./... + +.PHONY: test +test: + go test ./... 
-race + +.PHONY: test/integration +test/integration: + go test -tags=integration -v -timeout=8m ./... + +.PHONY: lint +lint: lint/go lint/shellcheck + +.PHONY: lint/go +lint/go: + golangci-lint run --timeout=5m + +.PHONY: lint/shellcheck +lint/shellcheck: $(SHELL_SRC_FILES) + echo "--- shellcheck" + shellcheck --external-sources $(SHELL_SRC_FILES) + +.PHONY: fmt +fmt: fmt/go fmt/shfmt + +.PHONY: fmt/go +fmt/go: + go fmt ./... + +.PHONY: fmt/shfmt +fmt/shfmt: $(SHELL_SRC_FILES) +ifdef FILE + # Format single shell script + if [[ -f "$(FILE)" ]] && [[ "$(FILE)" == *.sh ]]; then \ + echo "$(GREEN)==>$(RESET) $(BOLD)fmt/shfmt$(RESET) $(FILE)"; \ + shfmt -w "$(FILE)"; \ + fi +else + echo "$(GREEN)==>$(RESET) $(BOLD)fmt/shfmt$(RESET)" +# Only do diff check in CI, errors on diff. +ifdef CI + shfmt -d $(SHELL_SRC_FILES) +else + shfmt -w $(SHELL_SRC_FILES) +endif +endif + +.PHONY: clean +clean: + rm -f coder-logstream-kube + +.PHONY: kind/create +kind/create: + ./scripts/kind-setup.sh create + +.PHONY: kind/delete +kind/delete: + ./scripts/kind-setup.sh delete + +.PHONY: help +help: + @echo "Available targets:" + @echo " build - Build the project" + @echo " test - Run unit tests" + @echo " test/integration - Run integration tests (requires KinD cluster)" + @echo " lint - Run all linters" + @echo " lint/go - Run golangci-lint" + @echo " lint/shellcheck - Run shellcheck on shell scripts" + @echo " fmt - Format all code" + @echo " fmt/go - Format Go code" + @echo " fmt/shfmt - Format shell scripts" + @echo " kind/create - Create KinD cluster for integration tests" + @echo " kind/delete - Delete KinD cluster" + @echo " clean - Remove build artifacts" diff --git a/README.md b/README.md index bf55d8d..6a4f300 100644 --- a/README.md +++ b/README.md @@ -64,3 +64,42 @@ Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers - [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate. - [`SSL_CERT_DIR`](https://go.dev/src/crypto/x509/root_unix.go#L25): Identifies which directory to check for SSL certificate files. + +## Development + +### Running Tests + +Unit tests can be run with: + +```console +go test ./... -race +``` + +### Integration Tests + +Integration tests run against a real Kubernetes cluster using [KinD (Kubernetes in Docker)](https://kind.sigs.k8s.io/). + +**Prerequisites:** +- [Docker](https://docs.docker.com/get-docker/) +- [KinD](https://kind.sigs.k8s.io/docs/user/quick-start/#installation) +- [kubectl](https://kubernetes.io/docs/tasks/tools/) + +**Setup and run:** + +```console +# Create a KinD cluster +./scripts/kind-setup.sh create + +# Run integration tests +go test -tags=integration -v ./... + +# Clean up when done +./scripts/kind-setup.sh delete +``` + +The integration tests validate: +- Pod event streaming with real Kubernetes informers +- ReplicaSet event handling +- Multi-namespace support +- Label selector filtering + diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index c89a98a..9aee16c 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -98,6 +98,10 @@ spec: nodeSelector: {{- toYaml . | nindent 8 }} {{- end }} + {{- with .Values.podSecurityContext }} + podSecurityContext: + {{- toYaml . 
| nindent 8 }} + {{- end }} containers: - name: coder-logstream-kube image: "{{ .Values.image.repo }}:{{ .Values.image.tag | default .Chart.AppVersion }}" diff --git a/helm/values.yaml b/helm/values.yaml index 5a6d1b6..daa847f 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -92,8 +92,11 @@ labels: {} # securityContext -- Container-level security context # See: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ -securityContext: {} - # allowPrivilegeEscalation: false +securityContext: + runAsNonRoot: true + runAsUser: 65532 + runAsGroup: 65532 + allowPrivilegeEscalation: false # capabilities: # drop: # - ALL @@ -101,3 +104,8 @@ securityContext: {} # runAsNonRoot: true # seccompProfile: # type: RuntimeDefault + +podSecurityContext: {} +# Optional, only if your cluster requires group ownership for mounted volumes: +# podSecurityContext: +# fsGroup: 65532 \ No newline at end of file diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..0e14fba --- /dev/null +++ b/integration_test.go @@ -0,0 +1,514 @@ +//go:build integration + +package main + +import ( + "context" + "fmt" + "net/url" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" +) + +// getKubeClient creates a Kubernetes client from the default kubeconfig. +// It will use KUBECONFIG env var if set, otherwise ~/.kube/config. +// It also verifies the cluster is a KinD cluster to prevent accidentally +// running tests against production clusters. +func getKubeClient(t *testing.T) kubernetes.Interface { + t.Helper() + + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + home, err := os.UserHomeDir() + require.NoError(t, err, "failed to get user home dir") + kubeconfig = home + "/.kube/config" + } + + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + require.NoError(t, err, "failed to build kubeconfig") + + // Safety check: ensure we're connecting to a KinD cluster. + // KinD clusters run on localhost or have "kind" in the host. + // This prevents accidentally running destructive tests against production clusters. + if config.Host != "" && os.Getenv("INTEGRATION_TEST_UNSAFE") != "1" { + isKind := strings.Contains(config.Host, "127.0.0.1") || + strings.Contains(config.Host, "localhost") || + strings.Contains(strings.ToLower(config.Host), "kind") + if !isKind { + t.Fatalf("Safety check failed: integration tests must run against a KinD cluster. "+ + "Current context points to %q. Set KUBECONFIG to a KinD cluster config or "+ + "set INTEGRATION_TEST_UNSAFE=1 to bypass this check.", config.Host) + } + } + + client, err := kubernetes.NewForConfig(config) + require.NoError(t, err, "failed to create kubernetes client") + + return client +} + +// createTestNamespace creates a unique namespace for test isolation. +// It registers cleanup to delete the namespace after the test. 
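+// The namespace name embeds a nanosecond timestamp, so tests running in
+// parallel against the same cluster each get an isolated namespace.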
+func createTestNamespace(t *testing.T, ctx context.Context, client kubernetes.Interface) string { + t.Helper() + + name := fmt.Sprintf("logstream-test-%d", time.Now().UnixNano()) + + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + + _, err := client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) + require.NoError(t, err, "failed to create test namespace") + + t.Cleanup(func() { + // Use a fresh context for cleanup in case the test context is cancelled + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err := client.CoreV1().Namespaces().Delete(cleanupCtx, name, metav1.DeleteOptions{}) + if err != nil { + t.Logf("warning: failed to delete test namespace %s: %v", name, err) + } + }) + + return name +} + +// waitForLogContaining waits until a log containing the given substring is received. +// It collects all logs seen and returns them along with whether the target was found. +func waitForLogContaining(t *testing.T, ctx context.Context, api *fakeAgentAPI, timeout time.Duration, substring string) (allLogs []string, found bool) { + t.Helper() + + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + for { + select { + case logs := <-api.logs: + for _, log := range logs { + allLogs = append(allLogs, log.Output) + if strings.Contains(log.Output, substring) { + return allLogs, true + } + } + case <-timeoutCtx.Done(): + return allLogs, false + } + } +} + +// waitForLogSource waits for log source registration with a timeout. +func waitForLogSource(t *testing.T, ctx context.Context, api *fakeAgentAPI, timeout time.Duration) { + t.Helper() + + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + select { + case <-api.logSource: + return + case <-timeoutCtx.Done(): + t.Fatal("timeout waiting for log source registration") + } +} + +func TestIntegration_PodEvents(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger + // Note: We don't set clock, so it uses a real clock for integration tests + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait a bit for informers to sync + time.Sleep(1 * time.Second) + + // Create a pod with CODER_AGENT_TOKEN + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-integration", + }, + }, + }, + }, + // Use a non-existent node to keep the pod in Pending state + // This avoids needing to actually run the container + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait 
for log source registration + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for the "Created pod" log (may receive other logs first like scheduling warnings) + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log, got: %v", logs) + + // Delete the pod and verify deletion event + err = client.CoreV1().Pods(namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Wait for the "Deleted pod" log + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Deleted pod") + require.True(t, found, "expected 'Deleted pod' log, got: %v", logs) +} + +func TestIntegration_ReplicaSetEvents(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger + // Note: We don't set clock, so it uses a real clock for integration tests + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait a bit for informers to sync + time.Sleep(1 * time.Second) + + // Create a ReplicaSet with CODER_AGENT_TOKEN + replicas := int32(1) + rs := &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rs", + Namespace: namespace, + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-rs", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "test-rs", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-rs-integration", + }, + }, + }, + }, + // Use a non-existent node to keep pods in Pending state + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + }, + }, + } + + _, err = client.AppsV1().ReplicaSets(namespace).Create(ctx, rs, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source registration + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for the "Queued pod from ReplicaSet" log + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Queued pod from ReplicaSet") + require.True(t, found, "expected 'Queued pod from ReplicaSet' log, got: %v", logs) + + // Delete the ReplicaSet + err = client.AppsV1().ReplicaSets(namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Wait for the "Deleted ReplicaSet" log + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Deleted ReplicaSet") + require.True(t, found, "expected 'Deleted ReplicaSet' log, got: %v", logs) +} + +func TestIntegration_MultiNamespace(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + + // Create two namespaces + namespace1 := createTestNamespace(t, 
ctx, client) + namespace2 := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger watching both namespaces + // Note: We don't set clock, so it uses a real clock for integration tests + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace1, namespace2}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait for informers to sync + time.Sleep(1 * time.Second) + + // Create a pod in namespace1 + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-ns1", + Namespace: namespace1, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-ns1", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace1).Create(ctx, pod1, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source and logs from first pod + waitForLogSource(t, ctx, api, 30*time.Second) + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for first pod, got: %v", logs) + + // Create a pod in namespace2 + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-ns2", + Namespace: namespace2, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-ns2", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace2).Create(ctx, pod2, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source and logs from second pod + waitForLogSource(t, ctx, api, 30*time.Second) + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for second pod, got: %v", logs) + + // Both namespaces should have received events + t.Log("Successfully received events from both namespaces") +} + +func TestIntegration_LabelSelector(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger with a label selector + // Note: We don't set clock, so it uses a real clock for integration tests + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + labelSelector: "coder-workspace=true", + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests + }) + require.NoError(t, err) + defer reporter.Close() + + // 
Wait for informers to sync + time.Sleep(1 * time.Second) + + // Create a pod WITHOUT the matching label - should be ignored + podNoLabel := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-no-label", + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-no-label", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace).Create(ctx, podNoLabel, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait a bit to ensure the pod without label is not picked up + time.Sleep(2 * time.Second) + + // Create a pod WITH the matching label - should be tracked + podWithLabel := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-with-label", + Namespace: namespace, + Labels: map[string]string{ + "coder-workspace": "true", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-with-label", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace).Create(ctx, podWithLabel, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source registration - this should only happen for the labeled pod + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for logs - look specifically for "Created pod" with the labeled pod name + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for labeled pod, got: %v", logs) + + // Verify that none of the logs mention the unlabeled pod + for _, log := range logs { + require.NotContains(t, log, "test-pod-no-label", "should not receive logs for unlabeled pod") + } +} diff --git a/logger.go b/logger.go index 2aa4ed3..0e5c29f 100644 --- a/logger.go +++ b/logger.go @@ -1,3 +1,5 @@ +// Package main implements coder-logstream-kube, a Kubernetes controller +// that streams pod logs to the Coder agent API. package main import ( @@ -34,6 +36,8 @@ type podEventLoggerOptions struct { logger slog.Logger logDebounce time.Duration + // maxRetries is the maximum number of retries for a log send failure. + maxRetries int // The following fields are optional! namespaces []string @@ -52,6 +56,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve opts.clock = quartz.NewReal() } + if opts.maxRetries == 0 { + opts.maxRetries = 10 + } + logCh := make(chan agentLog, 512) ctx, cancelFunc := context.WithCancel(ctx) reporter := &podEventLogger{ @@ -75,12 +83,15 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, + maxRetries: opts.maxRetries, }, } // If no namespaces are provided, we listen for events in all namespaces. 
if len(opts.namespaces) == 0 { - reporter.initNamespace("") + if err := reporter.initNamespace(""); err != nil { + return nil, fmt.Errorf("init namespace: %w", err) + } } else { for _, namespace := range opts.namespaces { if err := reporter.initNamespace(namespace); err != nil { @@ -322,6 +333,7 @@ func (p *podEventLogger) sendDelete(token string) { } } +// Close stops the pod event logger and releases all resources. func (p *podEventLogger) Close() error { p.cancelFunc() close(p.stopChan) @@ -407,6 +419,11 @@ type logQueuer struct { loggerTTL time.Duration loggers map[string]agentLoggerLifecycle logCache logCache + + // retries maps agent tokens to their retry state for exponential backoff + retries map[string]*retryState + // maxRetries is the maximum number of retries for a log send failure. + maxRetries int } func (l *logQueuer) work(ctx context.Context) { @@ -427,87 +444,119 @@ func (l *logQueuer) work(ctx context.Context) { } } +func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) { + client := agentsdk.New(l.coderURL) + client.SetSessionToken(log.agentToken) + logger := l.logger.With(slog.F("resource_name", log.resourceName)) + client.SDK.SetLogger(logger) + + _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ + ID: sourceUUID, + Icon: "/icon/k8s.png", + DisplayName: "Kubernetes", + }) + if err != nil { + // Posting the log source failed, which affects how logs appear. + // We'll retry to ensure the log source is properly registered. + logger.Error(ctx, "post log source", slog.Error(err)) + return agentLoggerLifecycle{}, err + } + + ls := agentsdk.NewLogSender(logger) + sl := ls.GetScriptLogger(sourceUUID) + + gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) + + // connect to Agent v2.0 API, since we don't need features added later. + // This maximizes compatibility. + arpc, err := client.ConnectRPC20(gracefulCtx) + if err != nil { + logger.Error(ctx, "drpc connect", slog.Error(err)) + gracefulCancel() + return agentLoggerLifecycle{}, err + } + go func() { + err := ls.SendLoop(gracefulCtx, arpc) + // if the send loop exits on its own without the context + // canceling, timeout the logger and force it to recreate. + if err != nil && ctx.Err() == nil { + l.loggerTimeout(log.agentToken) + } + }() + + closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { + logger.Info(ctx, "logger timeout firing") + l.loggerTimeout(log.agentToken) + }) + lifecycle := agentLoggerLifecycle{ + scriptLogger: sl, + close: func() { + defer func() { + _ = arpc.DRPCConn().Close() + }() + defer client.SDK.HTTPClient.CloseIdleConnections() + // We could be stopping for reasons other than the timeout. If + // so, stop the timer. 
+ closeTimer.Stop() + defer gracefulCancel() + timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) + defer timeout.Stop() + logger.Info(ctx, "logger closing") + + if err := sl.Flush(gracefulCtx); err != nil { + // ctx err + logger.Warn(gracefulCtx, "timeout reached while flushing") + return + } + + if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { + // ctx err + logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") + } + }, + } + lifecycle.closeTimer = closeTimer + return lifecycle, nil +} + func (l *logQueuer) processLog(ctx context.Context, log agentLog) { l.mu.Lock() defer l.mu.Unlock() - queuedLogs := l.logCache.push(log) + + queuedLogs := l.logCache.get(log.agentToken) + if isAgentLogEmpty(log) { + if queuedLogs == nil { + return + } + } else { + queuedLogs = l.logCache.push(log) + } + lgr, ok := l.loggers[log.agentToken] if !ok { - client := agentsdk.New(l.coderURL) - client.SetSessionToken(log.agentToken) - logger := l.logger.With(slog.F("resource_name", log.resourceName)) - client.SDK.SetLogger(logger) - - _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ - ID: sourceUUID, - Icon: "/icon/k8s.png", - DisplayName: "Kubernetes", - }) - if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. - logger.Error(ctx, "post log source", slog.Error(err)) + // skip if we're in a retry cooldown window + if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil { + return } - ls := agentsdk.NewLogSender(logger) - sl := ls.GetScriptLogger(sourceUUID) - - gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) - - // connect to Agent v2.0 API, since we don't need features added later. - // This maximizes compatibility. - arpc, err := client.ConnectRPC20(gracefulCtx) + var err error + lgr, err = l.newLogger(ctx, log) if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() + l.scheduleRetry(ctx, log.agentToken) return } - go func() { - err := ls.SendLoop(gracefulCtx, arpc) - // if the send loop exits on its own without the context - // canceling, timeout the logger and force it to recreate. - if err != nil && ctx.Err() == nil { - l.loggerTimeout(log.agentToken) - } - }() - - closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { - logger.Info(ctx, "logger timeout firing") - l.loggerTimeout(log.agentToken) - }) - lifecycle := agentLoggerLifecycle{ - scriptLogger: sl, - close: func() { - // We could be stopping for reasons other than the timeout. If - // so, stop the timer. - closeTimer.Stop() - defer gracefulCancel() - timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) - defer timeout.Stop() - logger.Info(ctx, "logger closing") - - if err := sl.Flush(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while flushing") - return - } - - if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") - } - - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() - }, - } - lifecycle.closeTimer = closeTimer - l.loggers[log.agentToken] = lifecycle - lgr = lifecycle + l.loggers[log.agentToken] = lgr } lgr.resetCloseTimer(l.loggerTTL) - _ = lgr.scriptLogger.Send(ctx, queuedLogs...) 
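+	// Nothing is queued for this agent, so there is nothing to send on this pass.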
+ if len(queuedLogs) == 0 { + return + } + if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil { + l.scheduleRetry(ctx, log.agentToken) + return + } + l.clearRetryLocked(log.agentToken) l.logCache.delete(log.agentToken) } @@ -516,8 +565,9 @@ func (l *logQueuer) processDelete(log agentLog) { lgr, ok := l.loggers[log.agentToken] if ok { delete(l.loggers, log.agentToken) - } + l.clearRetryLocked(log.agentToken) + l.logCache.delete(log.agentToken) l.mu.Unlock() if ok { @@ -549,6 +599,81 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { } } +// retryState tracks exponential backoff for an agent token. +type retryState struct { + delay time.Duration + timer *quartz.Timer + retryCount int + exhausted bool // prevent retry state recreation after max retries +} + +func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { + if l.retries == nil { + l.retries = make(map[string]*retryState) + } + + rs := l.retries[token] + + if rs != nil && rs.exhausted { + return + } + + if rs == nil { + rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false} + l.retries[token] = rs + } + + rs.retryCount++ + + // If we've reached the max retries, clear the retry state and delete the log cache. + if rs.retryCount >= l.maxRetries { + l.logger.Error(ctx, "max retries exceeded", + slog.F("retryCount", rs.retryCount), + slog.F("maxRetries", l.maxRetries)) + rs.exhausted = true + if rs.timer != nil { + rs.timer.Stop() + rs.timer = nil + } + l.logCache.delete(token) + return + } + + if rs.timer != nil { + return + } + + l.logger.Info(ctx, "scheduling retry", + slog.F("delay", rs.delay.String()), + slog.F("retryCount", rs.retryCount)) + + rs.timer = l.clock.AfterFunc(rs.delay, func() { + l.mu.Lock() + defer l.mu.Unlock() + + if cur := l.retries[token]; cur != nil && !cur.exhausted { + cur.timer = nil + l.q <- agentLog{op: opLog, agentToken: token} + } + }) + + rs.delay *= 2 + if rs.delay > 30*time.Second { + rs.delay = 30 * time.Second + } +} + +// clearRetryLocked clears the retry state for the given token. +// The caller must hold the mutex lock. +func (l *logQueuer) clearRetryLocked(token string) { + if rs := l.retries[token]; rs != nil { + if rs.timer != nil { + rs.timer.Stop() + } + delete(l.retries, token) + } +} + func newColor(value ...color.Attribute) *color.Color { c := color.New(value...) 
c.EnableColor() @@ -572,3 +697,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log { func (l *logCache) delete(token string) { delete(l.logs, token) } + +func (l *logCache) get(token string) []agentsdk.Log { + logs, ok := l.logs[token] + if !ok { + return nil + } + return logs +} + +func isAgentLogEmpty(log agentLog) bool { + return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero() +} diff --git a/logger_test.go b/logger_test.go index 3ab1c0b..49a1836 100644 --- a/logger_test.go +++ b/logger_test.go @@ -486,6 +486,471 @@ func Test_logQueuer(t *testing.T) { // wait for the client to disconnect _ = testutil.RequireRecvCtx(ctx, t, api.disconnect) }) + + t.Run("RetryMechanism", func(t *testing.T) { + t.Parallel() + + // Create a failing API that will reject connections + failingAPI := newFailingAgentAPI(t) + agentURL, err := url.Parse(failingAPI.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + logger := slogtest.Make(t, &slogtest.Options{ + IgnoreErrors: true, + }) + lq := &logQueuer{ + logger: logger, + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + maxRetries: 10, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go lq.work(ctx) + + token := "retry-token" + ch <- agentLog{ + op: opLog, + resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "This is a log.", + Level: codersdk.LogLevelInfo, + }, + } + + // Wait for the initial failure to be processed and retry state to be created + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.timer != nil && rs.delay == 2*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Verify retry state exists and has correct doubled delay (it gets doubled after scheduling) + lq.mu.Lock() + rs := lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 2*time.Second, rs.delay) // Delay gets doubled after scheduling + require.NotNil(t, rs.timer) + lq.mu.Unlock() + + // Advance clock to trigger first retry + clock.Advance(time.Second) + + // Wait for retry to be processed and delay to double again + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.delay == 4*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Check that delay doubled again for next retry + lq.mu.Lock() + rs = lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 4*time.Second, rs.delay) + lq.mu.Unlock() + + // Advance clock to trigger second retry + clock.Advance(2 * time.Second) + + // Wait for retry to be processed and delay to double again + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.delay == 8*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Check that delay doubled again + lq.mu.Lock() + rs = lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 8*time.Second, rs.delay) + lq.mu.Unlock() + }) + + t.Run("RetryMaxDelay", func(t *testing.T) { + t.Parallel() + + clock := quartz.NewMock(t) + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + maxRetries: 10, + } 
+ + ctx := context.Background() + token := "test-token" + + // Set up a retry state with a large delay + lq.retries = make(map[string]*retryState) + lq.retries[token] = &retryState{ + delay: 20 * time.Second, + retryCount: 0, + } + + // Schedule a retry - should cap at 30 seconds + lq.scheduleRetry(ctx, token) + + rs := lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 30*time.Second, rs.delay) + + // Schedule another retry - should stay at 30 seconds + lq.scheduleRetry(ctx, token) + rs = lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 30*time.Second, rs.delay) + }) + + t.Run("ClearRetry", func(t *testing.T) { + t.Parallel() + + clock := quartz.NewMock(t) + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + maxRetries: 2, + } + + ctx := context.Background() + token := "test-token" + + // Schedule a retry + lq.scheduleRetry(ctx, token) + require.NotNil(t, lq.retries[token]) + + // Clear the retry + lq.clearRetryLocked(token) + require.Nil(t, lq.retries[token]) + }) + + t.Run("MaxRetries", func(t *testing.T) { + t.Parallel() + + // Create a failing API that will reject connections + failingAPI := newFailingAgentAPI(t) + agentURL, err := url.Parse(failingAPI.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + logger := slogtest.Make(t, &slogtest.Options{ + IgnoreErrors: true, + }) + lq := &logQueuer{ + logger: logger, + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + retries: make(map[string]*retryState), + maxRetries: 2, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go lq.work(ctx) + + token := "max-retry-token" + ch <- agentLog{ + op: opLog, + resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "This is a log.", + Level: codersdk.LogLevelInfo, + }, + } + + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.retryCount == 1 + }, testutil.WaitShort, testutil.IntervalFast) + + clock.Advance(time.Second) + + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.retryCount == 2 + }, testutil.WaitShort, testutil.IntervalFast) + + clock.Advance(2 * time.Second) + + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs == nil || rs.exhausted + }, testutil.WaitShort, testutil.IntervalFast) + + lq.mu.Lock() + cachedLogs := lq.logCache.get(token) + lq.mu.Unlock() + require.Nil(t, cachedLogs) + }) +} + +func Test_logCache(t *testing.T) { + t.Parallel() + + t.Run("PushAndGet", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token := "test-token" + + // Initially should return nil + logs := lc.get(token) + require.Nil(t, logs) + + // Push first log + log1 := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "First log", + Level: codersdk.LogLevelInfo, + }, + } + returnedLogs := lc.push(log1) + require.Len(t, returnedLogs, 1) + require.Equal(t, "First log", returnedLogs[0].Output) + + // Get should return the cached logs + cachedLogs := lc.get(token) + require.Len(t, cachedLogs, 
1) + require.Equal(t, "First log", cachedLogs[0].Output) + + // Push second log to same token + log2 := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Second log", + Level: codersdk.LogLevelWarn, + }, + } + returnedLogs = lc.push(log2) + require.Len(t, returnedLogs, 2) + require.Equal(t, "First log", returnedLogs[0].Output) + require.Equal(t, "Second log", returnedLogs[1].Output) + + // Get should return both logs + cachedLogs = lc.get(token) + require.Len(t, cachedLogs, 2) + require.Equal(t, "First log", cachedLogs[0].Output) + require.Equal(t, "Second log", cachedLogs[1].Output) + }) + + t.Run("Delete", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token := "test-token" + + // Push a log + log := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Test log", + Level: codersdk.LogLevelInfo, + }, + } + lc.push(log) + + // Verify it exists + cachedLogs := lc.get(token) + require.Len(t, cachedLogs, 1) + + // Delete it + lc.delete(token) + + // Should return nil now + cachedLogs = lc.get(token) + require.Nil(t, cachedLogs) + }) + + t.Run("MultipleTokens", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token1 := "token1" + token2 := "token2" + + // Push logs for different tokens + log1 := agentLog{ + agentToken: token1, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Log for token1", + Level: codersdk.LogLevelInfo, + }, + } + log2 := agentLog{ + agentToken: token2, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Log for token2", + Level: codersdk.LogLevelError, + }, + } + + lc.push(log1) + lc.push(log2) + + // Each token should have its own logs + logs1 := lc.get(token1) + require.Len(t, logs1, 1) + require.Equal(t, "Log for token1", logs1[0].Output) + + logs2 := lc.get(token2) + require.Len(t, logs2, 1) + require.Equal(t, "Log for token2", logs2[0].Output) + + // Delete one token shouldn't affect the other + lc.delete(token1) + require.Nil(t, lc.get(token1)) + + logs2 = lc.get(token2) + require.Len(t, logs2, 1) + require.Equal(t, "Log for token2", logs2[0].Output) + }) + + t.Run("EmptyLogHandling", func(t *testing.T) { + t.Parallel() + + api := newFakeAgentAPI(t) + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go lq.work(ctx) + + token := "test-token" + + // Send an empty log first - should be ignored since no cached logs exist + emptyLog := agentLog{ + op: opLog, + resourceName: "", + agentToken: token, + log: agentsdk.Log{ + Output: "", + CreatedAt: time.Time{}, + }, + } + ch <- emptyLog + + // Wait to ensure processing completes - no logger should be created for empty log with no cache + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + _, exists := lq.loggers[token] + return !exists + }, testutil.WaitShort, testutil.IntervalFast) + + // No logger should be created for empty log with no cache + lq.mu.Lock() + _, exists := lq.loggers[token] + require.False(t, exists) + lq.mu.Unlock() + + // Now send a real log to establish the logger + 
realLog := agentLog{ + op: opLog, + resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Real log", + Level: codersdk.LogLevelInfo, + }, + } + ch <- realLog + + // Should create logger and send log + _ = testutil.RequireRecvCtx(ctx, t, api.logSource) + logs := testutil.RequireRecvCtx(ctx, t, api.logs) + require.Len(t, logs, 1) + require.Contains(t, logs[0].Output, "Real log") + + // Now send empty log - should trigger flush of any cached logs + ch <- emptyLog + + // Wait for processing - logger should still exist after empty log + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + _, exists := lq.loggers[token] + return exists + }, testutil.WaitShort, testutil.IntervalFast) + + // Logger should still exist + lq.mu.Lock() + _, exists = lq.loggers[token] + require.True(t, exists) + lq.mu.Unlock() + }) } func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { @@ -524,7 +989,9 @@ func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { } ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary) - defer wsNetConn.Close() + defer func() { + _ = wsNetConn.Close() + }() config := yamux.DefaultConfig() config.LogOutput = io.Discard @@ -547,6 +1014,21 @@ func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { return fakeAPI } +func newFailingAgentAPI(_ *testing.T) *fakeAgentAPI { + fakeAPI := &fakeAgentAPI{ + disconnect: make(chan struct{}), + logs: make(chan []*proto.Log), + logSource: make(chan agentsdk.PostLogSourceRequest), + } + + // Create a server that always returns 401 Unauthorized errors + fakeAPI.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + })) + + return fakeAPI +} + type fakeAgentAPI struct { disconnect chan struct{} logs chan []*proto.Log diff --git a/main.go b/main.go index 5e5d2bd..ce425f8 100644 --- a/main.go +++ b/main.go @@ -34,7 +34,7 @@ func root() *cobra.Command { cmd := &cobra.Command{ Use: "coder-logstream-kube", Short: "Stream Kubernetes Pod events to the Coder startup logs.", - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, _ []string) error { if coderURL == "" { return fmt.Errorf("--coder-url is required") } @@ -79,11 +79,14 @@ func root() *cobra.Command { fieldSelector: fieldSelector, labelSelector: labelSelector, logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), + maxRetries: 15, // 15 retries is the default max retries for a log send failure. 
}) if err != nil { return fmt.Errorf("create pod event reporter: %w", err) } - defer reporter.Close() + defer func() { + _ = reporter.Close() + }() select { case err := <-reporter.errChan: return fmt.Errorf("pod event reporter: %w", err) diff --git a/scripts/Dockerfile b/scripts/Dockerfile index fe869e7..8f380e9 100644 --- a/scripts/Dockerfile +++ b/scripts/Dockerfile @@ -1,4 +1,5 @@ FROM --platform=$BUILDPLATFORM scratch AS base ARG TARGETARCH -COPY ./coder-logstream-kube-${TARGETARCH} /coder-logstream-kube +COPY --chmod=0555 ./coder-logstream-kube-${TARGETARCH} /coder-logstream-kube +USER 65532:65532 ENTRYPOINT ["/coder-logstream-kube"] \ No newline at end of file diff --git a/scripts/build.sh b/scripts/build.sh index a17eb0b..e6805b7 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -13,8 +13,8 @@ archs=(amd64 arm64 arm) # build for all architectures for arch in "${archs[@]}"; do - echo "Building for $arch" - GOARCH=$arch GOOS=linux CGO_ENABLED=0 go build -ldflags "-s -w" -o ./coder-logstream-kube-"$arch" ../ + echo "Building for $arch" + GOARCH=$arch GOOS=linux CGO_ENABLED=0 go build -ldflags "-s -w" -o ./coder-logstream-kube-"$arch" ../ done # We have to use docker buildx to tag multiple images with @@ -24,10 +24,10 @@ BUILDER_EXISTS=$(docker buildx ls | grep $BUILDER_NAME || true) # If builder doesn't exist, create it if [ -z "$BUILDER_EXISTS" ]; then - echo "Creating dockerx builder $BUILDER_NAME..." - docker buildx create --use --platform=linux/arm64,linux/amd64,linux/arm/v7 --name $BUILDER_NAME + echo "Creating dockerx builder $BUILDER_NAME..." + docker buildx create --use --platform=linux/arm64,linux/amd64,linux/arm/v7 --name $BUILDER_NAME else - echo "Builder $BUILDER_NAME already exists. Using it." + echo "Builder $BUILDER_NAME already exists. Using it." fi # Ensure the builder is bootstrapped and ready to use @@ -35,15 +35,15 @@ docker buildx inspect --bootstrap &>/dev/null # Build and push the image if [ "$CI" = "false" ]; then - docker buildx build --platform linux/"$current" -t coder-logstream-kube --load . + docker buildx build --platform linux/"$current" -t coder-logstream-kube --load . else - VERSION=$(../scripts/version.sh) - BASE=ghcr.io/coder/coder-logstream-kube - IMAGE=$BASE:$VERSION - # if version contains "rc" skip pushing to latest - if [[ $VERSION == *"rc"* ]]; then - docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t "$IMAGE" --push . - else - docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t "$IMAGE" -t $BASE:latest --push . - fi + VERSION=$(../scripts/version.sh) + BASE=ghcr.io/coder/coder-logstream-kube + IMAGE=$BASE:$VERSION + # if version contains "rc" skip pushing to latest + if [[ $VERSION == *"rc"* ]]; then + docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t "$IMAGE" --push . + else + docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t "$IMAGE" -t $BASE:latest --push . + fi fi diff --git a/scripts/helm.sh b/scripts/helm.sh index fa3bfed..d06e870 100755 --- a/scripts/helm.sh +++ b/scripts/helm.sh @@ -15,7 +15,7 @@ # to the Coder OSS repo. This requires `gsutil` to be installed and configured. 
set -euo pipefail -cd $(dirname $(dirname "${BASH_SOURCE[0]}")) +cd "$(dirname "$(dirname "${BASH_SOURCE[0]}")")" log() { echo "$*" 1>&2 diff --git a/scripts/kind-setup.sh b/scripts/kind-setup.sh new file mode 100755 index 0000000..b2f602a --- /dev/null +++ b/scripts/kind-setup.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash + +# This script sets up a KinD cluster for running integration tests locally. +# Usage: ./scripts/kind-setup.sh [create|delete] + +set -euo pipefail + +CLUSTER_NAME="${KIND_CLUSTER_NAME:-logstream-integration-test}" + +usage() { + echo "Usage: $0 [create|delete|status]" + echo "" + echo "Commands:" + echo " create - Create a KinD cluster for integration tests" + echo " delete - Delete the KinD cluster" + echo " status - Check if the cluster exists and is running" + echo "" + echo "Environment variables:" + echo " KIND_CLUSTER_NAME - Name of the cluster (default: logstream-integration-test)" + exit 1 +} + +check_kind() { + if ! command -v kind &>/dev/null; then + echo "Error: 'kind' is not installed." + echo "Install it from: https://kind.sigs.k8s.io/docs/user/quick-start/#installation" + exit 1 + fi +} + +check_kubectl() { + if ! command -v kubectl &>/dev/null; then + echo "Error: 'kubectl' is not installed." + echo "Install it from: https://kubernetes.io/docs/tasks/tools/" + exit 1 + fi +} + +cluster_exists() { + kind get clusters 2>/dev/null | grep -q "^${CLUSTER_NAME}$" +} + +create_cluster() { + check_kind + check_kubectl + + if cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' already exists." + echo "Use '$0 delete' to remove it first, or '$0 status' to check its status." + exit 0 + fi + + echo "Creating KinD cluster '${CLUSTER_NAME}'..." + kind create cluster --name "${CLUSTER_NAME}" --wait 60s + + echo "" + echo "Cluster created successfully!" + echo "" + echo "To run integration tests:" + echo " go test -tags=integration -v ./..." + echo "" + echo "To delete the cluster when done:" + echo " $0 delete" +} + +delete_cluster() { + check_kind + + if ! cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' does not exist." + exit 0 + fi + + echo "Deleting KinD cluster '${CLUSTER_NAME}'..." + kind delete cluster --name "${CLUSTER_NAME}" + echo "Cluster deleted successfully!" +} + +status_cluster() { + check_kind + + if cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' exists." + echo "" + echo "Cluster info:" + kubectl cluster-info --context "kind-${CLUSTER_NAME}" 2>/dev/null || echo " (unable to get cluster info)" + echo "" + echo "Nodes:" + kubectl get nodes --context "kind-${CLUSTER_NAME}" 2>/dev/null || echo " (unable to get nodes)" + else + echo "Cluster '${CLUSTER_NAME}' does not exist." + echo "Use '$0 create' to create it." + fi +} + +case "${1:-}" in +create) + create_cluster + ;; +delete) + delete_cluster + ;; +status) + status_cluster + ;; +*) + usage + ;; +esac diff --git a/scripts/version.sh b/scripts/version.sh index 72a2f5b..82385be 100755 --- a/scripts/version.sh +++ b/scripts/version.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash set -euo pipefail -cd $(dirname "${BASH_SOURCE[0]}") +cd "$(dirname "${BASH_SOURCE[0]}")" last_tag="$(git describe --tags --abbrev=0)" version="$last_tag"
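For reference, the backoff introduced by `scheduleRetry` in logger.go starts each token at a one-second delay, doubles the delay after every scheduled attempt, and caps it at 30 seconds, with `maxRetries` (10 by default in `newPodEventLogger`, 15 as configured in main.go) bounding the number of attempts before the cached logs for that token are dropped. Below is a minimal standalone sketch of that schedule; the names (`nextDelay`) are illustrative and not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay mirrors the schedule used by scheduleRetry in logger.go: the delay
// doubles after each scheduled attempt and is capped at 30 seconds. This is an
// illustrative re-implementation, not code from the package.
func nextDelay(d time.Duration) time.Duration {
	d *= 2
	if d > 30*time.Second {
		d = 30 * time.Second
	}
	return d
}

func main() {
	d := time.Second // scheduleRetry starts each token at a one-second delay
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d waits %s\n", attempt, d)
		d = nextDelay(d)
	}
	// Prints: 1s, 2s, 4s, 8s, 16s, 30s, 30s, 30s
}
```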