
В этом руководстве мы покажем вам, как создать простую программу Go, которая отслеживает папку на наличие новых CSV-файлов, загружает данные в базу данных PostgreSQL и удаляет файл после успешной загрузки данных. Мы будем использовать библиотеку pgx для подключения к базе данных PostgreSQL.
Предпосылки
- Перейти (v1.16 или новее)
- База данных PostgreSQL
- библиотека pgx:
github.com/jackc/pgx/v4
Обзор
Программа, которую мы будем создавать, состоит из следующих компонентов:
Database: структура, обертывающая соединение pgx.CSVLoader: структура, отвечающая за чтение CSV-файлов и загрузку их содержимого в базу данных.- Основная функция, которая инициализирует объекты
DatabaseиCSVLoaderи начинает отслеживать папку на наличие новых CSV-файлов.
Пошаговое руководство по коду
Подключение к базе данных
Начнем с определения структуры Database для переноса соединения pgx:
type Database struct {
db *pgx.Conn
}
Затем мы создаем функцию, которая подключается к базе данных PostgreSQL и возвращает экземпляр Database:
func NewDatabase(connStr string) (*Database, error) {
db, err := pgx.Connect(context.Background(), connStr)
if err != nil {
return nil, err
}
return &Database{db: db}, nil
}
CSVLoader
Структура CSVLoader отвечает за чтение файлов CSV и загрузку их содержимого в базу данных. Мы определяем его следующим образом:
type CSVLoader struct {
db *Database
}
Затем мы создаем функцию для создания экземпляра нового объекта CSVLoader, передавая экземпляр Database:
func NewCSVLoader(db *Database) *CSVLoader {
return &CSVLoader{db: db}
}
Чтение CSV-файлов
Мы создаем метод readCSV для структуры CSVLoader для чтения содержимого CSV-файла и возврата данных в виде фрагмента строковых фрагментов:
func (c *CSVLoader) readCSV(filePath string) ([][]string, error) {
// ...
}
Загрузка данных CSV в базу данных
Метод loadCSVToDatabase структуры CSVLoader отвечает за загрузку данных CSV в базу данных. Он запускает транзакцию, вставляет данные в базу данных и фиксирует транзакцию. Если во время процесса возникает ошибка, он откатывает транзакцию:
func (c *CSVLoader) loadCSVToDatabase(filePath string) bool {
// ...
}
Мониторинг папки для новых файлов CSV
В основной функции мы создаем экземпляры Database и CSVLoader и используем тикер для периодической проверки папки на наличие новых CSV-файлов:
func main() {
// ...
// Monitor the folder at specified intervals.
ticker := time.NewTicker(checkInterval)
for range ticker.C {
log.Printf("Checking for new CSV files in %s\n", folderPath)
csvLoader.checkAndLoadCSVFiles()
}
}
И весь фрагмент кода
package main
import (
"context"
"encoding/csv"
"fmt"
"log"
"os"
"path/filepath"
"time"
"github.com/jackc/pgx/v4"
)
const folderPath = "./csv-data"
const checkInterval = 10 * time.Second
type Database struct {
db *pgx.Conn
}
// NewDatabase creates a new database connection.
func NewDatabase(connStr string) (*Database, error) {
db, err := pgx.Connect(context.Background(), connStr)
if err != nil {
return nil, err
}
return &Database{db: db}, nil
}
// Close closes the database connection.
func (d *Database) Close(ctx context.Context) {
d.db.Close(ctx)
}
// CSVLoader is a wrapper around database.
type CSVLoader struct {
db *Database
}
// NewCSVLoader creates a new CSV loader.
func NewCSVLoader(db *Database) *CSVLoader {
return &CSVLoader{db: db}
}
// checkAndLoadCSVFiles checks and loads csv files.
func (c *CSVLoader) checkAndLoadCSVFiles() {
files, err := os.ReadDir(folderPath)
if err != nil {
log.Fatalf("Error reading directory %s: %v", folderPath, err)
}
for _, file := range files {
if filepath.Ext(file.Name()) == ".csv" {
filePath := filepath.Join(folderPath, file.Name())
fmt.Println("Loading file:", filePath)
if c.loadCSVToDatabase(filePath) {
err := os.Remove(filePath)
if err != nil {
fmt.Printf("Error removing file %s: %v\n", filePath, err)
} else {
fmt.Println("Successfully removed file:", filePath)
}
} else {
fmt.Println("Failed to load file:", filePath)
}
}
}
}
// readCSV reads a CSV file into slice of string slices.
func (c *CSVLoader) readCSV(filePath string) ([][]string, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, fmt.Errorf("error opening file %s: %v", filePath, err)
}
defer file.Close()
reader := csv.NewReader(file)
records, err := reader.ReadAll()
if err != nil {
return nil, fmt.Errorf("error reading CSV data from file %s: %v", filePath, err)
}
return records, nil
}
// loadCSVToDatabase loads a CSV file to database.
func (c *CSVLoader) loadCSVToDatabase(filePath string) bool {
ctx := context.Background()
records, err := c.readCSV(filePath)
// Start a transaction.
tx, err := c.db.db.Begin(ctx)
if err != nil {
fmt.Printf("Error starting transaction: %v\n", err)
return false
}
for _, record := range records {
log.Printf("Record: %v", record)
// Assuming the CSV has two columns: id and name.
id := record[0]
name := record[1]
log.Printf("Inserting id: %s, name: %s\n", id, name)
query := "INSERT INTO mytable (id, name) VALUES ($1, $2)"
_, err := tx.Exec(ctx, query, id, name)
if err != nil {
fmt.Printf("Error inserting data into database: %v\n", err)
err := tx.Rollback(ctx)
if err != nil {
fmt.Printf("Error rolling back transaction: %v\n", err)
return false
}
return false
}
}
// Commit the transaction.
err = tx.Commit(ctx)
if err != nil {
fmt.Printf("Error committing transaction: %v\n", err)
return false
}
log.Printf("Successfully loaded CSV file %s to database\n", filePath)
return true
}
func main() {
// Set up the PostgreSQL connection.
connStr := "postgres://myuser:mypassword@localhost/mydatabase?sslmode=disable"
db, err := NewDatabase(connStr)
if err != nil {
log.Fatalf("Error connecting to database: %v", err)
}
ctx := context.Background()
defer db.Close(ctx)
// Create a new CSVLoader with the database connection.
csvLoader := NewCSVLoader(db)
// Monitor the folder at specified intervals.
ticker := time.NewTicker(checkInterval)
for range ticker.C {
log.Printf("Checking for new CSV files in %s\n", folderPath)
csvLoader.checkAndLoadCSVFiles()
}
}
И docker-compose.yaml, который я использовал для создания базы данных postgres:
version: '3.8'
services:
postgres:
image: postgres:alpine
environment:
POSTGRES_USER: myuser
POSTGRES_PASSWORD: mypassword
POSTGRES_DB: mydatabase
ports:
- "5432:5432"
volumes:
- ./postgres-data:/var/lib/postgresql/data
volumes:
postgres-data:
Запуск программы
- Замените переменную
connStrв основной функции своей собственной строкой подключения к PostgreSQL. - Обновите константу
folderPath, чтобы она указывала на папку, в которой вы хотите отслеживать новые файлы CSV. - Соберите и запустите программу:
go build -o csv-loader && ./csv-loader
Теперь всякий раз, когда в папку добавляется новый CSV-файл, программа загружает его содержимое в базу данных PostgreSQL и удаляет файл при успешной загрузке.
Вот и все! У вас есть приложение go для отслеживания новых CSV-файлов в папке, которые будут загружены в БД и удалены из папки.
Если вам нравится читать статьи на Medium и вы заинтересованы в том, чтобы стать участником, я буду рад поделиться с вами своей реферальной ссылкой!