-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocessor_record_reader.go
112 lines (97 loc) · 3.34 KB
/
processor_record_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backup
import (
"context"
"fmt"
"log/slog"
"github.com/aerospike/backup-go/internal/asinfo"
"github.com/aerospike/backup-go/internal/metrics"
"github.com/aerospike/backup-go/io/aerospike/xdr"
"github.com/aerospike/backup-go/models"
"github.com/aerospike/backup-go/pipeline"
"golang.org/x/sync/semaphore"
)
// recordReaderProcessor configures and creates record readers pipelines.
type recordReaderProcessor[T models.TokenConstraint] struct {
xdrConfig *ConfigBackupXDR
// add scanConfig in the future.
aerospikeClient AerospikeClient
infoClient *asinfo.InfoClient
state *State
scanLimiter *semaphore.Weighted
rpsCollector *metrics.Collector
logger *slog.Logger
}
// newRecordReaderProcessor returns a new record reader processor.
func newRecordReaderProcessor[T models.TokenConstraint](
xdrConfig *ConfigBackupXDR,
aerospikeClient AerospikeClient,
infoClient *asinfo.InfoClient,
state *State,
scanLimiter *semaphore.Weighted,
rpsCollector *metrics.Collector,
logger *slog.Logger,
) *recordReaderProcessor[T] {
logger.Debug("created new records reader processor")
return &recordReaderProcessor[T]{
xdrConfig: xdrConfig,
aerospikeClient: aerospikeClient,
infoClient: infoClient,
scanLimiter: scanLimiter,
state: state,
rpsCollector: rpsCollector,
logger: logger,
}
}
// recordReaderConfigForXDR creates reader config for XDR.
func (rr *recordReaderProcessor[T]) recordReaderConfigForXDR() *xdr.RecordReaderConfig {
localHostPort := fmt.Sprintf("%s:%d", rr.xdrConfig.LocalAddress, rr.xdrConfig.LocalPort)
localTCPAddr := fmt.Sprintf(":%d", rr.xdrConfig.LocalPort)
tcpConfig := xdr.NewTCPConfig(
localTCPAddr,
rr.xdrConfig.TLSConfig,
rr.xdrConfig.ReadTimeout,
rr.xdrConfig.WriteTimeout,
rr.xdrConfig.ResultQueueSize,
rr.xdrConfig.AckQueueSize,
rr.xdrConfig.MaxConnections,
rr.rpsCollector,
)
return xdr.NewRecordReaderConfig(
rr.xdrConfig.DC,
rr.xdrConfig.Namespace,
rr.xdrConfig.Rewind,
localHostPort,
tcpConfig,
rr.xdrConfig.InfoPolingPeriod,
rr.xdrConfig.StartTimeout,
rr.xdrConfig.MaxThroughput,
rr.xdrConfig.Forward,
)
}
// newReadWorkersXDR returns an XDR reader worker. The XDR reader worker will always
// use *models.ASBXToken.
func (rr *recordReaderProcessor[T]) newReadWorkersXDR(ctx context.Context,
) ([]pipeline.Worker[*models.ASBXToken], error) {
// For xdr we will have 1 worker always.
readWorkers := make([]pipeline.Worker[*models.ASBXToken], 1)
readerConfig := rr.recordReaderConfigForXDR()
reader, err := xdr.NewRecordReader(ctx, rr.infoClient, readerConfig, rr.logger)
if err != nil {
return nil, fmt.Errorf("failed to create xdr reader: %w", err)
}
readWorkers[0] = pipeline.NewReadWorker[*models.ASBXToken](reader)
return readWorkers, nil
}