Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Stage 1: Go Builder (Backend)
FROM golang:1.24.2-alpine AS go-builder


# Install git, as it might be needed by go mod download or go build
RUN apk add --no-cache git

Expand All @@ -19,6 +20,8 @@ RUN cd server && go build -ldflags="-w -s" -o /app/olake-server .

# Stage 2: Frontend Builder
FROM node:20-alpine AS node-builder

# Reuse build-time arguments during UI build if needed
WORKDIR /app/ui

# Install pnpm globally
Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ BACKEND_ENV_VARS = \
OLAKE_POSTGRES_PORT=5432 \
OLAKE_POSTGRES_DBNAME=postgres \
OLAKE_POSTGRES_SSLMODE=disable \
LOGS_DIR=./logger/logs \
LOGS_DIR=$(PWD)/logger/logs \
SESSION_ON=true \
TEMPORAL_ADDRESS=localhost:7233 \
CONTAINER_REGISTRY_BASE=registry-1.docker.io
CONTAINER_REGISTRY_BASE=registry-1.docker.io \
PERSISTENT_DIR=$(PWD)/olake-config

# Frontend environment variables
FRONTEND_ENV_VARS = \
Expand Down
30 changes: 13 additions & 17 deletions server/cmd/temporal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,25 @@ import (
)

func main() {
// Initialize telemetry
telemetry.InitTelemetry()
// check constants
// Initialize constants and logger before telemetry so build info is available
constants.Init()

// init logger
logger.Init()

// init log cleaner
utils.InitLogCleaner(docker.GetDefaultConfigDir(), utils.GetLogRetentionPeriod())

// init database
err := database.Init()
db, err := database.Init()
if err != nil {
logs.Critical("Failed to initialize database: %s", err)
os.Exit(1)
logger.Fatalf("Failed to initialize database: %s", err)
return
}
telemetry.InitTelemetry(db)

// init log cleaner
utils.InitLogCleaner(docker.GetDefaultConfigDir(), utils.GetLogRetentionPeriod())

logs.Info("Starting Olake Temporal worker...")
logger.Info("Starting Olake Temporal worker...")
// create temporal client
tClient, err := temporal.NewClient()
if err != nil {
logs.Critical("Failed to create Temporal client: %v", err)
logs.Critical("Failed to create Temporal client: %s", err)
os.Exit(1)
}
defer tClient.Close()
Expand All @@ -49,7 +45,7 @@ func main() {
go func() {
err := worker.Start()
if err != nil {
logs.Critical("Failed to start worker: %v", err)
logs.Critical("Failed to start worker: %s", err)
os.Exit(1)
}
}()
Expand All @@ -60,9 +56,9 @@ func main() {

// wait for termination signal
sig := <-signalChan
logs.Info("Received signal %v, shutting down worker...", sig)
logger.Infof("Received signal %s, shutting down worker...", sig)

// stop the worker
worker.Stop()
logs.Info("Worker stopped. Goodbye!")
logger.Info("Worker stopped. Goodbye!")
}
14 changes: 13 additions & 1 deletion server/internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,26 @@ var (
EnvLogLevel = "LOG_LEVEL"
EnvLogFormat = "LOG_FORMAT"
OrderByUpdatedAtDesc = "-updated_at"
// Frontend index path key
FrontendIndexPath = "FRONTEND_INDEX_PATH"
DockerTaskQueue = "OLAKE_DOCKER_TASK_QUEUE"
K8sTaskQueue = "OLAKE_K8S_TASK_QUEUE"
)

// Supported database/source types
var SupportedDriverTypes = []string{
var SupportedSourceTypes = []string{
"mysql",
"postgres",
"oracle",
"mongodb",
}

// Supported database/source types
var SupportedDestinationTypes = []string{
"parquet",
"iceberg",
}

var RequiredConfigVariable = []string{
"OLAKE_POSTGRES_USER",
"OLAKE_POSTGRES_PASSWORD",
Expand All @@ -55,6 +66,7 @@ func Init() {
viper.SetDefault("RELEASE_CHANNEL", releasechannel)
viper.SetDefault("BASE_HOST", defaultBaseHost)
viper.SetDefault("BASE_URL", fmt.Sprintf("%s:%v", viper.GetString("BASE_HOST"), viper.GetString("PORT")))
viper.SetDefault(FrontendIndexPath, "/opt/frontend/dist/index.html")

checkForRequiredVariables(RequiredConfigVariable)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,26 @@ import (
"github.com/datazip/olake-ui/server/internal/models"
)

func Init() error {
type Database struct {
ormer orm.Ormer
}

func Init() (*Database, error) {
// register driver
uri, err := BuildPostgresURIFromConfig()
if err != nil {
return fmt.Errorf("failed to build postgres uri: %s", err)
return nil, fmt.Errorf("failed to build postgres uri: %s", err)
}

err = orm.RegisterDriver("postgres", orm.DRPostgres)
if err != nil {
return fmt.Errorf("failed to register postgres driver: %s", err)
return nil, fmt.Errorf("failed to register postgres driver: %s", err)
}

// register database
err = orm.RegisterDataBase("default", "postgres", uri)
if err != nil {
return fmt.Errorf("failed to register postgres database: %s", err)
return nil, fmt.Errorf("failed to register postgres database: %s", err)
}

// enable session by default
Expand All @@ -54,7 +59,7 @@ func Init() error {
// Create tables if they do not exist
err = orm.RunSyncdb("default", false, true)
if err != nil {
return fmt.Errorf("failed to sync database schema: %s", err)
return nil, fmt.Errorf("failed to sync database schema: %s", err)
}
// Add session table if sessions are enabled
if web.BConfig.WebConfig.Session.SessionOn {
Expand All @@ -65,10 +70,10 @@ func Init() error {
);`).Exec()

if err != nil {
return fmt.Errorf("failed to create session table: %s", err)
return nil, fmt.Errorf("failed to create session table: %s", err)
}
}
return nil
return &Database{ormer: orm.NewOrm()}, nil
}

// BuildPostgresURIFromConfig reads POSTGRES_DB_HOST, POSTGRES_DB_PORT, etc. from app.conf
Expand Down
83 changes: 24 additions & 59 deletions server/internal/database/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,130 +3,95 @@ package database
import (
"fmt"

"github.com/beego/beego/v2/client/orm"

"github.com/datazip/olake-ui/server/internal/constants"
"github.com/datazip/olake-ui/server/internal/models"
"github.com/datazip/olake-ui/server/utils"
)

// DestinationORM handles database operations for destinations
type DestinationORM struct {
ormer orm.Ormer
TableName string
}

func NewDestinationORM() *DestinationORM {
return &DestinationORM{
ormer: orm.NewOrm(),
TableName: constants.TableNameMap[constants.DestinationTable],
}
}

// decryptDestinationSliceConfigs decrypts config fields for a slice of destinations
func (r *DestinationORM) decryptDestinationSliceConfigs(destinations []*models.Destination) error {
func (db *Database) decryptDestinationSliceConfigs(destinations []*models.Destination) error {
for _, dest := range destinations {
dConfig, err := utils.Decrypt(dest.Config)
if err != nil {
return fmt.Errorf("failed to decrypt destination config: %s", err)
return fmt.Errorf("failed to decrypt destination config id[%d]: %s", dest.ID, err)
}
dest.Config = dConfig
}
return nil
}

func (r *DestinationORM) Create(destination *models.Destination) error {
func (db *Database) CreateDestination(destination *models.Destination) error {
// Encrypt config before saving
eConfig, err := utils.Encrypt(destination.Config)
if err != nil {
return fmt.Errorf("failed to encrypt destination config: %s", err)
return fmt.Errorf("failed to encrypt destination config id[%d]: %s", destination.ID, err)
}
destination.Config = eConfig
_, err = r.ormer.Insert(destination)
_, err = db.ormer.Insert(destination)
return err
}

func (r *DestinationORM) GetAll() ([]*models.Destination, error) {
func (db *Database) ListDestinations() ([]*models.Destination, error) {
var destinations []*models.Destination
_, err := r.ormer.QueryTable(r.TableName).RelatedSel().OrderBy(constants.OrderByUpdatedAtDesc).All(&destinations)
_, err := db.ormer.QueryTable(constants.TableNameMap[constants.DestinationTable]).RelatedSel().OrderBy(constants.OrderByUpdatedAtDesc).All(&destinations)
if err != nil {
return nil, fmt.Errorf("failed to get all destinations: %s", err)
return nil, fmt.Errorf("failed to list destinations: %s", err)
}

// Decrypt config after reading
if err := r.decryptDestinationSliceConfigs(destinations); err != nil {
return nil, fmt.Errorf("failed to decrypt destination config: %s", err)
if err := db.decryptDestinationSliceConfigs(destinations); err != nil {
return nil, err
}

return destinations, nil
}

func (r *DestinationORM) GetAllByProjectID(projectID string) ([]*models.Destination, error) {
func (db *Database) ListDestinationsByProjectID(projectID string) ([]*models.Destination, error) {
var destinations []*models.Destination
_, err := r.ormer.QueryTable(r.TableName).Filter("project_id", projectID).RelatedSel().OrderBy(constants.OrderByUpdatedAtDesc).All(&destinations)
_, err := db.ormer.QueryTable(constants.TableNameMap[constants.DestinationTable]).Filter("project_id", projectID).RelatedSel().OrderBy(constants.OrderByUpdatedAtDesc).All(&destinations)
if err != nil {
return nil, fmt.Errorf("failed to get all destinations by project_id[%s]: %s", projectID, err)
return nil, fmt.Errorf("failed to list destinations project_id[%s]: %s", projectID, err)
}

// Decrypt config after reading
if err := r.decryptDestinationSliceConfigs(destinations); err != nil {
return nil, fmt.Errorf("failed to decrypt destination config: %s", err)
if err := db.decryptDestinationSliceConfigs(destinations); err != nil {
return nil, err
}

return destinations, nil
}

func (r *DestinationORM) GetByID(id int) (*models.Destination, error) {
func (db *Database) GetDestinationByID(id int) (*models.Destination, error) {
destination := &models.Destination{ID: id}
err := r.ormer.Read(destination)
err := db.ormer.Read(destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination by ID: %s", err)
return nil, fmt.Errorf("failed to get destination id[%d]: %s", id, err)
}

// Decrypt config after reading
dConfig, err := utils.Decrypt(destination.Config)
if err != nil {
return nil, fmt.Errorf("failed to decrypt config for destination[%d]: %s", destination.ID, err)
return nil, fmt.Errorf("failed to decrypt destination config id[%d]: %s", destination.ID, err)
}
destination.Config = dConfig
return destination, nil
}

func (r *DestinationORM) Update(destination *models.Destination) error {
func (db *Database) UpdateDestination(destination *models.Destination) error {
// Encrypt config before saving
eConfig, err := utils.Encrypt(destination.Config)
if err != nil {
return fmt.Errorf("failed to encrypt destination config: %s", err)
return fmt.Errorf("failed to encrypt destination[%d] config: %s", destination.ID, err)
}
destination.Config = eConfig
_, err = r.ormer.Update(destination)
_, err = db.ormer.Update(destination)
return err
}

func (r *DestinationORM) Delete(id int) error {
func (db *Database) DeleteDestination(id int) error {
destination := &models.Destination{ID: id}
// Use ORM's Delete method which will automatically handle the soft delete
// by setting the DeletedAt field due to the ORM tags in BaseModel
_, err := r.ormer.Delete(destination)
_, err := db.ormer.Delete(destination)
return err
}

// GetByNameAndType retrieves destinations by name, destType, and project ID
func (r *DestinationORM) GetByNameAndType(name, destType, projectID string) ([]*models.Destination, error) {
var destinations []*models.Destination
_, err := r.ormer.QueryTable(r.TableName).
Filter("name", name).
Filter("dest_type", destType).
Filter("project_id", projectID).
All(&destinations)
if err != nil {
return nil, fmt.Errorf("failed to get destination in project[%s] by name[%s] and type[%s]: %s", projectID, name, destType, err)
}

// Decrypt config after reading
if err := r.decryptDestinationSliceConfigs(destinations); err != nil {
return nil, fmt.Errorf("failed to decrypt destination config: %s", err)
}

return destinations, nil
}
Loading