diff --git a/config/pgwatch-prometheus/metrics.yml b/config/pgwatch-prometheus/metrics.yml index 13d5116bce96ffb2d41d34fa2ebea29c7384446f..6effff1f571616bf992ceee85afec3c53162ba30 100644 --- a/config/pgwatch-prometheus/metrics.yml +++ b/config/pgwatch-prometheus/metrics.yml @@ -1348,6 +1348,7 @@ metrics: ) as rows_data_stats ) as rows_hdr_pdg_stats ) as relation_stats + where (bs*(relpages)/(1024*1024))::float > 1 order by is_na = 0 desc, bloat_pct desc limit 1000 gauges: @@ -1424,8 +1425,7 @@ metrics: ) as s ) as s2 ) as s3 - -- where not is_na - -- and tblpages*((pst).free_percent + (pst).dead_tuple_percent)::float4/100 >= 1 + where (bs*tblpages)/(1024*1024)::float > 1 order by is_na = 0 desc, bloat_pct desc limit 1000 gauges: diff --git a/docker-compose.yml b/docker-compose.yml index 1f19b2492576ca73b4ce47ac2c421331ccfc4788..5a6fd6c0d6e36c1bfb2b296e13bbec0ce1f42d2b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -154,7 +154,7 @@ services: restart: unless-stopped # PostgreSQL Reports Generator - Runs reports after 1 hour postgres-reports: - image: postgresai/reporter:1.0.2 + image: postgresai/reporter:1.0.3 container_name: postgres-reports working_dir: /app volumes: @@ -175,18 +175,34 @@ services: echo 'Waiting 30 minutes before generating reports...' && sleep 1800 && echo 'Starting PostgreSQL reports generation...' && + LAST_FULL_RUN=0 && while true; do - echo 'Extracting cluster and node name from instances.yml...' && - echo 'Generating PostgreSQL reports...' && - if [ -f /app/.pgwatch-config ] && grep -q '^api_key=' /app/.pgwatch-config; then - API_KEY=$$(grep '^api_key=' /app/.pgwatch-config | cut -d'=' -f2-) && - python postgres_reports.py --prometheus-url http://sink-prometheus:9090 --output /app/all_reports_$$(date +%Y%m%d_%H%M%S).json --token $$API_KEY --project postgres-ai-monitoring + CURRENT_TIME=$$(date +%s) && + HOURS_SINCE_FULL=$$(( (CURRENT_TIME - LAST_FULL_RUN) / 3600 )) && + if [ $$HOURS_SINCE_FULL -ge 24 ] || [ $$LAST_FULL_RUN -eq 0 ]; then + echo 'Generating all reports (24h cycle)...' && + if [ -f /app/.pgwatch-config ] && grep -q '^api_key=' /app/.pgwatch-config; then + API_KEY=$$(grep '^api_key=' /app/.pgwatch-config | cut -d'=' -f2-) && + python postgres_reports.py --prometheus-url http://sink-prometheus:9090 --output /app/all_reports_$$(date +%Y%m%d_%H%M%S).json --token $$API_KEY + else + echo 'No API key configured, generating reports without upload...' && + python postgres_reports.py --prometheus-url http://sink-prometheus:9090 --output /app/all_reports_$$(date +%Y%m%d_%H%M%S).json --no-upload + fi && + LAST_FULL_RUN=$$CURRENT_TIME && + echo 'Full reports generated. Next full run in 24 hours.' else - echo 'No API key configured, generating reports without upload...' && - python postgres_reports.py --prometheus-url http://sink-prometheus:9090 --output /app/all_reports_$$(date +%Y%m%d_%H%M%S).json --no-upload + echo 'Generating K004 report only (hourly cycle)...' && + if [ -f /app/.pgwatch-config ] && grep -q '^api_key=' /app/.pgwatch-config; then + API_KEY=$$(grep '^api_key=' /app/.pgwatch-config | cut -d'=' -f2-) && + python postgres_reports.py --prometheus-url http://sink-prometheus:9090 --check-id K004 --output /app/k004_$$(date +%Y%m%d_%H%M%S).json --token $$API_KEY + else + echo 'No API key configured, generating K004 without upload...' && + python postgres_reports.py --prometheus-url http://sink-prometheus:9090 --check-id K004 --output /app/k004_$$(date +%Y%m%d_%H%M%S).json --no-upload + fi && + echo 'K004 report generated.' fi && - echo 'Reports generated. Sleeping for 24 hours...' && - sleep 86400 + echo 'Sleeping for 1 hour...' && + sleep 3600 done " diff --git a/postgres_ai_helm/values.yaml b/postgres_ai_helm/values.yaml index 1ef626a45072a690cb8237fdc093636b01d3a43d..61444cad24af649e174536c82d9096dfbad2c150 100644 --- a/postgres_ai_helm/values.yaml +++ b/postgres_ai_helm/values.yaml @@ -74,7 +74,7 @@ flask: reporter: enabled: true - image: postgresai/reporter:1.0.2 + image: postgresai/reporter:1.0.3 imagePullPolicy: IfNotPresent schedule: "0 0 * * *" # Legacy: If clusterName/nodeName are set here, they override global settings @@ -82,7 +82,6 @@ reporter: # clusterName/nodeName per database in monitoredDatabases. clusterName: "" nodeName: "" - project: postgres-ai-monitoring apiUrl: https://postgres.ai/api/general successfulJobsHistoryLimit: 3 failedJobsHistoryLimit: 3 diff --git a/reporter/postgres_reports.py b/reporter/postgres_reports.py index c00314667a447f9f499d5f7991e11a7f0edd0759..6fee0c700e046ba4d577a39553b441c6a217d3ae 100644 --- a/reporter/postgres_reports.py +++ b/reporter/postgres_reports.py @@ -2,11 +2,11 @@ """ PostgreSQL Reports Generator using PromQL -This script generates reports for specific PostgreSQL check types (A002, A003, A004, A007, D004, F001, F004, F005, H001, H002, H004, K001, K003) +This script generates reports for specific PostgreSQL check types (A002, A003, A004, A007, D004, F001, F004, F005, H001, H002, H004, K001, K003, K004) by querying Prometheus metrics using PromQL queries. """ -__version__ = "1.0.2" +__version__ = "1.0.3" import requests import json @@ -107,6 +107,7 @@ class PostgresReportGenerator: data->>'index_definition' as index_definition, dbname from public.index_definitions + where dbname = %s order by data->>'indexrelname', time desc """ cursor.execute(query, (db_name,)) @@ -157,73 +158,62 @@ class PostgresReportGenerator: def _get_postgres_version_info(self, cluster: str, node_name: str) -> Dict[str, str]: """ - Fetch and parse Postgres version information from pgwatch settings metrics. - - Notes: - - This helper is intentionally defensive: it validates the returned setting_name label - (tests may stub query responses broadly by metric name substring). - - Uses a single query with a regex on setting_name to reduce roundtrips. + Generate A002 Version Information report. + + Args: + cluster: Cluster name + node_name: Node name + + Returns: + Dictionary containing version information """ - query = ( - f'last_over_time(pgwatch_settings_configured{{' - f'cluster="{cluster}", node_name="{node_name}", ' - f'setting_name=~"server_version|server_version_num"}}[3h])' - ) + print(f"Generating A002 Version Information report for cluster='{cluster}', node_name='{node_name}'...") + + # Query PostgreSQL version information using last_over_time to get most recent values + # Use 3h lookback to handle cases where metrics collection might be intermittent + version_queries = { + 'server_version': f'last_over_time(pgwatch_settings_configured{{cluster="{cluster}", node_name="{node_name}", setting_name="server_version"}}[3h])', + 'server_version_num': f'last_over_time(pgwatch_settings_configured{{cluster="{cluster}", node_name="{node_name}", setting_name="server_version_num"}}[3h])', + } - result = self.query_instant(query) - version_str = None - version_num = None - - if result.get("status") == "success": - if result.get("data", {}).get("result"): - for item in result["data"]["result"]: - metric = item.get("metric", {}) or {} - setting_name = metric.get("setting_name", "") - setting_value = metric.get("setting_value", "") - if setting_name == "server_version" and setting_value: - version_str = setting_value - elif setting_name == "server_version_num" and setting_value: - version_num = setting_value + version_data = {} + for metric_name, query in version_queries.items(): + result = self.query_instant(query) + if result.get('status') == 'success' and result.get('data', {}).get('result'): + if len(result['data']['result']) > 0: + # Extract setting_value from the metric labels + latest_value = result['data']['result'][0]['metric'].get('setting_value', '') + if latest_value: + version_data[metric_name] = latest_value + else: + print(f"Warning: A002 - No data for {metric_name} (cluster={cluster}, node_name={node_name})") else: - print(f"Warning: No version data found (cluster={cluster}, node_name={node_name})") - else: - print(f"Warning: Version query failed (cluster={cluster}, node_name={node_name}): status={result.get('status')}") + print(f"Warning: A002 - Query failed for {metric_name}: status={result.get('status')}") - server_version = version_str or "Unknown" - version_info: Dict[str, str] = { + # Format the version data + server_version = version_data.get('server_version', 'Unknown') + version_info = { "version": server_version, - "server_version_num": version_num or "Unknown", - "server_major_ver": "Unknown", - "server_minor_ver": "Unknown", + "server_version_num": version_data.get('server_version_num', 'Unknown'), } - - if server_version != "Unknown": - # Handle both formats: - # - "15.3" - # - "15.3 (Ubuntu 15.3-1.pgdg20.04+1)" - version_parts = server_version.split()[0].split(".") - if len(version_parts) >= 1 and version_parts[0]: + + # Parse major and minor version if we have a valid version string + if server_version and server_version != 'Unknown': + # Handle both formats: "14.5" and "14.5 (Ubuntu 14.5-1.pgdg20.04+1)" + version_parts = server_version.split()[0].split('.') + if len(version_parts) >= 1: version_info["server_major_ver"] = version_parts[0] if len(version_parts) >= 2: version_info["server_minor_ver"] = ".".join(version_parts[1:]) else: - version_info["server_minor_ver"] = "0" - - return version_info + version_info["server_minor_ver"] = '0' + else: + version_info["server_major_ver"] = 'Unknown' + version_info["server_minor_ver"] = 'Unknown' + else: + version_info["server_major_ver"] = 'Unknown' + version_info["server_minor_ver"] = 'Unknown' - def generate_a002_version_report(self, cluster: str = "local", node_name: str = "node-01") -> Dict[str, Any]: - """ - Generate A002 Version Information report. - - Args: - cluster: Cluster name - node_name: Node name - - Returns: - Dictionary containing version information - """ - print(f"Generating A002 Version Information report for cluster='{cluster}', node_name='{node_name}'...") - version_info = self._get_postgres_version_info(cluster, node_name) return self.format_report_data("A002", {"version": version_info}, node_name) def generate_a003_settings_report(self, cluster: str = "local", node_name: str = "node-01") -> Dict[str, Any]: @@ -248,18 +238,18 @@ class PostgresReportGenerator: if result.get('status') == 'success' and result.get('data', {}).get('result'): for item in result['data']['result']: # Extract setting name from labels - setting_name = item['metric'].get('setting_name', '') - setting_value = item['metric'].get('setting_value', '') + setting_name = item['metric'].get('tag_setting_name', '') + setting_value = item['metric'].get('tag_setting_value', '') # Skip if we don't have a setting name if not setting_name: continue # Get additional metadata from labels - category = item['metric'].get('category', 'Other') - unit = item['metric'].get('unit', '') - context = item['metric'].get('context', '') - vartype = item['metric'].get('vartype', '') + category = item['metric'].get('tag_category', 'Other') + unit = item['metric'].get('tag_unit', '') + context = item['metric'].get('tag_context', '') + vartype = item['metric'].get('tag_vartype', '') settings_data[setting_name] = { "setting": setting_value, @@ -360,10 +350,10 @@ class PostgresReportGenerator: if result.get('status') == 'success' and result.get('data', {}).get('result'): for item in result['data']['result']: # Extract setting information from labels - setting_name = item['metric'].get('setting_name', '') - value = item['metric'].get('setting_value', '') - unit = item['metric'].get('unit', '') - category = item['metric'].get('category', 'Other') + setting_name = item['metric'].get('tag_setting_name', '') + value = item['metric'].get('tag_setting_value', '') + unit = item['metric'].get('tag_unit', '') + category = item['metric'].get('tag_category', 'Other') # Skip if we don't have a setting name if not setting_name: @@ -733,7 +723,7 @@ class PostgresReportGenerator: pgstat_data = {} if result.get('status') == 'success' and result.get('data', {}).get('result'): for item in result['data']['result']: - setting_name = item['metric'].get('setting_name', '') + setting_name = item['metric'].get('tag_setting_name', '') # Skip if no setting name if not setting_name: @@ -741,11 +731,11 @@ class PostgresReportGenerator: # Filter for pg_stat_statements and related settings if setting_name in pgstat_settings: - setting_value = item['metric'].get('setting_value', '') - category = item['metric'].get('category', 'Statistics') - unit = item['metric'].get('unit', '') - context = item['metric'].get('context', '') - vartype = item['metric'].get('vartype', '') + setting_value = item['metric'].get('tag_setting_value', '') + category = item['metric'].get('tag_category', 'Statistics') + unit = item['metric'].get('tag_unit', '') + context = item['metric'].get('tag_context', '') + vartype = item['metric'].get('tag_vartype', '') pgstat_data[setting_name] = { "setting": setting_value, @@ -923,15 +913,15 @@ class PostgresReportGenerator: autovacuum_data = {} if result.get('status') == 'success' and result.get('data', {}).get('result'): for item in result['data']['result']: - setting_name = item['metric'].get('setting_name', 'unknown') + setting_name = item['metric'].get('tag_setting_name', 'unknown') # Filter for autovacuum and vacuum settings if setting_name in autovacuum_settings: - setting_value = item['metric'].get('setting_value', '') - category = item['metric'].get('category', 'Autovacuum') - unit = item['metric'].get('unit', '') - context = item['metric'].get('context', '') - vartype = item['metric'].get('vartype', '') + setting_value = item['metric'].get('tag_setting_value', '') + category = item['metric'].get('tag_category', 'Autovacuum') + unit = item['metric'].get('tag_unit', '') + context = item['metric'].get('tag_context', '') + vartype = item['metric'].get('tag_vartype', '') autovacuum_data[setting_name] = { "setting": setting_value, @@ -973,12 +963,13 @@ class PostgresReportGenerator: bloated_indexes_by_db = {} for db_name in databases: - # Query btree bloat using multiple metrics for each database with last_over_time [1d] + # Query btree bloat using multiple metrics for each database bloat_queries = { 'extra_size': f'last_over_time(pgwatch_pg_btree_bloat_extra_size{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', 'extra_pct': f'last_over_time(pgwatch_pg_btree_bloat_extra_pct{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', 'bloat_size': f'last_over_time(pgwatch_pg_btree_bloat_bloat_size{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', 'bloat_pct': f'last_over_time(pgwatch_pg_btree_bloat_bloat_pct{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', + 'fillfactor': f'last_over_time(pgwatch_pg_btree_bloat_fillfactor{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', } bloated_indexes = {} @@ -1002,16 +993,43 @@ class PostgresReportGenerator: "extra_pct": 0, "bloat_size": 0, "bloat_pct": 0, + "fillfactor": None, } value = float(item['value'][1]) if item.get('value') else 0 bloated_indexes[index_key][metric_type] = value + + # Query index_size from pg_class (relation_size_bytes for indexes) + # relkind="105" is ASCII code for 'i' (index) + index_size_query = f'last_over_time(pgwatch_pg_class_relation_size_bytes{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}", relkind="105"}}[3h])' + index_size_result = self.query_instant(index_size_query) + index_sizes = {} + if index_size_result.get('status') == 'success' and index_size_result.get('data', {}).get('result'): + for item in index_size_result['data']['result']: + schema_name = item['metric'].get('schemaname', 'unknown') + index_name = item['metric'].get('relname', 'unknown') + # For indexes, we need to map back to the table - we'll store by index name and match later + index_size_bytes = float(item['value'][1]) if item.get('value') else 0 + # Store with schema.index_name as key to match with bloated_indexes + index_sizes[f"{schema_name}.{index_name}"] = index_size_bytes + + # Add index_size to bloated_indexes + for index_key in bloated_indexes.keys(): + schema_name = bloated_indexes[index_key]['schema_name'] + index_name = bloated_indexes[index_key]['index_name'] + size_key = f"{schema_name}.{index_name}" + bloated_indexes[index_key]['index_size'] = index_sizes.get(size_key, 0) # Convert to list and add pretty formatting bloated_indexes_list = [] total_bloat_size = 0 for index_data in bloated_indexes.values(): + # index_size already in bytes from pg_class + index_size_bytes = index_data['index_size'] + + # Add formatted fields + index_data['index_size_pretty'] = self.format_bytes(index_size_bytes) index_data['extra_size_pretty'] = self.format_bytes(index_data['extra_size']) index_data['bloat_size_pretty'] = self.format_bytes(index_data['bloat_size']) @@ -1086,7 +1104,7 @@ class PostgresReportGenerator: memory_data = {} if result.get('status') == 'success' and result.get('data', {}).get('result'): for item in result['data']['result']: - setting_name = item['metric'].get('setting_name', '') + setting_name = item['metric'].get('tag_setting_name', '') # Skip if no setting name if not setting_name: @@ -1094,11 +1112,11 @@ class PostgresReportGenerator: # Filter for memory-related settings if setting_name in memory_settings: - setting_value = item['metric'].get('setting_value', '') - category = item['metric'].get('category', 'Memory') - unit = item['metric'].get('unit', '') - context = item['metric'].get('context', '') - vartype = item['metric'].get('vartype', '') + setting_value = item['metric'].get('tag_setting_value', '') + category = item['metric'].get('tag_category', 'Memory') + unit = item['metric'].get('tag_unit', '') + context = item['metric'].get('tag_context', '') + vartype = item['metric'].get('tag_vartype', '') memory_data[setting_name] = { "setting": setting_value, @@ -1246,13 +1264,12 @@ class PostgresReportGenerator: bloated_tables_by_db = {} for db_name in databases: # Query table bloat using multiple metrics for each database - # Try with 10h window first, then fall back to instant query bloat_queries = { - 'real_size': f'last_over_time(pgwatch_pg_table_bloat_real_size{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', 'extra_size': f'last_over_time(pgwatch_pg_table_bloat_extra_size{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', 'extra_pct': f'last_over_time(pgwatch_pg_table_bloat_extra_pct{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', 'bloat_size': f'last_over_time(pgwatch_pg_table_bloat_bloat_size{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', 'bloat_pct': f'last_over_time(pgwatch_pg_table_bloat_bloat_pct{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', + 'fillfactor': f'last_over_time(pgwatch_pg_table_bloat_fillfactor{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])', } bloated_tables = {} @@ -1269,27 +1286,69 @@ class PostgresReportGenerator: bloated_tables[table_key] = { "schema_name": schema_name, "table_name": table_name, - "real_size": 0, "extra_size": 0, "extra_pct": 0, "bloat_size": 0, "bloat_pct": 0, + "fillfactor": None, } value = float(item['value'][1]) if item.get('value') else 0 bloated_tables[table_key][metric_type] = value else: - if metric_type == 'real_size': # Only log once per database + if metric_type == 'extra_size': # Only log once per database print(f"Warning: F004 - No bloat data for database {db_name}, metric {metric_type}") + + # Query real_size from pg_class (total_relation_size_bytes for tables) + # relkind="114" is ASCII code for 'r' (regular table) + real_size_query = f'last_over_time(pgwatch_pg_class_total_relation_size_bytes{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}", relkind="114"}}[3h])' + real_size_result = self.query_instant(real_size_query) + table_real_sizes = {} + if real_size_result.get('status') == 'success' and real_size_result.get('data', {}).get('result'): + for item in real_size_result['data']['result']: + schema_name = item['metric'].get('schemaname', 'unknown') + table_name = item['metric'].get('relname', 'unknown') + table_key = f"{schema_name}.{table_name}" + real_size_bytes = float(item['value'][1]) if item.get('value') else 0 + table_real_sizes[table_key] = real_size_bytes + + # Add real_size to bloated_tables + for table_key in bloated_tables.keys(): + bloated_tables[table_key]['real_size'] = table_real_sizes.get(table_key, 0) + + # Query last_vacuum from pg_stat_all_tables + last_vacuum_query = f'last_over_time(pgwatch_pg_stat_all_tables_last_vacuum{{cluster="{cluster}", node_name="{node_name}", datname="{db_name}"}}[3h])' + last_vacuum_result = self.query_instant(last_vacuum_query) + last_vacuum_data = {} + if last_vacuum_result.get('status') == 'success' and last_vacuum_result.get('data', {}).get('result'): + for item in last_vacuum_result['data']['result']: + schema_name = item['metric'].get('schemaname', 'unknown') + table_name = item['metric'].get('relname', 'unknown') + table_key = f"{schema_name}.{table_name}" + # Value is epoch timestamp + timestamp = float(item['value'][1]) if item.get('value') else 0 + if timestamp > 0: + from datetime import datetime + last_vacuum_data[table_key] = datetime.fromtimestamp(timestamp).isoformat() # Convert to list and add pretty formatting bloated_tables_list = [] total_bloat_size = 0 - for table_data in bloated_tables.values(): - table_data['real_size_pretty'] = self.format_bytes(table_data['real_size']) + for table_key, table_data in bloated_tables.items(): + # real_size already in bytes from pg_total_relation_size + real_size_bytes = table_data['real_size'] + + # Calculate live_data_size + live_data_size = max(0, real_size_bytes - table_data['bloat_size']) + + # Add formatted fields + table_data['real_size_pretty'] = self.format_bytes(real_size_bytes) table_data['extra_size_pretty'] = self.format_bytes(table_data['extra_size']) table_data['bloat_size_pretty'] = self.format_bytes(table_data['bloat_size']) + table_data['live_data_size'] = live_data_size + table_data['live_data_size_pretty'] = self.format_bytes(live_data_size) + table_data['last_vacuum'] = last_vacuum_data.get(table_key) bloated_tables_list.append(table_data) total_bloat_size += table_data['bloat_size'] @@ -1440,6 +1499,177 @@ class PostgresReportGenerator: postgres_version=self._get_postgres_version_info(cluster, node_name), ) + def generate_k004_hourly_cumulative_metrics_report( + self, + cluster: str = "local", + node_name: str = "node-01", + time_range_hours: int = 24, + topn_limit: int = 100, + ) -> Dict[str, Any]: + """ + Generate K004 hourly snapshots report. + + This report is intended to be executed hourly and provides: + - N+1 hourly snapshots of pgss data with TopN vectors (limited to topn_limit entries): + * topN by total exec time + * topN by total plan time + * topN by temp bytes written + * topN by WAL generation + * topN by calls + * topN by shared blks read + - N hourly snapshots of TopN wait_event_type:wait_event--queryid pairs + - N hourly pgss mean time (exec+plan) + """ + print(f"Generating K004 hourly snapshots report for {time_range_hours} hours...") + + if time_range_hours < 1: + time_range_hours = 1 + + # Align to hour boundaries + end_time = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) + start_time = end_time - timedelta(hours=time_range_hours) + start_eval = end_time - timedelta(hours=time_range_hours - 1) + + # Build hour bucket timestamps + bucket_ends: List[datetime] = [] + for i in range(time_range_hours): + bucket_ends.append(start_eval + timedelta(hours=i)) + + def _to_epoch(dt: datetime) -> int: + return int(dt.timestamp()) + + # ------------------------- + # pgss hourly snapshots + # ------------------------- + pgss_metrics_config = [ + ("exec_time", "pgwatch_pg_stat_statements_exec_time_total"), + ("plan_time", "pgwatch_pg_stat_statements_plan_time_total"), + ("temp_bytes_written", "pgwatch_pg_stat_statements_temp_bytes_written"), + ("wal_bytes", "pgwatch_pg_stat_statements_wal_bytes"), + ("calls", "pgwatch_pg_stat_statements_calls"), + ("shared_blks_read", "pgwatch_pg_stat_statements_shared_bytes_read_total"), + ] + + pgss_snapshots = [] + for bucket_end in bucket_ends: + bucket_start = bucket_end - timedelta(hours=1) + snapshot = { + "hour_start": bucket_start.isoformat(), + "hour_end": bucket_end.isoformat(), + "topn_vectors": {}, + "mean_time_ms": 0.0, + } + + # For each metric, get top N queryids + for metric_name, prom_metric in pgss_metrics_config: + query = f'topk({topn_limit}, sum by (queryid) (increase({prom_metric}{{cluster="{cluster}", node_name="{node_name}"}}[1h]) @ {_to_epoch(bucket_end)}))' + res = self.query_instant(query) + + topn_list = [] + if res.get("status") == "success" and res.get("data", {}).get("result"): + for item in res["data"]["result"]: + queryid = item.get("metric", {}).get("queryid", "") + try: + value = float(item.get("value", [None, 0.0])[1]) + # Convert bytes to blocks for shared_blks_read + if metric_name == "shared_blks_read": + value = value / 8192.0 + topn_list.append({"queryid": queryid, "value": value}) + except (TypeError, ValueError, IndexError): + continue + + snapshot["topn_vectors"][f"topn_by_{metric_name}"] = topn_list + + # Calculate mean time (exec + plan) for this hour + exec_query = f'sum(increase(pgwatch_pg_stat_statements_exec_time_total{{cluster="{cluster}", node_name="{node_name}"}}[1h]) @ {_to_epoch(bucket_end)})' + plan_query = f'sum(increase(pgwatch_pg_stat_statements_plan_time_total{{cluster="{cluster}", node_name="{node_name}"}}[1h]) @ {_to_epoch(bucket_end)})' + calls_query = f'sum(increase(pgwatch_pg_stat_statements_calls{{cluster="{cluster}", node_name="{node_name}"}}[1h]) @ {_to_epoch(bucket_end)})' + + total_exec = 0.0 + total_plan = 0.0 + total_calls = 0.0 + + for q, var in [(exec_query, "exec"), (plan_query, "plan"), (calls_query, "calls")]: + res = self.query_instant(q) + if res.get("status") == "success" and res.get("data", {}).get("result"): + try: + val = float(res["data"]["result"][0]["value"][1]) + if var == "exec": + total_exec = val + elif var == "plan": + total_plan = val + else: + total_calls = val + except (KeyError, IndexError, TypeError, ValueError): + pass + + if total_calls > 0: + snapshot["mean_time_ms"] = (total_exec + total_plan) / total_calls + + pgss_snapshots.append(snapshot) + + # ------------------------- + # Wait events hourly snapshots + # ------------------------- + wait_snapshots = [] + for bucket_end in bucket_ends: + bucket_start = bucket_end - timedelta(hours=1) + snapshot = { + "hour_start": bucket_start.isoformat(), + "hour_end": bucket_end.isoformat(), + "topn_wait_events": [], + } + + # Get top wait_event_type:wait_event--queryid pairs + # Note: wait_events metric uses 'query_id' label (Postgres 14+) + query = f'topk({topn_limit}, sum by (query_id, wait_event_type, wait_event) (avg_over_time(pgwatch_wait_events_total{{cluster="{cluster}", node_name="{node_name}", query_id!=""}}[1h]) @ {_to_epoch(bucket_end)}))' + res = self.query_instant(query) + + if res.get("status") == "success" and res.get("data", {}).get("result"): + for item in res["data"]["result"]: + metric = item.get("metric", {}) + query_id = metric.get("query_id", "") + wait_event_type = metric.get("wait_event_type", "") + wait_event = metric.get("wait_event", "") + + if not query_id: # Skip server processes + continue + + try: + count = float(item.get("value", [None, 0.0])[1]) + snapshot["topn_wait_events"].append({ + "queryid": query_id, + "wait_event_type": wait_event_type, + "wait_event": wait_event, + "count": count, + }) + except (TypeError, ValueError, IndexError): + continue + + wait_snapshots.append(snapshot) + + return self.format_report_data( + "K004", + { + "summary": { + "time_range_hours": time_range_hours, + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + "topn_limit": topn_limit, + "notes": [ + f"This report provides {time_range_hours} hourly snapshots of pgss and wait events data.", + "pgss snapshots include TopN queries by multiple metrics (exec_time, plan_time, temp_bytes, wal_bytes, calls, shared_blks_read).", + "Wait events snapshots show TopN wait_event_type:wait_event--queryid combinations.", + "Mean time represents average (exec_time + plan_time) per call for each hour.", + ], + }, + "pgss_hourly_snapshots": pgss_snapshots, + "wait_events_hourly_snapshots": wait_snapshots, + }, + node_name, + postgres_version=self._get_postgres_version_info(cluster, node_name), + ) + def _get_pgss_metrics_data(self, cluster: str, node_name: str, start_time: datetime, end_time: datetime) -> List[ Dict[str, Any]]: """ @@ -1677,11 +1907,11 @@ class PostgresReportGenerator: return metrics_dict def format_bytes(self, bytes_value: float) -> str: - """Format bytes value for human readable display.""" + """Format bytes value for human readable display using binary units.""" if bytes_value == 0: return "0 B" - units = ['B', 'KB', 'MB', 'GB', 'TB'] + units = ['B', 'KiB', 'MiB', 'GiB', 'TiB'] unit_index = 0 value = float(bytes_value) @@ -1815,6 +2045,7 @@ class PostgresReportGenerator: "K001": "Globally aggregated query metrics", "K002": "Workload type", "K003": "Top-50 queries by total_time", + "K004": "Hourly snapshots with TopN queries and wait events", "L001": "Table sizes", "L002": "Data types being used", "L003": "Integer out-of-range risks in PKs", @@ -2038,6 +2269,7 @@ class PostgresReportGenerator: ('H004', self.generate_h004_redundant_indexes_report), ('K001', self.generate_k001_query_calls_report), ('K003', self.generate_k003_top_queries_report), + ('K004', self.generate_k004_hourly_cumulative_metrics_report), ] for check_id, report_func in report_types: @@ -2355,7 +2587,7 @@ def main(): help='Disable combining primary and replica reports into single report') parser.add_argument('--check-id', choices=['A002', 'A003', 'A004', 'A007', 'D004', 'F001', 'F004', 'F005', 'G001', 'H001', 'H002', - 'H004', 'K001', 'K003', 'ALL'], + 'H004', 'K001', 'K003', 'K004', 'ALL'], help='Specific check ID to generate (default: ALL)') parser.add_argument('--output', default='-', help='Output file (default: stdout)') @@ -2472,6 +2704,8 @@ def main(): report = generator.generate_k001_query_calls_report(cluster, args.node_name) elif args.check_id == 'K003': report = generator.generate_k003_top_queries_report(cluster, args.node_name) + elif args.check_id == 'K004': + report = generator.generate_k004_hourly_cumulative_metrics_report(cluster, args.node_name) output_filename = f"{cluster}_{args.check_id}.json" if len(clusters_to_process) > 1 else args.output