diff --git a/app/application/http/controller/log.go b/app/application/http/controller/log.go new file mode 100644 index 00000000..8c1193ec --- /dev/null +++ b/app/application/http/controller/log.go @@ -0,0 +1,133 @@ +package controller + +import ( + "errors" + "github.com/donknap/dpanel/app/application/logic" + "github.com/donknap/dpanel/common/dao" + "github.com/gin-gonic/gin" + "github.com/we7coreteam/w7-rangine-go/src/http/controller" +) + +type Log struct { + controller.Abstract +} + +func (self Log) Task(http *gin.Context) { + type ParamsValidate struct { + SiteId int32 `form:"siteId" binding:"required,number"` + } + + params := ParamsValidate{} + if !self.Validate(http, ¶ms) { + return + } + + taskRow, _ := dao.Task.Where(dao.Task.TaskLinkID.Eq(params.SiteId)).First() + if taskRow == nil { + self.JsonResponseWithError(http, errors.New("当前没有进行的中任务"), 500) + return + } + if taskRow.Status != logic.STATUS_PROCESSING { + self.JsonResponseWithoutError(http, gin.H{ + "status": taskRow.Status, + "message": taskRow.Message, + }) + return + } + task := logic.NewContainerTask() + stepLog := task.GetTaskStepLog(taskRow.TaskLinkID) + if stepLog == nil { + self.JsonResponseWithError(http, errors.New("当前没有进行的中任务或是已经完成"), 500) + return + } + self.JsonResponseWithoutError(http, gin.H{ + logic.STEP_IMAGE_PULL: stepLog.GetProcess(), + }) + + // json := `{ + // "code": 200, + // "data": { + // "imagePull": { + // "1f7ce2fa46ab": { + // "downloading": 25, + // "extracting": 0 + // }, + // "249ff3a7bbe6": { + // "downloading": 67, + // "extracting": 0 + // }, + // "33777aea940a": { + // "downloading": 0, + // "extracting": 0 + // }, + // "48824c101c6a": { + // "downloading": 100, + // "extracting": 0 + // }, + // "5dcbbe73fcf0": { + // "downloading": 0, + // "extracting": 0 + // }, + // "5fdd7aa4a423": { + // "downloading": 0, + // "extracting": 0 + // }, + // "706c00f0b7d2": { + // "downloading": 0, + // "extracting": 0 + // }, + // "749c44fa1213": { + // "downloading": 60, + // "extracting": 30 + // }, + // "8e9959c9dd31": { + // "downloading": 0, + // "extracting": 0 + // }, + // "92eeb6cb0068": { + // "downloading": 0, + // "extracting": 0 + // }, + // "9c62851e2826": { + // "downloading": 0, + // "extracting": 0 + // }, + // "aa5d47f22b64": { + // "downloading": 100, + // "extracting": 0 + // }, + // "b3a08d032c4e": { + // "downloading": 0, + // "extracting": 0 + // }, + // "c2e56069baaf": { + // "downloading": 0, + // "extracting": 0 + // }, + // "dfc8a35621ec": { + // "downloading": 0, + // "extracting": 0 + // }, + // "e83ad87cf6a6": { + // "downloading": 100, + // "extracting": 0 + // }, + // "e84c71b81827": { + // "downloading": 0, + // "extracting": 0 + // }, + // "f82137d66483": { + // "downloading": 0, + // "extracting": 0 + // } + // } + // } + //} + //` + // http.String(200, json) + return +} + +func (self Log) Run(http *gin.Context) { + +} diff --git a/app/application/http/controller/site.go b/app/application/http/controller/site.go index d4ac16ee..ed14d091 100644 --- a/app/application/http/controller/site.go +++ b/app/application/http/controller/site.go @@ -35,6 +35,12 @@ func (self Site) CreateByImage(http *gin.Context) { Links: params.Links, } + siteRow := &entity.Site{ + SiteID: params.SiteId, + SiteName: params.SiteName, + SiteURL: params.SiteDomain, + } + err := dao.Q.Transaction(func(tx *dao.Query) error { site, _ := tx.Site.Where(dao.Site.SiteID.Eq(params.SiteId)).First() if site != nil { @@ -52,12 +58,8 @@ func (self Site) CreateByImage(http *gin.Context) { if err != nil { return err } - siteRow := &entity.Site{ - ContainerID: containerRow.ID, - SiteID: params.SiteId, - SiteName: params.SiteName, - SiteURL: params.SiteDomain, - } + + siteRow.ContainerID = containerRow.ID err = tx.Site.Create(siteRow) if err != nil { return err @@ -70,20 +72,37 @@ func (self Site) CreateByImage(http *gin.Context) { return } - task := logic.NewContainerTask() - taskRow := &logic.CreateMessage{ - Name: params.SiteId, - Image: params.Image, - RunParams: runParams, + err = dao.Q.Transaction(func(tx *dao.Query) (err error) { + _, err = tx.Task.Where(tx.Task.TaskLinkID.Eq(siteRow.ID)).Delete() + if err != nil { + return err + } + task := logic.NewContainerTask() + runTaskRow := &logic.CreateMessage{ + Name: params.SiteId, + TaskLinkId: siteRow.ID, + Image: params.Image, + RunParams: runParams, + } + task.QueueCreate <- runTaskRow + return nil + }) + + if err != nil { + self.JsonResponseWithError(http, err, 500) + return } - task.QueueCreate <- taskRow - self.JsonSuccessResponse(http) + + self.JsonResponseWithoutError(http, gin.H{ + "siteId": siteRow.ID, + }) return } func (self Site) GetList(http *gin.Context) { type ParamsValidate struct { Page int `form:"page,default=1" binding:"omitempty,gt=0"` + PageSize int `form:"pageSize" binding:"omitempty"` SiteName string `form:"siteName" binding:"omitempty"` Sort string `form:"sort,default=new" binding:"omitempty,oneof=hot new"` } @@ -95,13 +114,16 @@ func (self Site) GetList(http *gin.Context) { if params.Page < 1 { params.Page = 1 } - limit := 20 + if params.PageSize < 1 { + params.PageSize = 10 + } query := dao.Site.Preload(dao.Site.Container.Select(dao.Container.ID, dao.Container.Image, dao.Container.Status)) if params.SiteName != "" { query = query.Where(dao.Site.SiteName.Like("%" + params.SiteName + "%")) } - list, total, _ := query.FindByPage((params.Page-1)*limit, limit) + query = query.Order(dao.Site.ID.Desc()) + list, total, _ := query.FindByPage((params.Page-1)*params.PageSize, params.PageSize) self.JsonResponseWithoutError(http, gin.H{ "total": total, "page": params.Page, diff --git a/app/application/logic/container-task.go b/app/application/logic/container-task.go index 4c81173d..f426ceeb 100644 --- a/app/application/logic/container-task.go +++ b/app/application/logic/container-task.go @@ -1,18 +1,34 @@ package logic import ( + "bufio" "context" - "fmt" + "encoding/json" "github.com/docker/docker/api/types" "github.com/donknap/dpanel/common/service/docker" "github.com/we7coreteam/w7-rangine-go-support/src/facade" - "log" + "io" "log/slog" + "math" ) +const REGISTER_NAME = "containerTask" + +func RegisterContainerTask() { + err := facade.GetContainer().NamedSingleton(REGISTER_NAME, func() *ContainerTask { + obj := &ContainerTask{} + obj.QueueCreate = make(chan *CreateMessage) + obj.stepLog = make(map[int32]*stepMessage) + return obj + }) + if err != nil { + panic(err) + } +} + func NewContainerTask() *ContainerTask { var obj *ContainerTask - err := facade.GetContainer().NamedResolve(&obj, "containerTask") + err := facade.GetContainer().NamedResolve(&obj, REGISTER_NAME) if err != nil { slog.Error(err.Error()) } @@ -20,28 +36,47 @@ func NewContainerTask() *ContainerTask { } type CreateMessage struct { - Name string - Image string - RunParams *ContainerRunParams + Name string + Image string + TaskLinkId int32 + RunParams *ContainerRunParams } type ContainerTask struct { QueueCreate chan *CreateMessage + stepLog map[int32]*stepMessage // 用于记录部署任务日志 + sdk *docker.Builder +} + +func (self *ContainerTask) GetTaskStepLog(taskId int32) *stepMessage { + if stepLog, ok := self.stepLog[taskId]; ok { + return stepLog + } + return nil } func (self *ContainerTask) CreateLoop() { + sdk, err := docker.NewDockerClient() + if err != nil { + panic(err) + } + self.sdk = sdk + for { select { case message := <-self.QueueCreate: - log.Printf("build %s from %s starting", message.Name, message.Image) - sdk, err := docker.NewDockerClient() - if err != nil { - fmt.Printf("%v \n", err) - } + // 拿到部署任务后,先新建一个任务对象 + // 用于记录进行状态(数据库中) + // 在本单例对象中建立一个map对象,存放过程中的数据,这些数据不入库 + self.stepLog[message.TaskLinkId] = newStepMessage(message.TaskLinkId) + self.stepLog[message.TaskLinkId].step(STEP_IMAGE_PULL) + self.pullImage(message) + + self.stepLog[message.TaskLinkId].step(STEP_CONTAINER_BUILD) builder := sdk.GetContainerCreateBuilder() builder.WithImage(message.Image) + builder.WithContext(context.Background()) builder.WithContainerName(message.Name) - if message.RunParams.Ports != nil { for _, value := range message.RunParams.Ports { builder.WithPort(value.Host, value.Dest) @@ -69,13 +104,82 @@ func (self *ContainerTask) CreateLoop() { builder.WithPrivileged() response, err := builder.Execute() if err != nil { - log.Printf("%v \n", err) + slog.Error(err.Error()) + self.stepLog[message.TaskLinkId].err(err) + return } - log.Printf("%v \n", response.ID) + slog.Info("Container id: ", response) + + self.stepLog[message.TaskLinkId].step(STEP_CONTAINER_RUN) err = sdk.Client.ContainerStart(context.Background(), response.ID, types.ContainerStartOptions{}) if err != nil { - log.Printf("%v \n", err) + slog.Error(err.Error()) + self.stepLog[message.TaskLinkId].err(err) + return } + self.stepLog[message.TaskLinkId].success() + delete(self.stepLog, message.TaskLinkId) + } + } +} + +func (self *ContainerTask) pullImage(message *CreateMessage) { + type progressDetail struct { + Id string `json:"id"` + Status string `json:"status"` + ProgressDetail struct { + Current float64 `json:"current"` + Total float64 `json:"total"` + } `json:"progressDetail"` + } + type progress struct { + Downloading float64 `json:"downloading"` + Extracting float64 `json:"extracting"` + } + slog.Info("pull image ", message) + //尝试拉取镜像 + reader, err := self.sdk.Client.ImagePull(context.Background(), message.Image, types.ImagePullOptions{}) + if err != nil { + self.stepLog[message.TaskLinkId].error = err + return + } + defer reader.Close() + + // 解析进度数据 + pg := make(map[string]*progress) + out := bufio.NewReader(reader) + for { + str, err := out.ReadString('\n') + if err == io.EOF { + break + } else { + pd := &progressDetail{} + err = json.Unmarshal([]byte(str), pd) + if err != nil { + self.stepLog[message.TaskLinkId].error = err + return + } + if pd.Status == "Pulling fs layer" { + pg[pd.Id] = &progress{ + Extracting: 0, + Downloading: 0, + } + } + if pd.ProgressDetail.Total > 0 && pd.Status == "Downloading" { + pg[pd.Id].Downloading = math.Floor((pd.ProgressDetail.Current / pd.ProgressDetail.Total) * 100) + } + if pd.ProgressDetail.Total > 0 && pd.Status == "Extracting" { + pg[pd.Id].Extracting = math.Floor((pd.ProgressDetail.Current / pd.ProgressDetail.Total) * 100) + } + if pd.Status == "Download complete" { + pg[pd.Id].Downloading = 100 + } + if pd.Status == "Pull complete" { + pg[pd.Id].Extracting = 100 + } + + // 进度信息 + self.stepLog[message.TaskLinkId].process(pg) } } } diff --git a/app/application/logic/task.go b/app/application/logic/task.go new file mode 100644 index 00000000..a8fa8790 --- /dev/null +++ b/app/application/logic/task.go @@ -0,0 +1,66 @@ +package logic + +import ( + "github.com/donknap/dpanel/common/dao" + "github.com/donknap/dpanel/common/entity" +) + +const ( + STEP_IMAGE_PULL = "imagePull" + STEP_IMAGE_BUILD = "imageBuild" + STEP_CONTAINER_BUILD = "containerBuild" + STEP_CONTAINER_RUN = "containerRun" +) + +func newStepMessage(taskLinkId int32) *stepMessage { + task := &stepMessage{} + taskRow, _ := dao.Task.Where(dao.Task.TaskLinkID.Eq(taskLinkId)).First() + if taskRow == nil { + taskRow = &entity.Task{ + TaskLinkID: taskLinkId, + Status: STATUS_STOP, + Message: "", + } + dao.Task.Create(taskRow) + } + task.recordId = taskRow.ID + return task +} + +type stepMessage struct { + progress interface{} // 进度信息 json 格式 + currentStep string // 当前进行的步骤 + error error // 发生的错误 + recordId int32 +} + +// 记录任务错误 +func (self *stepMessage) err(err error) { + dao.Task.Where(dao.Task.ID.Eq(self.recordId)).Updates(entity.Task{ + Status: STATUS_ERROR, + Message: err.Error(), + }) +} + +// 更新任务进度 +func (self *stepMessage) step(step string) { + dao.Task.Where(dao.Task.ID.Eq(self.recordId)).Updates(entity.Task{ + Status: STATUS_PROCESSING, + Message: step, + }) +} + +func (self *stepMessage) process(data interface{}) { + self.progress = data +} + +func (self *stepMessage) GetProcess() interface{} { + return self.progress +} + +func (self *stepMessage) success() { + dao.Task.Where(dao.Task.ID.Eq(self.recordId)).Updates(entity.Task{ + Status: STATUS_SUCCESS, + Message: "", + }) +} diff --git a/app/application/logic/types.go b/app/application/logic/types.go index bb1e0450..b4bc5c36 100644 --- a/app/application/logic/types.go +++ b/app/application/logic/types.go @@ -1,5 +1,12 @@ package logic +const ( + STATUS_STOP = 0 // 未开始 + STATUS_PROCESSING = 10 // 进行中 + STATUS_ERROR = 20 // 有错误 + STATUS_SUCCESS = 30 // 部署成功 +) + type MappingItem struct { Host string `json:"host"` Dest string `json:"dest"` diff --git a/app/application/provider.go b/app/application/provider.go index 24cd6a38..58fad393 100755 --- a/app/application/provider.go +++ b/app/application/provider.go @@ -7,7 +7,6 @@ import ( common "github.com/donknap/dpanel/common/middleware" "github.com/gin-gonic/gin" "github.com/we7coreteam/w7-rangine-go-support/src/console" - "github.com/we7coreteam/w7-rangine-go-support/src/facade" http_server "github.com/we7coreteam/w7-rangine-go/src/http/server" ) @@ -15,7 +14,7 @@ type Provider struct { } func (provider *Provider) Register(httpServer *http_server.Server, console console.Console) { - provider.initContainerTask() + logic.RegisterContainerTask() // 注册一个 application:test 命令 console.RegisterCommand(new(command.Test)) @@ -31,19 +30,13 @@ func (provider *Provider) Register(httpServer *http_server.Server, console conso cors.POST("/app/run-env/source-env", controller.RunEnv{}.SupportRunEnv) cors.POST("/app/run-env/php-ext", controller.RunEnv{}.PhpExt) + // 站点相关 cors.POST("/app/site/create-by-image", controller.Site{}.CreateByImage) cors.POST("/app/site/get-list", controller.Site{}.GetList) + + // 日志相关 + cors.POST("/app/log/task", controller.Log{}.Task) + cors.POST("/app/log/run", controller.Log{}.Run) }, ) } - -func (provider Provider) initContainerTask() { - err := facade.GetContainer().NamedSingleton("containerTask", func() *logic.ContainerTask { - obj := &logic.ContainerTask{} - obj.QueueCreate = make(chan *logic.CreateMessage) - return obj - }) - if err != nil { - panic(err) - } -} diff --git a/common/dao/gen.go b/common/dao/gen.go index 500692fc..607e081f 100644 --- a/common/dao/gen.go +++ b/common/dao/gen.go @@ -20,6 +20,7 @@ var ( Container *container RunEnv *runEnv Site *site + Task *task ) func SetDefault(db *gorm.DB, opts ...gen.DOOption) { @@ -27,6 +28,7 @@ func SetDefault(db *gorm.DB, opts ...gen.DOOption) { Container = &Q.Container RunEnv = &Q.RunEnv Site = &Q.Site + Task = &Q.Task } func Use(db *gorm.DB, opts ...gen.DOOption) *Query { @@ -35,6 +37,7 @@ func Use(db *gorm.DB, opts ...gen.DOOption) *Query { Container: newContainer(db, opts...), RunEnv: newRunEnv(db, opts...), Site: newSite(db, opts...), + Task: newTask(db, opts...), } } @@ -44,6 +47,7 @@ type Query struct { Container container RunEnv runEnv Site site + Task task } func (q *Query) Available() bool { return q.db != nil } @@ -54,6 +58,7 @@ func (q *Query) clone(db *gorm.DB) *Query { Container: q.Container.clone(db), RunEnv: q.RunEnv.clone(db), Site: q.Site.clone(db), + Task: q.Task.clone(db), } } @@ -71,6 +76,7 @@ func (q *Query) ReplaceDB(db *gorm.DB) *Query { Container: q.Container.replaceDB(db), RunEnv: q.RunEnv.replaceDB(db), Site: q.Site.replaceDB(db), + Task: q.Task.replaceDB(db), } } @@ -78,6 +84,7 @@ type queryCtx struct { Container IContainerDo RunEnv IRunEnvDo Site ISiteDo + Task ITaskDo } func (q *Query) WithContext(ctx context.Context) *queryCtx { @@ -85,6 +92,7 @@ func (q *Query) WithContext(ctx context.Context) *queryCtx { Container: q.Container.WithContext(ctx), RunEnv: q.RunEnv.WithContext(ctx), Site: q.Site.WithContext(ctx), + Task: q.Task.WithContext(ctx), } } diff --git a/common/dao/ims_task.gen.go b/common/dao/ims_task.gen.go new file mode 100644 index 00000000..044ffce1 --- /dev/null +++ b/common/dao/ims_task.gen.go @@ -0,0 +1,392 @@ +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. + +package dao + +import ( + "context" + + "gorm.io/gorm" + "gorm.io/gorm/clause" + "gorm.io/gorm/schema" + + "gorm.io/gen" + "gorm.io/gen/field" + + "gorm.io/plugin/dbresolver" + + "github.com/donknap/dpanel/common/entity" +) + +func newTask(db *gorm.DB, opts ...gen.DOOption) task { + _task := task{} + + _task.taskDo.UseDB(db, opts...) + _task.taskDo.UseModel(&entity.Task{}) + + tableName := _task.taskDo.TableName() + _task.ALL = field.NewAsterisk(tableName) + _task.ID = field.NewInt32(tableName, "id") + _task.TaskLinkID = field.NewInt32(tableName, "task_link_id") + _task.Status = field.NewInt32(tableName, "status") + _task.Message = field.NewString(tableName, "message") + + _task.fillFieldMap() + + return _task +} + +type task struct { + taskDo + + ALL field.Asterisk + ID field.Int32 + TaskLinkID field.Int32 + Status field.Int32 + Message field.String + + fieldMap map[string]field.Expr +} + +func (t task) Table(newTableName string) *task { + t.taskDo.UseTable(newTableName) + return t.updateTableName(newTableName) +} + +func (t task) As(alias string) *task { + t.taskDo.DO = *(t.taskDo.As(alias).(*gen.DO)) + return t.updateTableName(alias) +} + +func (t *task) updateTableName(table string) *task { + t.ALL = field.NewAsterisk(table) + t.ID = field.NewInt32(table, "id") + t.TaskLinkID = field.NewInt32(table, "task_link_id") + t.Status = field.NewInt32(table, "status") + t.Message = field.NewString(table, "message") + + t.fillFieldMap() + + return t +} + +func (t *task) GetFieldByName(fieldName string) (field.OrderExpr, bool) { + _f, ok := t.fieldMap[fieldName] + if !ok || _f == nil { + return nil, false + } + _oe, ok := _f.(field.OrderExpr) + return _oe, ok +} + +func (t *task) fillFieldMap() { + t.fieldMap = make(map[string]field.Expr, 4) + t.fieldMap["id"] = t.ID + t.fieldMap["task_link_id"] = t.TaskLinkID + t.fieldMap["status"] = t.Status + t.fieldMap["message"] = t.Message +} + +func (t task) clone(db *gorm.DB) task { + t.taskDo.ReplaceConnPool(db.Statement.ConnPool) + return t +} + +func (t task) replaceDB(db *gorm.DB) task { + t.taskDo.ReplaceDB(db) + return t +} + +type taskDo struct{ gen.DO } + +type ITaskDo interface { + gen.SubQuery + Debug() ITaskDo + WithContext(ctx context.Context) ITaskDo + WithResult(fc func(tx gen.Dao)) gen.ResultInfo + ReplaceDB(db *gorm.DB) + ReadDB() ITaskDo + WriteDB() ITaskDo + As(alias string) gen.Dao + Session(config *gorm.Session) ITaskDo + Columns(cols ...field.Expr) gen.Columns + Clauses(conds ...clause.Expression) ITaskDo + Not(conds ...gen.Condition) ITaskDo + Or(conds ...gen.Condition) ITaskDo + Select(conds ...field.Expr) ITaskDo + Where(conds ...gen.Condition) ITaskDo + Order(conds ...field.Expr) ITaskDo + Distinct(cols ...field.Expr) ITaskDo + Omit(cols ...field.Expr) ITaskDo + Join(table schema.Tabler, on ...field.Expr) ITaskDo + LeftJoin(table schema.Tabler, on ...field.Expr) ITaskDo + RightJoin(table schema.Tabler, on ...field.Expr) ITaskDo + Group(cols ...field.Expr) ITaskDo + Having(conds ...gen.Condition) ITaskDo + Limit(limit int) ITaskDo + Offset(offset int) ITaskDo + Count() (count int64, err error) + Scopes(funcs ...func(gen.Dao) gen.Dao) ITaskDo + Unscoped() ITaskDo + Create(values ...*entity.Task) error + CreateInBatches(values []*entity.Task, batchSize int) error + Save(values ...*entity.Task) error + First() (*entity.Task, error) + Take() (*entity.Task, error) + Last() (*entity.Task, error) + Find() ([]*entity.Task, error) + FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*entity.Task, err error) + FindInBatches(result *[]*entity.Task, batchSize int, fc func(tx gen.Dao, batch int) error) error + Pluck(column field.Expr, dest interface{}) error + Delete(...*entity.Task) (info gen.ResultInfo, err error) + Update(column field.Expr, value interface{}) (info gen.ResultInfo, err error) + UpdateSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error) + Updates(value interface{}) (info gen.ResultInfo, err error) + UpdateColumn(column field.Expr, value interface{}) (info gen.ResultInfo, err error) + UpdateColumnSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error) + UpdateColumns(value interface{}) (info gen.ResultInfo, err error) + UpdateFrom(q gen.SubQuery) gen.Dao + Attrs(attrs ...field.AssignExpr) ITaskDo + Assign(attrs ...field.AssignExpr) ITaskDo + Joins(fields ...field.RelationField) ITaskDo + Preload(fields ...field.RelationField) ITaskDo + FirstOrInit() (*entity.Task, error) + FirstOrCreate() (*entity.Task, error) + FindByPage(offset int, limit int) (result []*entity.Task, count int64, err error) + ScanByPage(result interface{}, offset int, limit int) (count int64, err error) + Scan(result interface{}) (err error) + Returning(value interface{}, columns ...string) ITaskDo + UnderlyingDB() *gorm.DB + schema.Tabler +} + +func (t taskDo) Debug() ITaskDo { + return t.withDO(t.DO.Debug()) +} + +func (t taskDo) WithContext(ctx context.Context) ITaskDo { + return t.withDO(t.DO.WithContext(ctx)) +} + +func (t taskDo) ReadDB() ITaskDo { + return t.Clauses(dbresolver.Read) +} + +func (t taskDo) WriteDB() ITaskDo { + return t.Clauses(dbresolver.Write) +} + +func (t taskDo) Session(config *gorm.Session) ITaskDo { + return t.withDO(t.DO.Session(config)) +} + +func (t taskDo) Clauses(conds ...clause.Expression) ITaskDo { + return t.withDO(t.DO.Clauses(conds...)) +} + +func (t taskDo) Returning(value interface{}, columns ...string) ITaskDo { + return t.withDO(t.DO.Returning(value, columns...)) +} + +func (t taskDo) Not(conds ...gen.Condition) ITaskDo { + return t.withDO(t.DO.Not(conds...)) +} + +func (t taskDo) Or(conds ...gen.Condition) ITaskDo { + return t.withDO(t.DO.Or(conds...)) +} + +func (t taskDo) Select(conds ...field.Expr) ITaskDo { + return t.withDO(t.DO.Select(conds...)) +} + +func (t taskDo) Where(conds ...gen.Condition) ITaskDo { + return t.withDO(t.DO.Where(conds...)) +} + +func (t taskDo) Order(conds ...field.Expr) ITaskDo { + return t.withDO(t.DO.Order(conds...)) +} + +func (t taskDo) Distinct(cols ...field.Expr) ITaskDo { + return t.withDO(t.DO.Distinct(cols...)) +} + +func (t taskDo) Omit(cols ...field.Expr) ITaskDo { + return t.withDO(t.DO.Omit(cols...)) +} + +func (t taskDo) Join(table schema.Tabler, on ...field.Expr) ITaskDo { + return t.withDO(t.DO.Join(table, on...)) +} + +func (t taskDo) LeftJoin(table schema.Tabler, on ...field.Expr) ITaskDo { + return t.withDO(t.DO.LeftJoin(table, on...)) +} + +func (t taskDo) RightJoin(table schema.Tabler, on ...field.Expr) ITaskDo { + return t.withDO(t.DO.RightJoin(table, on...)) +} + +func (t taskDo) Group(cols ...field.Expr) ITaskDo { + return t.withDO(t.DO.Group(cols...)) +} + +func (t taskDo) Having(conds ...gen.Condition) ITaskDo { + return t.withDO(t.DO.Having(conds...)) +} + +func (t taskDo) Limit(limit int) ITaskDo { + return t.withDO(t.DO.Limit(limit)) +} + +func (t taskDo) Offset(offset int) ITaskDo { + return t.withDO(t.DO.Offset(offset)) +} + +func (t taskDo) Scopes(funcs ...func(gen.Dao) gen.Dao) ITaskDo { + return t.withDO(t.DO.Scopes(funcs...)) +} + +func (t taskDo) Unscoped() ITaskDo { + return t.withDO(t.DO.Unscoped()) +} + +func (t taskDo) Create(values ...*entity.Task) error { + if len(values) == 0 { + return nil + } + return t.DO.Create(values) +} + +func (t taskDo) CreateInBatches(values []*entity.Task, batchSize int) error { + return t.DO.CreateInBatches(values, batchSize) +} + +// Save : !!! underlying implementation is different with GORM +// The method is equivalent to executing the statement: db.Clauses(clause.OnConflict{UpdateAll: true}).Create(values) +func (t taskDo) Save(values ...*entity.Task) error { + if len(values) == 0 { + return nil + } + return t.DO.Save(values) +} + +func (t taskDo) First() (*entity.Task, error) { + if result, err := t.DO.First(); err != nil { + return nil, err + } else { + return result.(*entity.Task), nil + } +} + +func (t taskDo) Take() (*entity.Task, error) { + if result, err := t.DO.Take(); err != nil { + return nil, err + } else { + return result.(*entity.Task), nil + } +} + +func (t taskDo) Last() (*entity.Task, error) { + if result, err := t.DO.Last(); err != nil { + return nil, err + } else { + return result.(*entity.Task), nil + } +} + +func (t taskDo) Find() ([]*entity.Task, error) { + result, err := t.DO.Find() + return result.([]*entity.Task), err +} + +func (t taskDo) FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*entity.Task, err error) { + buf := make([]*entity.Task, 0, batchSize) + err = t.DO.FindInBatches(&buf, batchSize, func(tx gen.Dao, batch int) error { + defer func() { results = append(results, buf...) }() + return fc(tx, batch) + }) + return results, err +} + +func (t taskDo) FindInBatches(result *[]*entity.Task, batchSize int, fc func(tx gen.Dao, batch int) error) error { + return t.DO.FindInBatches(result, batchSize, fc) +} + +func (t taskDo) Attrs(attrs ...field.AssignExpr) ITaskDo { + return t.withDO(t.DO.Attrs(attrs...)) +} + +func (t taskDo) Assign(attrs ...field.AssignExpr) ITaskDo { + return t.withDO(t.DO.Assign(attrs...)) +} + +func (t taskDo) Joins(fields ...field.RelationField) ITaskDo { + for _, _f := range fields { + t = *t.withDO(t.DO.Joins(_f)) + } + return &t +} + +func (t taskDo) Preload(fields ...field.RelationField) ITaskDo { + for _, _f := range fields { + t = *t.withDO(t.DO.Preload(_f)) + } + return &t +} + +func (t taskDo) FirstOrInit() (*entity.Task, error) { + if result, err := t.DO.FirstOrInit(); err != nil { + return nil, err + } else { + return result.(*entity.Task), nil + } +} + +func (t taskDo) FirstOrCreate() (*entity.Task, error) { + if result, err := t.DO.FirstOrCreate(); err != nil { + return nil, err + } else { + return result.(*entity.Task), nil + } +} + +func (t taskDo) FindByPage(offset int, limit int) (result []*entity.Task, count int64, err error) { + result, err = t.Offset(offset).Limit(limit).Find() + if err != nil { + return + } + + if size := len(result); 0 < limit && 0 < size && size < limit { + count = int64(size + offset) + return + } + + count, err = t.Offset(-1).Limit(-1).Count() + return +} + +func (t taskDo) ScanByPage(result interface{}, offset int, limit int) (count int64, err error) { + count, err = t.Count() + if err != nil { + return + } + + err = t.Offset(offset).Limit(limit).Scan(result) + return +} + +func (t taskDo) Scan(result interface{}) (err error) { + return t.DO.Scan(result) +} + +func (t taskDo) Delete(models ...*entity.Task) (result gen.ResultInfo, err error) { + return t.DO.Delete(models) +} + +func (t *taskDo) withDO(do gen.Dao) *taskDo { + t.DO = *do.(*gen.DO) + return t +} diff --git a/common/entity/ims_task.gen.go b/common/entity/ims_task.gen.go new file mode 100644 index 00000000..8c424751 --- /dev/null +++ b/common/entity/ims_task.gen.go @@ -0,0 +1,20 @@ +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. + +package entity + +const TableNameTask = "ims_task" + +// Task mapped from table +type Task struct { + ID int32 `gorm:"column:id;type:INTEGER" json:"id"` + TaskLinkID int32 `gorm:"column:task_link_id;type:integer" json:"taskLinkId"` + Status int32 `gorm:"column:status;type:integer" json:"status"` + Message string `gorm:"column:message;type:text" json:"message"` +} + +// TableName Task's table name +func (*Task) TableName() string { + return TableNameTask +} diff --git a/common/service/docker/container-create.go b/common/service/docker/container-create.go index 7c6e2e17..a6415cbe 100644 --- a/common/service/docker/container-create.go +++ b/common/service/docker/container-create.go @@ -1,16 +1,13 @@ package docker import ( - "bufio" "context" "fmt" - "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" v1 "github.com/opencontainers/image-spec/specs-go/v1" - "io" ) type ContainerCreateBuilder struct { @@ -21,6 +18,7 @@ type ContainerCreateBuilder struct { containerName string err error dockerSdk *client.Client + ctx context.Context } func (self *ContainerCreateBuilder) withSdk(sdk *client.Client) *ContainerCreateBuilder { @@ -28,6 +26,11 @@ func (self *ContainerCreateBuilder) withSdk(sdk *client.Client) *ContainerCreate return self } +func (self *ContainerCreateBuilder) WithContext(ctx context.Context) *ContainerCreateBuilder { + self.ctx = ctx + return self +} + func (self *ContainerCreateBuilder) WithContainerName(name string) *ContainerCreateBuilder { self.containerConfig.Hostname = fmt.Sprintf("%s.pod.dpanel.local", name) self.containerName = name @@ -85,25 +88,8 @@ func (self *ContainerCreateBuilder) Execute() (response container.CreateResponse if self.err != nil { return response, self.err } - ctx := context.Background() - reader, err := self.dockerSdk.ImagePull(ctx, self.containerConfig.Image, types.ImagePullOptions{}) - if err != nil { - return response, err - } - defer reader.Close() - - out := bufio.NewReader(reader) - for { - str, err := out.ReadString('\n') - if err == io.EOF { // 读到文件末尾 - break - } else { - fmt.Printf("有数据了 %v \n", string(str)) - } - } - return self.dockerSdk.ContainerCreate( - ctx, + self.ctx, self.containerConfig, self.hostConfig, self.networkingConfig, diff --git a/common/service/docker/docker.go b/common/service/docker/docker.go index 3bb13276..866c4319 100644 --- a/common/service/docker/docker.go +++ b/common/service/docker/docker.go @@ -9,12 +9,12 @@ import ( v1 "github.com/opencontainers/image-spec/specs-go/v1" ) -type dockerClient struct { +type Builder struct { Client *client.Client Ctx context.Context } -func NewDockerClient() (*dockerClient, error) { +func NewDockerClient() (*Builder, error) { client, err := client.NewClientWithOpts( client.FromEnv, client.WithAPIVersionNegotiation(), @@ -24,13 +24,13 @@ func NewDockerClient() (*dockerClient, error) { return nil, err } - return &dockerClient{ + return &Builder{ Client: client, Ctx: context.Background(), }, nil } -func (self dockerClient) GetContainerCreateBuilder() *ContainerCreateBuilder { +func (self Builder) GetContainerCreateBuilder() *ContainerCreateBuilder { builder := &ContainerCreateBuilder{ containerConfig: &container.Config{ ExposedPorts: make(nat.PortSet), diff --git a/database/gen-model.yaml b/database/gen-model.yaml index 256a5673..0c6c2814 100755 --- a/database/gen-model.yaml +++ b/database/gen-model.yaml @@ -7,4 +7,5 @@ relation: foreign_key: container_id type: has_one - table: ims_container - - table: ims_run_env \ No newline at end of file + - table: ims_run_env + - table: ims_task \ No newline at end of file diff --git a/dpanel.db b/dpanel.db index 973b9c16..9d488c27 100644 Binary files a/dpanel.db and b/dpanel.db differ diff --git a/tests/docker_test.go b/tests/docker_test.go index 4c41e0b1..2f1b3a74 100644 --- a/tests/docker_test.go +++ b/tests/docker_test.go @@ -1,11 +1,15 @@ package tests import ( + "bufio" "context" + "encoding/json" "fmt" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/donknap/dpanel/common/service/docker" + "io" + "math" "testing" ) @@ -17,6 +21,63 @@ func TestContainerRemove(t *testing.T) { } +type progressDetail struct { + Id string `json:"id"` + Status string `json:"status"` + ProgressDetail struct { + Current float64 `json:"current"` + Total float64 `json:"total"` + } `json:"progressDetail"` +} + +type pullImageProgress struct { + Downloading float64 + Extracting float64 +} + +func TestPullImage(t *testing.T) { + sdk, _ := docker.NewDockerClient() + //尝试拉取镜像 + reader, err := sdk.Client.ImagePull(context.Background(), "phpmyadmin", types.ImagePullOptions{}) + if err != nil { + fmt.Printf("%v \n", err) + } + + var progress map[string]*pullImageProgress + progress = make(map[string]*pullImageProgress) + + out := bufio.NewReader(reader) + for { + str, err := out.ReadString('\n') + if err == io.EOF { + break + } else { + pd := &progressDetail{} + json.Unmarshal([]byte(str), pd) + if pd.Status == "Pulling fs layer" { + progress[pd.Id] = &pullImageProgress{ + Extracting: 0, + Downloading: 0, + } + } + if pd.ProgressDetail.Total > 0 && pd.Status == "Downloading" { + progress[pd.Id].Downloading = math.Floor((pd.ProgressDetail.Current / pd.ProgressDetail.Total) * 100) + } + if pd.ProgressDetail.Total > 0 && pd.Status == "Extracting" { + progress[pd.Id].Extracting = math.Floor((pd.ProgressDetail.Current / pd.ProgressDetail.Total) * 100) + } + if pd.Status == "Download complete" { + progress[pd.Id].Downloading = 100 + } + if pd.Status == "Pull complete" { + progress[pd.Id].Extracting = 100 + } + + fmt.Printf("%v \n", progress["249ff3a7bbe6"]) + } + } +} + func TestCreateContainer(t *testing.T) { sdk, err := docker.NewDockerClient() if err != nil {