Skip to content

Commit a1532de

Browse files
author
ffffwh
committed
Merge branch 'master' into 4.22.07.x
2 parents 3e68cb9 + 411d21d commit a1532de

18 files changed

+224
-249
lines changed

api/docs/docs.go

+3
Original file line numberDiff line numberDiff line change
@@ -3017,6 +3017,9 @@ var doc = `{
30173017
"binlog_relay": {
30183018
"type": "boolean"
30193019
},
3020+
"dump_entry_limit": {
3021+
"type": "integer"
3022+
},
30203023
"expand_syntax_support": {
30213024
"type": "boolean"
30223025
},

api/docs/swagger.json

+3
Original file line numberDiff line numberDiff line change
@@ -3001,6 +3001,9 @@
30013001
"binlog_relay": {
30023002
"type": "boolean"
30033003
},
3004+
"dump_entry_limit": {
3005+
"type": "integer"
3006+
},
30043007
"expand_syntax_support": {
30053008
"type": "boolean"
30063009
},

api/docs/swagger.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,8 @@ definitions:
591591
type: boolean
592592
binlog_relay:
593593
type: boolean
594+
dump_entry_limit:
595+
type: integer
594596
expand_syntax_support:
595597
type: boolean
596598
gtid:

api/handler/v2/job.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,7 @@ func buildDatabaseSrcTaskConfigMap(config *models.SrcTaskConfig) map[string]inte
592592
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlSrcTaskConfig.BinlogRelay, "BinlogRelay")
593593
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlSrcTaskConfig.Gtid, "Gtid")
594594
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlSrcTaskConfig.ExpandSyntaxSupport, "ExpandSyntaxSupport")
595+
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlSrcTaskConfig.DumpEntryLimit, "DumpEntryLimit")
595596
taskConfigInNomadFormat["ConnectionConfig"] = buildMysqlConnectionConfigMap(config.ConnectionConfig)
596597
}
597598
// for Oracle
@@ -634,8 +635,8 @@ func buildMysqlTableConfigMap(configs []*models.TableConfig) []map[string]interf
634635
configMap := make(map[string]interface{})
635636
if len(c.ColumnMapFrom) != 0 {
636637
configMap["ColumnMapFrom"] = c.ColumnMapFrom
637-
if len(c.ColumnMapTO) != 0 {
638-
configMap["ColumnMapTo"] = c.ColumnMapTO
638+
if len(c.ColumnMapTo) != 0 {
639+
configMap["ColumnMapTo"] = c.ColumnMapTo
639640
}
640641
}
641642
addNotRequiredParamToMap(configMap, c.TableName, "TableName")
@@ -799,6 +800,7 @@ func buildBasicTaskProfile(logger g.LoggerType, jobId string, srcTaskDetail *mod
799800
BinlogRelay: srcTaskDetail.TaskConfig.MysqlSrcTaskConfig.BinlogRelay,
800801
WaitOnJob: srcTaskDetail.TaskConfig.MysqlSrcTaskConfig.WaitOnJob,
801802
AutoGtid: srcTaskDetail.TaskConfig.MysqlSrcTaskConfig.AutoGtid,
803+
DumpEntryLimit: srcTaskDetail.TaskConfig.MysqlSrcTaskConfig.DumpEntryLimit,
802804
}
803805
} else if srcTaskDetail.TaskConfig.OracleSrcTaskConfig != nil {
804806
srcConfig.OracleSrcTaskConfig = &models.OracleSrcTaskConfig{
@@ -951,7 +953,7 @@ func buildSrcTaskDetail(taskName string, internalTaskConfig common.DtleTaskConfi
951953
TableRegex: tb.TableRegex,
952954
TableRename: tb.TableRename,
953955
ColumnMapFrom: tb.ColumnMapFrom,
954-
ColumnMapTO: tb.ColumnMapTo,
956+
ColumnMapTo: tb.ColumnMapTo,
955957
Where: tb.Where,
956958
})
957959
}
@@ -1000,6 +1002,7 @@ func buildSrcTaskDetail(taskName string, internalTaskConfig common.DtleTaskConfi
10001002
Gtid: internalTaskConfig.Gtid,
10011003
BinlogRelay: internalTaskConfig.BinlogRelay,
10021004
WaitOnJob: internalTaskConfig.WaitOnJob,
1005+
DumpEntryLimit: internalTaskConfig.DumpEntryLimit,
10031006
}
10041007
}
10051008
srcTaskDetail.TaskConfig.ConnectionConfig = connectionConfig
@@ -1932,6 +1935,7 @@ func ReverseJobV2(c echo.Context, filterJobType DtleJobType) error {
19321935
BinlogRelay: originalJob.BasicTaskProfile.Configuration.SrcConfig.MysqlSrcTaskConfig.BinlogRelay,
19331936
WaitOnJob: consulJobItem.JobId,
19341937
AutoGtid: true,
1938+
DumpEntryLimit: originalJob.BasicTaskProfile.Configuration.SrcConfig.MysqlSrcTaskConfig.DumpEntryLimit,
19351939
},
19361940
}
19371941
reverseJobParam.DestTask = &models.DestTaskConfig{

api/models/job_v2.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ type MysqlSrcTaskConfig struct {
152152
BinlogRelay bool `json:"binlog_relay"`
153153
WaitOnJob string `json:"wait_on_job"`
154154
AutoGtid bool `json:"auto_gtid"`
155+
DumpEntryLimit int `json:"dump_entry_limit"`
155156
}
156157

157158
type OracleSrcTaskConfig struct {
@@ -186,7 +187,7 @@ type TableConfig struct {
186187
TableRegex string `json:"table_regex"`
187188
TableRename string `json:"table_rename"`
188189
ColumnMapFrom []string `json:"column_map_from"`
189-
ColumnMapTO []string `json:"column_map_to"`
190+
ColumnMapTo []string `json:"column_map_to"`
190191
Where string `json:"where"`
191192
}
192193
type DatabaseConnectionConfig struct {

driver/common/dumper.go

+4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ type Dumper struct {
2727
Table *Table
2828
Iteration int64
2929
Columns string
30+
// ResultsChannel should be closed after writing all entries.
3031
ResultsChannel chan *DumpEntry
32+
// Set Err (if there is) before closing ResultsChannel.
33+
Err error
3134
shutdown bool
3235
ShutdownCh chan struct{}
3336
shutdownLock sync.Mutex
@@ -71,6 +74,7 @@ func (d *Dumper) Dump() error {
7174

7275
nRows, err := d.GetChunkData()
7376
if err != nil {
77+
d.Err = err
7478
d.Logger.Error("error at dump", "err", err)
7579
break
7680
}

driver/common/store.go

+11
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,17 @@ func (sm *StoreManager) PutSourceType(jobName, sourceType string) error {
142142
return nil
143143
}
144144

145+
func (sm *StoreManager) GetNatsIfExist(jobName string) (string, bool, error) {
146+
natsKey := fmt.Sprintf("dtle/%v/NatsAddr", jobName)
147+
kv, err := sm.consulStore.Get(natsKey)
148+
if err == store.ErrKeyNotFound {
149+
return "", false, nil
150+
} else if err != nil {
151+
return "", false, err
152+
}
153+
return string(kv.Value), true, nil
154+
}
155+
145156
func (sm *StoreManager) DstPutNats(jobName string, natsAddr string, stopCh chan struct{}, onWatchError func(error)) error {
146157
sm.logger.Debug("DstPutNats")
147158

driver/common/taskconfig.go

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type DtleTaskConfig struct {
6464
DependencyHistorySize int `codec:"DependencyHistorySize"`
6565
UseMySQLDependency bool `codec:"UseMySQLDependency"`
6666
ForeignKeyChecks bool `codec:"ForeignKeyChecks"`
67+
DumpEntryLimit int `codec:"DumpEntryLimit"`
6768

6869
SkipCreateDbTable bool `codec:"SkipCreateDbTable"`
6970
SkipPrivilegeCheck bool `codec:"SkipPrivilegeCheck"`

driver/common/type.schema

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ struct DumpEntry {
77
TbSQL []string
88
ValuesX [][]*[]byte
99
TotalCount int64
10-
Err string
1110
Table []byte
1211
ColumnMapTo []string
1312
}

driver/common/type.schema.gen.go

-55
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type DumpEntry struct {
2121
TbSQL []string
2222
ValuesX [][]*[]byte
2323
TotalCount int64
24-
Err string
2524
Table []byte
2625
ColumnMapTo []string
2726
}
@@ -228,21 +227,6 @@ func (d *DumpEntry) Size() (s uint64) {
228227
}
229228

230229
}
231-
{
232-
l := uint64(len(d.Err))
233-
234-
{
235-
236-
t := l
237-
for t >= 0x80 {
238-
t >>= 7
239-
s++
240-
}
241-
s++
242-
243-
}
244-
s += l
245-
}
246230
{
247231
l := uint64(len(d.Table))
248232

@@ -560,25 +544,6 @@ func (d *DumpEntry) Marshal(buf []byte) ([]byte, error) {
560544
buf[i+7+0] = byte(d.TotalCount >> 56)
561545

562546
}
563-
{
564-
l := uint64(len(d.Err))
565-
566-
{
567-
568-
t := uint64(l)
569-
570-
for t >= 0x80 {
571-
buf[i+8] = byte(t) | 0x80
572-
t >>= 7
573-
i++
574-
}
575-
buf[i+8] = byte(t)
576-
i++
577-
578-
}
579-
copy(buf[i+8:], d.Err)
580-
i += l
581-
}
582547
{
583548
l := uint64(len(d.Table))
584549

@@ -924,26 +889,6 @@ func (d *DumpEntry) Unmarshal(buf []byte) (uint64, error) {
924889
{
925890
l := uint64(0)
926891

927-
{
928-
929-
bs := uint8(7)
930-
t := uint64(buf[i+8] & 0x7F)
931-
for buf[i+8]&0x80 == 0x80 {
932-
i++
933-
t |= uint64(buf[i+8]&0x7F) << bs
934-
bs += 7
935-
}
936-
i++
937-
938-
l = t
939-
940-
}
941-
d.Err = string(buf[i+8 : i+8+l])
942-
i += l
943-
}
944-
{
945-
l := uint64(0)
946-
947892
{
948893

949894
bs := uint8(7)

driver/driver.go

+10-30
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ var (
168168
hclspec.NewLiteral(`true`)),
169169
"ForeignKeyChecks": hclspec.NewDefault(hclspec.NewAttr("ForeignKeyChecks", "bool", false),
170170
hclspec.NewLiteral(`true`)),
171+
"DumpEntryLimit": hclspec.NewDefault(hclspec.NewAttr("DumpEntryLimit", "number", false),
172+
hclspec.NewLiteral(`67108864`)),
171173
"OracleConfig": hclspec.NewBlock("OracleConfig", false, hclspec.NewObject(map[string]*hclspec.Spec{
172174
"ServiceName": hclspec.NewAttr("ServiceName", "string", true),
173175
"Host": hclspec.NewAttr("Host", "string", true),
@@ -390,30 +392,16 @@ func (d *Driver) SetConfig(c *base.Config) (err error) {
390392
}
391393

392394
func (d *Driver) loopCleanRelayDir() {
393-
stopCh := make(chan struct{})
394-
defer close(stopCh)
395-
396395
cleanDataDir := func() {
397396
files, err := ioutil.ReadDir(path.Join(d.config.DataDir, "binlog"))
398397
if err != nil {
399-
d.logger.Info("read dir failed", "dataDir", d.config.DataDir, "err", err)
400-
return
401-
}
402-
403-
jobs, err := d.storeManager.FindJobList()
404-
if err != nil {
405-
d.logger.Error("list jobs failed", "err", err)
398+
d.logger.Error("read dir failed", "dataDir", d.config.DataDir, "err", err)
406399
return
407400
}
408401

409402
for _, file := range files {
410-
existUnuseDir := true
411-
for i := range jobs {
412-
if jobs[i].JobId == file.Name() {
413-
existUnuseDir = false
414-
}
415-
}
416-
if !existUnuseDir {
403+
_, exist, err := d.storeManager.GetNatsIfExist(file.Name())
404+
if exist || err != nil {
417405
continue
418406
}
419407
if err := os.RemoveAll(path.Join(d.config.DataDir, "binlog", file.Name())); err != nil {
@@ -422,21 +410,13 @@ func (d *Driver) loopCleanRelayDir() {
422410
}
423411
}
424412

425-
jobKeysCh, err := d.storeManager.WatchTree("/dtleJobList/", stopCh)
426-
if err != nil {
427-
d.logger.Error("watch job tree error", "err", err)
428-
}
429413
cleanDuration := 12 * time.Hour
430414
cleanDelay := time.NewTimer(cleanDuration)
431415
defer cleanDelay.Stop()
432416
for {
433-
select {
434-
case <-jobKeysCh:
435-
cleanDataDir()
436-
case <-cleanDelay.C:
437-
cleanDataDir()
438-
}
439417
cleanDelay.Reset(cleanDuration)
418+
<-cleanDelay.C
419+
cleanDataDir()
440420
}
441421
}
442422

@@ -560,7 +540,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
560540
return nil, nil, errors.Wrap(err, "SetDriverState")
561541
}
562542

563-
h := newDtleTaskHandle(d.logger, cfg, drivers.TaskStateRunning, time.Now().Round(time.Millisecond))
543+
h := newDtleTaskHandle(d.ctx, d.logger, cfg, drivers.TaskStateRunning, time.Now().Round(time.Millisecond))
564544
h.driverConfig = &common.MySQLDriverConfig{DtleTaskConfig: dtleTaskConfig}
565545
d.tasks.Set(cfg.ID, h)
566546
AllocIdTaskNameToTaskHandler.Set(cfg.AllocID, cfg.Name, cfg.ID, h)
@@ -655,8 +635,8 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *dr
655635
return
656636
case <-d.ctx.Done():
657637
return
658-
case <-handle.doneCh:
659-
ch <- handle.exitResult.Copy()
638+
case <-handle.ctx.Done():
639+
ch <- handle.GetExitResult()
660640
}
661641
}
662642

0 commit comments

Comments
 (0)