From 13d885f3916a1436245b6c6b05061790c7282df0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 28 Aug 2025 14:55:48 +0500 Subject: [PATCH 1/5] chore(deps): bump actions/checkout from 4 to 5 (#120) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/ci.yaml | 2 +- .github/workflows/release.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a2d85f5..eb8d845 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -28,7 +28,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Setup Go uses: actions/setup-go@v5 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 52e11b4..c943ad6 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Setup Go uses: actions/setup-go@v5 From a5bd37625c989fc4d1458022fd981062e061afa3 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 30 Sep 2025 09:28:46 +0200 Subject: [PATCH 2/5] feat: implement retry mechanism for log processing (#136) * feat: implement retry mechanism for log processing * add tests for retry mechanism and logCache functionality * simplify retry state initialization in logQueuer * implement maxRetries for log processing * Apply review suggestions and fix maxRetries * Remove maxRetries configuration from CLI and Helm values, setting a default of 15 retries for log send failures. --- logger.go | 268 ++++++++++++++++++++------- logger_test.go | 480 +++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 1 + 3 files changed, 680 insertions(+), 69 deletions(-) diff --git a/logger.go b/logger.go index 2aa4ed3..8d0775a 100644 --- a/logger.go +++ b/logger.go @@ -34,6 +34,8 @@ type podEventLoggerOptions struct { logger slog.Logger logDebounce time.Duration + // maxRetries is the maximum number of retries for a log send failure. + maxRetries int // The following fields are optional! namespaces []string @@ -52,6 +54,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve opts.clock = quartz.NewReal() } + if opts.maxRetries == 0 { + opts.maxRetries = 10 + } + logCh := make(chan agentLog, 512) ctx, cancelFunc := context.WithCancel(ctx) reporter := &podEventLogger{ @@ -75,6 +81,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, + maxRetries: opts.maxRetries, }, } @@ -407,6 +414,11 @@ type logQueuer struct { loggerTTL time.Duration loggers map[string]agentLoggerLifecycle logCache logCache + + // retries maps agent tokens to their retry state for exponential backoff + retries map[string]*retryState + // maxRetries is the maximum number of retries for a log send failure. + maxRetries int } func (l *logQueuer) work(ctx context.Context) { @@ -427,87 +439,117 @@ func (l *logQueuer) work(ctx context.Context) { } } +func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) { + client := agentsdk.New(l.coderURL) + client.SetSessionToken(log.agentToken) + logger := l.logger.With(slog.F("resource_name", log.resourceName)) + client.SDK.SetLogger(logger) + + _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ + ID: sourceUUID, + Icon: "/icon/k8s.png", + DisplayName: "Kubernetes", + }) + if err != nil { + // Posting the log source failed, which affects how logs appear. + // We'll retry to ensure the log source is properly registered. + logger.Error(ctx, "post log source", slog.Error(err)) + return agentLoggerLifecycle{}, err + } + + ls := agentsdk.NewLogSender(logger) + sl := ls.GetScriptLogger(sourceUUID) + + gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) + + // connect to Agent v2.0 API, since we don't need features added later. + // This maximizes compatibility. + arpc, err := client.ConnectRPC20(gracefulCtx) + if err != nil { + logger.Error(ctx, "drpc connect", slog.Error(err)) + gracefulCancel() + return agentLoggerLifecycle{}, err + } + go func() { + err := ls.SendLoop(gracefulCtx, arpc) + // if the send loop exits on its own without the context + // canceling, timeout the logger and force it to recreate. + if err != nil && ctx.Err() == nil { + l.loggerTimeout(log.agentToken) + } + }() + + closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { + logger.Info(ctx, "logger timeout firing") + l.loggerTimeout(log.agentToken) + }) + lifecycle := agentLoggerLifecycle{ + scriptLogger: sl, + close: func() { + defer arpc.DRPCConn().Close() + defer client.SDK.HTTPClient.CloseIdleConnections() + // We could be stopping for reasons other than the timeout. If + // so, stop the timer. + closeTimer.Stop() + defer gracefulCancel() + timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) + defer timeout.Stop() + logger.Info(ctx, "logger closing") + + if err := sl.Flush(gracefulCtx); err != nil { + // ctx err + logger.Warn(gracefulCtx, "timeout reached while flushing") + return + } + + if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { + // ctx err + logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") + } + }, + } + lifecycle.closeTimer = closeTimer + return lifecycle, nil +} + func (l *logQueuer) processLog(ctx context.Context, log agentLog) { l.mu.Lock() defer l.mu.Unlock() - queuedLogs := l.logCache.push(log) + + queuedLogs := l.logCache.get(log.agentToken) + if isAgentLogEmpty(log) { + if queuedLogs == nil { + return + } + } else { + queuedLogs = l.logCache.push(log) + } + lgr, ok := l.loggers[log.agentToken] if !ok { - client := agentsdk.New(l.coderURL) - client.SetSessionToken(log.agentToken) - logger := l.logger.With(slog.F("resource_name", log.resourceName)) - client.SDK.SetLogger(logger) - - _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ - ID: sourceUUID, - Icon: "/icon/k8s.png", - DisplayName: "Kubernetes", - }) - if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. - logger.Error(ctx, "post log source", slog.Error(err)) + // skip if we're in a retry cooldown window + if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil { + return } - ls := agentsdk.NewLogSender(logger) - sl := ls.GetScriptLogger(sourceUUID) - - gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) - - // connect to Agent v2.0 API, since we don't need features added later. - // This maximizes compatibility. - arpc, err := client.ConnectRPC20(gracefulCtx) + var err error + lgr, err = l.newLogger(ctx, log) if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() + l.scheduleRetry(ctx, log.agentToken) return } - go func() { - err := ls.SendLoop(gracefulCtx, arpc) - // if the send loop exits on its own without the context - // canceling, timeout the logger and force it to recreate. - if err != nil && ctx.Err() == nil { - l.loggerTimeout(log.agentToken) - } - }() - - closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { - logger.Info(ctx, "logger timeout firing") - l.loggerTimeout(log.agentToken) - }) - lifecycle := agentLoggerLifecycle{ - scriptLogger: sl, - close: func() { - // We could be stopping for reasons other than the timeout. If - // so, stop the timer. - closeTimer.Stop() - defer gracefulCancel() - timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) - defer timeout.Stop() - logger.Info(ctx, "logger closing") - - if err := sl.Flush(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while flushing") - return - } - - if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") - } - - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() - }, - } - lifecycle.closeTimer = closeTimer - l.loggers[log.agentToken] = lifecycle - lgr = lifecycle + l.loggers[log.agentToken] = lgr } lgr.resetCloseTimer(l.loggerTTL) - _ = lgr.scriptLogger.Send(ctx, queuedLogs...) + if len(queuedLogs) == 0 { + return + } + if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil { + l.scheduleRetry(ctx, log.agentToken) + return + } + l.clearRetryLocked(log.agentToken) l.logCache.delete(log.agentToken) } @@ -516,8 +558,9 @@ func (l *logQueuer) processDelete(log agentLog) { lgr, ok := l.loggers[log.agentToken] if ok { delete(l.loggers, log.agentToken) - } + l.clearRetryLocked(log.agentToken) + l.logCache.delete(log.agentToken) l.mu.Unlock() if ok { @@ -549,6 +592,81 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { } } +// retryState tracks exponential backoff for an agent token. +type retryState struct { + delay time.Duration + timer *quartz.Timer + retryCount int + exhausted bool // prevent retry state recreation after max retries +} + +func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { + if l.retries == nil { + l.retries = make(map[string]*retryState) + } + + rs := l.retries[token] + + if rs != nil && rs.exhausted { + return + } + + if rs == nil { + rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false} + l.retries[token] = rs + } + + rs.retryCount++ + + // If we've reached the max retries, clear the retry state and delete the log cache. + if rs.retryCount >= l.maxRetries { + l.logger.Error(ctx, "max retries exceeded", + slog.F("retryCount", rs.retryCount), + slog.F("maxRetries", l.maxRetries)) + rs.exhausted = true + if rs.timer != nil { + rs.timer.Stop() + rs.timer = nil + } + l.logCache.delete(token) + return + } + + if rs.timer != nil { + return + } + + l.logger.Info(ctx, "scheduling retry", + slog.F("delay", rs.delay.String()), + slog.F("retryCount", rs.retryCount)) + + rs.timer = l.clock.AfterFunc(rs.delay, func() { + l.mu.Lock() + defer l.mu.Unlock() + + if cur := l.retries[token]; cur != nil && !cur.exhausted { + cur.timer = nil + l.q <- agentLog{op: opLog, agentToken: token} + } + }) + + rs.delay *= 2 + if rs.delay > 30*time.Second { + rs.delay = 30 * time.Second + } +} + +// clearRetryLocked clears the retry state for the given token. +// The caller must hold the mutex lock. +func (l *logQueuer) clearRetryLocked(token string) { + if rs := l.retries[token]; rs != nil { + if rs.timer != nil { + rs.timer.Stop() + } + delete(l.retries, token) + } +} + func newColor(value ...color.Attribute) *color.Color { c := color.New(value...) c.EnableColor() @@ -572,3 +690,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log { func (l *logCache) delete(token string) { delete(l.logs, token) } + +func (l *logCache) get(token string) []agentsdk.Log { + logs, ok := l.logs[token] + if !ok { + return nil + } + return logs +} + +func isAgentLogEmpty(log agentLog) bool { + return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero() +} diff --git a/logger_test.go b/logger_test.go index 3ab1c0b..259be40 100644 --- a/logger_test.go +++ b/logger_test.go @@ -486,6 +486,471 @@ func Test_logQueuer(t *testing.T) { // wait for the client to disconnect _ = testutil.RequireRecvCtx(ctx, t, api.disconnect) }) + + t.Run("RetryMechanism", func(t *testing.T) { + t.Parallel() + + // Create a failing API that will reject connections + failingAPI := newFailingAgentAPI(t) + agentURL, err := url.Parse(failingAPI.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + logger := slogtest.Make(t, &slogtest.Options{ + IgnoreErrors: true, + }) + lq := &logQueuer{ + logger: logger, + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + maxRetries: 10, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go lq.work(ctx) + + token := "retry-token" + ch <- agentLog{ + op: opLog, + resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "This is a log.", + Level: codersdk.LogLevelInfo, + }, + } + + // Wait for the initial failure to be processed and retry state to be created + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.timer != nil && rs.delay == 2*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Verify retry state exists and has correct doubled delay (it gets doubled after scheduling) + lq.mu.Lock() + rs := lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 2*time.Second, rs.delay) // Delay gets doubled after scheduling + require.NotNil(t, rs.timer) + lq.mu.Unlock() + + // Advance clock to trigger first retry + clock.Advance(time.Second) + + // Wait for retry to be processed and delay to double again + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.delay == 4*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Check that delay doubled again for next retry + lq.mu.Lock() + rs = lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 4*time.Second, rs.delay) + lq.mu.Unlock() + + // Advance clock to trigger second retry + clock.Advance(2 * time.Second) + + // Wait for retry to be processed and delay to double again + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.delay == 8*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Check that delay doubled again + lq.mu.Lock() + rs = lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 8*time.Second, rs.delay) + lq.mu.Unlock() + }) + + t.Run("RetryMaxDelay", func(t *testing.T) { + t.Parallel() + + clock := quartz.NewMock(t) + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + maxRetries: 10, + } + + ctx := context.Background() + token := "test-token" + + // Set up a retry state with a large delay + lq.retries = make(map[string]*retryState) + lq.retries[token] = &retryState{ + delay: 20 * time.Second, + retryCount: 0, + } + + // Schedule a retry - should cap at 30 seconds + lq.scheduleRetry(ctx, token) + + rs := lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 30*time.Second, rs.delay) + + // Schedule another retry - should stay at 30 seconds + lq.scheduleRetry(ctx, token) + rs = lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 30*time.Second, rs.delay) + }) + + t.Run("ClearRetry", func(t *testing.T) { + t.Parallel() + + clock := quartz.NewMock(t) + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + maxRetries: 2, + } + + ctx := context.Background() + token := "test-token" + + // Schedule a retry + lq.scheduleRetry(ctx, token) + require.NotNil(t, lq.retries[token]) + + // Clear the retry + lq.clearRetryLocked(token) + require.Nil(t, lq.retries[token]) + }) + + t.Run("MaxRetries", func(t *testing.T) { + t.Parallel() + + // Create a failing API that will reject connections + failingAPI := newFailingAgentAPI(t) + agentURL, err := url.Parse(failingAPI.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + logger := slogtest.Make(t, &slogtest.Options{ + IgnoreErrors: true, + }) + lq := &logQueuer{ + logger: logger, + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + retries: make(map[string]*retryState), + maxRetries: 2, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go lq.work(ctx) + + token := "max-retry-token" + ch <- agentLog{ + op: opLog, + resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "This is a log.", + Level: codersdk.LogLevelInfo, + }, + } + + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.retryCount == 1 + }, testutil.WaitShort, testutil.IntervalFast) + + clock.Advance(time.Second) + + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.retryCount == 2 + }, testutil.WaitShort, testutil.IntervalFast) + + clock.Advance(2 * time.Second) + + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs == nil || rs.exhausted + }, testutil.WaitShort, testutil.IntervalFast) + + lq.mu.Lock() + cachedLogs := lq.logCache.get(token) + lq.mu.Unlock() + require.Nil(t, cachedLogs) + }) +} + +func Test_logCache(t *testing.T) { + t.Parallel() + + t.Run("PushAndGet", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token := "test-token" + + // Initially should return nil + logs := lc.get(token) + require.Nil(t, logs) + + // Push first log + log1 := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "First log", + Level: codersdk.LogLevelInfo, + }, + } + returnedLogs := lc.push(log1) + require.Len(t, returnedLogs, 1) + require.Equal(t, "First log", returnedLogs[0].Output) + + // Get should return the cached logs + cachedLogs := lc.get(token) + require.Len(t, cachedLogs, 1) + require.Equal(t, "First log", cachedLogs[0].Output) + + // Push second log to same token + log2 := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Second log", + Level: codersdk.LogLevelWarn, + }, + } + returnedLogs = lc.push(log2) + require.Len(t, returnedLogs, 2) + require.Equal(t, "First log", returnedLogs[0].Output) + require.Equal(t, "Second log", returnedLogs[1].Output) + + // Get should return both logs + cachedLogs = lc.get(token) + require.Len(t, cachedLogs, 2) + require.Equal(t, "First log", cachedLogs[0].Output) + require.Equal(t, "Second log", cachedLogs[1].Output) + }) + + t.Run("Delete", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token := "test-token" + + // Push a log + log := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Test log", + Level: codersdk.LogLevelInfo, + }, + } + lc.push(log) + + // Verify it exists + cachedLogs := lc.get(token) + require.Len(t, cachedLogs, 1) + + // Delete it + lc.delete(token) + + // Should return nil now + cachedLogs = lc.get(token) + require.Nil(t, cachedLogs) + }) + + t.Run("MultipleTokens", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token1 := "token1" + token2 := "token2" + + // Push logs for different tokens + log1 := agentLog{ + agentToken: token1, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Log for token1", + Level: codersdk.LogLevelInfo, + }, + } + log2 := agentLog{ + agentToken: token2, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Log for token2", + Level: codersdk.LogLevelError, + }, + } + + lc.push(log1) + lc.push(log2) + + // Each token should have its own logs + logs1 := lc.get(token1) + require.Len(t, logs1, 1) + require.Equal(t, "Log for token1", logs1[0].Output) + + logs2 := lc.get(token2) + require.Len(t, logs2, 1) + require.Equal(t, "Log for token2", logs2[0].Output) + + // Delete one token shouldn't affect the other + lc.delete(token1) + require.Nil(t, lc.get(token1)) + + logs2 = lc.get(token2) + require.Len(t, logs2, 1) + require.Equal(t, "Log for token2", logs2[0].Output) + }) + + t.Run("EmptyLogHandling", func(t *testing.T) { + t.Parallel() + + api := newFakeAgentAPI(t) + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go lq.work(ctx) + + token := "test-token" + + // Send an empty log first - should be ignored since no cached logs exist + emptyLog := agentLog{ + op: opLog, + resourceName: "", + agentToken: token, + log: agentsdk.Log{ + Output: "", + CreatedAt: time.Time{}, + }, + } + ch <- emptyLog + + // Wait to ensure processing completes - no logger should be created for empty log with no cache + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + _, exists := lq.loggers[token] + return !exists + }, testutil.WaitShort, testutil.IntervalFast) + + // No logger should be created for empty log with no cache + lq.mu.Lock() + _, exists := lq.loggers[token] + require.False(t, exists) + lq.mu.Unlock() + + // Now send a real log to establish the logger + realLog := agentLog{ + op: opLog, + resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Real log", + Level: codersdk.LogLevelInfo, + }, + } + ch <- realLog + + // Should create logger and send log + _ = testutil.RequireRecvCtx(ctx, t, api.logSource) + logs := testutil.RequireRecvCtx(ctx, t, api.logs) + require.Len(t, logs, 1) + require.Contains(t, logs[0].Output, "Real log") + + // Now send empty log - should trigger flush of any cached logs + ch <- emptyLog + + // Wait for processing - logger should still exist after empty log + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + _, exists := lq.loggers[token] + return exists + }, testutil.WaitShort, testutil.IntervalFast) + + // Logger should still exist + lq.mu.Lock() + _, exists = lq.loggers[token] + require.True(t, exists) + lq.mu.Unlock() + }) } func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { @@ -547,6 +1012,21 @@ func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { return fakeAPI } +func newFailingAgentAPI(t *testing.T) *fakeAgentAPI { + fakeAPI := &fakeAgentAPI{ + disconnect: make(chan struct{}), + logs: make(chan []*proto.Log), + logSource: make(chan agentsdk.PostLogSourceRequest), + } + + // Create a server that always returns 401 Unauthorized errors + fakeAPI.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + })) + + return fakeAPI +} + type fakeAgentAPI struct { disconnect chan struct{} logs chan []*proto.Log diff --git a/main.go b/main.go index 5e5d2bd..4c65be5 100644 --- a/main.go +++ b/main.go @@ -79,6 +79,7 @@ func root() *cobra.Command { fieldSelector: fieldSelector, labelSelector: labelSelector, logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), + maxRetries: 15, // 15 retries is the default max retries for a log send failure. }) if err != nil { return fmt.Errorf("create pod event reporter: %w", err) From 736495d03c528fece6986d490327db0e958121a6 Mon Sep 17 00:00:00 2001 From: Eric Paulsen Date: Fri, 3 Oct 2025 13:58:49 +0100 Subject: [PATCH 3/5] run image as non-root user & add securitycontext values (#138) * run image as non-root user & add securitycontext values * rm redundant comments --- helm/templates/service.yaml | 4 ++++ helm/values.yaml | 12 ++++++++++-- scripts/Dockerfile | 3 ++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index c89a98a..9aee16c 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -98,6 +98,10 @@ spec: nodeSelector: {{- toYaml . | nindent 8 }} {{- end }} + {{- with .Values.podSecurityContext }} + podSecurityContext: + {{- toYaml . | nindent 8 }} + {{- end }} containers: - name: coder-logstream-kube image: "{{ .Values.image.repo }}:{{ .Values.image.tag | default .Chart.AppVersion }}" diff --git a/helm/values.yaml b/helm/values.yaml index 5a6d1b6..daa847f 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -92,8 +92,11 @@ labels: {} # securityContext -- Container-level security context # See: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ -securityContext: {} - # allowPrivilegeEscalation: false +securityContext: + runAsNonRoot: true + runAsUser: 65532 + runAsGroup: 65532 + allowPrivilegeEscalation: false # capabilities: # drop: # - ALL @@ -101,3 +104,8 @@ securityContext: {} # runAsNonRoot: true # seccompProfile: # type: RuntimeDefault + +podSecurityContext: {} +# Optional, only if your cluster requires group ownership for mounted volumes: +# podSecurityContext: +# fsGroup: 65532 \ No newline at end of file diff --git a/scripts/Dockerfile b/scripts/Dockerfile index fe869e7..8f380e9 100644 --- a/scripts/Dockerfile +++ b/scripts/Dockerfile @@ -1,4 +1,5 @@ FROM --platform=$BUILDPLATFORM scratch AS base ARG TARGETARCH -COPY ./coder-logstream-kube-${TARGETARCH} /coder-logstream-kube +COPY --chmod=0555 ./coder-logstream-kube-${TARGETARCH} /coder-logstream-kube +USER 65532:65532 ENTRYPOINT ["/coder-logstream-kube"] \ No newline at end of file From db7bcb7145df9b8ca79fb82cdb1932455d9ad3ad Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Wed, 10 Dec 2025 18:33:01 +0100 Subject: [PATCH 4/5] ci: add golangci-lint job (#142) --- .github/workflows/ci.yaml | 18 +++++++++++++++++- .golangci.yaml | 29 +++++++++++++++++++++++++++++ logger.go | 11 +++++++++-- logger_test.go | 8 +++++--- main.go | 6 ++++-- 5 files changed, 64 insertions(+), 8 deletions(-) create mode 100644 .golangci.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index eb8d845..6e4a1c3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,7 +33,23 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: "~1.22" + go-version: "~1.24" - name: Test run: go test ./... -race + + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v5 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: "~1.24" + + - name: golangci-lint + uses: golangci/golangci-lint-action@v7 + with: + version: v2.1.6 diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..bec718e --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,29 @@ +# See https://golangci-lint.run/usage/configuration/ +version: "2" + +linters: + enable: + - govet + - errcheck + - staticcheck + - unused + - ineffassign + - misspell + - revive + settings: + govet: + enable-all: true + disable: + - fieldalignment + - shadow + misspell: + locale: US + revive: + rules: + - name: exported + arguments: + - "checkPrivateReceivers" + +formatters: + enable: + - goimports diff --git a/logger.go b/logger.go index 8d0775a..0e5c29f 100644 --- a/logger.go +++ b/logger.go @@ -1,3 +1,5 @@ +// Package main implements coder-logstream-kube, a Kubernetes controller +// that streams pod logs to the Coder agent API. package main import ( @@ -87,7 +89,9 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve // If no namespaces are provided, we listen for events in all namespaces. if len(opts.namespaces) == 0 { - reporter.initNamespace("") + if err := reporter.initNamespace(""); err != nil { + return nil, fmt.Errorf("init namespace: %w", err) + } } else { for _, namespace := range opts.namespaces { if err := reporter.initNamespace(namespace); err != nil { @@ -329,6 +333,7 @@ func (p *podEventLogger) sendDelete(token string) { } } +// Close stops the pod event logger and releases all resources. func (p *podEventLogger) Close() error { p.cancelFunc() close(p.stopChan) @@ -486,7 +491,9 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif lifecycle := agentLoggerLifecycle{ scriptLogger: sl, close: func() { - defer arpc.DRPCConn().Close() + defer func() { + _ = arpc.DRPCConn().Close() + }() defer client.SDK.HTTPClient.CloseIdleConnections() // We could be stopping for reasons other than the timeout. If // so, stop the timer. diff --git a/logger_test.go b/logger_test.go index 259be40..49a1836 100644 --- a/logger_test.go +++ b/logger_test.go @@ -989,7 +989,9 @@ func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { } ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary) - defer wsNetConn.Close() + defer func() { + _ = wsNetConn.Close() + }() config := yamux.DefaultConfig() config.LogOutput = io.Discard @@ -1012,7 +1014,7 @@ func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { return fakeAPI } -func newFailingAgentAPI(t *testing.T) *fakeAgentAPI { +func newFailingAgentAPI(_ *testing.T) *fakeAgentAPI { fakeAPI := &fakeAgentAPI{ disconnect: make(chan struct{}), logs: make(chan []*proto.Log), @@ -1020,7 +1022,7 @@ func newFailingAgentAPI(t *testing.T) *fakeAgentAPI { } // Create a server that always returns 401 Unauthorized errors - fakeAPI.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fakeAPI.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { http.Error(w, "Unauthorized", http.StatusUnauthorized) })) diff --git a/main.go b/main.go index 4c65be5..ce425f8 100644 --- a/main.go +++ b/main.go @@ -34,7 +34,7 @@ func root() *cobra.Command { cmd := &cobra.Command{ Use: "coder-logstream-kube", Short: "Stream Kubernetes Pod events to the Coder startup logs.", - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, _ []string) error { if coderURL == "" { return fmt.Errorf("--coder-url is required") } @@ -84,7 +84,9 @@ func root() *cobra.Command { if err != nil { return fmt.Errorf("create pod event reporter: %w", err) } - defer reporter.Close() + defer func() { + _ = reporter.Close() + }() select { case err := <-reporter.errChan: return fmt.Errorf("pod event reporter: %w", err) From 8ea3f21c7e0566acaf2ca606352fc471b6854526 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Mon, 15 Dec 2025 16:32:36 +0100 Subject: [PATCH 5/5] feat: add KinD integration tests (#141) * feat: add KinD integration tests Add integration tests that run against a real Kubernetes cluster using KinD (Kubernetes in Docker). This addresses #58. Changes: - Add integration_test.go with tests for: - Pod events (create/delete) - ReplicaSet events - Multi-namespace support - Label selector filtering - Add .github/workflows/integration.yaml CI workflow that: - Runs lint and unit tests first - Then runs integration tests with KinD - 10 minute timeout - Add scripts/kind-setup.sh for local development - Update README.md with integration test documentation The integration tests use the existing fakeAgentAPI to mock the Coder server, focusing on validating real Kubernetes informer behavior. * fix: use real clock for integration tests The integration tests were using quartz.NewMock(t) which creates a mock clock that doesn't advance automatically. This caused timeouts when waiting for log source registration because the timers in the log queuer never fired. Changes: - Remove mock clock usage from all integration tests - Use real clock (nil) which is the default - Reduce logDebounce to 5s for faster test execution - Increase informer sync wait to 1s for reliability * fix: remove duplicate unit-test job from integration workflow * fix: make integration tests more robust against event ordering The tests now use waitForLogContaining which continuously collects logs until finding the expected message, rather than expecting specific messages in the first batch of logs received. This fixes flaky tests caused by Kubernetes scheduling events arriving before pod lifecycle events. * fix: address PR review feedback - Add safety check to prevent running integration tests against non-KinD clusters (detects localhost/127.0.0.1/kind in host) - Use SHA pinning for GitHub Actions with version comments - Add INTEGRATION_TEST_UNSAFE=1 escape hatch for special cases * chore: add binary to gitignore * feat: add Makefile with lint/shellcheck and fmt/shfmt targets - Add Makefile with build, test, lint, and fmt targets - Fix shellcheck warnings in scripts/helm.sh and scripts/version.sh - Format shell scripts with shfmt * chore: update GitHub Actions dependencies and clean up .gitignore - Update actions/checkout to v6.0.1 - Update actions/setup-go to v6.1.0 - Update helm/kind-action to v1.13.0 - Remove buildcoder-logstream-kube from .gitignore * fix: address remaining PR review comments - Fix safety check logic: check INTEGRATION_TEST_UNSAFE env var before calling t.Fatalf, not after - Fix .gitignore: add coder-logstream-kube and build/ on separate lines * fix: use Go 1.24 in integration workflow to match go.mod --- .github/workflows/integration.yaml | 46 +++ .gitignore | 3 +- Makefile | 87 +++++ README.md | 39 +++ integration_test.go | 514 +++++++++++++++++++++++++++++ scripts/build.sh | 30 +- scripts/helm.sh | 2 +- scripts/kind-setup.sh | 109 ++++++ scripts/version.sh | 2 +- 9 files changed, 814 insertions(+), 18 deletions(-) create mode 100644 .github/workflows/integration.yaml create mode 100644 Makefile create mode 100644 integration_test.go create mode 100755 scripts/kind-setup.sh diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml new file mode 100644 index 0000000..b48f889 --- /dev/null +++ b/.github/workflows/integration.yaml @@ -0,0 +1,46 @@ +name: integration + +on: + push: + branches: + - main + pull_request: + +permissions: + actions: none + checks: none + contents: read + deployments: none + issues: none + packages: none + pull-requests: none + repository-projects: none + security-events: none + statuses: none + +# Cancel in-progress runs for pull requests when developers push +# additional changes +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + +jobs: + integration-test: + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: Checkout + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 + + - name: Setup Go + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 + with: + go-version: "~1.24" + + - name: Create KinD cluster + uses: helm/kind-action@92086f6be054225fa813e0a4b13787fc9088faab # v1.13.0 + with: + cluster_name: integration-test + + - name: Run integration tests + run: go test -tags=integration -v -timeout=8m ./... diff --git a/.gitignore b/.gitignore index 5278fea..bda1548 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +coder-logstream-kube coder-logstream-kube-* -build \ No newline at end of file +build/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0f04106 --- /dev/null +++ b/Makefile @@ -0,0 +1,87 @@ +# Colors for output +GREEN := $(shell printf '\033[32m') +RESET := $(shell printf '\033[0m') +BOLD := $(shell printf '\033[1m') + +# Shell source files - use shfmt to find them (respects .editorconfig) +SHELL_SRC_FILES := $(shell shfmt -f .) + +.PHONY: all +all: build + +.PHONY: build +build: + go build ./... + +.PHONY: test +test: + go test ./... -race + +.PHONY: test/integration +test/integration: + go test -tags=integration -v -timeout=8m ./... + +.PHONY: lint +lint: lint/go lint/shellcheck + +.PHONY: lint/go +lint/go: + golangci-lint run --timeout=5m + +.PHONY: lint/shellcheck +lint/shellcheck: $(SHELL_SRC_FILES) + echo "--- shellcheck" + shellcheck --external-sources $(SHELL_SRC_FILES) + +.PHONY: fmt +fmt: fmt/go fmt/shfmt + +.PHONY: fmt/go +fmt/go: + go fmt ./... + +.PHONY: fmt/shfmt +fmt/shfmt: $(SHELL_SRC_FILES) +ifdef FILE + # Format single shell script + if [[ -f "$(FILE)" ]] && [[ "$(FILE)" == *.sh ]]; then \ + echo "$(GREEN)==>$(RESET) $(BOLD)fmt/shfmt$(RESET) $(FILE)"; \ + shfmt -w "$(FILE)"; \ + fi +else + echo "$(GREEN)==>$(RESET) $(BOLD)fmt/shfmt$(RESET)" +# Only do diff check in CI, errors on diff. +ifdef CI + shfmt -d $(SHELL_SRC_FILES) +else + shfmt -w $(SHELL_SRC_FILES) +endif +endif + +.PHONY: clean +clean: + rm -f coder-logstream-kube + +.PHONY: kind/create +kind/create: + ./scripts/kind-setup.sh create + +.PHONY: kind/delete +kind/delete: + ./scripts/kind-setup.sh delete + +.PHONY: help +help: + @echo "Available targets:" + @echo " build - Build the project" + @echo " test - Run unit tests" + @echo " test/integration - Run integration tests (requires KinD cluster)" + @echo " lint - Run all linters" + @echo " lint/go - Run golangci-lint" + @echo " lint/shellcheck - Run shellcheck on shell scripts" + @echo " fmt - Format all code" + @echo " fmt/go - Format Go code" + @echo " fmt/shfmt - Format shell scripts" + @echo " kind/create - Create KinD cluster for integration tests" + @echo " kind/delete - Delete KinD cluster" + @echo " clean - Remove build artifacts" diff --git a/README.md b/README.md index bf55d8d..6a4f300 100644 --- a/README.md +++ b/README.md @@ -64,3 +64,42 @@ Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers - [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate. - [`SSL_CERT_DIR`](https://go.dev/src/crypto/x509/root_unix.go#L25): Identifies which directory to check for SSL certificate files. + +## Development + +### Running Tests + +Unit tests can be run with: + +```console +go test ./... -race +``` + +### Integration Tests + +Integration tests run against a real Kubernetes cluster using [KinD (Kubernetes in Docker)](https://kind.sigs.k8s.io/). + +**Prerequisites:** +- [Docker](https://docs.docker.com/get-docker/) +- [KinD](https://kind.sigs.k8s.io/docs/user/quick-start/#installation) +- [kubectl](https://kubernetes.io/docs/tasks/tools/) + +**Setup and run:** + +```console +# Create a KinD cluster +./scripts/kind-setup.sh create + +# Run integration tests +go test -tags=integration -v ./... + +# Clean up when done +./scripts/kind-setup.sh delete +``` + +The integration tests validate: +- Pod event streaming with real Kubernetes informers +- ReplicaSet event handling +- Multi-namespace support +- Label selector filtering + diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..0e14fba --- /dev/null +++ b/integration_test.go @@ -0,0 +1,514 @@ +//go:build integration + +package main + +import ( + "context" + "fmt" + "net/url" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" +) + +// getKubeClient creates a Kubernetes client from the default kubeconfig. +// It will use KUBECONFIG env var if set, otherwise ~/.kube/config. +// It also verifies the cluster is a KinD cluster to prevent accidentally +// running tests against production clusters. +func getKubeClient(t *testing.T) kubernetes.Interface { + t.Helper() + + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + home, err := os.UserHomeDir() + require.NoError(t, err, "failed to get user home dir") + kubeconfig = home + "/.kube/config" + } + + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + require.NoError(t, err, "failed to build kubeconfig") + + // Safety check: ensure we're connecting to a KinD cluster. + // KinD clusters run on localhost or have "kind" in the host. + // This prevents accidentally running destructive tests against production clusters. + if config.Host != "" && os.Getenv("INTEGRATION_TEST_UNSAFE") != "1" { + isKind := strings.Contains(config.Host, "127.0.0.1") || + strings.Contains(config.Host, "localhost") || + strings.Contains(strings.ToLower(config.Host), "kind") + if !isKind { + t.Fatalf("Safety check failed: integration tests must run against a KinD cluster. "+ + "Current context points to %q. Set KUBECONFIG to a KinD cluster config or "+ + "set INTEGRATION_TEST_UNSAFE=1 to bypass this check.", config.Host) + } + } + + client, err := kubernetes.NewForConfig(config) + require.NoError(t, err, "failed to create kubernetes client") + + return client +} + +// createTestNamespace creates a unique namespace for test isolation. +// It registers cleanup to delete the namespace after the test. +func createTestNamespace(t *testing.T, ctx context.Context, client kubernetes.Interface) string { + t.Helper() + + name := fmt.Sprintf("logstream-test-%d", time.Now().UnixNano()) + + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + + _, err := client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) + require.NoError(t, err, "failed to create test namespace") + + t.Cleanup(func() { + // Use a fresh context for cleanup in case the test context is cancelled + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err := client.CoreV1().Namespaces().Delete(cleanupCtx, name, metav1.DeleteOptions{}) + if err != nil { + t.Logf("warning: failed to delete test namespace %s: %v", name, err) + } + }) + + return name +} + +// waitForLogContaining waits until a log containing the given substring is received. +// It collects all logs seen and returns them along with whether the target was found. +func waitForLogContaining(t *testing.T, ctx context.Context, api *fakeAgentAPI, timeout time.Duration, substring string) (allLogs []string, found bool) { + t.Helper() + + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + for { + select { + case logs := <-api.logs: + for _, log := range logs { + allLogs = append(allLogs, log.Output) + if strings.Contains(log.Output, substring) { + return allLogs, true + } + } + case <-timeoutCtx.Done(): + return allLogs, false + } + } +} + +// waitForLogSource waits for log source registration with a timeout. +func waitForLogSource(t *testing.T, ctx context.Context, api *fakeAgentAPI, timeout time.Duration) { + t.Helper() + + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + select { + case <-api.logSource: + return + case <-timeoutCtx.Done(): + t.Fatal("timeout waiting for log source registration") + } +} + +func TestIntegration_PodEvents(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger + // Note: We don't set clock, so it uses a real clock for integration tests + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait a bit for informers to sync + time.Sleep(1 * time.Second) + + // Create a pod with CODER_AGENT_TOKEN + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-integration", + }, + }, + }, + }, + // Use a non-existent node to keep the pod in Pending state + // This avoids needing to actually run the container + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source registration + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for the "Created pod" log (may receive other logs first like scheduling warnings) + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log, got: %v", logs) + + // Delete the pod and verify deletion event + err = client.CoreV1().Pods(namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Wait for the "Deleted pod" log + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Deleted pod") + require.True(t, found, "expected 'Deleted pod' log, got: %v", logs) +} + +func TestIntegration_ReplicaSetEvents(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger + // Note: We don't set clock, so it uses a real clock for integration tests + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait a bit for informers to sync + time.Sleep(1 * time.Second) + + // Create a ReplicaSet with CODER_AGENT_TOKEN + replicas := int32(1) + rs := &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rs", + Namespace: namespace, + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-rs", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "test-rs", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-rs-integration", + }, + }, + }, + }, + // Use a non-existent node to keep pods in Pending state + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + }, + }, + } + + _, err = client.AppsV1().ReplicaSets(namespace).Create(ctx, rs, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source registration + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for the "Queued pod from ReplicaSet" log + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Queued pod from ReplicaSet") + require.True(t, found, "expected 'Queued pod from ReplicaSet' log, got: %v", logs) + + // Delete the ReplicaSet + err = client.AppsV1().ReplicaSets(namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Wait for the "Deleted ReplicaSet" log + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Deleted ReplicaSet") + require.True(t, found, "expected 'Deleted ReplicaSet' log, got: %v", logs) +} + +func TestIntegration_MultiNamespace(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + + // Create two namespaces + namespace1 := createTestNamespace(t, ctx, client) + namespace2 := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger watching both namespaces + // Note: We don't set clock, so it uses a real clock for integration tests + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace1, namespace2}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait for informers to sync + time.Sleep(1 * time.Second) + + // Create a pod in namespace1 + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-ns1", + Namespace: namespace1, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-ns1", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace1).Create(ctx, pod1, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source and logs from first pod + waitForLogSource(t, ctx, api, 30*time.Second) + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for first pod, got: %v", logs) + + // Create a pod in namespace2 + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-ns2", + Namespace: namespace2, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-ns2", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace2).Create(ctx, pod2, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source and logs from second pod + waitForLogSource(t, ctx, api, 30*time.Second) + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for second pod, got: %v", logs) + + // Both namespaces should have received events + t.Log("Successfully received events from both namespaces") +} + +func TestIntegration_LabelSelector(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger with a label selector + // Note: We don't set clock, so it uses a real clock for integration tests + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + labelSelector: "coder-workspace=true", + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait for informers to sync + time.Sleep(1 * time.Second) + + // Create a pod WITHOUT the matching label - should be ignored + podNoLabel := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-no-label", + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-no-label", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace).Create(ctx, podNoLabel, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait a bit to ensure the pod without label is not picked up + time.Sleep(2 * time.Second) + + // Create a pod WITH the matching label - should be tracked + podWithLabel := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-with-label", + Namespace: namespace, + Labels: map[string]string{ + "coder-workspace": "true", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-with-label", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace).Create(ctx, podWithLabel, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source registration - this should only happen for the labeled pod + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for logs - look specifically for "Created pod" with the labeled pod name + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for labeled pod, got: %v", logs) + + // Verify that none of the logs mention the unlabeled pod + for _, log := range logs { + require.NotContains(t, log, "test-pod-no-label", "should not receive logs for unlabeled pod") + } +} diff --git a/scripts/build.sh b/scripts/build.sh index a17eb0b..e6805b7 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -13,8 +13,8 @@ archs=(amd64 arm64 arm) # build for all architectures for arch in "${archs[@]}"; do - echo "Building for $arch" - GOARCH=$arch GOOS=linux CGO_ENABLED=0 go build -ldflags "-s -w" -o ./coder-logstream-kube-"$arch" ../ + echo "Building for $arch" + GOARCH=$arch GOOS=linux CGO_ENABLED=0 go build -ldflags "-s -w" -o ./coder-logstream-kube-"$arch" ../ done # We have to use docker buildx to tag multiple images with @@ -24,10 +24,10 @@ BUILDER_EXISTS=$(docker buildx ls | grep $BUILDER_NAME || true) # If builder doesn't exist, create it if [ -z "$BUILDER_EXISTS" ]; then - echo "Creating dockerx builder $BUILDER_NAME..." - docker buildx create --use --platform=linux/arm64,linux/amd64,linux/arm/v7 --name $BUILDER_NAME + echo "Creating dockerx builder $BUILDER_NAME..." + docker buildx create --use --platform=linux/arm64,linux/amd64,linux/arm/v7 --name $BUILDER_NAME else - echo "Builder $BUILDER_NAME already exists. Using it." + echo "Builder $BUILDER_NAME already exists. Using it." fi # Ensure the builder is bootstrapped and ready to use @@ -35,15 +35,15 @@ docker buildx inspect --bootstrap &>/dev/null # Build and push the image if [ "$CI" = "false" ]; then - docker buildx build --platform linux/"$current" -t coder-logstream-kube --load . + docker buildx build --platform linux/"$current" -t coder-logstream-kube --load . else - VERSION=$(../scripts/version.sh) - BASE=ghcr.io/coder/coder-logstream-kube - IMAGE=$BASE:$VERSION - # if version contains "rc" skip pushing to latest - if [[ $VERSION == *"rc"* ]]; then - docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t "$IMAGE" --push . - else - docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t "$IMAGE" -t $BASE:latest --push . - fi + VERSION=$(../scripts/version.sh) + BASE=ghcr.io/coder/coder-logstream-kube + IMAGE=$BASE:$VERSION + # if version contains "rc" skip pushing to latest + if [[ $VERSION == *"rc"* ]]; then + docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t "$IMAGE" --push . + else + docker buildx build --platform linux/amd64,linux/arm64,linux/arm/v7 -t "$IMAGE" -t $BASE:latest --push . + fi fi diff --git a/scripts/helm.sh b/scripts/helm.sh index fa3bfed..d06e870 100755 --- a/scripts/helm.sh +++ b/scripts/helm.sh @@ -15,7 +15,7 @@ # to the Coder OSS repo. This requires `gsutil` to be installed and configured. set -euo pipefail -cd $(dirname $(dirname "${BASH_SOURCE[0]}")) +cd "$(dirname "$(dirname "${BASH_SOURCE[0]}")")" log() { echo "$*" 1>&2 diff --git a/scripts/kind-setup.sh b/scripts/kind-setup.sh new file mode 100755 index 0000000..b2f602a --- /dev/null +++ b/scripts/kind-setup.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash + +# This script sets up a KinD cluster for running integration tests locally. +# Usage: ./scripts/kind-setup.sh [create|delete] + +set -euo pipefail + +CLUSTER_NAME="${KIND_CLUSTER_NAME:-logstream-integration-test}" + +usage() { + echo "Usage: $0 [create|delete|status]" + echo "" + echo "Commands:" + echo " create - Create a KinD cluster for integration tests" + echo " delete - Delete the KinD cluster" + echo " status - Check if the cluster exists and is running" + echo "" + echo "Environment variables:" + echo " KIND_CLUSTER_NAME - Name of the cluster (default: logstream-integration-test)" + exit 1 +} + +check_kind() { + if ! command -v kind &>/dev/null; then + echo "Error: 'kind' is not installed." + echo "Install it from: https://kind.sigs.k8s.io/docs/user/quick-start/#installation" + exit 1 + fi +} + +check_kubectl() { + if ! command -v kubectl &>/dev/null; then + echo "Error: 'kubectl' is not installed." + echo "Install it from: https://kubernetes.io/docs/tasks/tools/" + exit 1 + fi +} + +cluster_exists() { + kind get clusters 2>/dev/null | grep -q "^${CLUSTER_NAME}$" +} + +create_cluster() { + check_kind + check_kubectl + + if cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' already exists." + echo "Use '$0 delete' to remove it first, or '$0 status' to check its status." + exit 0 + fi + + echo "Creating KinD cluster '${CLUSTER_NAME}'..." + kind create cluster --name "${CLUSTER_NAME}" --wait 60s + + echo "" + echo "Cluster created successfully!" + echo "" + echo "To run integration tests:" + echo " go test -tags=integration -v ./..." + echo "" + echo "To delete the cluster when done:" + echo " $0 delete" +} + +delete_cluster() { + check_kind + + if ! cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' does not exist." + exit 0 + fi + + echo "Deleting KinD cluster '${CLUSTER_NAME}'..." + kind delete cluster --name "${CLUSTER_NAME}" + echo "Cluster deleted successfully!" +} + +status_cluster() { + check_kind + + if cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' exists." + echo "" + echo "Cluster info:" + kubectl cluster-info --context "kind-${CLUSTER_NAME}" 2>/dev/null || echo " (unable to get cluster info)" + echo "" + echo "Nodes:" + kubectl get nodes --context "kind-${CLUSTER_NAME}" 2>/dev/null || echo " (unable to get nodes)" + else + echo "Cluster '${CLUSTER_NAME}' does not exist." + echo "Use '$0 create' to create it." + fi +} + +case "${1:-}" in +create) + create_cluster + ;; +delete) + delete_cluster + ;; +status) + status_cluster + ;; +*) + usage + ;; +esac diff --git a/scripts/version.sh b/scripts/version.sh index 72a2f5b..82385be 100755 --- a/scripts/version.sh +++ b/scripts/version.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash set -euo pipefail -cd $(dirname "${BASH_SOURCE[0]}") +cd "$(dirname "${BASH_SOURCE[0]}")" last_tag="$(git describe --tags --abbrev=0)" version="$last_tag"