Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apiv2(ticdc): Add swagger doc #8353

Merged
merged 15 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (h *OpenAPI) PauseChangefeed(c *gin.Context) {
// @Tags changefeed
// @Accept json
// @Produce json
// @Param changefeed-id path string true "changefeed_id"
// @Param changefeed_id path string true "changefeed_id"
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/resume [post]
Expand Down Expand Up @@ -423,12 +423,7 @@ func (h *OpenAPI) ResumeChangefeed(c *gin.Context) {
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param target_ts body integer false "changefeed target ts"
// @Param sink_uri body string false "sink uri"
// @Param filter_rules body []string false "filter rules"
// @Param ignore_txn_start_ts body integer false "ignore transaction start ts"
// @Param mounter_worker_num body integer false "mounter worker nums"
// @Param sink_config body config.SinkConfig false "sink config"
// @Param changefeedConfig body model.ChangefeedConfig true "changefeed config"
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id} [put]
Expand Down Expand Up @@ -579,8 +574,7 @@ func (h *OpenAPI) RebalanceTables(c *gin.Context) {
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param table_id body integer true "table_id"
// @Param capture_id body string true "capture_id"
// @Param MoveTable body model.MoveTableReq true "move table request"
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/move_table [post]
Expand All @@ -599,10 +593,7 @@ func (h *OpenAPI) MoveTable(c *gin.Context) {
return
}

data := struct {
CaptureID string `json:"capture_id"`
TableID int64 `json:"table_id"`
}{}
data := model.MoveTableReq{}
err = c.BindJSON(&data)
if err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err))
Expand Down Expand Up @@ -647,6 +638,8 @@ func (h *OpenAPI) ResignOwner(c *gin.Context) {
// @Tags processor
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed ID"
// @Param capture_id path string true "capture ID"
// @Success 200 {object} model.ProcessorDetail
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/processors/{changefeed_id}/{capture_id} [get]
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.Engine = model.SortInMemory
cfg.ReplicaConfig = ToAPIReplicaConfig(config.GetDefaultReplicaConfig())
cfg.ReplicaConfig.EnableSyncPoint = true
cfg.ReplicaConfig.SyncPointInterval = 30 * time.Second
cfg.ReplicaConfig.SyncPointInterval = JSONDuration{30 * time.Second}
cfg.PDAddrs = []string{"a", "b"}
cfg.CertPath = "p1"
cfg.CAPath = "p2"
Expand Down
64 changes: 58 additions & 6 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ const (

// createChangefeed handles create changefeed request,
// it returns the changefeed's changefeedInfo that it just created
// CreateChangefeed creates a changefeed
// @Summary Create changefeed
// @Description create a new changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed body ChangefeedConfig true "changefeed config"
// @Success 200 {object} ChangeFeedInfo
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds [post]
func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
ctx := c.Request.Context()
cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
Expand Down Expand Up @@ -161,11 +171,11 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
// listChangeFeeds lists all changgefeeds in cdc cluster
// @Summary List changefeed
// @Description list all changefeeds in cdc cluster
// @Tags changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param state query string false "state"
// @Success 200 {array} model.ChangefeedCommonInfo
// @Success 200 {array} ChangefeedCommonInfo
// @Failure 500 {object} model.HTTPError
// @Router /api/v2/changefeeds [get]
func (h *OpenAPIV2) listChangeFeeds(c *gin.Context) {
Expand Down Expand Up @@ -293,6 +303,17 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
// Can only update a changefeed's: TargetTs, SinkURI,
// ReplicaConfig, PDAddrs, CAPath, CertPath, KeyPath,
// SyncPointEnabled, SyncPointInterval
// UpdateChangefeed updates a changefeed
// @Summary Update a changefeed
// @Description Update a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param changefeedConfig body ChangefeedConfig true "changefeed config"
// @Success 202 {object} ChangeFeedInfo
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id} [put]
func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
ctx := c.Request.Context()

Expand Down Expand Up @@ -392,11 +413,11 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
// getChangefeed get detailed info of a changefeed
// @Summary Get changefeed
// @Description get detail information of a changefeed
// @Tags changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Success 200 {object} model.ChangefeedDetail
// @Success 200 {object} ChangeFeedInfo
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id} [get]
func (h *OpenAPIV2) getChangeFeed(c *gin.Context) {
Expand Down Expand Up @@ -455,7 +476,17 @@ func (h *OpenAPIV2) getChangeFeed(c *gin.Context) {
c.JSON(http.StatusOK, detail)
}

// deleteChangefeed handles delete changefeed request,
// deleteChangefeed handles delete changefeed request
// RemoveChangefeed removes a changefeed
// @Summary Remove a changefeed
// @Description Remove a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Success 204
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id} [delete]
func (h *OpenAPIV2) deleteChangefeed(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
Expand Down Expand Up @@ -558,6 +589,17 @@ func (h *OpenAPIV2) getChangeFeedMetaInfo(c *gin.Context) {
}

// resumeChangefeed handles resume changefeed request.
// ResumeChangefeed resumes a changefeed
// @Summary Resume a changefeed
// @Description Resume a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param resumeConfig body ResumeChangefeedConfig true "resume config"
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id}/resume [post]
func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
ctx := c.Request.Context()
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
Expand Down Expand Up @@ -647,6 +689,16 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
}

// pauseChangefeed handles pause changefeed request
// PauseChangefeed pauses a changefeed
// @Summary Pause a changefeed
// @Description Pause a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Success 202 {object} EmptyResponse
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id}/pause [post]
func (h *OpenAPIV2) pauseChangefeed(c *gin.Context) {
ctx := c.Request.Context()

Expand Down Expand Up @@ -708,7 +760,7 @@ func toAPIModel(
Namespace: info.Namespace,
ID: info.ID,
SinkURI: sinkURI,
CreateTime: info.CreateTime,
CreateTime: model.JSONTime(info.CreateTime),
StartTs: info.StartTs,
TargetTs: info.TargetTs,
AdminJobType: info.AdminJobType,
Expand Down
7 changes: 7 additions & 0 deletions cdc/api/v2/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

// @Summary Check the health status of a TiCDC cluster
// @Description Check the health status of a TiCDC cluster
// @Tags common,v2
// @Produce json
// @Success 200 {object} EmptyResponse
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/health [get]
func (h *OpenAPIV2) health(c *gin.Context) {
if !h.capture.IsOwner() {
middleware.ForwardToOwnerMiddleware(h.capture)(c)
Expand Down
10 changes: 10 additions & 0 deletions cdc/api/v2/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ import (
"go.uber.org/zap"
)

// SetLogLevel changes TiCDC log level dynamically.
// @Summary Change TiCDC log level
// @Description change TiCDC log level dynamically
// @Tags common,v2
// @Accept json
// @Produce json
// @Param log_level body LogLevelReq true "log level"
// @Success 200 {object} EmptyResponse
// @Failure 400 {object} model.HTTPError
// @Router /api/v2/log [post]
func (h *OpenAPIV2) setLogLevel(c *gin.Context) {
req := &LogLevelReq{}
err := c.BindJSON(&req)
Expand Down
101 changes: 75 additions & 26 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,57 @@ type ProcessorCommonInfo struct {
CaptureID string `json:"capture_id"`
}

// JSONDuration used to wrap duration into json format
type JSONDuration struct {
duration time.Duration
}

// MarshalJSON marshal duration to string
func (d JSONDuration) MarshalJSON() ([]byte, error) {
return json.Marshal(d.duration.String())
}

// UnmarshalJSON unmarshal json value to wrapped duration
func (d *JSONDuration) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}
switch value := v.(type) {
case float64:
d.duration = time.Duration(value)
return nil
case string:
var err error
d.duration, err = time.ParseDuration(value)
if err != nil {
return err
}
return nil
default:
return errors.New("invalid duration")
}
}

// ReplicaConfig is a duplicate of config.ReplicaConfig
type ReplicaConfig struct {
MemoryQuota uint64 `json:"memory_quota"`
CaseSensitive bool `json:"case_sensitive"`
EnableOldValue bool `json:"enable_old_value"`
ForceReplicate bool `json:"force_replicate"`
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint bool `json:"enable_sync_point"`
BDRMode bool `json:"bdr_mode"`
SyncPointInterval time.Duration `json:"sync_point_interval"`
SyncPointRetention time.Duration `json:"sync_point_retention"`
Filter *FilterConfig `json:"filter"`
Mounter *MounterConfig `json:"mounter"`
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
MemoryQuota uint64 `json:"memory_quota"`
CaseSensitive bool `json:"case_sensitive"`
EnableOldValue bool `json:"enable_old_value"`
ForceReplicate bool `json:"force_replicate"`
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint bool `json:"enable_sync_point"`
BDRMode bool `json:"bdr_mode"`

SyncPointInterval JSONDuration `json:"sync_point_interval" swaggertype:"string"`
SyncPointRetention JSONDuration `json:"sync_point_retention" swaggertype:"string"`

Filter *FilterConfig `json:"filter"`
Mounter *MounterConfig `json:"mounter"`
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
}

// ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig
Expand All @@ -147,8 +181,8 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
res.ForceReplicate = c.ForceReplicate
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
res.SyncPointInterval = c.SyncPointInterval
res.SyncPointRetention = c.SyncPointRetention
res.SyncPointInterval = c.SyncPointInterval.duration
res.SyncPointRetention = c.SyncPointRetention.duration
res.BDRMode = c.BDRMode

if c.Filter != nil {
Expand Down Expand Up @@ -263,8 +297,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
IgnoreIneligibleTable: false,
CheckGCSafePoint: cloned.CheckGCSafePoint,
EnableSyncPoint: cloned.EnableSyncPoint,
SyncPointInterval: cloned.SyncPointInterval,
SyncPointRetention: cloned.SyncPointRetention,
SyncPointInterval: JSONDuration{cloned.SyncPointInterval},
SyncPointRetention: JSONDuration{cloned.SyncPointRetention},
BDRMode: cloned.BDRMode,
}

Expand Down Expand Up @@ -377,8 +411,8 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
EnableOldValue: true,
CheckGCSafePoint: true,
EnableSyncPoint: false,
SyncPointInterval: 10 * time.Second,
SyncPointRetention: 24 * time.Hour,
SyncPointInterval: JSONDuration{10 * time.Second},
SyncPointRetention: JSONDuration{24 * time.Hour},
Filter: &FilterConfig{
Rules: []string{"*.*"},
},
Expand Down Expand Up @@ -561,11 +595,11 @@ type ResolveLockReq struct {

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
UpstreamID uint64 `json:"upstream_id,omitempty"`
Namespace string `json:"namespace,omitempty"`
ID string `json:"id,omitempty"`
SinkURI string `json:"sink_uri,omitempty"`
CreateTime time.Time `json:"create_time"`
UpstreamID uint64 `json:"upstream_id,omitempty"`
Namespace string `json:"namespace,omitempty"`
ID string `json:"id,omitempty"`
SinkURI string `json:"sink_uri,omitempty"`
CreateTime model.JSONTime `json:"create_time"`
// Start sync at this commit ts if `StartTs` is specify or using the CreateTime of changefeed.
StartTs uint64 `json:"start_ts,omitempty"`
// The ChangeFeed will exits until sync to timestamp TargetTs
Expand Down Expand Up @@ -642,3 +676,18 @@ type ProcessorDetail struct {
// All table ids that this processor are replicating.
Tables []int64 `json:"table_ids"`
}

// Liveness is the liveness status of a capture.
// Liveness can only be changed from alive to stopping, and no way back.
type Liveness int32

// ServerStatus holds some common information of a server
type ServerStatus struct {
Version string `json:"version"`
GitHash string `json:"git_hash"`
ID string `json:"id"`
ClusterID string `json:"cluster_id"`
Pid int `json:"pid"`
IsOwner bool `json:"is_owner"`
Liveness Liveness `json:"liveness"`
}
9 changes: 9 additions & 0 deletions cdc/api/v2/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ import (
"github.com/gin-gonic/gin"
)

// ResignOwner makes the current owner resign
// @Summary Notify the owner to resign
// @Description Notify the current owner to resign
// @Tags owner,v2
// @Accept json
// @Produce json
// @Success 202 {object} EmptyResponse
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/owner/resign [post]
func (h *OpenAPIV2) resignOwner(c *gin.Context) {
o, _ := h.capture.GetOwner()
if o != nil {
Expand Down
Loading