OneCache vs BoltDB

In this little experiment, I set out to test OneCache (a distributed in-memory key/value store) and BoltDB (a single-file disk-based key/value store).
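
If you have not used BoltDB before, here is a minimal sketch of its API (the file, bucket, and key names are illustrative only); the experiment below accesses it through the joyrexus/buckets wrapper instead:

package main

import (
    "fmt"
    "log"

    "github.com/boltdb/bolt"
)

func main() {
    // BoltDB stores everything in a single file on disk.
    db, err := bolt.Open("example.db", 0600, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Writes happen inside a read-write transaction.
    err = db.Update(func(tx *bolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists([]byte("words"))
        if err != nil {
            return err
        }
        return b.Put([]byte("hello"), []byte("1"))
    })
    if err != nil {
        log.Fatal(err)
    }

    // Reads happen inside a read-only transaction.
    db.View(func(tx *bolt.Tx) error {
        v := tx.Bucket([]byte("words")).Get([]byte("hello"))
        fmt.Printf("hello = %s\n", v)
        return nil
    })
}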

The programs come as producer and consumer pairs, one pair for NATS and one for Iris. Together, they simply count the words in a text file (for this example, I chose the English translation of Jose Rizal’s novel “The Social Cancer”). You may download it here.

NATS publisher version (pub.go)

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "strings"
    "time"

    "github.com/fatih/set"
    "github.com/nats-io/nats"
)

func main() {
    var s = set.New()

    f, err := os.Open("socialcancer.doc")
    if err != nil {
        log.Fatal("Error opening file: ", err)
    }
    defer f.Close()

    // Read the file word by word.
    scanner := bufio.NewScanner(f)
    scanner.Split(bufio.ScanWords)

    cnt := 0

    natsConnection, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal("Error connecting to NATS server")
    }
    defer natsConnection.Close()

    fmt.Println("Connected to NATS server: " + nats.DefaultURL)

    // Strip common punctuation from each word.
    r := strings.NewReplacer(".", "", ":", "", ",", "", "'", "", ";", "",
        "[", "", "]", "", "#", "", "?", "", "(", "", ")", "", "\"", "")

    start := time.Now()

    var msg *nats.Msg

    for scanner.Scan() {
        cnt++

        newline := r.Replace(scanner.Text())

        fmt.Println(newline)
        s.Add(newline)

        /*
            Output:

            The
            Project
            Gutenberg
            EBook
            of
            The
            Social
            Cancer,
        */

        // Publish each cleaned word on the "pipe" subject.
        msg = &nats.Msg{Subject: "pipe", Data: []byte(newline)}
        natsConnection.PublishMsg(msg)

        if cnt == 300 {
            break
        }
    }

    elapsed := time.Since(start)
    fmt.Println("Elapsed time: ", elapsed)

    // Signal the subscriber that publishing is done.
    msg = &nats.Msg{Subject: "end", Data: []byte("end")}
    natsConnection.PublishMsg(msg)

    fmt.Println("Count: ", cnt)

    fmt.Println("set")
    fmt.Println(s)
    fmt.Println("set size")
    fmt.Println(s.Size())

    fmt.Scanln()
}

You can modify the program if you wish to process beyond the arbitrary 300-word cutoff. The goal is to verify that the consumer on the other end receives those 300 words and processes them accordingly.
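
For example, to process the whole file instead, drop the counter check inside the loop and publish the end signal only after the scanner is exhausted (this is a sketch against the publisher above; the scanner.Err() check is an addition worth making in any case):

    for scanner.Scan() {
        cnt++
        newline := r.Replace(scanner.Text())
        s.Add(newline)
        natsConnection.PublishMsg(&nats.Msg{Subject: "pipe", Data: []byte(newline)})
    }
    if err := scanner.Err(); err != nil {
        log.Fatal("Error reading file: ", err)
    }

    // Only signal the end once every word has been published.
    natsConnection.PublishMsg(&nats.Msg{Subject: "end", Data: []byte("end")})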

NATS subscriber version using BoltDB (sub.go)

package main

import (
    "fmt"
    "log"
    "runtime"
    "strconv"
    "sync/atomic"

    "github.com/fatih/set"
    "github.com/joyrexus/buckets"
    "github.com/nats-io/nats"
)

var s = set.New()
var ops uint64
var bx, _ = buckets.Open("bolt")

func msgHandler(msg *nats.Msg) {
    data := string(msg.Data)

    bucket, _ := bx.New([]byte("bucket"))

    got, err := bucket.Get([]byte(data))
    if err != nil {
        // First occurrence: put the key/value pair into the bucket.
        key, value := []byte(data), []byte("1")
        if err := bucket.Put(key, value); err != nil {
            fmt.Println("Error inserting item: ", err)
        } else {
            fmt.Println(data, " 1")
        }
    } else {
        // Existing key: increment its count.
        key := []byte(data)
        v, _ := strconv.Atoi(string(got))
        v = v + 1
        value := []byte(strconv.Itoa(v))

        if err := bucket.Put(key, value); err != nil {
            fmt.Println("Error incrementing item: ", err)
        } else {
            fmt.Println(data, " ", v)
        }
    }

    // Add to the set of unique words.
    s.Add(data)

    if n := atomic.AddUint64(&ops, 1); n == 300 {
        fmt.Println("set")
        fmt.Println(s)
        fmt.Println("set size")
        fmt.Println(s.Size())
        fmt.Println("ops")
        fmt.Println(n)
        s.Clear()
    }
}

func endHandler(msg *nats.Msg) {
    fmt.Println("set")
    fmt.Println(s)
    fmt.Println("set size")
    fmt.Println(s.Size())
    fmt.Println("ops")
    fmt.Println(ops)
    s.Clear()
}

func main() {
    natsConnection, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal("Error connecting to NATS server")
    }
    defer natsConnection.Close()
    defer bx.Close()

    runtime.GOMAXPROCS(runtime.NumCPU())

    fmt.Println("Connected to " + nats.DefaultURL)
    fmt.Println("Subscribing to subjects")

    natsConnection.Subscribe("pipe", msgHandler)
    natsConnection.Subscribe("end", endHandler)

    fmt.Scanln()
}
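
To check that the stored counts really are identical across runs, you can dump the whole bucket and diff the output. Here is a minimal sketch using the underlying boltdb package directly; it assumes the same "bolt" file and "bucket" bucket names used by the subscriber above:

package main

import (
    "fmt"
    "log"

    "github.com/boltdb/bolt"
)

func main() {
    // Open the same single-file database the subscriber writes to.
    db, err := bolt.Open("bolt", 0600, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Walk every key/value pair in a read-only transaction so the
    // output can be compared between runs.
    err = db.View(func(tx *bolt.Tx) error {
        b := tx.Bucket([]byte("bucket"))
        if b == nil {
            return fmt.Errorf("bucket not found")
        }
        return b.ForEach(func(k, v []byte) error {
            fmt.Printf("%s %s\n", k, v)
            return nil
        })
    })
    if err != nil {
        log.Fatal(err)
    }
}

Note that BoltDB takes an exclusive lock on its file, so run this dump after stopping the subscriber.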

NATS subscriber version using OneCache

OneCache speaks the memcached wire protocol, which is why the standard gomemcache client is used to talk to it below; 192.168.0.133:11211 is the address of the OneCache server in my setup.

package main

import (
    "fmt"
    "log"
    "runtime"
    "sync/atomic"

    "github.com/bradfitz/gomemcache/memcache"
    "github.com/fatih/set"
    "github.com/nats-io/nats"
)

var mc = memcache.New("192.168.0.133:11211")
var s = set.New()
var ops uint64

func msgHandler(msg *nats.Msg) {
    data := string(msg.Data)

    // If the key does not exist yet, seed it with "0" so the
    // increment below brings it to 1.
    _, err := mc.Get(data)
    if err != nil {
        mc.Set(&memcache.Item{
            Key:        data,
            Value:      []byte("0"),
            Expiration: 0,
        })
    }

    // Add to the set of unique words.
    s.Add(data)

    var i uint64
    i, err = mc.Increment(data, 1)
    if err != nil {
        log.Println("Error in increment, ", err)
    }

    atomic.AddUint64(&ops, 1)

    fmt.Println(data, " ", i)
}

func endHandler(msg *nats.Msg) {
    fmt.Println("set")
    fmt.Println(s)
    fmt.Println("set size")
    fmt.Println(s.Size())
    fmt.Println("ops")
    fmt.Println(ops)
    s.Clear()
}

func main() {
    natsConnection, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal("Error connecting to NATS server")
    }
    defer natsConnection.Close()

    runtime.GOMAXPROCS(runtime.NumCPU())

    fmt.Println("Connected to " + nats.DefaultURL)
    fmt.Println("Subscribing to subjects")

    natsConnection.Subscribe("pipe", msgHandler)
    natsConnection.Subscribe("end", endHandler)

    fmt.Scanln()
}

The result with OneCache is disappointing. I expected to see the same result set no matter how many times I ran the subscriber program. With BoltDB, the results are consistent (and they should be, as long as you don’t modify the program, of course).
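
Independently of how OneCache behaves, the Get/Set/Increment sequence in the handler above is not atomic: two concurrent handlers can both see a cache miss and both seed the key. A race-free counter sketch, using gomemcache's Add (which fails with ErrNotStored if the key already exists), looks like this:

func incr(mc *memcache.Client, key string) (uint64, error) {
    // Fast path: the key already exists, so increment atomically.
    if v, err := mc.Increment(key, 1); err != memcache.ErrCacheMiss {
        return v, err
    }
    // Key is missing: try to create it with an initial count of 1.
    err := mc.Add(&memcache.Item{Key: key, Value: []byte("1")})
    if err == nil {
        return 1, nil
    }
    if err != memcache.ErrNotStored {
        return 0, err
    }
    // Another handler created the key in the meantime; increment it.
    return mc.Increment(key, 1)
}

This removes one client-side source of variance, though it does not change the conclusion below.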

Initially, I thought the problem was with NATS. But after I wrote the Iris versions of the publisher and subscriber, it became clear that the culprit is OneCache (not Iris or NATS).

Moreover, with NATS, when you publish on a subject, NATS expects the corresponding subscriber on the other end to process the message immediately. In our example, the “end” message was sometimes not processed because it was published too soon while the subscriber was still “slow” (in our case, it has to finish the words before reacting to the end signal). The same is true of Iris.
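
On the NATS side, one mitigation is to flush the connection after the final publish; Flush performs a round trip to the server, so the publisher does not exit while “end” is still sitting in its output buffer (a sketch against the publisher above):

    msg = &nats.Msg{Subject: "end", Data: []byte("end")}
    natsConnection.PublishMsg(msg)

    // Flush blocks until the server has processed everything we
    // have published so far.
    if err := natsConnection.Flush(); err != nil {
        log.Println("Flush error: ", err)
    }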

However, NATS and Iris are message transports and are not designed with message acknowledgement and retry in mind. If you need those features, you need something like NSQ. You can read more about it at github.com/IrisMQ.
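
For comparison, here is a minimal sketch of an NSQ consumer using the go-nsq client; the topic and channel names and the nsqd address (127.0.0.1:4150, its default TCP port) are assumptions of this sketch:

package main

import (
    "fmt"
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    config := nsq.NewConfig()

    // Subscribe to topic "pipe" on channel "counter".
    consumer, err := nsq.NewConsumer("pipe", "counter", config)
    if err != nil {
        log.Fatal(err)
    }

    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        fmt.Println(string(m.Body))
        // Returning nil acknowledges (FINishes) the message;
        // returning an error makes nsqd requeue it for retry.
        return nil
    }))

    // Connect to a local nsqd instance.
    if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
        log.Fatal(err)
    }

    select {} // block forever
}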

Iris publisher version

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "strings"
    "time"

    "github.com/fatih/set"
    "gopkg.in/project-iris/iris-go.v1"
)

func main() {
    var s = set.New()

    f, err := os.Open("socialcancer.txt")
    if err != nil {
        log.Fatal("Error opening file: ", err)
    }
    defer f.Close()

    // Read the file word by word.
    scanner := bufio.NewScanner(f)
    scanner.Split(bufio.ScanWords)
    cnt := 0

    conn, err := iris.Connect(55555)
    if err != nil {
        log.Fatalf("failed to connect to the Iris relay: %v.", err)
    }
    log.Println("Connected to port 55555")
    defer conn.Close()

    // Strip common punctuation from each word.
    r := strings.NewReplacer(".", "", ":", "", ",", "", "'", "", ";", "",
        "[", "", "]", "", "#", "", "?", "", "(", "", ")", "", "\"", "")

    start := time.Now()

    for scanner.Scan() {
        cnt++

        newline := r.Replace(scanner.Text())

        fmt.Println(newline)
        s.Add(newline)

        /*
            Output:

            The
            Project
            Gutenberg
            EBook
            of
            The
            Social
            Cancer,
        */

        // Publish each cleaned word on the "pipe" topic.
        conn.Publish("pipe", []byte(newline))

        if cnt == 300 {
            break
        }
    }

    elapsed := time.Since(start)
    log.Printf("Elapsed time: %s", elapsed)

    // Signal the consumer that publishing is done.
    conn.Publish("end", []byte(""))

    fmt.Println("Count: ", cnt)

    fmt.Println("set")
    fmt.Println(s)
    fmt.Println("set size")
    fmt.Println(s.Size())

    fmt.Scanln()
}

Iris consumer version

package main

import (
    "fmt"
    "log"
    "runtime"
    "sync/atomic"

    "github.com/bradfitz/gomemcache/memcache"
    "github.com/fatih/set"
    "gopkg.in/project-iris/iris-go.v1"
)

// topicEvent handles the word messages; endEvent handles the end signal.
type topicEvent struct{}
type endEvent struct{}

var mc = memcache.New("192.168.0.133:11211")
var s = set.New()
var ops uint64

func (t topicEvent) HandleEvent(msg []byte) {
    data := string(msg)

    // If the key does not exist yet, seed it with "0" so the
    // increment below brings it to 1.
    _, err := mc.Get(data)
    if err != nil {
        mc.Set(&memcache.Item{
            Key:        data,
            Value:      []byte("0"),
            Expiration: 0,
        })
    }

    // Add to the set of unique words.
    s.Add(data)

    _, err = mc.Increment(data, 1)
    if err != nil {
        log.Println("Error in increment, ", err)
    }

    atomic.AddUint64(&ops, 1)
}

func (e endEvent) HandleEvent(msg []byte) {
    fmt.Println("set")
    fmt.Println(s)
    fmt.Println("set size")
    fmt.Println(s.Size())
    fmt.Println("ops")
    fmt.Println(ops)
    s.Clear()
}

func main() {
    conn, err := iris.Connect(55555)
    if err != nil {
        log.Fatalf("failed to connect to the Iris relay: %v.", err)
    }
    log.Println("Connected to port 55555")
    defer conn.Close()

    runtime.GOMAXPROCS(runtime.NumCPU())

    var topicHandler = new(topicEvent)
    var endHandler = new(endEvent)

    conn.Subscribe("pipe", topicHandler, &iris.TopicLimits{
        EventThreads: runtime.NumCPU(),
        EventMemory:  64 * 1024 * 1024,
    })

    conn.Subscribe("end", endHandler, &iris.TopicLimits{
        EventThreads: runtime.NumCPU(),
        EventMemory:  64 * 1024 * 1024,
    })

    fmt.Scanln()
}