Skip to content

Commit 3e68cb9

Browse files
author
ffffwh
committed
Merge branch 'master' into 4.22.07.x
# Conflicts: # driver/driver.go # driver/mysql/applier_incr.go # g/g.go
2 parents 26dca3d + c1bf0af commit 3e68cb9

20 files changed

+282
-172
lines changed

api/handler/v2/database.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package v2
22

33
import (
4+
"context"
45
"fmt"
56
"net/http"
67
"strings"
@@ -123,7 +124,7 @@ func listOracleSchema(logger hclog.Logger, reqParam *models.ListDatabaseSchemasR
123124
}
124125
reqParam.Password = realPwd
125126
}
126-
oracleDb, err := config.NewDB(&config.OracleConfig{
127+
oracleDb, err := config.NewDB(context.TODO(), &config.OracleConfig{
127128
User: reqParam.User,
128129
Password: reqParam.Password,
129130
Host: reqParam.Host,
@@ -206,7 +207,7 @@ func ListDatabaseColumnsV2(c echo.Context) error {
206207
}
207208
reqParam.Password = realPwd
208209
}
209-
oracleDb, err := config.NewDB(&config.OracleConfig{
210+
oracleDb, err := config.NewDB(context.TODO(), &config.OracleConfig{
210211
User: reqParam.User,
211212
Password: reqParam.Password,
212213
Host: reqParam.Host,

driver/common/common.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ const (
2828
)
2929

3030
// TaskStateDead is the exit state reported for a task that terminated with an
// error. The value 2 is kept from the earlier iota-based enumeration
// (TaskStateComplete = 0, TaskStateRestart = 1, TaskStateDead = 2) so that the
// numeric code stays the same.
const TaskStateDead = 2
3533

3634
const (

driver/common/dumper.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package common
88

99
import (
10+
"context"
1011
"sync"
1112

1213
"github.com/actiontech/dtle/g"
@@ -16,6 +17,7 @@ type GetChunkDataFn func() (nRows int64, err error)
1617
type PrepareFn func() (err error)
1718

1819
type Dumper struct {
20+
Ctx context.Context
1921
Logger g.LoggerType
2022
ChunkSize int64
2123
TableSchema string
@@ -36,9 +38,10 @@ type Dumper struct {
3638
PrepareForDumping PrepareFn
3739
}
3840

39-
func NewDumper(table *Table, chunkSize int64, logger g.LoggerType, memory *int64) *Dumper {
41+
func NewDumper(ctx context.Context, table *Table, chunkSize int64, logger g.LoggerType, memory *int64) *Dumper {
4042

4143
dumper := &Dumper{
44+
Ctx: ctx,
4245
Logger: logger,
4346
TableSchema: table.TableSchema,
4447
TableName: table.TableName,

driver/common/utils.go

+8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package common
88

99
import (
1010
"fmt"
11+
"github.com/hashicorp/nomad/plugins/drivers"
1112
"github.com/pingcap/tidb/parser/format"
1213
"regexp"
1314
"strconv"
@@ -37,3 +38,10 @@ func MysqlVersionInDigit(v string) (int, error) {
3738

3839
return m0*10000 + m1*100 + m2, nil
3940
}
41+
42+
func WriteWaitCh(ch chan<- *drivers.ExitResult, r *drivers.ExitResult) {
43+
select {
44+
case ch<-r:
45+
default:
46+
}
47+
}

driver/driver.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -655,22 +655,12 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *dr
655655
return
656656
case <-d.ctx.Done():
657657
return
658-
case <-handle.ctx.Done():
659-
result := &drivers.ExitResult{
660-
ExitCode: 0,
661-
Signal: 0,
662-
OOMKilled: false,
663-
Err: nil,
664-
}
665-
ch <- result
666-
case result := <-handle.waitCh: // Do not refer to handle.runner.waitCh. It might be nil.
667-
handle.stateLock.Lock()
668-
handle.procState = drivers.TaskStateExited
669-
handle.stateLock.Unlock()
670-
ch <- result
658+
case <-handle.doneCh:
659+
ch <- handle.exitResult.Copy()
671660
}
672661
}
673662

663+
// StopTask will not be called if the task has already exited (e.g. onError).
674664
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
675665
d.logger.Info("StopTask", "id", taskID, "signal", signal)
676666
handle, ok := d.tasks.Get(taskID)
@@ -793,10 +783,15 @@ func (d *Driver) SignalTask(taskID string, signal string) error {
793783
return errors.New(string(bs))
794784
}
795785
case "finish":
786+
if h.runner == nil {
787+
return fmt.Errorf("h.runner is nil")
788+
}
796789
return h.runner.Finish1()
797790
case "pause":
798791
d.logger.Info("pause a task", "taskID", taskID)
799-
h := d.tasks.store[taskID]
792+
if h.runner == nil {
793+
return fmt.Errorf("h.runner is nil")
794+
}
800795
err := h.runner.Shutdown()
801796
if err != nil {
802797
d.logger.Error("error when pausing a task", "taskID", taskID, "err", err)
@@ -806,7 +801,6 @@ func (d *Driver) SignalTask(taskID string, signal string) error {
806801
return nil
807802
case "resume":
808803
d.logger.Info("resume a task", "taskID", taskID)
809-
h := d.tasks.store[taskID]
810804
err := h.resumeTask(d)
811805
if err != nil {
812806
d.logger.Error("error when resuming a task", "taskID", taskID, "err", err)
@@ -817,7 +811,7 @@ func (d *Driver) SignalTask(taskID string, signal string) error {
817811
return nil
818812
}
819813

820-
return nil
814+
return fmt.Errorf("DTLE_BUG SignalTask. should not reach here")
821815
}
822816

823817
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {

driver/handle.go

+45-20
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package mysql
22

33
import (
4-
"context"
54
"fmt"
65
"sync"
76
"time"
@@ -33,12 +32,12 @@ type taskHandle struct {
3332

3433
runner DriverHandle
3534

36-
ctx context.Context
37-
cancelFunc context.CancelFunc
38-
waitCh chan *drivers.ExitResult
39-
stats *common.TaskStatistics
35+
waitCh chan *drivers.ExitResult
36+
doneCh chan struct{}
37+
stats *common.TaskStatistics
4038

4139
driverConfig *common.MySQLDriverConfig
40+
shutdown bool
4241
}
4342

4443
func newDtleTaskHandle(logger g.LoggerType, cfg *drivers.TaskConfig, state drivers.TaskState, started time.Time) *taskHandle {
@@ -50,12 +49,24 @@ func newDtleTaskHandle(logger g.LoggerType, cfg *drivers.TaskConfig, state drive
5049
startedAt: started,
5150
completedAt: time.Time{},
5251
exitResult: nil,
53-
waitCh: make(chan *drivers.ExitResult, 1),
52+
waitCh: make(chan *drivers.ExitResult),
53+
doneCh: make(chan struct{}),
5454
}
55-
h.ctx, h.cancelFunc = context.WithCancel(context.TODO())
55+
go h.watchWaitCh()
5656
return h
5757
}
5858

59+
func (h *taskHandle) watchWaitCh() {
60+
select {
61+
case r := <-h.waitCh:
62+
h.stateLock.Lock()
63+
h.exitResult = r
64+
h.stateLock.Unlock()
65+
close(h.doneCh)
66+
case <-h.doneCh:
67+
}
68+
}
69+
5970
func (h *taskHandle) TaskStatus() (*drivers.TaskStatus, error) {
6071
h.stateLock.RLock()
6172
defer h.stateLock.RUnlock()
@@ -83,13 +94,14 @@ func (h *taskHandle) TaskStatus() (*drivers.TaskStatus, error) {
8394
}, nil
8495
}
8596

97+
// used when h.runner has not been setup
8698
func (h *taskHandle) onError(err error) {
87-
h.waitCh <- &drivers.ExitResult{
99+
common.WriteWaitCh(h.waitCh, &drivers.ExitResult{
88100
ExitCode: common.TaskStateDead,
89101
Signal: 0,
90102
OOMKilled: false,
91103
Err: err,
92-
}
104+
})
93105
}
94106

95107
func (h *taskHandle) IsRunning() bool {
@@ -137,7 +149,8 @@ func (h *taskHandle) run(d *Driver) {
137149
t := time.NewTimer(0)
138150
for {
139151
select {
140-
case <-h.ctx.Done():
152+
case <-h.doneCh:
153+
if !t.Stop() { <-t.C }
141154
return
142155
case <-t.C:
143156
if h.runner != nil {
@@ -168,12 +181,12 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
168181
case common.TaskTypeSrc:
169182
if h.driverConfig.OracleConfig != nil {
170183
h.logger.Debug("found oracle src", "OracleConfig", h.driverConfig.OracleConfig)
171-
runner, err = extractor.NewExtractorOracle(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh)
184+
runner, err = extractor.NewExtractorOracle(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, d.ctx)
172185
if err != nil {
173186
return nil, errors.Wrap(err, "NewExtractor")
174187
}
175188
} else {
176-
runner, err = mysql.NewExtractor(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, h.ctx)
189+
runner, err = mysql.NewExtractor(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, d.ctx)
177190
if err != nil {
178191
return nil, errors.Wrap(err, "NewOracleExtractor")
179192
}
@@ -183,13 +196,13 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
183196
if h.driverConfig.KafkaConfig != nil {
184197
h.logger.Debug("found kafka", "KafkaConfig", h.driverConfig.KafkaConfig)
185198
runner, err = kafka.NewKafkaRunner(ctx, h.driverConfig.KafkaConfig, h.logger,
186-
d.storeManager, d.config.NatsAdvertise, h.waitCh, h.ctx)
199+
d.storeManager, d.config.NatsAdvertise, h.waitCh, d.ctx)
187200
if err != nil {
188201
return nil, errors.Wrap(err, "NewKafkaRunner")
189202
}
190203
} else {
191204
runner, err = mysql.NewApplier(ctx, h.driverConfig, h.logger, d.storeManager,
192-
d.config.NatsAdvertise, h.waitCh, d.eventer, h.taskConfig, h.ctx)
205+
d.config.NatsAdvertise, h.waitCh, d.eventer, h.taskConfig, d.ctx)
193206
if err != nil {
194207
return nil, errors.Wrap(err, "NewApplier")
195208
}
@@ -265,23 +278,35 @@ func (h *taskHandle) emitStats(ru *common.TaskStatistics) {
265278
}
266279
}
267280

268-
func (h *taskHandle) Destroy() bool {
269-
h.stateLock.RLock()
270-
//driver.des
271-
h.cancelFunc()
281+
func (h *taskHandle) Destroy() {
282+
if h.shutdown {
283+
return
284+
}
285+
h.stateLock.Lock()
286+
h.shutdown = true
287+
h.stateLock.Unlock()
288+
289+
common.WriteWaitCh(h.waitCh, &drivers.ExitResult{
290+
ExitCode: 0,
291+
Signal: 0,
292+
OOMKilled: false,
293+
Err: nil,
294+
})
295+
272296
if h.runner != nil {
273297
err := h.runner.Shutdown()
274298
if err != nil {
275299
h.logger.Error("error in h.runner.Shutdown", "err", err)
276300
}
277301
}
278-
return h.procState == drivers.TaskStateExited
279302
}
280303

281304
type DriverHandle interface {
282305
Run()
283306

284-
// Shutdown is used to stop the task
307+
// Shutdown is used to stop the task.
308+
// Do not send ExitResult in Shutdown().
309+
// pause API will call Shutdown and the task should not exit.
285310
Shutdown() error
286311

287312
// Stats returns aggregated stats of the driver

driver/kafka/kafka3.go

+15-9
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ type KafkaRunner struct {
4646
natsConn *gonats.Conn
4747
waitCh chan *drivers.ExitResult
4848

49-
ctx context.Context
50-
shutdown bool
51-
shutdownCh chan struct{}
49+
ctx context.Context
50+
shutdown bool
51+
shutdownCh chan struct{}
52+
shutdownLock sync.Mutex
5253

5354
kafkaConfig *common.KafkaConfig
5455
kafkaMgr *KafkaManager
@@ -157,9 +158,14 @@ func (kr *KafkaRunner) updateGtidLoop() {
157158
}
158159

159160
func (kr *KafkaRunner) Shutdown() error {
161+
kr.logger.Debug("Shutting down")
162+
kr.shutdownLock.Lock()
163+
defer kr.shutdownLock.Unlock()
164+
160165
if kr.shutdown {
161166
return nil
162167
}
168+
163169
if kr.natsConn != nil {
164170
kr.natsConn.Close()
165171
}
@@ -634,14 +640,14 @@ func (kr *KafkaRunner) initiateStreaming() error {
634640
}
635641

636642
func (kr *KafkaRunner) onError(state int, err error) {
643+
kr.logger.Info("onError", "err", err, "hasShutdown", kr.shutdown)
644+
637645
if kr.shutdown {
638646
return
639647
}
640648

641649
switch state {
642-
case common.TaskStateComplete:
643-
kr.logger.Info("Done migrating")
644-
case common.TaskStateRestart, common.TaskStateDead:
650+
case common.TaskStateDead:
645651
msg := &common.ControlMsg{
646652
Msg: err.Error(),
647653
Type: common.ControlMsgError,
@@ -660,13 +666,13 @@ func (kr *KafkaRunner) onError(state int, err error) {
660666
}
661667
}
662668

663-
kr.waitCh <- &drivers.ExitResult{
669+
common.WriteWaitCh(kr.waitCh, &drivers.ExitResult{
664670
ExitCode: state,
665671
Signal: 0,
666672
OOMKilled: false,
667673
Err: err,
668-
}
669-
kr.Shutdown()
674+
})
675+
_ = kr.Shutdown()
670676
}
671677

672678
func (kr *KafkaRunner) kafkaTransformSnapshotData(

0 commit comments

Comments
 (0)