From 68f4a85659fa1f1f0de91c2878dbf883c860b954 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Wed, 23 Oct 2019 06:34:50 -0400 Subject: [PATCH 01/22] first cut --- core/net.go | 162 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 106 insertions(+), 56 deletions(-) diff --git a/core/net.go b/core/net.go index 6926d8923f..c8c963ec86 100644 --- a/core/net.go +++ b/core/net.go @@ -1,17 +1,20 @@ package core import ( + "crypto/rand" "errors" "fmt" + "gx/ipfs/QmUadX5EcvrBmxAV9sE7wUWtWSqxns5K84qKJBixmcT1w9/go-datastore" + "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing" "sync" "time" + "golang.org/x/crypto/nacl/box" + libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto" "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid" - peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" - "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" + "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" - "github.com/OpenBazaar/openbazaar-go/ipfs" "github.com/OpenBazaar/openbazaar-go/pb" "github.com/OpenBazaar/openbazaar-go/repo" "github.com/golang/protobuf/proto" @@ -70,64 +73,111 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M if merr != nil { return merr } - // TODO: this function blocks if the recipient's public key is not on the local machine - ciphertext, cerr := n.EncryptMessage(p, k, messageBytes) - if cerr != nil { - return cerr - } - addr, aerr := n.MessageStorage.Store(p, ciphertext) - if aerr != nil { - return aerr - } - mh, mherr := multihash.FromB58String(p.Pretty()) - if mherr != nil { - return mherr - } - /* TODO: We are just using a default prefix length for now. Eventually we will want to customize this, - but we will need some way to get the recipient's desired prefix length. Likely will be in profile. */ - pointer, err := ipfs.NewPointer(mh, DefaultPointerPrefixLength, addr, ciphertext) - if err != nil { - return err - } - if m.MessageType != pb.Message_OFFLINE_ACK { - pointer.Purpose = ipfs.MESSAGE - pointer.CancelID = &p - err = n.Datastore.Pointers().Put(pointer) - if err != nil { - return err - } - } - log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String()) - OfflineMessageWaitGroup.Add(2) - go func() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err := ipfs.PublishPointer(n.DHT, ctx, pointer) - if err != nil { - log.Error(err) - } - // Push provider to our push nodes for redundancy - for _, p := range n.PushNodes { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer) + // Encrypt envelope for relay server + + // Generate an ephemeral key pair + _, ephemPriv, _ := box.GenerateKey(rand.Reader) + + ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) + defer cancel() + if k == nil { + var pubKey libp2p.PubKey + keyval, err := n.IpfsNode.Repo.Datastore().Get(datastore.NewKey(KeyCachePrefix + p.Pretty())) + if err != nil { + pubKey, err = routing.GetPublicKey(n.IpfsNode.Routing, ctx, p) if err != nil { - log.Error(err) + log.Errorf("Failed to find public key for %s", p.Pretty()) + return err + } + } else { + pubKey, err = libp2p.UnmarshalPublicKey(keyval) + if err != nil { + log.Errorf("Failed to find public key for %s", p.Pretty()) + return err } } + k = &pubKey + } - OfflineMessageWaitGroup.Done() - }() - go func() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err := n.Pubsub.Publisher.Publish(ctx, pointer.Cid.String(), ciphertext) - if err != nil { - log.Error(err) - } - OfflineMessageWaitGroup.Done() - }() + // Generate nonce + var nonce [24]byte + noncedata := make([]byte, 24) + rand.Read(noncedata) + for i := 0; i < 24; i++ { + nonce[i] = noncedata[i] + } + + // Encrypt + var ciphertext []byte + keybytes, _ := libp2p.MarshalPublicKey(*k) + var pkey *[32]byte + copy(pkey[:], keybytes[:32]) + + box.Seal(ciphertext, messageBytes, &nonce, pkey, ephemPriv) + + // TODO: this function blocks if the recipient's public key is not on the local machine + //ciphertext, cerr := n.EncryptMessage(p, k, messageBytes) + //if cerr != nil { + // return cerr + //} + + // Send to webrelay + + // + //addr, aerr := n.MessageStorage.Store(p, ciphertext) + //if aerr != nil { + // return aerr + //} + //mh, mherr := multihash.FromB58String(p.Pretty()) + //if mherr != nil { + // return mherr + //} + ///* TODO: We are just using a default prefix length for now. Eventually we will want to customize this, + // but we will need some way to get the recipient's desired prefix length. Likely will be in profile. */ + //pointer, err := ipfs.NewPointer(mh, DefaultPointerPrefixLength, addr, ciphertext) + //if err != nil { + // return err + //} + //if m.MessageType != pb.Message_OFFLINE_ACK { + // pointer.Purpose = ipfs.MESSAGE + // pointer.CancelID = &p + // err = n.Datastore.Pointers().Put(pointer) + // if err != nil { + // return err + // } + //} + //log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String()) + //OfflineMessageWaitGroup.Add(2) + //go func() { + // ctx, cancel := context.WithCancel(context.Background()) + // defer cancel() + // err := ipfs.PublishPointer(n.DHT, ctx, pointer) + // if err != nil { + // log.Error(err) + // } + // + // // Push provider to our push nodes for redundancy + // for _, p := range n.PushNodes { + // ctx, cancel := context.WithCancel(context.Background()) + // defer cancel() + // err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer) + // if err != nil { + // log.Error(err) + // } + // } + // + // OfflineMessageWaitGroup.Done() + //}() + //go func() { + // ctx, cancel := context.WithCancel(context.Background()) + // defer cancel() + // err := n.Pubsub.Publisher.Publish(ctx, pointer.Cid.String(), ciphertext) + // if err != nil { + // log.Error(err) + // } + // OfflineMessageWaitGroup.Done() + //}() return nil } From 408da267fd53453708d4406554f49cf8b8c587f9 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 24 Oct 2019 12:05:55 -0700 Subject: [PATCH 02/22] Add web relay constants --- schema/constants.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/schema/constants.go b/schema/constants.go index ff48b727f6..72f26c05ed 100644 --- a/schema/constants.go +++ b/schema/constants.go @@ -51,6 +51,8 @@ const ( DataPushNodeTwo = "QmPPg2qeF3n2KvTRXRZLaTwHCw8JxzF4uZK93RfMoDvf2o" DataPushNodeThree = "QmY8puEnVx66uEet64gAf4VZRo7oUyMCwG6KdB9KM92EGQ" + WebRelayOne = "wss://webchat.ob1.io:8080" + BootstrapNodeTestnet_BrooklynFlea = "/ip4/165.227.117.91/tcp/4001/ipfs/Qmaa6De5QYNqShzPb9SGSo8vLmoUte8mnWgzn4GYwzuUYA" BootstrapNodeTestnet_Shipshewana = "/ip4/46.101.221.165/tcp/4001/ipfs/QmVAQYg7ygAWTWegs8HSV2kdW1MqW8WMrmpqKG1PQtkgTC" BootstrapNodeDefault_LeMarcheSerpette = "/ip4/107.170.133.32/tcp/4001/ipfs/QmUZRGLhcKXF1JyuaHgKm23LvqcoMYwtb9jmh8CkP4og3K" @@ -79,6 +81,8 @@ var ( BootstrapNodeTestnet_BrooklynFlea, BootstrapNodeTestnet_Shipshewana, } + + WebRelayServers = []string{WebRelayOne} ) func EthereumDefaultOptions() map[string]interface{} { From 4443c266e9f23db9efd35bec211f6cb5a048f84a Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 24 Oct 2019 12:06:09 -0700 Subject: [PATCH 03/22] Retrieve web relays from configuration --- schema/configuration.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/schema/configuration.go b/schema/configuration.go index 89ef896ebd..7632e23058 100644 --- a/schema/configuration.go +++ b/schema/configuration.go @@ -415,6 +415,39 @@ func GetRepublishInterval(cfgBytes []byte) (time.Duration, error) { return d, nil } +func GetWebRelays(cfgBytes []byte) ([]string, error) { + var cfgIface interface{} + err := json.Unmarshal(cfgBytes, &cfgIface) + if err != nil { + return nil, MalformedConfigError + } + + var webRelays []string + + cfg, ok := cfgIface.(map[string]interface{}) + if !ok { + return webRelays, MalformedConfigError + } + + wrcfg, ok := cfg["WebRelays"] + if !ok { + return webRelays, MalformedConfigError + } + wr, ok := wrcfg.([]interface{}) + if !ok { + return webRelays, MalformedConfigError + } + + for _, nd := range wr { + ndStr, ok := nd.(string) + if !ok { + return webRelays, MalformedConfigError + } + webRelays = append(webRelays, ndStr) + } + return webRelays, nil +} + func GetDataSharing(cfgBytes []byte) (*DataSharing, error) { var cfgIface interface{} err := json.Unmarshal(cfgBytes, &cfgIface) From 28af6e957dc9c76e2dd781902e3c764b42779230 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 24 Oct 2019 12:06:22 -0700 Subject: [PATCH 04/22] Make sure initial config file has webrelays key --- repo/init.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/repo/init.go b/repo/init.go index e7d2ce868b..0b742e9901 100644 --- a/repo/init.go +++ b/repo/init.go @@ -179,6 +179,8 @@ func addConfigExtensions(repoRoot string) error { APIRouter: schema.IPFSCachingRouterDefaultURI, } + wr = schema.WebRelayServers + t = schema.TorConfig{} ) if err := r.SetConfigKey("Wallets", schema.DefaultWalletsConfig()); err != nil { @@ -187,6 +189,9 @@ func addConfigExtensions(repoRoot string) error { if err := r.SetConfigKey("DataSharing", ds); err != nil { return err } + if err := r.SetConfigKey("WebRelays", wr); err != nil { + return err + } if err := r.SetConfigKey("Bootstrap-testnet", schema.BootstrapAddressesTestnet); err != nil { return err } From 7ae20cf1c1cc13e5ff5fdb828e3d5be9d7b62784 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 24 Oct 2019 13:30:42 -0700 Subject: [PATCH 05/22] Configure web relays from config file --- cmd/start.go | 16 ++++++++++++---- mobile/node.go | 8 ++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 320a3d267e..a6960c2ee5 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -237,6 +237,11 @@ func (x *Start) Execute(args []string) error { log.Error("scan data sharing config:", err) return err } + webRelays, err := schema.GetWebRelays(configFile) + if err != nil { + log.Error("scan web relays config:", err) + return err + } dropboxToken, err := schema.GetDropboxApiToken(configFile) if err != nil { log.Error("scan dropbox api token:", err) @@ -578,10 +583,12 @@ func (x *Start) Execute(args []string) error { subscriber := ipfs.NewPubsubSubscriber(context.Background(), nd.PeerHost, nd.Routing, nd.Repo.Datastore(), nd.PubSub) ps := ipfs.Pubsub{Publisher: publisher, Subscriber: subscriber} - var rootHash string - if cachedIPNSRecord != nil { - rootHash = string(cachedIPNSRecord.Value) - } + var rootHash string + if cachedIPNSRecord != nil { + rootHash = string(cachedIPNSRecord.Value) + } + + wm := obnet.NewWebRelayManager(webRelays, identity.PeerID) // OpenBazaar node setup core.Node = &core.OpenBazaarNode{ @@ -595,6 +602,7 @@ func (x *Start) Execute(args []string) error { OfflineMessageFailoverTimeout: 30 * time.Second, Pubsub: ps, PushNodes: pushNodes, + WebRelayManager: wm, RegressionTestEnable: x.Regtest, RepoPath: repoPath, RootHash: rootHash, diff --git a/mobile/node.go b/mobile/node.go index 87b98cbd17..9482c3ecab 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -151,6 +151,11 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N return nil, err } + webRelays, err := apiSchema.GetWebRelays(configFile) + if err != nil { + return nil, err + } + walletsConfig, err := apiSchema.GetWalletsConfig(configFile) if err != nil { return nil, err @@ -275,6 +280,8 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N pushNodes = append(pushNodes, p) } + wm := obnet.NewWebRelayManager(webRelays, identity.PeerID) + // OpenBazaar node setup node := &core.OpenBazaarNode{ BanManager: bm, @@ -283,6 +290,7 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N Multiwallet: mw, OfflineMessageFailoverTimeout: 5 * time.Second, PushNodes: pushNodes, + WebRelayManager: wm, RepoPath: config.RepoPath, UserAgent: core.USERAGENT, IPNSQuorumSize: uint(ipnsExtraConfig.DHTQuorumSize), From 3dba113030959d3c42ed735a085b6f4c6ea3b73e Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 24 Oct 2019 13:31:18 -0700 Subject: [PATCH 06/22] Add web relay manager to core --- core/core.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/core.go b/core/core.go index 9133e75fd6..800805d3b4 100644 --- a/core/core.go +++ b/core/core.go @@ -159,6 +159,9 @@ type OpenBazaarNode struct { // Manage blocked peers BanManager *net.BanManager + // Web Relay nodes + WebRelayManager *net.WebRelayManager + // Allow other nodes to push data to this node for storage AcceptStoreRequests bool From d66d2e81c511656724f9363413a8a7cd6f489792 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 24 Oct 2019 13:31:28 -0700 Subject: [PATCH 07/22] New web relay manager object --- net/web_relay_manager.go | 116 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 net/web_relay_manager.go diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go new file mode 100644 index 0000000000..c45c311921 --- /dev/null +++ b/net/web_relay_manager.go @@ -0,0 +1,116 @@ +package net + +import ( + "crypto/sha256" + "encoding/binary" + "encoding/json" + "fmt" + "github.com/gorilla/websocket" + "github.com/mr-tron/base58/base58" + "github.com/multiformats/go-multihash" +) + +type WebRelayManager struct { + webrelays []string + peerID string +} + +type EncryptedMessage struct { + Message string `json:"encryptedMessage"` + Recipient string `json:"recipient"` +} + +type TypedMessage struct{ + Type string + Data json.RawMessage +} + +type SubscribeMessage struct { + UserID string `json:"userID"` + SubscriptionKey string `json:"subscriptionKey"` +} + +func NewWebRelayManager(webrelays []string, sender string) *WebRelayManager { + return &WebRelayManager{webrelays, sender} +} + +func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string) { + encryptedmessage := EncryptedMessage{ + Message: ciphertext, + Recipient: recipient, + } + + data, _ := json.Marshal(encryptedmessage) + + typedmessage := TypedMessage{ + Type: "EncryptedMessage", + Data: data, + } + + outgoing, _ := json.Marshal(typedmessage) + fmt.Println(string(outgoing)) + + // Transmit the encrypted message to the webrelay + wrm.authToWebRelay(wrm.webrelays[0], outgoing) + +} + +func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { + + // Generate subscription key for web relay + peerIDMultihash, _ := multihash.FromB58String(wrm.peerID) + decoded, _ := multihash.Decode(peerIDMultihash) + digest := decoded.Digest + prefix := digest[:8] + + prefix64 := binary.BigEndian.Uint64(prefix) + + // Then shifting + shiftedPrefix64 := prefix64>>uint(48) + + // Then converting back to a byte array + shiftedBytes := make([]byte, 8) + binary.BigEndian.PutUint64(shiftedBytes, shiftedPrefix64) + + hashedShiftedPrefix := sha256.Sum256(shiftedBytes) + + subscriptionKey, _ := multihash.Encode(hashedShiftedPrefix[:], multihash.SHA2_256) + + // Generate subscribe message + subscribeMessage := SubscribeMessage{ + UserID: wrm.peerID, + SubscriptionKey: base58.Encode([]byte(subscriptionKey)), + } + + data, _ := json.Marshal(subscribeMessage) + typedmessage := TypedMessage{ + Type: "SubscribeMessage", + Data: data, + } + fmt.Println(typedmessage) + + socketmessage, _ := json.Marshal(typedmessage) + + // Connect to websocket server + fmt.Printf("connecting to %s", server) + + c, _, err := websocket.DefaultDialer.Dial(server, nil) + if err != nil { + log.Fatal("dial:", err) + } + defer c.Close() + + err = c.WriteMessage(websocket.TextMessage, socketmessage) + if err != nil { + fmt.Println("write:", err) + return + } + + err = c.WriteMessage(websocket.TextMessage, msg) + if err != nil { + fmt.Println("write:", err) + return + } + +} + From 6bc2acd06681cf16dd2995188b559d5b0bb4eba5 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 24 Oct 2019 13:35:11 -0700 Subject: [PATCH 08/22] Formatting update --- net/web_relay_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index c45c311921..201b8ed3f0 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -92,7 +92,7 @@ func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { socketmessage, _ := json.Marshal(typedmessage) // Connect to websocket server - fmt.Printf("connecting to %s", server) + fmt.Printf("connecting to %s\n", server) c, _, err := websocket.DefaultDialer.Dial(server, nil) if err != nil { From e0cc81ba3692f5bb55973f1c962a69c9f69b04c2 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 24 Oct 2019 13:35:24 -0700 Subject: [PATCH 09/22] Update sending offline messages to include relay gofmt gofmt Update deps Updates return error --- cmd/start.go | 10 +-- core/net.go | 151 ++++++++++++++++++--------------------- mobile/node.go | 5 +- net/web_relay_manager.go | 20 +++--- 4 files changed, 89 insertions(+), 97 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index a6960c2ee5..14f0841a67 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -583,10 +583,10 @@ func (x *Start) Execute(args []string) error { subscriber := ipfs.NewPubsubSubscriber(context.Background(), nd.PeerHost, nd.Routing, nd.Repo.Datastore(), nd.PubSub) ps := ipfs.Pubsub{Publisher: publisher, Subscriber: subscriber} - var rootHash string - if cachedIPNSRecord != nil { - rootHash = string(cachedIPNSRecord.Value) - } + var rootHash string + if cachedIPNSRecord != nil { + rootHash = string(cachedIPNSRecord.Value) + } wm := obnet.NewWebRelayManager(webRelays, identity.PeerID) @@ -602,7 +602,7 @@ func (x *Start) Execute(args []string) error { OfflineMessageFailoverTimeout: 30 * time.Second, Pubsub: ps, PushNodes: pushNodes, - WebRelayManager: wm, + WebRelayManager: wm, RegressionTestEnable: x.Regtest, RepoPath: repoPath, RootHash: rootHash, diff --git a/core/net.go b/core/net.go index c8c963ec86..c2b2c384c2 100644 --- a/core/net.go +++ b/core/net.go @@ -1,7 +1,7 @@ package core import ( - "crypto/rand" + "encoding/base64" "errors" "fmt" "gx/ipfs/QmUadX5EcvrBmxAV9sE7wUWtWSqxns5K84qKJBixmcT1w9/go-datastore" @@ -9,11 +9,14 @@ import ( "sync" "time" - "golang.org/x/crypto/nacl/box" + "github.com/OpenBazaar/openbazaar-go/net" libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto" "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid" "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" + "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" + + "github.com/OpenBazaar/openbazaar-go/ipfs" "github.com/OpenBazaar/openbazaar-go/pb" "github.com/OpenBazaar/openbazaar-go/repo" @@ -74,16 +77,11 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M return merr } - // Encrypt envelope for relay server - - // Generate an ephemeral key pair - _, ephemPriv, _ := box.GenerateKey(rand.Reader) - ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) defer cancel() if k == nil { var pubKey libp2p.PubKey - keyval, err := n.IpfsNode.Repo.Datastore().Get(datastore.NewKey(KeyCachePrefix + p.Pretty())) + keyval, err := n.IpfsNode.Repo.Datastore().Get(datastore.NewKey("/pubkey/" + p.Pretty())) if err != nil { pubKey, err = routing.GetPublicKey(n.IpfsNode.Routing, ctx, p) if err != nil { @@ -100,84 +98,75 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M k = &pubKey } - // Generate nonce - var nonce [24]byte - noncedata := make([]byte, 24) - rand.Read(noncedata) - for i := 0; i < 24; i++ { - nonce[i] = noncedata[i] + relayciphertext, err := net.Encrypt(*k, messageBytes) + if err != nil { + return fmt.Errorf("Error: %s", err.Error()) } - // Encrypt - var ciphertext []byte - keybytes, _ := libp2p.MarshalPublicKey(*k) - var pkey *[32]byte - copy(pkey[:], keybytes[:32]) + // Base64 encode + encodedCipherText := base64.StdEncoding.EncodeToString(relayciphertext) - box.Seal(ciphertext, messageBytes, &nonce, pkey, ephemPriv) + n.WebRelayManager.SendRelayMessage(encodedCipherText, p.String()) // TODO: this function blocks if the recipient's public key is not on the local machine - //ciphertext, cerr := n.EncryptMessage(p, k, messageBytes) - //if cerr != nil { - // return cerr - //} - - // Send to webrelay - - // - //addr, aerr := n.MessageStorage.Store(p, ciphertext) - //if aerr != nil { - // return aerr - //} - //mh, mherr := multihash.FromB58String(p.Pretty()) - //if mherr != nil { - // return mherr - //} - ///* TODO: We are just using a default prefix length for now. Eventually we will want to customize this, - // but we will need some way to get the recipient's desired prefix length. Likely will be in profile. */ - //pointer, err := ipfs.NewPointer(mh, DefaultPointerPrefixLength, addr, ciphertext) - //if err != nil { - // return err - //} - //if m.MessageType != pb.Message_OFFLINE_ACK { - // pointer.Purpose = ipfs.MESSAGE - // pointer.CancelID = &p - // err = n.Datastore.Pointers().Put(pointer) - // if err != nil { - // return err - // } - //} - //log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String()) - //OfflineMessageWaitGroup.Add(2) - //go func() { - // ctx, cancel := context.WithCancel(context.Background()) - // defer cancel() - // err := ipfs.PublishPointer(n.DHT, ctx, pointer) - // if err != nil { - // log.Error(err) - // } - // - // // Push provider to our push nodes for redundancy - // for _, p := range n.PushNodes { - // ctx, cancel := context.WithCancel(context.Background()) - // defer cancel() - // err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer) - // if err != nil { - // log.Error(err) - // } - // } - // - // OfflineMessageWaitGroup.Done() - //}() - //go func() { - // ctx, cancel := context.WithCancel(context.Background()) - // defer cancel() - // err := n.Pubsub.Publisher.Publish(ctx, pointer.Cid.String(), ciphertext) - // if err != nil { - // log.Error(err) - // } - // OfflineMessageWaitGroup.Done() - //}() + ciphertext, cerr := n.EncryptMessage(p, k, messageBytes) + if cerr != nil { + return cerr + } + + addr, aerr := n.MessageStorage.Store(p, ciphertext) + if aerr != nil { + return aerr + } + mh, mherr := multihash.FromB58String(p.Pretty()) + if mherr != nil { + return mherr + } + /* TODO: We are just using a default prefix length for now. Eventually we will want to customize this, + but we will need some way to get the recipient's desired prefix length. Likely will be in profile. */ + pointer, err := ipfs.NewPointer(mh, DefaultPointerPrefixLength, addr, ciphertext) + if err != nil { + return err + } + if m.MessageType != pb.Message_OFFLINE_ACK { + pointer.Purpose = ipfs.MESSAGE + pointer.CancelID = &p + err = n.Datastore.Pointers().Put(pointer) + if err != nil { + return err + } + } + log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String()) + OfflineMessageWaitGroup.Add(2) + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := ipfs.PublishPointer(n.DHT, ctx, pointer) + if err != nil { + log.Error(err) + } + + // Push provider to our push nodes for redundancy + for _, p := range n.PushNodes { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer) + if err != nil { + log.Error(err) + } + } + + OfflineMessageWaitGroup.Done() + }() + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := n.Pubsub.Publisher.Publish(ctx, pointer.Cid.String(), ciphertext) + if err != nil { + log.Error(err) + } + OfflineMessageWaitGroup.Done() + }() return nil } diff --git a/mobile/node.go b/mobile/node.go index 9482c3ecab..f229d7a85b 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -26,6 +26,8 @@ import ( ipfslogging "gx/ipfs/QmbkT7eMTyXfpeyB3ZMxxcxg7XH8t6uXp49jqzz4HB7BGF/go-log/writer" "gx/ipfs/Qmc85NSvmSG4Frn9Vb2cBc1rMyULH6D3TNVEfCzSKoUpip/go-multiaddr-net" + _ "net/http/pprof" + "github.com/OpenBazaar/openbazaar-go/api" "github.com/OpenBazaar/openbazaar-go/core" "github.com/OpenBazaar/openbazaar-go/ipfs" @@ -51,7 +53,6 @@ import ( "github.com/natefinch/lumberjack" "github.com/op/go-logging" "github.com/tyler-smith/go-bip39" - _ "net/http/pprof" ) var log = logging.MustGetLogger("mobile") @@ -290,7 +291,7 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N Multiwallet: mw, OfflineMessageFailoverTimeout: 5 * time.Second, PushNodes: pushNodes, - WebRelayManager: wm, + WebRelayManager: wm, RepoPath: config.RepoPath, UserAgent: core.USERAGENT, IPNSQuorumSize: uint(ipnsExtraConfig.DHTQuorumSize), diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index 201b8ed3f0..8f6114338a 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -5,14 +5,17 @@ import ( "encoding/binary" "encoding/json" "fmt" + + "github.com/btcsuite/btcutil/base58" + + "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" + "github.com/gorilla/websocket" - "github.com/mr-tron/base58/base58" - "github.com/multiformats/go-multihash" ) type WebRelayManager struct { webrelays []string - peerID string + peerID string } type EncryptedMessage struct { @@ -20,7 +23,7 @@ type EncryptedMessage struct { Recipient string `json:"recipient"` } -type TypedMessage struct{ +type TypedMessage struct { Type string Data json.RawMessage } @@ -36,7 +39,7 @@ func NewWebRelayManager(webrelays []string, sender string) *WebRelayManager { func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string) { encryptedmessage := EncryptedMessage{ - Message: ciphertext, + Message: ciphertext, Recipient: recipient, } @@ -66,7 +69,7 @@ func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { prefix64 := binary.BigEndian.Uint64(prefix) // Then shifting - shiftedPrefix64 := prefix64>>uint(48) + shiftedPrefix64 := prefix64 >> uint(48) // Then converting back to a byte array shiftedBytes := make([]byte, 8) @@ -78,8 +81,8 @@ func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { // Generate subscribe message subscribeMessage := SubscribeMessage{ - UserID: wrm.peerID, - SubscriptionKey: base58.Encode([]byte(subscriptionKey)), + UserID: wrm.peerID, + SubscriptionKey: base58.Encode(subscriptionKey), } data, _ := json.Marshal(subscribeMessage) @@ -113,4 +116,3 @@ func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { } } - From 8cdce94a7a3075adb04858b8783c820f0af79bb7 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 7 Nov 2019 11:56:07 -0500 Subject: [PATCH 10/22] Refactor to send message to all relays Keep connections open and refactor sending outbound messages to all connections. --- net/web_relay_manager.go | 89 +++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 33 deletions(-) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index 8f6114338a..9e282da6be 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -14,8 +14,9 @@ import ( ) type WebRelayManager struct { - webrelays []string - peerID string + webrelays []string + peerID string + connections []*websocket.Conn } type EncryptedMessage struct { @@ -34,34 +35,25 @@ type SubscribeMessage struct { } func NewWebRelayManager(webrelays []string, sender string) *WebRelayManager { - return &WebRelayManager{webrelays, sender} -} -func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string) { - encryptedmessage := EncryptedMessage{ - Message: ciphertext, - Recipient: recipient, - } + // Establish connections + var conns []*websocket.Conn + for _, relay := range webrelays { - data, _ := json.Marshal(encryptedmessage) + // Connect and subscribe to websocket server + conn, err := connectToServer(relay, sender) + if err != nil { + log.Error("Could not connect to: %s", relay) + } - typedmessage := TypedMessage{ - Type: "EncryptedMessage", - Data: data, + conns = append(conns, conn) } - - outgoing, _ := json.Marshal(typedmessage) - fmt.Println(string(outgoing)) - - // Transmit the encrypted message to the webrelay - wrm.authToWebRelay(wrm.webrelays[0], outgoing) - + return &WebRelayManager{webrelays, sender, conns} } -func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { - +func connectToServer(relay string, sender string) (*websocket.Conn, error) { // Generate subscription key for web relay - peerIDMultihash, _ := multihash.FromB58String(wrm.peerID) + peerIDMultihash, _ := multihash.FromB58String(sender) decoded, _ := multihash.Decode(peerIDMultihash) digest := decoded.Digest prefix := digest[:8] @@ -81,7 +73,7 @@ func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { // Generate subscribe message subscribeMessage := SubscribeMessage{ - UserID: wrm.peerID, + UserID: sender, SubscriptionKey: base58.Encode(subscriptionKey), } @@ -90,29 +82,60 @@ func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { Type: "SubscribeMessage", Data: data, } - fmt.Println(typedmessage) + fmt.Println("Sending SubscribeMessage:", typedmessage) socketmessage, _ := json.Marshal(typedmessage) // Connect to websocket server - fmt.Printf("connecting to %s\n", server) + fmt.Printf("Connecting to relay server: %s\n", relay) - c, _, err := websocket.DefaultDialer.Dial(server, nil) + c, _, err := websocket.DefaultDialer.Dial(relay, nil) if err != nil { log.Fatal("dial:", err) + return nil, err } - defer c.Close() err = c.WriteMessage(websocket.TextMessage, socketmessage) if err != nil { fmt.Println("write:", err) - return + return nil, err } - err = c.WriteMessage(websocket.TextMessage, msg) - if err != nil { - fmt.Println("write:", err) - return + fmt.Printf("Successfully connected and subscribed to: %s\n", relay) + + return c, nil +} + +func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string) { + encryptedmessage := EncryptedMessage{ + Message: ciphertext, + Recipient: recipient, + } + + data, _ := json.Marshal(encryptedmessage) + + typedmessage := TypedMessage{ + Type: "EncryptedMessage", + Data: data, + } + + outgoing, _ := json.Marshal(typedmessage) + fmt.Println(string(outgoing)) + + // Transmit the encrypted message to the webrelay + wrm.authToWebRelay(wrm.webrelays[0], outgoing) + +} + +func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { + + for _, conn := range wrm.connections { + err := conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + fmt.Println("write:", err) + return + } + fmt.Printf("Successfully sent message to relay: %s\n", conn.RemoteAddr()) } } From 46f617263907eb3a2be4a78f5cc6fec9b76ad7ed Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 7 Nov 2019 12:27:51 -0500 Subject: [PATCH 11/22] Listening to websocket for incomings Fix peer ID Fix peer ID --- core/net.go | 2 +- net/web_relay_manager.go | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/net.go b/core/net.go index c2b2c384c2..92017513b8 100644 --- a/core/net.go +++ b/core/net.go @@ -106,7 +106,7 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M // Base64 encode encodedCipherText := base64.StdEncoding.EncodeToString(relayciphertext) - n.WebRelayManager.SendRelayMessage(encodedCipherText, p.String()) + n.WebRelayManager.SendRelayMessage(encodedCipherText, p.Pretty()) // TODO: this function blocks if the recipient's public key is not on the local machine ciphertext, cerr := n.EncryptMessage(p, k, messageBytes) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index 9e282da6be..913926346a 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -86,6 +86,7 @@ func connectToServer(relay string, sender string) (*websocket.Conn, error) { socketmessage, _ := json.Marshal(typedmessage) + fmt.Println(string(socketmessage)) // Connect to websocket server fmt.Printf("Connecting to relay server: %s\n", relay) @@ -103,6 +104,25 @@ func connectToServer(relay string, sender string) (*websocket.Conn, error) { fmt.Printf("Successfully connected and subscribed to: %s\n", relay) + go func() { + for { + // read in a message + _, p, err := c.ReadMessage() + if err != nil { + fmt.Println(err) + //return nil, err + } + // print out that message for clarity + fmt.Printf("Received incoming message from relay: %s\n", string(p)) + + //if err := c.WriteMessage(messageType, p); err != nil { + // fmt.Println(err) + // //return nil, err + //} + + } + }() + return c, nil } From 252985f42fe3eb45377f2e070129f5de48dbfde7 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Thu, 7 Nov 2019 15:22:55 -0500 Subject: [PATCH 12/22] break out if connection is dead --- net/web_relay_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index 913926346a..ff72e0fa29 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -110,7 +110,7 @@ func connectToServer(relay string, sender string) (*websocket.Conn, error) { _, p, err := c.ReadMessage() if err != nil { fmt.Println(err) - //return nil, err + break } // print out that message for clarity fmt.Printf("Received incoming message from relay: %s\n", string(p)) From 22286951f55c3b5fb845fcb695b8f7926a646670 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Fri, 8 Nov 2019 09:02:11 -0500 Subject: [PATCH 13/22] Send offline anyways for testing --- core/net.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/net.go b/core/net.go index 92017513b8..470215faf5 100644 --- a/core/net.go +++ b/core/net.go @@ -616,7 +616,7 @@ func (n *OpenBazaarNode) SendChat(peerID string, chatMessage *pb.Chat) error { ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) defer cancel() err = n.Service.SendMessage(ctx, p, &m) - if err != nil && chatMessage.Flag != pb.Chat_TYPING { + if chatMessage.Flag != pb.Chat_TYPING { if err := n.SendOfflineMessage(p, nil, &m); err != nil { log.Errorf("failed to send offline message: %v", err) return err From 8dcb680c5892f8a3e00d9d0ddaa585e3acf0582d Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Fri, 8 Nov 2019 10:14:09 -0500 Subject: [PATCH 14/22] Handle incoming socket messages --- cmd/start.go | 3 +++ mobile/node.go | 2 ++ net/web_relay_manager.go | 55 ++++++++++++++++++++++++++++++++-------- 3 files changed, 50 insertions(+), 10 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 14f0841a67..e4fdf3758c 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -684,6 +684,7 @@ func (x *Start) Execute(args []string) error { }() } } + core.Node.Service = service.New(core.Node, sqliteDB) core.Node.Service.WaitForReady() log.Info("OpenBazaar Service Ready") @@ -692,6 +693,8 @@ func (x *Start) Execute(args []string) error { core.Node.StartPointerRepublisher() core.Node.StartRecordAgingNotifier() + core.Node.WebRelayManager.ConnectToRelays(core.Node.Service) + core.Node.PublishLock.Unlock() err = core.Node.UpdateFollow() if err != nil { diff --git a/mobile/node.go b/mobile/node.go index f229d7a85b..e7bbc099d6 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -483,6 +483,8 @@ func (n *Node) start() error { n.OpenBazaarNode.PointerRepublisher = PR MR.Wait() + n.OpenBazaarNode.WebRelayManager.ConnectToRelays(n.OpenBazaarNode.Service) + n.OpenBazaarNode.PublishLock.Unlock() publishUnlocked = true n.OpenBazaarNode.UpdateFollow() diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index ff72e0fa29..246f5441ed 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -5,11 +5,15 @@ import ( "encoding/binary" "encoding/json" "fmt" + peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" - "github.com/btcsuite/btcutil/base58" + "github.com/OpenBazaar/openbazaar-go/pb" + "github.com/golang/protobuf/ptypes/any" "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" + "github.com/btcsuite/btcutil/base58" + "github.com/gorilla/websocket" ) @@ -17,6 +21,7 @@ type WebRelayManager struct { webrelays []string peerID string connections []*websocket.Conn + obService NetworkService } type EncryptedMessage struct { @@ -34,24 +39,34 @@ type SubscribeMessage struct { SubscriptionKey string `json:"subscriptionKey"` } +type SubscribeResponse struct { + Subscribe string `json:"subscribe"` +} + func NewWebRelayManager(webrelays []string, sender string) *WebRelayManager { + return &WebRelayManager{webrelays, sender, nil, nil} +} + +func (wrm *WebRelayManager) ConnectToRelays(service NetworkService) { + + // Set WRM service + wrm.obService = service // Establish connections var conns []*websocket.Conn - for _, relay := range webrelays { + for _, relay := range wrm.webrelays { // Connect and subscribe to websocket server - conn, err := connectToServer(relay, sender) + conn, err := wrm.connectToServer(relay, wrm.peerID) if err != nil { log.Error("Could not connect to: %s", relay) } conns = append(conns, conn) } - return &WebRelayManager{webrelays, sender, conns} } -func connectToServer(relay string, sender string) (*websocket.Conn, error) { +func (wrm *WebRelayManager) connectToServer(relay string, sender string) (*websocket.Conn, error) { // Generate subscription key for web relay peerIDMultihash, _ := multihash.FromB58String(sender) decoded, _ := multihash.Decode(peerIDMultihash) @@ -102,7 +117,7 @@ func connectToServer(relay string, sender string) (*websocket.Conn, error) { return nil, err } - fmt.Printf("Successfully connected and subscribed to: %s\n", relay) + fmt.Printf("Successfully connected to %s and subscribed to: %s\n", relay, base58.Encode(subscriptionKey)) go func() { for { @@ -115,10 +130,30 @@ func connectToServer(relay string, sender string) (*websocket.Conn, error) { // print out that message for clarity fmt.Printf("Received incoming message from relay: %s\n", string(p)) - //if err := c.WriteMessage(messageType, p); err != nil { - // fmt.Println(err) - // //return nil, err - //} + if string(p) == "{\"subscribe\": true}" { + log.Debugf("Received subscribe success message") + } else { + // turn encrypted message into OFFLINE_RELAY and process normally + m := new(pb.Message) + m.MessageType = pb.Message_OFFLINE_RELAY + m.Payload = &any.Any{Value: p} + + handler := wrm.obService.HandlerForMsgType(m.MessageType) + + peerID, _ := peer.IDB58Decode(sender) + + if peerID != "" { + m, err = handler(peerID, m, nil) + if err != nil { + if m != nil { + log.Debugf("%s handle message error: %s", m.MessageType.String(), err.Error()) + } else { + log.Errorf("Error: %s", err.Error()) + } + } + log.Debugf("Received OFFLINE_RELAY2 message from %s", peerID.Pretty()) + } + } } }() From c6cde780e4c54a1b8a0394895f328163aae11367 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Fri, 8 Nov 2019 11:25:51 -0500 Subject: [PATCH 15/22] Reconnection to web relay --- net/web_relay_manager.go | 52 +++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index 246f5441ed..9435a4ec3e 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -4,8 +4,8 @@ import ( "crypto/sha256" "encoding/binary" "encoding/json" - "fmt" peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" + "time" "github.com/OpenBazaar/openbazaar-go/pb" "github.com/golang/protobuf/ptypes/any" @@ -62,7 +62,7 @@ func (wrm *WebRelayManager) ConnectToRelays(service NetworkService) { log.Error("Could not connect to: %s", relay) } - conns = append(conns, conn) + wrm.connections = append(conns, conn) } } @@ -97,38 +97,36 @@ func (wrm *WebRelayManager) connectToServer(relay string, sender string) (*webso Type: "SubscribeMessage", Data: data, } - fmt.Println("Sending SubscribeMessage:", typedmessage) socketmessage, _ := json.Marshal(typedmessage) - fmt.Println(string(socketmessage)) // Connect to websocket server - fmt.Printf("Connecting to relay server: %s\n", relay) + log.Debugf("Connecting to relay server: %s\n", relay) c, _, err := websocket.DefaultDialer.Dial(relay, nil) if err != nil { - log.Fatal("dial:", err) + log.Error("dial:", err) return nil, err } err = c.WriteMessage(websocket.TextMessage, socketmessage) if err != nil { - fmt.Println("write:", err) + log.Debugf("write:", err) return nil, err } - fmt.Printf("Successfully connected to %s and subscribed to: %s\n", relay, base58.Encode(subscriptionKey)) + log.Debugf("Successfully connected to %s and subscribed to: %s\n", relay, base58.Encode(subscriptionKey)) go func() { for { // read in a message _, p, err := c.ReadMessage() if err != nil { - fmt.Println(err) + log.Debugf("Connection to relay has an error: %s", err) + log.Debugf("Attempting to reconnect to the relay...") + wrm.reconnectToRelay(relay, sender) break } - // print out that message for clarity - fmt.Printf("Received incoming message from relay: %s\n", string(p)) if string(p) == "{\"subscribe\": true}" { log.Debugf("Received subscribe success message") @@ -161,6 +159,17 @@ func (wrm *WebRelayManager) connectToServer(relay string, sender string) (*webso return c, nil } +func (wrm *WebRelayManager) reconnectToRelay(relay string, sender string) { + conn, err := wrm.connectToServer(relay, wrm.peerID) + if err != nil { + log.Error("Could not connect to: %s", relay) + time.Sleep(10 * time.Second) + wrm.reconnectToRelay(relay, wrm.peerID) + } else { + wrm.connections = append(wrm.connections, conn) + } +} + func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string) { encryptedmessage := EncryptedMessage{ Message: ciphertext, @@ -175,22 +184,27 @@ func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string } outgoing, _ := json.Marshal(typedmessage) - fmt.Println(string(outgoing)) + log.Debugf("Sending encrypted relay message: %s", string(outgoing)) // Transmit the encrypted message to the webrelay wrm.authToWebRelay(wrm.webrelays[0], outgoing) - } func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { - for _, conn := range wrm.connections { - err := conn.WriteMessage(websocket.TextMessage, msg) - if err != nil { - fmt.Println("write:", err) - return + if conn != nil { + err := conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + log.Debugf("write:", err) + } else { + log.Debugf("Successfully sent message to relay: %s\n", conn.RemoteAddr()) + } + } - fmt.Printf("Successfully sent message to relay: %s\n", conn.RemoteAddr()) + } + + if len(wrm.connections) == 0 { + log.Debugf("There are no websocket connections to send relay message to") } } From 7c52b1f4390a72e8749347d8e2e2c98e634c823d Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Fri, 8 Nov 2019 11:32:31 -0500 Subject: [PATCH 16/22] Replace the offline piece --- core/net.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/net.go b/core/net.go index 470215faf5..92017513b8 100644 --- a/core/net.go +++ b/core/net.go @@ -616,7 +616,7 @@ func (n *OpenBazaarNode) SendChat(peerID string, chatMessage *pb.Chat) error { ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) defer cancel() err = n.Service.SendMessage(ctx, p, &m) - if chatMessage.Flag != pb.Chat_TYPING { + if err != nil && chatMessage.Flag != pb.Chat_TYPING { if err := n.SendOfflineMessage(p, nil, &m); err != nil { log.Errorf("failed to send offline message: %v", err) return err From a44752895f6b7aa6452774052f876031917bc179 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Fri, 8 Nov 2019 12:18:46 -0500 Subject: [PATCH 17/22] Refactor relayed messages out of offline --- core/net.go | 57 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/core/net.go b/core/net.go index 92017513b8..31c55fc704 100644 --- a/core/net.go +++ b/core/net.go @@ -45,6 +45,7 @@ func (n *OpenBazaarNode) sendMessage(peerID string, k *libp2p.PubKey, message pb return err } ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) + n.SendRelayedMessage(p, k, &message) // send relayed message immediately defer cancel() err = n.Service.SendMessage(ctx, p, &message) if err != nil { @@ -57,25 +58,11 @@ func (n *OpenBazaarNode) sendMessage(peerID string, k *libp2p.PubKey, message pb return nil } -// SendOfflineMessage Supply of a public key is optional, if nil is instead provided n.EncryptMessage does a lookup -func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error { - pubKeyBytes, err := n.IpfsNode.PrivateKey.GetPublic().Bytes() - if err != nil { - return err - } - ser, err := proto.Marshal(m) - if err != nil { - return err - } - sig, err := n.IpfsNode.PrivateKey.Sign(ser) +func (n *OpenBazaarNode) SendRelayedMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error { + messageBytes, err := n.getMessageBytes(m) if err != nil { return err } - env := pb.Envelope{Message: m, Pubkey: pubKeyBytes, Signature: sig} - messageBytes, merr := proto.Marshal(&env) - if merr != nil { - return merr - } ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) defer cancel() @@ -108,6 +95,38 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M n.WebRelayManager.SendRelayMessage(encodedCipherText, p.Pretty()) + return nil +} + +func (n *OpenBazaarNode) getMessageBytes(m *pb.Message) ([]byte, error) { + pubKeyBytes, err := n.IpfsNode.PrivateKey.GetPublic().Bytes() + if err != nil { + return nil, err + } + ser, err := proto.Marshal(m) + if err != nil { + return nil, err + } + sig, err := n.IpfsNode.PrivateKey.Sign(ser) + if err != nil { + return nil, err + } + + env := pb.Envelope{Message: m, Pubkey: pubKeyBytes, Signature: sig} + messageBytes, merr := proto.Marshal(&env) + if merr != nil { + return nil, merr + } + return messageBytes, nil +} + +// SendOfflineMessage Supply of a public key is optional, if nil is instead provided n.EncryptMessage does a lookup +func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error { + messageBytes, err := n.getMessageBytes(m) + if err != nil { + return err + } + // TODO: this function blocks if the recipient's public key is not on the local machine ciphertext, cerr := n.EncryptMessage(p, k, messageBytes) if cerr != nil { @@ -319,6 +338,8 @@ func (n *OpenBazaarNode) ResendCachedOrderMessage(orderID string, msgType pb.Mes return fmt.Errorf("unable to decode invalid peer ID for order (%s) and message type (%s)", orderID, msgType.String()) } + n.SendRelayedMessage(p, nil, &msg.Msg) // send relayed message immediately + ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) defer cancel() @@ -614,6 +635,7 @@ func (n *OpenBazaarNode) SendChat(peerID string, chatMessage *pb.Chat) error { return err } ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) + n.SendRelayedMessage(p, nil, &m) // send relayed message immediately defer cancel() err = n.Service.SendMessage(ctx, p, &m) if err != nil && chatMessage.Flag != pb.Chat_TYPING { @@ -834,6 +856,9 @@ func (n *OpenBazaarNode) SendOrderPayment(peerID string, paymentMessage *pb.Orde if err != nil { return err } + + n.SendRelayedMessage(p, nil, &m) // send relayed message immediately + ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) err = n.Service.SendMessage(ctx, p, &m) cancel() From f6cea9bc18f936a24d4a5b9589407906af242505 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Sat, 9 Nov 2019 11:17:33 -0500 Subject: [PATCH 18/22] Updated comments --- core/net.go | 1 + net/web_relay_manager.go | 8 +++++++- schema/configuration.go | 1 + 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/core/net.go b/core/net.go index 31c55fc704..dce9743fe7 100644 --- a/core/net.go +++ b/core/net.go @@ -58,6 +58,7 @@ func (n *OpenBazaarNode) sendMessage(peerID string, k *libp2p.PubKey, message pb return nil } +// SendRelayedMessage - send message through web relay manager to recipient func (n *OpenBazaarNode) SendRelayedMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error { messageBytes, err := n.getMessageBytes(m) if err != nil { diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index 9435a4ec3e..d396b539f8 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -17,6 +17,7 @@ import ( "github.com/gorilla/websocket" ) +// WebRelayManager - manages connections to web relay servers type WebRelayManager struct { webrelays []string peerID string @@ -24,31 +25,36 @@ type WebRelayManager struct { obService NetworkService } +// EncryptedMessage - message envelope for relay messages type EncryptedMessage struct { Message string `json:"encryptedMessage"` Recipient string `json:"recipient"` } +// TypedMessage - generic typed message for transport type TypedMessage struct { Type string Data json.RawMessage } +// SubscribeMessage - authentication message for web relay server type SubscribeMessage struct { UserID string `json:"userID"` SubscriptionKey string `json:"subscriptionKey"` } +// SubscribeResponse - for marshaling authN response from web relay server type SubscribeResponse struct { Subscribe string `json:"subscribe"` } +// NewWebRelayManager - creates a web relay manager to maintain connections func NewWebRelayManager(webrelays []string, sender string) *WebRelayManager { return &WebRelayManager{webrelays, sender, nil, nil} } +// ConnectToRelays - initiate websocket connections to the relay servers configured func (wrm *WebRelayManager) ConnectToRelays(service NetworkService) { - // Set WRM service wrm.obService = service diff --git a/schema/configuration.go b/schema/configuration.go index 7632e23058..beb53a775a 100644 --- a/schema/configuration.go +++ b/schema/configuration.go @@ -415,6 +415,7 @@ func GetRepublishInterval(cfgBytes []byte) (time.Duration, error) { return d, nil } +// GetWebRelays - retrieves web relay server addresses from config file func GetWebRelays(cfgBytes []byte) ([]string, error) { var cfgIface interface{} err := json.Unmarshal(cfgBytes, &cfgIface) From 74bcbcecdc4131ee995e54a295d482e3104c577b Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Sat, 9 Nov 2019 11:28:00 -0500 Subject: [PATCH 19/22] Refactor unused parameter --- net/web_relay_manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index d396b539f8..f35b05d8d8 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -130,7 +130,7 @@ func (wrm *WebRelayManager) connectToServer(relay string, sender string) (*webso if err != nil { log.Debugf("Connection to relay has an error: %s", err) log.Debugf("Attempting to reconnect to the relay...") - wrm.reconnectToRelay(relay, sender) + wrm.reconnectToRelay(relay) break } @@ -165,12 +165,12 @@ func (wrm *WebRelayManager) connectToServer(relay string, sender string) (*webso return c, nil } -func (wrm *WebRelayManager) reconnectToRelay(relay string, sender string) { +func (wrm *WebRelayManager) reconnectToRelay(relay string) { conn, err := wrm.connectToServer(relay, wrm.peerID) if err != nil { log.Error("Could not connect to: %s", relay) time.Sleep(10 * time.Second) - wrm.reconnectToRelay(relay, wrm.peerID) + wrm.reconnectToRelay(relay) } else { wrm.connections = append(wrm.connections, conn) } From a805f458a22ca70c191834e53dfc18f0b6bfd188 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Sat, 9 Nov 2019 11:30:07 -0500 Subject: [PATCH 20/22] Send relay message to all configured relays --- net/web_relay_manager.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index f35b05d8d8..b4e52b6ad5 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -192,8 +192,10 @@ func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string outgoing, _ := json.Marshal(typedmessage) log.Debugf("Sending encrypted relay message: %s", string(outgoing)) - // Transmit the encrypted message to the webrelay - wrm.authToWebRelay(wrm.webrelays[0], outgoing) + // Transmit the encrypted message to each configured web relay + for _, relay := range wrm.webrelays { + wrm.authToWebRelay(relay, outgoing) + } } func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { @@ -205,7 +207,6 @@ func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { } else { log.Debugf("Successfully sent message to relay: %s\n", conn.RemoteAddr()) } - } } From 5340cc829d468be15a7f28774966287d11271639 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Sun, 10 Nov 2019 08:10:37 -0500 Subject: [PATCH 21/22] Refactor broadcast message --- net/web_relay_manager.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index b4e52b6ad5..9560b0275e 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -192,13 +192,11 @@ func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string outgoing, _ := json.Marshal(typedmessage) log.Debugf("Sending encrypted relay message: %s", string(outgoing)) - // Transmit the encrypted message to each configured web relay - for _, relay := range wrm.webrelays { - wrm.authToWebRelay(relay, outgoing) - } + // Transmit the encrypted message to each web relay socket connection + wrm.broadcastMessage(outgoing) } -func (wrm *WebRelayManager) authToWebRelay(server string, msg []byte) { +func (wrm *WebRelayManager) broadcastMessage(msg []byte) { for _, conn := range wrm.connections { if conn != nil { err := conn.WriteMessage(websocket.TextMessage, msg) From 5a468735af205ec3b33bd1d8aae0873101750864 Mon Sep 17 00:00:00 2001 From: Brian Hoffman Date: Sun, 10 Nov 2019 08:15:12 -0500 Subject: [PATCH 22/22] Add comment for codacy --- net/web_relay_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go index 9560b0275e..25edbf445e 100644 --- a/net/web_relay_manager.go +++ b/net/web_relay_manager.go @@ -176,6 +176,7 @@ func (wrm *WebRelayManager) reconnectToRelay(relay string) { } } +// SendRelayMessage - Wrap relay message in encrypted envelope and broadcast func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string) { encryptedmessage := EncryptedMessage{ Message: ciphertext,