Skip to content

Commit 6153f5f

Browse files
committed
Locker implementations are now public
1 parent 0a90e57 commit 6153f5f

File tree

7 files changed

+84
-53
lines changed

7 files changed

+84
-53
lines changed

broker/consulbroker/broker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,5 @@ func New(config *Config) (*Broker, error) {
5353
// Task makes a best effort to ensure that exactly one instance of a task is executing in a cluster.
5454
// Task may be re-started when needed until it's been closed.
5555
func (b *Broker) NewTask(key string, fun task.Func) (*task.Task, error) {
56-
return task.New(newConsulLocker(b.client, key, LockWaitTime), fun), nil
56+
return task.New(NewLocker(b.client, key, LockWaitTime), fun), nil
5757
}

broker/consulbroker/locker.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,25 @@ import (
88
"github.com/monetha/go-distributed/locker"
99
)
1010

11-
// consulLocker wraps Consul distributed lock by implementing Locker interface.
12-
type consulLocker struct {
11+
// Locker wraps Consul distributed lock by implementing Locker interface.
12+
type Locker struct {
1313
client *api.Client
1414
key string
1515
lockWaitTime time.Duration
1616
lock *api.Lock
1717
}
1818

19-
// NewConsulLocker creates new consulLocker instance.
20-
func newConsulLocker(client *api.Client, key string, lockWaitTime time.Duration) *consulLocker {
21-
return &consulLocker{
19+
// NewLocker creates new Locker instance.
20+
func NewLocker(client *api.Client, key string, lockWaitTime time.Duration) *Locker {
21+
return &Locker{
2222
client: client,
2323
key: key,
2424
lockWaitTime: lockWaitTime,
2525
}
2626
}
2727

2828
// Key returns the name of locker.
29-
func (l *consulLocker) Key() string {
29+
func (l *Locker) Key() string {
3030
return l.key
3131
}
3232

@@ -37,7 +37,7 @@ func (l *consulLocker) Key() string {
3737
// communication errors, operator intervention, etc. It is NOT safe to
3838
// assume that the locker is held until Unlock(), application must be able
3939
// to handle the locker being lost.
40-
func (l *consulLocker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
40+
func (l *Locker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
4141
if l.lock != nil {
4242
return nil, locker.ErrLockHeld
4343
}
@@ -66,7 +66,7 @@ func (l *consulLocker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
6666

6767
// Unlock released the locker. It is an error to call this
6868
// if the locker is not currently held.
69-
func (l *consulLocker) Unlock() error {
69+
func (l *Locker) Unlock() error {
7070
if l.lock == nil {
7171
return locker.ErrLockNotHeld
7272
}

broker/consulbroker/locker_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package consulbroker
2+
3+
import "github.com/monetha/go-distributed/locker"
4+
5+
var (
6+
_ locker.Locker = &Locker{} // ensure Locker implements Locker interface
7+
)

broker/mutexbroker/broker.go

+1-43
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func New() *Broker {
2323

2424
// NewTask creates new long-running task in current process.
2525
func (b *Broker) NewTask(key string, fun task.Func) (*task.Task, error) {
26-
return task.New(newMutexLocker(b, key), fun), nil
26+
return task.New(NewLocker(b, key), fun), nil
2727
}
2828

2929
func (b *Broker) lock(name string, stopCh <-chan struct{}) error {
@@ -53,45 +53,3 @@ func (b *Broker) unlock(name string) error {
5353
<-mc
5454
return nil
5555
}
56-
57-
func newMutexLocker(broker *Broker, key string) *mutexLocker {
58-
return &mutexLocker{
59-
broker: broker,
60-
key: key,
61-
}
62-
}
63-
64-
type mutexLocker struct {
65-
broker *Broker
66-
key string
67-
leaderCh chan struct{}
68-
}
69-
70-
func (l *mutexLocker) Key() string {
71-
return l.key
72-
}
73-
74-
func (l *mutexLocker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
75-
if l.leaderCh != nil {
76-
return nil, locker.ErrLockHeld
77-
}
78-
79-
err := l.broker.lock(l.key, stopCh)
80-
if err != nil {
81-
return nil, err
82-
}
83-
84-
l.leaderCh = make(chan struct{})
85-
return l.leaderCh, nil
86-
}
87-
88-
func (l *mutexLocker) Unlock() error {
89-
if l.leaderCh == nil {
90-
return locker.ErrLockNotHeld
91-
}
92-
93-
close(l.leaderCh)
94-
l.leaderCh = nil
95-
96-
return l.broker.unlock(l.key)
97-
}

broker/mutexbroker/example_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func ExampleNew() {
2323
}
2424
defer t.Close()
2525

26-
t2, err := b.NewTask("task2/key", task2)
26+
t2, err := b.NewTask("task/key", task2)
2727
if err != nil {
2828
log.Printf("New task: %v", err)
2929
return

broker/mutexbroker/locker.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package mutexbroker
2+
3+
import (
4+
"github.com/monetha/go-distributed/locker"
5+
)
6+
7+
// Locker implements "distributed locker" using channel.
8+
type Locker struct {
9+
broker *Broker
10+
key string
11+
leaderCh chan struct{}
12+
}
13+
14+
// NewLocker creates new Locker instance.
15+
func NewLocker(broker *Broker, key string) *Locker {
16+
return &Locker{
17+
broker: broker,
18+
key: key,
19+
}
20+
}
21+
22+
// Key returns the name of locker.
23+
func (l *Locker) Key() string {
24+
return l.key
25+
}
26+
27+
// Lock attempts to acquire the locker and blocks while doing so.
28+
// Providing a non-nil stopCh can be used to abort the locker attempt.
29+
// Returns a channel that is closed if our locker is lost or an error.
30+
// This channel could be closed at any time due to session invalidation,
31+
// communication errors, operator intervention, etc. It is NOT safe to
32+
// assume that the locker is held until Unlock(), application must be able
33+
// to handle the locker being lost.
34+
func (l *Locker) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
35+
if l.leaderCh != nil {
36+
return nil, locker.ErrLockHeld
37+
}
38+
39+
err := l.broker.lock(l.key, stopCh)
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
l.leaderCh = make(chan struct{})
45+
return l.leaderCh, nil
46+
}
47+
48+
// Unlock released the locker. It is an error to call this
49+
// if the locker is not currently held.
50+
func (l *Locker) Unlock() error {
51+
if l.leaderCh == nil {
52+
return locker.ErrLockNotHeld
53+
}
54+
55+
close(l.leaderCh)
56+
l.leaderCh = nil
57+
58+
return l.broker.unlock(l.key)
59+
}

broker/mutexbroker/locker_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package mutexbroker
2+
3+
import "github.com/monetha/go-distributed/locker"
4+
5+
var (
6+
_ locker.Locker = &Locker{} // ensure Locker implements Locker interface
7+
)

0 commit comments

Comments
 (0)