-
-
This commit is contained in:
124
datasource/store/duckdb.go
Normal file
124
datasource/store/duckdb.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"datasource/types"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func NewStore(dbPath string) (*Store, error) {
|
||||
db, err := sql.Open("duckdb", dbPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// keep one connection ready for collector's next tick
|
||||
db.SetMaxIdleConns(1)
|
||||
|
||||
// force sequential access to prevent DuckDB locking errors
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
migrationQuery := `
|
||||
CREATE TABLE IF NOT EXISTS metrics (
|
||||
service VARCHAR,
|
||||
timestamp_sec BIGINT,
|
||||
up INTEGER,
|
||||
uptime_sec BIGINT,
|
||||
memory_usage_bytes BIGINT
|
||||
)`
|
||||
|
||||
if _, err := db.Exec(migrationQuery); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Store{db: db}, nil
|
||||
}
|
||||
|
||||
func (s *Store) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
func (s *Store) SaveMetric(ctx context.Context, metric *types.Metric) error {
|
||||
query := `
|
||||
INSERT INTO metrics (
|
||||
service,
|
||||
timestamp_sec,
|
||||
up,
|
||||
uptime_sec,
|
||||
memory_usage_bytes
|
||||
) VALUES (?, ?, ?, ?, ?)`
|
||||
|
||||
_, err := s.db.ExecContext(ctx, query,
|
||||
metric.Service,
|
||||
metric.TimestampSec,
|
||||
metric.Up,
|
||||
metric.UptimeSec,
|
||||
metric.MemoryUsageBytes,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) GetMetrics(ctx context.Context, service string, t0, t1 int64) ([]*types.Metric, error) {
|
||||
query := `
|
||||
SELECT
|
||||
timestamp_sec,
|
||||
up,
|
||||
uptime_sec,
|
||||
memory_usage_bytes
|
||||
FROM metrics
|
||||
WHERE
|
||||
service = ? AND
|
||||
timestamp_sec BETWEEN ? AND ?
|
||||
ORDER BY timestamp_sec`
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, service, t0, t1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var metrics []*types.Metric
|
||||
|
||||
for rows.Next() {
|
||||
metric := &types.Metric{Service: service}
|
||||
if err := rows.Scan(&metric.TimestampSec, &metric.Up, &metric.UptimeSec, &metric.MemoryUsageBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (s *Store) GetAllMetrics(ctx context.Context) ([]*types.Metric, error) {
|
||||
query := "SELECT service, timestamp_sec, up, uptime_sec, memory_usage_bytes FROM metrics"
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var metrics []*types.Metric
|
||||
|
||||
for rows.Next() {
|
||||
metric := new(types.Metric)
|
||||
if err := rows.Scan(&metric.Service, &metric.TimestampSec, &metric.Up, &metric.UptimeSec, &metric.MemoryUsageBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// todo (david)
|
||||
// func (s *Store) DeleteMetrics(ctx context.Context, t0, t1 int64) error {
|
||||
// return nil
|
||||
// }
|
||||
Reference in New Issue
Block a user