В этом руководстве мы покажем вам, как создать простую программу Go, которая отслеживает папку на наличие новых CSV-файлов, загружает данные в базу данных PostgreSQL и удаляет файл после успешной загрузки данных. Мы будем использовать библиотеку pgx для подключения к базе данных PostgreSQL.

Предпосылки

  • Перейти (v1.16 или новее)
  • База данных PostgreSQL
  • библиотека pgx: github.com/jackc/pgx/v4

Обзор

Программа, которую мы будем создавать, состоит из следующих компонентов:

  1. Database: структура, обертывающая соединение pgx.
  2. CSVLoader: структура, отвечающая за чтение CSV-файлов и загрузку их содержимого в базу данных.
  3. Основная функция, которая инициализирует объекты Database и CSVLoader и начинает отслеживать папку на наличие новых CSV-файлов.

Пошаговое руководство по коду

Подключение к базе данных

Начнем с определения структуры Database для переноса соединения pgx:

type Database struct {
 db *pgx.Conn
}

Затем мы создаем функцию, которая подключается к базе данных PostgreSQL и возвращает экземпляр Database:

func NewDatabase(connStr string) (*Database, error) {
 db, err := pgx.Connect(context.Background(), connStr)
 if err != nil {
  return nil, err
 }
 return &Database{db: db}, nil
}

CSVLoader

Структура CSVLoader отвечает за чтение файлов CSV и загрузку их содержимого в базу данных. Мы определяем его следующим образом:

type CSVLoader struct {
 db *Database
}

Затем мы создаем функцию для создания экземпляра нового объекта CSVLoader, передавая экземпляр Database:

func NewCSVLoader(db *Database) *CSVLoader {
 return &CSVLoader{db: db}
}

Чтение CSV-файлов

Мы создаем метод readCSV для структуры CSVLoader для чтения содержимого CSV-файла и возврата данных в виде фрагмента строковых фрагментов:

func (c *CSVLoader) readCSV(filePath string) ([][]string, error) {
 // ...
}

Загрузка данных CSV в базу данных

Метод loadCSVToDatabase структуры CSVLoader отвечает за загрузку данных CSV в базу данных. Он запускает транзакцию, вставляет данные в базу данных и фиксирует транзакцию. Если во время процесса возникает ошибка, он откатывает транзакцию:

func (c *CSVLoader) loadCSVToDatabase(filePath string) bool {
 // ...
}

Мониторинг папки для новых файлов CSV

В основной функции мы создаем экземпляры Database и CSVLoader и используем тикер для периодической проверки папки на наличие новых CSV-файлов:

func main() {
 // ...

 // Monitor the folder at specified intervals.
 ticker := time.NewTicker(checkInterval)
 for range ticker.C {
  log.Printf("Checking for new CSV files in %s\n", folderPath)
  csvLoader.checkAndLoadCSVFiles()
 }
}

И весь фрагмент кода

package main

import (
 "context"
 "encoding/csv"
 "fmt"
 "log"
 "os"
 "path/filepath"
 "time"

 "github.com/jackc/pgx/v4"
)

const folderPath = "./csv-data"
const checkInterval = 10 * time.Second

type Database struct {
 db *pgx.Conn
}

// NewDatabase creates a new database connection.
func NewDatabase(connStr string) (*Database, error) {
 db, err := pgx.Connect(context.Background(), connStr)
 if err != nil {
  return nil, err
 }
 return &Database{db: db}, nil
}

// Close closes the database connection.
func (d *Database) Close(ctx context.Context) {
 d.db.Close(ctx)
}

// CSVLoader is a wrapper around database.
type CSVLoader struct {
 db *Database
}

// NewCSVLoader creates a new CSV loader.
func NewCSVLoader(db *Database) *CSVLoader {
 return &CSVLoader{db: db}
}

// checkAndLoadCSVFiles checks and loads csv files.
func (c *CSVLoader) checkAndLoadCSVFiles() {
 files, err := os.ReadDir(folderPath)
 if err != nil {
  log.Fatalf("Error reading directory %s: %v", folderPath, err)
 }

 for _, file := range files {
  if filepath.Ext(file.Name()) == ".csv" {
   filePath := filepath.Join(folderPath, file.Name())
   fmt.Println("Loading file:", filePath)

   if c.loadCSVToDatabase(filePath) {
    err := os.Remove(filePath)
    if err != nil {
     fmt.Printf("Error removing file %s: %v\n", filePath, err)
    } else {
     fmt.Println("Successfully removed file:", filePath)
    }
   } else {
    fmt.Println("Failed to load file:", filePath)
   }
  }
 }
}

// readCSV reads a CSV file into slice of string slices.
func (c *CSVLoader) readCSV(filePath string) ([][]string, error) {
 file, err := os.Open(filePath)
 if err != nil {
  return nil, fmt.Errorf("error opening file %s: %v", filePath, err)
 }
 defer file.Close()

 reader := csv.NewReader(file)
 records, err := reader.ReadAll()
 if err != nil {
  return nil, fmt.Errorf("error reading CSV data from file %s: %v", filePath, err)
 }

 return records, nil
}

// loadCSVToDatabase loads a CSV file to database.
func (c *CSVLoader) loadCSVToDatabase(filePath string) bool {
 ctx := context.Background()
 records, err := c.readCSV(filePath)

 // Start a transaction.
 tx, err := c.db.db.Begin(ctx)
 if err != nil {
  fmt.Printf("Error starting transaction: %v\n", err)
  return false
 }

 for _, record := range records {
  log.Printf("Record: %v", record)
  // Assuming the CSV has two columns: id and name.
  id := record[0]
  name := record[1]
  log.Printf("Inserting id: %s, name: %s\n", id, name)

  query := "INSERT INTO mytable (id, name) VALUES ($1, $2)"
  _, err := tx.Exec(ctx, query, id, name)
  if err != nil {
   fmt.Printf("Error inserting data into database: %v\n", err)
   err := tx.Rollback(ctx)
   if err != nil {
    fmt.Printf("Error rolling back transaction: %v\n", err)
    return false
   }
   return false
  }
 }

 // Commit the transaction.
 err = tx.Commit(ctx)
 if err != nil {
  fmt.Printf("Error committing transaction: %v\n", err)
  return false
 }
 log.Printf("Successfully loaded CSV file %s to database\n", filePath)

 return true
}

func main() {
 // Set up the PostgreSQL connection.
 connStr := "postgres://myuser:mypassword@localhost/mydatabase?sslmode=disable"
 db, err := NewDatabase(connStr)
 if err != nil {
  log.Fatalf("Error connecting to database: %v", err)
 }

 ctx := context.Background()
 defer db.Close(ctx)

 // Create a new CSVLoader with the database connection.
 csvLoader := NewCSVLoader(db)

 // Monitor the folder at specified intervals.
 ticker := time.NewTicker(checkInterval)
 for range ticker.C {
  log.Printf("Checking for new CSV files in %s\n", folderPath)
  csvLoader.checkAndLoadCSVFiles()
 }
}

И docker-compose.yaml, который я использовал для создания базы данных postgres:

version: '3.8'

services:
  postgres:
    image: postgres:alpine
    environment:
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
      POSTGRES_DB: mydatabase
    ports:
      - "5432:5432"
    volumes:
      - ./postgres-data:/var/lib/postgresql/data

volumes:
  postgres-data:

Запуск программы

  1. Замените переменную connStr в основной функции своей собственной строкой подключения к PostgreSQL.
  2. Обновите константу folderPath, чтобы она указывала на папку, в которой вы хотите отслеживать новые файлы CSV.
  3. Соберите и запустите программу: go build -o csv-loader && ./csv-loader

Теперь всякий раз, когда в папку добавляется новый CSV-файл, программа загружает его содержимое в базу данных PostgreSQL и удаляет файл при успешной загрузке.

Вот и все! У вас есть приложение go для отслеживания новых CSV-файлов в папке, которые будут загружены в БД и удалены из папки.

Если вам нравится читать статьи на Medium и вы заинтересованы в том, чтобы стать участником, я буду рад поделиться с вами своей реферальной ссылкой!

https://medium.com/@adamszpilewicz/membership