Можно ли использовать оператор PostgreSQL COPY FROM STDIN
для загрузки данных из файла CSV путем передачи какого-либо объекта Reader
или Writer
так же, как это делается в Java? Какую библиотеку мне следует использовать? Пример Котлина для справки:
val cm = CopyManager(conn as BaseConnection)
val total = cm.copyIn("COPY my_table FROM STDIN FORMAT csv", inputStream)
Глядя на две самые популярные библиотеки postgres Golang:
Драйвер поддерживает это с помощью методов стандартной библиотеки, но вам нужно будет использовать переменную io.Reader
для передачи каждой строки CSV.
package main
import (
"bufio"
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/lib/pq"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
if err := run(ctx); err != nil {
log.Fatal(err)
}
}
func run(ctx context.Context) error {
// Open CSV file (this could also be os.Stdin, etc)
f, err := os.Open("/tmp/csv")
if err != nil {
return err
}
defer f.Close()
// Open database connection using pq driver
db, err := sql.Open("postgres", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
if err != nil {
return err
}
defer db.Close()
// Execute copy
if err = copyFrom(ctx, db, "my_table", f); err != nil {
return err
}
return nil
}
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pq.QuoteIdentifier(table))
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, query)
if err != nil {
return err
}
sc := bufio.NewScanner(r)
for sc.Scan() {
if _, err = stmt.ExecContext(ctx, sc.Text()); err != nil {
return err
}
}
if err = sc.Err(); err != nil {
return err
}
if _, err = stmt.ExecContext(ctx); err != nil {
return err
}
return tx.Commit()
}
Примечание
COPY
операторы, по крайней мере один из которых все еще открытТип нижнего уровня pgconn.PgConn
имеет метод CopyFrom
, который позволяет передать произвольный оператор и io.Reader
. Если вы подключаетесь через пакет stdlib db
, вы все равно можете получить доступ к базовому pgconn.PgConn
, как показано ниже, хотя существуют и другие способы обработки соединений/пулов и т. д. при использовании pgx, так что стоит взглянуть и на них тоже. .
package main
import (
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
if err := run(ctx); err != nil {
log.Fatal(err)
}
}
func run(ctx context.Context) error {
// Open CSV file (this could also be os.Stdin, etc)
f, err := os.Open("/tmp/csv")
if err != nil {
return err
}
defer f.Close()
// Open database connection using pgx driver
db, err := sql.Open("pgx", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
if err != nil {
return err
}
defer db.Close()
// Execute copy
if err = copyFrom(ctx, db, "my_table", f); err != nil {
return err
}
return nil
}
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pgx.Identifier{table}.Sanitize())
conn, err := db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
return conn.Raw(func(driverConn any) error {
pgConn := driverConn.(*stdlib.Conn).Conn().PgConn()
_, err := pgConn.CopyFrom(ctx, r, query)
return err
})
}