Skip to content

Commit 3179773

Browse files
committed
Write docs
1 parent 35cadef commit 3179773

File tree

20 files changed

+99
-6
lines changed

20 files changed

+99
-6
lines changed

binlogmsg/binlogmsg.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
nppbmsg "github.com/huangjunwen/nproto/v2/pb/msg"
2222
)
2323

24+
// MsgPipe pipes messages from MySQL (>=8.0.2) msg tables to downstream.
25+
// Messages from MsgPipe have type *rawenc.RawData. So downstream must be able
26+
// to handle this type of messages.
2427
type MsgPipe struct {
2528
// Immutable fields.
2629
downstream interface{} // msg.MsgPublisher or msg.MsgAsyncPublisher
@@ -36,8 +39,14 @@ type MsgPipe struct {
3639
// MsgTableFilter returns true if a given table is a msg table.
3740
type MsgTableFilter func(schema, table string) bool
3841

42+
// MsgPipeOption is option in creating MsgPipe.
3943
type MsgPipeOption func(*MsgPipe) error
4044

45+
// NewMsgPipe creates a new msg pipe:
46+
// - downstream: must be MsgPublisher or MsgAsyncPublisher.
47+
// - masterCfg: master connection to read (full dump) and write (delete published messages).
48+
// - slaveCfg: slave config for binlog subscription.
49+
// - tableFilter: determine whether a table is used to store messages.
4150
func NewMsgPipe(
4251
downstream interface{},
4352
masterCfg *mycanal.FullDumpConfig,
@@ -71,6 +80,7 @@ func NewMsgPipe(
7180
return pipe, nil
7281
}
7382

83+
// Run the main loop (flush messages to downstream) until context.Done().
7484
func (pipe *MsgPipe) Run(ctx context.Context) (err error) {
7585
for {
7686
pipe.run(ctx)
@@ -309,6 +319,11 @@ func (pipe *MsgPipe) flushMsgEntry(ctx context.Context, entry msgEntry, cb func(
309319

310320
}
311321

322+
// NewMsgPublisher creates a publisher to publish (store) message to MySQL msg tables:
323+
// - encoder: encoder for messages.
324+
// - q: *sql.DB/*sql.Tx/*sql.Conn/...
325+
// - schema: database name.
326+
// - table: msg table name, the table must be created by CreateMsgTable.
312327
func NewMsgPublisher(encoder npenc.Encoder, q sqlh.Queryer, schema, table string) MsgPublisherFunc {
313328
return func(ctx context.Context, spec MsgSpec, msg interface{}) error {
314329
if err := AssertMsgType(spec, msg); err != nil {

binlogmsg/doc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Package binlogmsg contains publisher implemenation to 'publish' (store) messages to MySQL8
2+
// tables then flush to downstream publisher using binlog notification.
3+
//
4+
// Since messages are stored in normal MySQL tables, all ACID properties are applied to them.
5+
package binlogmsg

binlogmsg/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@ import (
88
)
99

1010
var (
11+
// DefaultLockName is the default value of PipeOptLockName.
1112
DefaultLockName = "nproto.binlogmsg"
1213

14+
// DefaultMaxInflight is the default value of PipeOptMaxInflight.
1315
DefaultMaxInflight = 4096
1416

17+
// DefaultRetryWait is the default value of PipeOptRetryWait.
1518
DefaultRetryWait = 5 * time.Second
1619
)
1720

21+
// PipeOptLogger sets logger for MsgPipe.
1822
func PipeOptLogger(logger logr.Logger) MsgPipeOption {
1923
return func(pipe *MsgPipe) error {
2024
if logger == nil {
@@ -25,6 +29,8 @@ func PipeOptLogger(logger logr.Logger) MsgPipeOption {
2529
}
2630
}
2731

32+
// PipeOptLockName sets the lock name using in MySQL get lock ("SELECT GET_LOCK"):
33+
// only one instance of pipes can run for the same lock name.
2834
func PipeOptLockName(lockName string) MsgPipeOption {
2935
return func(pipe *MsgPipe) error {
3036
if lockName == "" {
@@ -35,6 +41,7 @@ func PipeOptLockName(lockName string) MsgPipeOption {
3541
}
3642
}
3743

44+
// PipeOptMaxInflight sets the max number of messages inflight (publishing).
3845
func PipeOptMaxInflight(maxInflight int) MsgPipeOption {
3946
return func(pipe *MsgPipe) error {
4047
if maxInflight < 1 {
@@ -45,6 +52,7 @@ func PipeOptMaxInflight(maxInflight int) MsgPipeOption {
4552
}
4653
}
4754

55+
// PipeOptRetryWait sets the interval between retries due to all kinds of errors.
4856
func PipeOptRetryWait(t time.Duration) MsgPipeOption {
4957
return func(pipe *MsgPipe) error {
5058
if t <= 0 {

binlogmsg/pj.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import (
1010
. "github.com/huangjunwen/nproto/v2/msg"
1111
)
1212

13+
// PbJsonPublisher creates a msg publisher using protobuf or json for encoding:
14+
// - If msg is proto.Message, then use protobuf.
15+
// - Otherwise use json.
1316
func PbJsonPublisher(q sqlh.Queryer, schema, table string) MsgPublisherFunc {
1417

1518
pbPublisher := NewMsgPublisher(pjenc.DefaultPbEncoder, q, schema, table)

msg/common.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// Package msg contains high level types/interfaces for msg implementations.
21
package msg
32

43
import (
@@ -98,6 +97,7 @@ func (spec *msgSpec) String() string {
9897
return fmt.Sprintf("MsgSpec(%s %s)", spec.subjectName, spec.msgType.String())
9998
}
10099

100+
// MustRawDataMsgSpec is must-version of NewRawDataMsgSpec.
101101
func MustRawDataMsgSpec(subjectName string) MsgSpec {
102102
spec, err := NewRawDataMsgSpec(subjectName)
103103
if err != nil {
@@ -106,6 +106,7 @@ func MustRawDataMsgSpec(subjectName string) MsgSpec {
106106
return spec
107107
}
108108

109+
// NewRawDataMsgSpec validates and creates a new MsgSpec with *rawenc.RawData msg type.
109110
func NewRawDataMsgSpec(subjectName string) (MsgSpec, error) {
110111
if !SubjectNameRegexp.MatchString(subjectName) {
111112
return nil, fmt.Errorf("SubjectName format invalid")

msg/doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package msg contains high level types/interfaces for msg implementations.
2+
package msg

msg/publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
)
66

7-
// MsgPublisher is used to publish messages reliably, e.g. at least once delivery.
7+
// MsgPublisher is used to publish messages.
88
type MsgPublisher interface {
99
// Publish publishes a message to the given subject. It returns nil if success.
1010
Publish(ctx context.Context, spec MsgSpec, msg interface{}) error

msg/subscriber.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ type MsgSubscriber interface {
1414
Subscribe(spec MsgSpec, queue string, handler MsgHandler, opts ...interface{}) error
1515
}
1616

17-
// MsgHandler handles messages. A message should be redelivered if the handler returns an error.
18-
// Except that the error's type is nproto.Error and error.Retryable() is false.
17+
// MsgHandler handles messages. For 'at least once delivery' implementations, a message
18+
// should be redelivered if the handler returns an error. Otherwise it may or may not
19+
// be redelivered.
1920
type MsgHandler func(context.Context, interface{}) error
2021

2122
// MsgSubscriberFunc is an adapter to allow the use of ordinary functions as MsgSubscriber.

natsrpc/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@ import (
1616
. "github.com/huangjunwen/nproto/v2/rpc"
1717
)
1818

19+
// ClientConn wraps nats.Conn into 'client side rpc connection'.
1920
type ClientConn struct {
2021
// Immutable fields.
2122
subjectPrefix string
2223
timeout time.Duration
2324
nc *nats.Conn
2425
}
2526

27+
// ClientConnOption is option in creating ClientConn.
2628
type ClientConnOption func(*ClientConn) error
2729

30+
// NewClientConn creates a new ClientConn. `nc` must have MaxReconnect < 0
31+
// (e.g. never give up trying to reconnect).
2832
func NewClientConn(nc *nats.Conn, opts ...ClientConnOption) (*ClientConn, error) {
2933

3034
if nc.Opts.MaxReconnect >= 0 {
@@ -45,6 +49,7 @@ func NewClientConn(nc *nats.Conn, opts ...ClientConnOption) (*ClientConn, error)
4549
return cc, nil
4650
}
4751

52+
// Client creates an rpc client using specified encoder and decoder.
4853
func (cc *ClientConn) Client(encoder npenc.Encoder, decoder npenc.Decoder) RPCClientFunc {
4954
return func(spec RPCSpec) RPCHandler {
5055
return cc.makeHandler(spec, encoder, decoder)

natsrpc/doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package natsrpc is an rpc implemenation using nats as transport.
2+
package natsrpc

0 commit comments

Comments
 (0)